View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: AbstractDestinationCache.java,v 1.4 2007/01/24 12:00:28 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.util.Collections;
48  import java.util.HashMap;
49  import java.util.Map;
50  import javax.jms.JMSException;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  
55  import org.exolab.jms.client.JmsDestination;
56  import org.exolab.jms.lease.LeaseEventListenerIfc;
57  import org.exolab.jms.lease.LeaseManager;
58  import org.exolab.jms.message.MessageImpl;
59  import org.exolab.jms.persistence.PersistenceException;
60  import org.exolab.jms.persistence.DatabaseService;
61  
62  
63  /***
64   * Abstract implementation of the {@link DestinationCache} interface.
65   *
66   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @version $Revision: 1.4 $ $Date: 2007/01/24 12:00:28 $
68   */
69  public abstract class AbstractDestinationCache implements DestinationCache,
70          LeaseEventListenerIfc {
71  
72      /***
73       * The destination to cache messages for.
74       */
75      private final JmsDestination _destination;
76  
77      /***
78       * The message cache for this destination.
79       */
80      private DefaultMessageCache _cache = new DefaultMessageCache();
81  
82      /***
83       * The set of consumers that have subscribed to this cache, keyed on id.
84       */
85      private Map _consumers = Collections.synchronizedMap(new HashMap());
86  
87      /***
88       * A map of String -> MessageLease objects, representing the active leases
89       * keyed on JMSMessageID.
90       */
91      private final HashMap _leases = new HashMap();
92  
93      /***
94       * The database service.
95       */
96      private final DatabaseService _database;
97  
98      /***
99       * The lease manager.
100      */
101     private final LeaseManager _leaseMgr;
102 
103     /***
104      * The logger.
105      */
106     private static final Log _log = LogFactory.getLog(
107             AbstractDestinationCache.class);
108 
109 
110     /***
111      * Construct a new <code>AbstractDestinationCache</code>.
112      *
113      * @param destination the destination to cache messages for
114      * @param database    the database service
115      * @param leases      the lease manager
116      */
117     public AbstractDestinationCache(JmsDestination destination,
118                                     DatabaseService database,
119                                     LeaseManager leases) {
120         if (destination == null) {
121             throw new IllegalArgumentException(
122                     "Argument 'destination' is null");
123         }
124         if (database == null) {
125             throw new IllegalArgumentException("Argument 'database' is null");
126         }
127         if (leases == null) {
128             throw new IllegalArgumentException("Argument 'leases' is null");
129         }
130         _destination = destination;
131         _database = database;
132         _leaseMgr = leases;
133     }
134 
135     /***
136      * Returns the destination that messages are being cached for.
137      *
138      * @return the destination that messages are being cached for
139      */
140     public JmsDestination getDestination() {
141         return _destination;
142     }
143 
144     /***
145      * Register a consumer with this cache.
146      *
147      * @param consumer the message consumer for this destination
148      * @return <code>true</code> if registered; otherwise <code>false</code>
149      */
150     public boolean addConsumer(ConsumerEndpoint consumer) {
151         boolean result = false;
152 
153         // check to see that the consumer is actually one for this
154         // destination
155         if (consumer.getDestination().equals(getDestination())) {
156             Long key = new Long(consumer.getId());
157             if (!_consumers.containsKey(key)) {
158                 _consumers.put(key, consumer);
159                 result = true;
160             }
161         }
162 
163         return result;
164     }
165 
166     /***
167      * Remove the consumer for the list of registered consumers.
168      *
169      * @param consumer the consumer to remove
170      */
171     public void removeConsumer(ConsumerEndpoint consumer) {
172         Long key = new Long(consumer.getId());
173         _consumers.remove(key);
174     }
175 
176     /***
177      * Determines if the cache has any consumers.
178      *
179      * @return <code>true</code> if the cache has consumers; otherwise
180      *         <code>false</code>
181      */
182     public boolean hasConsumers() {
183         return !_consumers.isEmpty();
184     }
185 
186     /***
187      * Returns the number of messages in the cache.
188      *
189      * @return the number of messages in the cache
190      */
191     public int getMessageCount() {
192         return _cache.getMessageCount();
193     }
194 
195     /***
196      * Determines if this cache can be destroyed. This implementation returns
197      * <code>true</code> if there are no active consumers.
198      *
199      * @return <code>true</code> if the cache can be destroyed, otherwise
200      *         <code>false</code>
201      */
202     public boolean canDestroy() {
203         return !hasConsumers();
204     }
205 
206     /***
207      * Destroy this cache.
208      */
209     public synchronized void destroy() {
210         // clear the cache
211         _cache.clear();
212 
213         // remove the consumers
214         _consumers.clear();
215 
216         // remove the leases
217         MessageLease[] leases;
218         synchronized (_leases) {
219             leases = (MessageLease[]) _leases.values().toArray(
220                     new MessageLease[0]);
221             _leases.clear();
222         }
223 
224         for (int i = 0; i < leases.length; ++i) {
225             MessageLease lease = leases[i];
226             _leaseMgr.removeLease(lease);
227         }
228     }
229 
230     /***
231      * Invoked when a message lease has expired.
232      *
233      * @param object an instance of {@link MessageRef}
234      */
235     public void onLeaseExpired(Object object) {
236         MessageRef reference = (MessageRef) object;
237         String messageId = reference.getMessageId();
238         synchronized (_leases) {
239             _leases.remove(messageId);
240         }
241 
242         // determine whether the message is persistent or not and take
243         // the corresponding action
244         try {
245             _database.begin();
246             if (reference.isPersistent()) {
247                 persistentMessageExpired(reference);
248             } else {
249                 messageExpired(reference);
250             }
251             reference.destroy();
252             _database.commit();
253         } catch (Exception exception) {
254             _log.error("Failed to expire message", exception);
255             try {
256                 _database.rollback();
257             } catch (PersistenceException error) {
258                 _log.warn("Failed to rollback", error);
259             }
260         }
261     }
262 
263     public void collectGarbage(boolean aggressive) {
264         if (aggressive) {
265             // clear all persistent messages in the cache
266             _cache.clearPersistentMessages();
267             if (_log.isDebugEnabled()) {
268                 _log.debug("Evicted all persistent messages from cache "
269                            + getDestination().getName());
270             }
271         }
272 
273         if (_log.isDebugEnabled()) {
274             _log.debug("DESTCACHE -" + getDestination().getName()
275                        + " Messages: P[" + _cache.getPersistentCount()
276                        + "] T[" + _cache.getTransientCount() + "] Total: ["
277                        + _cache.getMessageCount() + "]");
278         }
279     }
280 
281     /***
282      * Add a message reference and its corresponding message to the cache
283      *
284      * @param reference the reference to the message
285      * @param message   the message
286      */
287     protected void addMessage(MessageRef reference, MessageImpl message) {
288         if (_log.isDebugEnabled()) {
289             _log.debug("addMessage(reference=[JMSMessageID="
290                     + reference.getMessageId() + "])");
291         }
292         _cache.addMessage(reference, message);
293     }
294 
295     /***
296      * Returns the message cache.
297      *
298      * @return the message cache
299      */
300     protected DefaultMessageCache getMessageCache() {
301         return _cache;
302     }
303 
304     /***
305      * Returns a consumer endpoint, given its id.
306      *
307      * @param consumerId the consumer identity
308      * @return the consumer corresponding to <code>id</code>, or
309      *         <code>null</code> if none is registered
310      */
311     protected ConsumerEndpoint getConsumerEndpoint(long consumerId) {
312         return (ConsumerEndpoint) _consumers.get(new Long(consumerId));
313     }
314 
315     /***
316      * Helper to return the consumers as an array.
317      *
318      * @return the consumers of this cache
319      */
320     protected ConsumerEndpoint[] getConsumerArray() {
321         return (ConsumerEndpoint[]) _consumers.values().toArray(
322                 new ConsumerEndpoint[0]);
323     }
324 
325     /***
326      * Remove an expired non-peristent message, and notify any listeners.
327      *
328      * @param reference the reference to the expired message
329      * @throws JMSException for any error
330      */
331     protected void messageExpired(MessageRef reference)
332             throws JMSException {
333         // notify consumers
334         String messageId = reference.getMessageId();
335         ConsumerEndpoint[] consumers = getConsumerArray();
336         for (int i = 0; i < consumers.length; ++i) {
337             consumers[i].messageRemoved(messageId);
338         }
339     }
340 
341     /***
342      * Remove an expired persistent message, and notify any listeners.
343      *
344      * @param reference  the reference to the expired message
345      * @throws JMSException         if a listener fails to handle the
346      *                              expiration
347      * @throws PersistenceException if there is a persistence related problem
348      */
349     protected void persistentMessageExpired(MessageRef reference)
350             throws JMSException, PersistenceException {
351         // notify consumers
352         String messageId = reference.getMessageId();
353         ConsumerEndpoint[] consumers = getConsumerArray();
354 
355         for (int i = 0; i < consumers.length; ++i) {
356             consumers[i].persistentMessageRemoved(messageId);
357         }
358     }
359 
360     /***
361      * Check to see if the message has a TTL. If so then set up a lease for it.
362      * An expiry time of 0 means that the message never expires
363      *
364      * @param reference a reference to the message
365      * @param message   the message
366      * @throws JMSException if the JMSExpiration property can't be accessed
367      */
368     protected void checkMessageExpiry(MessageRef reference,
369                                       MessageImpl message) throws JMSException {
370         checkMessageExpiry(reference, message.getJMSExpiration());
371     }
372 
373     /***
374      * Check to see if the message has a TTL. If so then set up a lease for it.
375      * An expiry time of 0 means that the message never expires
376      *
377      * @param reference  a reference to the message
378      * @param expiryTime the time when the message expires
379      */
380     protected void checkMessageExpiry(MessageRef reference,
381                                       long expiryTime) {
382         if (expiryTime != 0) {
383             synchronized (_leases) {
384                 // ensure that a lease for this message does not already exist.
385                 if (!_leases.containsKey(reference.getMessageId())) {
386                     long duration = expiryTime - System.currentTimeMillis();
387                     if (duration <= 0) {
388                         duration = 1;
389                     }
390                     MessageLease lease = new MessageLease(reference, duration,
391                                                           this);
392                     _leaseMgr.addLease(lease);
393                     _leases.put(reference.getMessageId(), lease);
394                 }
395             }
396         }
397     }
398 
399 }