1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: DestinationManagerImpl.java,v 1.2 2005/11/12 10:49:48 tanderson Exp $
44 */
45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection;
48 import java.util.Enumeration;
49 import java.util.HashMap;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.Map;
53 import java.util.List;
54 import java.util.ArrayList;
55 import javax.jms.InvalidDestinationException;
56 import javax.jms.JMSException;
57
58 import org.apache.commons.logging.Log;
59 import org.apache.commons.logging.LogFactory;
60
61 import org.exolab.jms.client.JmsDestination;
62 import org.exolab.jms.client.JmsQueue;
63 import org.exolab.jms.client.JmsTopic;
64 import org.exolab.jms.gc.GarbageCollectionService;
65 import org.exolab.jms.message.MessageImpl;
66 import org.exolab.jms.persistence.DatabaseService;
67 import org.exolab.jms.persistence.PersistenceAdapter;
68 import org.exolab.jms.persistence.PersistenceException;
69 import org.exolab.jms.service.Service;
70 import org.exolab.jms.service.ServiceException;
71
72
73 /***
74 * The destination manager is responsible for creating and managing the
75 * lifecycle of {@link DestinationCache} objects. The destination manager is
76 * also responsible for managing messages, that are received by the message
77 * manager, which do not have any registered {@link DestinationCache}.
78 *
79 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
80 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
81 * @version $Revision: 1.2 $ $Date: 2005/11/12 10:49:48 $
82 */
83 public class DestinationManagerImpl extends Service
84 implements DestinationManager {
85
86 /***
87 * The set of persistent and non-persistent destinations: maps a destination name to its JmsDestination.
88 */
89 private final HashMap _destinations = new HashMap();
90
91 /***
92 * The set of active DestinationCache instances: maps a JmsDestination to its DestinationCache.
93 */
94 private final HashMap _caches = new HashMap();
95
96 /***
97 * Synchronization helper. Should be synchronized on whenever accessing
98 * _destinations, or _caches. Note that this is the _destinations map itself, not a separate object.
99 */
100 private final Object _lock = _destinations;
101
102 /***
103 * Maintains a linked list of DestinationEventListener objects. These
104 * listeners will be informed when destinations are added or destroyed. Access is synchronized on the list itself, not on _lock.
105 */
106 private LinkedList _listeners = new LinkedList();
107
108 /***
109 * The message manager.
110 */
111 private final MessageManager _messages;
112
113 /***
114 * The destination cache factory.
115 */
116 private final DestinationCacheFactory _factory;
117
118 /***
119 * The consumer manager. Supplied via setConsumerManager(); doStart() fails while it is still null.
120 */
121 private ConsumerManager _consumers;
122
123 /***
124 * The database service.
125 */
126 private final DatabaseService _database;
127
128 /***
129 * The garbage collection service.
130 */
131 private final GarbageCollectionService _collector;
132
133 /***
134 * The logger.
135 */
136 private static final Log _log =
137 LogFactory.getLog(DestinationManagerImpl.class);
138
/**
 * Construct a new <code>DestinationManagerImpl</code>.
 * <p/>
 * All four collaborators are mandatory. They are validated in declaration
 * order, before any field is assigned.
 *
 * @param messages  the message manager
 * @param factory   the destination cache factory
 * @param database  the database service
 * @param collector the garbage collection service
 * @throws IllegalArgumentException if any argument is <code>null</code>
 */
public DestinationManagerImpl(MessageManager messages,
                              DestinationCacheFactory factory,
                              DatabaseService database,
                              GarbageCollectionService collector) {
    rejectNull(messages, "Argument 'messages' is null");
    rejectNull(factory, "Argument 'factory' is null");
    rejectNull(database, "Argument 'database' is null");
    rejectNull(collector, "Argument 'collector' is null");

    _messages = messages;
    _factory = factory;
    _database = database;
    _collector = collector;
}

/**
 * Fails with an <code>IllegalArgumentException</code> if a constructor
 * argument is missing.
 *
 * @param argument  the value to check
 * @param complaint the exception message to use if the value is null
 */
private static void rejectNull(Object argument, String complaint) {
    if (argument == null) {
        throw new IllegalArgumentException(complaint);
    }
}
168
169 /***
170 * Sets the consumer manager.
171 *
172 * @param consumers the consumer manager
173 */
174 public void setConsumerManager(ConsumerManager consumers) {
175 _consumers = consumers;
176 }
177
178 /***
179 * Returns the cache for the supplied destination.
180 * <p/>
181 * If the cache doesn't exist, it will be created, and any registered {@link
182 * DestinationEventListener}s will be notified.
183 *
184 * @param destination the destination of the cache to return
185 * @return the cache associated with <code>destination</code>
186 * @throws InvalidDestinationException if <code>destination</code> doesn't
187 * exist
188 * @throws JMSException if the cache can't be created
189 */
190 public DestinationCache getDestinationCache(JmsDestination destination)
191 throws JMSException {
192 DestinationCache result;
193 boolean created = false;
194
195 synchronized (_lock) {
196 final String name = destination.getName();
197
198
199 destination = getExistingDestination(name);
200
201 result = (DestinationCache) _caches.get(destination);
202 if (result == null) {
203 checkWildcard(destination);
204 result = _factory.createDestinationCache(destination);
205 _caches.put(destination, result);
206 _messages.addEventListener(destination, result);
207 created = true;
208 }
209 }
210
211 if (created) {
212
213
214 notifyCacheAdded(result);
215 }
216
217 return result;
218 }
219
220 /***
221 * Returns a destination given its name.
222 *
223 * @param name the name of the destination
224 * @return the destination corresponding to <code>name</code> or
225 * <code>null</code> if none exists
226 */
227 public JmsDestination getDestination(String name) {
228 synchronized (_lock) {
229 return (JmsDestination) _destinations.get(name);
230 }
231 }
232
233 /***
234 * Register an event listener to be notified when destinations are created
235 * and destroyed.
236 *
237 * @param listener the listener to add
238 */
239 public void addDestinationEventListener(DestinationEventListener listener) {
240 synchronized (_listeners) {
241 if (!_listeners.contains(listener)) {
242 _listeners.add(listener);
243 }
244 }
245 }
246
247 /***
248 * Remove an event listener.
249 *
250 * @param listener the listener to remove
251 */
252 public void removeDestinationEventListener(
253 DestinationEventListener listener) {
254 synchronized (_listeners) {
255 _listeners.remove(listener);
256 }
257 }
258
259 /***
260 * Create a destination.
261 * <p/>
262 * Any registered {@link DestinationEventListener}s will be notified.
263 *
264 * @param destination the destination to create
265 * @throws InvalidDestinationException if the destination already exists or
266 * is a wildcard destination
267 * @throws JMSException if the destination can't be created
268 */
269 public void createDestination(JmsDestination destination)
270 throws JMSException {
271 checkWildcard(destination);
272 synchronized (_lock) {
273 if (exists(destination.getName())) {
274 throw new InvalidDestinationException(
275 "Destination already exists: " + destination.getName());
276 }
277 if (destination.getPersistent()) {
278 createPersistentDestination(destination);
279 }
280 addToDestinations(destination);
281 }
282
283 notifyDestinationAdded(destination);
284 }
285
286 /***
287 * Remove a destination.
288 * <p/>
289 * All messages and durable consumers will be removed. Any registered {@link
290 * DestinationEventListener}s will be notified.
291 *
292 * @param destination the destination to remove
293 * @throws InvalidDestinationException if the destination is invalid
294 * @throws JMSException if the destination can't be removed
295 */
296 public void removeDestination(JmsDestination destination)
297 throws JMSException {
298
299 if (_log.isDebugEnabled()) {
300 _log.debug("removeDestination(destination=" + destination + ")");
301 }
302
303
304 destination = getExistingDestination(destination.getName());
305
306 boolean queue = (destination instanceof JmsQueue) ? true : false;
307
308 if (!queue) {
309
310
311 _consumers.unsubscribe((JmsTopic) destination);
312 }
313
314 synchronized (_lock) {
315 DestinationCache cache =
316 (DestinationCache) _caches.get(destination);
317
318 if (cache != null && cache.hasConsumers()) {
319 throw new JMSException("Cannot delete destination"
320 + destination + " since there are "
321 + " active consumers.");
322 }
323 }
324
325
326
327
328 try {
329 _database.begin();
330 Connection connection = _database.getConnection();
331
332 _database.getAdapter().removeDestination(connection,
333 destination.getName());
334 destroyDestinationCache(destination);
335 removeFromDestinations(destination);
336 _database.commit();
337 } catch (Exception exception) {
338 String msg = "Failed to remove destination "
339 + destination.getName();
340 cleanup(msg, exception);
341 }
342
343 notifyDestinationRemoved(destination);
344 }
345
346 /***
347 * Invoked when the {@link MessageManager} receives a non-persistent
348 * message.
349 *
350 * @param destination the message's destination
351 * @param message the message
352 * @throws JMSException if the listener fails to handle the message
353 */
354 public void messageAdded(JmsDestination destination,
355 MessageImpl message)
356 throws JMSException {
357 if (destination instanceof JmsTopic) {
358
359
360
361 if (_consumers.hasActiveConsumers(destination)) {
362 if (!exists(destination.getName())) {
363 createDestination(destination);
364 }
365 DestinationCache cache = getDestinationCache(destination);
366 cache.messageAdded(destination, message);
367 }
368 } else {
369
370
371 if (!exists(destination.getName())) {
372 createDestination(destination);
373 }
374 DestinationCache cache = getDestinationCache(destination);
375 cache.messageAdded(destination, message);
376 }
377 }
378
379 /***
380 * Invoked when the {@link MessageManager} receives a persistent message.
381 *
382 * @param destination the message's destination
383 * @param message the message
384 * @throws JMSException if the listener fails to handle the message
385 * @throws PersistenceException if there is a persistence related problem
386 */
387 public void persistentMessageAdded(JmsDestination destination,
388 MessageImpl message)
389 throws JMSException, PersistenceException {
390 DestinationCache cache = getDestinationCache(destination);
391 cache.persistentMessageAdded(destination, message);
392 }
393
394 /***
395 * Returns all destinations.
396 *
397 * @return a list of JmsDestination instances.
398 * @throws JMSException for any JMS error
399 */
400 public List getDestinations() throws JMSException {
401 synchronized (_lock) {
402 return new ArrayList(_destinations.values());
403 }
404 }
405
406 /***
407 * Returns a map of all destinations that match the specified topic.
408 * <p/>
409 * If the topic represents a wildcard then it may match none, one or more
410 * destinations.
411 *
412 * @param topic the topic
413 * @return a map of topics to DestinationCache instances
414 */
415 public Map getTopicDestinationCaches(JmsTopic topic) {
416 HashMap result = new HashMap();
417
418 synchronized (_lock) {
419 Iterator iter = _caches.keySet().iterator();
420 while (iter.hasNext()) {
421 JmsDestination dest = (JmsDestination) iter.next();
422 if ((dest instanceof JmsTopic) &&
423 (topic.match((JmsTopic) dest))) {
424 result.put(dest, _caches.get(dest));
425 }
426 }
427 }
428
429 return result;
430 }
431
432 /***
433 * Perform any garbage collection on this resource. This will have the
434 * effect of releasing system resources. If the 'aggressive' flag is set to
435 * true then the garbage collection should do more to release memory related
436 * resources since it is called when the application memory is low.
437 *
438 * @param aggressive <code>true</code> for aggressive garbage collection
439 */
440 public void collectGarbage(boolean aggressive) {
441 int gcCaches = 0;
442 int gcDestinations = 0;
443
444 DestinationCache[] caches;
445 synchronized (_lock) {
446 caches = (DestinationCache[]) _caches.values().toArray(
447 new DestinationCache[0]);
448 }
449 for (int index = 0; index < caches.length; index++) {
450 DestinationCache cache = caches[index];
451 if (cache.canDestroy()) {
452 if (_log.isDebugEnabled()) {
453 _log.debug("Garbage collecting destination cache="
454 + cache);
455 }
456 destroyDestinationCache(cache);
457 gcCaches++;
458 } else {
459
460
461 cache.collectGarbage(aggressive);
462 }
463 }
464
465
466 synchronized (_lock) {
467 JmsDestination[] destinations
468 = (JmsDestination[]) _destinations.values().toArray(
469 new JmsDestination[0]);
470 for (int i = 0; i < destinations.length; ++i) {
471 JmsDestination dest = destinations[i];
472 if (!dest.getPersistent() && !_caches.containsKey(dest)) {
473 gcDestinations++;
474 _destinations.remove(dest.getName());
475 }
476 }
477
478
479 _log.info("DMGC Collected " + gcCaches + " caches, "
480 + _caches.size()
481 + " remaining.");
482 _log.info("DMGC Collected " + gcDestinations + " destinations, "
483 + _destinations.size() + " remaining.");
484 }
485
486 }
487
488 /***
489 * Start the service.
490 *
491 * @throws ServiceException if the service fails to start
492 */
493 protected void doStart() throws ServiceException {
494 if (_consumers == null) {
495 throw new ServiceException(
496 "ConsumerManager hasn't been initialised");
497 }
498 init();
499 _collector.register(this);
500 }
501
502 /***
503 * Stop the service.
504 *
505 * @throws ServiceException if the service fails to stop
506 */
507 protected void doStop() throws ServiceException {
508 _collector.unregister(this);
509
510 JmsDestination[] destinations;
511 synchronized (_lock) {
512 destinations = (JmsDestination[]) _caches.keySet().toArray(
513 new JmsDestination[0]);
514 }
515 for (int index = 0; index < destinations.length; index++) {
516 destroyDestinationCache(destinations[index]);
517 }
518
519 _caches.clear();
520
521 _destinations.clear();
522
523
524 synchronized (_listeners) {
525 _listeners.clear();
526 }
527 }
528
529 /***
530 * Initialises the destination manager.
531 *
532 * @throws ServiceException if the service cannot be initialised
533 */
534 protected void init() throws ServiceException {
535 Enumeration iter;
536 try {
537 _database.begin();
538 Connection connection = _database.getConnection();
539
540
541 iter = _database.getAdapter().getAllDestinations(connection);
542 _database.commit();
543 } catch (PersistenceException exception) {
544 _log.error(exception, exception);
545 rollback();
546 throw new ServiceException("Failed to get destinations", exception);
547 }
548
549 while (iter.hasMoreElements()) {
550
551 JmsDestination dest = (JmsDestination) iter.nextElement();
552 addToDestinations(dest);
553 }
554 }
555
556 /***
557 * Determines if a destination exists.
558 *
559 * @param name the destination name
560 * @return <code>true</code> if the destination exists, otherwise
561 * <code>false
562 */
563 protected boolean exists(String name) {
564 return getDestination(name) != null;
565 }
566
567 /***
568 * Delete the specfied destination.
569 *
570 * @param cache the destination to destroy
571 */
572 protected void destroyDestinationCache(DestinationCache cache) {
573 destroyDestinationCache(cache.getDestination());
574 }
575
576 /***
577 * Delete the specfied destination.
578 *
579 * @param dest the destination to destroy
580 */
581 protected void destroyDestinationCache(JmsDestination dest) {
582 synchronized (_lock) {
583 DestinationCache cache = (DestinationCache) _caches.remove(dest);
584 if (cache != null) {
585
586 _messages.removeEventListener(dest);
587
588
589
590 notifyCacheRemoved(cache);
591
592 cache.destroy();
593 }
594 }
595 }
596
597 /***
598 * Create a persistent destination.
599 *
600 * @param destination the destination to create
601 * @throws JMSException if the destination cannot be created
602 */
603 private void createPersistentDestination(JmsDestination destination)
604 throws JMSException {
605 if (_log.isDebugEnabled()) {
606 _log.debug("createPersistentDestination(destination="
607 + destination + ")");
608 }
609
610 boolean queue = (destination instanceof JmsQueue) ? true : false;
611 PersistenceAdapter adapter = _database.getAdapter();
612
613
614
615
616
617 try {
618 _database.begin();
619 Connection connection = _database.getConnection();
620 adapter.addDestination(connection, destination.getName(), queue);
621 _database.commit();
622 } catch (Exception exception) {
623 cleanup("Failed to create persistent destination "
624 + destination.getName(), exception);
625 }
626 }
627
628 /***
629 * Notify the list of {@link DestinationEventListener} objects that the
630 * specified destination has been added.
631 *
632 * @param destination the added destination
633 * @throws JMSException if a listener fails to be notified
634 */
635 private void notifyDestinationAdded(JmsDestination destination)
636 throws JMSException {
637 DestinationEventListener[] listeners = getListeners();
638 for (int i = 0; i < listeners.length; ++i) {
639 listeners[i].destinationAdded(destination);
640 }
641 }
642
643 /***
644 * Notify the list of {@link DestinationEventListener} objects that the
645 * specified destination has been removed.
646 *
647 * @param destination the added destination
648 * @throws JMSException if a listeners fails to be notified
649 */
650 private void notifyDestinationRemoved(JmsDestination destination)
651 throws JMSException {
652 DestinationEventListener[] listeners = getListeners();
653 for (int i = 0; i < listeners.length; ++i) {
654 listeners[i].destinationRemoved(destination);
655 }
656 }
657
658 /***
659 * Notify the list of {@link DestinationEventListener} objects that the
660 * specified message cache has been added.
661 *
662 * @param cache the added cache
663 */
664 private void notifyCacheAdded(DestinationCache cache) {
665 JmsDestination destination = cache.getDestination();
666 DestinationEventListener[] listeners = getListeners();
667 for (int i = 0; i < listeners.length; ++i) {
668 listeners[i].cacheAdded(destination, cache);
669 }
670 }
671
672 /***
673 * Notify the list of {@link DestinationEventListener} objects that the
674 * specified message cache has been removed.
675 *
676 * @param cache the added cache
677 */
678 private void notifyCacheRemoved(DestinationCache cache) {
679 JmsDestination destination = cache.getDestination();
680 DestinationEventListener[] listeners = getListeners();
681 for (int i = 0; i < listeners.length; ++i) {
682 listeners[i].cacheRemoved(destination, cache);
683 }
684 }
685
686 /***
687 * Add the specified destination to the destination cache.
688 *
689 * @param destination the destination to add
690 */
691 private void addToDestinations(JmsDestination destination) {
692 synchronized (_lock) {
693 if (!_destinations.containsKey(destination.getName())) {
694 _destinations.put(destination.getName(), destination);
695 }
696 }
697 }
698
699 /***
700 * Remove the specified destination from the cache.
701 *
702 * @param destination the destination to remove
703 */
704 private void removeFromDestinations(JmsDestination destination) {
705 synchronized (_lock) {
706 _destinations.remove(destination.getName());
707 }
708 }
709
710 /***
711 * Returns a destination given its name.
712 *
713 * @param name the name of the destination
714 * @return the destination corresponding to <code>name</code>
715 * @throws InvalidDestinationException if the named destination doesn't
716 * exist
717 */
718 private JmsDestination getExistingDestination(String name)
719 throws InvalidDestinationException {
720 JmsDestination destination = getDestination(name);
721 if (destination == null) {
722 throw new InvalidDestinationException(
723 "Destination does not exist:" + name);
724 }
725 return destination;
726 }
727
728 /***
729 * Ensures that the specified destination isn't a wildcard.
730 *
731 * @param destination the destination to check
732 * @throws InvalidDestinationException if the destination is a wildcard
733 */
734 private void checkWildcard(JmsDestination destination)
735 throws InvalidDestinationException {
736 if (destination instanceof JmsTopic
737 && ((JmsTopic) destination).isWildCard()) {
738 throw new InvalidDestinationException(
739 "Wildcarded topics cannot be managed: "
740 + destination.getName());
741 }
742 }
743
744 /***
745 * Returns the registered {@link DestinationEventListener}s.
746 *
747 * @return the registered {@link DestinationEventListener}s.
748 */
749 private DestinationEventListener[] getListeners() {
750 synchronized (_listeners) {
751 return (DestinationEventListener[]) _listeners.toArray(
752 new DestinationEventListener[0]);
753 }
754 }
755
756 /***
757 * Rollback the current transaction, logging any error.
758 */
759 private void rollback() {
760 try {
761 _database.rollback();
762 } catch (PersistenceException exception) {
763 _log.error(exception, exception);
764 }
765 }
766
767 /***
768 * Cleanup a failed transaction, and propagate the exception as a
769 * JMSException.
770 *
771 * @param message the message to log
772 * @param exception the exception propagate
773 * @throws JMSException <code>exception</code> if it is an instance of
774 * JMSException, else a new JMSException containing
775 * <code>message</code>
776 */
777 private void cleanup(String message, Exception exception)
778 throws JMSException {
779 _log.error(message, exception);
780 rollback();
781 if (exception instanceof JMSException) {
782 throw (JMSException) exception;
783 } else {
784 throw new JMSException(message + ": " + exception.getMessage());
785 }
786 }
787 }