View Javadoc

1   package com.aragost.javahg.internals;
2   
3   import java.io.File;
4   import java.nio.charset.Charset;
5   import java.nio.charset.CharsetDecoder;
6   import java.nio.charset.CodingErrorAction;
7   import java.util.List;
8   import java.util.concurrent.BlockingQueue;
9   import java.util.concurrent.LinkedBlockingQueue;
10  import java.util.concurrent.TimeUnit;
11  
12  import com.aragost.javahg.HgVersion;
13  import com.aragost.javahg.Repository;
14  import com.aragost.javahg.RepositoryConfiguration;
15  import com.aragost.javahg.commands.VersionCommand;
16  import com.aragost.javahg.internals.AbstractCommand.State;
17  import com.aragost.javahg.log.Logger;
18  import com.aragost.javahg.log.LoggerFactory;
19  import com.google.common.annotations.VisibleForTesting;
20  import com.google.common.collect.Lists;
21  
22  /**
23   * A pool of Server instances. Use {@link #take(AbstractCommand)} and
24   * {@link #put(Server)}.
25   * 
26   * Contains up to {@link #maxServers} servers. When the maximum number of
27   * servers are running commands are queued and queued commands may be cancelled.
28   */
29  public class ServerPool {
30  
31      private static final Logger LOG = LoggerFactory.getLogger(ServerPool.class);
32  
33      /**
34       * Period at which to check if a command waiting for a server has been
35       * cancelled.
36       */
37      private static final long WAIT_CHECK_MILLIS = 500;
38  
39      /**
40       * If a server can't be obtained with in this number of seconds a warning is
41       * written to the log
42       */
43      private static final int WAIT_WARN_MILLIS = 10 * 1000;
44  
45      /**
46       * The character encoding used for the server.
47       */
48      private final Charset encoding;
49  
50      /**
51       * The number of {@link Repository} instances referencing this pool
52       */
53      private int refCount;
54  
55      /**
56       * Used as a stack so that hot servers are used more often.
57       */
58      private final BlockingQueue<Server> freeServers = new LinkedBlockingQueue<Server>();
59  
60      /**
61       * The maximum number of servers to use
62       */
63      private int maxServers;
64  
65      /**
66       * All the currently running servers.
67       */
68      private List<Server> servers = Lists.newArrayList();
69  
70      /**
71       * The version of the underlying Mercurial. It is lazy initialized.
72       */
73      private HgVersion hgVersion;
74  
75      private final RepositoryConfiguration configuration;
76  
77      /**
78       * Mercurial repository directory
79       */
80      private final File directory;
81  
82      public ServerPool(RepositoryConfiguration conf, File directory,
83              boolean performInit, String cloneUrl) {
84          this.maxServers = Math.max(1, conf.getConcurrency());
85          this.configuration = conf;
86          this.directory = directory;
87          this.encoding = conf.getEncoding();
88  
89          Server server = createServer();
90          if (performInit) {
91              server.initMecurialRepository(directory);
92          } else if (cloneUrl != null) {
93              server.cloneMercurialRepository(directory, conf.getHgrcPath(),
94                      cloneUrl);
95          }
96          startServer(server);
97          freeServers.add(server);
98          servers.add(server);
99      }
100 
101     /**
102      * Increment the refCount for this server pool.
103      */
104     public void incrementRefCount() {
105         this.refCount++;
106     }
107 
108     /**
109      * Decrement the refCount. If it reaches 0 then the server pool is stopped.
110      */
111     public void decrementRefCount() {
112         this.refCount--;
113         if (this.refCount == 0) {
114             stop();
115         }
116     }
117 
118     private void stop() {
119         synchronized (servers) {
120             maxServers = 0;
121             for (Server server : servers) {
122                 server.stop();
123             }
124             servers.clear();
125         }
126     }
127 
128     public CharsetDecoder newDecoder() {
129         CodingErrorAction errorAction = this.configuration
130                 .getCodingErrorAction();
131 
132         CharsetDecoder decoder = this.encoding.newDecoder();
133         decoder.onMalformedInput(errorAction);
134         decoder.onUnmappableCharacter(errorAction);
135         return decoder;
136     }
137 
138     /**
139      * Get a server. If there are fewer than {@link #maxServers} a new server is
140      * started. If no servers are available the thread blocks until there is a
141      * server available. Caller must call {@link #put(Server)} after command is
142      * completed.
143      * 
144      * @return The next available server
145      * @throws InterruptedException
146      *             If interrupted while waiting for a server to become free or
147      *             the command was cancelled.
148      * @see #put(Server)
149      */
150     public Server take(AbstractCommand command) throws InterruptedException {
151         Server server = freeServers.poll();
152 
153         if (server == null) {
154             synchronized (servers) {
155                 if (maxServers == 0) {
156                     throw new IllegalStateException("Server pool is stopped");
157                 }
158                 if (servers.size() < maxServers) {
159                     server = createServer();
160                     startServer(server);
161                     servers.add(server);
162                 }
163             }
164 
165             // Already at capacity, wait for a server to become free
166             if (server == null) {
167                 server = waitForServer(command);
168             }
169         }
170 
171         return server;
172     }
173 
174     /**
175      * Block the current thread until a server becomes available.
176      * 
177      * After {@link #WAIT_WARN_MILLIS} a warning logged. After
178      * {@link RepositoryConfiguration#getCommandWaitTimeout()} an error is
179      * logged and an exception is thrown.
180      * 
181      * @param command
182      *            The command
183      * @return Never null
184      * @throws InterruptedException
185      *             If the command is cancelled.
186      */
187     private Server waitForServer(AbstractCommand command)
188             throws InterruptedException {
189         boolean warned = false;
190         long startedWaitingTime = System.currentTimeMillis();
191         long failTimeoutMillis = configuration.getCommandWaitTimeout() * 1000l;
192 
193         // Check for cancellation twice per second
194         // Log if waiting for too long
195         while (true) {
196             Server server = freeServers.poll(WAIT_CHECK_MILLIS,
197                     TimeUnit.MILLISECONDS);
198 
199             if (command.getState() == State.CANCELING) {
200                 throw new InterruptedException(
201                         "Command cancelled while waiting for comand server to become available");
202             }
203 
204             if (server != null) {
205                 return server;
206             }
207 
208             // Check for timeouts
209             long elapsed = System.currentTimeMillis() - startedWaitingTime;
210             if (!warned && elapsed > WAIT_WARN_MILLIS) {
211                 LOG.warn("Waited " + (WAIT_WARN_MILLIS / 1000)
212                         + " seconds for server lock without obtaining it");
213                 warned = true;
214             } else if (elapsed > failTimeoutMillis) {
215                 String msg = "Did not obtain server lock after "
216                         + failTimeoutMillis / 1000 + " seconds.";
217                 LOG.error(msg);
218                 throw new RuntimeException(msg);
219             }
220         }
221     }
222 
223     /**
224      * Return the server to the pool of available servers.
225      * 
226      * @param server
227      *            The server to return
228      * @see #take(AbstractCommand)
229      * @see #abort(Server)
230      */
231     public void put(Server server) {
232         Server unusedServer = freeServers.poll();
233 
234         if (unusedServer != null) {
235             stop(unusedServer);
236         }
237 
238         freeServers.add(server);
239     }
240 
241     /**
242      * Stop the given server because it is in an invalid state and not able to
243      * service requests.
244      * 
245      * @param server
246      *            The server to stop.
247      */
248     void abort(Server server) {
249         try {
250             LOG.info("Aborting server " + server);
251             stop(server);
252         } catch (Throwable t) {
253             LOG.error("Additional error stopping server", t);
254             assert false;
255         }
256     }
257 
258     /**
259      * Stop the given server and remove it from the list of servers. Assumes not
260      * present in freeServers.
261      * 
262      * @param server
263      *            The server to stop
264      */
265     private void stop(Server server) {
266         synchronized (servers) {
267             servers.remove(server);
268         }
269         server.stop();
270     }
271 
272     private void startServer(Server server) {
273         List<String> extensionArgs = ExtensionManager.getInstance().process(
274                 this.configuration.getExtensionClasses());
275         Runnable supervisor = null;
276         if (this.configuration.getServerIdleTime() != Integer.MAX_VALUE) {
277             supervisor = new ServerSupervisor(server);
278         }
279 
280         server.start(this.directory, this.configuration.getHgrcPath(),
281                 extensionArgs, this.configuration.getEnvironment(), supervisor);
282     }
283 
284     private Server createServer() {
285         Server server = new Server(this.configuration.getHgBin(), encoding);
286         server.setStderrBufferSize(this.configuration.getStderrBufferSize());
287         server.setErrorAction(this.configuration.getCodingErrorAction());
288         server.setEnablePendingChangesets(configuration.isEnablePendingChangesets());
289 
290         return server;
291     }
292 
293     public HgVersion getHgVersion(Repository repo) {
294         if (this.hgVersion == null) {
295             this.hgVersion = VersionCommand.on(repo).execute();
296         }
297         return this.hgVersion;
298     }
299 
300     @VisibleForTesting
301     public List<Server> getServers() {
302         return servers;
303     }
304 
305     public int getNumIdleServers() {
306         return freeServers.size(); 
307     }
308 
309     // inner types
310 
311     private final class ServerSupervisor implements Runnable {
312         private final Server server;
313 
314         private ServerSupervisor(Server server) {
315             this.server = server;
316         }
317 
318         public void run() {
319             if ((System.currentTimeMillis() - server
320                     .getLastActiveTime()) > configuration
321                     .getServerIdleTime() * 1000) {
322                 if (freeServers.remove(server)) {
323                     new Thread(new Runnable() {
324                         public void run() {
325                             stop(server);
326                         }
327                     }).start();
328                 }
329                 // Else the server is running a long command and isn't
330                 // actually idle.
331             }
332         }
333     }
334 }