View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2005-2006 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: DefaultConnectionPool.java,v 1.9 2006/12/16 12:37:17 tanderson Exp $
44   */
45  package org.exolab.jms.net.connector;
46  
47  import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
48  import org.apache.commons.logging.Log;
49  import org.apache.commons.logging.LogFactory;
50  import org.exolab.jms.common.threads.ThreadFactory;
51  import org.exolab.jms.net.uri.URI;
52  import org.exolab.jms.net.util.Properties;
53  
54  import java.security.Principal;
55  import java.util.ArrayList;
56  import java.util.Collections;
57  import java.util.HashMap;
58  import java.util.List;
59  import java.util.Map;
60  
61  
62  /***
63   * Manages a pool of {@link ManagedConnection} instances, for a particular
64   * {@link ManagedConnectionFactory}.
65   *
66   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @version $Revision: 1.9 $ $Date: 2006/12/16 12:37:17 $
68   * @see AbstractConnectionManager
69   */
70  class DefaultConnectionPool
71          implements ManagedConnectionAcceptorListener,
72                     ManagedConnectionListener, ConnectionPool {
73  
74      /***
75       * The connection factory.
76       */
77      private final ManagedConnectionFactory _factory;
78  
79      /***
80       * Invocation handler to assign to each new connection.
81       */
82      private final InvocationHandler _handler;
83  
84      /***
85       * The connection factory for resolving connections via their URI.
86       */
87      private final ConnectionFactory _resolver;
88  
89      /***
90       * The set of allocated connections.
91       */
92      private List _connections = Collections.synchronizedList(new ArrayList());
93  
94      /***
95       * A map of ManagedConnection -> ManagedConnectionHandle. The handles are
96       * used to reap idle connections.
97       */
98      private Map _handles = Collections.synchronizedMap(new HashMap());
99  
100     /***
101      * The set of connection acceptors.
102      */
103     private List _acceptors = Collections.synchronizedList(new ArrayList());
104 
105     /***
106      * The set of accepted connections.
107      */
108     private List _accepted = Collections.synchronizedList(new ArrayList());
109 
110     /***
111      * The set of all connections, as a map of ManagedConnection -> PoolEntry
112      * instances.
113      */
114     private Map _entries = Collections.synchronizedMap(new HashMap());
115 
116     /***
117      * Reap thread synchronization helper.
118      */
119     private final Object _reapLock = new Object();
120 
121     /***
122      * Clock daemon for periodically running the reaper.
123      */
124     private ClockDaemon _daemon;
125 
126     /***
127      * Interval between pinging and reaping connections, in milliseconds.
128      * If <code>0</code> indicates not to reap connections.
129      */
130     private final long _reapInterval;
131 
132     /***
133      * Iterations before a connection that hasn't responded to a ping
134      * is declared dead.
135      */
136     private final int _reapDeadIterations;
137 
138     /***
139      * The maximum period that a connection may be idle before it is reaped,
140      * in milliseconds.
141      */
142     private final long _idlePeriod;
143 
144     /***
145      * The caller event listener.
146      */
147     private volatile CallerListener _listener;
148 
149     /***
150      * Property name prefix for pool configuration items.
151      */
152     private static final String POOL_PREFIX = "org.exolab.jms.net.pool.";
153 
154     /***
155      * Configuration property to indicate the no. of reaps to wait before
156      * reaping a connection that hasn't responded to a ping.
157      */
158     private static final String DEAD_ITERATIONS = "reapDeadIterations";
159 
160     /***
161      * Configuration property to indicate the reap interval.
162      */
163     private static final String REAP_INTERVAL = "reapInterval";
164 
165     /***
166      * Configuration property to indicate the idle time for connections
167      * before they may be reaped.
168      */
169     private static final String IDLE_PERIOD = "idlePeriod";
170 
171 
172     /***
173      * The logger.
174      */
175     private static final Log _log
176             = LogFactory.getLog(DefaultConnectionPool.class);
177 
178 
179     /***
180      * Construct a new <code>DefaultConnectionPool</code>.
181      *
182      * @param factory    the managed connection factory
183      * @param handler    the invocation handler, assigned to each new managed
184      *                   connection
185      * @param resolver   the connection factory for resolving connections via
186      *                   their URI
187      * @param properties configuration properties. May be <code>null</code>
188      * @throws ResourceException if any configuration property is invalid
189      */
190     public DefaultConnectionPool(ManagedConnectionFactory factory,
191                                  InvocationHandler handler,
192                                  ConnectionFactory resolver,
193                                  Map properties) throws ResourceException {
194         if (factory == null) {
195             throw new IllegalArgumentException("Argument 'factory' is null");
196         }
197         if (handler == null) {
198             throw new IllegalArgumentException("Argument 'handler' is null");
199         }
200         if (resolver == null) {
201             throw new IllegalArgumentException("Argument 'resolver' is null");
202         }
203         _factory = factory;
204         _handler = handler;
205         _resolver = resolver;
206 
207         Properties config = new Properties(properties, POOL_PREFIX);
208         _reapInterval = getPropertyMillis(config, REAP_INTERVAL, 60);
209         _reapDeadIterations = config.getInt(DEAD_ITERATIONS, 5);
210         _idlePeriod = getPropertyMillis(config, IDLE_PERIOD, 5);
211     }
212 
213     private long getPropertyMillis(Properties config, String key,
214                                    int defaultValue) throws ResourceException {
215         int seconds = config.getInt(key, defaultValue);
216         if (seconds < 0) {
217             seconds = 0;
218         }
219         return seconds * 1000;
220     }
221 
222     /***
223      * Creates a new connection.
224      *
225      * @param principal the security principal
226      * @param info      the connection request info
227      * @return a new connection
228      * @throws ResourceException if a connection cannot be established
229      */
230     public ManagedConnection createManagedConnection(Principal principal,
231                                                      ConnectionRequestInfo info)
232             throws ResourceException {
233         ManagedConnection connection = _factory.createManagedConnection(
234                 principal, info);
235         return add(connection, false);
236     }
237 
238     /***
239      * Creates an acceptor for connections.
240      *
241      * @param authenticator authenticates incoming connections
242      * @param info          the connection request info
243      * @return a new connection acceptor
244      * @throws ResourceException if an acceptor cannot be created
245      */
246     public ManagedConnectionAcceptor createManagedConnectionAcceptor(
247             Authenticator authenticator, ConnectionRequestInfo info)
248             throws ResourceException {
249 
250         ManagedConnectionAcceptor acceptor;
251 
252         acceptor = _factory.createManagedConnectionAcceptor(authenticator,
253                                                             info);
254         _acceptors.add(acceptor);
255         return acceptor;
256     }
257 
258     /***
259      * Returns a matched connection from the set of pooled connections.
260      *
261      * @param principal the security principal
262      * @param info      the connection request info
263      * @return the first acceptable match, or <code>null</code> if none is
264      *         found
265      * @throws ResourceException for any error
266      */
267     public ManagedConnection matchManagedConnections(Principal principal,
268                                                      ConnectionRequestInfo info)
269             throws ResourceException {
270 
271         ManagedConnection result;
272         synchronized (_reapLock) {
273             // ensure idle connections aren't being reaped while matching
274             result = _factory.matchManagedConnections(_connections, principal,
275                                                       info);
276             if (result != null) {
277                 // return the handle corresponding to the connection
278                 result = (ManagedConnection) _handles.get(result);
279             } else {
280                 result = _factory.matchManagedConnections(_accepted, principal,
281                                                           info);
282             }
283         }
284         return result;
285     }
286 
287     /***
288      * Returns a matched acceptor from the set of pooled connections.
289      *
290      * @param info the connection request info
291      * @return the first acceptable match, or <code>null</code> if none is
292      *         found
293      * @throws ResourceException for any error
294      */
295     public ManagedConnectionAcceptor matchManagedConnectionAcceptors(
296             ConnectionRequestInfo info) throws ResourceException {
297 
298         return _factory.matchManagedConnectionAcceptors(_acceptors, info);
299     }
300 
301     /***
302      * Returns a listener for handling accepted connections.
303      *
304      * @return a listener for handling accepted connections
305      */
306     public ManagedConnectionAcceptorListener
307             getManagedConnectionAcceptorListener() {
308         return this;
309     }
310 
311     /***
312      * Invoked when a new connection is accepted.
313      *
314      * @param acceptor   the acceptor which created the connection
315      * @param connection the accepted connection
316      */
317     public void accepted(ManagedConnectionAcceptor acceptor,
318                          ManagedConnection connection) {
319         try {
320             add(connection, true);
321         } catch (ResourceException exception) {
322             _log.debug("Failed to accept connection", exception);
323         }
324     }
325 
326     /***
327      * Notifies closure of a connection. The <code>ManagedConnection</code>
328      * instance invokes this to notify its registered listeners when the peer
329      * closes the connection.
330      *
331      * @param source the managed connection that is the source of the event
332      */
333     public void closed(ManagedConnection source) {
334         if (_log.isDebugEnabled()) {
335             _log.debug("Connection " + source + " closed by peer, destroying");
336         }
337         remove(source);
338     }
339 
340     /***
341      * Notifies a connection related error. The <code>ManagedConnection</code>
342      * instance invokes this to notify of the occurrence of a physical
343      * connection-related error.
344      *
345      * @param source    the managed connection that is the source of the event
346      * @param throwable the error
347      */
348     public void error(ManagedConnection source, Throwable throwable) {
349         if (_log.isDebugEnabled()) {
350             _log.debug("Error on connection " + source + ", destroying",
351                        throwable);
352         }
353         remove(source);
354     }
355 
356     /***
357      * Notifies of a successful ping.
358      *
359      * @param source the managed connection that is the source of the event
360      */
361     public void pinged(ManagedConnection source) {
362         ManagedConnectionHandle handle
363                 = (ManagedConnectionHandle) _handles.get(source);
364         if (handle != null) {
365             handle.pinged();
366         }
367     }
368 
369     /***
370      * Closes this connection pool, cleaning up any allocated resources.
371      *
372      * @throws ResourceException for any error
373      */
374     public void close() throws ResourceException {
375         ManagedConnectionAcceptor[] acceptors =
376                 (ManagedConnectionAcceptor[]) _acceptors.toArray(
377                         new ManagedConnectionAcceptor[0]);
378         _acceptors.clear();
379 
380         for (int i = 0; i < acceptors.length; ++i) {
381             acceptors[i].close();
382         }
383 
384         ManagedConnection[] connections =
385                 (ManagedConnection[]) _entries.keySet().toArray(
386                         new ManagedConnection[0]);
387         for (int i = 0; i < connections.length; ++i) {
388             connections[i].destroy();
389         }
390         _entries.clear();
391 
392         _accepted.clear();
393         _connections.clear();
394 
395         stopReaper();
396     }
397 
398     /***
399      * Invoked when an acceptor receives an error.
400      *
401      * @param acceptor  the acceptor which received the error
402      * @param throwable the error
403      */
404     public void error(ManagedConnectionAcceptor acceptor,
405                       Throwable throwable) {
406         _acceptors.remove(acceptor);
407 
408         String uri = "<unknown>";
409         try {
410             uri = acceptor.getURI().toString();
411         } catch (ResourceException ignore) {
412             // no-op
413         }
414         _log.error("Failed to accept connections on URI=" + uri,
415                    throwable);
416 
417         try {
418             acceptor.close();
419         } catch (ResourceException exception) {
420             if (_log.isDebugEnabled()) {
421                 _log.debug("Failed to close acceptor, URI=" + uri, exception);
422             }
423         }
424     }
425 
426     /***
427      * Sets the listener for caller events.
428      *
429      * @param listener the listener
430      */
431     public void setCallerListener(CallerListener listener) {
432         _listener = listener;
433     }
434 
435     /***
436      * Notifies when a managed connection is idle.
437      *
438      * @param connection the idle connection
439      */
440     protected synchronized void idle(ManagedConnectionHandle connection) {
441         connection.clearUsed();
442         if (_daemon != null) {
443             _daemon.executeAfterDelay(_idlePeriod, new IdleReaper());
444         }
445     }
446 
447     /***
448      * Adds a connection to the pool. If the connection was created, a {@link
449      * ManagedConnectionHandle} will be returned, wrapping the supplied
450      * connection.
451      *
452      * @param connection the connection to add
453      * @param accepted   if <code>true</code> the connection was accepted via an
454      *                   {@link ManagedConnectionAcceptor}, otherwise it was
455      *                   created via
456      *                   {@link ManagedConnectionFactory#createManagedConnection}
457      * @return the (possibly wrapped) connection
458      * @throws ResourceException if the connection cannot be added
459      */
460     protected ManagedConnection add(ManagedConnection connection,
461                                     boolean accepted) throws ResourceException {
462         ManagedConnection result;
463 
464         PoolEntry entry = new PoolEntry(connection, accepted);
465         _entries.put(connection, entry);
466         if (accepted) {
467             _accepted.add(connection);
468             result = connection;
469         } else {
470             _connections.add(connection);
471             ManagedConnection handle = new ManagedConnectionHandle(
472                     this, connection, _resolver);
473             _handles.put(connection, handle);
474             result = handle;
475         }
476         ContextInvocationHandler handler = new ContextInvocationHandler(
477                 _handler, _resolver, result);
478         try {
479             connection.setInvocationHandler(handler);
480             connection.setConnectionEventListener(this);
481         } catch (ResourceException exception) {
482             try {
483                 _log.debug("Failed to initialise connection, destroying",
484                            exception);
485                 connection.destroy();
486             } catch (ResourceException nested) {
487                 _log.debug("Failed to destroy connection", nested);
488             } finally {
489                 _entries.remove(connection);
490                 if (accepted) {
491                     _accepted.remove(connection);
492                 } else {
493                     _connections.remove(connection);
494                     _handles.remove(connection);
495                 }
496             }
497             // propagate the exception
498             throw exception;
499         }
500 
501         // mark the connection as initialised and therefore available for
502         // reaping
503         entry.setInitialised();
504 
505         startReaper();
506 
507         return result;
508     }
509 
510     /***
511      * Remove a connection from the pool.
512      *
513      * @param connection the connection to remove
514      */
515     protected void remove(ManagedConnection connection) {
516         PoolEntry entry = (PoolEntry) _entries.remove(connection);
517         if (entry != null) {
518             if (entry.getAccepted()) {
519                 _accepted.remove(connection);
520             } else {
521                 _connections.remove(connection);
522                 _handles.remove(connection);
523             }
524             URI remoteURI = null;
525             URI localURI = null;
526             try {
527                 remoteURI = connection.getRemoteURI();
528                 localURI = connection.getLocalURI();
529             } catch (ResourceException exception) {
530                 _log.debug("Failed to get connection URIs", exception);
531             }
532 
533             try {
534                 connection.destroy();
535             } catch (ResourceException exception) {
536                 _log.debug("Failed to destroy connection", exception);
537             }
538             if (remoteURI != null && localURI != null) {
539                 notifyDisconnection(remoteURI, localURI);
540             }
541         } else {
542             _log.debug("ManagedConnection not found");
543         }
544         if (_entries.isEmpty()) {
545             stopReaper();
546         }
547     }
548 
549     /***
550      * Notify of a disconnection.
551      *
552      * @param remoteURI the remote address that the client is calling from
553      * @param localURI  the local address that the client is calling to
554      */
555     private void notifyDisconnection(URI remoteURI, URI localURI) {
556         CallerListener listener = _listener;
557         if (listener != null) {
558             listener.disconnected(new CallerImpl(remoteURI, localURI));
559         }
560     }
561 
562     /***
563      * Starts the reaper for dead/idle connections, if needed.
564      */
565     private synchronized void startReaper() {
566         if (_daemon == null) {
567             _daemon = new ClockDaemon();
568             ThreadFactory creator =
569                     new ThreadFactory(null, "ManagedConnectionReaper", false);
570             _daemon.setThreadFactory(creator);
571 
572             if (_reapInterval > 0) {
573                 _daemon.executePeriodically(_reapInterval, new DeadReaper(),
574                                             false);
575             }
576         }
577     }
578 
579     /***
580      * Stops the reaper for dead/idle connections, if needed.
581      */
582     private synchronized void stopReaper() {
583         if (_daemon != null) {
584             _daemon.shutDown();
585             _daemon = null;
586         }
587     }
588 
589     /***
590      * Reap idle connections.
591      */
592     private void reapIdleConnections() {
593         Map.Entry[] entries = (Map.Entry[]) _handles.entrySet().toArray(
594                 new Map.Entry[0]);
595         for (int i = 0; i < entries.length && !stopReaping(); ++i) {
596             Map.Entry entry = entries[i];
597             ManagedConnection connection =
598                     (ManagedConnection) entry.getKey();
599             PoolEntry pooled = (PoolEntry) _entries.get(connection);
600             if (pooled != null && pooled.isInitialised()) {
601                 ManagedConnectionHandle handle =
602                         (ManagedConnectionHandle) entry.getValue();
603                 if (handle.canDestroy()) {
604                     if (_log.isDebugEnabled()) {
605                         try {
606                             _log.debug("Reaping idle connection, URI="
607                                     + connection.getRemoteURI()
608                                     + ", local URI="
609                                     + connection.getLocalURI());
610                         } catch (ResourceException ignore) {
611                             // do nothing
612                         }
613                     }
614                     remove(connection);
615                 }
616             }
617         }
618     }
619 
620     /***
621      * Reap dead connections.
622      */
623     private void reapDeadConnections() {
624         Map.Entry[] entries = (Map.Entry[]) _handles.entrySet().toArray(
625                 new Map.Entry[0]);
626         for (int i = 0; i < entries.length && !stopReaping(); ++i) {
627             Map.Entry entry = entries[i];
628             ManagedConnection connection =
629                     (ManagedConnection) entry.getKey();
630             PoolEntry pooled = (PoolEntry) _entries.get(connection);
631             if (pooled != null && pooled.isInitialised()) {
632                 ManagedConnectionHandle handle =
633                         (ManagedConnectionHandle) entry.getValue();
634                 if (!handle.used()) {
635                     // if the handle is unused, and is not waiting on a ping
636                     // reply, ping the connection
637                     if (handle.pinging()) {
638                         if (handle.incPingWaits() > _reapDeadIterations) {
639                             remove(connection);
640                         }
641                     } else {
642                         try {
643                             handle.ping();
644                         } catch (ResourceException exception) {
645                             if (_log.isDebugEnabled()) {
646                                 try {
647                                     _log.debug(
648                                             "Failed to ping connection, URI="
649                                                     + connection.getRemoteURI()
650                                                     + ", localURI="
651                                                     + connection.getLocalURI());
652                                 } catch (ResourceException ignore) {
653                                     // do nothing
654                                 }
655                             }
656                             remove(connection);
657                         }
658                     }
659                 } else {
660                     handle.clearUsed();
661                 }
662             }
663         }
664     }
665 
666     /***
667      * Helper to determines if a reaper should terminate, by checking the
668      * interrupt status of the current thread.
669      *
670      * @return <code>true</code> if the reaper should terminate
671      */
672     private boolean stopReaping() {
673         return Thread.currentThread().isInterrupted();
674     }
675 
676     /***
677      * Helper class for reaping idle connections.
678      */
679     private class IdleReaper implements Runnable {
680 
681         /***
682          * Run the reaper.
683          */
684         public void run() {
685             synchronized (_reapLock) {
686                 try {
687                     reapIdleConnections();
688                 } catch (Throwable exception) {
689                     _log.error(exception, exception);
690                 }
691             }
692         }
693     }
694 
695     /***
696      * Helper class for reaping dead connections.
697      */
698     private class DeadReaper implements Runnable {
699 
700         /***
701          * Run the reaper.
702          */
703         public void run() {
704             try {
705                 reapDeadConnections();
706             } catch (Throwable exception) {
707                 _log.error(exception, exception);
708             }
709         }
710     }
711 
712 }