View Javadoc

1   /*
2    * #%L
3    * JavaHg
4    * %%
5    * Copyright (C) 2011 aragost Trifork ag
6    * %%
7    * Permission is hereby granted, free of charge, to any person obtaining a copy
8    * of this software and associated documentation files (the "Software"), to deal
9    * in the Software without restriction, including without limitation the rights
10   * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11   * copies of the Software, and to permit persons to whom the Software is
12   * furnished to do so, subject to the following conditions:
13   * 
14   * The above copyright notice and this permission notice shall be included in
15   * all copies or substantial portions of the Software.
16   * 
17   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23   * THE SOFTWARE.
24   * #L%
25   */
26  package com.aragost.javahg.internals;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  import java.io.InputStream;
31  
32  import com.aragost.javahg.commands.CancelledExecutionException;
33  import com.aragost.javahg.internals.AbstractCommand.State;
34  import com.aragost.javahg.log.Logger;
35  import com.aragost.javahg.log.LoggerFactory;
36  
37  /**
38   * An input stream that will make everything written to the output
39   * channel for one command available as an input stream.
40   * <p>
41   * Reading from this stream will add to the
42   * {@link ByteArrayOutputStream} for the other channels, as blocks for
43   * other channels are detected.
44   * <p>
45   * The stream will indicate EOF when the 'r'esult block has been read.
46   * <p>
47   * If an 'L' block is read it will also indicate EOF. After a response
48   * to the 'L' block is sent to the server, you can call reopen() and
49   * read the rest of the input.
50   */
51  public class OutputChannelInputStream extends InputStream {
52  
53      private static final Logger LOG = LoggerFactory.getLogger(OutputChannelInputStream.class);
54  
55      private final InputStream channelStream;
56  
57      private final AbstractCommand cmd;
58  
59      private final Server server;
60  
61      /**
62       * The current 'o' channel. If this is {@code null} then eof has
63       * been reached.
64       */
65      private BlockInputStream currentOutputChannelBlock = BlockInputStream.EMPTY;
66  
67      /**
68       * 
69       * @param channelStream
70       * @param cmd
71       * @throws IOException
72       */
73      OutputChannelInputStream(InputStream channelStream, Server server, AbstractCommand cmd) throws IOException {
74          this.channelStream = channelStream;
75          this.cmd = cmd;
76          this.server = server;
77          try {
78              findNextOutputChannelBlock();
79          } catch (RuntimeException e) {
80              verifyServerProcess(e);
81              throw e;
82          } catch (IOException e) {
83              verifyServerProcess(e);
84              throw e;
85          }
86      }
87  
88      @Override
89      public int read() throws IOException {
90          if (this.currentOutputChannelBlock == null) {
91              return -1;
92          }
93          try {
94              int ch = this.currentOutputChannelBlock.read();
95              if (ch == -1) {
96                  findNextOutputChannelBlock();
97                  return read();
98              }
99              return ch;
100         } catch (RuntimeException e) {
101             verifyServerProcess(e);
102             throw e;
103         } catch (IOException e) {
104             verifyServerProcess(e);
105             throw e;
106         }
107     }
108 
109     @Override
110     public int read(byte[] b, int off, int len) throws IOException {
111         if (this.currentOutputChannelBlock == null) {
112             return -1;
113         }
114         try {
115             int n = this.currentOutputChannelBlock.read(b, off, len);
116             if (n > 0) {
117                 return n;
118             } else if (n == -1) {
119                 findNextOutputChannelBlock();
120                 return read(b, off, len);
121             } else {
122                 return 0;
123             }
124         } catch (RuntimeException e) {
125             verifyServerProcess(e);
126             throw e;
127         } catch (IOException e) {
128             verifyServerProcess(e);
129             throw e;
130         }
131     }
132 
133     @Override
134     public int available() throws IOException {
135         if (this.currentOutputChannelBlock == null) {
136             return 0;
137         } else {
138             return this.currentOutputChannelBlock.available();
139         }
140     }
141 
142     /**
143      * Find the next output channel block.
144      * <p>
145      * The method will handle other channels while looking for an
146      * o-channel block.
147      * 
148      * @throws IOException
149      */
150     private void findNextOutputChannelBlock() throws IOException {
151         if (this.currentOutputChannelBlock == null) {
152             throw new IllegalStateException();
153         }
154         if (this.currentOutputChannelBlock.getBytesLeft() > 0) {
155             throw new IllegalStateException("Still bytes available in currentOutputChannelBlock");
156         }
157         this.currentOutputChannelBlock = null;
158         BlockInputStream blockInputStream = null;
159         while (true) {
160             if (cmd.getState() == State.CANCELING) {
161                 throw new CancelledExecutionException(cmd);
162             }
163 
164             blockInputStream = new BlockInputStream(this.channelStream);
165             char channel = blockInputStream.getChannel();
166             switch (channel) {
167             case 'o':
168                 this.currentOutputChannelBlock = blockInputStream;
169                 return;
170             case 'e':
171                 this.cmd.addToError(blockInputStream);
172                 break;
173             case 'r':
174                 if (blockInputStream.getLength() != 4) {
175                     throw new IllegalStateException("Length 4 expected for channel 'r'");
176                 }
177                 int returnCode = blockInputStream.readInt();
178                 LOG.debug("Command '{}' gave return code: {}", this.cmd.getCommandName(), returnCode);
179                 this.cmd.handleReturnCode(returnCode);
180                 return;
181             case 'L':
182                 this.cmd.setLineChannelLength(blockInputStream.getLength());
183                 return;
184             default:
185                 if (Character.isLowerCase(channel)) {
186                     // Ignore an unrecognized channel
187                     Utils.consumeAll(blockInputStream);
188                 } else {
189                     // Unrecognized but mandatory channel
190                     throw new IllegalStateException("Unknown channel: " + channel);
191                 }
192             }
193         }
194     }
195 
196     /**
197      * Reopen this stream to read pending 'o' blocks.
198      * <p>
199      * The stream is indicating EOF when an 'L' block is encountered.
200      * With this method the rest of the rest of the 'o' blocks can be
201      * read.
202      */
203     public void reopen() {
204         this.currentOutputChannelBlock = BlockInputStream.EMPTY;
205         try {
206             findNextOutputChannelBlock();
207         } catch (IOException e) {
208             throw new RuntimeIOException(e);
209         }
210     }
211 
212     /**
213      * @param e
214      */
215     private void verifyServerProcess(Exception e) {
216         this.server.verifyServerProcess(e);
217     }
218 }