View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2004-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: MultiplexedManagedConnection.java,v 1.10 2006/12/16 12:37:17 tanderson Exp $
44   */
45  package org.exolab.jms.net.multiplexer;
46  
47  import java.io.IOException;
48  import java.security.Principal;
49  
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  
53  import org.exolab.jms.net.connector.AbstractManagedConnection;
54  import org.exolab.jms.net.connector.Authenticator;
55  import org.exolab.jms.net.connector.Caller;
56  import org.exolab.jms.net.connector.CallerImpl;
57  import org.exolab.jms.net.connector.ConnectException;
58  import org.exolab.jms.net.connector.Connection;
59  import org.exolab.jms.net.connector.IllegalStateException;
60  import org.exolab.jms.net.connector.InvocationHandler;
61  import org.exolab.jms.net.connector.Request;
62  import org.exolab.jms.net.connector.ResourceException;
63  import org.exolab.jms.net.connector.Response;
64  import org.exolab.jms.net.connector.SecurityException;
65  import org.exolab.jms.net.connector.ManagedConnectionListener;
66  import org.exolab.jms.net.uri.URI;
67  
68  
69  /***
70   * A <code>ManagedConnection</code> that uses a {@link Multiplexer} to multiplex
71   * data over an {@link Endpoint}
72   *
73   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
74   * @version $Revision: 1.10 $ $Date: 2006/12/16 12:37:17 $
75   */
76  public abstract class MultiplexedManagedConnection
77          extends AbstractManagedConnection
78          implements MultiplexerListener {
79  
80      /***
81       * The multiplexer.
82       */
83      private Multiplexer _multiplexer;
84  
85      /***
86       * The thread used to run {@link #_multiplexer}.
87       */
88      private Thread _multiplexThread;
89  
90      /***
91       * The endpoint to multiplex data over.
92       */
93      private Endpoint _endpoint;
94  
95      /***
96       * The invocation handler.
97       */
98      private InvocationHandler _invoker;
99  
100     /***
101      * The security principal.
102      */
103     private Principal _principal;
104 
105     /***
106      * The connection authenticator, for server side instances.
107      */
108     private Authenticator _authenticator;
109 
110     /***
111      * Cached caller instance. Non-null if this is a server-side instance.
112      */
113     private Caller _caller;
114 
115     /***
116      * The thread group to associate any allocated threads with.
117      */
118     private ThreadGroup _group;
119 
120     /***
121      * The logger.
122      */
123     private static final Log _log =
124             LogFactory.getLog(MultiplexedManagedConnection.class);
125 
126 
127     /***
128      * Construct a new client <code>MultiplexedManagedConnection</code>.
129      *
130      * @param principal the security principal. May be <code>null</code>
131      */
132     public MultiplexedManagedConnection(Principal principal) {
133         _principal = principal;
134     }
135 
136     /***
137      * Construct a new server <code>MultiplexedManagedConnection</code>.
138      *
139      * @param authenticator the connection authenticator
140      */
141     public MultiplexedManagedConnection(Authenticator authenticator) {
142         if (authenticator == null) {
143             throw new IllegalArgumentException(
144                     "Argument 'authenticator' is null");
145         }
146         _authenticator = authenticator;
147     }
148 
149     /***
150      * Registers a handler for handling invocations on objects exported via this
151      * connection. Once a handler is registered, it cannot be de-registered.
152      *
153      * @param handler the invocation handler
154      * @throws IllegalStateException if a handler is already registered
155      * @throws ResourceException     for any error
156      */
157     public void setInvocationHandler(InvocationHandler handler)
158             throws ResourceException {
159         if (_invoker != null) {
160             throw new IllegalStateException(
161                     "An invocation handler is already registered");
162         }
163         _invoker = handler;
164         try {
165             _endpoint = createEndpoint();
166             if (isClient()) {
167                 _multiplexer = createMultiplexer(_endpoint, _principal);
168             } else {
169                 _multiplexer = createMultiplexer(_endpoint, _authenticator);
170                 _principal = _multiplexer.getPrincipal();
171                 _caller = new CallerImpl(getRemoteURI(), getLocalURI());
172             }
173             String name = getDisplayName() + "-Multiplexer";
174             _multiplexThread = new Thread(getThreadGroup(), _multiplexer,
175                                           name);
176             _multiplexThread.start();
177         } catch (IOException exception) {
178             throw new ConnectException("Failed to start multiplexer",
179                                        exception);
180         }
181     }
182 
183     /***
184      * Creates a new connection handle for the underlying physical connection.
185      *
186      * @return a new connection handle
187      * @throws IllegalStateException if an invocation handler hasn't been
188      *                               registered
189      */
190     public synchronized Connection getConnection()
191             throws IllegalStateException {
192         if (_invoker == null) {
193             throw new IllegalStateException("No InvocationHandler registered");
194         }
195         return new MultiplexedConnection(this);
196     }
197 
198     /***
199      * Ping the connection. The connection event listener will be notified
200      * if the ping succeeds.
201      *
202      * @throws IllegalStateException if a connection is not established
203      * @throws ResourceException for any error
204      */
205     public void ping() throws ResourceException {
206         Multiplexer multiplexer;
207         synchronized (this) {
208             multiplexer = _multiplexer;
209         }
210         if (multiplexer != null) {
211             try {
212                 multiplexer.ping(0);
213             } catch (IOException exception) {
214                 throw new ResourceException(exception.getMessage(), exception);
215             }
216         } else {
217             throw new IllegalStateException("Connection not established");
218         }
219     }
220 
221     /***
222      * Destroys the physical connection.
223      *
224      * @throws ResourceException for any error
225      */
226     public void destroy() throws ResourceException {
227         Multiplexer multiplexer;
228         Thread thread;
229         Endpoint endpoint;
230 
231         synchronized (this) {
232             multiplexer = _multiplexer;
233             thread = _multiplexThread;
234             endpoint = _endpoint;
235         }
236         try {
237             if (multiplexer != null) {
238                 // multiplexer handles endpoint closure
239                 multiplexer.close();
240                 if (thread != Thread.currentThread()) {
241                     try {
242                         // wait for the multiplexer thread to terminate
243                         thread.join();
244                     } catch (InterruptedException exception) {
245                         _log.debug(exception);
246                     }
247                 }
248             } else {
249                 if (endpoint != null) {
250                     try {
251                         endpoint.close();
252                     } catch (IOException exception) {
253                         throw new ResourceException("Failed to close endpoint",
254                                                     exception);
255                     }
256                 }
257             }
258         } finally {
259             synchronized (this) {
260                 _multiplexer = null;
261                 _multiplexThread = null;
262                 _endpoint = null;
263             }
264         }
265     }
266 
267     /***
268      * Returns the principal associated with this connection.
269      *
270      * @return the principal associated with this connection,
271      *         or <code>null<code> if none is set
272      */
273     public Principal getPrincipal() {
274         return _principal;
275     }
276 
277     /***
278      * Determines if the security principal that owns this connection is the
279      * same as that supplied.
280      * <p/>
281      * NOTE: If this is a server-side instance, the principal is only available
282      * once the connection has been established, by {@link
283      * #setInvocationHandler}
284      *
285      * @param principal the principal to compare. May be <code>null</code>.
286      * @return <code>true</code> if the principal that owns this connection is
287      *         the same as <code>principal</code>
288      */
289     public boolean hasPrincipal(Principal principal) {
290         boolean result = false;
291         if ((_principal != null && _principal.equals(principal))
292                 || (_principal == null && principal == null)) {
293             result = true;
294         }
295         return result;
296     }
297 
298     /***
299      * Invoked for an invocation request.
300      *
301      * @param channel the channel the invocation is on
302      */
303     public void request(Channel channel) {
304         _invoker.invoke(new ChannelInvocation(channel, getCaller()));
305     }
306 
307     /***
308      * Invoked when the connection is closed by the peer.
309      */
310     public void closed() {
311         notifyClosed();
312     }
313 
314     /***
315      * Invoked when an error occurs on the multiplexer.
316      *
317      * @param error the error
318      */
319     public void error(Throwable error) {
320         notifyError(error);
321     }
322 
323     /***
324      * Notifies of a successful ping.
325      *
326      * @param token the token sent in the ping
327      */
328     public void pinged(int token) {
329         ManagedConnectionListener listener = getConnectionEventListener();
330         if (listener != null) {
331             listener.pinged(this);
332         }
333     }
334 
335     /***
336      * Invoke a method on a remote object.
337      *
338      * @param connection the connection invoking the request
339      * @param request    the request
340      * @return the response
341      */
342     protected Response invoke(Connection connection, Request request) {
343         Response response;
344         Multiplexer multiplexer;
345         synchronized (this) {
346             multiplexer = _multiplexer;
347         }
348         if (multiplexer != null) {
349             Channel channel = null;
350             try {
351                 channel = multiplexer.getChannel();
352                 response = channel.invoke(request);
353                 channel.release();
354             } catch (Exception exception) {
355                 _log.debug(exception, exception);
356                 response = new Response(exception);
357                 if (channel != null) {
358                     channel.destroy();
359                 }
360             }
361         } else {
362             response = new Response(new ResourceException("Connection lost"));
363         }
364 
365         return response;
366     }
367 
368     /***
369      * Creates the endpoint to multiplex data over.
370      *
371      * @return the endpoint to multiplex data over
372      * @throws IOException for any I/O error
373      */
374     protected abstract Endpoint createEndpoint() throws IOException;
375 
376     /***
377      * Create a new client-side multiplexer.
378      *
379      * @param endpoint  the endpoint to multiplex messages over
380      * @param principal the security principal
381      * @return a new client-side multiplexer
382      * @throws IOException       if an I/O error occurs
383      * @throws SecurityException if connection is refused by the server
384      */
385     protected Multiplexer createMultiplexer(Endpoint endpoint,
386                                             Principal principal)
387             throws IOException, SecurityException {
388         return new Multiplexer(this, endpoint, principal);
389     }
390 
391     /***
392      * Create a new server-side multiplexer.
393      *
394      * @param endpoint      the endpoint to multiplex messages over
395      * @param authenticator the connection authetnicator
396      * @return a new server-side multiplexer
397      * @throws IOException       if an I/O error occurs
398      * @throws ResourceException if the authenticator cannot authenticate
399      */
400     protected Multiplexer createMultiplexer(Endpoint endpoint,
401                                             Authenticator authenticator)
402             throws IOException, ResourceException {
403         return new Multiplexer(this, endpoint, authenticator);
404     }
405 
406     /***
407      * Helper to determine if this is a client-side or server side instance.
408      *
409      * @return <code>true</code> if this is a client-side instance, otherwise
410      *         <code>false</code>
411      */
412     protected boolean isClient() {
413         return (_authenticator == null);
414     }
415 
416     /***
417      * Helper to return an {@link Caller} instance, denoting the client
418      * performing a method invocation. Only applicable for server-side, and only
419      * after the multiplexer has been created.
420      *
421      * @return the caller instance, or <code>null</code> if it hasn't been
422      *         initialised
423      */
424     protected Caller getCaller() {
425         return _caller;
426     }
427 
428     /***
429      * Returns the thread group to associate with allocated threads.
430      *
431      * @return the thread group to associate with allocated threads, or
432      *         <code>null</code> to use the default thread group.
433      */
434     protected synchronized ThreadGroup getThreadGroup() {
435         if (_group == null) {
436             _group = new ThreadGroup(getDisplayName());
437         }
438         return _group;
439     }
440 
441     /***
442      * Helper to generate a descriptive name, for display purposes.
443      * <p/>
444      * This implementation returns the remote URI, concatenated with "[client]"
445      * if this is a client connection, or "[server]" if it is a server
446      * connection.
447      *
448      * @return the display name
449      */
450     protected String getDisplayName() {
451         StringBuffer name = new StringBuffer();
452         URI uri = null;
453         try {
454             uri = getRemoteURI();
455         } catch (ResourceException ignore) {
456             if (_log.isDebugEnabled()) {
457                 _log.debug("Failed to determine remote URI", ignore);
458             }
459         }
460         if (uri != null) {
461             name.append(uri.toString());
462         } else {
463             name.append("<unknown>");
464         }
465         if (isClient()) {
466             name.append("[client]");
467         } else {
468             name.append("[server]");
469         }
470         return name.toString();
471     }
472 
473 }