1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  package com.aragost.javahg.internals;
27  
28  import java.io.ByteArrayOutputStream;
29  import java.io.IOException;
30  import java.io.InputStream;
31  
32  import com.aragost.javahg.commands.CancelledExecutionException;
33  import com.aragost.javahg.internals.AbstractCommand.State;
34  import com.aragost.javahg.log.Logger;
35  import com.aragost.javahg.log.LoggerFactory;
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  public class OutputChannelInputStream extends InputStream {
52  
53      private static final Logger LOG = LoggerFactory.getLogger(OutputChannelInputStream.class);
54  
55      private final InputStream channelStream;
56  
57      private final AbstractCommand cmd;
58  
59      private final Server server;
60  
61      
62  
63  
64  
65      private BlockInputStream currentOutputChannelBlock = BlockInputStream.EMPTY;
66  
67      
68  
69  
70  
71  
72  
73      OutputChannelInputStream(InputStream channelStream, Server server, AbstractCommand cmd) throws IOException {
74          this.channelStream = channelStream;
75          this.cmd = cmd;
76          this.server = server;
77          try {
78              findNextOutputChannelBlock();
79          } catch (RuntimeException e) {
80              verifyServerProcess(e);
81              throw e;
82          } catch (IOException e) {
83              verifyServerProcess(e);
84              throw e;
85          }
86      }
87  
88      @Override
89      public int read() throws IOException {
90          if (this.currentOutputChannelBlock == null) {
91              return -1;
92          }
93          try {
94              int ch = this.currentOutputChannelBlock.read();
95              if (ch == -1) {
96                  findNextOutputChannelBlock();
97                  return read();
98              }
99              return ch;
100         } catch (RuntimeException e) {
101             verifyServerProcess(e);
102             throw e;
103         } catch (IOException e) {
104             verifyServerProcess(e);
105             throw e;
106         }
107     }
108 
109     @Override
110     public int read(byte[] b, int off, int len) throws IOException {
111         if (this.currentOutputChannelBlock == null) {
112             return -1;
113         }
114         try {
115             int n = this.currentOutputChannelBlock.read(b, off, len);
116             if (n > 0) {
117                 return n;
118             } else if (n == -1) {
119                 findNextOutputChannelBlock();
120                 return read(b, off, len);
121             } else {
122                 return 0;
123             }
124         } catch (RuntimeException e) {
125             verifyServerProcess(e);
126             throw e;
127         } catch (IOException e) {
128             verifyServerProcess(e);
129             throw e;
130         }
131     }
132 
133     @Override
134     public int available() throws IOException {
135         if (this.currentOutputChannelBlock == null) {
136             return 0;
137         } else {
138             return this.currentOutputChannelBlock.available();
139         }
140     }
141 
142     
143 
144 
145 
146 
147 
148 
149 
150     private void findNextOutputChannelBlock() throws IOException {
151         if (this.currentOutputChannelBlock == null) {
152             throw new IllegalStateException();
153         }
154         if (this.currentOutputChannelBlock.getBytesLeft() > 0) {
155             throw new IllegalStateException("Still bytes available in currentOutputChannelBlock");
156         }
157         this.currentOutputChannelBlock = null;
158         BlockInputStream blockInputStream = null;
159         while (true) {
160             if (cmd.getState() == State.CANCELING) {
161                 throw new CancelledExecutionException(cmd);
162             }
163 
164             blockInputStream = new BlockInputStream(this.channelStream);
165             char channel = blockInputStream.getChannel();
166             switch (channel) {
167             case 'o':
168                 this.currentOutputChannelBlock = blockInputStream;
169                 return;
170             case 'e':
171                 this.cmd.addToError(blockInputStream);
172                 break;
173             case 'r':
174                 if (blockInputStream.getLength() != 4) {
175                     throw new IllegalStateException("Length 4 expected for channel 'r'");
176                 }
177                 int returnCode = blockInputStream.readInt();
178                 LOG.debug("Command '{}' gave return code: {}", this.cmd.getCommandName(), returnCode);
179                 this.cmd.handleReturnCode(returnCode);
180                 return;
181             case 'L':
182                 this.cmd.setLineChannelLength(blockInputStream.getLength());
183                 return;
184             default:
185                 if (Character.isLowerCase(channel)) {
186                     
187                     Utils.consumeAll(blockInputStream);
188                 } else {
189                     
190                     throw new IllegalStateException("Unknown channel: " + channel);
191                 }
192             }
193         }
194     }
195 
196     
197 
198 
199 
200 
201 
202 
203     public void reopen() {
204         this.currentOutputChannelBlock = BlockInputStream.EMPTY;
205         try {
206             findNextOutputChannelBlock();
207         } catch (IOException e) {
208             throw new RuntimeIOException(e);
209         }
210     }
211 
212     
213 
214 
215     private void verifyServerProcess(Exception e) {
216         this.server.verifyServerProcess(e);
217     }
218 }