View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: QueueDestinationCache.java,v 1.9 2006/06/09 12:58:56 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.sql.Connection;
48  import java.util.Collections;
49  import java.util.Iterator;
50  import java.util.LinkedList;
51  import java.util.List;
52  import javax.jms.JMSException;
53  
54  import org.apache.commons.logging.Log;
55  import org.apache.commons.logging.LogFactory;
56  
57  import org.exolab.jms.client.JmsDestination;
58  import org.exolab.jms.client.JmsQueue;
59  import org.exolab.jms.client.JmsTemporaryDestination;
60  import org.exolab.jms.lease.LeaseManager;
61  import org.exolab.jms.message.MessageImpl;
62  import org.exolab.jms.persistence.DatabaseService;
63  import org.exolab.jms.persistence.PersistenceException;
64  import org.exolab.jms.selector.Selector;
65  import org.exolab.jms.server.ServerConnectionManager;
66  
67  
68  /***
69   * A {@link DestinationCache} for queues.
70   *
71   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
72   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73   * @version $Revision: 1.9 $ $Date: 2006/06/09 12:58:56 $
74   */
75  public class QueueDestinationCache extends AbstractDestinationCache {
76  
77      /***
78       * Maintains a list of {@link QueueConsumerMessageHandle} instances.
79       */
80      private final MessageQueue _handles = new MessageQueue();
81  
82      /***
83       * Maintains a list of queue browsers for this cache.
84       */
85      private final List _browsers
86              = Collections.synchronizedList(new LinkedList());
87  
88      /***
89       * The connection manager.
90       */
91      private final ServerConnectionManager _connections;
92  
93      /***
94       * Synchronization helper.
95       */
96      private final Object _lock = new Object();
97  
98      /***
99       * Index of the last {@link QueueConsumerEndpoint} that received a message
100      * from this destination. If multiple consumers are attached to this queue
101      * then messages will be sent to each in a round robin fashion
102      */
103     private int _lastConsumerIndex = 0;
104 
105     /***
106      * The logger.
107      */
108     private static final Log _log = LogFactory.getLog(
109             QueueDestinationCache.class);
110 
111 
112     /***
113      * Construct a new <code>QueueDestinationCache</code>.
114      *
115      * @param queue       the queue to cache messages for
116      * @param database    the database service
117      * @param leases      the lease manager
118      * @param connections the connection manager
119      * @throws JMSException if the cache can't be initialised
120      */
121     public QueueDestinationCache(JmsQueue queue,
122                                  DatabaseService database,
123                                  LeaseManager leases,
124                                  ServerConnectionManager connections)
125             throws JMSException {
126         super(queue, database, leases);
127         if (connections == null) {
128             throw new IllegalArgumentException(
129                     "Argument 'connections' is null");
130         }
131         _connections = connections;
132 
133         if (queue.getPersistent()) {
134             init();
135         }
136     }
137 
138     /***
139      * A Queue can also hav a queue listener, which simply gets informed of all
140      * messages that arrive at this destination.
141      *
142      * @param listener - queue listener
143      */
144     public void addQueueListener(QueueBrowserEndpoint listener) {
145         // add if not present
146         if (!_browsers.contains(listener)) {
147             _browsers.add(listener);
148         }
149     }
150 
151     /***
152      * Remove the queue listener associated with this cache
153      *
154      * @param listener - queue listener to remove
155      */
156     public void removeQueueListener(QueueBrowserEndpoint listener) {
157         // add if not present
158         if (_browsers.contains(listener)) {
159             _browsers.remove(listener);
160         }
161     }
162 
163     /***
164      * Invoked when the {@link MessageMgr} receives a non-persistent message.
165      *
166      * @param destination the message's destination
167      * @param message     the message
168      * @throws JMSException if the listener fails to handle the message
169      */
170     public void messageAdded(JmsDestination destination, MessageImpl message)
171             throws JMSException {
172         MessageRef reference = new CachedMessageRef(message, false,
173                 getMessageCache());
174         MessageHandle shared = new SharedMessageHandle(this, reference,
175                 message);
176         MessageHandle handle = new QueueConsumerMessageHandle(shared);
177 
178         // all messages are added to this queue. Receivers will
179         // then pick messages from it as required.
180         addMessage(reference, message, handle);
181 
182         // if we have any registered consumers then we need to
183         // send the message to one of them first.
184         ConsumerEndpoint consumer = getConsumerForMessage(message);
185         if (consumer != null) {
186             consumer.messageAdded(handle, message);
187         }
188     }
189 
190     /***
191      * Invoked when the {@link MessageMgr} receives a persistent message.
192      *
193      * @param destination the message's destination
194      * @param message     the message
195      * @throws JMSException         if the listener fails to handle the message
196      * @throws PersistenceException if there is a persistence related problem
197      */
198     public void persistentMessageAdded(JmsDestination destination,
199                                        MessageImpl message)
200             throws JMSException, PersistenceException {
201         MessageRef reference = new CachedMessageRef(message, true,
202                 getMessageCache());
203         MessageHandle shared = new SharedMessageHandle(this, reference,
204                 message);
205         MessageHandle handle = new QueueConsumerMessageHandle(shared);
206         handle.add();
207 
208         addMessage(reference, message, handle);
209 
210         // if there are any registered consumers, notify one of them that
211         // a message has arrived
212         ConsumerEndpoint consumer = getConsumerForMessage(message);
213         if (consumer != null) {
214             consumer.persistentMessageAdded(handle, message);
215         }
216     }
217 
218     /***
219      * Returns the first available message matching the supplied message
220      * selector.
221      *
222      * @param selector the message selector to use. May be <code>null</code>
223      * @param cancel
224      * @return handle to the first message, or <code>null</code> if there are no
225      *         messages, or none matching <code>selector</code>
226      * @throws JMSException for any error
227      */
228     public synchronized MessageHandle getMessage(Selector selector,
229                                                  Condition cancel)
230             throws JMSException {
231         QueueConsumerMessageHandle handle = null;
232         if (selector == null) {
233             // if no selector has been specified then remove and return
234             // the first message
235             handle = (QueueConsumerMessageHandle) _handles.removeFirst();
236         } else {
237             // for non null selector we must find the first matching
238             MessageHandle[] handles = _handles.toArray();
239             for (int i = 0; i < handles.length && !cancel.get(); ++i) {
240                 MessageHandle hdl = handles[i];
241                 MessageImpl message = hdl.getMessage();
242                 if (message != null && selector.selects(message)) {
243                     handle = (QueueConsumerMessageHandle) hdl;
244                     _handles.remove(handle);
245                     break;
246                 }
247             }
248         }
249         return handle;
250     }
251 
252     /***
253      * Playback all the messages in the cache to the specified {@link
254      * QueueBrowserEndpoint}.
255      *
256      * @param browser the queue browser
257      * @throws JMSException for any error
258      */
259     public void playbackMessages(QueueBrowserEndpoint browser)
260             throws JMSException {
261         MessageHandle[] handles = _handles.toArray();
262         for (int i = 0; i < handles.length; ++i) {
263             MessageHandle handle = handles[i];
264             MessageImpl message = handle.getMessage();
265             if (message != null) {
266                 browser.messageAdded(handle, message);
267             }
268         }
269     }
270 
271     /***
272      * Return a message handle back to the cache, to recover unsent or
273      * unacknowledged messages.
274      *
275      * @param handle the message handle to return
276      */
277     public void returnMessageHandle(MessageHandle handle) {
278         // add the message to the destination cache
279         _handles.add(handle);
280         try {
281             MessageImpl message = handle.getMessage();
282             if (message != null) {
283                 // if there are any registered consumers, notify one of them
284                 // that a message has arrived
285                 ConsumerEndpoint consumer = getConsumerForMessage(message);
286                 if (consumer != null) {
287                     consumer.messageAdded(handle, message);
288                 }
289             }
290         } catch (JMSException exception) {
291             _log.debug(exception, exception);
292         }
293     }
294 
295     /***
296      * Determines if there are any registered consumers.
297      *
298      * @return <code>true</code> if there are registered consumers
299      */
300     public boolean hasConsumers() {
301         boolean active = super.hasConsumers();
302         if (!active && !_browsers.isEmpty()) {
303             active = true;
304         }
305         if (_log.isDebugEnabled()) {
306             _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]="
307                     + active);
308         }
309         return active;
310     }
311 
312     /***
313      * Returns the number of messages in the cache.
314      *
315      * @return the number of messages in the cache
316      */
317     public int getMessageCount() {
318         return _handles.size();
319     }
320 
321     /***
322      * Determines if this cache can be destroyed. A <code>QueueDestinationCache</code>
323      * can be destroyed if there are no active consumers and: <ul> <li>the queue
324      * is persistent and there are no messages</li> <li> the queue is temporary
325      * and the corresponding connection is closed </li> </ul>
326      *
327      * @return <code>true</code> if the cache can be destroyed, otherwise
328      *         <code>false</code>
329      */
330     public boolean canDestroy() {
331         boolean destroy = false;
332         if (!hasConsumers()) {
333             JmsDestination queue = getDestination();
334             if (queue.getPersistent() && getMessageCount() == 0) {
335                 destroy = true;
336             } else if (queue.isTemporaryDestination()) {
337                 // check if there is a corresponding connection. If
338                 // not, it has been closed, and the cache can be removed
339                 long connectionId =
340                         ((JmsTemporaryDestination) queue).getConnectionId();
341                 if (_connections.getConnection(connectionId) == null) {
342                     destroy = true;
343                 }
344             }
345         }
346         return destroy;
347     }
348 
349     /***
350      * Destroy this object.
351      */
352     public void destroy() {
353         super.destroy();
354         _browsers.clear();
355     }
356 
357     /***
358      * Initialise the cache. This removes all the expired messages, and then
359      * retrieves all unacked messages from the database and stores them
360      * locally.
361      *
362      * @throws JMSException if the cache can't be initialised
363      */
364     protected void init() throws JMSException {
365         JmsDestination queue = getDestination();
366 
367         List handles;
368         DatabaseService service = null;
369         try {
370             service = DatabaseService.getInstance();
371             Connection connection = service.getConnection();
372             service.getAdapter().removeExpiredMessageHandles(connection,
373                     queue.getName());
374             handles = service.getAdapter().getMessageHandles(connection, queue,
375                     queue.getName());
376         } catch (PersistenceException exception) {
377             _log.error(exception, exception);
378             try {
379                 if (service != null) {
380                     service.rollback();
381                 }
382             } catch (PersistenceException error) {
383                 _log.error(error, error);
384             }
385             throw new JMSException(exception.getMessage());
386         }
387 
388         Iterator iterator = handles.iterator();
389         DefaultMessageCache cache = getMessageCache();
390         while (iterator.hasNext()) {
391             PersistentMessageHandle handle = (PersistentMessageHandle) iterator.next();
392             String messageId = handle.getMessageId();
393             MessageRef reference = cache.getMessageRef(messageId);
394             if (reference == null) {
395                 reference = new CachedMessageRef(messageId, true, cache);
396             }
397             cache.addMessageRef(reference);
398             handle.reference(reference);
399             handle.setDestinationCache(this);
400             _handles.add(new QueueConsumerMessageHandle(handle));
401 
402             checkMessageExpiry(reference, handle.getExpiryTime());
403         }
404     }
405 
406     /***
407      * Add a message, and notify any listeners.
408      *
409      * @param reference a reference to the message
410      * @param message   the message
411      * @param handle    the handle to add
412      * @throws JMSException for any error
413      */
414     protected void addMessage(MessageRef reference, MessageImpl message,
415                               MessageHandle handle) throws JMSException {
416         addMessage(reference, message);
417         _handles.add(handle);
418 
419         // notify any queue listeners that a message has arrived
420         notifyQueueListeners(handle, message);
421 
422         // create a lease iff one is required
423         checkMessageExpiry(reference, message);
424     }
425 
426 
427     /***
428      * Notify queue browsers that a message has arrived.
429      *
430      * @param handle  a handle to the message
431      * @param message the message
432      * @throws JMSException if a browser fails to handle the message
433      */
434     protected void notifyQueueListeners(MessageHandle handle,
435                                         MessageImpl message)
436             throws JMSException {
437         QueueBrowserEndpoint[] browsers =
438                 (QueueBrowserEndpoint[]) _browsers.toArray(
439                         new QueueBrowserEndpoint[0]);
440 
441         for (int index = 0; index < browsers.length; ++index) {
442             QueueBrowserEndpoint browser = browsers[index];
443             browser.messageAdded(handle, message);
444         }
445     }
446 
447     /***
448      * Remove an expired non-peristent message, and notify any listeners.
449      *
450      * @param reference the reference to the expired message
451      * @throws JMSException for any error
452      */
453     protected void messageExpired(MessageRef reference) throws JMSException {
454         _handles.remove(reference.getMessageId());
455         // @todo - notify browser
456         super.messageExpired(reference);
457     }
458 
459     /***
460      * Remove an expired persistent message, and notify any listeners.
461      *
462      * @param reference the reference to the expired message
463      * @throws JMSException         if a listener fails to handle the
464      *                              expiration
465      * @throws PersistenceException if there is a persistence related problem
466      */
467     protected void persistentMessageExpired(MessageRef reference)
468             throws JMSException, PersistenceException {
469         _handles.remove(reference.getMessageId());
470         // @todo - notify browsers
471         super.messageExpired(reference);
472     }
473 
474     /***
475      * Return the next QueueConsumerEndpoint that can consume the specified
476      * message or null if there is none.
477      *
478      * @param message - the message to consume
479      * @return the consumer who should receive this message, or null
480      */
481     private ConsumerEndpoint getConsumerForMessage(MessageImpl message) {
482         ConsumerEndpoint result = null;
483 
484         ConsumerEndpoint[] consumers = getConsumerArray();
485         final int size = consumers.length;
486         if (size > 0) {
487             synchronized (_lock) {
488                 // roll over the consumer index if it is greater
489                 // than the number of registered consumers
490                 if ((_lastConsumerIndex + 1) > size) {
491                     _lastConsumerIndex = 0;
492                 }
493 
494                 // look over the list of consumers and return the
495                 // first endpoint that can process this message
496                 int index = _lastConsumerIndex;
497                 do {
498                     ConsumerEndpoint consumer = consumers[index];
499                     // if the endpoint has a message listener registered
500                     // or the endpoint is waiting for a message and the
501                     // message satisfies the selector then return it to
502                     // the client.
503                     if ((consumer.isAsynchronous()
504                             || consumer.isWaitingForMessage())
505                             && consumer.selects(message)) {
506                         _lastConsumerIndex = ++index;
507                         result = consumer;
508                         break;
509                     }
510 
511                     // advance to the next consumer
512                     if (++index >= size) {
513                         index = 0;
514                     }
515                 } while (index != _lastConsumerIndex);
516             }
517         }
518 
519         return result;
520     }
521 
522 }