View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: DestinationCache.java,v 1.34 2003/12/29 13:09:15 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.sql.Connection;
51  import java.util.Collections;
52  import java.util.Iterator;
53  import java.util.LinkedList;
54  import java.util.List;
55  
56  import javax.transaction.TransactionManager;
57  
58  import org.apache.commons.logging.Log;
59  import org.apache.commons.logging.LogFactory;
60  
61  import org.exolab.jms.Identifiable;
62  import org.exolab.jms.client.JmsDestination;
63  import org.exolab.jms.gc.GarbageCollectable;
64  import org.exolab.jms.lease.LeaseEventListenerIfc;
65  import org.exolab.jms.message.MessageHandle;
66  import org.exolab.jms.message.MessageImpl;
67  import org.exolab.jms.persistence.DatabaseService;
68  import org.exolab.jms.persistence.PersistenceException;
69  import org.exolab.jms.persistence.SQLHelper;
70  import org.exolab.jms.util.UUID;
71  
72  
73  /***
74   * A DestinationCache is used to cache messages for a particular destination. 
75   * <p>
76   * It implements {@link MessageManagerEventListener} in order to be notified
77   * of messages being added to and removed from 
78   * {@link MessageMgr}
79   * <p>
80   * A {@link ConsumerEndpoint} registers with a {@link DestinationCache} to
81   * receive messages for a particular destination.
82   * <p>
83   * In all instances this class doesn't deal with <code>Message</code> objects
84   * directly, but instead uses their corresponding {@link MessageHandle},
85   * which is far more lightweight.
86   * <p>
87   * A two level cache is used to facilitate quick seeding of registered
88   * consumers and quick per-consumer-acknowledgment strategy. The two level
89   * cache includes this, the DestinationCache, at the highest level and then
90   * {@link ConsumerManager} at the lowest level.
91   * <p>
92   * In addition to registering {@link ConsumerEndpoint} objects the cache also
93   * supports {@link DestinationCacheEventListener}s. A listener will be 
94   * notified when messages are added to the cache but do not actually consume 
95   * messages. A cache can have one or more registered listeners. This feature is
96   * predominately used by browsers or iterators of destinations.
97   * <p>
98   * Clients can also become lifecycle listeners for this object to get notified
99   * during initialization and shutdwon.
100  * <p>
101  * This cache is ordered on priority.
102  *
103  * @version     $Revision: 1.34 $ $Date: 2003/12/29 13:09:15 $
104  * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
105  */
106 public abstract class DestinationCache
107     implements MessageManagerEventListener, Identifiable,
108     LeaseEventListenerIfc, GarbageCollectable {
109 
110     /***
111      * The identity of this object
112      */
113     private String _id;
114 
115     /***
116      * Create a message cache for this destination
117      */
118     private MessageCache _cache = new MessageCache();
119 
120     /***
121      * Maintain the maximum size of this cache. When the cache reaches this
122      * size then messages will be lost. We should only drop transient messages
123      *  and maintain persistent messages. Remove transient messages from the
124      * top of the cache and replace them with persistent messages.
125      */
126     private int _maximumSize = Integer.MAX_VALUE;
127 
128     /***
129      * This is the list of consumers that have subscribed to this cache. It
130      * hosts both durable and transient subscribers
131      */
132     protected List _consumers =
133         Collections.synchronizedList(new LinkedList());
134 
135     /***
136      * The message lease helper is used to manage leases for messages
137      * cached by the destination
138      */
139     protected MessageLeaseHelper _leaseHelper = null;
140 
141     /***
142      * The logger
143      */
144     private static final Log _log = LogFactory.getLog(DestinationCache.class);
145 
146     /***
147      * Construct a message cache for a particular destination.
148      */
149     DestinationCache() {
150         _id = UUID.next();
151     }
152 
153     /***
154      * Register this destination with the message manager. If it cannot
155      * initialise then throw FailedToInitializeException
156      */
157     void init() throws FailedToInitializeException {
158         // moved to after lease helper creation
159         //MessageMgr.instance().addEventListener(getDestination(), this);
160 
161         try {
162             _leaseHelper = new MessageLeaseHelper(this);
163         } catch (PersistenceException exception) {
164             String msg = "Failed to initialise destination cache";
165             _log.error(msg, exception);
166             throw new FailedToInitializeException(
167                 msg + ":" + exception.getMessage());
168         }
169 
170         MessageMgr.instance().addEventListener(getDestination(), this);
171     }
172 
173     /***
174      * Register this destination with the message manager and create the
175      * lease helper. The {@link MessageLeaseHelper} is initialized using the
176      * specified {@link Connection}.
177      * <p>
178      * If there are problems with the initialization then throw
179      * FailedToInitializeException
180      *
181      * @param connection - the connection to use
182      * @throws FailedToInitializeException
183      */
184     void init(Connection connection)
185         throws FailedToInitializeException {
186         MessageMgr.instance().addEventListener(getDestination(), this);
187 
188         try {
189             _leaseHelper = new MessageLeaseHelper(connection, this);
190         } catch (Exception exception) {
191             // rethrow
192             throw new FailedToInitializeException("Error initialising " +
193                 exception.toString());
194         }
195     }
196 
197     /***
198      * Set the maximum size of the cache. If there are more than this number
199      * of messages in the cache the {@link CacheEvictionPolicy} is enforced
200      * to remove messages.
201      *
202      * @param size - maximum number of messages a destination can hold
203      */
204     public void setMaximumSize(int size) {
205         _maximumSize = size;
206     }
207 
208     /***
209      * Return the cache's maximum size
210      *
211      * @return int - size of cache
212      */
213     public int getMaximumSize() {
214         return _maximumSize;
215     }
216 
217     /***
218      * Return a reference to the underlying destination
219      *
220      * @return JmsDestination
221      */
222     abstract public JmsDestination getDestination();
223 
224     /***
225      * Return the string form of the destination
226      *
227      * @return  String
228      */
229     public String getDestinationByName() {
230         return getDestination().getName();
231     }
232 
233     /***
234      * Set the {@link CacheEvictionPolicy} for this object. This determines
235      * how messages are removed when the cache's upper limit is reached.
236      *
237      * @param policy the eviction policy
238      */
239     public void setCacheEvictionPolicy(CacheEvictionPolicy policy) {
240         // not implemented
241     }
242 
243     /***
244      * Register a consumer with this cache. Part of the registration process
245      * will be to seed the {@link ConsumerEndpoint} with an initial list of
246      * messages that it needs to consume and then feed messages to it through
247      * the specified listener object.
248      * <p>
249      * Messages are subsequently passed down to the consumer's through the
250      * listener, as they enter the DestinationCache.
251      *
252      * @param consumer - message consumer for this destination
253      * @return boolean - true if registered and false otherwise
254      */
255     public boolean registerConsumer(ConsumerEndpoint consumer) {
256 
257         boolean result = false;
258 
259         // check to see that the consumer is actually one for this
260         // destination
261         if (consumer.getDestination().equals(getDestination())) {
262             if (!_consumers.contains(consumer)) {
263                 _consumers.add(consumer);
264                 consumer.setMaximumSize(this.getMaximumSize());
265                 result = true;
266             }
267         }
268 
269         return result;
270     }
271 
272     /***
273      * Remove the consumer for the list of registered consumers. If the
274      * consumer does not exist then the call fails silently.
275      *
276      * @param consumer - consumer to remove.
277      */
278     public void unregisterConsumer(ConsumerEndpoint consumer) {
279         if (_consumers.contains(consumer)) {
280             _consumers.remove(consumer);
281         } else {
282         }
283     }
284 
285     /***
286      * Return an enmeration of all consumers attached to this cache.
287      *
288      * @return Iterator - list of registered consumers
289      */
290     public Iterator getConsumers() {
291         return _consumers.iterator();
292     }
293 
294     /***
295      * Return the consumers as an array of {@link ConsumerEndpoint} objects
296      * This is a safer way to get a list of consumers since it avoids
297      * concurrent modification exceptions
298      *
299      * @return Object[] - an array of ConsumerEndpoint objects
300      */
301     Object[] getConsumersByArray() {
302         return _consumers.toArray();
303     }
304 
305     /***
306      * This method is called when the {@link MessageMgr} adds a message
307      * for this destination to the cache
308      *
309      * @param message - message added to cache
310      */
311     abstract public boolean messageAdded(JmsDestination destination,
312                                          MessageImpl message);
313 
314     /***
315      * This method is called when the {@link MessageMgr} removes a
316      * message from the cache.
317      *
318      * @param message - message removed from cache
319      */
320     abstract public void messageRemoved(JmsDestination destination,
321                                         MessageImpl message);
322 
323     /***
324      * Return the number of messages currently active for this destination
325      *
326      * @return int - number of active messages
327      */
328     public int getMessageCount() {
329         return _cache.getHandleCount();
330     }
331 
332     /***
333      * Notify the listeners that a non-persistent message has been added to the
334      * cache
335      *
336      * @param handle - message that was added
337      * @return boolean - true of at least one listener has processed itx
338      */
339     abstract boolean notifyOnAddMessage(MessageImpl message);
340 
341     /***
342      * Notify the listeners that a non-persistent message has been removed form
343      * the cache
344      *
345      * @param handle - message that was removed
346      */
347     abstract void notifyOnRemoveMessage(MessageImpl message);
348 
349     /***
350      * Notify the listeners that a persistent message has been added to the
351      * cache
352      *
353      * @param connection - the persistent connection to use
354      * @param handle - message that was added
355      * @return boolean - true of at least one listener has processed it
356      * @throws PersistenceException - if there is a persistence related error
357      */
358     boolean notifyOnAddPersistentMessage(Connection connection,
359                                          MessageImpl message)
360         throws PersistenceException {
361 
362         //default implementation
363         return true;
364     }
365 
366     /***
367      * Notify the listeners that a persistent message has been removed form
368      * the cache
369      *
370      * @param connection - the persistent connection to use.
371      * @param handle - message that was removed
372      * @throws PersistenceException - if there is a persistence related error
373      */
374     void notifyOnRemovePersistentMessage(Connection connection,
375                                          MessageImpl message)
376         throws PersistenceException {
377         //default implementation is empty
378     }
379 
380     /***
381      * Check whether there are any attached consumers to this cache
382      *
383      * @return boolean - true if there are attached consumers
384      */
385     abstract boolean hasActiveConsumers();
386 
387     /***
388      * This method is called whenever a lease expires. It passes the
389      * object that has expired.
390      *
391      * @param       leasedObject        reference to the leased object
392      */
393     public void onLeaseExpired(Object leasedObject) {
394         if (leasedObject != null) {
395             MessageHandle handle = (MessageHandle) leasedObject;
396 
397             // retrieve an instance of the message
398             MessageImpl message = resolveExpiredMessage(handle);
399 
400             // determine whether  the message is persistent or not and take
401             // the corresponding action
402             if (handle instanceof PersistentMessageHandle) {
403                 Connection connection = null;
404                 try {
405                     connection = DatabaseService.getConnection();
406                     persistentMessageRemoved(connection, getDestination(),
407                         message);
408                     connection.commit();
409                 } catch (Exception exception) {
410                     SQLHelper.rollback(connection);
411                     _log.error("Failure in onLeaseExpired", exception);
412                 } finally {
413                     SQLHelper.close(connection);
414                 }
415             } else {
416                 // notify it's listeners that the non-persistent message has
417                 // been removed
418                 messageRemoved(getDestination(), message);
419             }
420         }
421     }
422 
423     /***
424      * Determines if this cache can be destroyed
425      *
426      * @return <code>true</code> if the cache can be destroyed, otherwise
427      * <code>false</code>
428      */
429     public abstract boolean canDestroy();
430 
431     /***
432      * Check to see if the message has a TTL. If so then set up a lease
433      * for it. An expiry time of 0 means that the message never expires
434      *
435      * @param message - message to add
436      */
437     void checkMessageExpiry(MessageImpl message) {
438         if (message != null) {
439             _leaseHelper.addLease(message);
440         }
441     }
442 
443     /***
444      * Destory this cache.
445      */
446     synchronized void destroy() {
447         // clear the cache
448         _cache.clear();
449 
450         // remove the consumers
451         _consumers.clear();
452 
453         // unregister itself from the message manager
454         MessageMgr.instance().removeEventListener(getDestination(), this);
455 
456         // remove the lease
457         _leaseHelper.clear();
458     }
459 
460     /***
461      * Close the cache and unregister all the consumers. Notify any and all
462      * DestinationCacheLifecycleListeners.
463      * <p>
464      * Once the cache is closed it will no longger receive messages for this
465      * destination.
466      */
467     public void shutdown() {
468         destroy();
469     }
470 
471     /***
472      * Insert the specified handle to the handles cache.
473      *
474      * @param handle - handle to add
475      */
476     void addMessage(MessageHandle handle) {
477         handle.setConsumerName(getDestination().getName());
478         _cache.addHandle(handle);
479     }
480 
481     /***
482      * Add the following handle and corresponding message to their respective
483      * caches
484      *
485      * @param handle - handle to add
486      * @param message - the corresponding message to add
487      */
488     void addMessage(MessageHandle handle, MessageImpl message) {
489         handle.setConsumerName(getDestination().getName());
490         _cache.addMessage(handle, message);
491     }
492 
493     /***
494      * Return the message for the specified handle
495      *
496      * @param handle - the handle
497      * @return MessageImpl - the associated message
498      */
499     MessageImpl getMessage(MessageHandle handle) {
500         return _cache.getMessage(handle);
501     }
502 
503     /***
504      * Remove the message handle from the cache, if it exists.
505      *
506      * @param handle - handle to remove
507      * @return boolean - true if it was removed
508      */
509     boolean removeMessage(MessageHandle handle) {
510         return _cache.removeHandle(handle);
511     }
512 
513     /***
514      * Remove and return the first message handle in the cache
515      *
516      * @return MessageHandle - the first handle or null if cache is empty
517      */
518     final MessageHandle removeFirstMessage() {
519         return _cache.removeFirstHandle();
520     }
521 
522     /***
523      * Return the message handles in the cache as an array
524      *
525      * @return Object[] - array of message handles
526      */
527     final Object[] toMessageArray() {
528         return _cache.getHandleArray();
529     }
530 
531     /***
532      * Delete the message with the specified handle from the cache
533      *
534      * @param handle - the handle
535      */
536     void deleteMessage(MessageHandle handle) {
537         _cache.removeMessage(handle.getMessageId());
538     }
539 
540     // implementation of Identifiable.getId
541     public String getId() {
542         return _id;
543     }
544 
545     // implementation of GarbageCollectable.collectGarbage
546     public void collectGarbage(boolean aggressive) {
547         if (aggressive) {
548             // clear all persistent messages in the cache
549             _cache.clearPersistentMessages();
550             if (_log.isDebugEnabled()) {
551                 _log.debug("Evicted all persistent messages from cache "
552                     + getDestination().getName());
553             }
554         }
555 
556         if (_log.isDebugEnabled()) {
557             _log.debug("DESTCACHE -" + getDestination().getName()
558                 + " Messages: P[" + _cache.getPersistentCount()
559                 + "] T[" + _cache.getTransientCount() + "] Handles: ["
560                 + _cache.getHandleCount() + "]");
561         }
562     }
563 
564     /***
565      * Resolve an expired message through its handle
566      *
567      * @param handle the expired message's handle
568      * @return the expired message. May be null.
569      */
570     protected MessageImpl resolveExpiredMessage(MessageHandle handle) {
571         return handle.getMessage();
572     }
573 
574 }