View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: MultiplexInputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
44   */
45  package org.exolab.jms.net.multiplexer;
46  
47  import java.io.DataInputStream;
48  import java.io.IOException;
49  import java.io.InputStream;
50  
51  import org.apache.commons.logging.Log;
52  import org.apache.commons.logging.LogFactory;
53  
54  
55  /***
56   * An <code>InputStream</code> which reads multiplexed data over a shared
57   * physical connection, managed by a {@link Multiplexer}.
58   * <p/>
59   * <em>NOTE:</em> the <code>InputStream</code> methods of this class are not
60   * thread safe
61   *
62   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
63   * @version $Revision: 1.2 $
64   * @see Multiplexer
65   */
66  class MultiplexInputStream extends InputStream implements Constants {
67  
68      /***
69       * The channel identifier.
70       */
71      private final int _channelId;
72  
73      /***
74       * The multiplexer.
75       */
76      private Multiplexer _multiplexer;
77  
78      /***
79       * The local data buffer.
80       */
81      private byte[] _data;
82  
83      /***
84       * Temporary buffer for single byte reads.
85       */
86      private byte[] _byte = new byte[1];
87  
88      /***
89       * The index into <code>_data</code> where data starts.
90       */
91      private int _index = 0;
92  
93      /***
94       * The number of available bytes in <code>_data</code>.
95       */
96      private int _available = 0;
97  
98      /***
99       * Indicates if the underyling connection has been closed.
100      */
101     private boolean _disconnected = false;
102 
103     /***
104      * The no. of bytes to read before notifying the remote endpoint.
105      */
106     private final int _lowWaterMark;
107 
108     /***
109      * The number of bytes read from this stream since the last.
110      * <code>notifyRead()</code> call
111      */
112     private int _read = 0;
113 
114     /***
115      * Synchronization helper.
116      */
117     private final Object _lock = new Object();
118 
119     /***
120      * The logger.
121      */
122     private final Log _log = LogFactory.getLog(MultiplexInputStream.class);
123 
124 
125     /***
126      * Construct a new <code>MultiplexInputStream</code>.
127      *
128      * @param channelId   the channel identifier
129      * @param multiplexer the multiplexer
130      * @param size        the size of the local data buffer
131      */
132     public MultiplexInputStream(int channelId, Multiplexer multiplexer,
133                                 int size) {
134         _channelId = channelId;
135         _multiplexer = multiplexer;
136         _data = new byte[size];
137         _lowWaterMark = size / 2;
138     }
139 
140     /***
141      * This implementation is a no-op, as the stream is re-used.
142      */
143     public void close() {
144     }
145 
146     /***
147      * Closes this input stream and releases any resources associated with it.
148      *
149      * @throws IOException if an I/O error occurs
150      */
151     public void destroy() throws IOException {
152         // notify the endpoint iff it hasn't notified this of disconnection
153         synchronized (_lock) {
154             if (!_disconnected) {
155                 //_multiplexer.closed(this);
156             }
157         }
158         _multiplexer = null;
159         _data = null;
160     }
161 
162     /***
163      * Reads the next byte of data from the input stream. The value byte is
164      * returned as an <code>int</code> in the range <code>0</code> to
165      * <code>255</code>. If no byte is available because the end of the stream
166      * has been reached, the value <code>-1</code> is returned. This method
167      * blocks until input data is available, the end of the stream is detected,
168      * or an exception is thrown.
169      *
170      * @return the next byte of data, or <code>-1</code> if the end of the
171      *         stream is reached.
172      * @throws IOException if an I/O error occurs.
173      */
174     public int read() throws IOException {
175         final int mask = 0xFF;
176         int count = read(_byte, 0, 1);
177         return (count == 1) ? _byte[0] & mask : -1;
178     }
179 
180     /***
181      * Reads up to <code>length</code> bytes of data from the input stream into
182      * an array of bytes.  An attempt is made to read as many as
183      * <code>length</code> bytes, but a smaller number may be read, possibly
184      * zero. The number of bytes actually read is returned as an integer.
185      * <p/>
186      * <p> If the first byte cannot be read for any reason other than end of
187      * file, then an <code>IOException</code> is thrown. In particular, an
188      * <code>IOException</code> is thrown if the input stream has been closed.
189      *
190      * @param buffer the buffer into which the data is read
191      * @param offset the start offset in array <code>buffer</code> at which the
192      *               data is written
193      * @param length the maximum number of bytes to read
194      * @return the total number of bytes read into the buffer, or
195      *         <code>-1</code> if there is no more data because the end of the
196      *         stream has been reached.
197      * @throws IOException               if an I/O error occurs.
198      * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
199      *                                   <code>length</code> is negative, or
200      *                                   <code>offset+length</code> is greater
201      *                                   than the length of the array
202      * @throws NullPointerException      if <code>buffer</code> is null
203      */
204     public int read(byte[] buffer, int offset, int length) throws IOException {
205         int count = 0;
206         if (length > 0) {
207             synchronized (_lock) {
208                 count = (length <= _available) ? length : _available;
209                 if (_log.isDebugEnabled()) {
210                     _log.debug("read(length=" + length + ") [channelId="
211                                + _channelId
212                                + ", available=" + _available + "]");
213                 }
214 
215                 if (count > 0) {
216                     // copy the available data into the buffer
217                     copy(buffer, offset, count);
218                 }
219 
220                 if (count < length) {
221                     // wait for more data to become available
222                     int more = length - count;
223                     while ((_available < more) && !_disconnected) {
224                         if (_log.isDebugEnabled()) {
225                             _log.debug("read() waiting on data [channelId="
226                                        + _channelId
227                                        + ", available=" + _available
228                                        + ", requested=" + more + "]");
229                         }
230 
231                         try {
232                             _lock.wait();
233                         } catch (InterruptedException ignore) {
234                         }
235                     }
236 
237                     if (_available > 0) {
238                         // more data available, so copy it
239                         more = (more <= _available) ? more : _available;
240                         offset += count;
241                         copy(buffer, offset, more);
242                         count += more;
243                     }
244                 }
245 
246                 if ((count == 0) && _disconnected) {
247                     // no data was read, and we were disconnected. Indicate
248                     // end of stream to user
249                     count = -1;
250                 }
251             }
252         }
253         return count;
254     }
255 
256     /***
257      * Returns the number of bytes that can be read (or skipped over) from this
258      * input stream without blocking by the next caller of a method for this
259      * input stream.
260      *
261      * @return the number of bytes that can be read from this input stream
262      *         without blocking.
263      */
264     public int available() {
265         int result;
266         synchronized (_lock) {
267             result = _available;
268         }
269         return result;
270     }
271 
272     /***
273      * Invoked when the underlying physical connection is closed.
274      */
275     public void disconnected() {
276         synchronized (_lock) {
277             _disconnected = true;
278             _lock.notifyAll();
279         }
280     }
281 
282     /***
283      * Returns a string representation of this.
284      *
285      * @return a string representation of this
286      */
287     public String toString() {
288         return "MultiplexInputStream[available=" + _available + "]";
289     }
290 
291     /***
292      * Invoked by {@link Multiplexer} when data is available for this stream.
293      *
294      * @param input  the stream to read data from
295      * @param length the number of bytes to read
296      * @throws IOException if an I/O error occurs
297      */
298     protected void receive(DataInputStream input, int length)
299             throws IOException {
300 
301         synchronized (_lock) {
302             int space = _data.length - _available;
303             if (length > space) {
304                 throw new IOException("Buffer overflow: buffer size="
305                                       + _data.length
306                                       + ", space available=" + space
307                                       + ", requested size=" + length);
308             }
309 
310             int freeAtEnd = _data.length - (_index + _available);
311             if (length > freeAtEnd) {
312                 // make space at the end of the buffer, by shuffling data
313                 // to the start
314                 System.arraycopy(_data, _index, _data, 0, _available);
315                 _index = 0;
316             }
317             input.readFully(_data, _index + _available, length);
318 
319             if (_log.isDebugEnabled()) {
320                 _log.debug("receive(length=" + length
321                            + ") [channelId=" + _channelId
322                            + ", available=" + _available
323                            + ", space=" + (_data.length - _available) + "]");
324 
325 /*
326                 StringBuffer buf = new StringBuffer();
327                 for (int i = 0; i < length; ++i) {
328                     if (i > 0) {
329                       buf.append(", ");
330                     }
331                     final int mask = 0xff;
332                     int value = _data[_index + i + _available] & mask;
333                     buf.append(Integer.toHexString(value));
334                 }
335                 _log.debug("receive[channelId=" + _channelId
336                            + "], length=" + length + ", data=" + buf);
337 */
338             }
339 
340             _available += length;
341 
342             _lock.notifyAll();
343         }
344     }
345 
346     /***
347      * Helper to copy data to a user buffer, notifying the remote endpoint if
348      * more data should be sent.
349      *
350      * @param buffer the buffer into which the data is read
351      * @param offset the start offset in array <code>buffer</code> at which the
352      *               data is written
353      * @param length the maximum number of bytes to read
354      * @throws IOException               if an I/O error occurs.
355      * @throws IndexOutOfBoundsException if <code>offset</code> is negative, or
356      *                                   <code>length</code> is negative, or
357      *                                   <code>offset+length</code> is greater
358      *                                   than the length of the array
359      * @throws NullPointerException      if <code>buffer</code> is null
360      */
361     private void copy(byte[] buffer, int offset, int length)
362             throws IOException {
363 
364         System.arraycopy(_data, _index, buffer, offset, length);
365         _index += length;
366         _available -= length;
367         _read += length;
368         if (_read >= _lowWaterMark) {
369             notifyRead();
370         }
371     }
372 
373     /***
374      * Notify the remote endpoint of the current no. of bytes read.
375      *
376      * @throws IOException if the notification fails
377      */
378     private void notifyRead() throws IOException {
379         if (_log.isDebugEnabled()) {
380             _log.debug("notifyRead() [channelId=" + _channelId
381                        + ", read=" + _read + "]");
382         }
383         _multiplexer.send(FLOW_READ, _channelId, _read);
384         _read = 0;
385     }
386 
387 }