View Javadoc

1   /*
2    * #%L
3    * JavaHg
4    * %%
5    * Copyright (C) 2011 aragost Trifork ag
6    * %%
7    * Permission is hereby granted, free of charge, to any person obtaining a copy
8    * of this software and associated documentation files (the "Software"), to deal
9    * in the Software without restriction, including without limitation the rights
10   * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11   * copies of the Software, and to permit persons to whom the Software is
12   * furnished to do so, subject to the following conditions:
13   * 
14   * The above copyright notice and this permission notice shall be included in
15   * all copies or substantial portions of the Software.
16   * 
17   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23   * THE SOFTWARE.
24   * #L%
25   */
26  package com.aragost.javahg.internals;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  import java.io.InputStream;
31  import java.io.OutputStream;
32  import java.nio.charset.CharsetDecoder;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.ConcurrentModificationException;
36  import java.util.List;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import com.aragost.javahg.Changeset;
40  import com.aragost.javahg.DateTime;
41  import com.aragost.javahg.Repository;
42  import com.aragost.javahg.UnknownCommandException;
43  import com.aragost.javahg.commands.CancelledExecutionException;
44  import com.aragost.javahg.commands.ExecutionException;
45  import com.google.common.collect.Lists;
46  import com.google.common.io.ByteStreams;
47  
48  /**
49   * Base class for the command classes.
50   * 
51   * Each Mercurial command (e.g., "log", "commit", etc) is mapped to a command
52   * class, which is a subclass of this class. The command classes will provide
53   * methods for setting command line flags and for actually executing the
54   * command.
55   * 
56   * Concurrency: Instances of this class should be accessed only on a single
57   * thread. The only exception is {@link #cancel()}.
58   * 
59   * States: Normally: READY->QUEUED->RUNNING->READY. If cancelled: From READY,
60   * QUEUED, or RUNNING -> CANCELLING -> READY. If cancelled the executing thread
61   * will have a CancelledExecutionException thrown and state will be READY.
62   */
63  public abstract class AbstractCommand {
64  
65      public enum State { READY, QUEUED, RUNNING, CANCELING }; 
66  
67      private AtomicReference<State> state = new AtomicReference<AbstractCommand.State>(State.READY);
68  
69      private final List<String> cmdLine;
70  
71      private final Repository repository;
72  
73      private OutputChannelInputStream outputChannelStream;
74  
75      private ByteArrayOutputStream error = new ByteArrayOutputStream();
76  
77      private int returnCode = Integer.MIN_VALUE;
78  
79      private int lineChannelLength;
80  
81      /**
82       * Not null while in {@link State#RUNNING}.
83       */
84      private volatile Server server;
85  
86      protected AbstractCommand(Repository repository) {
87          this.repository = repository;
88          this.cmdLine = Lists.newArrayList(getCommandName());
89      }
90  
91      protected AbstractCommand(Repository repository, String commandName) {
92          this.repository = repository;
93          this.cmdLine = Lists.newArrayList(commandName);
94      }
95  
96      /**
97       * @return the name of this Mercurial command, i.e., "add", "log",
98       *         etc
99       */
100     public abstract String getCommandName();
101 
102     public void cmdAppend(String option) {
103         this.cmdLine.add(option);
104     }
105 
106     public void cmdAppend(String option, String arg) {
107         if (arg == null) {
108             throw new NullPointerException("cannot pass null for " + option + " flag");
109         }
110         this.cmdLine.add(option);
111         this.cmdLine.add(arg);
112     }
113 
114     public void cmdAppend(String option, String[] args) {
115         for (String arg : args) {
116             cmdAppend(option, arg);
117         }
118     }
119 
120     public void cmdAppend(String option, int arg) {
121         this.cmdLine.add(option);
122         this.cmdLine.add("" + arg);
123     }
124 
125     public void cmdAppend(String option, DateTime date) {
126         if (date == null) {
127             throw new NullPointerException("cannot pass null for " + option + " flag");
128         }
129         this.cmdLine.add(option);
130         this.cmdLine.add(date.getHgString());
131     }
132 
133     /**
134      * Launch the command and return stdout as a String.
135      * 
136      * @param args
137      *            extra command line arguments (optional).
138      * @return stdout as a String.
139      * @throws IOException
140      */
141     protected final String launchString(String... args) {
142         InputStream stdout = launchStream(args);
143         try {
144             return Utils.readStream(stdout, getRepository().newDecoder());
145         } finally {
146             cleanUp();
147         }
148     }
149 
150     /**
151      * Launch the command and return stdout as a InputStream.
152      * 
153      * @param args
154      *            extra command line arguments (optional).
155      * @return stdout stream
156      */
157     protected final HgInputStream launchStream(String... args) {
158         clear();
159         changeState(State.READY, State.QUEUED, true);
160 
161         try {
162             server = repository.getServerPool().take(this);
163             changeState(State.QUEUED, State.RUNNING, true);
164         } catch (InterruptedException e1) {
165             changeState(State.CANCELING, State.READY, false);
166             throw new CancelledExecutionException(this);
167         }
168 
169         List<String> commandLine = new ArrayList<String>(this.cmdLine);
170         boolean ok = false;
171 
172         getRepository().addToCommandLine(commandLine);
173         commandLine.addAll(Arrays.asList(args));
174 
175         try {
176             this.outputChannelStream = server.runCommand(commandLine, this);
177             HgInputStream stream = new HgInputStream(outputChannelStream, this.repository.newDecoder());
178             ok = true;
179             return stream;
180         } catch (UnexpectedServerTerminationException e) {
181             if (state.get() == State.CANCELING) {
182                 throw new CancelledExecutionException(this);
183             }
184             throw e;
185         } catch (IOException e) {
186             throw new RuntimeIOException(e);
187         } finally {
188             // If an exception is thrown there is no chance the command will
189             // finish normally so abort the server. E.g protocol error
190             if (!ok && state.get() != State.READY) {
191                 state.set(State.READY);
192                 repository.getServerPool().abort(server);
193                 server = null;
194             }
195         }
196     }
197 
198     /**
199      * Changes state checking for cancellation
200      * 
201      * @param current
202      *            The expected state
203      * @param next
204      *            The state to change to
205      * @param strict
206      *            If true a {@link ConcurrentModificationException} is thrown if
207      *            in unexpected state
208      * @throws CancelledExecutionException
209      *             If in cancelled state
210      * @throws ConcurrentModificationException
211      *             If in another unexpected state and if strict is true
212      */
213     private void changeState(State current, State next, boolean strict) {
214         if (!state.compareAndSet(current, next)) {
215             if (state.compareAndSet(State.CANCELING, State.READY)) {
216                 throw new CancelledExecutionException(this);                
217             }
218             if (strict) {
219                 throw new ConcurrentModificationException("Unexpected command state");
220             }
221         }
222     }
223 
224     protected final LineIterator launchIterator(String... args) {
225         return new LineIterator(launchStream(args));
226     }
227 
228     /**
229      * Open the output stream again after sending input to the command
230      * server. When the server alternates between sending output on
231      * the 'o' channel and reading input lines, the output channel
232      * will run dry several times. Call this method after sending
233      * input to the command server in response to reading from the 'L'
234      * channel. New output from the server will then appear on the
235      * output channel.
236      */
237     public void reopenOutputChannelStream() {
238         this.outputChannelStream.reopen();
239     }
240 
241     /**
242      * Send input line to command server in response to reading a
243      * block on the 'L' channel.
244      * 
245      * @param s
246      *            line of input.
247      */
248     public void sendLine(String s) {
249         if (this.lineChannelLength == 0) {
250             throw new IllegalStateException("No input expected");
251         }
252 
253         server.sendLine(s);
254         this.lineChannelLength = 0;
255     }
256 
257     /**
258      * @return true if we have read a 'L' channel from the server.
259      */
260     public boolean needsInputLine() {
261         return this.lineChannelLength > 0;
262     }
263 
264     /**
265      * Finish the request by consuming any remaining input on the
266      * output channel from the server. The server wont respond to new
267      * commands until this is done.
268      */
269     void cleanUp() {
270         if (this.outputChannelStream != null) {
271             try {
272                 Utils.consumeAll(this.outputChannelStream);
273             } catch (IOException e) {
274                 throw new RuntimeIOException(e);
275             }
276         }
277     }
278 
279     protected void clear() {
280         this.error.reset();
281         this.returnCode = Integer.MIN_VALUE;
282     }
283 
284     /**
285      * Check if the command ended with a zero return code. Subclasses
286      * can override this to accept other return codes as successful.
287      * 
288      * @return true if the command ended successfully.
289      */
290     protected boolean isSuccessful() {
291         return getReturnCode() == 0;
292     }
293 
294     private String streamAsString(OutputStream stream) {
295         if (stream instanceof ByteArrayOutputStream) {
296             byte[] bytes = ((ByteArrayOutputStream) stream).toByteArray();
297             CharsetDecoder decoder = getRepository().newDecoder();
298             return Utils.decodeBytes(bytes, decoder);
299         } else {
300             throw new IllegalStateException();
301         }
302     }
303 
304     /**
305      * @return data read on the 'e' channel
306      */
307     public String getErrorString() {
308         return streamAsString(this.error);
309     }
310 
311     /**
312      * @return the return code read from the 'r' channel
313      */
314     public int getReturnCode() {
315         if (this.returnCode == Integer.MIN_VALUE) {
316             throw new IllegalStateException("cmdserver is still executing request");
317         }
318         return this.returnCode;
319     }
320 
321     /**
322      * @return the {@link Repository} associated with this command.
323      */
324     public Repository getRepository() {
325         return repository;
326     }
327 
328     @Override
329     public String toString() {
330         return getCommandName();
331     }
332 
333     /**
334      * @param lineChannelLength
335      *            the line length requested by the server on the 'L'
336      *            channel.
337      */
338     void setLineChannelLength(int lineChannelLength) {
339         this.lineChannelLength = lineChannelLength;
340     }
341 
342     /**
343      * Add to the stderr data stored in this command.
344      * 
345      * @param cin
346      *            stderr of the command server.
347      * @throws IOException
348      */
349     void addToError(BlockInputStream cin) throws IOException {
350         ByteStreams.copy(cin, this.error);
351     }
352 
353     protected void withDebugAndChangesetStyle() {
354         withDebugFlag();
355         cmdAppend("--style", Changeset.CHANGESET_STYLE_PATH);
356     }
357 
358     protected void withDebugFlag() {
359         cmdAppend("--debug");
360     }
361 
362     /**
363      * Called exactly once when this command finishes executing. The server is
364      * freed and the state is changed to DONE.
365      * 
366      * @param returnCode The exit code of the completed command
367      */
368     final void handleReturnCode(int returnCode) {
369         this.returnCode = returnCode;
370         server.clearCurrentCommand(this);
371         repository.getServerPool().put(server);
372         server = null;
373         changeState(State.RUNNING, State.READY, false);
374 
375         if (returnCode == -1) {
376             // This can for example happens for an unknown command
377             String errorString = getErrorString();
378             if (errorString.startsWith("hg: unknown command '")) {
379                 throw new UnknownCommandException(this);
380             }
381         }
382 
383         doneHook();
384 
385         if (!isSuccessful()) {
386             throw new ExecutionException(this);
387         }
388     }
389 
390     /**
391      * Cancel a running command. May be called from a different thread. Returns
392      * immediately. The thread executing the command should return soon after
393      * with a {@link CancelledExecutionException} thrown.
394      */
395     public final void cancel() {
396         State oldState = state.getAndSet(State.CANCELING);
397 
398         if (oldState != State.READY) {
399             // There is a small chance the final state of a command will be
400             // CANCELLING rather than DONE but this doesn't actually matter.
401             Server server = this.server;
402             Process process;
403 
404             if (server != null && (process = server.getProcess()) != null) {
405                 process.destroy();
406             }
407         }
408     }
409 
410     /**
411      * @return The current state of the command.
412      */
413     State getState() {
414         return state.get();
415     }
416 
417     /**
418      * This method is called when the processing of a command is
419      * finished. More precise it is called just after the 'r' channel
420      * is read
421      * <p>
422      * It can be overridden in subclasses.
423      */
424     protected void doneHook() {
425 
426     }
427 }