1 package com.aragost.javahg.internals;
2
3 import java.io.File;
4 import java.nio.charset.Charset;
5 import java.nio.charset.CharsetDecoder;
6 import java.nio.charset.CodingErrorAction;
7 import java.util.List;
8 import java.util.concurrent.BlockingQueue;
9 import java.util.concurrent.LinkedBlockingQueue;
10 import java.util.concurrent.TimeUnit;
11
12 import com.aragost.javahg.HgVersion;
13 import com.aragost.javahg.Repository;
14 import com.aragost.javahg.RepositoryConfiguration;
15 import com.aragost.javahg.commands.VersionCommand;
16 import com.aragost.javahg.internals.AbstractCommand.State;
17 import com.aragost.javahg.log.Logger;
18 import com.aragost.javahg.log.LoggerFactory;
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.collect.Lists;
21
22
23
24
25
26
27
28
29 public class ServerPool {
30
31 private static final Logger LOG = LoggerFactory.getLogger(ServerPool.class);
32
33
34
35
36
37 private static final long WAIT_CHECK_MILLIS = 500;
38
39
40
41
42
43 private static final int WAIT_WARN_MILLIS = 10 * 1000;
44
45
46
47
48 private final Charset encoding;
49
50
51
52
53 private int refCount;
54
55
56
57
58 private final BlockingQueue<Server> freeServers = new LinkedBlockingQueue<Server>();
59
60
61
62
63 private int maxServers;
64
65
66
67
68 private List<Server> servers = Lists.newArrayList();
69
70
71
72
73 private HgVersion hgVersion;
74
75 private final RepositoryConfiguration configuration;
76
77
78
79
80 private final File directory;
81
82 public ServerPool(RepositoryConfiguration conf, File directory,
83 boolean performInit, String cloneUrl) {
84 this.maxServers = Math.max(1, conf.getConcurrency());
85 this.configuration = conf;
86 this.directory = directory;
87 this.encoding = conf.getEncoding();
88
89 Server server = createServer();
90 if (performInit) {
91 server.initMecurialRepository(directory);
92 } else if (cloneUrl != null) {
93 server.cloneMercurialRepository(directory, conf.getHgrcPath(),
94 cloneUrl);
95 }
96 startServer(server);
97 freeServers.add(server);
98 servers.add(server);
99 }
100
101
102
103
104 public void incrementRefCount() {
105 this.refCount++;
106 }
107
108
109
110
111 public void decrementRefCount() {
112 this.refCount--;
113 if (this.refCount == 0) {
114 stop();
115 }
116 }
117
118 private void stop() {
119 synchronized (servers) {
120 maxServers = 0;
121 for (Server server : servers) {
122 server.stop();
123 }
124 servers.clear();
125 }
126 }
127
128 public CharsetDecoder newDecoder() {
129 CodingErrorAction errorAction = this.configuration
130 .getCodingErrorAction();
131
132 CharsetDecoder decoder = this.encoding.newDecoder();
133 decoder.onMalformedInput(errorAction);
134 decoder.onUnmappableCharacter(errorAction);
135 return decoder;
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150 public Server take(AbstractCommand command) throws InterruptedException {
151 Server server = freeServers.poll();
152
153 if (server == null) {
154 synchronized (servers) {
155 if (maxServers == 0) {
156 throw new IllegalStateException("Server pool is stopped");
157 }
158 if (servers.size() < maxServers) {
159 server = createServer();
160 startServer(server);
161 servers.add(server);
162 }
163 }
164
165
166 if (server == null) {
167 server = waitForServer(command);
168 }
169 }
170
171 return server;
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187 private Server waitForServer(AbstractCommand command)
188 throws InterruptedException {
189 boolean warned = false;
190 long startedWaitingTime = System.currentTimeMillis();
191 long failTimeoutMillis = configuration.getCommandWaitTimeout() * 1000l;
192
193
194
195 while (true) {
196 Server server = freeServers.poll(WAIT_CHECK_MILLIS,
197 TimeUnit.MILLISECONDS);
198
199 if (command.getState() == State.CANCELING) {
200 throw new InterruptedException(
201 "Command cancelled while waiting for comand server to become available");
202 }
203
204 if (server != null) {
205 return server;
206 }
207
208
209 long elapsed = System.currentTimeMillis() - startedWaitingTime;
210 if (!warned && elapsed > WAIT_WARN_MILLIS) {
211 LOG.warn("Waited " + (WAIT_WARN_MILLIS / 1000)
212 + " seconds for server lock without obtaining it");
213 warned = true;
214 } else if (elapsed > failTimeoutMillis) {
215 String msg = "Did not obtain server lock after "
216 + failTimeoutMillis / 1000 + " seconds.";
217 LOG.error(msg);
218 throw new RuntimeException(msg);
219 }
220 }
221 }
222
223
224
225
226
227
228
229
230
231 public void put(Server server) {
232 Server unusedServer = freeServers.poll();
233
234 if (unusedServer != null) {
235 stop(unusedServer);
236 }
237
238 freeServers.add(server);
239 }
240
241
242
243
244
245
246
247
248 void abort(Server server) {
249 try {
250 LOG.info("Aborting server " + server);
251 stop(server);
252 } catch (Throwable t) {
253 LOG.error("Additional error stopping server", t);
254 assert false;
255 }
256 }
257
258
259
260
261
262
263
264
265 private void stop(Server server) {
266 synchronized (servers) {
267 servers.remove(server);
268 }
269 server.stop();
270 }
271
272 private void startServer(Server server) {
273 List<String> extensionArgs = ExtensionManager.getInstance().process(
274 this.configuration.getExtensionClasses());
275 Runnable supervisor = null;
276 if (this.configuration.getServerIdleTime() != Integer.MAX_VALUE) {
277 supervisor = new ServerSupervisor(server);
278 }
279
280 server.start(this.directory, this.configuration.getHgrcPath(),
281 extensionArgs, this.configuration.getEnvironment(), supervisor);
282 }
283
284 private Server createServer() {
285 Server server = new Server(this.configuration.getHgBin(), encoding);
286 server.setStderrBufferSize(this.configuration.getStderrBufferSize());
287 server.setErrorAction(this.configuration.getCodingErrorAction());
288 server.setEnablePendingChangesets(configuration.isEnablePendingChangesets());
289
290 return server;
291 }
292
293 public HgVersion getHgVersion(Repository repo) {
294 if (this.hgVersion == null) {
295 this.hgVersion = VersionCommand.on(repo).execute();
296 }
297 return this.hgVersion;
298 }
299
300 @VisibleForTesting
301 public List<Server> getServers() {
302 return servers;
303 }
304
305 public int getNumIdleServers() {
306 return freeServers.size();
307 }
308
309
310
311 private final class ServerSupervisor implements Runnable {
312 private final Server server;
313
314 private ServerSupervisor(Server server) {
315 this.server = server;
316 }
317
318 public void run() {
319 if ((System.currentTimeMillis() - server
320 .getLastActiveTime()) > configuration
321 .getServerIdleTime() * 1000) {
322 if (freeServers.remove(server)) {
323 new Thread(new Runnable() {
324 public void run() {
325 stop(server);
326 }
327 }).start();
328 }
329
330
331 }
332 }
333 }
334 }