1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2004-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: MultiplexedManagedConnection.java,v 1.10 2006/12/16 12:37:17 tanderson Exp $
44 */
45 package org.exolab.jms.net.multiplexer;
46
47 import java.io.IOException;
48 import java.security.Principal;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52
53 import org.exolab.jms.net.connector.AbstractManagedConnection;
54 import org.exolab.jms.net.connector.Authenticator;
55 import org.exolab.jms.net.connector.Caller;
56 import org.exolab.jms.net.connector.CallerImpl;
57 import org.exolab.jms.net.connector.ConnectException;
58 import org.exolab.jms.net.connector.Connection;
59 import org.exolab.jms.net.connector.IllegalStateException;
60 import org.exolab.jms.net.connector.InvocationHandler;
61 import org.exolab.jms.net.connector.Request;
62 import org.exolab.jms.net.connector.ResourceException;
63 import org.exolab.jms.net.connector.Response;
64 import org.exolab.jms.net.connector.SecurityException;
65 import org.exolab.jms.net.connector.ManagedConnectionListener;
66 import org.exolab.jms.net.uri.URI;
67
68
69 /***
 * A <code>ManagedConnection</code> that uses a {@link Multiplexer} to multiplex
 * data over an {@link Endpoint}.
72 *
73 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
74 * @version $Revision: 1.10 $ $Date: 2006/12/16 12:37:17 $
75 */
76 public abstract class MultiplexedManagedConnection
77 extends AbstractManagedConnection
78 implements MultiplexerListener {
79
80 /***
81 * The multiplexer.
82 */
83 private Multiplexer _multiplexer;
84
85 /***
86 * The thread used to run {@link #_multiplexer}.
87 */
88 private Thread _multiplexThread;
89
90 /***
91 * The endpoint to multiplex data over.
92 */
93 private Endpoint _endpoint;
94
95 /***
96 * The invocation handler.
97 */
98 private InvocationHandler _invoker;
99
100 /***
101 * The security principal.
102 */
103 private Principal _principal;
104
105 /***
106 * The connection authenticator, for server side instances.
107 */
108 private Authenticator _authenticator;
109
110 /***
111 * Cached caller instance. Non-null if this is a server-side instance.
112 */
113 private Caller _caller;
114
115 /***
116 * The thread group to associate any allocated threads with.
117 */
118 private ThreadGroup _group;
119
120 /***
121 * The logger.
122 */
123 private static final Log _log =
124 LogFactory.getLog(MultiplexedManagedConnection.class);
125
126
/**
 * Creates a client-side <code>MultiplexedManagedConnection</code>.
 * <p/>
 * No authenticator is associated with the instance, so {@link #isClient}
 * returns <code>true</code>.
 *
 * @param principal the security principal that owns the connection. May be
 *                  <code>null</code>
 */
public MultiplexedManagedConnection(Principal principal) {
    this._principal = principal;
}
135
/**
 * Creates a server-side <code>MultiplexedManagedConnection</code>.
 * <p/>
 * The presence of an authenticator is what marks the instance as
 * server-side; see {@link #isClient}.
 *
 * @param authenticator the connection authenticator. Must not be
 *                      <code>null</code>
 * @throws IllegalArgumentException if <code>authenticator</code> is
 *                                  <code>null</code>
 */
public MultiplexedManagedConnection(Authenticator authenticator) {
    if (authenticator != null) {
        _authenticator = authenticator;
    } else {
        throw new IllegalArgumentException(
                "Argument 'authenticator' is null");
    }
}
148
149 /***
150 * Registers a handler for handling invocations on objects exported via this
151 * connection. Once a handler is registered, it cannot be de-registered.
152 *
153 * @param handler the invocation handler
154 * @throws IllegalStateException if a handler is already registered
155 * @throws ResourceException for any error
156 */
157 public void setInvocationHandler(InvocationHandler handler)
158 throws ResourceException {
159 if (_invoker != null) {
160 throw new IllegalStateException(
161 "An invocation handler is already registered");
162 }
163 _invoker = handler;
164 try {
165 _endpoint = createEndpoint();
166 if (isClient()) {
167 _multiplexer = createMultiplexer(_endpoint, _principal);
168 } else {
169 _multiplexer = createMultiplexer(_endpoint, _authenticator);
170 _principal = _multiplexer.getPrincipal();
171 _caller = new CallerImpl(getRemoteURI(), getLocalURI());
172 }
173 String name = getDisplayName() + "-Multiplexer";
174 _multiplexThread = new Thread(getThreadGroup(), _multiplexer,
175 name);
176 _multiplexThread.start();
177 } catch (IOException exception) {
178 throw new ConnectException("Failed to start multiplexer",
179 exception);
180 }
181 }
182
183 /***
184 * Creates a new connection handle for the underlying physical connection.
185 *
186 * @return a new connection handle
187 * @throws IllegalStateException if an invocation handler hasn't been
188 * registered
189 */
190 public synchronized Connection getConnection()
191 throws IllegalStateException {
192 if (_invoker == null) {
193 throw new IllegalStateException("No InvocationHandler registered");
194 }
195 return new MultiplexedConnection(this);
196 }
197
198 /***
199 * Ping the connection. The connection event listener will be notified
200 * if the ping succeeds.
201 *
202 * @throws IllegalStateException if a connection is not established
203 * @throws ResourceException for any error
204 */
205 public void ping() throws ResourceException {
206 Multiplexer multiplexer;
207 synchronized (this) {
208 multiplexer = _multiplexer;
209 }
210 if (multiplexer != null) {
211 try {
212 multiplexer.ping(0);
213 } catch (IOException exception) {
214 throw new ResourceException(exception.getMessage(), exception);
215 }
216 } else {
217 throw new IllegalStateException("Connection not established");
218 }
219 }
220
221 /***
222 * Destroys the physical connection.
223 *
224 * @throws ResourceException for any error
225 */
226 public void destroy() throws ResourceException {
227 Multiplexer multiplexer;
228 Thread thread;
229 Endpoint endpoint;
230
231 synchronized (this) {
232 multiplexer = _multiplexer;
233 thread = _multiplexThread;
234 endpoint = _endpoint;
235 }
236 try {
237 if (multiplexer != null) {
238
239 multiplexer.close();
240 if (thread != Thread.currentThread()) {
241 try {
242
243 thread.join();
244 } catch (InterruptedException exception) {
245 _log.debug(exception);
246 }
247 }
248 } else {
249 if (endpoint != null) {
250 try {
251 endpoint.close();
252 } catch (IOException exception) {
253 throw new ResourceException("Failed to close endpoint",
254 exception);
255 }
256 }
257 }
258 } finally {
259 synchronized (this) {
260 _multiplexer = null;
261 _multiplexThread = null;
262 _endpoint = null;
263 }
264 }
265 }
266
267 /***
268 * Returns the principal associated with this connection.
269 *
270 * @return the principal associated with this connection,
271 * or <code>null<code> if none is set
272 */
273 public Principal getPrincipal() {
274 return _principal;
275 }
276
277 /***
278 * Determines if the security principal that owns this connection is the
279 * same as that supplied.
280 * <p/>
281 * NOTE: If this is a server-side instance, the principal is only available
282 * once the connection has been established, by {@link
283 * #setInvocationHandler}
284 *
285 * @param principal the principal to compare. May be <code>null</code>.
286 * @return <code>true</code> if the principal that owns this connection is
287 * the same as <code>principal</code>
288 */
289 public boolean hasPrincipal(Principal principal) {
290 boolean result = false;
291 if ((_principal != null && _principal.equals(principal))
292 || (_principal == null && principal == null)) {
293 result = true;
294 }
295 return result;
296 }
297
298 /***
299 * Invoked for an invocation request.
300 *
301 * @param channel the channel the invocation is on
302 */
303 public void request(Channel channel) {
304 _invoker.invoke(new ChannelInvocation(channel, getCaller()));
305 }
306
307 /***
308 * Invoked when the connection is closed by the peer.
309 */
310 public void closed() {
311 notifyClosed();
312 }
313
314 /***
315 * Invoked when an error occurs on the multiplexer.
316 *
317 * @param error the error
318 */
319 public void error(Throwable error) {
320 notifyError(error);
321 }
322
323 /***
324 * Notifies of a successful ping.
325 *
326 * @param token the token sent in the ping
327 */
328 public void pinged(int token) {
329 ManagedConnectionListener listener = getConnectionEventListener();
330 if (listener != null) {
331 listener.pinged(this);
332 }
333 }
334
335 /***
336 * Invoke a method on a remote object.
337 *
338 * @param connection the connection invoking the request
339 * @param request the request
340 * @return the response
341 */
342 protected Response invoke(Connection connection, Request request) {
343 Response response;
344 Multiplexer multiplexer;
345 synchronized (this) {
346 multiplexer = _multiplexer;
347 }
348 if (multiplexer != null) {
349 Channel channel = null;
350 try {
351 channel = multiplexer.getChannel();
352 response = channel.invoke(request);
353 channel.release();
354 } catch (Exception exception) {
355 _log.debug(exception, exception);
356 response = new Response(exception);
357 if (channel != null) {
358 channel.destroy();
359 }
360 }
361 } else {
362 response = new Response(new ResourceException("Connection lost"));
363 }
364
365 return response;
366 }
367
368 /***
369 * Creates the endpoint to multiplex data over.
370 *
371 * @return the endpoint to multiplex data over
372 * @throws IOException for any I/O error
373 */
374 protected abstract Endpoint createEndpoint() throws IOException;
375
376 /***
377 * Create a new client-side multiplexer.
378 *
379 * @param endpoint the endpoint to multiplex messages over
380 * @param principal the security principal
381 * @return a new client-side multiplexer
382 * @throws IOException if an I/O error occurs
383 * @throws SecurityException if connection is refused by the server
384 */
385 protected Multiplexer createMultiplexer(Endpoint endpoint,
386 Principal principal)
387 throws IOException, SecurityException {
388 return new Multiplexer(this, endpoint, principal);
389 }
390
391 /***
392 * Create a new server-side multiplexer.
393 *
394 * @param endpoint the endpoint to multiplex messages over
395 * @param authenticator the connection authetnicator
396 * @return a new server-side multiplexer
397 * @throws IOException if an I/O error occurs
398 * @throws ResourceException if the authenticator cannot authenticate
399 */
400 protected Multiplexer createMultiplexer(Endpoint endpoint,
401 Authenticator authenticator)
402 throws IOException, ResourceException {
403 return new Multiplexer(this, endpoint, authenticator);
404 }
405
406 /***
407 * Helper to determine if this is a client-side or server side instance.
408 *
409 * @return <code>true</code> if this is a client-side instance, otherwise
410 * <code>false</code>
411 */
412 protected boolean isClient() {
413 return (_authenticator == null);
414 }
415
416 /***
417 * Helper to return an {@link Caller} instance, denoting the client
418 * performing a method invocation. Only applicable for server-side, and only
419 * after the multiplexer has been created.
420 *
421 * @return the caller instance, or <code>null</code> if it hasn't been
422 * initialised
423 */
424 protected Caller getCaller() {
425 return _caller;
426 }
427
428 /***
429 * Returns the thread group to associate with allocated threads.
430 *
431 * @return the thread group to associate with allocated threads, or
432 * <code>null</code> to use the default thread group.
433 */
434 protected synchronized ThreadGroup getThreadGroup() {
435 if (_group == null) {
436 _group = new ThreadGroup(getDisplayName());
437 }
438 return _group;
439 }
440
441 /***
442 * Helper to generate a descriptive name, for display purposes.
443 * <p/>
444 * This implementation returns the remote URI, concatenated with "[client]"
445 * if this is a client connection, or "[server]" if it is a server
446 * connection.
447 *
448 * @return the display name
449 */
450 protected String getDisplayName() {
451 StringBuffer name = new StringBuffer();
452 URI uri = null;
453 try {
454 uri = getRemoteURI();
455 } catch (ResourceException ignore) {
456 if (_log.isDebugEnabled()) {
457 _log.debug("Failed to determine remote URI", ignore);
458 }
459 }
460 if (uri != null) {
461 name.append(uri.toString());
462 } else {
463 name.append("<unknown>");
464 }
465 if (isClient()) {
466 name.append("[client]");
467 } else {
468 name.append("[server]");
469 }
470 return name.toString();
471 }
472
473 }