View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: MultiplexOutputStream.java,v 1.2 2005/04/02 13:23:12 tanderson Exp $
44   */
45  package org.exolab.jms.net.multiplexer;
46  
47  import java.io.IOException;
48  import java.io.OutputStream;
49  
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  
53  
54  /***
55   * An <code>OutputStream</code> which multiplexes data over a shared physical
56   * connection, managed by a {@link Multiplexer}.
57   * <p/>
58   * <em>NOTE:</em> the <code>OutputStream</code> methods of this class are not
59   * thread safe
60   *
61   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
62   * @version $Revision: 1.2 $ $Date: 2005/04/02 13:23:12 $
63   * @see Multiplexer
64   */
65  class MultiplexOutputStream extends OutputStream implements Constants {
66  
67      /***
68       * The channel identifier, used to associate packets with a channel.
69       */
70      private final int _channelId;
71  
72      /***
73       * The packet type.
74       */
75      private byte _type;
76  
77      /***
78       * The multiplexer which handles this stream's output.
79       */
80      private Multiplexer _multiplexer;
81  
82      /***
83       * The local data buffer.
84       */
85      private byte[] _data;
86  
87      /***
88       * The current index into <code>_data</code>.
89       */
90      private int _index;
91  
92      /***
93       * The no. of bytes that the remote endpoint can currently accept.
94       */
95      private int _remoteSpace;
96  
97      /***
98       * The maximum no. of bytes that the remote endpoint can accept.
99       */
100     private final int _maxRemoteSpace;
101 
102     /***
103      * Indicates if the underlying connection has been closed.
104      */
105     private boolean _disconnected;
106 
107     /***
108      * Synchronization helper.
109      */
110     private final Object _lock = new Object();
111 
112     /***
113      * The logger.
114      */
115     private static final Log _log =
116             LogFactory.getLog(MultiplexOutputStream.class);
117 
118 
119     /***
120      * Construct a new <code>MultiplexOutputStream</code>.
121      *
122      * @param channelId   the channel identifier
123      * @param multiplexer the multiplexer which handles this stream's output
124      * @param size        the size of the local data buffer
125      * @param remoteSize  the size of the remote endpoint's data buffer
126      */
127     public MultiplexOutputStream(int channelId, Multiplexer multiplexer,
128                                  int size, int remoteSize) {
129         _channelId = channelId;
130         _multiplexer = multiplexer;
131         _data = new byte[size];
132         _maxRemoteSpace = remoteSize;
133         _remoteSpace = remoteSize;
134     }
135 
136     /***
137      * Set the packet type.
138      *
139      * @param type the packet type
140      */
141     public void setType(byte type) {
142         _type = type;
143     }
144 
145     /***
146      * This implementation flushes the stream, rather than closing it, as the
147      * stream is re-used.
148      *
149      * @throws IOException if an I/O error occurs
150      */
151     public void close() throws IOException {
152         flush();
153     }
154 
155     /***
156      * Flushes this output stream and forces any buffered output bytes to be
157      * written out.
158      *
159      * @throws IOException if an I/O error occurs
160      */
161     public void flush() throws IOException {
162         int offset = 0;
163         int length = _index;
164         while (offset < _index) {
165             int available = waitForSpace();
166             int size = (length <= available) ? length : available;
167 
168             send(_data, offset, size);
169             offset += size;
170             length -= size;
171         }
172         _index = 0;
173     }
174 
175     /***
176      * Writes length bytes from the specified byte array starting at offset to
177      * this output stream.
178      *
179      * @param buffer the data to write
180      * @param offset the start offset in the data
181      * @param length the number of bytes to write
182      * @throws IOException if an I/O error occurs
183      */
184     public void write(byte[] buffer, int offset, int length)
185             throws IOException {
186 
187         int space = _data.length - _index;
188         if (space >= length) {
189             // got enough space, so copy it to the buffer
190             System.arraycopy(buffer, offset, _data, _index, length);
191             _index += length;
192         } else {
193             flush();
194             int size = length;
195             // send the buffer, when the endpoint has enough free space
196             while (size > 0) {
197                 int available = waitForSpace();
198                 int count = (size <= available) ? size : available;
199                 send(buffer, offset, count);
200                 offset += count;
201                 size -= count;
202             }
203         }
204     }
205 
206     /***
207      * Writes the specified byte to this output stream.
208      *
209      * @param value the byte value
210      * @throws IOException if an I/O error occurs
211      */
212     public void write(int value) throws IOException {
213         if (_index >= _data.length) {
214             flush();
215         }
216         _data[_index++] = (byte) value;
217     }
218 
219     /***
220      * Notify this of the no. of bytes read by the remote endpoint.
221      *
222      * @param read the number of bytes read
223      * @throws IOException if the no. of bytes exceeds that expected
224      */
225     public void notifyRead(int read) throws IOException {
226         synchronized (_lock) {
227             int space = _remoteSpace + read;
228             if (space > _maxRemoteSpace) {
229                 throw new IOException("Remote space=" + space
230                                       + " exceeds expected space="
231                                       + _maxRemoteSpace);
232             }
233             _remoteSpace = space;
234 
235             if (_log.isDebugEnabled()) {
236                 _log.debug("notifyRead(read=" + read
237                            + ") [channelId=" + _channelId
238                            + ", remoteSpace=" + _remoteSpace
239                            + "]");
240             }
241             _lock.notifyAll();
242         }
243     }
244 
245     /***
246      * Invoked when the underlying physical connection is closed.
247      */
248     public void disconnected() {
249         synchronized (_lock) {
250             _disconnected = true;
251             _lock.notifyAll();
252         }
253     }
254 
255     /***
256      * Returns a string representation of this.
257      *
258      * @return a string representation of this
259      */
260     public String toString() {
261         return "MultiplexOutputStream[index=" + _index + "]";
262     }
263 
264     /***
265      * Sends length bytes from the specified byte array starting at offset to
266      * the endpoint.
267      *
268      * @param buffer the data to write
269      * @param offset the start offset in the data
270      * @param length the number of bytes to write
271      * @throws IOException if an I/O error occurs
272      */
273     private void send(byte[] buffer, int offset, int length)
274             throws IOException {
275         if (_log.isDebugEnabled()) {
276             _log.debug("send(length=" + length + ") [channelId=" + _channelId
277                        + ", remoteSpace=" + _remoteSpace
278                        + "]");
279         }
280         synchronized (_lock) {
281             _multiplexer.send(_type, _channelId, buffer, offset, length);
282             _type = DATA;
283 
284             _remoteSpace -= length;
285 
286 /*
287             if (_log.isDebugEnabled()) {
288                 StringBuffer buf = new StringBuffer();
289                 for (int i = 0; i < length; ++i) {
290                     if (i > 0) {
291                         buf.append(", ");
292                     }
293                     final int mask = 0xff;
294                     int value = buffer[offset + i] & mask;
295                     buf.append(Integer.toHexString(value));
296                 }
297                 _log.debug("send[channelId=" + _channelId + "], length="
298                            + length + ", data=" + buf);
299             }
300 */
301         }
302     }
303 
304     /***
305      * Returns immediately if the endpoint can receive data, otherwise blocks,
306      * waiting for the endpoint to have space available.
307      *
308      * @return the number of bytes that the endpoint can accept
309      * @throws IOException if the connection is closed while blocking
310      */
311     private int waitForSpace() throws IOException {
312         int available = 0;
313         while (!_disconnected) {
314             synchronized (_lock) {
315                 if (_log.isDebugEnabled()) {
316                     _log.debug("waitForSpace() [channelId=" + _channelId
317                                + ", remoteSpace=" + _remoteSpace
318                                + "]");
319                 }
320 
321                 if (_remoteSpace > 0) {
322                     available = _remoteSpace;
323                     break;
324                 } else {
325                     try {
326                         _lock.wait();
327                     } catch (InterruptedException ignore) {
328                     }
329                 }
330             }
331         }
332         if (_disconnected) {
333             throw new IOException("Connection has been closed");
334         }
335 
336         return available;
337     }
338 
339 }