View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: DestinationManager.java,v 1.36 2003/08/16 06:37:43 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.sql.Connection;
51  import java.sql.SQLException;
52  import java.util.Collections;
53  import java.util.Enumeration;
54  import java.util.HashMap;
55  import java.util.Iterator;
56  import java.util.LinkedList;
57  import java.util.Map;
58  import java.util.Vector;
59  
60  import javax.jms.DeliveryMode;
61  import javax.jms.JMSException;
62  import javax.naming.Context;
63  import javax.naming.InitialContext;
64  import javax.naming.NamingException;
65  import javax.transaction.TransactionManager;
66  
67  import org.apache.commons.logging.Log;
68  import org.apache.commons.logging.LogFactory;
69  
70  import org.exolab.core.service.ServiceException;
71  import org.exolab.jms.client.JmsDestination;
72  import org.exolab.jms.client.JmsQueue;
73  import org.exolab.jms.client.JmsTopic;
74  import org.exolab.jms.config.AdministeredDestinations;
75  import org.exolab.jms.config.AdministeredQueue;
76  import org.exolab.jms.config.AdministeredTopic;
77  import org.exolab.jms.config.Configuration;
78  import org.exolab.jms.config.ConfigurationManager;
79  import org.exolab.jms.config.MessageManagerConfiguration;
80  import org.exolab.jms.config.Subscriber;
81  import org.exolab.jms.gc.GarbageCollectable;
82  import org.exolab.jms.gc.GarbageCollectionService;
83  import org.exolab.jms.message.MessageImpl;
84  import org.exolab.jms.persistence.DatabaseService;
85  import org.exolab.jms.persistence.PersistenceAdapter;
86  import org.exolab.jms.persistence.PersistenceException;
87  import org.exolab.jms.persistence.SQLHelper;
88  import org.exolab.jms.server.NamingHelper;
89  
90  
91  // @todo - need to restructure dirs
92  
93  
94  /***
95   * The destination manager is responsible for creating and managing the
96   * lifecycle of {@link DestinationCache} objects. The destination manager
97   * is also responsible for managing messages, that are received by the
98   * message manager, which do not have any registered {@link DestinationCache}
99   *
100  * @version     $Revision: 1.36 $ $Date: 2003/08/16 06:37:43 $
101  * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
102  */
103 public class DestinationManager
104     implements MessageManagerEventListener, GarbageCollectable {
105 
106     /***
107      * This structure maintains a list of active caches.
108      */
109     private Map _caches = Collections.synchronizedMap(new HashMap());
110 
111     /***
112      * A list of administered and non-administered destinations are maintained
113      * in this data structure.
114      */
115     private HashMap _destinationCache = new HashMap();
116 
117     /***
118      * The maximum no. of messages that each destination cache can hold
119      */
120     private final int _maxCacheSize;
121 
122     /***
123      * Maintains a list of wildcard destinations, which can either be durable
124      * or transient
125      */
126     private LinkedList _wildcardDestinations = new LinkedList();
127 
128     /***
129      * Maintains a linked list of DestinationEventListener objects. These
130      * listeners will be informated when destination are added or destroyed
131      */
132     private LinkedList _listeners = new LinkedList();
133 
134     /***
135      * Manage the singleton instance of the DestinationManager
136      */
137     private static volatile DestinationManager _instance = null;
138 
139     /***
140      * Caches the root context for all jndi binding.
141      */
142     private InitialContext _context = null;
143 
144     /***
145      * The logger
146      */
147     private static final Log _log =
148         LogFactory.getLog(DestinationManager.class);
149 
150     /***
151      * Construct a new <code>DestinationManager</code>
152      *
153      * @throws ServiceException if the service cannot be initialised
154      */
155     private DestinationManager() throws ServiceException {
156         MessageManagerConfiguration config =
157             ConfigurationManager.getConfig().getMessageManagerConfiguration();
158         _maxCacheSize = config.getDestinationCacheSize();
159 
160         init();
161 
162         // register with the GarbageCollectionService
163         GarbageCollectionService.instance().register(this);
164     }
165 
166     /***
167      * Create the singleton instance of the destination manager
168      *
169      * @return the singleton instance
170      * @throws ServiceException if the service cannot be initialised
171      */
172     public static DestinationManager createInstance() throws ServiceException {
173         _instance = new DestinationManager();
174         return _instance;
175     }
176 
177     /***
178      * Return the singleton destination manager
179      *
180      * @return the singleton instance, or <code>null</code> if it hasn't
181      * been initialised
182      */
183     public static DestinationManager instance() {
184         return _instance;
185     }
186 
187     /***
188      * Create a destination of the specified type. If the destination already
189      * exists then simply return it. If it doesn't exist then create it.
190      *
191      * @param dest - the destination to create
192      * @return DestinationCache - the created destination cache
193      */
194     public synchronized DestinationCache createDestinationCache(
195         JmsDestination dest) {
196         return createDestinationCache(null, dest);
197     }
198 
199     /***
200      * Create a destination of the specified type. If the destination already
201      * exists then simply return it. If it doesn't exist then create it. If
202      * a connection is supplied then create the cache using the connection
203      * context, otherwise create it without a context.
204      *
205      * @param connection - the connection to use.
206      * @param dest - the destination to create
207      * @return DestinationCache - the created destination cache
208      */
209     public synchronized DestinationCache createDestinationCache(
210         Connection connection, JmsDestination dest) {
211 
212         // check to see if it exists first
213         DestinationCache cache = (DestinationCache) _caches.get(dest);
214 
215         if (cache == null) {
216             // create a destination based on its type
217             try {
218                 if (dest instanceof JmsTopic) {
219                     cache = (connection != null) ?
220                         new TopicDestinationCache(connection, (JmsTopic) dest) :
221                         new TopicDestinationCache((JmsTopic) dest);
222                 } else if (dest instanceof JmsQueue) {
223                     cache = (connection != null) ?
224                         new QueueDestinationCache(connection, (JmsQueue) dest) :
225                         new QueueDestinationCache((JmsQueue) dest);
226                 }
227                 // set the configured size of each destination cache
228                 cache.setMaximumSize(_maxCacheSize);
229 
230                 // notify the listeners that a new destination has been added
231                 // to the destination manager
232                 notifyDestinationAdded(dest, cache);
233                 //cache it first
234                 _caches.put(dest, cache);
235             } catch (Exception exception) {
236                 _log.error("Failed to createDestinationCache", exception);
237             }
238         }
239 
240         return cache;
241     }
242 
243     /***
244      * Delete the specfied destination
245      *
246      * @param cahe - the destination to destroy
247      */
248     public synchronized void destroyDestinationCache(DestinationCache cache) {
249         destroyDestinationCache(cache.getDestination());
250     }
251 
252     /***
253      * Delete the specfied destination
254      *
255      * @deprecated use destroyDestination(JmsDestination) instead
256      * @param name - destination name
257      */
258     public synchronized void destroyDestinationCache(String name) {
259 
260         // currently we are doing a linear search through the
261         // destinations until we find the corresponding {@link JmsDestination}
262         // We want to discourage the use of this function.
263         Iterator iter = _caches.keySet().iterator();
264         while (iter.hasNext()) {
265             JmsDestination dest = (JmsDestination) iter.next();
266             if (dest.getName().equals(name)) {
267                 destroyDestinationCache(dest);
268                 break;
269             }
270         }
271     }
272 
273     /***
274      * Delete the specfied destination
275      *
276      * @param dest - the destination to destroy
277      */
278     public synchronized void destroyDestinationCache(JmsDestination dest) {
279         DestinationCache cache = (DestinationCache) _caches.remove(dest);
280         if (cache != null) {
281             cache.destroy();
282 
283             // notify the listeners that a destination has been removed from
284             // the destination manager
285             notifyDestinationRemoved(dest, cache);
286         }
287     }
288 
289     /***
290      * Return the JmsDestination corresponding to the specified destination
291      * name
292      *
293      * @param name - destination name
294      * @return JmsDestination - the corresponding object or null
295      */
296     public synchronized JmsDestination destinationFromString(String name) {
297         return (JmsDestination) _destinationCache.get(name);
298     }
299 
300     /***
301      * Register the specified DestinationEventListener. If the listener is
302      * already registered then do not re-register it again.
303      *
304      * @param listener - listener to add
305      */
306     void addDestinationEventListener(DestinationEventListener listener) {
307         synchronized (_listeners) {
308             if (!_listeners.contains(listener)) {
309                 _listeners.add(listener);
310             }
311 
312         }
313     }
314 
315     /***
316      * Remove the specified DestinationEventListener from the list
317      *
318      * @param listener - listener to remove
319      */
320     void removeDestinationEventListener(DestinationEventListener listener) {
321         synchronized (_listeners) {
322             _listeners.remove(listener);
323         }
324     }
325 
326     /***
327      * Return the destination cache associated with the dest object
328      *
329      * @param dest - the destination
330      * @return DestinationCache - associated destination object or null
331      */
332     public DestinationCache getDestinationCache(JmsDestination dest) {
333         return (DestinationCache) _caches.get(dest);
334     }
335 
336     /***
337      * Return the destination object associated with destination
338      *
339      * @param dest - the name of the destination
340      * @return DestinationCache - associated destination object or null
341      */
342     public DestinationCache getDestinationCache(String dest) {
343         return getDestinationCache(destinationFromString(dest));
344     }
345 
346     /***
347      * Return the {@link DestinationCache} for this message.
348      *
349      * @param message - the message to query
350      * @return DestinationCache - the corresponding cahce or null
351      */
352     public DestinationCache getDestinationCache(MessageImpl message) {
353         DestinationCache cache = null;
354 
355         if (message != null) {
356             try {
357                 cache = getDestinationCache(
358                     (JmsDestination) message.getJMSDestination());
359             } catch (JMSException exception) {
360                 // ignore it, probably means thant no destination has been
361                 // assigned.
362             }
363         }
364 
365         return cache;
366     }
367 
368     /***
369      * Check if the specified destination exists.
370      *
371      * @param dest - destination to check
372      * @return boolean - true if it exists
373      */
374     public boolean hasDestinationCache(JmsDestination dest) {
375         return (_caches.containsKey(dest));
376     }
377 
378     /***
379      * Create a non-administered destination and cache it. It does not
380      * check to see whether or not it is an administered destination
381      * this  must be done by the caller
382      *
383      * @param destination  - the destination to create
384      */
385     public synchronized void createDestination(JmsDestination destination) {
386         addToDestinationCache(destination);
387     }
388 
389     /***
390      * Create an administered destination using the specified destination.
391      * It will create the destination in the database and register it with
392      * the jndi context.
393      *
394      * @param dest - the destination
395      * @return boolean - true if successful
396      */
397     public synchronized boolean createAdministeredDestination(
398         JmsDestination dest) throws JMSException {
399         if (_log.isDebugEnabled()) {
400             _log.debug("createAdministeredDestination(dest=" + dest + ")");
401         }
402 
403         boolean success = true;
404         boolean queue = (dest instanceof JmsQueue) ? true : false;
405         PersistenceAdapter adapter = DatabaseService.getAdapter();
406 
407         // check that the destination does not exist. If it exists then return
408         // false. If it doesn't exists the create it and bind it to the jndi
409         // context
410 
411         Connection connection = null;
412         try {
413 
414             connection = DatabaseService.getConnection();
415 
416             if (!adapter.checkDestination(connection, dest.getName())) {
417                 adapter.addDestination(connection, dest.getName(), queue);
418 
419                 // destination was created in persistent store, now create it
420                 // in transient memory and also bind it in the jndi context
421                 addToDestinationCache(dest);
422                 try {
423                     dest.setPersistent(true);
424                     ContextHelper.rebind(getContext(), dest.getName(), dest);
425                 } catch (NamingException exception) {
426                     String msg = "Failed to add destination " + dest.getName()
427                         + " to JNDI context";
428                     _log.error(msg, exception);
429                     throw new JMSException(msg + ": " +
430                         exception.getMessage());
431                 }
432             } else {
433                 success = false;
434             }
435             connection.commit();
436         } catch (JMSException exception) {
437             SQLHelper.rollback(connection);
438             throw exception;
439         } catch (Exception exception) { // PersistenceException, SQLException
440             SQLHelper.rollback(connection);
441             String msg = "Failed to create administered destination"
442                 + dest.getName();
443             _log.error(msg, exception);
444             throw new JMSException(msg + ": " + exception.getMessage());
445         } finally {
446             SQLHelper.close(connection);
447         }
448 
449         return success;
450     }
451 
452     /***
453      * Remove the corresponding administered destination from persistent
454      * store, from transient memory and from the jndi context. This will
455      * also remove all durable consumers for this topic.
456      *
457      * @param dest - the destination to remove
458      * @return boolean - true if successful
459      */
460     public synchronized void deleteAdministeredDestination(JmsDestination dest)
461         throws JMSException {
462 
463         if (_log.isDebugEnabled()) {
464             _log.debug("deleteAdministeredDestination(dest=" + dest + ")");
465         }
466 
467         boolean success = false;
468         boolean queue = (dest instanceof JmsQueue) ? true : false;
469         ConsumerManager consumerMgr = ConsumerManager.instance();
470 
471         // If we are dealing with a topic then first check that there are
472         // no active durable consumers for the destination
473         if (!queue) {
474             if (consumerMgr.hasActiveDurableConsumers(dest)) {
475                 throw new JMSException(
476                     "Cannot delete the administered destination " + dest
477                     + " since there are active durable consumers.");
478             }
479             // no active consumers. Remove all durable consumers to this
480             // destination
481             consumerMgr.removeDurableConsumers(dest);
482         }
483 
484         // make sure there are not active endpoints, but first clear
485         // unreferenced endpoints.
486         consumerMgr.cleanUnreferencedEndpoints(dest);
487         int active = consumerMgr.getEndpointsForDest(dest).size();
488         if (active > 0) {
489             throw new JMSException(
490                 "Cannot delete the administered destination" + dest
491                 + " since there are " + active + " active endpoints.");
492         }
493 
494         // unbind it from the jndi context so that now it is unavailable
495         // to other consumers
496         try {
497             getContext().unbind(dest.getName());
498         } catch (NamingException error) {
499             _log.error("Failed to remove destination " + dest.getName()
500                 + " from JNDI", error);
501         }
502 
503         // now that we have removed all the durable consumers we can remove
504         // the administered topic. First delete it from memory and then
505         // from the persistent store
506         Connection connection = null;
507         try {
508             connection = DatabaseService.getConnection();
509 
510             DatabaseService.getAdapter().removeDestination(connection,
511                 dest.getName());
512             destroyDestinationCache(dest);
513             removeFromDestinationCache(dest);
514             connection.commit();
515         } catch (PersistenceException exception) {
516             SQLHelper.rollback(connection);
517             String msg = "Failed to remove destination " + dest.getName();
518             _log.error(msg, exception);
519             throw new JMSException(msg + ":" + exception.getMessage());
520         } catch (SQLException exception) {
521             SQLHelper.rollback(connection);
522             String msg = "Failed to remove destination " + dest.getName();
523             _log.error(msg, exception);
524             throw new JMSException(msg + ":" + exception.getMessage());
525         } finally {
526             SQLHelper.close(connection);
527         }
528     }
529 
530     /***
531      * Return a list of destination names currently supported by the destination
532      * manager. This includes all types of destinations.
533      *
534      * @return Iterator - iterator for {@link JmsDestination} objects
535      */
536     public Iterator destinationNames() {
537         return _destinationCache.values().iterator();
538     }
539 
540     /***
541      * Return a list of {@link DestinationCache} objects that are currently
542      * active and in memory. This will return a list of all destination
543      * types (temporary. transient, administered}.
544      *
545      * @return Iterator - set of DestinationCache objects
546      */
547     public Iterator destinations() {
548         return _caches.values().iterator();
549     }
550 
551     /***
552      * This method will create the administered destinations specified in
553      * the configuration. A topic may also have zero or more preconfigured
554      * durable sunbscribers. An equivalent entity for queues does not
555      * exist.
556      */
557     public void registerConfiguredAdministeredDestinations() {
558         AdministeredDestinations destinations =
559             ConfigurationManager.getConfig().getAdministeredDestinations();
560         if (destinations != null) {
561 
562             // first process the topics
563             int count = destinations.getAdministeredTopicCount();
564             for (int index = 0; index < count; index++) {
565                 AdministeredTopic topic = destinations.getAdministeredTopic(index);
566 
567                 // define a persistent topic destination and then use the
568                 // message manager administrator to add it
569                 JmsTopic destination = new JmsTopic(topic.getName());
570                 destination.setPersistent(true);
571                 try {
572 
573                     createAdministeredDestination(destination);
574 
575                     // register the subscribers for each topic.
576                     int scount = topic.getSubscriberCount();
577                     ConsumerManager mgr = ConsumerManager.instance();
578                     for (int sindex = 0; sindex < scount; sindex++) {
579                         Subscriber subscriber = topic.getSubscriber(sindex);
580 
581                         // create the durable consumer only if one does
582                         // not already exist
583                         if (!mgr.exists(subscriber.getName())) {
584                             mgr.createDurableConsumer(destination,
585                                 subscriber.getName());
586                         }
587                     }
588                 } catch (JMSException exception) {
589                     _log.error("Failed to register persistent topic "
590                         + topic.getName(), exception);
591                 }
592             }
593 
594             // next process the queue destinations. QueueDestinations do not
595             // have any associated durable subscribers
596             count = destinations.getAdministeredQueueCount();
597             for (int index = 0; index < count; index++) {
598                 AdministeredQueue queue = destinations.getAdministeredQueue(index);
599 
600                 // define a persistent topic destination and then use the
601                 // message manager administrator to add it
602                 JmsQueue destination = new JmsQueue(queue.getName());
603                 destination.setPersistent(true);
604                 try {
605                     createAdministeredDestination(destination);
606                 } catch (JMSException exception) {
607                     _log.error("Failed to register persistent queue "
608                         + queue.getName(), exception);
609                 }
610             }
611         }
612     }
613 
614     // implementation of MessageManagerEventListener.messageAdded
615     public synchronized boolean messageAdded(JmsDestination destination, MessageImpl message) {
616         boolean result = false;
617         try {
618             if (destination instanceof JmsTopic) {
619                 // check to see whether there are active consumers for the
620                 // specified destination. If there are then we need to
621                 // create a destination cache and pass the message to it.
622                 if (ConsumerManager.instance().hasActiveConsumers(destination)) {
623                     if (!destinationExists(destination)) {
624                         createDestination(destination);
625                     }
626                     DestinationCache cache = createDestinationCache(destination);
627                     result = cache.messageAdded(destination, message);
628                 }
629             } else {
630                 // assume that the destination is a queue. since the message
631                 // is non-persistent then we need to create the cache and pass the
632                 // message to it.
633                 if (!destinationExists(destination)) {
634                     createDestination(destination);
635                 }
636                 DestinationCache cache = createDestinationCache(destination);
637                 result = cache.messageAdded(destination, message);
638             }
639         } catch (Exception exception) {
640             _log.error("Exception in DestinationManager.messageAdded",
641                 exception);
642         }
643 
644         return result;
645     }
646 
647     // implementation of MessageManagerEventListener.messageRemoved
648     public void messageRemoved(JmsDestination destination, MessageImpl message) {
649         // removing a non-persistent messages, when the associated destination
650         // is not active is a noop
651     }
652 
653     // implementation of MessageManagerEventListener.persistentMessageAdded
654     public synchronized boolean persistentMessageAdded(Connection connection,
655                                                        JmsDestination destination, MessageImpl message)
656         throws PersistenceException {
657 
658         boolean result = false;
659 
660         try {
661             if (destination instanceof JmsTopic) {
662                 // the cache for this destination is inactive. Determine, if
663                 // there are any active wildcard consumers for this destination
664                 // If there are then create the destination cache and let it
665                 // handle the message. Otherwise send it to the ConsumerManager
666                 // for processing
667                 ConsumerManager manager = ConsumerManager.instance();
668                 if (manager.hasActiveConsumers(destination)) {
669                     // create the destincation cache and let it process the
670                     // message
671                     DestinationCache cache = createDestinationCache(connection,
672                         destination);
673                     result = cache.persistentMessageAdded(connection,
674                         destination, message);
675                 } else {
676                     // This is now handled by the MessageMgr when the message
677                     // enters the system
678                     // let the consumer manager handle this
679                     // result = ConsumerManager.instance().persistentMessageAdded(
680                     //    connection, message);
681                 }
682             } else {
683                 // This is now handled by the MessageMgr when the message
684                 // enters the system
685                 // assume that the destination is a queue. Since the message is
686                 // persistent then we do not need to activate the cache, simply
687                 // create a persistent handle and be done with it.
688                 // MessageHandleFactory.createPersistentHandle(connection,
689                 //    destination, null, message);
690             }
691         } catch (Exception exception) {
692             // rethrow as a PersistenceException
693             exception.printStackTrace();
694             throw new PersistenceException(
695                 "Exception in DestinationManager.messageAdded " +
696                 exception.toString());
697         }
698 
699         return result;
700     }
701 
702     // implementation of MessageManagerEventListener.persistentMessageRemoved
703     public void persistentMessageRemoved(Connection connection,
704                                          JmsDestination destination, MessageImpl message)
705         throws PersistenceException {
706         try {
707             if (destination instanceof JmsTopic) {
708                 // this is a persistent message so we need to retrieve the
709                 // set of durable subscribers for this.
710                 Vector names =
711                     ConsumerManager.instance().getDurableConsumersForDest(
712                         (JmsTopic) destination);
713 
714                 // for each durable consumer we need to destory that handle
715                 while (names.size() > 0) {
716                     String name = (String) names.remove(0);
717                     MessageHandleFactory.destroyPersistentHandle(connection,
718                         destination, name, message);
719                 }
720             } else {
721                 // assume it is a queue and destroy the handle.
722                 MessageHandleFactory.destroyPersistentHandle(connection,
723                     destination, null, message);
724             }
725         } catch (PersistenceException exception) {
726             // catch and rethrow
727             throw exception;
728         } catch (Exception exception) {
729             throw new PersistenceException(
730                 "Exception in DestinationManager.messageRemoved " +
731                 exception.toString());
732         }
733     }
734 
735     // implement of GarbageCollectable.collectGarbage
736     public synchronized void collectGarbage(boolean aggressive) {
737         // before continuing we should change the priority of the thread
738         // to the lowest value.
739         int gc_caches = 0;
740         int gc_destinations = 0;
741 
742         Object[] caches = _caches.values().toArray();
743         for (int index = 0; index < caches.length; index++) {
744             DestinationCache cache = (DestinationCache) caches[index];
745             if (cache.canDestroy()) {
746                 if (_log.isDebugEnabled()) {
747                     _log.debug("Garbage collecting destination cache="
748                         + cache);
749                 }
750                 destroyDestinationCache(cache);
751                 gc_caches++;
752             } else {
753                 // the cache is active, so issue a garbage collection
754                 // request on it
755                 cache.collectGarbage(aggressive);
756             }
757         }
758 
759         // get rid of non-administered destinations, without
760         // associated caches.
761         Iterator destinations = _destinationCache.values().iterator();
762         Vector to_delete = new Vector();
763         while (destinations.hasNext()) {
764             JmsDestination dest = (JmsDestination) destinations.next();
765             if (!(dest.getPersistent()) &&
766                 (!_caches.containsKey(dest))) {
767                 to_delete.add(dest);
768                 gc_destinations++;
769             }
770         }
771 
772         // now delete the actual destinations
773         Enumeration todel = to_delete.elements();
774         while (todel.hasMoreElements()) {
775             _destinationCache.remove(((JmsDestination) todel.nextElement()).getName());
776         }
777 
778         // log the information
779         _log.info("DMGC Collected " + gc_caches + " caches, " + _caches.size()
780             + " remaining.");
781         _log.info("DMGC Collected " + gc_destinations + " destinations, "
782             + _destinationCache.size() + " remaining.");
783     }
784 
785     /***
786      * Return a HashMap of all destinations that match the specified topic
787      * If the topic represents a wildcard then it may match none, one or more
788      * destinations. The results are returns as a mapping of destination and
789      * the corresponding cache
790      * <p>
791      * The topic maybe a straight topic name or a wildcard
792      *
793      * @param topic - topic to query
794      * @return HashMap
795      */
796     synchronized HashMap getTopicDestinationCaches(JmsTopic topic) {
797         HashMap result = new HashMap();
798 
799         Iterator iter = _caches.keySet().iterator();
800         while (iter.hasNext()) {
801             JmsDestination dest = (JmsDestination) iter.next();
802             if ((dest instanceof JmsTopic) &&
803                 (topic.match((JmsTopic) dest))) {
804                 result.put(dest, _caches.get(dest));
805             }
806         }
807 
808         return result;
809     }
810 
811     /***
812      * Destroy this manager. This is brutal and final
813      */
814     public synchronized void destroy() {
815         // clean up all the destinations
816         Object[] dests = _caches.keySet().toArray();
817         for (int index = 0; index < dests.length; index++) {
818             destroyDestinationCache((JmsDestination) dests[index]);
819         }
820 
821         _caches.clear();
822         _caches = null;
823 
824         _destinationCache.clear();
825         _destinationCache = null;
826 
827         // remove all the listeners
828         _listeners.clear();
829         _listeners = null;
830 
831         _context = null;
832 
833         // reset the singleton
834         _instance = null;
835     }
836 
837     /***
838      * Test whether the specified destination is an administered destination. It
839      * assumes that the destination exsits.
840      *
841      * @param name - the name of the destination
842      * @return boolean - true if it is and false otherwise
843      */
844     public boolean isAdministeredDestination(String name) {
845         boolean result = false;
846         JmsDestination dest = (JmsDestination) _destinationCache.get(name);
847         if ((dest != null) &&
848             (dest.getPersistent())) {
849             result = true;
850         }
851 
852         return result;
853     }
854 
855     /***
856      * Test whether the specified message is for an administered destination.
857      * This would be the case if the destination is administered or if there
858      * is an administered wildcard destination that is satisfied by the
859      * message destination
860      *
861      * @param message - the message to check
862      * @return boolean
863      */
864     public boolean isMessageForAdministeredDestination(MessageImpl msg) {
865         boolean result = false;
866         try {
867             JmsDestination mdest = (JmsDestination) msg.getJMSDestination();
868             JmsDestination dest = (JmsDestination) _destinationCache.get(mdest.getName());
869 
870             if (dest != null) {
871                 if (dest.getPersistent()) {
872                     result = true;
873                 } else if (mdest instanceof JmsTopic) {
874                     // check if any of the wildcards are administered
875                     Object[] dests = _wildcardDestinations.toArray();
876                     for (int index = 0; index < dests.length; index++) {
877                         JmsTopic adest = (JmsTopic) dests[index];
878                         if ((adest.match((JmsTopic) mdest)) &&
879                             (adest.getPersistent())) {
880                             result = true;
881                             break;
882                         }
883                     }
884 
885                 }
886             }
887         } catch (JMSException ignore) {
888         }
889 
890         return result;
891     }
892 
893     /***
894      * Add the specified entry to the destination cache, if it doesn't
895      * already exist.
896      *
897      * @param destination - destination to add
898      */
899     void addToDestinationCache(JmsDestination destination) {
900         synchronized (_destinationCache) {
901             if (!_destinationCache.containsKey(destination.getName())) {
902                 _destinationCache.put(destination.getName(), destination);
903 
904                 // check whether it is a wildcard destination
905                 if (((destination instanceof JmsTopic) &&
906                     (((JmsTopic) destination).isWildCard()))) {
907                     _wildcardDestinations.add(destination);
908                 }
909             }
910         }
911     }
912 
913     /***
914      * Remove the specified destination from the cache
915      *
916      * @param destination - the destination to remove
917      */
918     void removeFromDestinationCache(JmsDestination destination) {
919         synchronized (_destinationCache) {
920             if (_destinationCache.remove(destination.getName()) != null) {
921 
922                 // check whether we also need to delete it from the
923                 // list of wildcard subscriptions
924                 if (((destination instanceof JmsTopic) &&
925                     (((JmsTopic) destination).isWildCard()))) {
926                     _wildcardDestinations.remove(destination);
927                 }
928             }
929 
930         }
931     }
932 
933     /***
934      * Check if the specified destination exists.
935      *
936      * @param destination - the destination to check
937      * @return boolean - true if it exists and false otherwise
938      */
939     public boolean destinationExists(JmsDestination destination) {
940         return _destinationCache.containsKey(destination.getName());
941     }
942 
943     /***
944      * Initialises the destination manager.
945      *
946      * @throws ServiceException if the service cannot be initialised
947      */
948     protected void init() throws ServiceException {
949         Connection connection = null;
950         TransactionManager tm = null;
951         try {
952             connection = DatabaseService.getConnection();
953 
954             // return a list of JmsDestination objects.
955             Enumeration iter =
956                 DatabaseService.getAdapter().getAllDestinations(connection);
957             connection.commit();
958 
959             while (iter.hasMoreElements()) {
960                 // add each destination to the cache and also bind
961                 // it to the context
962                 JmsDestination dest = (JmsDestination) iter.nextElement();
963                 addToDestinationCache(dest);
964                 try {
965                     // for each of the administered destinations rebind it to
966                     // the jndi context
967                     dest.setPersistent(true);
968                     ContextHelper.rebind(getContext(), dest.getName(), dest);
969                 } catch (NamingException error) {
970                     throw new ServiceException(
971                         "Failed to add destination " + dest.getName()
972                         + " to JNDI", error);
973                 }
974             }
975         } catch (PersistenceException exception) {
976             SQLHelper.rollback(connection);
977             String msg = "Failed to initialise DestinationManager";
978             _log.error(msg, exception);
979             throw exception;
980         } catch (SQLException exception) {
981             SQLHelper.rollback(connection);
982             String msg = "Failed to initialise DestinationManager";
983             _log.error(msg, exception);
984             throw new ServiceException(msg, exception);
985         } finally {
986             SQLHelper.close(connection);
987         }
988     }
989 
990     /***
991      * Notyify the list of DestinationEventListener objects that the specified
992      * destination has been added
993      *
994      * @param dest - the destination that was added
995      * @param cache - the corresponding cache
996      */
997     private void notifyDestinationAdded(JmsDestination dest,
998                                         DestinationCache cache) {
999         synchronized (_listeners) {
1000             Iterator iter = _listeners.iterator();
1001             while (iter.hasNext()) {
1002                 ((DestinationEventListener) iter.next()).destinationAdded(dest, cache);
1003             }
1004         }
1005     }
1006 
1007     /***
1008      * Notyify the list of DestinationEventListener objects that the specified
1009      * destination has been removed
1010      *
1011      * @param dest - the destination that was removed
1012      * @param cache - the corresponding cache
1013      */
1014     private void notifyDestinationRemoved(JmsDestination dest,
1015                                           DestinationCache cache) {
1016         synchronized (_listeners) {
1017             Iterator iter = _listeners.iterator();
1018             while (iter.hasNext()) {
1019                 ((DestinationEventListener) iter.next()).destinationRemoved(dest, cache);
1020             }
1021         }
1022     }
1023 
1024     /***
1025      * Return the initial context using the JndiHelper. It assumes that the
1026      * configuration manager has been successfully initialized. If the context
1027      * has been retrieved it is cached so subsequent gets will return the
1028      * cached instance
1029      *
1030      * @return the root context
1031      */
1032     private static Context getContext() throws NamingException {
1033         return NamingHelper.getInitialContext(
1034             ConfigurationManager.getConfig());
1035     }
1036 
1037     /***
1038      * This static class is used to maintain information about a
1039      * destination
1040      */
1041     private static class DestinationEntry {
1042 
1043         /***
1044          * The destination that this entry pertains too
1045          */
1046         public JmsDestination _destination = null;
1047 
1048         /***
1049          * Indicates whether the destination is administered
1050          */
1051         public boolean _administered = false;
1052 
1053 
1054         /***
1055          * Construct an instance of the entry using the specified
1056          * parameters
1057          *
1058          * @param dest - the destination
1059          * @param administered - true if the destination is administered
1060          */
1061         DestinationEntry(JmsDestination dest, boolean administered) {
1062             _destination = dest;
1063             _administered = administered;
1064         }
1065 
1066         // override Object.equals
1067         public boolean equals(Object obj) {
1068 
1069             boolean result = false;
1070 
1071             if ((obj != null) &&
1072                 (obj instanceof DestinationEntry)) {
1073 
1074                 result = _destination.equals(((DestinationEntry) obj)._destination);
1075             }
1076 
1077             return result;
1078         }
1079     }
1080 }