View Javadoc

1   /*
2    * #%L
3    * JavaHg
4    * %%
5    * Copyright (C) 2011 aragost Trifork ag
6    * %%
7    * Permission is hereby granted, free of charge, to any person obtaining a copy
8    * of this software and associated documentation files (the "Software"), to deal
9    * in the Software without restriction, including without limitation the rights
10   * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11   * copies of the Software, and to permit persons to whom the Software is
12   * furnished to do so, subject to the following conditions:
13   * 
14   * The above copyright notice and this permission notice shall be included in
15   * all copies or substantial portions of the Software.
16   * 
17   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23   * THE SOFTWARE.
24   * #L%
25   */
26  package com.aragost.javahg.internals;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.File;
30  import java.io.IOException;
31  import java.io.InputStream;
32  import java.io.OutputStream;
33  import java.nio.BufferOverflowException;
34  import java.nio.ByteBuffer;
35  import java.nio.CharBuffer;
36  import java.nio.charset.CharacterCodingException;
37  import java.nio.charset.Charset;
38  import java.nio.charset.CharsetDecoder;
39  import java.nio.charset.CharsetEncoder;
40  import java.nio.charset.CodingErrorAction;
41  import java.util.ArrayList;
42  import java.util.Arrays;
43  import java.util.Iterator;
44  import java.util.List;
45  import java.util.Map;
46  
47  import com.aragost.javahg.MercurialExtension;
48  import com.aragost.javahg.log.Logger;
49  import com.aragost.javahg.log.LoggerFactory;
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.common.collect.Lists;
52  import com.google.common.io.Closeables;
53  
54  /**
55   * Java class representing a Mercurial commandserver
56   */
57  public class Server {
58  
59      private static final Logger LOG = LoggerFactory.getLogger(Server.class);
60  
61      /**
62       * Enum to represent the state of the server.
63       * <p>
64       * The state will always go from lower ordinal to higher
65       */
66      private enum State {
67          NOT_STARTED, STARTING, RUNNING, STOPPING, STOPPED, CRASHED
68      };
69  
70      private static final byte[] RUNCOMMAND = "runcommand\n".getBytes();
71  
72      /**
73       * The character encoding used for the server. For now it is
74       * always utf-8, in the future there might be an API to set this.
75       * Future: move to ServerPool?
76       */
77      private final Charset encoding;
78  
79      /**
80       * Policy for handling decoding errors.
81       * Future: move to ServerPool?
82       */
83      private CodingErrorAction errorAction = CodingErrorAction.REPORT;
84  
85      /**
86       * Size of buffer to buffer stderr from Mercurial server process
87       */
88      private int stderrBufferSize = 1024;
89  
90      /**
91       * Stderr from Mercurial server process generated during start up.
92       */
93      private String startupStderr = "";
94  
95      /**
96       * The underlying OS process
97       */
98      private Process process;
99  
100     /**
101      * Thread that read stderr from command server
102      * <p>
103      * In general if the server writes something to stderr (not the
104      * 'e' channel but the actual stderr) it is something seriously
105      * wrong and the server will be stopped and an exception thrown.
106      * During start up of the server messages to stderr are accepted.
107      */
108     private StderrReader errorReaderThread;
109 
110     private State state = State.NOT_STARTED;
111 
112     /**
113      * If non-null the {@link AbstractCommand} the server is currently
114      * executing.
115      * <p>
116      * The server is single threaded and can only execute one command
117      * at a time.
118      */
119     private AbstractCommand currentCommand;
120 
121     /**
122      * If the appropriate log level is enabled and {@link #currentCommand} is
123      * not null then this contains a log message.
124      */
125     private String currentLog;
126 
127     /**
128      * If the appropriate log level is enabled and {@link #currentCommand} is
129      * not null then this contains the start time.
130      */
131     private long currentStartTime;
132 
133     /**
134      * The directory containing the Mercurial repository where the
135      * server is running.
136      */
137     private File directory;
138 
139     /**
140      * Location of the Mercurial binary.
141      */
142     private final String hgBin;
143 
144     /**
145      * Time this server was last active.
146      */
147     private volatile long lastActiveTime;
148 
149     /**
150      * Enable access to pending changesets.
151      */
152     private boolean enablePendingChangesets = false;
153 
154     /**
155      * Create a new Server object
156      * 
157      * @param hgBin
158      *            the Mercurial binary to use
159      */
160     public Server(String hgBin, Charset encoding) {
161         this.hgBin = hgBin;
162         this.encoding = encoding;
163     }
164 
165     public CodingErrorAction getErrorAction() {
166         return errorAction;
167     }
168 
169     public void setErrorAction(CodingErrorAction errorAction) {
170         this.errorAction = errorAction;
171     }
172 
173     public int getStderrBufferSize() {
174         return stderrBufferSize;
175     }
176 
177     public String getStartupStderr() {
178         return startupStderr;
179     }
180 
181     public void setEnablePendingChangesets(boolean enablePendingChangesets){
182       this.enablePendingChangesets = enablePendingChangesets;
183     }
184 
185     public boolean isEnablePendingChangesets(){
186       return enablePendingChangesets;
187     }
188 
189     /**
190      * Set the buffer size for stderr from Mercurial server process.
191      * <p>
192      * There is probably no reason to set this, but it is here to
193      * facilitate testcases for buffer overflow.
194      * 
195      * @param stderrBufferSize
196      */
197     public void setStderrBufferSize(int stderrBufferSize) {
198         this.stderrBufferSize = stderrBufferSize;
199     }
200 
201     /**
202      * @return a new {@link CharsetDecoder} instance for this server.
203      */
204     public CharsetDecoder newDecoder() {
205         CharsetDecoder decoder = this.encoding.newDecoder();
206         decoder.onMalformedInput(this.errorAction);
207         decoder.onUnmappableCharacter(this.errorAction);
208         return decoder;
209     }
210 
211     /**
212      * @return a new {@link CharsetEncoder} instance for this server.
213      */
214     public CharsetEncoder newEncoder() {
215         CharsetEncoder encoder = this.encoding.newEncoder();
216         encoder.onMalformedInput(this.errorAction);
217         encoder.onUnmappableCharacter(this.errorAction);
218         return encoder;
219     }
220 
221     /**
222      * Start the server in the specified directory. The directory is must be the
223      * root directory of a Mercurial repository.
224      * 
225      * @param directory
226      *            The directory of to start the command server in
227      * @param hgrcPath
228      *            The path to the hgrc config file to use. May be null
229      * @param extraArguments
230      *            Additional argument to start the command server with. Eg
231      *            extensions to enable
232      * @param env Optional map of environment variables to set when starting 
233      *            the command server. 
234      * @param supervisor
235      *            Optional function to be executed periodically by the error
236      *            stream thread. May be null
237      * @return The hello message from the server
238      */
239     public String start(File directory, final String hgrcPath, List<String> extraArguments, Map<String, String> env, Runnable supervisor) {
240         this.directory = directory.getAbsoluteFile();
241         if (!new File(directory, ".hg").isDirectory()) {
242             throw new IllegalArgumentException("No .hg in " + directory);
243         }
244         String result;
245         try {
246             List<String> args = Lists.newArrayList("serve", "--cmdserver", "pipe", "--config", "ui.interactive=true",
247                     "--config", "ui.merge=internal:fail");
248             ArrayList<Class<? extends MercurialExtension>> extList = Lists.newArrayList();
249             extList.add(JavaHgMercurialExtension.class);
250             args.addAll(ExtensionManager.getInstance().process(extList));
251             args.addAll(extraArguments);
252             this.state = State.STARTING;
253             this.process = execHg(this.directory, hgrcPath, args, env);
254 
255             active();
256             this.errorReaderThread = new StderrReader(this.process, this.stderrBufferSize, supervisor);
257             this.errorReaderThread.start();
258             // If for some reason the server failed to start the
259             // creation of the BlockInputStream will fail (because the
260             // the stdout stream on the process has been closed). So
261             // before we try to create the BlockInputStream the error
262             // reader thread should be started to print the error
263             BlockInputStream block = new BlockInputStream(this.process.getInputStream());
264             result = Utils.readStream(block, newDecoder());
265             // The hello message from the server has been read and it
266             // is now running normally.
267 
268             checkStderr();
269             this.state = State.RUNNING;
270 
271             LOG.info("Command server started: {}", this.directory);
272         } catch (Exception e) {
273             verifyServerProcess(e);
274             throw Utils.asRuntime(e);
275         }
276         verifyServerProcess(null);
277         return result;
278     }
279 
280     private static final int ERROR_READER_THREAD_TIMEOUT = 5000; // milliseconds
281 
282     /**
283      * Stop the Mercurial server process
284      */
285     void stop() {
286         if (this.process == null || this.state == State.STOPPED) {
287             LOG.warn("Trying to stop already stopped server");
288             return;
289         }
290 
291         this.state = State.STOPPING;
292         Closeables.closeQuietly(this.process.getOutputStream());
293         Closeables.closeQuietly(this.process.getInputStream());
294         this.errorReaderThread.finish();
295         this.errorReaderThread = null;
296         Closeables.closeQuietly(this.process.getErrorStream());
297         // Closing the streams of the process will stop the
298         // process and close the error stream, which will return int
299         // the errorReaderThread to terminate.
300         try {
301             this.process.waitFor();
302         } catch (InterruptedException e) {
303             LOG.error("Process for Mercurial server interrupted", e);
304             throw Utils.asRuntime(e);
305         }
306         LOG.info("Command server stopped: {}", this.directory);
307         this.currentCommand = null;
308         this.process = null;
309         this.directory = null;
310     }
311 
312     private void checkStderr() {
313         String s = this.errorReaderThread.bufferAsString(newDecoder());
314         if (s.length() > 0) {
315             LOG.error("stderr from Mercurial: {}", s);
316             switch (this.state) {
317             case STARTING:
318                 this.startupStderr += s;
319                 break;
320             case STOPPING:
321             case CRASHED:
322                 // stderr is accepted, and logged as error
323                 break;
324             case RUNNING:
325                 // stderr is not accepted, stop server and throw
326                 // exception
327                 stop();
328                 // TODO What exception would be appropiate to throw?
329                 throw new RuntimeException(s);
330             default:
331                 throw new RuntimeException("This should not happen");
332             }
333         }
334     }
335 
336     /**
337      * Verify that the server process hasn't terminated. If it has
338      * throw an {@link UnexpectedServerTerminationException},
339      * otherwise just return normally.
340      * 
341      * @param exception
342      */
343     void verifyServerProcess(Exception exception) {
344         if (exception instanceof UnexpectedServerTerminationException) {
345             // Already the right type
346             throw (UnexpectedServerTerminationException) exception;
347         }
348         if (this.process != null) {
349             // When the process shutdowns there is a small window
350             // where the input stream is closed but the
351             // process is still running. If the input stream is
352             // closed we get a
353             // BlockInputStream.InvalidStreamException
354             // exception. In this case loop a few times with a delay
355             // do see if the process did actually exit.
356             // Similar the is a small window when writing to the
357             // output stream, here we get an IOException
358             int n = 0;
359             if (exception instanceof RuntimeIOException) {
360                 exception = ((RuntimeIOException) exception).getIOException();
361             }
362             while (true) {
363                 try {
364                     int exitValue = this.process.exitValue();
365                     this.state = State.CRASHED;
366                     checkStderr();
367                     String msg = "Server process terminated premature with: " + exitValue;
368                     System.err.println("JavaHg: " + msg);
369                     this.process = null;
370                     throw new UnexpectedServerTerminationException(exitValue, exception);
371                 } catch (IllegalThreadStateException e) {
372                     if (exception instanceof BlockInputStream.InvalidStreamException
373                             || exception instanceof IOException) {
374                         if (n++ == 4) {
375                             return;
376                         }
377                         sleep(100);
378                     } else {
379                         return;
380                     }
381 
382                 }
383             }
384         }
385     }
386 
387     /**
388      * Run the specified command and return a stream with the content
389      * of the output channel.
390      * <p>
391      * The client <em>must</em> empty the return stream. The server
392      * will not accept other commands until all out is read.
393      * 
394      * @param cmdLine
395      * @param command
396      * @return the standard output from the command.
397      * @throws IOException
398      */
399     public OutputChannelInputStream runCommand(List<String> cmdLine, AbstractCommand command) throws IOException {
400         if (this.currentCommand != null) {
401             throw new IllegalStateException("Trying to execute new command when command already running: "
402                     + this.currentCommand);
403         }
404 
405         if (LOG.isInfoEnabled()) {
406             StringBuilder buf = new StringBuilder(256);
407 
408             for (String s : cmdLine) {
409                 buf.append(Utils.obfuscateLoginData(s));
410                 buf.append(' ');
411             }
412 
413             currentLog = buf.toString(); 
414             currentStartTime = System.currentTimeMillis(); 
415         }
416         
417         this.currentCommand = command;
418         sendCommand(cmdLine);
419         OutputChannelInputStream stdout = new OutputChannelInputStream(this.process.getInputStream(), this, command);
420         checkStderr();
421         return stdout;
422     }
423 
424     void clearCurrentCommand(AbstractCommand cmd) {
425         if (cmd != this.currentCommand) {
426             throw new IllegalStateException("Wrong command");
427         }
428         active();
429         checkStderr();
430         this.currentCommand = null;
431 
432         if (LOG.isInfoEnabled()) {
433             LOG.info("runcommand({}ms) {}", System.currentTimeMillis() - currentStartTime,
434                     currentLog);
435         }
436     }
437 
438     private ByteArrayOutputStream baos = new ByteArrayOutputStream();
439 
440     private void sendCommand(List<String> cmdLine) throws IOException {
441         active();
442         baos.reset();
443         CharsetEncoder encoder = newEncoder();
444         encode(cmdLine.get(0), baos, encoder);
445         for (String s : cmdLine.subList(1, cmdLine.size())) {
446             baos.write('\0');
447             encode(s, baos, encoder);
448         }
449 
450         OutputStream outputStream = this.process.getOutputStream();
451         try {
452             outputStream.write(RUNCOMMAND);
453             Utils.writeBigEndian(baos.size(), outputStream);
454             baos.writeTo(outputStream);
455             outputStream.flush();
456         } catch (IOException e) {
457             verifyServerProcess(e);
458             throw e;
459         }
460     }
461 
462     private Process execHg(File directory, String hgrcPath, List<String> arguments, Map<String, String> env) throws IOException {
463         ArrayList<String> cmdLine = Lists.newArrayList(this.hgBin);
464         cmdLine.addAll(arguments);
465         ProcessBuilder processBuilder = new ProcessBuilder(cmdLine);
466         if (directory != null) {
467             processBuilder.directory(directory);
468         }
469         Map<String, String> environment = processBuilder.environment();
470 
471         if (env != null)
472         {
473             for (Iterator<String> it = env.keySet().iterator(); it.hasNext();)
474             {
475                 String key = (String)it.next();
476 
477                 environment.put(key, env.get(key));
478             }
479         }
480 
481         environment.put("HGENCODING", this.encoding.displayName());
482         environment.put("HGPLAIN", "1");
483         if (hgrcPath != null) {
484             environment.put("HGRCPATH", hgrcPath);
485         }
486         if (enablePendingChangesets){
487           environment.put("HG_PENDING", directory.getAbsolutePath());
488         }
489         return processBuilder.start();
490     }
491 
492     /**
493      * Convenience method to initialize a mercurial repository in a
494      * directory.
495      * <p>
496      * This method is not using any commandserver functionality
497      * 
498      * @param directory
499      */
500     public void initMecurialRepository(File directory) {
501         execHgCommand(null, "", "init", directory.getAbsolutePath());
502     }
503 
504     /**
505      * Convenience method to clone a mercurial repository in a directory.
506      * <p>
507      * This method is not using any commandserver functionality
508      *
509      * @param directory
510      * @param hgrcPath
511      * @param cloneUrl
512      */
513     public void cloneMercurialRepository(File directory, String hgrcPath, String cloneUrl) {
514       execHgCommand(null, hgrcPath, "clone", cloneUrl, directory.getAbsolutePath());
515     }
516 
517     private void execHgCommand(File directory, String hgrcPath, String... args) {
518         try {
519             Process process = execHg(directory, hgrcPath, Arrays.asList(args), null);
520             String stderr = Utils.readStream(process.getErrorStream(), newDecoder());
521             Utils.consumeAll(process.getInputStream());
522 
523             if (process.waitFor() != 0) {
524                 throw new RuntimeException(stderr);
525             }
526         } catch (IOException e) {
527             throw new RuntimeIOException(e);
528         } catch (InterruptedException e) {
529             throw Utils.asRuntime(e);
530         }
531     }
532 
533     @Override
534     public String toString() {
535         return "cmdserver@" + this.directory;
536     }
537 
538     public void sendLine(String answer) {
539         OutputStream outputStream = this.process.getOutputStream();
540         try {
541             byte[] bytes = (answer + "\n").getBytes(this.encoding.name());
542             Utils.writeBigEndian(bytes.length, outputStream);
543             outputStream.write(bytes);
544             outputStream.flush();
545         } catch (IOException e) {
546             throw new RuntimeIOException(e);
547         }
548     }
549 
550     /**
551      * Encode the string and write it to the OutputStream
552      * 
553      * @param s
554      * @param output
555      * @throws IOException
556      */
557     private void encode(String s, OutputStream output, CharsetEncoder encoder) throws IOException {
558         ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(s));
559         output.write(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit());
560     }
561 
562     /**
563      * @return The last time this server started or finished executing a command
564      */
565     long getLastActiveTime() {
566         return lastActiveTime;
567     }
568 
569     /**
570      * Call when this server is active
571      */
572     private void active() {
573         lastActiveTime = System.currentTimeMillis();
574     }
575 
576     private static void sleep(int ms) {
577         try {
578             Thread.sleep(ms);
579         } catch (InterruptedException e) {
580             throw Utils.asRuntime(e);
581         }
582     }
583 
584     @Override
585     protected void finalize() {
586         if (this.process != null) {
587             LOG.error("Stopping cmdserver via finalize. Please explicit stop server.");
588             stop();
589         }
590     }
591 
592     @VisibleForTesting
593     Process getProcess() {
594         return process;
595     }
596 
597     /**
598      * Thread to read stderr from the Mercurial server process. The
599      * stderr is written to a {@link ByteBuffer} that is then process
600      * from the main thread.
601      * <p>
602      * If the buffer is full before being processed then the extra
603      * data from stderr is discarded silently.
604      */
605     static class StderrReader extends Thread {
606 
607         private static final Logger LOG = LoggerFactory.getLogger(StderrReader.class);
608 
609         private final InputStream errorStream;
610 
611         private final ByteBuffer stderrBuffer;
612 
613         private volatile boolean stop = false;
614 
615         private final Runnable supervisor;
616 
617         StderrReader(Process process, int bufSize, Runnable supervisor) {
618             super("JavaHg stderr reader");
619             this.errorStream = process.getErrorStream();
620             this.stderrBuffer = ByteBuffer.allocate(bufSize);
621             this.supervisor = supervisor;
622             setDaemon(true);
623         }
624 
625         /**
626          * Finish this thread and read all available stderr data.
627          */
628         void finish() {
629             this.stop = true;
630             interrupt();
631             try {
632                 readAllAvailableFromStderr();
633             } catch (IOException e) {
634                 handleIOException(e);
635             }
636             try {
637                 join(ERROR_READER_THREAD_TIMEOUT);
638                 if (isAlive()) {
639                     assert false;
640                     throw new RuntimeException("thread is alive. This should not happen");
641                 }
642             } catch (InterruptedException e) {
643                 assert false;
644                 throw new RuntimeException("This should not happen");
645             }
646 
647         }
648 
649         /**
650          * Reset the buffer and return the content as a String
651          * 
652          * @param decoder
653          *            used to decode the bytes as a String.
654          * @return the bytes converted to a String
655          */
656         String bufferAsString(CharsetDecoder decoder) {
657             try {
658                 readAllAvailableFromStderr();
659             } catch (IOException e) {
660                 LOG.warn("Exception trying to read stderr: {}", e.getMessage());
661                 return "";
662             }
663 
664             ByteBuffer buf = this.stderrBuffer;
665             synchronized (buf) {
666                 if (buf.position() > 0) {
667                     CharBuffer charBuffer;
668                     buf.limit(buf.position());
669                     buf.position(0);
670                     try {
671                         charBuffer = decoder.decode(buf);
672                     } catch (CharacterCodingException e) {
673                         throw Utils.asRuntime(e);
674                     }
675                     buf.limit(buf.capacity());
676                     String s = new String(charBuffer.array(), charBuffer.arrayOffset(), charBuffer.limit());
677                     buf.position(0);
678                     return s;
679                 } else {
680                     return "";
681                 }
682             }
683         }
684 
685         public void run() {
686             try {
687                 while (!this.stop) {
688                     readAllAvailableFromStderr();
689                     try {
690                         Thread.sleep(1000);
691                     } catch (InterruptedException e) {
692                         // When stopping server the stderr thread is
693                         // interrupted
694                     }
695                     if (supervisor != null) {
696                         supervisor.run();
697                     }
698                 }
699             } catch (IOException e) {
700                 handleIOException(e);
701             }
702         }
703 
704         private void handleIOException(IOException e) {
705             String message = e.getMessage();
706             if (message.equals("Bad file descriptor") || message.equals("Stream Closed") || message.equals("Stream closed")) {
707                 LOG.warn("errorReaderThread could not read stderr. Most likely the Mercurial server process is dead.");
708             } else {
709                 throw new RuntimeIOException(e);
710             }
711         }
712 
713         private void readAllAvailableFromStderr() throws IOException {
714             while (this.errorStream.available() > 0) {
715                 int b = this.errorStream.read();
716                 if (b == -1) {
717                     // The stream is at eof, the process must
718                     // have exited, so stop the thread.
719                     break;
720                 }
721                 synchronized (this.stderrBuffer) {
722                     try {
723                         this.stderrBuffer.put((byte) b);
724                     } catch (BufferOverflowException e) {
725                         // just ignore
726                     }
727                 }
728             }
729 
730         }
731 
732     }
733 }