Coverage Report - com.aragost.javahg.internals.ServerPool
 
Classes in this File Line Coverage Branch Coverage Complexity
ServerPool
85%
85/99
67%
23/34
2.333
ServerPool$1
N/A
N/A
2.333
ServerPool$ServerSupervisor
100%
8/8
100%
4/4
2.333
ServerPool$ServerSupervisor$1
100%
3/3
N/A
2.333
 
 1  
 package com.aragost.javahg.internals;
 2  
 
 3  
 import java.io.File;
 4  
 import java.nio.charset.Charset;
 5  
 import java.nio.charset.CharsetDecoder;
 6  
 import java.nio.charset.CodingErrorAction;
 7  
 import java.util.List;
 8  
 import java.util.concurrent.BlockingQueue;
 9  
 import java.util.concurrent.LinkedBlockingQueue;
 10  
 import java.util.concurrent.TimeUnit;
 11  
 
 12  
 import com.aragost.javahg.HgVersion;
 13  
 import com.aragost.javahg.Repository;
 14  
 import com.aragost.javahg.RepositoryConfiguration;
 15  
 import com.aragost.javahg.commands.VersionCommand;
 16  
 import com.aragost.javahg.internals.AbstractCommand.State;
 17  
 import com.aragost.javahg.log.Logger;
 18  
 import com.aragost.javahg.log.LoggerFactory;
 19  
 import com.google.common.annotations.VisibleForTesting;
 20  
 import com.google.common.collect.Lists;
 21  
 
 22  
 /**
 23  
  * A pool of Server instances. Use {@link #take(AbstractCommand)} and
 24  
  * {@link #put(Server)}.
 25  
  * 
 26  
  * Contains up to {@link #maxServers} servers. When the maximum number of
 27  
  * servers are running commands are queued and queued commands may be cancelled.
 28  
  */
 29  13
 public class ServerPool {
 30  
 
 31  1
     private static final Logger LOG = LoggerFactory.getLogger(ServerPool.class);
 32  
 
 33  
     /**
 34  
      * Period at which to check if a command waiting for a server has been
 35  
      * cancelled.
 36  
      */
 37  
     private static final long WAIT_CHECK_MILLIS = 500;
 38  
 
 39  
     /**
 40  
      * If a server can't be obtained with in this number of seconds a warning is
 41  
      * written to the log
 42  
      */
 43  
     private static final int WAIT_WARN_MILLIS = 10 * 1000;
 44  
 
 45  
     /**
 46  
      * The character encoding used for the server.
 47  
      */
 48  
     private final Charset encoding;
 49  
 
 50  
     /**
 51  
      * The number of {@link Repository} instances referencing this pool
 52  
      */
 53  
     private int refCount;
 54  
 
 55  
     /**
 56  
      * Used as a stack so that hot servers are used more often.
 57  
      */
 58  212
     private final BlockingQueue<Server> freeServers = new LinkedBlockingQueue<Server>();
 59  
 
 60  
     /**
 61  
      * The maximum number of servers to use
 62  
      */
 63  
     private int maxServers;
 64  
 
 65  
     /**
 66  
      * All the currently running servers.
 67  
      */
 68  212
     private List<Server> servers = Lists.newArrayList();
 69  
 
 70  
     /**
 71  
      * The version of the underlying Mercurial. It is lazy initialized.
 72  
      */
 73  
     private HgVersion hgVersion;
 74  
 
 75  
     private final RepositoryConfiguration configuration;
 76  
 
 77  
     /**
 78  
      * Mercurial repository directory
 79  
      */
 80  
     private final File directory;
 81  
 
 82  
     public ServerPool(RepositoryConfiguration conf, File directory,
 83  212
             boolean performInit, String cloneUrl) {
 84  212
         this.maxServers = Math.max(1, conf.getConcurrency());
 85  212
         this.configuration = conf;
 86  212
         this.directory = directory;
 87  212
         this.encoding = conf.getEncoding();
 88  
 
 89  212
         Server server = createServer();
 90  212
         if (performInit) {
 91  203
             server.initMecurialRepository(directory);
 92  9
         } else if (cloneUrl != null) {
 93  4
             server.cloneMercurialRepository(directory, conf.getHgrcPath(),
 94  
                     cloneUrl);
 95  
         }
 96  212
         startServer(server);
 97  196
         freeServers.add(server);
 98  196
         servers.add(server);
 99  196
     }
 100  
 
 101  
     /**
 102  
      * Increment the refCount for this server pool.
 103  
      */
 104  
     public void incrementRefCount() {
 105  202
         this.refCount++;
 106  202
     }
 107  
 
 108  
     /**
 109  
      * Decrement the refCount. If it reaches 0 then the server pool is stopped.
 110  
      */
 111  
     public void decrementRefCount() {
 112  201
         this.refCount--;
 113  201
         if (this.refCount == 0) {
 114  194
             stop();
 115  
         }
 116  201
     }
 117  
 
 118  
     private void stop() {
 119  194
         synchronized (servers) {
 120  194
             maxServers = 0;
 121  194
             for (Server server : servers) {
 122  179
                 server.stop();
 123  
             }
 124  194
             servers.clear();
 125  194
         }
 126  194
     }
 127  
 
 128  
     public CharsetDecoder newDecoder() {
 129  5678
         CodingErrorAction errorAction = this.configuration
 130  
                 .getCodingErrorAction();
 131  
 
 132  5678
         CharsetDecoder decoder = this.encoding.newDecoder();
 133  5678
         decoder.onMalformedInput(errorAction);
 134  5678
         decoder.onUnmappableCharacter(errorAction);
 135  5678
         return decoder;
 136  
     }
 137  
 
 138  
     /**
 139  
      * Get a server. If there are fewer than {@link #maxServers} a new server is
 140  
      * started. If no servers are available the thread blocks until there is a
 141  
      * server available. Caller must call {@link #put(Server)} after command is
 142  
      * completed.
 143  
      * 
 144  
      * @return The next available server
 145  
      * @throws InterruptedException
 146  
      *             If interrupted while waiting for a server to become free or
 147  
      *             the command was cancelled.
 148  
      * @see #put(Server)
 149  
      */
 150  
     public Server take(AbstractCommand command) throws InterruptedException {
 151  3809
         Server server = freeServers.poll();
 152  
 
 153  3809
         if (server == null) {
 154  1991
             synchronized (servers) {
 155  1991
                 if (maxServers == 0) {
 156  0
                     throw new IllegalStateException("Server pool is stopped");
 157  
                 }
 158  1991
                 if (servers.size() < maxServers) {
 159  4
                     server = createServer();
 160  4
                     startServer(server);
 161  4
                     servers.add(server);
 162  
                 }
 163  1991
             }
 164  
 
 165  
             // Already at capacity, wait for a server to become free
 166  1991
             if (server == null) {
 167  1987
                 server = waitForServer(command);
 168  
             }
 169  
         }
 170  
 
 171  3808
         return server;
 172  
     }
 173  
 
 174  
     /**
 175  
      * Block the current thread until a server becomes available.
 176  
      * 
 177  
      * After {@link #WAIT_WARN_MILLIS} a warning logged. After
 178  
      * {@link RepositoryConfiguration#getCommandWaitTimeout()} an error is
 179  
      * logged and an exception is thrown.
 180  
      * 
 181  
      * @param command
 182  
      *            The command
 183  
      * @return Never null
 184  
      * @throws InterruptedException
 185  
      *             If the command is cancelled.
 186  
      */
 187  
     private Server waitForServer(AbstractCommand command)
 188  
             throws InterruptedException {
 189  1987
         boolean warned = false;
 190  1987
         long startedWaitingTime = System.currentTimeMillis();
 191  1987
         long failTimeoutMillis = configuration.getCommandWaitTimeout() * 1000l;
 192  
 
 193  
         // Check for cancellation twice per second
 194  
         // Log if waiting for too long
 195  
         while (true) {
 196  1987
             Server server = freeServers.poll(WAIT_CHECK_MILLIS,
 197  
                     TimeUnit.MILLISECONDS);
 198  
 
 199  1987
             if (command.getState() == State.CANCELING) {
 200  1
                 throw new InterruptedException(
 201  
                         "Command cancelled while waiting for comand server to become available");
 202  
             }
 203  
 
 204  1986
             if (server != null) {
 205  1986
                 return server;
 206  
             }
 207  
 
 208  
             // Check for timeouts
 209  0
             long elapsed = System.currentTimeMillis() - startedWaitingTime;
 210  0
             if (!warned && elapsed > WAIT_WARN_MILLIS) {
 211  0
                 LOG.warn("Waited " + (WAIT_WARN_MILLIS / 1000)
 212  
                         + " seconds for server lock without obtaining it");
 213  0
                 warned = true;
 214  0
             } else if (elapsed > failTimeoutMillis) {
 215  0
                 String msg = "Did not obtain server lock after "
 216  
                         + failTimeoutMillis / 1000 + " seconds.";
 217  0
                 LOG.error(msg);
 218  0
                 throw new RuntimeException(msg);
 219  
             }
 220  0
         }
 221  
     }
 222  
 
 223  
     /**
 224  
      * Return the server to the pool of available servers.
 225  
      * 
 226  
      * @param server
 227  
      *            The server to return
 228  
      * @see #take(AbstractCommand)
 229  
      * @see #abort(Server)
 230  
      */
 231  
     public void put(Server server) {
 232  3787
         Server unusedServer = freeServers.poll();
 233  
 
 234  3787
         if (unusedServer != null) {
 235  0
             stop(unusedServer);
 236  
         }
 237  
 
 238  3787
         freeServers.add(server);
 239  3787
     }
 240  
 
 241  
     /**
 242  
      * Stop the given server because it is in an invalid state and not able to
 243  
      * service requests.
 244  
      * 
 245  
      * @param server
 246  
      *            The server to stop.
 247  
      */
 248  
     void abort(Server server) {
 249  
         try {
 250  18
             LOG.info("Aborting server " + server);
 251  18
             stop(server);
 252  0
         } catch (Throwable t) {
 253  0
             LOG.error("Additional error stopping server", t);
 254  0
             assert false;
 255  18
         }
 256  18
     }
 257  
 
 258  
     /**
 259  
      * Stop the given server and remove it from the list of servers. Assumes not
 260  
      * present in freeServers.
 261  
      * 
 262  
      * @param server
 263  
      *            The server to stop
 264  
      */
 265  
     private void stop(Server server) {
 266  20
         synchronized (servers) {
 267  20
             servers.remove(server);
 268  20
         }
 269  20
         server.stop();
 270  20
     }
 271  
 
 272  
     private void startServer(Server server) {
 273  216
         List<String> extensionArgs = ExtensionManager.getInstance().process(
 274  
                 this.configuration.getExtensionClasses());
 275  216
         Runnable supervisor = null;
 276  216
         if (this.configuration.getServerIdleTime() != Integer.MAX_VALUE) {
 277  2
             supervisor = new ServerSupervisor(server);
 278  
         }
 279  
 
 280  216
         server.start(this.directory, this.configuration.getHgrcPath(),
 281  
                 extensionArgs, this.configuration.getEnvironment(), supervisor);
 282  200
     }
 283  
 
 284  
     private Server createServer() {
 285  216
         Server server = new Server(this.configuration.getHgBin(), encoding);
 286  216
         server.setStderrBufferSize(this.configuration.getStderrBufferSize());
 287  216
         server.setErrorAction(this.configuration.getCodingErrorAction());
 288  216
         server.setEnablePendingChangesets(configuration.isEnablePendingChangesets());
 289  
 
 290  216
         return server;
 291  
     }
 292  
 
 293  
     public HgVersion getHgVersion(Repository repo) {
 294  14
         if (this.hgVersion == null) {
 295  12
             this.hgVersion = VersionCommand.on(repo).execute();
 296  
         }
 297  14
         return this.hgVersion;
 298  
     }
 299  
 
 300  
     @VisibleForTesting
 301  
     public List<Server> getServers() {
 302  24
         return servers;
 303  
     }
 304  
 
 305  
     public int getNumIdleServers() {
 306  5
         return freeServers.size(); 
 307  
     }
 308  
 
 309  
     // inner types
 310  
 
 311  4
     private final class ServerSupervisor implements Runnable {
 312  
         private final Server server;
 313  
 
 314  2
         private ServerSupervisor(Server server) {
 315  2
             this.server = server;
 316  2
         }
 317  
 
 318  
         public void run() {
 319  6
             if ((System.currentTimeMillis() - server
 320  
                     .getLastActiveTime()) > configuration
 321  
                     .getServerIdleTime() * 1000) {
 322  4
                 if (freeServers.remove(server)) {
 323  2
                     new Thread(new Runnable() {
 324  
                         public void run() {
 325  2
                             stop(server);
 326  2
                         }
 327  
                     }).start();
 328  
                 }
 329  
                 // Else the server is running a long command and isn't
 330  
                 // actually idle.
 331  
             }
 332  6
         }
 333  
     }
 334  
 }