View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: ConsumerManager.java,v 1.33 2003/09/25 11:24:16 tanderson Exp $
44   *
45   * Date         Author  Changes * 3/1/2001     jima    Created
46   */
47  package org.exolab.jms.messagemgr;
48  
49  import java.sql.Connection;
50  import java.util.Enumeration;
51  import java.util.HashMap;
52  import java.util.Iterator;
53  import java.util.LinkedList;
54  import java.util.Vector;
55  
56  import javax.jms.DeliveryMode;
57  import javax.jms.InvalidSelectorException;
58  import javax.jms.JMSException;
59  import javax.transaction.TransactionManager;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  
64  import org.exolab.core.service.ServiceException;
65  import org.exolab.jms.client.JmsDestination;
66  import org.exolab.jms.client.JmsQueue;
67  import org.exolab.jms.client.JmsTopic;
68  import org.exolab.jms.gc.GarbageCollectable;
69  import org.exolab.jms.gc.GarbageCollectionService;
70  import org.exolab.jms.message.MessageImpl;
71  import org.exolab.jms.persistence.DatabaseService;
72  import org.exolab.jms.persistence.PersistenceAdapter;
73  import org.exolab.jms.persistence.PersistenceException;
74  import org.exolab.jms.persistence.SQLHelper;
75  import org.exolab.jms.scheduler.Scheduler;
76  import org.exolab.jms.server.JmsServerSession;
77  
78  
79  /***
80   * The consumer manager is responsible for creating and managing the
81   * lifecycle of Consumer. The consumer manager maintains a list of all
82   * active consumers.
83   * <p>
84   * The consumer manager, in an attempt to make better utilization of
85   * resource will also act as a proxy for all inactive durable subscribers
86   * So it will act as a DestinationCacheEventListener and process the
87   * message on behalf of the inactive consumer.
88   *
89   * @version     $Revision: 1.33 $ $Date: 2003/09/25 11:24:16 $
90   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
91   */
92  public class ConsumerManager
93      implements DestinationCacheEventListener, GarbageCollectable {
94  
95      /***
96       * Maintains a cache of all active endpoints.
97       */
98      private HashMap _endpoints = new HashMap();
99  
100     /***
101      * Maintains a list of all unique consumers, durable and non-durable.
102      * Each entry has an associated {@link #ConsumerEntry} record. All
103      * durable subscribers are maintained in memory until they are removed
104      * from the system entirely. All non-durable subscribers are maintained
105      * in memory until their endpoint is removed.
106      */
107     private HashMap _consumerCache = new HashMap();
108 
109     /***
110      * Maintais a mapping between destinations and consumers. A destination
111      * can have more than one consumer and a consumer can also be registered
112      * to more than one destination
113      */
114     private HashMap _destToConsumerMap = new HashMap();
115 
116     /***
117      * Maintains a list of wildcard subscriptions using subscription name
118      * and the JmsTopic.
119      */
120     private HashMap _wildcardConsumers = new HashMap();
121 
122     /***
123      * Cache a copy of the scheduler instance
124      */
125     private Scheduler _scheduler = null;
126 
127     /***
128      * The singleton instance of the consumer manager
129      */
130     private static ConsumerManager _instance = null;
131 
132     /***
133      * The logger
134      */
135     private static final Log _log = LogFactory.getLog(ConsumerManager.class);
136 
137 
138     /***
139      * Create the singleton instance of the consumer manager
140      *
141      * @return the singleton instance
142      * @throws ServiceException if the service cannot be initialised
143      */
144     public static ConsumerManager createInstance() throws ServiceException {
145         _instance = new ConsumerManager();
146         return _instance;
147     }
148 
149     /***
150      * Return the singleton instance of the ConsumerManager
151      *
152      * @return ConsumerManager
153      */
154     public static ConsumerManager instance() {
155         return _instance;
156     }
157 
158     /***
159      * Construct the <code>ConsumerManager</code>
160      *
161      * @throws ServiceException - if it fails to initialise
162      */
163     private ConsumerManager() throws ServiceException {
164         // register with the GarbageCollectionService
165         GarbageCollectionService.instance().register(this);
166 
167         init();
168     }
169 
170     /***
171      * This method creates an actual durable consumer for the specified and
172      * caches it. It does not create and endpoint. To create the endpoint the
173      * client should call createDurableConsumerEndpoint.
174      *
175      * @param topic - the topic destination
176      * @param name - the consumer name
177      * @exception JMSException - if it cannot be created
178      */
179     public synchronized void createDurableConsumer(JmsTopic topic, String name)
180         throws JMSException {
181 
182         PersistenceAdapter adapter = DatabaseService.getAdapter();
183 
184         Connection connection = null;
185         try {
186 
187             connection = DatabaseService.getConnection();
188 
189             // ensure that we are trying to create a durable consumer to an
190             // administered destination.
191             if (!adapter.checkDestination(connection, topic.getName())) {
192                 throw new JMSException(
193                     "Cannot create durable consumer, name=" + name +
194                     ", for non-administered topic=" + topic.getName());
195             }
196 
197             if (!adapter.durableConsumerExists(connection, name)) {
198                 adapter.addDurableConsumer(connection, topic.getName(), name);
199             }
200 
201             connection.commit();
202             // cache the consumer locally
203             addToConsumerCache(name, topic, true);
204         } catch (JMSException exception) {
205             throw exception;
206         } catch (Exception exception) { // PersistenceException, SQLException
207             SQLHelper.rollback(connection);
208             String msg = "Failed to create durable consumer, name=" + name
209                 + ", for topic=" + topic.getName();
210             _log.error(msg, exception);
211             throw new JMSException(msg + ": " + exception.getMessage());
212         } finally {
213             SQLHelper.close(connection);
214         }
215     }
216 
217 
218     /***
219      * This method will remove the durable consumer from the database and
220      * from transient memory only if it exists and is inactive. If there
221      * is an active endpoint then it cannot be deleted and an exception
222      * will be raise.
223      * <p>
224      * If the durable consumer does not exist then an exception is also
225      * raised.
226      *
227      * @param name - the consumer name
228      * @exception JMSException - if it cannot be removed
229      */
230     public synchronized void removeDurableConsumer(String name)
231         throws JMSException {
232         if (_log.isDebugEnabled()) {
233             _log.debug("removeDurableConsumer(name=" + name + ")");
234         }
235 
236         // check to see that the durable consumer exists
237         if (!durableConsumerExists(name)) {
238             throw new JMSException("Durable consumer " + name +
239                 " is not defined.");
240         }
241         if (isDurableConsumerActive(name)) {
242             throw new JMSException(
243                 "Cannot remove durable consumer=" + name
244                 + ": consumer is active");
245         }
246 
247         // remove it from the persistent store.
248         Connection connection = null;
249         try {
250             connection = DatabaseService.getConnection();
251 
252             DatabaseService.getAdapter().removeDurableConsumer(connection,
253                 name);
254             // if it has been successfully removed from persistent store then
255             // clear up the transient references.
256             ConsumerEndpoint endpoint = getConsumerEndpoint(name);
257             if (endpoint != null) {
258                 deleteConsumerEndpoint(endpoint);
259             }
260             removeFromConsumerCache(name);
261             connection.commit();
262         } catch (Exception exception) { // PersistenceException, SQLException
263             SQLHelper.rollback(connection);
264             String msg = "Failed to remove durable consumer, name=" + name;
265             _log.error(msg, exception);
266             throw new JMSException(msg + ":" + exception.getMessage());
267         } finally {
268             SQLHelper.close(connection);
269         }
270     }
271 
272     /***
273      * This method will remove all the durable consumers from the database and
274      * from transient memory whether they are active or not.
275      * <p>
276      * If we have problems removing the durable consumers then throw the
277      * JMSException.
278      *
279      * @param topic the topic to remove consumers for
280      * @throw JMSException if the consumers cannot be removed
281      */
282     public synchronized void removeDurableConsumers(JmsDestination topic)
283         throws JMSException {
284 
285         Vector consumers = (Vector) _destToConsumerMap.get(topic);
286         if (consumers != null) {
287             Enumeration entries = consumers.elements();
288             while (entries.hasMoreElements()) {
289                 ConsumerEntry entry = (ConsumerEntry) entries.nextElement();
290                 if (entry._durable) {
291                     // remove the actual durable consumer from transient and
292                     // secondary memory.
293                     removeDurableConsumer(entry._name);
294                 }
295             }
296         }
297 
298         // remove all consumers for the specified destination
299         removeFromConsumerCache(topic);
300     }
301 
302     /***
303      * Create a transient consumer for the specified destination. The client
304      * can optionally specify a selector to filter messages.
305      * <p>
306      * The clientId parameter is used to indirectly reference the remote client
307      * which is uniquely identifiable within a session and is used during
308      * asynchronous message delivery.
309      *
310      * @param clientId - indirect reference to the remote client
311      * @param destination - consumer for this destination
312      * @param selector - the consumer's selector if specified.
313      * @return transient consumer
314      */
315     public synchronized ConsumerEndpoint createConsumerEndpoint(
316         JmsServerSession session, long clientId, JmsDestination destination,
317         String selector)
318         throws JMSException, InvalidSelectorException {
319 
320         if (_log.isDebugEnabled()) {
321             _log.debug("createConsumerEndpoint(session=[sessionId="
322                 + session.getSessionId() + "], clientId=" + clientId
323                 + ", destination=" + destination
324                 + ", selector=" + selector + ")");
325         }
326 
327         ConsumerEndpoint endpoint = null;
328         DestinationManager destmgr = DestinationManager.instance();
329 
330         // before we create the destination we need to check the
331         // characteristics of the destination. If the destination
332         // is an administered destination then it must be already
333         // defined. If the destination is a temporary destination
334         // then we may need to add it to the cache.
335         if (destination.getPersistent()) {
336             if (!destmgr.destinationExists(destination)) {
337                 throw new JMSException("Cannot create consumer endpoint: "
338                     + "destination=" + destination
339                     + " does not exist");
340             }
341         } else {
342             if (!destmgr.destinationExists(destination)) {
343                 destmgr.createDestination(destination);
344             }
345         }
346 
347         // determine what type of consumer endpoint to create based on
348         // the destination it subscribes too.
349         if (destination instanceof JmsTopic) {
350             JmsTopic topic = (JmsTopic) destination;
351             endpoint = new TopicConsumerEndpoint(session, clientId, topic,
352                 selector, _scheduler);
353         } else if (destination instanceof JmsQueue) {
354             // this seems like a good opportunity to clean up any unreferenced
355             // endpoints attached to this queue.
356             cleanUnreferencedEndpoints(destination);
357 
358             // now we can create the endpoint
359             endpoint = new QueueConsumerEndpoint(session, clientId,
360                 (JmsQueue) destination, selector, _scheduler);
361         }
362 
363         if (endpoint != null) {
364             // add it to the list on managed consumers
365             String id = endpoint.getPersistentId();
366             _endpoints.put(id, endpoint);
367             addToConsumerCache(id, destination, false);
368         }
369 
370         return endpoint;
371     }
372 
373     /***
374      * Create a durable consumer with the specified well-known name.
375      *
376      * @param session the owning session
377      * @param topic consumer for this topic
378      * @param name the unique subscriber name
379      * @param clientId the remote client identity
380      * @param selector the message selector. May be <code>null</code>
381      * @return the durable consumer endpoint
382      * @throws JMSException if a durable consumer is already active with
383      * the same <code>name</code>, or the <code>topic</code> doesn't exist,
384      * or <code>selector</code> is an invalid selector
385      */
386     public synchronized DurableConsumerEndpoint createDurableConsumerEndpoint(
387         JmsServerSession session, JmsTopic topic, String name,
388         long clientId, String selector)
389         throws JMSException {
390 
391         if (_log.isDebugEnabled()) {
392             _log.debug("createDurableConsumerEndpoint(session=[sessionId="
393                 + session.getSessionId() + "], topic=" + topic
394                 + ", name=" + name + ", clientId=" + clientId
395                 + ", selector=" + selector);
396         }
397 
398         // check that the durable subscriber is not already registered. If it
399         // is registered then check to see whether the client is still active.
400         DurableConsumerEndpoint endpoint =
401             (DurableConsumerEndpoint) _endpoints.get(name);
402         if (endpoint != null) {
403             if (endpoint.getSession().isClientEndpointActive()) {
404                 throw new JMSException(name + " is already registered");
405             } else {
406                 // client endpoint must have been lost
407                 if (_log.isDebugEnabled()) {
408                     _log.debug("Closing session for inactive durable " +
409                         "consumer [name=" + name + "]");
410                 }
411                 endpoint.getSession().close();
412             }
413         }
414 
415         // check that the destination actually exists, if the topic
416         // is not a wildcard
417         if (!topic.isWildCard() &&
418             !DestinationManager.instance().destinationExists(topic)) {
419             throw new JMSException("Cannot create a durable consumer for "
420                                    + topic);
421         }
422 
423         // if we get this far then we need to create the durable consumer
424         endpoint = new DurableConsumerEndpoint(session, clientId, topic, name,
425             selector, _scheduler);
426         _endpoints.put(endpoint.getPersistentId(), endpoint);
427 
428         return endpoint;
429     }
430 
431     /***
432      * Check whether there are active durable consumers for the specified
433      * destination.
434      *
435      * @param topic the destination to check
436      * @param <code>true</code> if there is at least one active consumer
437      */
438     public synchronized boolean hasActiveDurableConsumers(
439         JmsDestination topic) {
440 
441         boolean result = false;
442         Vector consumers = (Vector) _destToConsumerMap.get(topic);
443         if (consumers != null) {
444             Enumeration iterator = consumers.elements();
445             while (iterator.hasMoreElements()) {
446                 ConsumerEntry entry = (ConsumerEntry) iterator.nextElement();
447                 if (entry._durable) {
448                     result = true;
449                     break;
450                 }
451             }
452         }
453 
454         return result;
455     }
456 
457     /***
458      * Create a browser for the specified destination and the selector. A
459      * browser is responsible for passing all messages back to the client
460      * that reside on the queue
461      *
462      * @param session the owning session
463      * @param clientId the remote client identity, which is session scoped
464      * @param queue the queue destination cache
465      * @param selector optional filter
466      * @return the queue browser endpoint
467      */
468     public synchronized ConsumerEndpoint createQueueBrowserEndpoint(
469         JmsServerSession session, long clientId, JmsQueue queue,
470         String selector)
471         throws JMSException {
472 
473         ConsumerEndpoint consumer = null;
474 
475         if (queue != null) {
476             consumer = new QueueBrowserEndpoint(session, clientId,
477                 queue, selector, _scheduler);
478         } else {
479             throw new JMSException("Cannot create a browser for a null queue");
480         }
481 
482         String id = consumer.getPersistentId();
483         _endpoints.put(id, consumer);
484         addToConsumerCache(id, queue, false);
485 
486         return consumer;
487     }
488 
489     /***
490      * Destroy the endpoint associated with the specified durable consumer
491      *
492      * @param name - name of the durable consumer
493      * @exception JMSException - if itt cannot complete the request
494      */
495     public synchronized void deleteDurableConsumerEndpoint(String name)
496         throws JMSException {
497 
498         if (_log.isDebugEnabled()) {
499             _log.debug("deleteDurableConsumerEndpoint(name=" + name + ")");
500         }
501 
502         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
503         if (entry != null) {
504             if (entry._durable) {
505                 deleteConsumerEndpoint((ConsumerEndpoint) _endpoints.get(name));
506             } else {
507                 throw new JMSException(name + " is not a durable subscriber");
508             }
509         } else {
510             // ignore since the consumer is not active
511             if (_log.isDebugEnabled()) {
512                 _log.debug("deleteDurableConsumerEndpoint(name=" + name
513                     + "): failed to locate consumer");
514             }
515         }
516     }
517 
518     /***
519      * Destroy the specified consumer
520      *
521      * @param consumer the consumer to destroy
522      */
523     public synchronized void deleteConsumerEndpoint(
524         ConsumerEndpoint consumer) {
525 
526         if (_log.isDebugEnabled()) {
527             _log.debug("deleteConsumerEndpoint(consumer=[clientId="
528                 + consumer.getClientId() + ", destination="
529                 + consumer.getDestination() + ", session=[sessionId="
530                 + consumer.getSession().getSessionId() + "]])");
531         }
532 
533         String id = consumer.getPersistentId();
534 
535         // if the consumer is currently active then delete it
536         ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(id);
537         if (existing != null) {
538             // unregister itself from all the caches
539             consumer.unregister();
540 
541             // remove it from the list of active endpoints
542             // As a fix for bug 759752, only remove the consumer if it
543             // matches the existing one
544             if (consumer.getId().equals(existing.getId())) {
545                 _endpoints.remove(id);
546             } else {
547                 if (_log.isDebugEnabled()) {
548                     _log.debug("Existing endpoint doesn't match that to " +
549                         "be deleted - retaining");
550                 }
551             }
552 
553             // close the endpoint
554             consumer.close();
555 
556             // remove it from the consumer cache if and only if it is a
557             // non-durable subscriber
558             if (!(consumer instanceof DurableConsumerEndpoint)) {
559                 try {
560                     removeFromConsumerCache(id);
561                 } catch (JMSException exception) {
562                     _log.debug("Failed to remove " + id + " from the cache",
563                         exception);
564                 }
565             }
566         }
567     }
568 
569     /***
570      * Return the consumer with the specified identity
571      *
572      * @param id - identity of the consumer
573      * @return Consumer - associated consumer object or null
574      */
575     public ConsumerEndpoint getConsumerEndpoint(String id) {
576         return (ConsumerEndpoint) _endpoints.get(id);
577     }
578 
579     /***
580      * Return a list of {@link ConsumerEndpoint} objects, both transient and
581      * durable that are currently active.
582      *
583      * @return Iterator - iterator of {@link ConsumerEndpoint} objects
584      */
585     public Iterator consumerEndpoints() {
586         return _endpoints.values().iterator();
587     }
588 
589     /***
590      * Return a list of consumer names id's currently active in the
591      * consumer manager.
592      *
593      * @return Iterator - iterator of {#link String} ids
594      */
595     public Iterator consumerIds() {
596         return _endpoints.keySet().iterator();
597     }
598 
599     /***
600      * Check whether a consumer, with the specified identity actually
601      * exists.
602      *
603      * @param id - identity of the consumer
604      * @return boolean - true if one exists
605      */
606     public boolean exists(String id) {
607         return (getConsumerEndpoint(id) != null);
608     }
609 
610     /***
611      * Check whether there is an active consumer for the specified
612      * destination
613      *
614      * @param destination - the destination to check
615      * @return boolean - true if it exists
616      */
617     public boolean hasActiveConsumers(JmsDestination destination)
618         throws JMSException {
619         boolean result = false;
620 
621         Object[] endpoints = _endpoints.values().toArray();
622         for (int index = 0; index < endpoints.length; index++) {
623             ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index];
624             JmsDestination endpoint_dest = endpoint.getDestination();
625 
626             if ((destination instanceof JmsTopic) &&
627                 (endpoint_dest instanceof JmsTopic) &&
628                 (((JmsTopic) endpoint_dest).isWildCard())) {
629                 if (((JmsTopic) endpoint_dest).match((JmsTopic) destination)) {
630                     result = true;
631                     break;
632                 }
633             } else {
634                 if (endpoint_dest.equals(destination)) {
635                     result = true;
636                     break;
637                 }
638             }
639         }
640 
641         return result;
642     }
643 
644     /***
645      * Check whether a particular durable consumer is active
646      *
647      * @param name - the consumer name
648      * @return boolean - true if active
649      */
650     public boolean isDurableConsumerActive(String name) {
651         return (_endpoints.get(name) != null);
652     }
653 
654     /***
655      * Return the destination assoicated with the specified durable
656      * consumer.
657      *
658      * @param name - consumer name
659      * @return JmsDestination - the destination is it registered under or null
660      */
661     public JmsDestination getDestinationForConsumerName(String name) {
662         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
663         return (entry != null) ? entry._destination : null;
664     }
665 
666     /***
667      * Check if the specified durable consumer exists
668      *
669      * @param name - the name of the durable consumer
670      * @return boolean - true if successful
671      */
672     public boolean durableConsumerExists(String name) {
673         boolean result = false;
674         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
675         if ((entry != null) &&
676             (entry._durable)) {
677             result = true;
678         }
679 
680         return result;
681     }
682 
683     /***
684      * This method will check that the name-destination pair actually
685      * are valid and exist as a DurableConsumer  entity
686      *
687      * @param topic - the name of the topic
688      * @param name - the name of the durable consumer
689      * @return boolean - true if valid and false otherwise
690      */
691     public boolean validSubscription(String topic, String name) {
692 
693         boolean result = false;
694         ConsumerEntry entry = (ConsumerEntry) _consumerCache.get(name);
695 
696         if ((entry != null) &&
697             (entry._destination != null) &&
698             (entry._destination.getName().equals(topic))) {
699             result = true;
700         }
701 
702         return result;
703     }
704 
705     // implementation of DestinationCacheEventListener.messageAdded
706     synchronized public boolean messageAdded(MessageImpl message) {
707         return false;
708     }
709 
710     // implementation of DestinationCacheEventListener.messageRemoved
711     synchronized public boolean messageRemoved(MessageImpl message) {
712         return false;
713     }
714 
715     // implementation of DestinationCacheEventListener.persistentMessageAdded
716     synchronized public boolean persistentMessageAdded(Connection connection,
717                                                        MessageImpl message)
718         throws PersistenceException {
719         try {
720             JmsDestination dest = (JmsDestination) message.getJMSDestination();
721 
722             // check to ensure that the message is not of type queue. If it
723             // is then we don't want to process it here. All the processing
724             // for non-resident queues are done at the cache level
725             if (dest instanceof JmsQueue) {
726                 return false;
727             }
728 
729             // for each durable consumer we need to add a persistent handle
730             Vector names = getDurableConsumersForDest((JmsTopic) dest);
731             while (names.size() > 0) {
732                 String name = (String) names.remove(0);
733                 MessageHandleFactory.createPersistentHandle(connection, dest,
734                     name, message);
735             }
736         } catch (JMSException exception) {
737             // rethrow as a PersistenceException...but does it make sense
738             throw new PersistenceException(
739                 "Failed to create persistent handle", exception);
740         }
741 
742         // return false, even though we processed it to force the
743         // message manager not to keep the message in memory.
744         return false;
745     }
746 
747     // implementation of DestinationCacheEventListener.persistentMessageRemoved
748     synchronized public boolean persistentMessageRemoved(Connection connection,
749                                                          MessageImpl message)
750         throws PersistenceException {
751         try {
752 
753             JmsDestination dest = (JmsDestination) message.getJMSDestination();
754 
755             // check to ensure that the message is not of type queue. If it
756             // is then we don't want to process it here.
757             if (dest instanceof JmsQueue) {
758                 return false;
759             }
760 
761             Vector consumers = (Vector) _destToConsumerMap.get(dest);
762             if (consumers != null) {
763                 // now search through the list of inactive durable consumers
764                 // and remove it from their cache
765                 Enumeration entries = consumers.elements();
766                 while (entries.hasMoreElements()) {
767                     ConsumerEntry entry = (ConsumerEntry) entries.nextElement();
768                     if ((entry._durable) &&
769                         (!_endpoints.containsKey(entry._name))) {
770                         MessageHandleFactory.destroyPersistentHandle(connection,
771                             dest, entry._name, message);
772                     }
773                 }
774             }
775         } catch (PersistenceException exception) {
776             throw exception;
777         } catch (Exception exception) {
778             // rethrow as a PersistenceException...but does it make sense
779             throw new PersistenceException(
780                 "Exception in ConsumerManager.persistentMessageRemoved " +
781                 exception.toString());
782         }
783 
784         return true;
785     }
786 
787     /***
788      * Destroy this manager. This is brutal and final
789      */
790     public synchronized void destroy() {
791 
792         // clean up all the destinations
793         Object[] endpoints = _endpoints.values().toArray();
794         for (int index = 0; index < endpoints.length; index++) {
795             deleteConsumerEndpoint((ConsumerEndpoint) endpoints[index]);
796         }
797         _endpoints.clear();
798 
799         // remove cache data structures
800         _consumerCache.clear();
801         _consumerCache = null;
802         _destToConsumerMap.clear();
803         _destToConsumerMap = null;
804         _wildcardConsumers.clear();
805         _wildcardConsumers = null;
806 
807         // reset the singleton
808         _instance = null;
809     }
810 
811     /***
812      * Return a list of durable subscribers for the specified destination
813      *
814      * @return Vector - a vector of strings, which denote the name
815      */
816     public synchronized Vector getDurableConsumersForDest(JmsTopic dest) {
817         Vector names = new Vector();
818 
819         Vector consumers = (Vector) _destToConsumerMap.get(dest);
820         if (consumers != null) {
821             Enumeration entries = consumers.elements();
822             while (entries.hasMoreElements()) {
823                 ConsumerEntry entry = (ConsumerEntry) entries.nextElement();
824                 if (entry._durable) {
825                     names.add(entry._name);
826                 }
827             }
828         }
829 
830         // if the destination is a topic and part is also a wildcard then
831         // check the wildcardConsumers for additional consumers
832         Iterator wildconsumers = _wildcardConsumers.keySet().iterator();
833         while (wildconsumers.hasNext()) {
834             ConsumerEntry entry = (ConsumerEntry) wildconsumers.next();
835             JmsDestination adest = entry._destination;
836             if ((entry._durable) &&
837                 (adest instanceof JmsTopic) &&
838                 (((JmsTopic) adest).match((JmsTopic) dest))) {
839                 names.add(entry._name);
840             }
841         }
842 
843         return names;
844     }
845 
846     /***
847      * Clean all the unreferenced endpoints for a specified destination.
848      * Unreference endpoints usually occur when the client abnormally
849      * terminates leaving some dangling endpoints on the server.
850      *
851      * @param dest- the destination to query
852      */
853     public void cleanUnreferencedEndpoints(JmsDestination dest) {
854         Object[] endpoints = _endpoints.values().toArray();
855 
856         for (int index = 0; index < endpoints.length; index++) {
857             ConsumerEndpoint endpoint = (ConsumerEndpoint) endpoints[index];
858             if (dest.equals(endpoint.getDestination())) {
859                 if (!endpoint.getSession().isClientEndpointActive()) {
860                     try {
861                         endpoint.getSession().close();
862                     } catch (Exception ignore) {
863                         // ignore this exception
864                     }
865                 }
866             }
867         }
868     }
869 
870     /***
871      * Return a list of {@link ConsumerEndpoint} objects attached to the
872      * specified destination
873      *
874      * @param dest the destination to query
875      * @return list of endpoints
876      */
877     public synchronized LinkedList getEndpointsForDest(JmsDestination dest) {
878         LinkedList endpoints = new LinkedList();
879         Iterator iter = _endpoints.values().iterator();
880 
881         while (iter.hasNext()) {
882             ConsumerEndpoint endpoint = (ConsumerEndpoint) iter.next();
883             if (dest.equals(endpoint.getDestination())) {
884                 endpoints.add(endpoint);
885             }
886         }
887 
888         return endpoints;
889     }
890 
891     // implement of GarbageCollectable.collectGarbage
892     public synchronized void collectGarbage(boolean aggressive) {
893         if (aggressive) {
894             Object[] endpoints = _endpoints.values().toArray();
895             int count = endpoints.length;
896 
897             for (int index = 0; index < count; index++) {
898                 ((ConsumerEndpoint) endpoints[index]).collectGarbage(aggressive);
899             }
900         }
901     }
902 
903     /***
904      * Add the specified consumer to the cache.
905      *
906      * @param name - the name of the consumer
907      * @param dest - the destination it is subscribed to. It can be a wildcard
908      * @param durable - indicates whether it is a durable subscription
909      */
910     synchronized void addToConsumerCache(String name, JmsDestination dest,
911                                          boolean durable)
912         throws JMSException {
913 
914         if (_log.isDebugEnabled()) {
915             _log.debug("addToConsumerCache(name=" + name + ", dest=" + dest
916                 + ", durable=" + durable + ")");
917         }
918 
919         if (!_consumerCache.containsKey(name)) {
920             ConsumerEntry entry = new ConsumerEntry(name, dest, durable);
921             _consumerCache.put(name, entry);
922 
923             // if the specified destination is a JmsTopic and also a wildcard
924             // then we need to add it to all matching desitnations
925             if ((dest instanceof JmsTopic) &&
926                 (((JmsTopic) dest).isWildCard())) {
927                 // store wild card consumers in a separate array.
928                 _wildcardConsumers.put(new ConsumerEntry(name, dest, durable),
929                     dest);
930             } else {
931                 // we also need to add the reverse mapping
932                 Vector consumers = (Vector) _destToConsumerMap.get(dest);
933                 if (consumers == null) {
934                     consumers = new Vector();
935                     _destToConsumerMap.put(dest, consumers);
936                 }
937 
938                 // add the mapping
939                 consumers.add(entry);
940             }
941         }
942     }
943 
944     /***
945      * Remove the specified consumer from the cache and make all necessary
946      * adjustments
947      *
948      * @param name - name of consumer to remove
949      */
950     synchronized void removeFromConsumerCache(String name)
951         throws JMSException {
952 
953         if (_log.isDebugEnabled()) {
954             _log.debug("removeFromConsumerCache(name=" + name + ")");
955         }
956 
957         if (_consumerCache.containsKey(name)) {
958             ConsumerEntry entry = (ConsumerEntry) _consumerCache.remove(name);
959             JmsDestination dest = entry._destination;
960 
961 
962             if ((dest instanceof JmsTopic) &&
963                 (((JmsTopic) dest).isWildCard())) {
964                 // remove it from the wildcard cache.
965                 _wildcardConsumers.remove(name);
966             } else {
967                 // remove it from the specified destination
968                 Vector consumers = (Vector) _destToConsumerMap.get(dest);
969                 if (consumers != null) {
970                     consumers.remove(entry);
971 
972                     // if consumers is of size 0 then remove it
973                     if (consumers.size() == 0) {
974                         _destToConsumerMap.remove(dest);
975                     }
976                 }
977             }
978         } else {
979             if (_log.isDebugEnabled()) {
980                 _log.debug("removeFromConsumerCache(name=" + name +
981                     "): consumer not found");
982             }
983         }
984     }
985 
986     /***
987      * Remove all the consumers for the specified destination from the
988      * cache.
989      *
990      * @param destination - destination to remove.
991      */
992     synchronized void removeFromConsumerCache(JmsDestination destination) {
993         if (_destToConsumerMap.containsKey(destination)) {
994             _destToConsumerMap.remove(destination);
995 
996             // i am not too sure whether we need more house keeping
997         }
998     }
999 
1000     /***
1001      * Return the number of active consumer endpoints
1002      *
1003      * @return int - number of active consumer endpoints
1004      */
1005     int getConsumerEndpointCount() {
1006         return _endpoints.size();
1007     }
1008 
1009     /***
1010      * Initialises the consumer manager
1011      *
1012      * @throws ServiceException if the manager can't be initialised
1013      */
1014     private void init() throws ServiceException {
1015         _scheduler = Scheduler.instance();
1016 
1017         Connection connection = null;
1018         try {
1019             connection = DatabaseService.getConnection();
1020 
1021             PersistenceAdapter adapter = DatabaseService.getAdapter();
1022             connection.commit();
1023 
1024             // return a list of JmsDestination objects.
1025             HashMap map = adapter.getAllDurableConsumers(connection);
1026             Iterator iter = map.keySet().iterator();
1027 
1028             // Create an endpoint for each durable consumer
1029             while (iter.hasNext()) {
1030                 // for each destination, create the destination cache
1031                 String consumer = (String) iter.next();
1032                 String deststr = (String) map.get(consumer);
1033 
1034                 JmsDestination dest =
1035                     DestinationManager.instance().destinationFromString(
1036                         deststr);
1037                 if (dest == null) {
1038                     // this maybe a wildcard subscription
1039                     dest = new JmsTopic(deststr);
1040                     if (!((JmsTopic) dest).isWildCard()) {
1041                         dest = null;
1042                     }
1043                 }
1044 
1045                 if (consumer != null && dest != null &&
1046                     dest instanceof JmsTopic) {
1047                     // cache the consumer-destination mapping in memory.
1048                     addToConsumerCache(consumer, dest, true);
1049                 } else {
1050                     // what should we do about this stage
1051                     _log.error(
1052                         "Failure in ConsumerManager.init : " + consumer +
1053                         ":" + dest);
1054                 }
1055             }
1056         } catch (ServiceException exception) {
1057             SQLHelper.rollback(connection);
1058             throw exception;
1059         } catch (Exception exception) {
1060             SQLHelper.rollback(connection);
1061             throw new ServiceException("Failed to initialise ConsumerManager",
1062                 exception);
1063         } finally {
1064             SQLHelper.close(connection);
1065         }
1066     }
1067 
1068     /***
1069      * This private static class is used to maintain consumer information
1070      */
1071     private static class ConsumerEntry {
1072 
1073         /***
1074          * The name of the consumer. This name is either the durable
1075          * subscriber name or a uniquely generated name.
1076          */
1077         String _name = null;
1078 
1079         /***
1080          * Indicated whether this entry is for a durable subscriber
1081          */
1082         boolean _durable = false;
1083 
1084         /***
1085          * The destination that the consumer is actually subscribed too
1086          */
1087         JmsDestination _destination = null;
1088 
1089         /***
1090          * Construct an instance of this class using the specified
1091          * name and durable subscriber indicator
1092          *
1093          * @param name - the name of the consumer
1094          * @param destination - the destination consumer is subscribed too
1095          * @param durable - indicates whether it is a durable subscription
1096          */
1097         ConsumerEntry(String name, JmsDestination destination,
1098                       boolean durable) {
1099             _name = name;
1100             _destination = destination;
1101             _durable = durable;
1102         }
1103 
1104         // override Object.equals
1105         public boolean equals(Object obj) {
1106 
1107             boolean result = false;
1108             if ((obj != null) &&
1109                 (obj instanceof ConsumerEntry) &&
1110                 (((ConsumerEntry) obj)._name.equals(_name))) {
1111                 result = true;
1112             }
1113 
1114             return result;
1115         }
1116 
1117     } //-- ConsumerEntry
1118 
1119 } //-- ConsumerManager
1120