1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: ConsumerManagerImpl.java,v 1.3 2005/12/23 12:17:25 tanderson Exp $
44 */
45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection;
48 import java.util.ArrayList;
49 import java.util.HashMap;
50 import java.util.Iterator;
51 import java.util.List;
52 import javax.jms.InvalidDestinationException;
53 import javax.jms.InvalidSelectorException;
54 import javax.jms.JMSException;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58
59 import org.exolab.jms.client.JmsDestination;
60 import org.exolab.jms.client.JmsQueue;
61 import org.exolab.jms.client.JmsTopic;
62 import org.exolab.jms.persistence.DatabaseService;
63 import org.exolab.jms.persistence.PersistenceAdapter;
64 import org.exolab.jms.persistence.PersistenceException;
65 import org.exolab.jms.service.Service;
66 import org.exolab.jms.service.ServiceException;
67
68
69 /***
70 * The consumer manager is responsible for creating and managing the lifecycle
71 * of consumers. The consumer manager maintains a list of all active consumers.
72 *
73 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
74 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
75 * @version $Revision: 1.3 $ $Date: 2005/12/23 12:17:25 $
76 */
77 public class ConsumerManagerImpl extends Service implements ConsumerManager {
78
/**
 * The destination manager, used to look up and create destinations and
 * their caches. Never <code>null</code>; checked in the constructor.
 */
private final DestinationManager _destinations;

/**
 * The database service, used to demarcate transactions and to obtain the
 * persistence adapter and connection. Never <code>null</code>; checked in
 * the constructor.
 */
private final DatabaseService _database;

/**
 * Maintains the entries of all consumers, durable and non-durable. This is
 * a map of <code>ConsumerEntry</code> instances keyed on the consumer key
 * (the subscription name for durable subscribers). Durable subscribers are
 * kept until they are unsubscribed; non-durable subscribers are kept
 * until their endpoint is closed.
 */
private HashMap _consumers = new HashMap();

/**
 * The set of all consumer endpoints. This is a map of {@link
 * ConsumerEndpoint} instances, keyed on {@link ConsumerEndpoint#getPersistentId()}
 * if non null; otherwise {@link ConsumerEndpoint#getId()} (boxed as a
 * <code>Long</code>).
 */
private HashMap _endpoints = new HashMap();


/**
 * Maintains a mapping between destinations and consumers: each
 * non-wildcard destination maps to a <code>List</code> of the
 * <code>ConsumerEntry</code> instances registered against it. A destination
 * can have more than one consumer.
 */
private HashMap _destToConsumerMap = new HashMap();

/**
 * The set of all wildcard consumers, represented by a map of ConsumerEntry
 * -> JmsTopic instances, where the topic is the wildcard topic the
 * consumer subscribed to.
 */
private HashMap _wildcardConsumers = new HashMap();

/**
 * The seed to allocate identifiers to new consumers. It is incremented
 * each time an identifier is handed out.
 */
private long _consumerIdSeed = 0;

/**
 * The logger.
 */
private static final Log _log = LogFactory.getLog(
        ConsumerManagerImpl.class);
128
129
130 /***
131 * Construct a new <code>ConsumerManager</code>.
132 *
133 * @param destinations the destination manager
134 * @param database the database service
135 */
136 public ConsumerManagerImpl(DestinationManager destinations,
137 DatabaseService database) {
138 if (destinations == null) {
139 throw new IllegalArgumentException(
140 "Argument 'destinations' is null");
141 }
142 if (database == null) {
143 throw new IllegalArgumentException("Argument 'database' is null");
144 }
145 _destinations = destinations;
146 _database = database;
147 }
148
149 /***
150 * Create a new durable subscription.
151 * <p/>
152 * A client can change an existing durable subscription by creating a new
153 * subscription with the same name and a new topic. Changing a durable
154 * subscriber is equivalent to unsubscribing the old one and creating a new
155 * one.
156 *
157 * @param topic the topic to subscribe to
158 * @param name the subscription name
159 * @param clientID the client identifier. May be <code>null</code>
160 * @throws InvalidDestinationException if <code>topic</code> is not a
161 * persistent destination, or
162 * <code>name</code> is an invalid
163 * subscription name
164 * @throws JMSException if the durable consumer can't be
165 * created
166 */
167 public synchronized void subscribe(JmsTopic topic, String name,
168 String clientID)
169 throws JMSException {
170 createInactiveDurableConsumer(topic, name, clientID);
171 }
172
173
174 /***
175 * Remove a durable subscription.
176 * <p/>
177 * A subscription may only be removed if the associated {@link
178 * DurableConsumerEndpoint} is inactive.
179 *
180 * @param name the subscription name
181 * @param clientID the client identifier. May be <code>null</code>
182 * @throws InvalidDestinationException if an invalid subscription name is
183 * specified.
184 * @throws JMSException if the durable consumer is active, or
185 * cannot be removed
186 */
187 public synchronized void unsubscribe(String name, String clientID)
188 throws JMSException {
189 if (_log.isDebugEnabled()) {
190 _log.debug("unsubscribe(name=" + name + ", clientID="
191 + clientID + ")");
192 }
193
194 DurableConsumerEndpoint consumer
195 = (DurableConsumerEndpoint) _endpoints.remove(name);
196 if (consumer == null) {
197 throw new InvalidDestinationException("Durable consumer " + name
198 + " is not defined.");
199 }
200 if (consumer.isActive()) {
201 throw new JMSException("Cannot remove durable consumer=" + name
202 + ": consumer is active");
203 }
204 consumer.close();
205
206
207 try {
208 _database.begin();
209 Connection connection = _database.getConnection();
210
211 _database.getAdapter().removeDurableConsumer(connection, name);
212 removeConsumerEntry(name);
213 _database.commit();
214 } catch (PersistenceException exception) {
215 String msg = "Failed to remove durable consumer, name=" + name;
216 rethrow(msg, exception);
217 }
218 }
219
220 /***
221 * Remove all durable subscriptions for a destination.
222 * <p/>
223 * Subscriptions may only be removed if the associated {@link
224 * ConsumerEndpoint}s are inactive.
225 *
226 * @param topic the topic to remove consumers for
227 * @throws JMSException if the subscriptions can't be removed
228 */
229 public synchronized void unsubscribe(JmsTopic topic) throws JMSException {
230 List list = (List) _destToConsumerMap.get(topic);
231 if (list != null) {
232 ConsumerEntry[] consumers
233 = (ConsumerEntry[]) list.toArray(new ConsumerEntry[0]);
234 for (int i = 0; i < consumers.length; ++i) {
235 ConsumerEntry consumer = consumers[i];
236 if (consumer.isDurable()) {
237
238
239 unsubscribe(consumer.getName(), consumer.getClientID());
240 }
241 }
242 }
243
244
245 removeFromConsumerCache(topic);
246 }
247
248 /***
249 * Create a transient consumer for the specified destination.
250 *
251 * @param destination the destination to consume messages from
252 * @param connectionId the identity of the connection that owns this
253 * consumer
254 * @param selector the message selector. May be <code>null</code>
255 * @param noLocal if true, and the destination is a topic, inhibits the
256 * delivery of messages published by its own connection.
257 * The behavior for <code>noLocal</code> is not
258 * specified if the destination is a queue.
259 * @return a new transient consumer
260 */
261 public synchronized ConsumerEndpoint createConsumer(
262 JmsDestination destination, long connectionId,
263 String selector,
264 boolean noLocal)
265 throws JMSException, InvalidSelectorException {
266
267 if (_log.isDebugEnabled()) {
268 _log.debug("createConsumerEndpoint(destination=" + destination
269 + ", connectionId=" + connectionId
270 + ", selector=" + selector
271 + ", noLocal=" + noLocal + ")");
272 }
273
274 ConsumerEndpoint consumer = null;
275
276
277 getDestination(destination, true);
278
279 long consumerId = getNextConsumerId();
280
281 try {
282 _database.begin();
283
284
285 if (destination instanceof JmsTopic) {
286 JmsTopic topic = (JmsTopic) destination;
287 consumer = new TopicConsumerEndpoint(consumerId, connectionId,
288 topic, selector, noLocal,
289 _destinations);
290 } else if (destination instanceof JmsQueue) {
291 QueueDestinationCache cache;
292 cache = (QueueDestinationCache) _destinations.getDestinationCache(
293 destination);
294 consumer = new QueueConsumerEndpoint(consumerId, cache, selector);
295 }
296
297 if (consumer != null) {
298
299
300
301 Object key = ConsumerEntry.getConsumerKey(consumer);
302 _endpoints.put(key, consumer);
303 addConsumerEntry(key, destination, null, false);
304 }
305 _database.commit();
306 } catch (Exception exception) {
307 rethrow("Failed to create consumer", exception);
308 }
309
310 return consumer;
311 }
312
313 /***
314 * Create a durable consumer.
315 *
316 * @param topic the topic to subscribe to
317 * @param name the subscription name
318 * @param clientID the client identifier. May be <code>null</code>.
319 * @param connectionId the identity of the connection that owns this
320 * consumer
321 * @param noLocal if true, and the destination is a topic, inhibits the
322 * delivery of messages published by its own
323 * connection.
324 * @param selector the message selector. May be <code>null</code>
325 * @return the durable consumer endpoint
326 * @throws InvalidDestinationException if <code>topic</code> is not a
327 * persistent destination
328 * @throws InvalidSelectorException if the selector is not well formed
329 * @throws JMSException if a durable consumer is already
330 * active with the same <code>name</code>
331 */
332 public synchronized DurableConsumerEndpoint createDurableConsumer(
333 JmsTopic topic, String name, String clientID, long connectionId,
334 boolean noLocal,
335 String selector)
336 throws JMSException {
337
338 if (_log.isDebugEnabled()) {
339 _log.debug("createDurableConsumer(topic=" + topic
340 + ", name=" + name + ", connectionId=" + connectionId
341 + ", selector=" + selector + ", noLocal=" + noLocal
342 + ")");
343 }
344
345 DurableConsumerEndpoint consumer
346 = createInactiveDurableConsumer(topic, name, clientID);
347 consumer.activate(connectionId, selector, noLocal);
348
349 return consumer;
350 }
351
352 /***
353 * Create a browser for the specified destination and the selector. A
354 * browser is responsible for passing all messages back to the client that
355 * reside on the queue
356 *
357 * @param queue the queue to browse
358 * @param selector the message selector. May be <code>null</code>
359 * @return the queue browser endpoint
360 * @throws JMSException if the browser can't be created
361 */
362 public synchronized ConsumerEndpoint createQueueBrowser(JmsQueue queue,
363 String selector)
364 throws JMSException {
365
366
367 getDestination(queue, true);
368
369 long consumerId = getNextConsumerId();
370
371
372 ConsumerEndpoint consumer = null;
373 try {
374 _database.begin();
375 QueueDestinationCache cache;
376 cache = (QueueDestinationCache) _destinations.getDestinationCache(
377 queue);
378 consumer = new QueueBrowserEndpoint(consumerId, cache, selector);
379 Object key = ConsumerEntry.getConsumerKey(consumer);
380 _endpoints.put(key, consumer);
381 addConsumerEntry(key, queue, null, false);
382 _database.commit();
383 } catch (Exception exception) {
384 rethrow("Failed to create browser", exception);
385 }
386
387 return consumer;
388 }
389
390 /***
391 * Close a consumer.
392 *
393 * @param consumer the consumer to close
394 */
395 public synchronized void closeConsumer(ConsumerEndpoint consumer) {
396 if (_log.isDebugEnabled()) {
397 _log.debug("closeConsumerEndpoint(consumer=[Id="
398 + consumer.getId() + ", destination="
399 + consumer.getDestination() + ")");
400 }
401
402 Object key = ConsumerEntry.getConsumerKey(consumer);
403
404 ConsumerEndpoint existing = (ConsumerEndpoint) _endpoints.get(key);
405 if (existing != null) {
406 try {
407 _database.begin();
408 if (consumer.getId() != existing.getId()) {
409
410
411
412 _log.error("Existing endpoint doesn't match that to be closed "
413 + "- retaining");
414 } else if (existing instanceof DurableConsumerEndpoint) {
415 DurableConsumerEndpoint durable
416 = (DurableConsumerEndpoint) existing;
417 if (durable.isActive()) {
418 try {
419 durable.deactivate();
420 } catch (JMSException exception) {
421 _log.error("Failed to deactivate durable consumer="
422 + durable, exception);
423 }
424 }
425 } else {
426 _endpoints.remove(key);
427 consumer.close();
428 removeConsumerEntry(key);
429 }
430 _database.commit();
431 } catch (PersistenceException exception) {
432 _log.error("Failed to close consumer=" + consumer, exception);
433 rollback();
434 }
435 }
436 }
437
438 /***
439 * Return the consumer with the specified identity.
440 *
441 * @param consumerId the identity of the consumer
442 * @return the associated consumer, or <code>null</code> if none exists
443 */
444 public synchronized ConsumerEndpoint getConsumerEndpoint(long consumerId) {
445 return (ConsumerEndpoint) _endpoints.get(new Long(consumerId));
446 }
447
448 /***
449 * Return the consumer with the specified persistent identity.
450 *
451 * @param persistentId the persistent identity of the consumer
452 * @return the associated consumer, or <code>null</code> if none exists
453 */
454 public synchronized ConsumerEndpoint getConsumerEndpoint(
455 String persistentId) {
456 return (ConsumerEndpoint) _endpoints.get(persistentId);
457 }
458
459 /***
460 * Determines if there are any active consumers for a destination.
461 *
462 * @param destination the destination
463 * @return <code>true</code> if there is at least one consumer
464 */
465 public synchronized boolean hasActiveConsumers(JmsDestination destination) {
466 boolean result = false;
467 ConsumerEndpoint[] consumers = getConsumers();
468 for (int i = 0; i < consumers.length; ++i) {
469 if (consumers[i].canConsume(destination)) {
470 result = true;
471 break;
472 }
473 }
474 return result;
475 }
476
477 /***
478 * Start the service.
479 *
480 * @throws ServiceException if the service fails to start
481 */
482 protected void doStart() throws ServiceException {
483 try {
484 _database.begin();
485 Connection connection = _database.getConnection();
486
487 PersistenceAdapter adapter = _database.getAdapter();
488
489
490 HashMap map = adapter.getAllDurableConsumers(connection);
491 Iterator iter = map.keySet().iterator();
492
493
494 while (iter.hasNext()) {
495 String consumer = (String) iter.next();
496 String deststr = (String) map.get(consumer);
497
498 JmsDestination dest = _destinations.getDestination(deststr);
499 if (dest == null) {
500
501 dest = new JmsTopic(deststr);
502 if (!((JmsTopic) dest).isWildCard()) {
503 dest = null;
504 }
505 }
506
507 if (consumer != null && dest != null &&
508 dest instanceof JmsTopic) {
509
510 addDurableConsumer((JmsTopic) dest, consumer, null);
511 } else {
512
513 _log.error("Failure in ConsumerManager.init : " + consumer +
514 ":" + dest);
515 }
516 }
517 _database.commit();
518 } catch (Exception exception) {
519 rollback();
520 throw new ServiceException("Failed to initialise ConsumerManager",
521 exception);
522 }
523 }
524
525 /***
526 * Stop the service.
527 */
528 protected synchronized void doStop() {
529
530 Object[] endpoints = _endpoints.values().toArray();
531 for (int index = 0; index < endpoints.length; index++) {
532 closeConsumer((ConsumerEndpoint) endpoints[index]);
533 }
534 _endpoints.clear();
535
536
537 _consumers.clear();
538 _destToConsumerMap.clear();
539 _wildcardConsumers.clear();
540 }
541
542 /***
543 * Create an inactive durable consumer.
544 * <p/>
545 * If the consumer doesn't exist, it will created in the persistent store.
546 * If it does exist, and is inactive, it will be recreated. If it does
547 * exist, but is active, an exception will be raised.
548 *
549 * @param topic the topic to subscribe to
550 * @param name the subscription name
551 * @param clientID the client identifier. May be <code>null</code>.
552 * @return the durable consumer
553 * @throws InvalidDestinationException if <code>topic</code> is not a
554 * persistent destination
555 * @throws JMSException if a durable consumer is already
556 * active with the same <code>name</code>,
557 * or the consumer can't be created
558 */
559 private DurableConsumerEndpoint createInactiveDurableConsumer(
560 JmsTopic topic, String name, String clientID)
561 throws JMSException {
562 DurableConsumerEndpoint endpoint;
563
564 if (_log.isDebugEnabled()) {
565 _log.debug("createInactiveDurableConsumer(topic=" + topic
566 + ", name=" + name + ", clientID=" + clientID + ")");
567 }
568
569
570 if (!topic.isWildCard()) {
571 topic = (JmsTopic) getDestination(topic, false);
572 }
573
574 if (name == null || name.length() == 0) {
575 throw new InvalidDestinationException(
576 "Invalid subscription name: " + name);
577 }
578 endpoint = (DurableConsumerEndpoint) _endpoints.get(name);
579 if (endpoint != null) {
580 if (endpoint.isActive()) {
581 throw new JMSException(
582 "Durable subscriber already exists with name: " + name);
583 }
584 if (!endpoint.getDestination().equals(topic)) {
585
586 unsubscribe(name, clientID);
587 endpoint = null;
588 }
589 }
590 if (endpoint == null) {
591 try {
592 _database.begin();
593 PersistenceAdapter adapter = _database.getAdapter();
594 Connection connection = _database.getConnection();
595 adapter.addDurableConsumer(connection, topic.getName(), name);
596 endpoint = addDurableConsumer(topic, name, clientID);
597 _database.commit();
598 } catch (Exception exception) {
599 String msg = "Failed to create durable consumer, name=" + name
600 + ", for topic=" + topic.getName();
601 rethrow(msg, exception);
602 }
603 }
604
605 return endpoint;
606 }
607
608 /***
609 * Register an inactive durable consumer.
610 *
611 * @param topic the topic to subscribe to
612 * @param name the subscription name
613 * @param clientID the client identifier. May be <code>null</code>.
614 * @return the durable consumer
615 * @throws JMSException for any JMS error
616 * @throws PersistenceException for any persistence error
617 */
618 private DurableConsumerEndpoint addDurableConsumer(JmsTopic topic,
619 String name,
620 String clientID)
621 throws JMSException, PersistenceException {
622 DurableConsumerEndpoint consumer;
623
624 addConsumerEntry(name, topic, clientID, true);
625
626 long consumerId = getNextConsumerId();
627 consumer = new DurableConsumerEndpoint(consumerId, topic, name,
628 _destinations);
629 _endpoints.put(consumer.getPersistentId(), consumer);
630 return consumer;
631 }
632
633 /***
634 * Add a consumer entry.
635 *
636 * @param key a key to identify the entry.
637 * @param destination the destination it is subscribed to. It can be a
638 * wildcard
639 * @param clientID the client identifier. May be <code>null</code>
640 * @param durable indicates whether it is a durable subscription
641 * @throws JMSException if key specifies a duplicate entry.
642 */
643 private void addConsumerEntry(Object key, JmsDestination destination,
644 String clientID,
645 boolean durable)
646 throws JMSException {
647 if (_log.isDebugEnabled()) {
648 _log.debug("addConsumerEntry(key=" + key + ", destination="
649 + destination + ", clientID=" + clientID
650 + ", durable=" + durable + ")");
651 }
652
653 if (_consumers.containsKey(key)) {
654 throw new JMSException("Duplicate consumer key:" + key);
655 }
656
657 ConsumerEntry entry = new ConsumerEntry(key, destination, clientID,
658 durable);
659 _consumers.put(key, entry);
660
661 if (destination instanceof JmsTopic
662 && ((JmsTopic) destination).isWildCard()) {
663
664
665 _wildcardConsumers.put(entry, destination);
666 } else {
667
668 List consumers = (List) _destToConsumerMap.get(destination);
669 if (consumers == null) {
670 consumers = new ArrayList();
671 _destToConsumerMap.put(destination, consumers);
672 }
673
674
675 consumers.add(entry);
676 }
677 }
678
679 /***
680 * Remove the specified consumer from the cache.
681 *
682 * @param key the consumer key
683 */
684 private void removeConsumerEntry(Object key) {
685 if (_log.isDebugEnabled()) {
686 _log.debug("removeConsumerEntry(key=" + key + ")");
687 }
688
689 ConsumerEntry entry = (ConsumerEntry) _consumers.remove(key);
690 if (entry != null) {
691 JmsDestination dest = entry.getDestination();
692
693 if (dest instanceof JmsTopic && ((JmsTopic) dest).isWildCard()) {
694
695 _wildcardConsumers.remove(entry);
696 } else {
697
698 List consumers = (List) _destToConsumerMap.get(dest);
699 if (consumers != null) {
700 consumers.remove(entry);
701
702
703 if (consumers.isEmpty()) {
704 _destToConsumerMap.remove(dest);
705 }
706 }
707 }
708 } else if (_log.isDebugEnabled()) {
709 _log.debug("removeConsumerEntry(key=" + key
710 + "): consumer not found");
711 }
712 }
713
714 /***
715 * Remove all the consumers for the specified destination from the cache.
716 *
717 * @param destination the destination to remove
718 */
719 private void removeFromConsumerCache(JmsDestination destination) {
720 _destToConsumerMap.remove(destination);
721 }
722
723 /***
724 * Returns the next seed value to be allocated to a new consumer.
725 *
726 * @return a unique identifier for a consumer
727 */
728 private long getNextConsumerId() {
729 return ++_consumerIdSeed;
730 }
731
732 /***
733 * Returns the destination managed by {@link DestinationManager}
734 * corresponding to that supplied, creating it if needed.
735 *
736 * @param destination the destination to look up
737 * @param create if <code>true</code> the destination may be created if
738 * it doesn't exist
739 * @return the destination managed by {@link DestinationManager}
740 * corresponding to <code>destination</code>.
741 * @throws InvalidDestinationException if the destination doesn't exist and
742 * <code>create</code> is false; or the
743 * destination's properties don't match
744 * the existing destination
745 * @throws JMSException if the destination can't be created
746 */
747 private JmsDestination getDestination(JmsDestination destination,
748 boolean create)
749 throws InvalidDestinationException, JMSException {
750 final String name = destination.getName();
751 JmsDestination result;
752 JmsDestination existing = _destinations.getDestination(name);
753
754 if (existing == null) {
755 if (!create) {
756 throw new InvalidDestinationException(
757 "No destination with name=" + name + " exists");
758 }
759
760 _destinations.createDestination(destination);
761 result = _destinations.getDestination(destination.getName());
762 } else {
763
764
765 if (!destination.getClass().getName().equals(
766 existing.getClass().getName())) {
767 throw new InvalidDestinationException(
768 "Mismatched destination properties for destination"
769 + " with name=" + name);
770 }
771 if (existing.getPersistent() != destination.getPersistent()) {
772 throw new InvalidDestinationException(
773 "Mismatched destination properties for destination"
774 + " with name=" + name);
775 }
776 result = existing;
777 }
778 return result;
779 }
780
781 /***
782 * Returns the consumers managed by this.
783 *
784 * @return an array of consumers
785 */
786 private ConsumerEndpoint[] getConsumers() {
787 return (ConsumerEndpoint[]) _endpoints.values().toArray(
788 new ConsumerEndpoint[0]);
789 }
790
791 /***
792 * Rollback any transaction.
793 */
794 private void rollback() {
795 try {
796 if (_database.isTransacted()) {
797 _database.rollback();
798 }
799 } catch (PersistenceException error) {
800 _log.warn("Failed to rollback after error", error);
801 }
802 }
803
804 /***
805 * Helper to clean up after a failed call, and rethrow.
806 * Any transaction will be rolled back.
807 *
808 * @param message the message to log
809 * @param exception the exception
810 * @throws JMSException the original exception adapted to a
811 * <code>JMSException</code> if necessary
812 */
813 private void rethrow(String message, Exception exception)
814 throws JMSException {
815 rollback();
816
817 if (exception instanceof JMSException) {
818 _log.debug(message, exception);
819 throw (JMSException) exception;
820 }
821
822 _log.error(message, exception);
823 throw new JMSException(exception.getMessage());
824 }
825
826 /***
827 * Helper class used to maintain consumer information
828 */
829 private static final class ConsumerEntry {
830
831 /***
832 * An identifier for the consumer.
833 */
834 private final Object _key;
835
836 /***
837 * The destination that the consumer is subscribed to.
838 */
839 private final JmsDestination _destination;
840
841 /***
842 * The client identifier. May be <code>null</code>.
843 */
844 private final String _clientID;
845
846 /***
847 * Indicated whether this entry is for a durable subscriber
848 */
849 private final boolean _durable;
850
851
852 /***
853 * Construct a new <code>ConsumerEntry</code>.
854 *
855 * @param key an identifier for the consumer
856 * @param destination the destination consumer is subscribed to
857 * @param clientID the client identifier. May be <code>null</code>
858 * @param durable indicates whether it is a durable subscription
859 */
860 public ConsumerEntry(Object key, JmsDestination destination,
861 String clientID, boolean durable) {
862 _key = key;
863 _destination = destination;
864 _clientID = clientID;
865 _durable = durable;
866 }
867
868 public boolean equals(Object obj) {
869 boolean result = false;
870 if (obj instanceof ConsumerEntry) {
871 result = ((ConsumerEntry) obj)._key.equals(_key);
872 }
873
874 return result;
875 }
876
877 public Object getKey() {
878 return _key;
879 }
880
881 public String getName() {
882 return (_key instanceof String) ? (String) _key : null;
883 }
884
885 public JmsDestination getDestination() {
886 return _destination;
887 }
888
889 public String getClientID() {
890 return _clientID;
891 }
892
893 public boolean isDurable() {
894 return _durable;
895 }
896
897 /***
898 * Helper to return a key for identifying {@link ConsumerEndpoint}
899 * instances. This returns the consumers persistent identifier if it has
900 * one; if not, it returns its transient identifier.
901 *
902 * @param consumer the consumer
903 * @return a key for identifying <code>consumer</code>
904 */
905 public static Object getConsumerKey(ConsumerEndpoint consumer) {
906 Object key = null;
907 String id = consumer.getPersistentId();
908 if (id != null) {
909 key = id;
910 } else {
911 key = new Long(consumer.getId());
912 }
913 return key;
914 }
915 }
916
917 }
918