View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: ConsumerManagerImpl.java,v 1.3 2005/12/23 12:17:25 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.sql.Connection;
48  import java.util.ArrayList;
49  import java.util.HashMap;
50  import java.util.Iterator;
51  import java.util.List;
52  import javax.jms.InvalidDestinationException;
53  import javax.jms.InvalidSelectorException;
54  import javax.jms.JMSException;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  
59  import org.exolab.jms.client.JmsDestination;
60  import org.exolab.jms.client.JmsQueue;
61  import org.exolab.jms.client.JmsTopic;
62  import org.exolab.jms.persistence.DatabaseService;
63  import org.exolab.jms.persistence.PersistenceAdapter;
64  import org.exolab.jms.persistence.PersistenceException;
65  import org.exolab.jms.service.Service;
66  import org.exolab.jms.service.ServiceException;
67  
68  
69  /***
70   * The consumer manager is responsible for creating and managing the lifecycle
71   * of consumers. The consumer manager maintains a list of all active consumers.
72   *
73   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
74   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
75   * @version $Revision: 1.3 $ $Date: 2005/12/23 12:17:25 $
76   */
77  public class ConsumerManagerImpl extends Service implements ConsumerManager {
78  
79      /***
80       * The destination manager.
81       */
82      private final DestinationManager _destinations;
83  
84      /***
85       * The database service.
86       */
87      private final DatabaseService _database;
88  
89      /***
90       * Maintains a list of all consumers, durable and non-durable. All durable
91       * subscribers are maintained in memory until they are removed from the
92       * system entirely. All non-durable subscribers are maintained in memory
93       * until their endpoint is removed.
94       */
95      private HashMap _consumers = new HashMap();
96  
97      /***
98       * The set of all consumer endpoints. This is a map of {@link
99       * ConsumerEndpoint} instances, keyed on {@link ConsumerEndpoint#getPersistentId()}
100      * if non null; otherwise {@link ConsumerEndpoint#getId()}.
101      */
102     private HashMap _endpoints = new HashMap();
103 
104 
105     /***
106      * Maintains a mapping between destinations and consumers. A destination can
107      * have more than one consumer and a consumer can also be registered to more
108      * than one destination
109      */
110     private HashMap _destToConsumerMap = new HashMap();
111 
112     /***
113      * The set of all wildcard consumers, represented by a map of ConsumerEntry
114      * -> JmsTopic instances.
115      */
116     private HashMap _wildcardConsumers = new HashMap();
117 
118     /***
119      * The seed to allocate identifiers to new consumers.
120      */
121     private long _consumerIdSeed = 0;
122 
123     /***
124      * The logger.
125      */
126     private static final Log _log = LogFactory.getLog(
127             ConsumerManagerImpl.class);
128 
129 
130     /***
131      * Construct a new  <code>ConsumerManager</code>.
132      *
133      * @param destinations the destination manager
134      * @param database     the database service
135      */
136     public ConsumerManagerImpl(DestinationManager destinations,
137                                DatabaseService database) {
138         if (destinations == null) {
139             throw new IllegalArgumentException(
140                     "Argument 'destinations' is null");
141         }
142         if (database == null) {
143             throw new IllegalArgumentException("Argument 'database' is null");
144         }
145         _destinations = destinations;
146         _database = database;
147     }
148 
149     /***
150      * Create a new durable subscription.
151      * <p/>
152      * A client can change an existing durable subscription by creating a new
153      * subscription with the same name and a new topic. Changing a durable
154      * subscriber is equivalent to unsubscribing the old one and creating a new
155      * one.
156      *
157      * @param topic    the topic to subscribe to
158      * @param name     the subscription name
159      * @param clientID the client identifier. May be <code>null</code>
160      * @throws InvalidDestinationException if <code>topic</code> is not a
161      *                                     persistent destination, or
162      *                                     <code>name</code> is an invalid
163      *                                     subscription name
164      * @throws JMSException                if the durable consumer can't be
165      *                                     created
166      */
167     public synchronized void subscribe(JmsTopic topic, String name,
168                                        String clientID)
169             throws JMSException {
170         createInactiveDurableConsumer(topic, name, clientID);
171     }
172 
173 
174     /***
175      * Remove a durable subscription.
176      * <p/>
177      * A subscription may only be removed if the associated {@link
178      * DurableConsumerEndpoint} is inactive.
179      *
180      * @param name     the subscription name
181      * @param clientID the client identifier. May be <code>null</code>
182      * @throws InvalidDestinationException if an invalid subscription name is
183      *                                     specified.
184      * @throws JMSException                if the durable consumer is active, or
185      *                                     cannot be removed
186      */
187     public synchronized void unsubscribe(String name, String clientID)
188             throws JMSException {
189         if (_log.isDebugEnabled()) {
190             _log.debug("unsubscribe(name=" + name + ", clientID="
191                        + clientID + ")");
192         }
193 
194         DurableConsumerEndpoint consumer
195                 = (DurableConsumerEndpoint) _endpoints.remove(name);
196         if (consumer == null) {
197             throw new InvalidDestinationException("Durable consumer " + name
198                                                   + " is not defined.");
199         }
200         if (consumer.isActive()) {
201             throw new JMSException("Cannot remove durable consumer=" + name
202                                    + ": consumer is active");
203         }
204         consumer.close();
205 
206         // remove it from the persistent store.
207         try {
208             _database.begin();
209             Connection connection = _database.getConnection();
210 
211             _database.getAdapter().removeDurableConsumer(connection, name);
212             removeConsumerEntry(name);
213             _database.commit();
214         } catch (PersistenceException exception) {
215             String msg = "Failed to remove durable consumer, name=" + name;
216             rethrow(msg, exception);
217         }
218     }
219 
220     /***
221      * Remove all durable subscriptions for a destination.
222      * <p/>
223      * Subscriptions may only be removed if the associated {@link
224      * ConsumerEndpoint}s are inactive.
225      *
226      * @param topic the topic to remove consumers for
227      * @throws JMSException if the subscriptions can't be removed
228      */
229     public synchronized void unsubscribe(JmsTopic topic) throws JMSException {
230         List list = (List) _destToConsumerMap.get(topic);
231         if (list != null) {
232             ConsumerEntry[] consumers
233                     = (ConsumerEntry[]) list.toArray(new ConsumerEntry[0]);
234             for (int i = 0; i < consumers.length; ++i) {
235                 ConsumerEntry consumer = consumers[i];
236                 if (consumer.isDurable()) {
237                     // remove the durable consumer. This operation
238                     // will fail if the consumer is active.
239                     unsubscribe(consumer.getName(), consumer.getClientID());
240                 }
241             }
242         }
243 
244         // remove all consumers for the specified destination
245         removeFromConsumerCache(topic);
246     }
247 
248     /***
249      * Create a transient consumer for the specified destination.
250      *
251      * @param destination  the destination to consume messages from
252      * @param connectionId the identity of the connection that owns this
253      *                     consumer
254      * @param selector     the message selector. May be <code>null</code>
255      * @param noLocal      if true, and the destination is a topic, inhibits the
256      *                     delivery of messages published by its own connection.
257      *                     The behavior for <code>noLocal</code> is not
258      *                     specified if the destination is a queue.
259      * @return a new transient consumer
260      */
261     public synchronized ConsumerEndpoint createConsumer(
262             JmsDestination destination, long connectionId,
263             String selector,
264             boolean noLocal)
265             throws JMSException, InvalidSelectorException {
266 
267         if (_log.isDebugEnabled()) {
268             _log.debug("createConsumerEndpoint(destination=" + destination
269                        + ", connectionId=" + connectionId
270                        + ", selector=" + selector
271                        + ", noLocal=" + noLocal + ")");
272         }
273 
274         ConsumerEndpoint consumer = null;
275 
276         // ensure that the destination is valid before proceeding
277         getDestination(destination, true);
278 
279         long consumerId = getNextConsumerId();
280 
281         try {
282             _database.begin();
283             // determine what type of consumer to create based on the destination
284             // it subscribes to.
285             if (destination instanceof JmsTopic) {
286                 JmsTopic topic = (JmsTopic) destination;
287                 consumer = new TopicConsumerEndpoint(consumerId, connectionId,
288                                                      topic, selector, noLocal,
289                                                      _destinations);
290             } else if (destination instanceof JmsQueue) {
291                 QueueDestinationCache cache;
292                 cache = (QueueDestinationCache) _destinations.getDestinationCache(
293                         destination);
294                 consumer = new QueueConsumerEndpoint(consumerId, cache, selector);
295             }
296 
297             if (consumer != null) {
298                 // add it to the list of managed consumers. If it has a persistent
299                 // identity, use that as the key, otherwise use its transient
300                 // identity.
301                 Object key = ConsumerEntry.getConsumerKey(consumer);
302                 _endpoints.put(key, consumer);
303                 addConsumerEntry(key, destination, null, false);
304             }
305             _database.commit();
306         } catch (Exception exception) {
307             rethrow("Failed to create consumer", exception);
308         }
309 
310         return consumer;
311     }
312 
313     /***
314      * Create a durable consumer.
315      *
316      * @param topic        the topic to subscribe to
317      * @param name         the subscription name
318      * @param clientID     the client identifier. May be <code>null</code>.
319      * @param connectionId the identity of the connection that owns this
320      *                     consumer
321      * @param noLocal      if true, and the destination is a topic, inhibits the
322      *                     delivery of messages published by its own
323      *                     connection.
324      * @param selector     the message selector. May be <code>null</code>
325      * @return the durable consumer endpoint
326      * @throws InvalidDestinationException if <code>topic</code> is not a
327      *                                     persistent destination
328      * @throws InvalidSelectorException    if the selector is not well formed
329      * @throws JMSException                if a durable consumer is already
330      *                                     active with the same <code>name</code>
331      */
332     public synchronized DurableConsumerEndpoint createDurableConsumer(
333             JmsTopic topic, String name, String clientID, long connectionId,
334             boolean noLocal,
335             String selector)
336             throws JMSException {
337 
338         if (_log.isDebugEnabled()) {
339             _log.debug("createDurableConsumer(topic=" + topic
340                        + ", name=" + name + ", connectionId=" + connectionId
341                        + ", selector=" + selector + ", noLocal=" + noLocal
342                        + ")");
343         }
344 
345         DurableConsumerEndpoint consumer
346                 = createInactiveDurableConsumer(topic, name, clientID);
347         consumer.activate(connectionId, selector, noLocal);
348 
349         return consumer;
350     }
351 
352     /***
353      * Create a browser for the specified destination and the selector. A
354      * browser is responsible for passing all messages back to the client that
355      * reside on the queue
356      *
357      * @param queue    the queue to browse
358      * @param selector the message selector. May be <code>null</code>
359      * @return the queue browser endpoint
360      * @throws JMSException             if the browser can't be created
361      */
362     public synchronized ConsumerEndpoint createQueueBrowser(JmsQueue queue,
363                                                             String selector)
364             throws JMSException {
365 
366         // ensure that the destination is valid before proceeding
367         getDestination(queue, true);
368 
369         long consumerId = getNextConsumerId();
370 
371 
372         ConsumerEndpoint consumer = null;
373         try {
374             _database.begin();
375             QueueDestinationCache cache;
376             cache = (QueueDestinationCache) _destinations.getDestinationCache(
377                     queue);
378             consumer = new QueueBrowserEndpoint(consumerId, cache, selector);
379             Object key = ConsumerEntry.getConsumerKey(consumer);
380             _endpoints.put(key, consumer);
381             addConsumerEntry(key, queue, null, false);
382             _database.commit();
383         } catch (Exception exception) {
384             rethrow("Failed to create browser", exception);
385         }
386 
387         return consumer;
388     }
389 
390     /***
391      * Close a consumer.
392      *
393      * @param consumer the consumer to close
394      */
395     public synchronized void closeConsumer(ConsumerEndpoint consumer) {
396         if (_log.isDebugEnabled()) {
397             _log.debug("closeConsumerEndpoint(consumer=[Id="
398                        + consumer.getId() + ", destination="
399                        + consumer.getDestination() + ")");
400         }
401 
402         Object key = ConsumerEntry.getConsumerKey(consumer);
403 
404         ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key);
405         if (existing != null) {
406             try {
407                  _database.begin();
408                 if (consumer.getId() != existing.getId()) {
409                     // As a fix for bug 759752, only remove the consumer if it
410                     // matches the existing one.
411                     // @todo - not sure if this situation can arise any longer
412                     _log.error("Existing endpoint doesn't match that to be closed "
413                                + "- retaining");
414                 } else if (existing instanceof DurableConsumerEndpoint) {
415                     DurableConsumerEndpoint durable
416                             = (DurableConsumerEndpoint) existing;
417                     if (durable.isActive()) {
418                         try {
419                             durable.deactivate();
420                         } catch (JMSException exception) {
421                             _log.error("Failed to deactivate durable consumer="
422                                        + durable, exception);
423                         }
424                     }
425                 } else {
426                     _endpoints.remove(key);
427                     consumer.close();
428                     removeConsumerEntry(key);
429                 }
430                 _database.commit();
431             } catch (PersistenceException exception) {
432                 _log.error("Failed to close consumer=" + consumer, exception);
433                 rollback();
434             }
435         }
436     }
437 
438     /***
439      * Return the consumer with the specified identity.
440      *
441      * @param consumerId the identity of the consumer
442      * @return the associated consumer, or <code>null</code> if none exists
443      */
444     public synchronized ConsumerEndpoint getConsumerEndpoint(long consumerId) {
445         return (ConsumerEndpoint) _endpoints.get(new Long(consumerId));
446     }
447 
448     /***
449      * Return the consumer with the specified persistent identity.
450      *
451      * @param persistentId the persistent identity of the consumer
452      * @return the associated consumer, or <code>null</code> if none exists
453      */
454     public synchronized ConsumerEndpoint getConsumerEndpoint(
455             String persistentId) {
456         return (ConsumerEndpoint) _endpoints.get(persistentId);
457     }
458 
459     /***
460      * Determines if there are any active consumers for a destination.
461      *
462      * @param destination the destination
463      * @return <code>true</code> if there is at least one consumer
464      */
465     public synchronized boolean hasActiveConsumers(JmsDestination destination) {
466         boolean result = false;
467         ConsumerEndpoint[] consumers = getConsumers();
468         for (int i = 0; i < consumers.length; ++i) {
469             if (consumers[i].canConsume(destination)) {
470                 result = true;
471                 break;
472             }
473         }
474         return result;
475     }
476 
477     /***
478      * Start the service.
479      *
480      * @throws ServiceException if the service fails to start
481      */
482     protected void doStart() throws ServiceException {
483         try {
484             _database.begin();
485             Connection connection = _database.getConnection();
486 
487             PersistenceAdapter adapter = _database.getAdapter();
488 
489             // return a list of JmsDestination objects.
490             HashMap map = adapter.getAllDurableConsumers(connection);
491             Iterator iter = map.keySet().iterator();
492 
493             // Create an endpoint for each durable consumer
494             while (iter.hasNext()) {
495                 String consumer = (String) iter.next();
496                 String deststr = (String) map.get(consumer);
497 
498                 JmsDestination dest = _destinations.getDestination(deststr);
499                 if (dest == null) {
500                     // this maybe a wildcard subscription
501                     dest = new JmsTopic(deststr);
502                     if (!((JmsTopic) dest).isWildCard()) {
503                         dest = null;
504                     }
505                 }
506 
507                 if (consumer != null && dest != null &&
508                         dest instanceof JmsTopic) {
509                     // cache the consumer-destination mapping in memory.
510                     addDurableConsumer((JmsTopic) dest, consumer, null);
511                 } else {
512                     // @todo
513                     _log.error("Failure in ConsumerManager.init : " + consumer +
514                                ":" + dest);
515                 }
516             }
517             _database.commit();
518         } catch (Exception exception) {
519             rollback();
520             throw new ServiceException("Failed to initialise ConsumerManager",
521                                        exception);
522         }
523     }
524 
525     /***
526      * Stop the service.
527      */
528     protected synchronized void doStop() {
529         // clean up all the destinations
530         Object[] endpoints = _endpoints.values().toArray();
531         for (int index = 0; index < endpoints.length; index++) {
532             closeConsumer((ConsumerEndpoint) endpoints[index]);
533         }
534         _endpoints.clear();
535 
536         // remove cache data structures
537         _consumers.clear();
538         _destToConsumerMap.clear();
539         _wildcardConsumers.clear();
540     }
541 
542     /***
543      * Create an inactive durable consumer.
544      * <p/>
545      * If the consumer doesn't exist, it will created in the persistent store.
546      * If it does exist, and is inactive, it will be recreated. If it does
547      * exist, but is active, an exception will be raised.
548      *
549      * @param topic    the topic to subscribe to
550      * @param name     the subscription name
551      * @param clientID the client identifier. May be <code>null</code>.
552      * @return the durable consumer
553      * @throws InvalidDestinationException if <code>topic</code> is not a
554      *                                     persistent destination
555      * @throws JMSException                if a durable consumer is already
556      *                                     active with the same <code>name</code>,
557      *                                     or the consumer can't be created
558      */
559     private DurableConsumerEndpoint createInactiveDurableConsumer(
560             JmsTopic topic, String name, String clientID)
561             throws JMSException {
562         DurableConsumerEndpoint endpoint;
563 
564         if (_log.isDebugEnabled()) {
565             _log.debug("createInactiveDurableConsumer(topic=" + topic
566                        + ", name=" + name + ", clientID=" + clientID + ")");
567         }
568 
569         // check that the destination exists, if the topic is not a wildcard
570         if (!topic.isWildCard()) {
571             topic = (JmsTopic) getDestination(topic, false);
572         }
573 
574         if (name == null || name.length() == 0) {
575             throw new InvalidDestinationException(
576                     "Invalid subscription name: " + name);
577         }
578         endpoint = (DurableConsumerEndpoint) _endpoints.get(name);
579         if (endpoint != null) {
580             if (endpoint.isActive()) {
581                 throw new JMSException(
582                         "Durable subscriber already exists with name: " + name);
583             }
584             if (!endpoint.getDestination().equals(topic)) {
585                 // subscribing to a different topic. Need to re-subscribe.
586                 unsubscribe(name, clientID);
587                 endpoint = null;
588             }
589         }
590         if (endpoint == null) {
591             try {
592                 _database.begin();
593                 PersistenceAdapter adapter = _database.getAdapter();
594                 Connection connection = _database.getConnection();
595                 adapter.addDurableConsumer(connection, topic.getName(), name);
596                 endpoint = addDurableConsumer(topic, name, clientID);
597                 _database.commit();
598             } catch (Exception exception) {
599                 String msg = "Failed to create durable consumer, name=" + name
600                         + ", for topic=" + topic.getName();
601                 rethrow(msg, exception);
602             }
603         }
604 
605         return endpoint;
606     }
607 
608     /***
609      * Register an inactive durable consumer.
610      *
611      * @param topic    the topic to subscribe to
612      * @param name     the subscription name
613      * @param clientID the client identifier. May be <code>null</code>.
614      * @return the durable consumer
615      * @throws JMSException for any JMS error
616      * @throws PersistenceException for any persistence error
617      */
618     private DurableConsumerEndpoint addDurableConsumer(JmsTopic topic,
619                                                        String name,
620                                                        String clientID)
621             throws JMSException, PersistenceException {
622         DurableConsumerEndpoint consumer;
623         // cache the consumer locally
624         addConsumerEntry(name, topic, clientID, true);
625 
626         long consumerId = getNextConsumerId();
627         consumer = new DurableConsumerEndpoint(consumerId, topic, name,
628                                                _destinations);
629         _endpoints.put(consumer.getPersistentId(), consumer);
630         return consumer;
631     }
632 
633     /***
634      * Add a consumer entry.
635      *
636      * @param key         a key to identify the entry.
637      * @param destination the destination it is subscribed to. It can be a
638      *                    wildcard
639      * @param clientID    the client identifier. May be <code>null</code>
640      * @param durable     indicates whether it is a durable subscription
641      * @throws JMSException if key specifies a duplicate entry.
642      */
643     private void addConsumerEntry(Object key, JmsDestination destination,
644                                   String clientID,
645                                   boolean durable)
646             throws JMSException {
647         if (_log.isDebugEnabled()) {
648             _log.debug("addConsumerEntry(key=" + key + ", destination="
649                        + destination + ", clientID=" + clientID
650                        + ", durable=" + durable + ")");
651         }
652 
653         if (_consumers.containsKey(key)) {
654             throw new JMSException("Duplicate consumer key:" + key);
655         }
656 
657         ConsumerEntry entry = new ConsumerEntry(key, destination, clientID,
658                                                 durable);
659         _consumers.put(key, entry);
660 
661         if (destination instanceof JmsTopic
662                 && ((JmsTopic) destination).isWildCard()) {
663             // if the specified destination is a JmsTopic and also a wildcard
664             // then we need to add it to all matching destinations
665             _wildcardConsumers.put(entry, destination);
666         } else {
667             // we also need to add the reverse mapping
668             List consumers = (List) _destToConsumerMap.get(destination);
669             if (consumers == null) {
670                 consumers = new ArrayList();
671                 _destToConsumerMap.put(destination, consumers);
672             }
673 
674             // add the mapping
675             consumers.add(entry);
676         }
677     }
678 
679     /***
680      * Remove the specified consumer from the cache.
681      *
682      * @param key the consumer key
683      */
684     private void removeConsumerEntry(Object key) {
685         if (_log.isDebugEnabled()) {
686             _log.debug("removeConsumerEntry(key=" + key + ")");
687         }
688 
689         ConsumerEntry entry = (ConsumerEntry) _consumers.remove(key);
690         if (entry != null) {
691             JmsDestination dest = entry.getDestination();
692 
693             if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) {
694                 // remove it from the wildcard cache.
695                 _wildcardConsumers.remove(entry);
696             } else {
697                 // remove it from the specified destination
698                 List consumers = (List) _destToConsumerMap.get(dest);
699                 if (consumers != null) {
700                     consumers.remove(entry);
701 
702                     // if consumers is of size 0 then remove it
703                     if (consumers.isEmpty()) {
704                         _destToConsumerMap.remove(dest);
705                     }
706                 }
707             }
708         } else if (_log.isDebugEnabled()) {
709             _log.debug("removeConsumerEntry(key=" + key
710                        + "): consumer not found");
711         }
712     }
713 
714     /***
715      * Remove all the consumers for the specified destination from the cache.
716      *
717      * @param destination the destination to remove
718      */
719     private void removeFromConsumerCache(JmsDestination destination) {
720         _destToConsumerMap.remove(destination);
721     }
722 
723     /***
724      * Returns the next seed value to be allocated to a new consumer.
725      *
726      * @return a unique identifier for a consumer
727      */
728     private long getNextConsumerId() {
729         return ++_consumerIdSeed;
730     }
731 
732     /***
733      * Returns the destination managed by {@link DestinationManager}
734      * corresponding to that supplied, creating it if needed.
735      *
736      * @param destination the destination to look up
737      * @param create      if <code>true</code> the destination may be created if
738      *                    it doesn't exist
739      * @return the destination managed by {@link DestinationManager}
740      *         corresponding to <code>destination</code>.
741      * @throws InvalidDestinationException if the destination doesn't exist and
742      *                                     <code>create</code> is false; or the
743      *                                     destination's properties don't match
744      *                                     the existing destination
745      * @throws JMSException                if the destination can't be created
746      */
747     private JmsDestination getDestination(JmsDestination destination,
748                                           boolean create)
749             throws InvalidDestinationException, JMSException {
750         final String name = destination.getName();
751         JmsDestination result;
752         JmsDestination existing = _destinations.getDestination(name);
753 
754         if (existing == null) {
755             if (!create) {
756                 throw new InvalidDestinationException(
757                         "No destination with name=" + name + " exists");
758             }
759             // register the destination dynamically.
760             _destinations.createDestination(destination);
761             result = _destinations.getDestination(destination.getName());
762         } else {
763             // make sure the supplied destination has the same properties
764             // as the existing one
765             if (!destination.getClass().getName().equals(
766                     existing.getClass().getName())) {
767                 throw new InvalidDestinationException(
768                         "Mismatched destination properties for destination"
769                         + " with name=" + name);
770             }
771             if (existing.getPersistent() != destination.getPersistent()) {
772                 throw new InvalidDestinationException(
773                         "Mismatched destination properties for destination"
774                         + " with name=" + name);
775             }
776             result = existing;
777         }
778         return result;
779     }
780 
781     /***
782      * Returns the consumers managed by this.
783      *
784      * @return an array of consumers
785      */
786     private ConsumerEndpoint[] getConsumers() {
787         return (ConsumerEndpoint[]) _endpoints.values().toArray(
788                 new ConsumerEndpoint[0]);
789     }
790 
791     /***
792      * Rollback any transaction.
793      */
794     private void rollback() {
795         try {
796             if (_database.isTransacted()) {
797                 _database.rollback();
798             }
799         } catch (PersistenceException error) {
800             _log.warn("Failed to rollback after error", error);
801         }
802     }
803 
804     /***
805      * Helper to clean up after a failed call, and rethrow.
806      * Any transaction will be rolled back.
807      *
808      * @param message   the message to log
809      * @param exception the exception
810      * @throws JMSException the original exception adapted to a
811      *                      <code>JMSException</code> if necessary
812      */
813     private void rethrow(String message, Exception exception)
814             throws JMSException {
815         rollback();
816 
817         if (exception instanceof JMSException) {
818             _log.debug(message, exception);
819             throw (JMSException) exception;
820         }
821         // need to adapt the exception, so log as an error before rethrow
822         _log.error(message, exception);
823         throw new JMSException(exception.getMessage());
824     }
825 
826     /***
827      * Helper class used to maintain consumer information
828      */
829     private static final class ConsumerEntry {
830 
831         /***
832          * An identifier for the consumer.
833          */
834         private final Object _key;
835 
836         /***
837          * The destination that the consumer is subscribed to.
838          */
839         private final JmsDestination _destination;
840 
841         /***
842          * The client identifier. May be <code>null</code>.
843          */
844         private final String _clientID;
845 
846         /***
847          * Indicated whether this entry is for a durable subscriber
848          */
849         private final boolean _durable;
850 
851 
852         /***
853          * Construct a new <code>ConsumerEntry</code>.
854          *
855          * @param key         an identifier for the consumer
856          * @param destination the destination consumer is subscribed to
857          * @param clientID    the client identifier. May be <code>null</code>
858          * @param durable     indicates whether it is a durable subscription
859          */
860         public ConsumerEntry(Object key, JmsDestination destination,
861                              String clientID, boolean durable) {
862             _key = key;
863             _destination = destination;
864             _clientID = clientID;
865             _durable = durable;
866         }
867 
868         public boolean equals(Object obj) {
869             boolean result = false;
870             if (obj instanceof ConsumerEntry) {
871                 result = ((ConsumerEntry) obj)._key.equals(_key);
872             }
873 
874             return result;
875         }
876 
877         public Object getKey() {
878             return _key;
879         }
880 
881         public String getName() {
882             return (_key instanceof String) ? (String) _key : null;
883         }
884 
885         public JmsDestination getDestination() {
886             return _destination;
887         }
888 
889         public String getClientID() {
890             return _clientID;
891         }
892 
893         public boolean isDurable() {
894             return _durable;
895         }
896 
897         /***
898          * Helper to return a key for identifying {@link ConsumerEndpoint}
899          * instances. This returns the consumers persistent identifier if it has
900          * one; if not, it returns its transient identifier.
901          *
902          * @param consumer the consumer
903          * @return a key for identifying <code>consumer</code>
904          */
905         public static Object getConsumerKey(ConsumerEndpoint consumer) {
906             Object key = null;
907             String id = consumer.getPersistentId();
908             if (id != null) {
909                 key = id;
910             } else {
911                 key = new Long(consumer.getId());
912             }
913             return key;
914         }
915     }
916 
917 }
918