1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package com.aragost.javahg.internals;
27
28 import java.io.ByteArrayOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31
32 import com.aragost.javahg.commands.CancelledExecutionException;
33 import com.aragost.javahg.internals.AbstractCommand.State;
34 import com.aragost.javahg.log.Logger;
35 import com.aragost.javahg.log.LoggerFactory;
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public class OutputChannelInputStream extends InputStream {
52
53 private static final Logger LOG = LoggerFactory.getLogger(OutputChannelInputStream.class);
54
55 private final InputStream channelStream;
56
57 private final AbstractCommand cmd;
58
59 private final Server server;
60
61
62
63
64
65 private BlockInputStream currentOutputChannelBlock = BlockInputStream.EMPTY;
66
67
68
69
70
71
72
73 OutputChannelInputStream(InputStream channelStream, Server server, AbstractCommand cmd) throws IOException {
74 this.channelStream = channelStream;
75 this.cmd = cmd;
76 this.server = server;
77 try {
78 findNextOutputChannelBlock();
79 } catch (RuntimeException e) {
80 verifyServerProcess(e);
81 throw e;
82 } catch (IOException e) {
83 verifyServerProcess(e);
84 throw e;
85 }
86 }
87
88 @Override
89 public int read() throws IOException {
90 if (this.currentOutputChannelBlock == null) {
91 return -1;
92 }
93 try {
94 int ch = this.currentOutputChannelBlock.read();
95 if (ch == -1) {
96 findNextOutputChannelBlock();
97 return read();
98 }
99 return ch;
100 } catch (RuntimeException e) {
101 verifyServerProcess(e);
102 throw e;
103 } catch (IOException e) {
104 verifyServerProcess(e);
105 throw e;
106 }
107 }
108
109 @Override
110 public int read(byte[] b, int off, int len) throws IOException {
111 if (this.currentOutputChannelBlock == null) {
112 return -1;
113 }
114 try {
115 int n = this.currentOutputChannelBlock.read(b, off, len);
116 if (n > 0) {
117 return n;
118 } else if (n == -1) {
119 findNextOutputChannelBlock();
120 return read(b, off, len);
121 } else {
122 return 0;
123 }
124 } catch (RuntimeException e) {
125 verifyServerProcess(e);
126 throw e;
127 } catch (IOException e) {
128 verifyServerProcess(e);
129 throw e;
130 }
131 }
132
133 @Override
134 public int available() throws IOException {
135 if (this.currentOutputChannelBlock == null) {
136 return 0;
137 } else {
138 return this.currentOutputChannelBlock.available();
139 }
140 }
141
142
143
144
145
146
147
148
149
150 private void findNextOutputChannelBlock() throws IOException {
151 if (this.currentOutputChannelBlock == null) {
152 throw new IllegalStateException();
153 }
154 if (this.currentOutputChannelBlock.getBytesLeft() > 0) {
155 throw new IllegalStateException("Still bytes available in currentOutputChannelBlock");
156 }
157 this.currentOutputChannelBlock = null;
158 BlockInputStream blockInputStream = null;
159 while (true) {
160 if (cmd.getState() == State.CANCELING) {
161 throw new CancelledExecutionException(cmd);
162 }
163
164 blockInputStream = new BlockInputStream(this.channelStream);
165 char channel = blockInputStream.getChannel();
166 switch (channel) {
167 case 'o':
168 this.currentOutputChannelBlock = blockInputStream;
169 return;
170 case 'e':
171 this.cmd.addToError(blockInputStream);
172 break;
173 case 'r':
174 if (blockInputStream.getLength() != 4) {
175 throw new IllegalStateException("Length 4 expected for channel 'r'");
176 }
177 int returnCode = blockInputStream.readInt();
178 LOG.debug("Command '{}' gave return code: {}", this.cmd.getCommandName(), returnCode);
179 this.cmd.handleReturnCode(returnCode);
180 return;
181 case 'L':
182 this.cmd.setLineChannelLength(blockInputStream.getLength());
183 return;
184 default:
185 if (Character.isLowerCase(channel)) {
186
187 Utils.consumeAll(blockInputStream);
188 } else {
189
190 throw new IllegalStateException("Unknown channel: " + channel);
191 }
192 }
193 }
194 }
195
196
197
198
199
200
201
202
203 public void reopen() {
204 this.currentOutputChannelBlock = BlockInputStream.EMPTY;
205 try {
206 findNextOutputChannelBlock();
207 } catch (IOException e) {
208 throw new RuntimeIOException(e);
209 }
210 }
211
212
213
214
215 private void verifyServerProcess(Exception e) {
216 this.server.verifyServerProcess(e);
217 }
218 }