Coverage Report - com.aragost.javahg.internals.Server
 
Classes in this File Line Coverage Branch Coverage Complexity
Server
83%
169/203
72%
37/51
3.143
Server$1
100%
1/1
N/A
3.143
Server$State
100%
2/2
N/A
3.143
Server$StderrReader
85%
61/71
50%
11/22
3.143
 
 1  
 /*
 2  
  * #%L
 3  
  * JavaHg
 4  
  * %%
 5  
  * Copyright (C) 2011 aragost Trifork ag
 6  
  * %%
 7  
  * Permission is hereby granted, free of charge, to any person obtaining a copy
 8  
  * of this software and associated documentation files (the "Software"), to deal
 9  
  * in the Software without restriction, including without limitation the rights
 10  
  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 11  
  * copies of the Software, and to permit persons to whom the Software is
 12  
  * furnished to do so, subject to the following conditions:
 13  
  * 
 14  
  * The above copyright notice and this permission notice shall be included in
 15  
  * all copies or substantial portions of the Software.
 16  
  * 
 17  
  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 18  
  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 19  
  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 20  
  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 21  
  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 22  
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 23  
  * THE SOFTWARE.
 24  
  * #L%
 25  
  */
 26  
 package com.aragost.javahg.internals;
 27  
 
 28  
 import java.io.ByteArrayOutputStream;
 29  
 import java.io.File;
 30  
 import java.io.IOException;
 31  
 import java.io.InputStream;
 32  
 import java.io.OutputStream;
 33  
 import java.nio.BufferOverflowException;
 34  
 import java.nio.ByteBuffer;
 35  
 import java.nio.CharBuffer;
 36  
 import java.nio.charset.CharacterCodingException;
 37  
 import java.nio.charset.Charset;
 38  
 import java.nio.charset.CharsetDecoder;
 39  
 import java.nio.charset.CharsetEncoder;
 40  
 import java.nio.charset.CodingErrorAction;
 41  
 import java.util.ArrayList;
 42  
 import java.util.Arrays;
 43  
 import java.util.Iterator;
 44  
 import java.util.List;
 45  
 import java.util.Map;
 46  
 
 47  
 import com.aragost.javahg.MercurialExtension;
 48  
 import com.aragost.javahg.log.Logger;
 49  
 import com.aragost.javahg.log.LoggerFactory;
 50  
 import com.google.common.annotations.VisibleForTesting;
 51  
 import com.google.common.collect.Lists;
 52  
 import com.google.common.io.Closeables;
 53  
 
 54  
 /**
 55  
  * Java class representing a Mercurial commandserver
 56  
  */
 57  
 public class Server {
 58  
 
 59  1
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
 60  
 
 61  
     /**
 62  
      * Enum to represent the state of the server.
 63  
      * <p>
 64  
      * The state will always go from lower ordinal to higher
 65  
      */
 66  8
     private enum State {
 67  1
         NOT_STARTED, STARTING, RUNNING, STOPPING, STOPPED, CRASHED
 68  
     };
 69  
 
 70  1
     private static final byte[] RUNCOMMAND = "runcommand\n".getBytes();
 71  
 
 72  
     /**
 73  
      * The character encoding used for the server. For now it is
 74  
      * always utf-8, in the future there might be an API to set this.
 75  
      * Future: move to ServerPool?
 76  
      */
 77  
     private final Charset encoding;
 78  
 
 79  
     /**
 80  
      * Policy for handling decoding errors.
 81  
      * Future: move to ServerPool?
 82  
      */
 83  222
     private CodingErrorAction errorAction = CodingErrorAction.REPORT;
 84  
 
 85  
     /**
 86  
      * Size of buffer to buffer stderr from Mercurial server process
 87  
      */
 88  222
     private int stderrBufferSize = 1024;
 89  
 
 90  
     /**
 91  
      * Stderr from Mercurial server process generated duing start up.
 92  
      */
 93  222
     private String startupStderr = "";
 94  
 
 95  
     /**
 96  
      * The underlying OS process
 97  
      */
 98  
     private Process process;
 99  
 
 100  
     /**
 101  
      * Thread that read stderr from command server
 102  
      * <p>
 103  
      * In general if the server writes something to stderr (not the
 104  
      * 'e' channel but the actual stderr) it is something seriously
 105  
      * wrong and the server will me stopped and an exception thrown.
 106  
      * During start up of the server messages to stdout is accepted.
 107  
      */
 108  
     private StderrReader errorReaderThread;
 109  
 
 110  222
     private State state = State.NOT_STARTED;
 111  
 
 112  
     /**
 113  
      * If non-null the {@link AbstractCommand} the server is currently
 114  
      * executing.
 115  
      * <p>
 116  
      * The server is single treaded and can only execute one command
 117  
      * at a time.
 118  
      */
 119  
     private AbstractCommand currentCommand;
 120  
 
 121  
     /**
 122  
      * If the appropriate log level is enabled and {@link #currentCommand} is
 123  
      * not null then this contains a log message.
 124  
      */
 125  
     private String currentLog;
 126  
 
 127  
     /**
 128  
      * If the appropriate log level is enabled and {@link #currentCommand} is
 129  
      * not null then this contains the start time.
 130  
      */
 131  
     private long currentStartTime;
 132  
 
 133  
     /**
 134  
      * The directory containing the Mercurial repository where the
 135  
      * server is running.
 136  
      */
 137  
     private File directory;
 138  
 
 139  
     /**
 140  
      * Location of the Mercurial binary.
 141  
      */
 142  
     private final String hgBin;
 143  
 
 144  
     /**
 145  
      * Time this server was last active.
 146  
      */
 147  
     private volatile long lastActiveTime;
 148  
 
 149  
     /**
 150  
      * Enable access to pending changesets.
 151  
      */
 152  222
     private boolean enablePendingChangesets = false;
 153  
 
 154  
     /**
 155  
      * Create a new Server object
 156  
      * 
 157  
      * @param hgBin
 158  
      *            the Mercurial binary to use
 159  
      */
 160  222
     public Server(String hgBin, Charset encoding) {
 161  222
         this.hgBin = hgBin;
 162  222
         this.encoding = encoding;
 163  222
     }
 164  
 
 165  
     public CodingErrorAction getErrorAction() {
 166  0
         return errorAction;
 167  
     }
 168  
 
 169  
     public void setErrorAction(CodingErrorAction errorAction) {
 170  216
         this.errorAction = errorAction;
 171  216
     }
 172  
 
 173  
     public int getStderrBufferSize() {
 174  0
         return stderrBufferSize;
 175  
     }
 176  
 
 177  
     public String getStartupStderr() {
 178  1
         return startupStderr;
 179  
     }
 180  
 
 181  
     public void setEnablePendingChangesets(boolean enablePendingChangesets){
 182  216
       this.enablePendingChangesets = enablePendingChangesets;
 183  216
     }
 184  
 
 185  
     public boolean isEnablePendingChangesets(){
 186  0
       return enablePendingChangesets;
 187  
     }
 188  
 
 189  
     /**
 190  
      * Set the buffer size for stderr from Mercurial server process.
 191  
      * <p>
 192  
      * There is probably no reason to set this, but it is here to
 193  
      * facilitate testcases for buffer overflow.
 194  
      * 
 195  
      * @param stderrBufferSize
 196  
      */
 197  
     public void setStderrBufferSize(int stderrBufferSize) {
 198  216
         this.stderrBufferSize = stderrBufferSize;
 199  216
     }
 200  
 
 201  
     /**
 202  
      * @return a new {@link CharsetDecoder} instance for this server.
 203  
      */
 204  
     public CharsetDecoder newDecoder() {
 205  8214
         CharsetDecoder decoder = this.encoding.newDecoder();
 206  8214
         decoder.onMalformedInput(this.errorAction);
 207  8214
         decoder.onUnmappableCharacter(this.errorAction);
 208  8214
         return decoder;
 209  
     }
 210  
 
 211  
     /**
 212  
      * @return a new {@link CharsetEncoder} instance for this server.
 213  
      */
 214  
     public CharsetEncoder newEncoder() {
 215  3809
         CharsetEncoder encoder = this.encoding.newEncoder();
 216  3809
         encoder.onMalformedInput(this.errorAction);
 217  3809
         encoder.onUnmappableCharacter(this.errorAction);
 218  3809
         return encoder;
 219  
     }
 220  
 
 221  
     /**
 222  
      * Start the server in the specified directory. The directory is must be the
 223  
      * root directory of a Mercurial repository.
 224  
      * 
 225  
      * @param directory
 226  
      *            The directory of to start the command server in
 227  
      * @param hgrcPath
 228  
      *            The path to the hgrc config file to use. May be null
 229  
      * @param extraArguments
 230  
      *            Additional argument to start the command server with. Eg
 231  
      *            extensions to enable
 232  
      * @param env Optional map of environment variables to set when starting 
 233  
      *            the command server. 
 234  
      * @param supervisor
 235  
      *            Optional function to be executed periodically by the error
 236  
      *            stream thread. May be null
 237  
      * @return The hello message from the server
 238  
      */
 239  
     public String start(File directory, final String hgrcPath, List<String> extraArguments, Map<String, String> env, Runnable supervisor) {
 240  218
         this.directory = directory.getAbsoluteFile();
 241  218
         if (!new File(directory, ".hg").isDirectory()) {
 242  1
             throw new IllegalArgumentException("No .hg in " + directory);
 243  
         }
 244  
         String result;
 245  
         try {
 246  217
             List<String> args = Lists.newArrayList("serve", "--cmdserver", "pipe", "--config", "ui.interactive=true",
 247  
                     "--config", "ui.merge=internal:fail");
 248  217
             ArrayList<Class<? extends MercurialExtension>> extList = Lists.newArrayList();
 249  217
             extList.add(JavaHgMercurialExtension.class);
 250  217
             args.addAll(ExtensionManager.getInstance().process(extList));
 251  217
             args.addAll(extraArguments);
 252  217
             this.state = State.STARTING;
 253  217
             this.process = execHg(this.directory, hgrcPath, args, env);
 254  
 
 255  217
             active();
 256  217
             this.errorReaderThread = new StderrReader(this.process, this.stderrBufferSize, supervisor);
 257  217
             this.errorReaderThread.start();
 258  
             // If for some reason the server failed to start the
 259  
             // creation of the BlockInputStream will fail (because the
 260  
             // the stdout stream on the process has been closed). So
 261  
             // before we try to create the BlockInputStream the error
 262  
             // reader thread should be started to print the error
 263  217
             BlockInputStream block = new BlockInputStream(this.process.getInputStream());
 264  202
             result = Utils.readStream(block, newDecoder());
 265  
             // The hello message from the server has been read and it
 266  
             // is now running normally.
 267  
 
 268  202
             checkStderr();
 269  202
             this.state = State.RUNNING;
 270  
 
 271  202
             LOG.info("Command server started: {}", this.directory);
 272  15
         } catch (Exception e) {
 273  15
             verifyServerProcess(e);
 274  0
             throw Utils.asRuntime(e);
 275  202
         }
 276  202
         verifyServerProcess(null);
 277  202
         return result;
 278  
     }
 279  
 
 280  
     private static final int ERROR_READER_THREAD_TIMEOUT = 5000; // milliseconds
 281  
 
 282  
     /**
 283  
      * Stop the Mercurial server process
 284  
      */
 285  
     void stop() {
 286  203
         if (this.process == null || this.state == State.STOPPED) {
 287  16
             LOG.warn("Trying to stop already stopped server");
 288  16
             return;
 289  
         }
 290  
 
 291  187
         this.state = State.STOPPING;
 292  187
         Closeables.closeQuietly(this.process.getOutputStream());
 293  187
         Closeables.closeQuietly(this.process.getInputStream());
 294  187
         this.errorReaderThread.finish();
 295  187
         this.errorReaderThread = null;
 296  187
         Closeables.closeQuietly(this.process.getErrorStream());
 297  
         // Closing the streams of the process will stop the
 298  
         // process and close the error stream, which will return int
 299  
         // the errorReaderThread to terminate.
 300  
         try {
 301  187
             this.process.waitFor();
 302  0
         } catch (InterruptedException e) {
 303  0
             LOG.error("Process for Mercurial server interrupted", e);
 304  0
             throw Utils.asRuntime(e);
 305  187
         }
 306  187
         LOG.info("Command server stopped: {}", this.directory);
 307  187
         this.currentCommand = null;
 308  187
         this.process = null;
 309  187
         this.directory = null;
 310  187
     }
 311  
 
 312  
     private void checkStderr() {
 313  7799
         String s = this.errorReaderThread.bufferAsString(newDecoder());
 314  7799
         if (s.length() > 0) {
 315  19
             LOG.error("stderr from Mercurial: {}", s);
 316  1
             switch (this.state) {
 317  
             case STARTING:
 318  1
                 this.startupStderr += s;
 319  1
                 break;
 320  
             case STOPPING:
 321  
             case CRASHED:
 322  
                 // stderr is accepted, and logged as error
 323  16
                 break;
 324  
             case RUNNING:
 325  
                 // stderr is not accepted, stop server and throw
 326  
                 // exception
 327  2
                 stop();
 328  
                 // TODO What exception would be appropiate to throw?
 329  2
                 throw new RuntimeException(s);
 330  
             default:
 331  0
                 throw new RuntimeException("This should not happen");
 332  
             }
 333  
         }
 334  7797
     }
 335  
 
 336  
     /**
 337  
      * Verify that the server process hasn't terminated. If it has
 338  
      * throw an {@link UnexpectedServerTerminationException},
 339  
      * otherwise just return normally.
 340  
      * 
 341  
      * @param exception
 342  
      */
 343  
     void verifyServerProcess(Exception exception) {
 344  253
         if (exception instanceof UnexpectedServerTerminationException) {
 345  
             // Already the right type
 346  0
             throw (UnexpectedServerTerminationException) exception;
 347  
         }
 348  253
         if (this.process != null) {
 349  
             // When the process shutdowns there is a small window
 350  
             // where the input stream is closed but the
 351  
             // process is still running. If the input stream is
 352  
             // closed we get a
 353  
             // BlockInputStream.InvalidStreamException
 354  
             // exception. In this case loop a few times with a delay
 355  
             // do see if the process did actually exit.
 356  
             // Similar the is a small window when writing to the
 357  
             // output stream, here we get an IOException
 358  249
             int n = 0;
 359  249
             if (exception instanceof RuntimeIOException) {
 360  0
                 exception = ((RuntimeIOException) exception).getIOException();
 361  
             }
 362  
             while (true) {
 363  
                 try {
 364  268
                     int exitValue = this.process.exitValue();
 365  29
                     this.state = State.CRASHED;
 366  29
                     checkStderr();
 367  29
                     String msg = "Server process terminated premature with: " + exitValue;
 368  29
                     System.err.println("JavaHg: " + msg);
 369  29
                     this.process = null;
 370  29
                     throw new UnexpectedServerTerminationException(exitValue, exception);
 371  239
                 } catch (IllegalThreadStateException e) {
 372  239
                     if (exception instanceof BlockInputStream.InvalidStreamException
 373  
                             || exception instanceof IOException) {
 374  19
                         if (n++ == 4) {
 375  0
                             return;
 376  
                         }
 377  19
                         sleep(100);
 378  
                     } else {
 379  220
                         return;
 380  
                     }
 381  
 
 382  19
                 }
 383  
             }
 384  
         }
 385  4
     }
 386  
 
 387  
     /**
 388  
      * Run the specified command and return a stream with the content
 389  
      * of the output channel.
 390  
      * <p>
 391  
      * The client <em>must</em> empty the return stream. The server
 392  
      * will not accept other commands until all out is read.
 393  
      * 
 394  
      * @param cmdLine
 395  
      * @param command
 396  
      * @return the standard output from the command.
 397  
      * @throws IOException
 398  
      */
 399  
     public OutputChannelInputStream runCommand(List<String> cmdLine, AbstractCommand command) throws IOException {
 400  3810
         if (this.currentCommand != null) {
 401  1
             throw new IllegalStateException("Trying to execute new command when command already running: "
 402  
                     + this.currentCommand);
 403  
         }
 404  
 
 405  3809
         if (LOG.isInfoEnabled()) {
 406  0
             StringBuilder buf = new StringBuilder(256);
 407  
 
 408  0
             for (String s : cmdLine) {
 409  0
                 buf.append(Utils.obfuscateLoginData(s));
 410  0
                 buf.append(' ');
 411  
             }
 412  
 
 413  0
             currentLog = buf.toString(); 
 414  0
             currentStartTime = System.currentTimeMillis(); 
 415  
         }
 416  
         
 417  3809
         this.currentCommand = command;
 418  3809
         sendCommand(cmdLine);
 419  3808
         OutputChannelInputStream stdout = new OutputChannelInputStream(this.process.getInputStream(), this, command);
 420  3779
         checkStderr();
 421  3779
         return stdout;
 422  
     }
 423  
 
 424  
     void clearCurrentCommand(AbstractCommand cmd) {
 425  3789
         if (cmd != this.currentCommand) {
 426  0
             throw new IllegalStateException("Wrong command");
 427  
         }
 428  3789
         active();
 429  3789
         checkStderr();
 430  3787
         this.currentCommand = null;
 431  
 
 432  3787
         if (LOG.isInfoEnabled()) {
 433  0
             LOG.info("runcommand({}ms) {}", System.currentTimeMillis() - currentStartTime,
 434  
                     currentLog);
 435  
         }
 436  3787
     }
 437  
 
 438  222
     private ByteArrayOutputStream baos = new ByteArrayOutputStream();
 439  
 
 440  
     private void sendCommand(List<String> cmdLine) throws IOException {
 441  3809
         active();
 442  3809
         baos.reset();
 443  3809
         CharsetEncoder encoder = newEncoder();
 444  3809
         encode(cmdLine.get(0), baos, encoder);
 445  3809
         for (String s : cmdLine.subList(1, cmdLine.size())) {
 446  12069
             baos.write('\0');
 447  12069
             encode(s, baos, encoder);
 448  
         }
 449  
 
 450  3809
         OutputStream outputStream = this.process.getOutputStream();
 451  
         try {
 452  3809
             outputStream.write(RUNCOMMAND);
 453  3809
             Utils.writeBigEndian(baos.size(), outputStream);
 454  3809
             baos.writeTo(outputStream);
 455  3809
             outputStream.flush();
 456  1
         } catch (IOException e) {
 457  1
             verifyServerProcess(e);
 458  0
             throw e;
 459  3808
         }
 460  3808
     }
 461  
 
 462  
     private Process execHg(File directory, String hgrcPath, List<String> arguments, Map<String, String> env) throws IOException {
 463  430
         ArrayList<String> cmdLine = Lists.newArrayList(this.hgBin);
 464  430
         cmdLine.addAll(arguments);
 465  430
         ProcessBuilder processBuilder = new ProcessBuilder(cmdLine);
 466  430
         if (directory != null) {
 467  217
             processBuilder.directory(directory);
 468  
         }
 469  430
         Map<String, String> environment = processBuilder.environment();
 470  
 
 471  430
         if (env != null)
 472  
         {
 473  215
             for (Iterator<String> it = env.keySet().iterator(); it.hasNext();)
 474  
             {
 475  0
                 String key = (String)it.next();
 476  
 
 477  0
                 environment.put(key, env.get(key));
 478  0
             }
 479  
         }
 480  
 
 481  430
         environment.put("HGENCODING", this.encoding.displayName());
 482  430
         environment.put("HGPLAIN", "1");
 483  430
         if (hgrcPath != null) {
 484  428
             environment.put("HGRCPATH", hgrcPath);
 485  
         }
 486  430
         if (enablePendingChangesets){
 487  0
           environment.put("HG_PENDING", directory.getAbsolutePath());
 488  
         }
 489  430
         return processBuilder.start();
 490  
     }
 491  
 
 492  
     /**
 493  
      * Convenience method to initialize a mercurial repository in a
 494  
      * directory.
 495  
      * <p>
 496  
      * This method is not using any commandserver functionality
 497  
      * 
 498  
      * @param directory
 499  
      */
 500  
     public void initMecurialRepository(File directory) {
 501  207
         execHgCommand(null, "", "init", directory.getAbsolutePath());
 502  207
     }
 503  
 
 504  
     /**
 505  
      * Convenience method to clone a mercurial repository in a directory.
 506  
      * <p>
 507  
      * This method is not using any commandserver functionality
 508  
      *
 509  
      * @param directory
 510  
      * @param hgrcPath
 511  
      * @param cloneUrl
 512  
      */
 513  
     public void cloneMercurialRepository(File directory, String hgrcPath, String cloneUrl) {
 514  6
       execHgCommand(null, hgrcPath, "clone", cloneUrl, directory.getAbsolutePath());
 515  5
     }
 516  
 
 517  
     private void execHgCommand(File directory, String hgrcPath, String... args) {
 518  
         try {
 519  213
             Process process = execHg(directory, hgrcPath, Arrays.asList(args), null);
 520  213
             String stderr = Utils.readStream(process.getErrorStream(), newDecoder());
 521  213
             Utils.consumeAll(process.getInputStream());
 522  
 
 523  213
             if (process.waitFor() != 0) {
 524  1
                 throw new RuntimeException(stderr);
 525  
             }
 526  0
         } catch (IOException e) {
 527  0
             throw new RuntimeIOException(e);
 528  0
         } catch (InterruptedException e) {
 529  0
             throw Utils.asRuntime(e);
 530  212
         }
 531  212
     }
 532  
 
 533  
     @Override
 534  
     public String toString() {
 535  18
         return "cmdserver@" + this.directory;
 536  
     }
 537  
 
 538  
     public void sendLine(String answer) {
 539  3
         OutputStream outputStream = this.process.getOutputStream();
 540  
         try {
 541  3
             byte[] bytes = (answer + "\n").getBytes(this.encoding.name());
 542  3
             Utils.writeBigEndian(bytes.length, outputStream);
 543  3
             outputStream.write(bytes);
 544  3
             outputStream.flush();
 545  0
         } catch (IOException e) {
 546  0
             throw new RuntimeIOException(e);
 547  3
         }
 548  3
     }
 549  
 
 550  
     /**
 551  
      * Encode the string and write it to the OutputStream
 552  
      * 
 553  
      * @param s
 554  
      * @param output
 555  
      * @throws IOException
 556  
      */
 557  
     private void encode(String s, OutputStream output, CharsetEncoder encoder) throws IOException {
 558  15878
         ByteBuffer byteBuffer = encoder.encode(CharBuffer.wrap(s));
 559  15878
         output.write(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit());
 560  15878
     }
 561  
 
 562  
     /**
 563  
      * @return The last time this server started or finished executing a command
 564  
      */
 565  
     long getLastActiveTime() {
 566  6
         return lastActiveTime;
 567  
     }
 568  
 
 569  
     /**
 570  
      * Call when this server is active
 571  
      */
 572  
     private void active() {
 573  7815
         lastActiveTime = System.currentTimeMillis();
 574  7815
     }
 575  
 
 576  
     private static void sleep(int ms) {
 577  
         try {
 578  19
             Thread.sleep(ms);
 579  0
         } catch (InterruptedException e) {
 580  0
             throw Utils.asRuntime(e);
 581  19
         }
 582  19
     }
 583  
 
 584  
     @Override
 585  
     protected void finalize() {
 586  220
         if (this.process != null) {
 587  0
             LOG.error("Stopping cmdserver via finalize. Please explicit stop server.");
 588  0
             stop();
 589  
         }
 590  220
     }
 591  
 
 592  
     @VisibleForTesting
 593  
     Process getProcess() {
 594  17
         return process;
 595  
     }
 596  
 
 597  
     /**
 598  
      * Thread to read stderr from the Mercurial server process. The
 599  
      * stderr is written to a {@link ByteBuffer} that is then process
 600  
      * from the main thread.
 601  
      * <p>
 602  
      * If the buffer is full before being processed then the extra
 603  
      * data from stderr is discarded silently.
 604  
      */
 605  1
     static class StderrReader extends Thread {
 606  
 
 607  1
         private static final Logger LOG = LoggerFactory.getLogger(StderrReader.class);
 608  
 
 609  
         private final InputStream errorStream;
 610  
 
 611  
         private final ByteBuffer stderrBuffer;
 612  
 
 613  217
         private volatile boolean stop = false;
 614  
 
 615  
         private final Runnable supervisor;
 616  
 
 617  
         StderrReader(Process process, int bufSize, Runnable supervisor) {
 618  217
             super("JavaHg stderr reader");
 619  217
             this.errorStream = process.getErrorStream();
 620  217
             this.stderrBuffer = ByteBuffer.allocate(bufSize);
 621  217
             this.supervisor = supervisor;
 622  217
             setDaemon(true);
 623  217
         }
 624  
 
 625  
         /**
 626  
          * Finish this thread and read all available stderr data.
 627  
          */
 628  
         void finish() {
 629  187
             this.stop = true;
 630  187
             interrupt();
 631  
             try {
 632  187
                 readAllAvailableFromStderr();
 633  1
             } catch (IOException e) {
 634  1
                 handleIOException(e);
 635  186
             }
 636  
             try {
 637  187
                 join(ERROR_READER_THREAD_TIMEOUT);
 638  187
                 if (isAlive()) {
 639  0
                     assert false;
 640  0
                     throw new RuntimeException("thread is alive. This should not happen");
 641  
                 }
 642  0
             } catch (InterruptedException e) {
 643  0
                 assert false;
 644  0
                 throw new RuntimeException("This should not happen");
 645  187
             }
 646  
 
 647  187
         }
 648  
 
 649  
         /**
 650  
          * Reset the buffer and return the content as a String
 651  
          * 
 652  
          * @param decoder
 653  
          *            used to decode the bytes as a String.
 654  
          * @return the bytes converted to a String
 655  
          */
 656  
         String bufferAsString(CharsetDecoder decoder) {
 657  
             try {
 658  7799
                 readAllAvailableFromStderr();
 659  12
             } catch (IOException e) {
 660  12
                 LOG.warn("Exception trying to read stderr: {}", e.getMessage());
 661  12
                 return "";
 662  7787
             }
 663  
 
 664  7787
             ByteBuffer buf = this.stderrBuffer;
 665  7787
             synchronized (buf) {
 666  7787
                 if (buf.position() > 0) {
 667  
                     CharBuffer charBuffer;
 668  19
                     buf.limit(buf.position());
 669  19
                     buf.position(0);
 670  
                     try {
 671  19
                         charBuffer = decoder.decode(buf);
 672  0
                     } catch (CharacterCodingException e) {
 673  0
                         throw Utils.asRuntime(e);
 674  19
                     }
 675  19
                     buf.limit(buf.capacity());
 676  19
                     String s = new String(charBuffer.array(), charBuffer.arrayOffset(), charBuffer.limit());
 677  19
                     buf.position(0);
 678  19
                     return s;
 679  
                 } else {
 680  7768
                     return "";
 681  
                 }
 682  0
             }
 683  
         }
 684  
 
 685  
         public void run() {
 686  
             try {
 687  720
                 while (!this.stop) {
 688  533
                     readAllAvailableFromStderr();
 689  
                     try {
 690  521
                         Thread.sleep(1000);
 691  187
                     } catch (InterruptedException e) {
 692  
                         // When stopping server the stderr thread is
 693  
                         // interrupted
 694  316
                     }
 695  503
                     if (supervisor != null) {
 696  6
                         supervisor.run();
 697  
                     }
 698  
                 }
 699  12
             } catch (IOException e) {
 700  12
                 handleIOException(e);
 701  187
             }
 702  199
         }
 703  
 
 704  
         private void handleIOException(IOException e) {
 705  13
             String message = e.getMessage();
 706  13
             if (message.equals("Bad file descriptor") || message.equals("Stream Closed") || message.equals("Stream closed")) {
 707  13
                 LOG.warn("errorReaderThread could not read stderr. Most likely the Mercurial server process is dead.");
 708  
             } else {
 709  0
                 throw new RuntimeIOException(e);
 710  
             }
 711  13
         }
 712  
 
 713  
         private void readAllAvailableFromStderr() throws IOException {
 714  13737
             while (this.errorStream.available() > 0) {
 715  5218
                 int b = this.errorStream.read();
 716  5218
                 if (b == -1) {
 717  
                     // The stream is at eof, the process must
 718  
                     // have exited, so stop the thread.
 719  0
                     break;
 720  
                 }
 721  5218
                 synchronized (this.stderrBuffer) {
 722  
                     try {
 723  5218
                         this.stderrBuffer.put((byte) b);
 724  1740
                     } catch (BufferOverflowException e) {
 725  
                         // just ignore
 726  3478
                     }
 727  5218
                 }
 728  5218
             }
 729  
 
 730  8494
         }
 731  
 
 732  
     }
 733  
 }