1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: MultiplexInputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
44 */
45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.DataInputStream;
48 import java.io.IOException;
49 import java.io.InputStream;
50
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53
54
55 /***
56 * An <code>InputStream</code> which reads multiplexed data over a shared
57 * physical connection, managed by a {@link Multiplexer}.
58 * <p/>
59 * <em>NOTE:</em> the <code>InputStream</code> methods of this class are not
60 * thread safe
61 *
62 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
63 * @version $Revision: 1.2 $
64 * @see Multiplexer
65 */
66 class MultiplexInputStream extends InputStream implements Constants {
67
68 /***
69 * The channel identifier.
70 */
71 private final int _channelId;
72
73 /***
74 * The multiplexer.
75 */
76 private Multiplexer _multiplexer;
77
78 /***
79 * The local data buffer.
80 */
81 private byte[] _data;
82
83 /***
84 * Temporary buffer for single byte reads.
85 */
86 private byte[] _byte = new byte[1];
87
88 /***
89 * The index into <code>_data</code> where data starts.
90 */
91 private int _index = 0;
92
93 /***
94 * The number of available bytes in <code>_data</code>.
95 */
96 private int _available = 0;
97
98 /***
99 * Indicates if the underyling connection has been closed.
100 */
101 private boolean _disconnected = false;
102
103 /***
104 * The no. of bytes to read before notifying the remote endpoint.
105 */
106 private final int _lowWaterMark;
107
108 /***
109 * The number of bytes read from this stream since the last.
110 * <code>notifyRead()</code> call
111 */
112 private int _read = 0;
113
114 /***
115 * Synchronization helper.
116 */
117 private final Object _lock = new Object();
118
119 /***
120 * The logger.
121 */
122 private final Log _log = LogFactory.getLog(MultiplexInputStream.class);
123
124
125 /***
126 * Construct a new <code>MultiplexInputStream</code>.
127 *
128 * @param channelId the channel identifier
129 * @param multiplexer the multiplexer
130 * @param size the size of the local data buffer
131 */
132 public MultiplexInputStream(int channelId, Multiplexer multiplexer,
133 int size) {
134 _channelId = channelId;
135 _multiplexer = multiplexer;
136 _data = new byte[size];
137 _lowWaterMark = size / 2;
138 }
139
140 /***
141 * This implementation is a no-op, as the stream is re-used.
142 */
143 public void close() {
144 }
145
146 /***
147 * Closes this input stream and releases any resources associated with it.
148 *
149 * @throws IOException if an I/O error occurs
150 */
151 public void destroy() throws IOException {
152
153 synchronized (_lock) {
154 if (!_disconnected) {
155
156 }
157 }
158 _multiplexer = null;
159 _data = null;
160 }
161
162 /***
163 * Reads the next byte of data from the input stream. The value byte is
164 * returned as an <code>int</code> in the range <code>0</code> to
165 * <code>255</code>. If no byte is available because the end of the stream
166 * has been reached, the value <code>-1</code> is returned. This method
167 * blocks until input data is available, the end of the stream is detected,
168 * or an exception is thrown.
169 *
170 * @return the next byte of data, or <code>-1</code> if the end of the
171 * stream is reached.
172 * @throws IOException if an I/O error occurs.
173 */
174 public int read() throws IOException {
175 final int mask = 0xFF;
176 int count = read(_byte, 0, 1);
177 return (count == 1) ? _byte[0] & mask : -1;
178 }
179
180 /***
181 * Reads up to <code>length</code> bytes of data from the input stream into
182 * an array of bytes. An attempt is made to read as many as
183 * <code>length</code> bytes, but a smaller number may be read, possibly
184 * zero. The number of bytes actually read is returned as an integer.
185 * <p/>
186 * <p> If the first byte cannot be read for any reason other than end of
187 * file, then an <code>IOException</code> is thrown. In particular, an
188 * <code>IOException</code> is thrown if the input stream has been closed.
189 *
190 * @param buffer the buffer into which the data is read
191 * @param offset the start offset in array <code>buffer</code> at which the
192 * data is written
193 * @param length the maximum number of bytes to read
194 * @return the total number of bytes read into the buffer, or
195 * <code>-1</code> if there is no more data because the end of the
196 * stream has been reached.
197 * @throws IOException if an I/O error occurs.
198 * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
199 * <code>length</code> is negative, or
200 * <code>offset+length</code> is greater
201 * than the length of the array
202 * @throws NullPointerException if <code>buffer</code> is null
203 */
204 public int read(byte[] buffer, int offset, int length) throws IOException {
205 int count = 0;
206 if (length > 0) {
207 synchronized (_lock) {
208 count = (length <= _available) ? length : _available;
209 if (_log.isDebugEnabled()) {
210 _log.debug("read(length=" + length + ") [channelId="
211 + _channelId
212 + ", available=" + _available + "]");
213 }
214
215 if (count > 0) {
216
217 copy(buffer, offset, count);
218 }
219
220 if (count < length) {
221
222 int more = length - count;
223 while ((_available < more) && !_disconnected) {
224 if (_log.isDebugEnabled()) {
225 _log.debug("read() waiting on data [channelId="
226 + _channelId
227 + ", available=" + _available
228 + ", requested=" + more + "]");
229 }
230
231 try {
232 _lock.wait();
233 } catch (InterruptedException ignore) {
234 }
235 }
236
237 if (_available > 0) {
238
239 more = (more <= _available) ? more : _available;
240 offset += count;
241 copy(buffer, offset, more);
242 count += more;
243 }
244 }
245
246 if ((count == 0) && _disconnected) {
247
248
249 count = -1;
250 }
251 }
252 }
253 return count;
254 }
255
256 /***
257 * Returns the number of bytes that can be read (or skipped over) from this
258 * input stream without blocking by the next caller of a method for this
259 * input stream.
260 *
261 * @return the number of bytes that can be read from this input stream
262 * without blocking.
263 */
264 public int available() {
265 int result;
266 synchronized (_lock) {
267 result = _available;
268 }
269 return result;
270 }
271
272 /***
273 * Invoked when the underlying physical connection is closed.
274 */
275 public void disconnected() {
276 synchronized (_lock) {
277 _disconnected = true;
278 _lock.notifyAll();
279 }
280 }
281
282 /***
283 * Returns a string representation of this.
284 *
285 * @return a string representation of this
286 */
287 public String toString() {
288 return "MultiplexInputStream[available=" + _available + "]";
289 }
290
291 /***
292 * Invoked by {@link Multiplexer} when data is available for this stream.
293 *
294 * @param input the stream to read data from
295 * @param length the number of bytes to read
296 * @throws IOException if an I/O error occurs
297 */
298 protected void receive(DataInputStream input, int length)
299 throws IOException {
300
301 synchronized (_lock) {
302 int space = _data.length - _available;
303 if (length > space) {
304 throw new IOException("Buffer overflow: buffer size="
305 + _data.length
306 + ", space available=" + space
307 + ", requested size=" + length);
308 }
309
310 int freeAtEnd = _data.length - (_index + _available);
311 if (length > freeAtEnd) {
312
313
314 System.arraycopy(_data, _index, _data, 0, _available);
315 _index = 0;
316 }
317 input.readFully(_data, _index + _available, length);
318
319 if (_log.isDebugEnabled()) {
320 _log.debug("receive(length=" + length
321 + ") [channelId=" + _channelId
322 + ", available=" + _available
323 + ", space=" + (_data.length - _available) + "]");
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338 }
339
340 _available += length;
341
342 _lock.notifyAll();
343 }
344 }
345
346 /***
347 * Helper to copy data to a user buffer, notifying the remote endpoint if
348 * more data should be sent.
349 *
350 * @param buffer the buffer into which the data is read
351 * @param offset the start offset in array <code>buffer</code> at which the
352 * data is written
353 * @param length the maximum number of bytes to read
354 * @throws IOException if an I/O error occurs.
355 * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
356 * <code>length</code> is negative, or
357 * <code>offset+length</code> is greater
358 * than the length of the array
359 * @throws NullPointerException if <code>buffer</code> is null
360 */
361 private void copy(byte[] buffer, int offset, int length)
362 throws IOException {
363
364 System.arraycopy(_data, _index, buffer, offset, length);
365 _index += length;
366 _available -= length;
367 _read += length;
368 if (_read >= _lowWaterMark) {
369 notifyRead();
370 }
371 }
372
373 /***
374 * Notify the remote endpoint of the current no. of bytes read.
375 *
376 * @throws IOException if the notification fails
377 */
378 private void notifyRead() throws IOException {
379 if (_log.isDebugEnabled()) {
380 _log.debug("notifyRead() [channelId=" + _channelId
381 + ", read=" + _read + "]");
382 }
383 _multiplexer.send(FLOW_READ, _channelId, _read);
384 _read = 0;
385 }
386
387 }