View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: Multiplexer.java,v 1.9 2006/12/16 12:37:17 tanderson Exp $
44   */
45  package org.exolab.jms.net.multiplexer;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.exolab.jms.common.security.BasicPrincipal;
50  import org.exolab.jms.net.connector.Authenticator;
51  import org.exolab.jms.net.connector.ResourceException;
52  import org.exolab.jms.net.connector.SecurityException;
53  
54  import java.io.DataInputStream;
55  import java.io.DataOutputStream;
56  import java.io.IOException;
57  import java.net.ProtocolException;
58  import java.security.Principal;
59  import java.util.HashMap;
60  import java.util.LinkedList;
61  
62  
63  /***
64   * This class multiplexes data over a physical connection.
65   *
66   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @version $Revision: 1.9 $ $Date: 2006/12/16 12:37:17 $
68   */
69  public class Multiplexer implements Constants, Runnable {
70  
71      /***
72       * The listener to notify.
73       */
74      private MultiplexerListener _listener;
75  
76      /***
77       * If <code>true</code>, indicates that the multiplexer has been closed.
78       */
79      private volatile boolean _closed;
80  
81      /***
82       * The endpoint.
83       */
84      private Endpoint _endpoint;
85  
86      /***
87       * The endpoint's output stream.
88       */
89      private DataOutputStream _out;
90  
91      /***
92       * The endpoint's input stream.
93       */
94      private DataInputStream _in;
95  
96      /***
97       * The set of channels managed by this, keyed on channel identifier.
98       */
99      private final HashMap _channels = new HashMap();
100 
101     /***
102      * The pool of free channels, available for reuse via getChannel().
103      */
104     private final LinkedList _free = new LinkedList();
105 
106     /***
107      * If <code>true</code>, indicates that the physical connection was opened
108      * (client), rather than accepted (server). This is used in channel
109      * identifier generation
110      */
111     private boolean _client = false;
112 
113     /***
114      * The channel identifier seed.
115      */
116     private int _seed = 0;
117 
118     /***
119      * The principal that owns the connection, or <code>null</code>,
120      * if this is an unauthenticated connection.
121      */
122     private Principal _principal;
123 
124     /***
125      * The sending and receiving buffer size, in bytes.
126      */
127     private static final int BUFFER_SIZE = 2048;
128 
129     /***
130      * The logger.
131      */
132     private static final Log _log = LogFactory.getLog(Multiplexer.class);
133 
134     /***
135      * Construct a new client-side <code>Multiplexer</code>.
136      *
137      * @param listener  the multiplexer listener
138      * @param endpoint  the endpoint to multiplex messages over
139      * @param principal the security principal
140      * @throws IOException       if an I/O error occurs
141      * @throws SecurityException if connection is refused by the server
142      */
143     public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
144                        Principal principal)
145             throws IOException, SecurityException {
146         initialise(listener, endpoint, true);
147         authenticate(principal);
148     }
149 
150     /***
151      * Construct a new server-side <code>Multiplexer</code>.
152      *
153      * @param listener      the multiplexer listener
154      * @param endpoint      the endpoint to multiplex messages over
155      * @param authenticator the connection authenticator
156      * @throws IOException       if an I/O error occurs
157      * @throws ResourceException if the authenticator cannot authenticate
158      */
159     public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
160                        Authenticator authenticator)
161             throws IOException, ResourceException {
162         initialise(listener, endpoint, false);
163         authenticate(authenticator);
164     }
165 
166     /***
167      * Construct a new <code>Multiplexer</code>.
168      * <p/>
169      * This constructor is provided for subclasses that must perform setup
170      * work prior to invoking {@link #initialise}
171      */
172     protected Multiplexer() {
173     }
174 
175     /***
176      * Start multiplexing.
177      */
178     public void run() {
179         while (!_closed) {
180             multiplex();
181         }
182     }
183 
184     /***
185      * Returns a free channel from the pool, opening a new one if none are
186      * available.
187      *
188      * @return a free channel
189      * @throws IOException if an I/O error occurs
190      */
191     public Channel getChannel() throws IOException {
192         Channel channel = null;
193 
194         synchronized (_free) {
195             if (!_free.isEmpty()) {
196                 channel = (Channel) _free.removeFirst();
197             }
198         }
199 
200         if (channel == null) {
201             channel = open();
202         }
203 
204         return channel;
205     }
206 
207     /***
208      * Releases a channel back to the pool.
209      *
210      * @param channel the channel to release
211      */
212     public void release(Channel channel) {
213         synchronized (_free) {
214             _free.add(channel);
215         }
216     }
217 
218     /***
219      * Close a channel.
220      *
221      * @param channel the channel to close
222      * @throws IOException if an I/O error occurs
223      */
224     public void close(Channel channel) throws IOException {
225         int channelId = channel.getId();
226         synchronized (_channels) {
227             _channels.remove(new Integer(channelId));
228         }
229 
230         send(CLOSE, channelId);
231     }
232 
233     /***
234      * Send a message.
235      *
236      * @param type the packet type
237      * @throws IOException if an I/O error occurs
238      */
239     public void send(byte type) throws IOException {
240         synchronized (_out) {
241             _out.writeByte(type);
242             _out.flush();
243             if (_log.isDebugEnabled()) {
244                 _log.debug("send(type=0x" + Integer.toHexString(type) + ")");
245             }
246         }
247     }
248 
249     /***
250      * Send a message.
251      *
252      * @param type      the packet type
253      * @param channelId the identifier of the channel sending the message
254      * @throws IOException if an I/O error occurs
255      */
256     public void send(byte type, int channelId) throws IOException {
257         synchronized (_out) {
258             _out.writeByte(type);
259             _out.writeShort(channelId);
260             _out.flush();
261             if (_log.isDebugEnabled()) {
262                 _log.debug("send(type=0x" + Integer.toHexString(type)
263                         + ", channel=" + channelId + ")");
264             }
265         }
266     }
267 
268     /***
269      * Send a message.
270      *
271      * @param type      the packet type
272      * @param channelId the identifier of the channel sending the message
273      * @param data      the data to send
274      * @throws IOException if an I/O error occurs
275      */
276     public void send(byte type, int channelId, int data) throws IOException {
277         synchronized (_out) {
278             _out.writeByte(type);
279             _out.writeShort(channelId);
280             _out.writeInt(data);
281             _out.flush();
282             if (_log.isDebugEnabled()) {
283                 _log.debug("send(type=" + type + ", channel=" + channelId
284                         + ", data=" + Integer.toHexString(data) + ")");
285             }
286         }
287     }
288 
289     /***
290      * Send a message.
291      *
292      * @param type      the packet type
293      * @param channelId the identifier of the channel sending the message
294      * @param data      the data to send
295      * @param offset    the offset into the data
296      * @param length    the length of data
297      * @throws IOException if an I/O error occurs
298      */
299     public void send(byte type, int channelId, byte[] data, int offset,
300                      int length) throws IOException {
301         synchronized (_out) {
302             _out.writeByte(type);
303             _out.writeShort(channelId);
304             _out.writeInt(length);
305             _out.write(data, offset, length);
306             _out.flush();
307         }
308     }
309 
310     /***
311      * Ping the connection.
312      *
313      * @param token the token to be returned in the reply
314      * @throws IOException if an I/O error occurs
315      */
316     public void ping(int token) throws IOException {
317         synchronized (_out) {
318             _out.writeByte(PING_REQUEST);
319             _out.writeInt(token);
320             _out.flush();
321             if (_log.isDebugEnabled()) {
322                 _log.debug("ping(token=" + token + ")");
323             }
324         }
325     }
326 
327     /***
328      * Close the multiplexer, releasing any resources. This closes the socket
329      * and waits for the thread to terminate.
330      */
331     public void close() {
332         if (!_closed) {
333             _closed = true;
334             try {
335                 send(SHUTDOWN);
336             } catch (IOException exception) {
337                 _log.debug(exception);
338             }
339             try {
340                 _endpoint.close();
341             } catch (IOException exception) {
342                 _log.debug(exception);
343             }
344             // _pool.shutdownAfterProcessingCurrentlyQueuedTasks();
345             // @todo - as the pool is shared, need to block for
346             // tasks queued by this
347         }
348     }
349 
350     /***
351      * Determines if the multiplexer is closed.
352      *
353      * @return <code>true</code> if the multiplexer is closed
354      */
355     public boolean isClosed() {
356         return _closed;
357     }
358 
359     /***
360      * Determines if this is a client-side instance.
361      *
362      * @return <code>true</code> if this is a client-side instance,
363      *         <code>false</code> if it is a server=side instance
364      */
365     public boolean isClient() {
366         return _client;
367     }
368 
369     /***
370      * Returns the principal that owns the connection.
371      *
372      * @return the principal that owns the connection, or <code>null<code>
373      *         if this is an unauthenticated connection
374      */
375     public Principal getPrincipal() {
376         return _principal;
377     }
378 
379     /***
380      * Initialise the multiplexer.
381      *
382      * @param listener the multiplexer listener
383      * @param endpoint the endpoint to multiplex messages over
384      * @param client   determines if this is a client-side or server-side
385      *                 instance
386      * @throws IOException if an I/O error occurs
387      */
388     protected void initialise(MultiplexerListener listener, Endpoint endpoint,
389                               boolean client)
390             throws IOException {
391 
392 
393         if (listener == null) {
394             throw new IllegalArgumentException("Argument 'listener' is null");
395         }
396         if (endpoint == null) {
397             throw new IllegalArgumentException("Argument 'endpoint' is null");
398         }
399         if (_log.isDebugEnabled()) {
400             _log.debug("Multiplexer(uri=" + endpoint.getURI()
401                     + ", client=" + client);
402         }
403         _listener = listener;
404         _endpoint = endpoint;
405         _out = new DataOutputStream(endpoint.getOutputStream());
406         _in = new DataInputStream(endpoint.getInputStream());
407         _client = client;
408         handshake(_out, _in);
409     }
410 
411     /***
412      * Perform handshaking on initial connection, to verify protocol. Subclasses
413      * may extend this behaviour.
414      *
415      * @param out the endpoint's output stream
416      * @param in  the endpoint's input stream
417      * @throws IOException for any I/O error
418      */
419     protected void handshake(DataOutputStream out, DataInputStream in)
420             throws IOException {
421         out.writeInt(MAGIC);
422         out.writeInt(VERSION);
423         out.flush();
424 
425         int magic = in.readInt();
426         if (magic != MAGIC) {
427             throw new ProtocolException("Expected protocol magic=" + MAGIC
428                     + ", but received=" + magic);
429         }
430         int version = in.readInt();
431         if (version != VERSION) {
432             throw new ProtocolException("Expected protocol version=" + VERSION
433                     + ", but received=" + version);
434         }
435     }
436 
437     /***
438      * Perform authentication on initial connection.
439      *
440      * @param principal the security principal. May be <code>null</code>
441      * @throws IOException       for any I/O error
442      * @throws java.lang.SecurityException if connection is refused by the server
443      */
444     protected void authenticate(Principal principal)
445             throws IOException, SecurityException {
446         try {
447             if (principal != null && !(principal instanceof BasicPrincipal)) {
448                 throw new IOException(
449                         "Cannot authenticate with principal of type "
450                                 + principal.getClass().getName());
451             }
452             if (principal != null) {
453                 BasicPrincipal basic = (BasicPrincipal) principal;
454                 _out.writeByte(AUTH_BASIC);
455                 _out.writeUTF(basic.getName());
456                 _out.writeUTF(basic.getPassword());
457             } else {
458                 _out.writeByte(AUTH_NONE);
459             }
460             _out.flush();
461             if (_in.readByte() != AUTH_OK) {
462                 throw new SecurityException("Connection refused");
463             }
464         } catch (IOException exception) {
465             // terminate the connection
466             _endpoint.close();
467             throw exception;
468         }
469         _principal = principal;
470     }
471 
472     /***
473      * Performs authentication on initial connection.
474      *
475      * @param authenticator the authenticator
476      * @throws IOException       for any I/O error
477      * @throws ResourceException if the authenticator cannot authenticate
478      */
479     protected void authenticate(Authenticator authenticator)
480             throws IOException, ResourceException {
481 
482         try {
483             Principal principal = null;
484             byte type = _in.readByte();
485 
486             switch (type) {
487                 case AUTH_BASIC:
488                     String name = _in.readUTF();
489                     String password = _in.readUTF();
490                     principal = new BasicPrincipal(name, password);
491                     break;
492                 case AUTH_NONE:
493                     break;
494                 default:
495                     throw new IOException("Invalid packet type: " + type);
496             }
497             if (authenticator.authenticate(principal)) {
498                 _out.writeByte(AUTH_OK);
499                 _out.flush();
500             } else {
501                 _out.writeByte(AUTH_DENIED);
502                 _out.flush();
503                 throw new SecurityException("User " + principal
504                         + " unauthorised");
505             }
506             _principal = principal;
507         } catch (IOException exception) {
508             // terminate the connection
509             _endpoint.close();
510             throw exception;
511         } catch (ResourceException exception) {
512             // terminate the connection
513             _endpoint.close();
514             throw exception;
515         }
516     }
517 
518     /***
519      * Opens a new channel.
520      *
521      * @return a new channel
522      * @throws IOException if a channel can't be opened
523      */
524     protected Channel open() throws IOException {
525         Channel channel;
526         int channelId;
527         synchronized (_channels) {
528             channelId = getNextChannelId();
529             channel = addChannel(channelId);
530         }
531 
532         send(OPEN, channelId);
533         return channel;
534     }
535 
536     /***
537      * Read a packet from the endpoint.
538      */
539     private void multiplex() {
540         try {
541             byte type = _in.readByte();
542             switch (type) {
543                 case OPEN:
544                     handleOpen();
545                     break;
546                 case CLOSE:
547                     handleClose();
548                     break;
549                 case REQUEST:
550                     handleRequest();
551                     break;
552                 case RESPONSE:
553                     handleResponse();
554                     break;
555                 case DATA:
556                     handleData();
557                     break;
558                 case PING_REQUEST:
559                     handlePingRequest();
560                     break;
561                 case PING_RESPONSE:
562                     handlePingResponse();
563                     break;
564                 case FLOW_READ:
565                     handleFlowRead();
566                     break;
567                 case SHUTDOWN:
568                     handleShutdown();
569                     break;
570                 default:
571                     throw new IOException("Unrecognised message type: "
572                             + type);
573             }
574         } catch (Exception exception) {
575             boolean closed = _closed;
576             shutdown();
577             if (!closed) {
578                 _log.debug("Multiplexer shutting down on error", exception);
579                 // error notify the listener
580                 _listener.error(exception);
581             }
582         }
583     }
584 
585     /***
586      * Shuts down the multiplexer.
587      */
588     private void shutdown() {
589         // mark this as closed
590         _closed = true;
591 
592         // notify the channels
593         Channel[] channels;
594         synchronized (_channels) {
595             channels = (Channel[]) _channels.values().toArray(new Channel[0]);
596         }
597         for (int i = 0; i < channels.length; ++i) {
598             channels[i].disconnected();
599         }
600         try {
601             _endpoint.close();
602         } catch (IOException exception) {
603             _log.debug(exception);
604         }
605     }
606 
607     /***
608      * Open a new channel.
609      *
610      * @throws IOException for any error
611      */
612     private void handleOpen() throws IOException {
613         int channelId = _in.readUnsignedShort();
614         Integer key = new Integer(channelId);
615 
616         synchronized (_channels) {
617             if (_channels.get(key) != null) {
618                 throw new IOException(
619                         "A channel already exists with identifier: " + key);
620             }
621             addChannel(channelId);
622         }
623     }
624 
625     /***
626      * Close a channel.
627      *
628      * @throws IOException for any error
629      */
630     private void handleClose() throws IOException {
631         int channelId = _in.readUnsignedShort();
632         Integer key = new Integer(channelId);
633 
634         synchronized (_channels) {
635             Channel channel = (Channel) _channels.remove(key);
636             if (channel == null) {
637                 throw new IOException(
638                         "No channel exists with identifier: " + key);
639             }
640             channel.close();
641         }
642     }
643 
644     /***
645      * Handle a <code>REQUEST</code> packet.
646      *
647      * @throws IOException if an I/O error occurs, or no channel exists matching
648      *                     that read from the packet
649      */
650     private void handleRequest() throws IOException {
651         final Channel channel = handleData();
652         if (_log.isDebugEnabled()) {
653             _log.debug("handleRequest() [channel=" + channel.getId() + "]");
654         }
655         // todo - need to handle closed()
656         _listener.request(channel);
657 
658         if (_log.isDebugEnabled()) {
659             _log.debug("handleRequest() [channel=" + channel.getId()
660                     + "] - end");
661         }
662     }
663 
664     /***
665      * Handle a <code>RESPONSE</code> packet.
666      *
667      * @throws IOException if an I/O error occurs, or no channel exists matching
668      *                     that read from the packet
669      */
670     private void handleResponse() throws IOException {
671         handleData();
672     }
673 
674     /***
675      * Handle a <code>PING_REQUEST</code> packet.
676      *
677      * @throws IOException if an I/O error occurs
678      */
679     private void handlePingRequest() throws IOException {
680         int token = _in.readInt();
681         synchronized (_out) {
682             _out.writeByte(PING_RESPONSE);
683             _out.writeInt(token);
684             _out.flush();
685             if (_log.isDebugEnabled()) {
686                 _log.debug("pinged(token=" + token + ")");
687             }
688         }
689     }
690 
691     /***
692      * Handle a <code>PING_RESPONSE</code> packet.
693      *
694      * @throws IOException if an I/O error occurs
695      */
696     private void handlePingResponse() throws IOException {
697         int token = _in.readInt();
698         _listener.pinged(token);
699     }
700 
701     /***
702      * Handle a <code>DATA</code> packet.
703      *
704      * @return the channel to handle the packet
705      * @throws IOException if an I/O error occurs, or no channel exists matching
706      *                     that read from the packet
707      */
708     private Channel handleData() throws IOException {
709         Channel channel = readChannel();
710         int length = _in.readInt();
711         channel.getMultiplexInputStream().receive(_in, length);
712         return channel;
713     }
714 
715     /***
716      * Handle a <code>FLOW_READ</code> packet.
717      *
718      * @throws IOException if an I/O error occurs
719      */
720     private void handleFlowRead() throws IOException {
721         Channel channel = readChannel();
722         int read = _in.readInt();
723         channel.getMultiplexOutputStream().notifyRead(read);
724     }
725 
726     /***
727      * Handle a <code>SHUTDOWN</code> packet.
728      */
729     private void handleShutdown() {
730         shutdown();
731         _listener.closed();
732     }
733 
734     /***
735      * Adds a new channel.
736      * <p/>
737      * NOTE: Must be invoked with <code>_channels</code> synchronized
738      *
739      * @param channelId the channel identifier
740      * @return the new channel
741      */
742     private Channel addChannel(int channelId) {
743         int size = BUFFER_SIZE;
744         MultiplexOutputStream out =
745                 new MultiplexOutputStream(channelId, this, size, size);
746         MultiplexInputStream in =
747                 new MultiplexInputStream(channelId, this, size);
748         Channel channel = new Channel(channelId, this, in, out);
749         _channels.put(new Integer(channelId), channel);
750         return channel;
751     }
752 
753     /***
754      * Reads the channel identifier from the stream and returns the
755      * corresponding channel.
756      *
757      * @return the channel corresponding to the read channel identifier
758      * @throws IOException for any I/O error, or if there is no corresponding
759      *                     channel
760      */
761     private Channel readChannel() throws IOException {
762         int channelId = _in.readUnsignedShort();
763         return getChannel(channelId);
764     }
765 
766     /***
767      * Returns a channel given its identifier.
768      *
769      * @param channelId the channel identifier
770      * @return the channel corresponding to <code>channelId</code>
771      * @throws IOException if there is no corresponding channel
772      */
773     private Channel getChannel(int channelId) throws IOException {
774         Channel channel;
775         Integer key = new Integer(channelId);
776         synchronized (_channels) {
777             channel = (Channel) _channels.get(key);
778             if (channel == null) {
779                 throw new IOException(
780                         "No channel exists with identifier: " + channelId);
781             }
782         }
783         return channel;
784     }
785 
786     /***
787      * Returns the next available channel identifier. Channel identifiers
788      * generated on the client side are in the range 0x0..0x7FFF, on the server
789      * side, 0x8000-0xFFFF
790      * <p/>
791      * NOTE: Must be invoked with <code>_channels</code> synchronized
792      *
793      * @return the next channel identifier
794      * @throws IOException if the connection is closed
795      */
796     private int getNextChannelId() throws IOException {
797         final int mask = 0x7fff;
798         final int serverIdBase = 0x8000;
799         int channelId = 0;
800         while (!_closed) {
801             _seed = (_seed + 1) & mask;
802             channelId = (_client) ? _seed : _seed + serverIdBase;
803             if (!_channels.containsKey(new Integer(channelId))) {
804                 break;
805             }
806         }
807         if (_closed) {
808             throw new IOException("Connection has been closed");
809         }
810         return channelId;
811     }
812 
813 }