View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: DestinationManagerImpl.java,v 1.2 2005/11/12 10:49:48 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.sql.Connection;
48  import java.util.Enumeration;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.Map;
53  import java.util.List;
54  import java.util.ArrayList;
55  import javax.jms.InvalidDestinationException;
56  import javax.jms.JMSException;
57  
58  import org.apache.commons.logging.Log;
59  import org.apache.commons.logging.LogFactory;
60  
61  import org.exolab.jms.client.JmsDestination;
62  import org.exolab.jms.client.JmsQueue;
63  import org.exolab.jms.client.JmsTopic;
64  import org.exolab.jms.gc.GarbageCollectionService;
65  import org.exolab.jms.message.MessageImpl;
66  import org.exolab.jms.persistence.DatabaseService;
67  import org.exolab.jms.persistence.PersistenceAdapter;
68  import org.exolab.jms.persistence.PersistenceException;
69  import org.exolab.jms.service.Service;
70  import org.exolab.jms.service.ServiceException;
71  
72  
73  /***
74   * The destination manager is responsible for creating and managing the
75   * lifecycle of {@link DestinationCache} objects. The destination manager is
76   * also responsible for managing messages, that are received by the message
77   * manager, which do not have any registered {@link DestinationCache}.
78   *
79   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
80   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
81   * @version $Revision: 1.2 $ $Date: 2005/11/12 10:49:48 $
82   */
83  public class DestinationManagerImpl extends Service
84          implements DestinationManager {
85  
86      /***
87       * The set of persistent and non-persistent destinations, keyed on name.
88       */
89      private final HashMap _destinations = new HashMap();
90  
91      /***
92       * The set of active DestinationCache instances, keyed on destination.
93       */
94      private final HashMap _caches = new HashMap();
95  
96      /***
97       * Synchronization helper. Should be synchronized on whenever accessing
98       * _destinations, or _caches
99       */
100     private final Object _lock = _destinations;
101 
102     /***
103      * Maintains a linked list of DestinationEventListener objects. These
104      * listeners will be informed when destinations are added or destroyed.
105      */
106     private LinkedList _listeners = new LinkedList();
107 
108     /***
109      * The message manager.
110      */
111     private final MessageManager _messages;
112 
113     /***
114      * The destination cache factory.
115      */
116     private final DestinationCacheFactory _factory;
117 
118     /***
119      * The consumer manager.
120      */
121     private ConsumerManager _consumers;
122 
123     /***
124      * The database service.
125      */
126     private final DatabaseService _database;
127 
128     /***
129      * The garbage collection service.
130      */
131     private final GarbageCollectionService _collector;
132 
133     /***
134      * The logger.
135      */
136     private static final Log _log =
137             LogFactory.getLog(DestinationManagerImpl.class);
138 
139     /***
140      * Construct a new <code>DestinationManagerImpl</code>.
141      *
142      * @param messages  the message manager
143      * @param factory   the destination cache factory
144      * @param database  the database service
145      * @param collector the garbage collection service
146      */
147     public DestinationManagerImpl(MessageManager messages,
148                                   DestinationCacheFactory factory,
149                                   DatabaseService database,
150                                   GarbageCollectionService collector) {
151         if (messages == null) {
152             throw new IllegalArgumentException("Argument 'messages' is null");
153         }
154         if (factory == null) {
155             throw new IllegalArgumentException("Argument 'factory' is null");
156         }
157         if (database == null) {
158             throw new IllegalArgumentException("Argument 'database' is null");
159         }
160         if (collector == null) {
161             throw new IllegalArgumentException("Argument 'collector' is null");
162         }
163         _messages = messages;
164         _factory = factory;
165         _database = database;
166         _collector = collector;
167     }
168 
169     /***
170      * Sets the consumer manager.
171      *
172      * @param consumers the consumer manager
173      */
174     public void setConsumerManager(ConsumerManager consumers) {
175         _consumers = consumers;
176     }
177 
178     /***
179      * Returns the cache for the supplied destination.
180      * <p/>
181      * If the cache doesn't exist, it will be created, and any registered {@link
182      * DestinationEventListener}s will be notified.
183      *
184      * @param destination the destination of the cache to return
185      * @return the cache associated with <code>destination</code>
186      * @throws InvalidDestinationException if <code>destination</code> doesn't
187      *                                     exist
188      * @throws JMSException                if the cache can't be created
189      */
190     public DestinationCache getDestinationCache(JmsDestination destination)
191             throws JMSException {
192         DestinationCache result;
193         boolean created = false;
194 
195         synchronized (_lock) {
196             final String name = destination.getName();
197 
198             // make sure the managed destination instance is used.
199             destination = getExistingDestination(name);
200 
201             result = (DestinationCache) _caches.get(destination);
202             if (result == null) {
203                 checkWildcard(destination);
204                 result = _factory.createDestinationCache(destination);
205                 _caches.put(destination, result);
206                 _messages.addEventListener(destination, result);
207                 created = true;
208             }
209         }
210 
211         if (created) {
212             // notify the listeners that a new cache has been added,
213             // outside the sync of _lock
214             notifyCacheAdded(result);
215         }
216 
217         return result;
218     }
219 
220     /***
221      * Returns a destination given its name.
222      *
223      * @param name the name of the destination
224      * @return the destination corresponding to <code>name</code> or
225      *         <code>null</code> if none exists
226      */
227     public JmsDestination getDestination(String name) {
228         synchronized (_lock) {
229             return (JmsDestination) _destinations.get(name);
230         }
231     }
232 
233     /***
234      * Register an event listener to be notified when destinations are created
235      * and destroyed.
236      *
237      * @param listener the listener to add
238      */
239     public void addDestinationEventListener(DestinationEventListener listener) {
240         synchronized (_listeners) {
241             if (!_listeners.contains(listener)) {
242                 _listeners.add(listener);
243             }
244         }
245     }
246 
247     /***
248      * Remove an event listener.
249      *
250      * @param listener the listener to remove
251      */
252     public void removeDestinationEventListener(
253             DestinationEventListener listener) {
254         synchronized (_listeners) {
255             _listeners.remove(listener);
256         }
257     }
258 
259     /***
260      * Create a destination.
261      * <p/>
262      * Any registered {@link DestinationEventListener}s will be notified.
263      *
264      * @param destination the destination to create
265      * @throws InvalidDestinationException if the destination already exists or
266      *                                     is a wildcard destination
267      * @throws JMSException                if the destination can't be created
268      */
269     public void createDestination(JmsDestination destination)
270             throws JMSException {
271         checkWildcard(destination);
272         synchronized (_lock) {
273             if (exists(destination.getName())) {
274                 throw new InvalidDestinationException(
275                         "Destination already exists: " + destination.getName());
276             }
277             if (destination.getPersistent()) {
278                 createPersistentDestination(destination);
279             }
280             addToDestinations(destination);
281         }
282 
283         notifyDestinationAdded(destination);
284     }
285 
286     /***
287      * Remove a destination.
288      * <p/>
289      * All messages and durable consumers will be removed. Any registered {@link
290      * DestinationEventListener}s will be notified.
291      *
292      * @param destination the destination to remove
293      * @throws InvalidDestinationException if the destination is invalid
294      * @throws JMSException                if the destination can't be removed
295      */
296     public void removeDestination(JmsDestination destination)
297             throws JMSException {
298 
299         if (_log.isDebugEnabled()) {
300             _log.debug("removeDestination(destination=" + destination + ")");
301         }
302 
303         // make sure the managed destination instance is used.
304         destination = getExistingDestination(destination.getName());
305 
306         boolean queue = (destination instanceof JmsQueue) ? true : false;
307 
308         if (!queue) {
309             // If its a topic, unsubscribe any inactive durable subscribers.
310             // The following will fail if there are active subscribers
311             _consumers.unsubscribe((JmsTopic) destination);
312         }
313 
314         synchronized (_lock) {
315             DestinationCache cache =
316                     (DestinationCache) _caches.get(destination);
317             // make sure there are no consumers
318             if (cache != null && cache.hasConsumers()) {
319                 throw new JMSException("Cannot delete destination"
320                                        + destination + " since there are "
321                                        + " active consumers.");
322             }
323         }
324 
325         // now that we have removed all the durable consumers we can remove
326         // the administered topic. First delete it from memory and then
327         // from the persistent store
328         try {
329             _database.begin();
330             Connection connection = _database.getConnection();
331 
332             _database.getAdapter().removeDestination(connection,
333                                                      destination.getName());
334             destroyDestinationCache(destination);
335             removeFromDestinations(destination);
336             _database.commit();
337         } catch (Exception exception) { // JMSException, PersistenceException
338             String msg = "Failed to remove destination "
339                     + destination.getName();
340             cleanup(msg, exception);
341         }
342 
343         notifyDestinationRemoved(destination);
344     }
345 
346     /***
347      * Invoked when the {@link MessageManager} receives a non-persistent
348      * message.
349      *
350      * @param destination the message's destination
351      * @param message     the message
352      * @throws JMSException if the listener fails to handle the message
353      */
354     public void messageAdded(JmsDestination destination,
355                              MessageImpl message)
356             throws JMSException {
357         if (destination instanceof JmsTopic) {
358             // check to see whether there are active consumers interested
359             // in the specified destination. If there are then we need to
360             // create a destination cache and pass the message to it.
361             if (_consumers.hasActiveConsumers(destination)) {
362                 if (!exists(destination.getName())) {
363                     createDestination(destination);
364                 }
365                 DestinationCache cache = getDestinationCache(destination);
366                 cache.messageAdded(destination, message);
367             }
368         } else {
369             // destination is a queue. Since the message is non-persistent,
370             // create the cache and pass the message to it.
371             if (!exists(destination.getName())) {
372                 createDestination(destination);
373             }
374             DestinationCache cache = getDestinationCache(destination);
375             cache.messageAdded(destination, message);
376         }
377     }
378 
379     /***
380      * Invoked when the {@link MessageManager} receives a persistent message.
381      *
382      * @param destination the message's destination
383      * @param message     the message
384      * @throws JMSException         if the listener fails to handle the message
385      * @throws PersistenceException if there is a persistence related problem
386      */
387     public void persistentMessageAdded(JmsDestination destination,
388                                        MessageImpl message)
389             throws JMSException, PersistenceException {
390         DestinationCache cache = getDestinationCache(destination);
391         cache.persistentMessageAdded(destination, message);
392     }
393 
394     /***
395      * Returns all destinations.
396      *
397      * @return a list of JmsDestination instances.
398      * @throws JMSException for any JMS error
399      */
400     public List getDestinations() throws JMSException {
401         synchronized (_lock) {
402             return new ArrayList(_destinations.values());
403         }
404     }
405 
406     /***
407      * Returns a map of all destinations that match the specified topic.
408      * <p/>
409      * If the topic represents a wildcard then it may match none, one or more
410      * destinations.
411      *
412      * @param topic the topic
413      * @return a map of topics to DestinationCache instances
414      */
415     public Map getTopicDestinationCaches(JmsTopic topic) {
416         HashMap result = new HashMap();
417 
418         synchronized (_lock) {
419             Iterator iter = _caches.keySet().iterator();
420             while (iter.hasNext()) {
421                 JmsDestination dest = (JmsDestination) iter.next();
422                 if ((dest instanceof JmsTopic) &&
423                         (topic.match((JmsTopic) dest))) {
424                     result.put(dest, _caches.get(dest));
425                 }
426             }
427         }
428 
429         return result;
430     }
431 
432     /***
433      * Perform any garbage collection on this resource. This will have the
434      * effect of releasing system resources.  If the 'aggressive' flag is set to
435      * true then the garbage collection should do more to release memory related
436      * resources since it is called when the application memory is low.
437      *
438      * @param aggressive <code>true</code> for aggressive garbage collection
439      */
440     public void collectGarbage(boolean aggressive) {
441         int gcCaches = 0;
442         int gcDestinations = 0;
443 
444         DestinationCache[] caches;
445         synchronized (_lock) {
446             caches = (DestinationCache[]) _caches.values().toArray(
447                     new DestinationCache[0]);
448         }
449         for (int index = 0; index < caches.length; index++) {
450             DestinationCache cache = caches[index];
451             if (cache.canDestroy()) {
452                 if (_log.isDebugEnabled()) {
453                     _log.debug("Garbage collecting destination cache="
454                                + cache);
455                 }
456                 destroyDestinationCache(cache);
457                 gcCaches++;
458             } else {
459                 // the cache is active, so issue a garbage collection
460                 // request on it
461                 cache.collectGarbage(aggressive);
462             }
463         }
464 
465         // get rid of non-persistent destinations, without associated caches.
466         synchronized (_lock) {
467             JmsDestination[] destinations
468                     = (JmsDestination[]) _destinations.values().toArray(
469                             new JmsDestination[0]);
470             for (int i = 0; i < destinations.length; ++i) {
471                 JmsDestination dest = destinations[i];
472                 if (!dest.getPersistent() && !_caches.containsKey(dest)) {
473                     gcDestinations++;
474                     _destinations.remove(dest.getName());
475                 }
476             }
477 
478             // log the information
479             _log.info("DMGC Collected " + gcCaches + " caches, "
480                       + _caches.size()
481                       + " remaining.");
482             _log.info("DMGC Collected " + gcDestinations + " destinations, "
483                       + _destinations.size() + " remaining.");
484         }
485 
486     }
487 
488     /***
489      * Start the service.
490      *
491      * @throws ServiceException if the service fails to start
492      */
493     protected void doStart() throws ServiceException {
494         if (_consumers == null) {
495             throw new ServiceException(
496                     "ConsumerManager hasn't been initialised");
497         }
498         init();
499         _collector.register(this);
500     }
501 
502     /***
503      * Stop the service.
504      *
505      * @throws ServiceException if the service fails to stop
506      */
507     protected void doStop() throws ServiceException {
508         _collector.unregister(this);
509 
510         JmsDestination[] destinations;
511         synchronized (_lock) {
512             destinations = (JmsDestination[]) _caches.keySet().toArray(
513                     new JmsDestination[0]);
514         }
515         for (int index = 0; index < destinations.length; index++) {
516             destroyDestinationCache(destinations[index]);
517         }
518 
519         _caches.clear();
520 
521         _destinations.clear();
522 
523         // remove all the listeners
524         synchronized (_listeners) {
525             _listeners.clear();
526         }
527     }
528 
529     /***
530      * Initialises the destination manager.
531      *
532      * @throws ServiceException if the service cannot be initialised
533      */
534     protected void init() throws ServiceException {
535         Enumeration iter;
536         try {
537             _database.begin();
538             Connection connection = _database.getConnection();
539 
540             // return a list of JmsDestination objects.
541             iter = _database.getAdapter().getAllDestinations(connection);
542             _database.commit();
543         } catch (PersistenceException exception) {
544             _log.error(exception, exception);
545             rollback();
546             throw new ServiceException("Failed to get destinations", exception);
547         }
548 
549         while (iter.hasMoreElements()) {
550             // add each destination to the cache
551             JmsDestination dest = (JmsDestination) iter.nextElement();
552             addToDestinations(dest);
553         }
554     }
555 
556     /***
557      * Determines if a destination exists.
558      *
559      * @param name the destination name
560      * @return <code>true</code> if the destination exists, otherwise
561      *         <code>false
562      */
563     protected boolean exists(String name) {
564         return getDestination(name) != null;
565     }
566 
567     /***
568      * Delete the specfied destination.
569      *
570      * @param cache the destination to destroy
571      */
572     protected void destroyDestinationCache(DestinationCache cache) {
573         destroyDestinationCache(cache.getDestination());
574     }
575 
576     /***
577      * Delete the specfied destination.
578      *
579      * @param dest the destination to destroy
580      */
581     protected void destroyDestinationCache(JmsDestination dest) {
582         synchronized (_lock) {
583             DestinationCache cache = (DestinationCache) _caches.remove(dest);
584             if (cache != null) {
585                 // deregister the cache from message manager.
586                 _messages.removeEventListener(dest);
587 
588                 // notify the listeners that a cache has been removed from
589                 // the destination manager
590                 notifyCacheRemoved(cache);
591 
592                 cache.destroy();
593             }
594         }
595     }
596 
597     /***
598      * Create a persistent destination.
599      *
600      * @param destination the destination to create
601      * @throws JMSException if the destination cannot be created
602      */
603     private void createPersistentDestination(JmsDestination destination)
604             throws JMSException {
605         if (_log.isDebugEnabled()) {
606             _log.debug("createPersistentDestination(destination="
607                        + destination + ")");
608         }
609 
610         boolean queue = (destination instanceof JmsQueue) ? true : false;
611         PersistenceAdapter adapter = _database.getAdapter();
612 
613         // check that the destination does not exist. If it exists then return
614         // false. If it doesn't exists the create it and bind it to the jndi
615         // context
616 
617         try {
618             _database.begin();
619             Connection connection = _database.getConnection();
620             adapter.addDestination(connection, destination.getName(), queue);
621             _database.commit();
622         } catch (Exception exception) { // JMSException, PersistenceException
623             cleanup("Failed to create persistent destination "
624                     + destination.getName(), exception);
625         }
626     }
627 
628     /***
629      * Notify the list of {@link DestinationEventListener} objects that the
630      * specified destination has been added.
631      *
632      * @param destination the added destination
633      * @throws JMSException if a listener fails to be notified
634      */
635     private void notifyDestinationAdded(JmsDestination destination)
636             throws JMSException {
637         DestinationEventListener[] listeners = getListeners();
638         for (int i = 0; i < listeners.length; ++i) {
639             listeners[i].destinationAdded(destination);
640         }
641     }
642 
643     /***
644      * Notify the list of {@link DestinationEventListener} objects that the
645      * specified destination has been removed.
646      *
647      * @param destination the added destination
648      * @throws JMSException if a listeners fails to be notified
649      */
650     private void notifyDestinationRemoved(JmsDestination destination)
651             throws JMSException {
652         DestinationEventListener[] listeners = getListeners();
653         for (int i = 0; i < listeners.length; ++i) {
654             listeners[i].destinationRemoved(destination);
655         }
656     }
657 
658     /***
659      * Notify the list of {@link DestinationEventListener} objects that the
660      * specified message cache has been added.
661      *
662      * @param cache the added cache
663      */
664     private void notifyCacheAdded(DestinationCache cache) {
665         JmsDestination destination = cache.getDestination();
666         DestinationEventListener[] listeners = getListeners();
667         for (int i = 0; i < listeners.length; ++i) {
668             listeners[i].cacheAdded(destination, cache);
669         }
670     }
671 
672     /***
673      * Notify the list of {@link DestinationEventListener} objects that the
674      * specified message cache has been removed.
675      *
676      * @param cache the added cache
677      */
678     private void notifyCacheRemoved(DestinationCache cache) {
679         JmsDestination destination = cache.getDestination();
680         DestinationEventListener[] listeners = getListeners();
681         for (int i = 0; i < listeners.length; ++i) {
682             listeners[i].cacheRemoved(destination, cache);
683         }
684     }
685 
686     /***
687      * Add the specified destination to the destination cache.
688      *
689      * @param destination the destination to add
690      */
691     private void addToDestinations(JmsDestination destination) {
692         synchronized (_lock) {
693             if (!_destinations.containsKey(destination.getName())) {
694                 _destinations.put(destination.getName(), destination);
695             }
696         }
697     }
698 
699     /***
700      * Remove the specified destination from the cache.
701      *
702      * @param destination the destination to remove
703      */
704     private void removeFromDestinations(JmsDestination destination) {
705         synchronized (_lock) {
706             _destinations.remove(destination.getName());
707         }
708     }
709 
710     /***
711      * Returns a destination given its name.
712      *
713      * @param name the name of the destination
714      * @return the destination corresponding to <code>name</code>
715      * @throws InvalidDestinationException if the named destination doesn't
716      *                                     exist
717      */
718     private JmsDestination getExistingDestination(String name)
719             throws InvalidDestinationException {
720         JmsDestination destination = getDestination(name);
721         if (destination == null) {
722             throw new InvalidDestinationException(
723                     "Destination does not exist:" + name);
724         }
725         return destination;
726     }
727 
728     /***
729      * Ensures that the specified destination isn't a wildcard.
730      *
731      * @param destination the destination to check
732      * @throws InvalidDestinationException if the destination is a wildcard
733      */
734     private void checkWildcard(JmsDestination destination)
735             throws InvalidDestinationException {
736         if (destination instanceof JmsTopic
737                 && ((JmsTopic) destination).isWildCard()) {
738             throw new InvalidDestinationException(
739                     "Wildcarded topics cannot be managed: "
740                     + destination.getName());
741         }
742     }
743 
744     /***
745      * Returns the registered {@link DestinationEventListener}s.
746      *
747      * @return the registered {@link DestinationEventListener}s.
748      */
749     private DestinationEventListener[] getListeners() {
750         synchronized (_listeners) {
751             return (DestinationEventListener[]) _listeners.toArray(
752                     new DestinationEventListener[0]);
753         }
754     }
755 
756     /***
757      * Rollback the current transaction, logging any error.
758      */
759     private void rollback() {
760         try {
761             _database.rollback();
762         } catch (PersistenceException exception) {
763             _log.error(exception, exception);
764         }
765     }
766 
767     /***
768      * Cleanup a failed transaction, and propagate the exception as a
769      * JMSException.
770      *
771      * @param message   the message to log
772      * @param exception the exception propagate
773      * @throws JMSException <code>exception</code> if it is an instance of
774      *                      JMSException, else a new JMSException containing
775      *                      <code>message</code>
776      */
777     private void cleanup(String message, Exception exception)
778             throws JMSException {
779         _log.error(message, exception);
780         rollback();
781         if (exception instanceof JMSException) {
782             throw (JMSException) exception;
783         } else {
784             throw new JMSException(message + ": " + exception.getMessage());
785         }
786     }
787 }