1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2003-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: Multiplexer.java,v 1.9 2006/12/16 12:37:17 tanderson Exp $
44 */
45 package org.exolab.jms.net.multiplexer;
46
47 import org.apache.commons.logging.Log;
48 import org.apache.commons.logging.LogFactory;
49 import org.exolab.jms.common.security.BasicPrincipal;
50 import org.exolab.jms.net.connector.Authenticator;
51 import org.exolab.jms.net.connector.ResourceException;
52 import org.exolab.jms.net.connector.SecurityException;
53
54 import java.io.DataInputStream;
55 import java.io.DataOutputStream;
56 import java.io.IOException;
57 import java.net.ProtocolException;
58 import java.security.Principal;
59 import java.util.HashMap;
60 import java.util.LinkedList;
61
62
63 /***
64 * This class multiplexes data over a physical connection.
65 *
66 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67 * @version $Revision: 1.9 $ $Date: 2006/12/16 12:37:17 $
68 */
69 public class Multiplexer implements Constants, Runnable {
70
71 /***
72 * The listener to notify.
73 */
74 private MultiplexerListener _listener;
75
76 /***
77 * If <code>true</code>, indicates that the multiplexer has been closed.
78 */
79 private volatile boolean _closed;
80
81 /***
82 * The endpoint.
83 */
84 private Endpoint _endpoint;
85
86 /***
87 * The endpoint's output stream.
88 */
89 private DataOutputStream _out;
90
91 /***
92 * The endpoint's input stream.
93 */
94 private DataInputStream _in;
95
96 /***
97 * The set of channels managed by this, keyed on channel identifier.
98 */
99 private final HashMap _channels = new HashMap();
100
101 /***
102 * The set of free channels, keyed on channel identifier.
103 */
104 private final LinkedList _free = new LinkedList();
105
106 /***
107 * If <code>true</code>, indicates that the physical connection was opened
108 * (client), rather than accepted (server). This is used in channel
109 * identifier generation
110 */
111 private boolean _client = false;
112
113 /***
114 * The channel identifier seed.
115 */
116 private int _seed = 0;
117
118 /***
119 * The principal that owns the connection, or <code>null</code>,
120 * if this is an unauthenticated connection.
121 */
122 private Principal _principal;
123
124 /***
125 * The sending and receiving buffer size, in bytes.
126 */
127 private static final int BUFFER_SIZE = 2048;
128
129 /***
130 * The logger.
131 */
132 private static final Log _log = LogFactory.getLog(Multiplexer.class);
133
134 /***
135 * Construct a new client-side <code>Multiplexer</code>.
136 *
137 * @param listener the multiplexer listener
138 * @param endpoint the endpoint to multiplex messages over
139 * @param principal the security principal
140 * @throws IOException if an I/O error occurs
141 * @throws SecurityException if connection is refused by the server
142 */
143 public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
144 Principal principal)
145 throws IOException, SecurityException {
146 initialise(listener, endpoint, true);
147 authenticate(principal);
148 }
149
150 /***
151 * Construct a new server-side <code>Multiplexer</code>.
152 *
153 * @param listener the multiplexer listener
154 * @param endpoint the endpoint to multiplex messages over
155 * @param authenticator the connection authenticator
156 * @throws IOException if an I/O error occurs
157 * @throws ResourceException if the authenticator cannot authenticate
158 */
159 public Multiplexer(MultiplexerListener listener, Endpoint endpoint,
160 Authenticator authenticator)
161 throws IOException, ResourceException {
162 initialise(listener, endpoint, false);
163 authenticate(authenticator);
164 }
165
166 /***
167 * Construct a new <code>Multiplexer</code>.
168 * <p/>
169 * This constructor is provided for subclasses that must perform setup
170 * work prior to invoking {@link #initialise}
171 */
172 protected Multiplexer() {
173 }
174
175 /***
176 * Start multiplexing.
177 */
178 public void run() {
179 while (!_closed) {
180 multiplex();
181 }
182 }
183
184 /***
185 * Returns a free channel from the pool, opening a new one if none are
186 * available.
187 *
188 * @return a free channel
189 * @throws IOException if an I/O error occurs
190 */
191 public Channel getChannel() throws IOException {
192 Channel channel = null;
193
194 synchronized (_free) {
195 if (!_free.isEmpty()) {
196 channel = (Channel) _free.removeFirst();
197 }
198 }
199
200 if (channel == null) {
201 channel = open();
202 }
203
204 return channel;
205 }
206
207 /***
208 * Releases a channel back to the pool.
209 *
210 * @param channel the channel to release
211 */
212 public void release(Channel channel) {
213 synchronized (_free) {
214 _free.add(channel);
215 }
216 }
217
218 /***
219 * Close a channel.
220 *
221 * @param channel the channel to close
222 * @throws IOException if an I/O error occurs
223 */
224 public void close(Channel channel) throws IOException {
225 int channelId = channel.getId();
226 synchronized (_channels) {
227 _channels.remove(new Integer(channelId));
228 }
229
230 send(CLOSE, channelId);
231 }
232
233 /***
234 * Send a message.
235 *
236 * @param type the packet type
237 * @throws IOException if an I/O error occurs
238 */
239 public void send(byte type) throws IOException {
240 synchronized (_out) {
241 _out.writeByte(type);
242 _out.flush();
243 if (_log.isDebugEnabled()) {
244 _log.debug("send(type=0x" + Integer.toHexString(type) + ")");
245 }
246 }
247 }
248
249 /***
250 * Send a message.
251 *
252 * @param type the packet type
253 * @param channelId the identifier of the channel sending the message
254 * @throws IOException if an I/O error occurs
255 */
256 public void send(byte type, int channelId) throws IOException {
257 synchronized (_out) {
258 _out.writeByte(type);
259 _out.writeShort(channelId);
260 _out.flush();
261 if (_log.isDebugEnabled()) {
262 _log.debug("send(type=0x" + Integer.toHexString(type)
263 + ", channel=" + channelId + ")");
264 }
265 }
266 }
267
268 /***
269 * Send a message.
270 *
271 * @param type the packet type
272 * @param channelId the identifier of the channel sending the message
273 * @param data the data to send
274 * @throws IOException if an I/O error occurs
275 */
276 public void send(byte type, int channelId, int data) throws IOException {
277 synchronized (_out) {
278 _out.writeByte(type);
279 _out.writeShort(channelId);
280 _out.writeInt(data);
281 _out.flush();
282 if (_log.isDebugEnabled()) {
283 _log.debug("send(type=" + type + ", channel=" + channelId
284 + ", data=" + Integer.toHexString(data) + ")");
285 }
286 }
287 }
288
289 /***
290 * Send a message.
291 *
292 * @param type the packet type
293 * @param channelId the identifier of the channel sending the message
294 * @param data the data to send
295 * @param offset the offset into the data
296 * @param length the length of data
297 * @throws IOException if an I/O error occurs
298 */
299 public void send(byte type, int channelId, byte[] data, int offset,
300 int length) throws IOException {
301 synchronized (_out) {
302 _out.writeByte(type);
303 _out.writeShort(channelId);
304 _out.writeInt(length);
305 _out.write(data, offset, length);
306 _out.flush();
307 }
308 }
309
310 /***
311 * Ping the connection.
312 *
313 * @param token the token to be returned in the reply
314 * @throws IOException if an I/O error occurs
315 */
316 public void ping(int token) throws IOException {
317 synchronized (_out) {
318 _out.writeByte(PING_REQUEST);
319 _out.writeInt(token);
320 _out.flush();
321 if (_log.isDebugEnabled()) {
322 _log.debug("ping(token=" + token + ")");
323 }
324 }
325 }
326
327 /***
328 * Close the multiplexer, releasing any resources. This closes the socket
329 * and waits for the thread to terminate.
330 */
331 public void close() {
332 if (!_closed) {
333 _closed = true;
334 try {
335 send(SHUTDOWN);
336 } catch (IOException exception) {
337 _log.debug(exception);
338 }
339 try {
340 _endpoint.close();
341 } catch (IOException exception) {
342 _log.debug(exception);
343 }
344
345
346
347 }
348 }
349
350 /***
351 * Determines if the multiplexer is closed.
352 *
353 * @return <code>true</code> if the multiplexer is closed
354 */
355 public boolean isClosed() {
356 return _closed;
357 }
358
359 /***
360 * Determines if this is a client-side instance.
361 *
362 * @return <code>true</code> if this is a client-side instance,
363 * <code>false</code> if it is a server=side instance
364 */
365 public boolean isClient() {
366 return _client;
367 }
368
369 /***
370 * Returns the principal that owns the connection.
371 *
372 * @return the principal that owns the connection, or <code>null<code>
373 * if this is an unauthenticated connection
374 */
375 public Principal getPrincipal() {
376 return _principal;
377 }
378
379 /***
380 * Initialise the multiplexer.
381 *
382 * @param listener the multiplexer listener
383 * @param endpoint the endpoint to multiplex messages over
384 * @param client determines if this is a client-side or server-side
385 * instance
386 * @throws IOException if an I/O error occurs
387 */
388 protected void initialise(MultiplexerListener listener, Endpoint endpoint,
389 boolean client)
390 throws IOException {
391
392
393 if (listener == null) {
394 throw new IllegalArgumentException("Argument 'listener' is null");
395 }
396 if (endpoint == null) {
397 throw new IllegalArgumentException("Argument 'endpoint' is null");
398 }
399 if (_log.isDebugEnabled()) {
400 _log.debug("Multiplexer(uri=" + endpoint.getURI()
401 + ", client=" + client);
402 }
403 _listener = listener;
404 _endpoint = endpoint;
405 _out = new DataOutputStream(endpoint.getOutputStream());
406 _in = new DataInputStream(endpoint.getInputStream());
407 _client = client;
408 handshake(_out, _in);
409 }
410
411 /***
412 * Perform handshaking on initial connection, to verify protocol. Subclasses
413 * may extend this behaviour.
414 *
415 * @param out the endpoint's output stream
416 * @param in the endpoint's input stream
417 * @throws IOException for any I/O error
418 */
419 protected void handshake(DataOutputStream out, DataInputStream in)
420 throws IOException {
421 out.writeInt(MAGIC);
422 out.writeInt(VERSION);
423 out.flush();
424
425 int magic = in.readInt();
426 if (magic != MAGIC) {
427 throw new ProtocolException("Expected protocol magic=" + MAGIC
428 + ", but received=" + magic);
429 }
430 int version = in.readInt();
431 if (version != VERSION) {
432 throw new ProtocolException("Expected protocol version=" + VERSION
433 + ", but received=" + version);
434 }
435 }
436
437 /***
438 * Perform authentication on initial connection.
439 *
440 * @param principal the security principal. May be <code>null</code>
441 * @throws IOException for any I/O error
442 * @throws java.lang.SecurityException if connection is refused by the server
443 */
444 protected void authenticate(Principal principal)
445 throws IOException, SecurityException {
446 try {
447 if (principal != null && !(principal instanceof BasicPrincipal)) {
448 throw new IOException(
449 "Cannot authenticate with principal of type "
450 + principal.getClass().getName());
451 }
452 if (principal != null) {
453 BasicPrincipal basic = (BasicPrincipal) principal;
454 _out.writeByte(AUTH_BASIC);
455 _out.writeUTF(basic.getName());
456 _out.writeUTF(basic.getPassword());
457 } else {
458 _out.writeByte(AUTH_NONE);
459 }
460 _out.flush();
461 if (_in.readByte() != AUTH_OK) {
462 throw new SecurityException("Connection refused");
463 }
464 } catch (IOException exception) {
465
466 _endpoint.close();
467 throw exception;
468 }
469 _principal = principal;
470 }
471
472 /***
473 * Performs authentication on initial connection.
474 *
475 * @param authenticator the authenticator
476 * @throws IOException for any I/O error
477 * @throws ResourceException if the authenticator cannot authenticate
478 */
479 protected void authenticate(Authenticator authenticator)
480 throws IOException, ResourceException {
481
482 try {
483 Principal principal = null;
484 byte type = _in.readByte();
485
486 switch (type) {
487 case AUTH_BASIC:
488 String name = _in.readUTF();
489 String password = _in.readUTF();
490 principal = new BasicPrincipal(name, password);
491 break;
492 case AUTH_NONE:
493 break;
494 default:
495 throw new IOException("Invalid packet type: " + type);
496 }
497 if (authenticator.authenticate(principal)) {
498 _out.writeByte(AUTH_OK);
499 _out.flush();
500 } else {
501 _out.writeByte(AUTH_DENIED);
502 _out.flush();
503 throw new SecurityException("User " + principal
504 + " unauthorised");
505 }
506 _principal = principal;
507 } catch (IOException exception) {
508
509 _endpoint.close();
510 throw exception;
511 } catch (ResourceException exception) {
512
513 _endpoint.close();
514 throw exception;
515 }
516 }
517
518 /***
519 * Opens a new channel.
520 *
521 * @return a new channel
522 * @throws IOException if a channel can't be opened
523 */
524 protected Channel open() throws IOException {
525 Channel channel;
526 int channelId;
527 synchronized (_channels) {
528 channelId = getNextChannelId();
529 channel = addChannel(channelId);
530 }
531
532 send(OPEN, channelId);
533 return channel;
534 }
535
536 /***
537 * Read a packet from the endpoint.
538 */
539 private void multiplex() {
540 try {
541 byte type = _in.readByte();
542 switch (type) {
543 case OPEN:
544 handleOpen();
545 break;
546 case CLOSE:
547 handleClose();
548 break;
549 case REQUEST:
550 handleRequest();
551 break;
552 case RESPONSE:
553 handleResponse();
554 break;
555 case DATA:
556 handleData();
557 break;
558 case PING_REQUEST:
559 handlePingRequest();
560 break;
561 case PING_RESPONSE:
562 handlePingResponse();
563 break;
564 case FLOW_READ:
565 handleFlowRead();
566 break;
567 case SHUTDOWN:
568 handleShutdown();
569 break;
570 default:
571 throw new IOException("Unrecognised message type: "
572 + type);
573 }
574 } catch (Exception exception) {
575 boolean closed = _closed;
576 shutdown();
577 if (!closed) {
578 _log.debug("Multiplexer shutting down on error", exception);
579
580 _listener.error(exception);
581 }
582 }
583 }
584
585 /***
586 * Shuts down the multiplexer.
587 */
588 private void shutdown() {
589
590 _closed = true;
591
592
593 Channel[] channels;
594 synchronized (_channels) {
595 channels = (Channel[]) _channels.values().toArray(new Channel[0]);
596 }
597 for (int i = 0; i < channels.length; ++i) {
598 channels[i].disconnected();
599 }
600 try {
601 _endpoint.close();
602 } catch (IOException exception) {
603 _log.debug(exception);
604 }
605 }
606
607 /***
608 * Open a new channel.
609 *
610 * @throws IOException for any error
611 */
612 private void handleOpen() throws IOException {
613 int channelId = _in.readUnsignedShort();
614 Integer key = new Integer(channelId);
615
616 synchronized (_channels) {
617 if (_channels.get(key) != null) {
618 throw new IOException(
619 "A channel already exists with identifier: " + key);
620 }
621 addChannel(channelId);
622 }
623 }
624
625 /***
626 * Close a channel.
627 *
628 * @throws IOException for any error
629 */
630 private void handleClose() throws IOException {
631 int channelId = _in.readUnsignedShort();
632 Integer key = new Integer(channelId);
633
634 synchronized (_channels) {
635 Channel channel = (Channel) _channels.remove(key);
636 if (channel == null) {
637 throw new IOException(
638 "No channel exists with identifier: " + key);
639 }
640 channel.close();
641 }
642 }
643
644 /***
645 * Handle a <code>REQUEST</code> packet.
646 *
647 * @throws IOException if an I/O error occurs, or no channel exists matching
648 * that read from the packet
649 */
650 private void handleRequest() throws IOException {
651 final Channel channel = handleData();
652 if (_log.isDebugEnabled()) {
653 _log.debug("handleRequest() [channel=" + channel.getId() + "]");
654 }
655
656 _listener.request(channel);
657
658 if (_log.isDebugEnabled()) {
659 _log.debug("handleRequest() [channel=" + channel.getId()
660 + "] - end");
661 }
662 }
663
664 /***
665 * Handle a <code>RESPONSE</code> packet.
666 *
667 * @throws IOException if an I/O error occurs, or no channel exists matching
668 * that read from the packet
669 */
670 private void handleResponse() throws IOException {
671 handleData();
672 }
673
674 /***
675 * Handle a <code>PING_REQUEST</code> packet.
676 *
677 * @throws IOException if an I/O error occurs
678 */
679 private void handlePingRequest() throws IOException {
680 int token = _in.readInt();
681 synchronized (_out) {
682 _out.writeByte(PING_RESPONSE);
683 _out.writeInt(token);
684 _out.flush();
685 if (_log.isDebugEnabled()) {
686 _log.debug("pinged(token=" + token + ")");
687 }
688 }
689 }
690
691 /***
692 * Handle a <code>PING_RESPONSE</code> packet.
693 *
694 * @throws IOException if an I/O error occurs
695 */
696 private void handlePingResponse() throws IOException {
697 int token = _in.readInt();
698 _listener.pinged(token);
699 }
700
701 /***
702 * Handle a <code>DATA</code> packet.
703 *
704 * @return the channel to handle the packet
705 * @throws IOException if an I/O error occurs, or no channel exists matching
706 * that read from the packet
707 */
708 private Channel handleData() throws IOException {
709 Channel channel = readChannel();
710 int length = _in.readInt();
711 channel.getMultiplexInputStream().receive(_in, length);
712 return channel;
713 }
714
715 /***
716 * Handle a <code>FLOW_READ</code> packet.
717 *
718 * @throws IOException if an I/O error occurs
719 */
720 private void handleFlowRead() throws IOException {
721 Channel channel = readChannel();
722 int read = _in.readInt();
723 channel.getMultiplexOutputStream().notifyRead(read);
724 }
725
726 /***
727 * Handle a <code>SHUTDOWN</code> packet.
728 */
729 private void handleShutdown() {
730 shutdown();
731 _listener.closed();
732 }
733
734 /***
735 * Adds a new channel.
736 * <p/>
737 * NOTE: Must be invoked with <code>_channels</code> synchronized
738 *
739 * @param channelId the channel identifier
740 * @return the new channel
741 */
742 private Channel addChannel(int channelId) {
743 int size = BUFFER_SIZE;
744 MultiplexOutputStream out =
745 new MultiplexOutputStream(channelId, this, size, size);
746 MultiplexInputStream in =
747 new MultiplexInputStream(channelId, this, size);
748 Channel channel = new Channel(channelId, this, in, out);
749 _channels.put(new Integer(channelId), channel);
750 return channel;
751 }
752
753 /***
754 * Reads the channel identifier from the stream and returns the
755 * corresponding channel.
756 *
757 * @return the channel corresponding to the read channel identifier
758 * @throws IOException for any I/O error, or if there is no corresponding
759 * channel
760 */
761 private Channel readChannel() throws IOException {
762 int channelId = _in.readUnsignedShort();
763 return getChannel(channelId);
764 }
765
766 /***
767 * Returns a channel given its identifier.
768 *
769 * @param channelId the channel identifier
770 * @return the channel corresponding to <code>channelId</code>
771 * @throws IOException if there is no corresponding channel
772 */
773 private Channel getChannel(int channelId) throws IOException {
774 Channel channel;
775 Integer key = new Integer(channelId);
776 synchronized (_channels) {
777 channel = (Channel) _channels.get(key);
778 if (channel == null) {
779 throw new IOException(
780 "No channel exists with identifier: " + channelId);
781 }
782 }
783 return channel;
784 }
785
786 /***
787 * Returns the next available channel identifier. Channel identifiers
788 * generated on the client side are in the range 0x0..0x7FFF, on the server
789 * side, 0x8000-0xFFFF
790 * <p/>
791 * NOTE: Must be invoked with <code>_channels</code> synchronized
792 *
793 * @return the next channel identifier
794 * @throws IOException if the connection is closed
795 */
796 private int getNextChannelId() throws IOException {
797 final int mask = 0x7fff;
798 final int serverIdBase = 0x8000;
799 int channelId = 0;
800 while (!_closed) {
801 _seed = (_seed + 1) & mask;
802 channelId = (_client) ? _seed : _seed + serverIdBase;
803 if (!_channels.containsKey(new Integer(channelId))) {
804 break;
805 }
806 }
807 if (_closed) {
808 throw new IOException("Connection has been closed");
809 }
810 return channelId;
811 }
812
813 }