1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  package com.aragost.javahg.internals;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.File;
30  import java.io.IOException;
31  import java.io.InputStream;
32  import java.io.OutputStream;
33  import java.nio.BufferOverflowException;
34  import java.nio.ByteBuffer;
35  import java.nio.CharBuffer;
36  import java.nio.charset.CharacterCodingException;
37  import java.nio.charset.Charset;
38  import java.nio.charset.CharsetDecoder;
39  import java.nio.charset.CharsetEncoder;
40  import java.nio.charset.CodingErrorAction;
41  import java.util.ArrayList;
42  import java.util.Arrays;
43  import java.util.Iterator;
44  import java.util.List;
45  import java.util.Map;
46  
47  import com.aragost.javahg.MercurialExtension;
48  import com.aragost.javahg.log.Logger;
49  import com.aragost.javahg.log.LoggerFactory;
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.collect.Lists;
52  import com.google.common.io.Closeables;
53  
54  
55  
56  
57  public class Server {
58  
59      private static final Logger LOG = LoggerFactory.getLogger(Server.class);
60  
61      
62  
63  
64  
65  
66      private enum State {
67          NOT_STARTED, STARTING, RUNNING, STOPPING, STOPPED, CRASHED
68      };
69  
70      private static final byte[] RUNCOMMAND = "runcommand\n".getBytes();
71  
72      
73  
74  
75  
76  
77      private final Charset encoding;
78  
79      
80  
81  
82  
83      private CodingErrorAction errorAction = CodingErrorAction.REPORT;
84  
85      
86  
87  
88      private int stderrBufferSize = 1024;
89  
90      
91  
92  
93      private String startupStderr = "";
94  
95      
96  
97  
98      private Process process;
99  
100     
101 
102 
103 
104 
105 
106 
107 
108     private StderrReader errorReaderThread;
109 
110     private State state = State.NOT_STARTED;
111 
112     
113 
114 
115 
116 
117 
118 
119     private AbstractCommand currentCommand;
120 
121     
122 
123 
124 
125     private String currentLog;
126 
127     
128 
129 
130 
131     private long currentStartTime;
132 
133     
134 
135 
136 
137     private File directory;
138 
139     
140 
141 
142     private final String hgBin;
143 
144     
145 
146 
147     private volatile long lastActiveTime;
148 
149     
150 
151 
152     private boolean enablePendingChangesets = false;
153 
154     
155 
156 
157 
158 
159 
160     public Server(String hgBin, Charset encoding) {
161         this.hgBin = hgBin;
162         this.encoding = encoding;
163     }
164 
165     public CodingErrorAction getErrorAction() {
166         return errorAction;
167     }
168 
169     public void setErrorAction(CodingErrorAction errorAction) {
170         this.errorAction = errorAction;
171     }
172 
173     public int getStderrBufferSize() {
174         return stderrBufferSize;
175     }
176 
177     public String getStartupStderr() {
178         return startupStderr;
179     }
180 
181     public void setEnablePendingChangesets(boolean enablePendingChangesets){
182       this.enablePendingChangesets = enablePendingChangesets;
183     }
184 
185     public boolean isEnablePendingChangesets(){
186       return enablePendingChangesets;
187     }
188 
189     
190 
191 
192 
193 
194 
195 
196 
197     public void setStderrBufferSize(int stderrBufferSize) {
198         this.stderrBufferSize = stderrBufferSize;
199     }
200 
201     
202 
203 
204     public CharsetDecoder newDecoder() {
205         CharsetDecoder decoder = this.encoding.newDecoder();
206         decoder.onMalformedInput(this.errorAction);
207         decoder.onUnmappableCharacter(this.errorAction);
208         return decoder;
209     }
210 
211     
212 
213 
214     public CharsetEncoder newEncoder() {
215         CharsetEncoder encoder = this.encoding.newEncoder();
216         encoder.onMalformedInput(this.errorAction);
217         encoder.onUnmappableCharacter(this.errorAction);
218         return encoder;
219     }
220 
221     
222 
223 
224 
225 
226 
227 
228 
229 
230 
231 
232 
233 
234 
235 
236 
237 
238 
239     public String start(File directory, final String hgrcPath, List<String> extraArguments, Map<String, String> env, Runnable supervisor) {
240         this.directory = directory.getAbsoluteFile();
241         if (!new File(directory, ".hg").isDirectory()) {
242             throw new IllegalArgumentException("No .hg in " + directory);
243         }
244         String result;
245         try {
246             List<String> args = Lists.newArrayList("serve", "--cmdserver", "pipe", "--config", "ui.interactive=true",
247                     "--config", "ui.merge=internal:fail");
248             ArrayList<Class<? extends MercurialExtension>> extList = Lists.newArrayList();
249             extList.add(JavaHgMercurialExtension.class);
250             args.addAll(ExtensionManager.getInstance().process(extList));
251             args.addAll(extraArguments);
252             this.state = State.STARTING;
253             this.process = execHg(this.directory, hgrcPath, args, env);
254 
255             active();
256             this.errorReaderThread = new StderrReader(this.process, this.stderrBufferSize, supervisor);
257             this.errorReaderThread.start();
258             
259             
260             
261             
262             
263             BlockInputStream block = new BlockInputStream(this.process.getInputStream());
264             result = Utils.readStream(block, newDecoder());
265             
266             
267 
268             checkStderr();
269             this.state = State.RUNNING;
270 
271             LOG.info("Command server started: {}", this.directory);
272         } catch (Exception e) {
273             verifyServerProcess(e);
274             throw Utils.asRuntime(e);
275         }
276         verifyServerProcess(null);
277         return result;
278     }
279 
280     private static final int ERROR_READER_THREAD_TIMEOUT = 5000; 
281 
282     
283 
284 
285     void stop() {
286         if (this.process == null || this.state == State.STOPPED) {
287             LOG.warn("Trying to stop already stopped server");
288             return;
289         }
290 
291         this.state = State.STOPPING;
292         Closeables.closeQuietly(this.process.getOutputStream());
293         Closeables.closeQuietly(this.process.getInputStream());
294         this.errorReaderThread.finish();
295         this.errorReaderThread = null;
296         Closeables.closeQuietly(this.process.getErrorStream());
297         
298         
299         
300         try {
301             this.process.waitFor();
302         } catch (InterruptedException e) {
303             LOG.error("Process for Mercurial server interrupted", e);
304             throw Utils.asRuntime(e);
305         }
306         LOG.info("Command server stopped: {}", this.directory);
307         this.currentCommand = null;
308         this.process = null;
309         this.directory = null;
310     }
311 
312     private void checkStderr() {
313         String s = this.errorReaderThread.bufferAsString(newDecoder());
314         if (s.length() > 0) {
315             LOG.error("stderr from Mercurial: {}", s);
316             switch (this.state) {
317             case STARTING:
318                 this.startupStderr += s;
319                 break;
320             case STOPPING:
321             case CRASHED:
322                 
323                 break;
324             case RUNNING:
325                 
326                 
327                 stop();
328                 
329                 throw new RuntimeException(s);
330             default:
331                 throw new RuntimeException("This should not happen");
332             }
333         }
334     }
335 
336     
337 
338 
339 
340 
341 
342 
343     void verifyServerProcess(Exception exception) {
344         if (exception instanceof UnexpectedServerTerminationException) {
345             
346             throw (UnexpectedServerTerminationException) exception;
347         }
348         if (this.process != null) {
349             
350             
351             
352             
353             
354             
355             
356             
357             
358             int n = 0;
359             if (exception instanceof RuntimeIOException) {
360                 exception = ((RuntimeIOException) exception).getIOException();
361             }
362             while (true) {
363                 try {
364                     int exitValue = this.process.exitValue();
365                     this.state = State.CRASHED;
366                     checkStderr();
367                     String msg = "Server process terminated premature with: " + exitValue;
368                     System.err.println("JavaHg: " + msg);
369                     this.process = null;
370                     throw new UnexpectedServerTerminationException(exitValue, exception);
371                 } catch (IllegalThreadStateException e) {
372                     if (exception instanceof BlockInputStream.InvalidStreamException
373                             || exception instanceof IOException) {
374                         if (n++ == 4) {
375                             return;
376                         }
377                         sleep(100);
378                     } else {
379                         return;
380                     }
381 
382                 }
383             }
384         }
385     }
386 
387     
388 
389 
390 
391 
392 
393 
394 
395 
396 
397 
398 
399     public OutputChannelInputStream runCommand(List<String> cmdLine, AbstractCommand command) throws IOException {
400         if (this.currentCommand != null) {
401             throw new IllegalStateException("Trying to execute new command when command already running: "
402                     + this.currentCommand);
403         }
404 
405         if (LOG.isInfoEnabled()) {
406             StringBuilder buf = new StringBuilder(256);
407 
408             for (String s : cmdLine) {
409                 buf.append(Utils.obfuscateLoginData(s));
410                 buf.append(' ');
411             }
412 
413             currentLog = buf.toString(); 
414             currentStartTime = System.currentTimeMillis(); 
415         }
416         
417         this.currentCommand = command;
418         sendCommand(cmdLine);
419         OutputChannelInputStream stdout = new OutputChannelInputStream(this.process.getInputStream(), this, command);
420         checkStderr();
421         return stdout;
422     }
423 
424     void clearCurrentCommand(AbstractCommand cmd) {
425         if (cmd != this.currentCommand) {
426             throw new IllegalStateException("Wrong command");
427         }
428         active();
429         checkStderr();
430         this.currentCommand = null;
431 
432         if (LOG.isInfoEnabled()) {
433             LOG.info("runcommand({}ms) {}", System.currentTimeMillis() - currentStartTime,
434                     currentLog);
435         }
436     }
437 
438     private ByteArrayOutputStream baos = new ByteArrayOutputStream();
439 
440     private void sendCommand(List<String> cmdLine) throws IOException {
441         active();
442         baos.reset();
443         CharsetEncoder encoder = newEncoder();
444         encode(cmdLine.get(0), baos, encoder);
445         for (String s : cmdLine.subList(1, cmdLine.size())) {
446             baos.write('\0');
447             encode(s, baos, encoder);
448         }
449 
450         OutputStream outputStream = this.process.getOutputStream();
451         try {
452             outputStream.write(RUNCOMMAND);
453             Utils.writeBigEndian(baos.size(), outputStream);
454             baos.writeTo(outputStream);
455             outputStream.flush();
456         } catch (IOException e) {
457             verifyServerProcess(e);
458             throw e;
459         }
460     }
461 
462     private Process execHg(File directory, String hgrcPath, List<String> arguments, Map<String, String> env) throws IOException {
463         ArrayList<String> cmdLine = Lists.newArrayList(this.hgBin);
464         cmdLine.addAll(arguments);
465         ProcessBuilder processBuilder = new ProcessBuilder(cmdLine);
466         if (directory != null) {
467             processBuilder.directory(directory);
468         }
469         Map<String, String> environment = processBuilder.environment();
470 
471         if (env != null)
472         {
473             for (Iterator<String> it = env.keySet().iterator(); it.hasNext();)
474             {
475                 String key = (String)it.next();
476 
477                 environment.put(key, env.get(key));
478             }
479         }
480 
481         environment.put("HGENCODING", this.encoding.displayName());
482         environment.put("HGPLAIN", "1");
483         if (hgrcPath != null) {
484             environment.put("HGRCPATH", hgrcPath);
485         }
486         if (enablePendingChangesets){
487           environment.put("HG_PENDING", directory.getAbsolutePath());
488         }
489         return processBuilder.start();
490     }
491 
492     
493 
494 
495 
496 
497 
498 
499 
500     public void initMecurialRepository(File directory) {
501         execHgCommand(null, "", "init", directory.getAbsolutePath());
502     }
503 
504     
505 
506 
507 
508 
509 
510 
511 
512 
513     public void cloneMercurialRepository(File directory, String hgrcPath, String cloneUrl) {
514       execHgCommand(null, hgrcPath, "clone", cloneUrl, directory.getAbsolutePath());
515     }
516 
517     private void execHgCommand(File directory, String hgrcPath, String... args) {
518         try {
519             Process process = execHg(directory, hgrcPath, Arrays.asList(args), null);
520             String stderr = Utils.readStream(process.getErrorStream(), newDecoder());
521             Utils.consumeAll(process.getInputStream());
522 
523             if (process.waitFor() != 0) {
524                 throw new RuntimeException(stderr);
525             }
526         } catch (IOException e) {
527             throw new RuntimeIOException(e);
528         } catch (InterruptedException e) {
529             throw Utils.asRuntime(e);
530         }
531     }
532 
533     @Override
534     public String toString() {
535         return "cmdserver@" + this.directory;
536     }
537 
538     public void sendLine(String answer) {
539         OutputStream outputStream = this.process.getOutputStream();
540         try {
541             byte[] bytes = (answer + "\n").getBytes(this.encoding.name());
542             Utils.writeBigEndian(bytes.length, outputStream);
543             outputStream.write(bytes);
544             outputStream.flush();
545         } catch (IOException e) {
546             throw new RuntimeIOException(e);
547         }
548     }
549 
550     
551 
552 
553 
554 
555 
556 
557     private void encode(String s, OutputStream output, CharsetEncoder encoder) throws IOException {
558         ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(s));
559         output.write(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit());
560     }
561 
562     
563 
564 
565     long getLastActiveTime() {
566         return lastActiveTime;
567     }
568 
569     
570 
571 
572     private void active() {
573         lastActiveTime = System.currentTimeMillis();
574     }
575 
576     private static void sleep(int ms) {
577         try {
578             Thread.sleep(ms);
579         } catch (InterruptedException e) {
580             throw Utils.asRuntime(e);
581         }
582     }
583 
584     @Override
585     protected void finalize() {
586         if (this.process != null) {
587             LOG.error("Stopping cmdserver via finalize. Please explicit stop server.");
588             stop();
589         }
590     }
591 
592     @VisibleForTesting
593     Process getProcess() {
594         return process;
595     }
596 
597     
598 
599 
600 
601 
602 
603 
604 
605     static class StderrReader extends Thread {
606 
607         private static final Logger LOG = LoggerFactory.getLogger(StderrReader.class);
608 
609         private final InputStream errorStream;
610 
611         private final ByteBuffer stderrBuffer;
612 
613         private volatile boolean stop = false;
614 
615         private final Runnable supervisor;
616 
617         StderrReader(Process process, int bufSize, Runnable supervisor) {
618             super("JavaHg stderr reader");
619             this.errorStream = process.getErrorStream();
620             this.stderrBuffer = ByteBuffer.allocate(bufSize);
621             this.supervisor = supervisor;
622             setDaemon(true);
623         }
624 
625         
626 
627 
628         void finish() {
629             this.stop = true;
630             interrupt();
631             try {
632                 readAllAvailableFromStderr();
633             } catch (IOException e) {
634                 handleIOException(e);
635             }
636             try {
637                 join(ERROR_READER_THREAD_TIMEOUT);
638                 if (isAlive()) {
639                     assert false;
640                     throw new RuntimeException("thread is alive. This should not happen");
641                 }
642             } catch (InterruptedException e) {
643                 assert false;
644                 throw new RuntimeException("This should not happen");
645             }
646 
647         }
648 
649         
650 
651 
652 
653 
654 
655 
656         String bufferAsString(CharsetDecoder decoder) {
657             try {
658                 readAllAvailableFromStderr();
659             } catch (IOException e) {
660                 LOG.warn("Exception trying to read stderr: {}", e.getMessage());
661                 return "";
662             }
663 
664             ByteBuffer buf = this.stderrBuffer;
665             synchronized (buf) {
666                 if (buf.position() > 0) {
667                     CharBuffer charBuffer;
668                     buf.limit(buf.position());
669                     buf.position(0);
670                     try {
671                         charBuffer = decoder.decode(buf);
672                     } catch (CharacterCodingException e) {
673                         throw Utils.asRuntime(e);
674                     }
675                     buf.limit(buf.capacity());
676                     String s = new String(charBuffer.array(), charBuffer.arrayOffset(), charBuffer.limit());
677                     buf.position(0);
678                     return s;
679                 } else {
680                     return "";
681                 }
682             }
683         }
684 
685         public void run() {
686             try {
687                 while (!this.stop) {
688                     readAllAvailableFromStderr();
689                     try {
690                         Thread.sleep(1000);
691                     } catch (InterruptedException e) {
692                         
693                         
694                     }
695                     if (supervisor != null) {
696                         supervisor.run();
697                     }
698                 }
699             } catch (IOException e) {
700                 handleIOException(e);
701             }
702         }
703 
704         private void handleIOException(IOException e) {
705             String message = e.getMessage();
706             if (message.equals("Bad file descriptor") || message.equals("Stream Closed") || message.equals("Stream closed")) {
707                 LOG.warn("errorReaderThread could not read stderr. Most likely the Mercurial server process is dead.");
708             } else {
709                 throw new RuntimeIOException(e);
710             }
711         }
712 
713         private void readAllAvailableFromStderr() throws IOException {
714             while (this.errorStream.available() > 0) {
715                 int b = this.errorStream.read();
716                 if (b == -1) {
717                     
718                     
719                     break;
720                 }
721                 synchronized (this.stderrBuffer) {
722                     try {
723                         this.stderrBuffer.put((byte) b);
724                     } catch (BufferOverflowException e) {
725                         
726                     }
727                 }
728             }
729 
730         }
731 
732     }
733 }