1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.aragost.javahg.internals;
27
28 import java.io.ByteArrayOutputStream;
29 import java.io.File;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.OutputStream;
33 import java.nio.BufferOverflowException;
34 import java.nio.ByteBuffer;
35 import java.nio.CharBuffer;
36 import java.nio.charset.CharacterCodingException;
37 import java.nio.charset.Charset;
38 import java.nio.charset.CharsetDecoder;
39 import java.nio.charset.CharsetEncoder;
40 import java.nio.charset.CodingErrorAction;
41 import java.util.ArrayList;
42 import java.util.Arrays;
43 import java.util.Iterator;
44 import java.util.List;
45 import java.util.Map;
46
47 import com.aragost.javahg.MercurialExtension;
48 import com.aragost.javahg.log.Logger;
49 import com.aragost.javahg.log.LoggerFactory;
50 import com.google.common.annotations.VisibleForTesting;
51 import com.google.common.collect.Lists;
52 import com.google.common.io.Closeables;
53
54
55
56
57 public class Server {
58
59 private static final Logger LOG = LoggerFactory.getLogger(Server.class);
60
61
62
63
64
65
66 private enum State {
67 NOT_STARTED, STARTING, RUNNING, STOPPING, STOPPED, CRASHED
68 };
69
70 private static final byte[] RUNCOMMAND = "runcommand\n".getBytes();
71
72
73
74
75
76
77 private final Charset encoding;
78
79
80
81
82
83 private CodingErrorAction errorAction = CodingErrorAction.REPORT;
84
85
86
87
88 private int stderrBufferSize = 1024;
89
90
91
92
93 private String startupStderr = "";
94
95
96
97
98 private Process process;
99
100
101
102
103
104
105
106
107
108 private StderrReader errorReaderThread;
109
110 private State state = State.NOT_STARTED;
111
112
113
114
115
116
117
118
119 private AbstractCommand currentCommand;
120
121
122
123
124
125 private String currentLog;
126
127
128
129
130
131 private long currentStartTime;
132
133
134
135
136
137 private File directory;
138
139
140
141
142 private final String hgBin;
143
144
145
146
147 private volatile long lastActiveTime;
148
149
150
151
152 private boolean enablePendingChangesets = false;
153
154
155
156
157
158
159
160 public Server(String hgBin, Charset encoding) {
161 this.hgBin = hgBin;
162 this.encoding = encoding;
163 }
164
165 public CodingErrorAction getErrorAction() {
166 return errorAction;
167 }
168
169 public void setErrorAction(CodingErrorAction errorAction) {
170 this.errorAction = errorAction;
171 }
172
173 public int getStderrBufferSize() {
174 return stderrBufferSize;
175 }
176
177 public String getStartupStderr() {
178 return startupStderr;
179 }
180
181 public void setEnablePendingChangesets(boolean enablePendingChangesets){
182 this.enablePendingChangesets = enablePendingChangesets;
183 }
184
185 public boolean isEnablePendingChangesets(){
186 return enablePendingChangesets;
187 }
188
189
190
191
192
193
194
195
196
197 public void setStderrBufferSize(int stderrBufferSize) {
198 this.stderrBufferSize = stderrBufferSize;
199 }
200
201
202
203
204 public CharsetDecoder newDecoder() {
205 CharsetDecoder decoder = this.encoding.newDecoder();
206 decoder.onMalformedInput(this.errorAction);
207 decoder.onUnmappableCharacter(this.errorAction);
208 return decoder;
209 }
210
211
212
213
214 public CharsetEncoder newEncoder() {
215 CharsetEncoder encoder = this.encoding.newEncoder();
216 encoder.onMalformedInput(this.errorAction);
217 encoder.onUnmappableCharacter(this.errorAction);
218 return encoder;
219 }
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 public String start(File directory, final String hgrcPath, List<String> extraArguments, Map<String, String> env, Runnable supervisor) {
240 this.directory = directory.getAbsoluteFile();
241 if (!new File(directory, ".hg").isDirectory()) {
242 throw new IllegalArgumentException("No .hg in " + directory);
243 }
244 String result;
245 try {
246 List<String> args = Lists.newArrayList("serve", "--cmdserver", "pipe", "--config", "ui.interactive=true",
247 "--config", "ui.merge=internal:fail");
248 ArrayList<Class<? extends MercurialExtension>> extList = Lists.newArrayList();
249 extList.add(JavaHgMercurialExtension.class);
250 args.addAll(ExtensionManager.getInstance().process(extList));
251 args.addAll(extraArguments);
252 this.state = State.STARTING;
253 this.process = execHg(this.directory, hgrcPath, args, env);
254
255 active();
256 this.errorReaderThread = new StderrReader(this.process, this.stderrBufferSize, supervisor);
257 this.errorReaderThread.start();
258
259
260
261
262
263 BlockInputStream block = new BlockInputStream(this.process.getInputStream());
264 result = Utils.readStream(block, newDecoder());
265
266
267
268 checkStderr();
269 this.state = State.RUNNING;
270
271 LOG.info("Command server started: {}", this.directory);
272 } catch (Exception e) {
273 verifyServerProcess(e);
274 throw Utils.asRuntime(e);
275 }
276 verifyServerProcess(null);
277 return result;
278 }
279
280 private static final int ERROR_READER_THREAD_TIMEOUT = 5000;
281
282
283
284
285 void stop() {
286 if (this.process == null || this.state == State.STOPPED) {
287 LOG.warn("Trying to stop already stopped server");
288 return;
289 }
290
291 this.state = State.STOPPING;
292 Closeables.closeQuietly(this.process.getOutputStream());
293 Closeables.closeQuietly(this.process.getInputStream());
294 this.errorReaderThread.finish();
295 this.errorReaderThread = null;
296 Closeables.closeQuietly(this.process.getErrorStream());
297
298
299
300 try {
301 this.process.waitFor();
302 } catch (InterruptedException e) {
303 LOG.error("Process for Mercurial server interrupted", e);
304 throw Utils.asRuntime(e);
305 }
306 LOG.info("Command server stopped: {}", this.directory);
307 this.currentCommand = null;
308 this.process = null;
309 this.directory = null;
310 }
311
312 private void checkStderr() {
313 String s = this.errorReaderThread.bufferAsString(newDecoder());
314 if (s.length() > 0) {
315 LOG.error("stderr from Mercurial: {}", s);
316 switch (this.state) {
317 case STARTING:
318 this.startupStderr += s;
319 break;
320 case STOPPING:
321 case CRASHED:
322
323 break;
324 case RUNNING:
325
326
327 stop();
328
329 throw new RuntimeException(s);
330 default:
331 throw new RuntimeException("This should not happen");
332 }
333 }
334 }
335
336
337
338
339
340
341
342
343 void verifyServerProcess(Exception exception) {
344 if (exception instanceof UnexpectedServerTerminationException) {
345
346 throw (UnexpectedServerTerminationException) exception;
347 }
348 if (this.process != null) {
349
350
351
352
353
354
355
356
357
358 int n = 0;
359 if (exception instanceof RuntimeIOException) {
360 exception = ((RuntimeIOException) exception).getIOException();
361 }
362 while (true) {
363 try {
364 int exitValue = this.process.exitValue();
365 this.state = State.CRASHED;
366 checkStderr();
367 String msg = "Server process terminated premature with: " + exitValue;
368 System.err.println("JavaHg: " + msg);
369 this.process = null;
370 throw new UnexpectedServerTerminationException(exitValue, exception);
371 } catch (IllegalThreadStateException e) {
372 if (exception instanceof BlockInputStream.InvalidStreamException
373 || exception instanceof IOException) {
374 if (n++ == 4) {
375 return;
376 }
377 sleep(100);
378 } else {
379 return;
380 }
381
382 }
383 }
384 }
385 }
386
387
388
389
390
391
392
393
394
395
396
397
398
399 public OutputChannelInputStream runCommand(List<String> cmdLine, AbstractCommand command) throws IOException {
400 if (this.currentCommand != null) {
401 throw new IllegalStateException("Trying to execute new command when command already running: "
402 + this.currentCommand);
403 }
404
405 if (LOG.isInfoEnabled()) {
406 StringBuilder buf = new StringBuilder(256);
407
408 for (String s : cmdLine) {
409 buf.append(Utils.obfuscateLoginData(s));
410 buf.append(' ');
411 }
412
413 currentLog = buf.toString();
414 currentStartTime = System.currentTimeMillis();
415 }
416
417 this.currentCommand = command;
418 sendCommand(cmdLine);
419 OutputChannelInputStream stdout = new OutputChannelInputStream(this.process.getInputStream(), this, command);
420 checkStderr();
421 return stdout;
422 }
423
424 void clearCurrentCommand(AbstractCommand cmd) {
425 if (cmd != this.currentCommand) {
426 throw new IllegalStateException("Wrong command");
427 }
428 active();
429 checkStderr();
430 this.currentCommand = null;
431
432 if (LOG.isInfoEnabled()) {
433 LOG.info("runcommand({}ms) {}", System.currentTimeMillis() - currentStartTime,
434 currentLog);
435 }
436 }
437
438 private ByteArrayOutputStream baos = new ByteArrayOutputStream();
439
440 private void sendCommand(List<String> cmdLine) throws IOException {
441 active();
442 baos.reset();
443 CharsetEncoder encoder = newEncoder();
444 encode(cmdLine.get(0), baos, encoder);
445 for (String s : cmdLine.subList(1, cmdLine.size())) {
446 baos.write('\0');
447 encode(s, baos, encoder);
448 }
449
450 OutputStream outputStream = this.process.getOutputStream();
451 try {
452 outputStream.write(RUNCOMMAND);
453 Utils.writeBigEndian(baos.size(), outputStream);
454 baos.writeTo(outputStream);
455 outputStream.flush();
456 } catch (IOException e) {
457 verifyServerProcess(e);
458 throw e;
459 }
460 }
461
462 private Process execHg(File directory, String hgrcPath, List<String> arguments, Map<String, String> env) throws IOException {
463 ArrayList<String> cmdLine = Lists.newArrayList(this.hgBin);
464 cmdLine.addAll(arguments);
465 ProcessBuilder processBuilder = new ProcessBuilder(cmdLine);
466 if (directory != null) {
467 processBuilder.directory(directory);
468 }
469 Map<String, String> environment = processBuilder.environment();
470
471 if (env != null)
472 {
473 for (Iterator<String> it = env.keySet().iterator(); it.hasNext();)
474 {
475 String key = (String)it.next();
476
477 environment.put(key, env.get(key));
478 }
479 }
480
481 environment.put("HGENCODING", this.encoding.displayName());
482 environment.put("HGPLAIN", "1");
483 if (hgrcPath != null) {
484 environment.put("HGRCPATH", hgrcPath);
485 }
486 if (enablePendingChangesets){
487 environment.put("HG_PENDING", directory.getAbsolutePath());
488 }
489 return processBuilder.start();
490 }
491
492
493
494
495
496
497
498
499
500 public void initMecurialRepository(File directory) {
501 execHgCommand(null, "", "init", directory.getAbsolutePath());
502 }
503
504
505
506
507
508
509
510
511
512
513 public void cloneMercurialRepository(File directory, String hgrcPath, String cloneUrl) {
514 execHgCommand(null, hgrcPath, "clone", cloneUrl, directory.getAbsolutePath());
515 }
516
517 private void execHgCommand(File directory, String hgrcPath, String... args) {
518 try {
519 Process process = execHg(directory, hgrcPath, Arrays.asList(args), null);
520 String stderr = Utils.readStream(process.getErrorStream(), newDecoder());
521 Utils.consumeAll(process.getInputStream());
522
523 if (process.waitFor() != 0) {
524 throw new RuntimeException(stderr);
525 }
526 } catch (IOException e) {
527 throw new RuntimeIOException(e);
528 } catch (InterruptedException e) {
529 throw Utils.asRuntime(e);
530 }
531 }
532
533 @Override
534 public String toString() {
535 return "cmdserver@" + this.directory;
536 }
537
538 public void sendLine(String answer) {
539 OutputStream outputStream = this.process.getOutputStream();
540 try {
541 byte[] bytes = (answer + "\n").getBytes(this.encoding.name());
542 Utils.writeBigEndian(bytes.length, outputStream);
543 outputStream.write(bytes);
544 outputStream.flush();
545 } catch (IOException e) {
546 throw new RuntimeIOException(e);
547 }
548 }
549
550
551
552
553
554
555
556
557 private void encode(String s, OutputStream output, CharsetEncoder encoder) throws IOException {
558 ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(s));
559 output.write(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit());
560 }
561
562
563
564
565 long getLastActiveTime() {
566 return lastActiveTime;
567 }
568
569
570
571
572 private void active() {
573 lastActiveTime = System.currentTimeMillis();
574 }
575
576 private static void sleep(int ms) {
577 try {
578 Thread.sleep(ms);
579 } catch (InterruptedException e) {
580 throw Utils.asRuntime(e);
581 }
582 }
583
584 @Override
585 protected void finalize() {
586 if (this.process != null) {
587 LOG.error("Stopping cmdserver via finalize. Please explicit stop server.");
588 stop();
589 }
590 }
591
592 @VisibleForTesting
593 Process getProcess() {
594 return process;
595 }
596
597
598
599
600
601
602
603
604
605 static class StderrReader extends Thread {
606
607 private static final Logger LOG = LoggerFactory.getLogger(StderrReader.class);
608
609 private final InputStream errorStream;
610
611 private final ByteBuffer stderrBuffer;
612
613 private volatile boolean stop = false;
614
615 private final Runnable supervisor;
616
617 StderrReader(Process process, int bufSize, Runnable supervisor) {
618 super("JavaHg stderr reader");
619 this.errorStream = process.getErrorStream();
620 this.stderrBuffer = ByteBuffer.allocate(bufSize);
621 this.supervisor = supervisor;
622 setDaemon(true);
623 }
624
625
626
627
628 void finish() {
629 this.stop = true;
630 interrupt();
631 try {
632 readAllAvailableFromStderr();
633 } catch (IOException e) {
634 handleIOException(e);
635 }
636 try {
637 join(ERROR_READER_THREAD_TIMEOUT);
638 if (isAlive()) {
639 assert false;
640 throw new RuntimeException("thread is alive. This should not happen");
641 }
642 } catch (InterruptedException e) {
643 assert false;
644 throw new RuntimeException("This should not happen");
645 }
646
647 }
648
649
650
651
652
653
654
655
656 String bufferAsString(CharsetDecoder decoder) {
657 try {
658 readAllAvailableFromStderr();
659 } catch (IOException e) {
660 LOG.warn("Exception trying to read stderr: {}", e.getMessage());
661 return "";
662 }
663
664 ByteBuffer buf = this.stderrBuffer;
665 synchronized (buf) {
666 if (buf.position() > 0) {
667 CharBuffer charBuffer;
668 buf.limit(buf.position());
669 buf.position(0);
670 try {
671 charBuffer = decoder.decode(buf);
672 } catch (CharacterCodingException e) {
673 throw Utils.asRuntime(e);
674 }
675 buf.limit(buf.capacity());
676 String s = new String(charBuffer.array(), charBuffer.arrayOffset(), charBuffer.limit());
677 buf.position(0);
678 return s;
679 } else {
680 return "";
681 }
682 }
683 }
684
685 public void run() {
686 try {
687 while (!this.stop) {
688 readAllAvailableFromStderr();
689 try {
690 Thread.sleep(1000);
691 } catch (InterruptedException e) {
692
693
694 }
695 if (supervisor != null) {
696 supervisor.run();
697 }
698 }
699 } catch (IOException e) {
700 handleIOException(e);
701 }
702 }
703
704 private void handleIOException(IOException e) {
705 String message = e.getMessage();
706 if (message.equals("Bad file descriptor") || message.equals("Stream Closed") || message.equals("Stream closed")) {
707 LOG.warn("errorReaderThread could not read stderr. Most likely the Mercurial server process is dead.");
708 } else {
709 throw new RuntimeIOException(e);
710 }
711 }
712
713 private void readAllAvailableFromStderr() throws IOException {
714 while (this.errorStream.available() > 0) {
715 int b = this.errorStream.read();
716 if (b == -1) {
717
718
719 break;
720 }
721 synchronized (this.stderrBuffer) {
722 try {
723 this.stderrBuffer.put((byte) b);
724 } catch (BufferOverflowException e) {
725
726 }
727 }
728 }
729
730 }
731
732 }
733 }