View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: QueueDestinationCache.java,v 1.34.2.1 2004/05/01 12:05:26 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.sql.Connection;
51  import java.util.Collections;
52  import java.util.Enumeration;
53  import java.util.Iterator;
54  import java.util.LinkedList;
55  import java.util.List;
56  import java.util.Vector;
57  
58  import javax.jms.JMSException;
59  import javax.transaction.TransactionManager;
60  
61  import org.apache.commons.logging.Log;
62  import org.apache.commons.logging.LogFactory;
63  
64  import org.exolab.jms.client.JmsDestination;
65  import org.exolab.jms.client.JmsQueue;
66  import org.exolab.jms.client.JmsTemporaryDestination;
67  import org.exolab.jms.message.MessageHandle;
68  import org.exolab.jms.message.MessageImpl;
69  import org.exolab.jms.persistence.DatabaseService;
70  import org.exolab.jms.persistence.PersistenceException;
71  import org.exolab.jms.selector.Selector;
72  import org.exolab.jms.server.JmsServerConnectionManager;
73  
74  
75  /***
76   * A DestinationCache for Queues
77   *
78   * @version     $Revision: 1.34.2.1 $ $Date: 2004/05/01 12:05:26 $
79   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
80   */
81  public class QueueDestinationCache
82      extends DestinationCache {
83  
84      /***
85       * Maintains a list of queue listeners for this cache
86       */
87      protected List _queueListeners =
88          Collections.synchronizedList(new LinkedList());
89  
90      /***
91       * Underlying destination
92       */
93      private JmsQueue _queue = null;
94  
95      /***
96       * Index of the last listener that received a message from this
97       * destination. If multiple listeners are attached to this queue then
98       * messages will be sent to each in a round robin fashion
99       */
100     private int _lastConsumerIndex = 0;
101 
102     /***
103      * Tracks the number of messages added to the destination cache
104      */
105     private long _publishCount;
106 
107     /***
108      * Tracks the number of messages consumed from the destination cache
109      */
110     private long _consumeCount;
111 
112     /***
113      * The logger
114      */
115     private static final Log _log = LogFactory.getLog(
116         QueueDestinationCache.class);
117 
118 
119     /***
120      * Construct a message cache for a queue destination. This cache will
121      * receive all messages published under the specified destination.
122      * <p>
123      * the constructor will also attempt to load any persistent messages
124      * from the database.
125      * <p>
126      *
127      * @param destination the queue
128      * @throws FailedToInitializeException
129      */
130     QueueDestinationCache(JmsQueue destination)
131         throws FailedToInitializeException {
132 
133         _queue = destination;
134         // call init on the base class
135         init();
136         if (DestinationManager.instance().isAdministeredDestination(destination.getName())) {
137             // call the persistence adapter to determine if we have unacknowledged
138             // messages for this queue. The persistent handles are keyed on the
139             // queue name
140             Connection connection = null;
141             TransactionManager tm = null;
142             try {
143                 connection = DatabaseService.getConnection();
144 
145                 // initialise the cache
146                 init(destination, connection);
147 
148                 // commit the work
149                 connection.commit();
150             } catch (PersistenceException exception) {
151                 if (connection != null) {
152                     try {
153                         connection.rollback();
154                     } catch (Exception nested) {
155                         // ignore
156                     }
157                 }
158                 throw new FailedToInitializeException(
159                     "QueueDestinationCache init failed " + exception);
160             } catch (Exception exception) {
161                 // rethrow as a JMSException
162                 throw new FailedToInitializeException(
163                     "QueueDestinationCache init failed " + exception);
164             } finally {
165                 if (connection != null) {
166                     try {
167                         connection.close();
168                     } catch (Exception nested) {
169                         // ignore
170                     }
171                 }
172             }
173         }
174     }
175 
176 
177     /***
178      * Construct a message cache for a queue destination using the specified
179      * database connection. This cache will receive all messages published
180      * under the destination.
181      * <p>
182      * the constructor will also attempt to load any persistent messages
183      * from the database using the specified connection
184      * <p>
185      * If there is any problem during construction a FailedToInitializeException
186      * will be raised.
187      *
188      * @param destination - the queue
189      * @throws FialedToInitializeException
190      */
191     QueueDestinationCache(Connection connection, JmsQueue destination)
192         throws FailedToInitializeException {
193         super();
194         _queue = destination;
195 
196         // call init on the base class
197         init(connection);
198         if (DestinationManager.instance().isAdministeredDestination(destination.getName())) {
199             // call the persistence adapter to determine if we have unacknowledged
200             // messages for this queue. The persistent handles are keyed on the
201             // queue name
202             try {
203                 // initialise the cache
204                 init(destination, connection);
205             } catch (Exception exception) {
206                 // rethrow as a JMSException
207                 throw new FailedToInitializeException(
208                     "QueueDestinationCache init failed " + exception);
209             }
210         }
211     }
212 
213 
214     /***
215      * This common method is used to help initialise the cache. It basically
216      * removes all the expired messages and then retrieves all unacked messages
217      * from the database and stores them locally.
218      * <p>
219      * It will throw a PersistenceException if there is a database related
220      * problem.
221      *
222      * @param destination - the queue
223      * @param connection - the database connection to use
224      * @throws PersistenceException
225      */
226     void init(JmsQueue destination, Connection connection)
227         throws PersistenceException {
228 
229         DatabaseService.getAdapter().removeExpiredMessageHandles(
230             connection, destination.getName());
231         Vector handles = DatabaseService.getAdapter().getMessageHandles(
232             connection, destination, destination.getName());
233         if (handles != null) {
234             Enumeration iter = handles.elements();
235             while (iter.hasMoreElements()) {
236                 addMessage((MessageHandle) iter.nextElement());
237             }
238         }
239     }
240 
241     // implementation of DestinationCache.getDestination
242     public JmsDestination getDestination() {
243         return _queue;
244     }
245 
246     /***
247      * A Queue can also hav a queue listener, which simply gets informed of all
248      * messages that arrive at this destination
249      *
250      * @param listener - queue listener
251      */
252     public void addQueueListener(QueueListener listener) {
253         // add if not present
254         if (!_queueListeners.contains(listener)) {
255             _queueListeners.add(listener);
256         }
257     }
258 
259     /***
260      * Remove the queue listener associated with this cache
261      *
262      * @param listener - queue listener to remove
263      */
264     public void removeQueueListener(QueueListener listener) {
265         // add if not present
266         if (_queueListeners.contains(listener)) {
267             _queueListeners.remove(listener);
268         }
269     }
270 
271     // implementation of MessageMgr.messageAdded
272     public boolean messageAdded(JmsDestination destination,
273                                 MessageImpl message) {
274         boolean processed = false;
275 
276         if ((destination != null) &&
277             (message != null)) {
278 
279             // check that the message is for this queue
280             if (destination.equals(_queue)) {
281 
282                 // create a handle for the message
283                 try {
284                     MessageHandle handle =
285                         MessageHandleFactory.createHandle(this, message);
286 
287 
288                     // all messages are added to this queue. Receivers will
289                     // then pick messages of it as required.
290                     addMessage(handle, message);
291 
292                     // update the publishedCount
293                     _publishCount++;
294 
295                     // if we have any registered consumers then we need to
296                     // send the message to one of them first. If none are
297                     // registered then cache it.
298                     QueueConsumerEndpoint endpoint =
299                         getEndpointForMessage(message);
300                     if (endpoint != null) {
301                         endpoint.messageAdded(message);
302                     }
303 
304                     // notify any queue listeners that a message has arrived
305                     notifyQueueListeners(message);
306 
307                     // create a lease iff one is required
308                     checkMessageExpiry(message);
309 
310                     // check the message as processed
311                     processed = true;
312                 } catch (JMSException exception) {
313                     _log.error("Failed to add message", exception);
314                 }
315             } else {
316                 // need to notify someone or something that we are
317                 // dropping messages. Do we throw an exception
318                 _log.error("Dropping message " + message.getMessageId()
319                     + " for destination " + destination.getName());
320             }
321         }
322 
323         return processed;
324     }
325 
326     /***
327      * This method is called when the {@link MessageMgr} removes a message
328      * from the cache.
329      *
330      * @param destination the message destination
331      * @param message the message removed from cache
332      */
333     public void messageRemoved(JmsDestination destination,
334                                MessageImpl message) {
335 
336         if ((destination != null) &&
337             (message != null)) {
338 
339             try {
340                 MessageHandle handle =
341                     MessageHandleFactory.getHandle(this, message);
342 
343                 // call remove regardless whether it exists
344                 if (destination.equals(_queue)) {
345                     removeMessage(handle);
346                     notifyOnRemoveMessage(message);
347                     handle.destroy();
348                 }
349             } catch (JMSException exception) {
350                 _log.error("Failed to remove message", exception);
351             }
352         }
353     }
354 
355     // implementation of MessageMgr.persistentMessageAdded
356     public boolean persistentMessageAdded(Connection connection,
357                                           JmsDestination destination,
358                                           MessageImpl message)
359         throws PersistenceException {
360 
361         boolean processed = false;
362 
363         if ((destination != null) &&
364             (message != null)) {
365 
366             // check that it is not already present before adding it.
367             if (destination.equals(_queue)) {
368 
369                 // create a handle for the message
370                 try {
371 
372                     // all messages are added to this queue. Receivers will
373                     // then pick messages of it as required.
374                     MessageHandle handle =
375                         MessageHandleFactory.getHandle(this, message);
376                     addMessage(handle, message);
377 
378                     // increment the number of messages received
379                     _publishCount++;
380 
381                     // if we have any registered consumers then we need to
382                     // send the message to one of them first. If none are
383                     // registered then cache it.
384                     QueueConsumerEndpoint endpoint =
385                         getEndpointForMessage(message);
386                     if (endpoint != null) {
387                         endpoint.persistentMessageAdded(connection, message);
388                     }
389 
390                     // notify any queue listeners that a message has arrived
391                     notifyQueueListeners(message);
392 
393                     // create a lease iff one is required
394                     checkMessageExpiry(message);
395 
396                     // check the message as processed
397                     processed = true;
398                 } catch (JMSException exception) {
399                     _log.error("Failed to add persistent message",
400                         exception);
401                 }
402             } else {
403                 // need to notify someone or something that we are
404                 // dropping messages. Do we throw an exception
405             }
406         }
407 
408         return processed;
409     }
410 
411     // implementation of MessageMgr.persistentMessageAdded
412     public synchronized void persistentMessageRemoved(
413         Connection connection, JmsDestination destination,
414         MessageImpl message)
415         throws PersistenceException {
416 
417         if ((destination != null) &&
418             (message != null)) {
419 
420             try {
421                 PersistentMessageHandle handle = (PersistentMessageHandle)
422                     MessageHandleFactory.getHandle(this, message);
423 
424                 // call remove regardless whether it exists
425                 if (destination.equals(_queue)) {
426                     removeMessage(handle);
427                     notifyOnRemovePersistentMessage(connection, message);
428                     MessageHandleFactory.destroyPersistentHandle(connection,
429                         handle);
430                 }
431             } catch (JMSException exception) {
432                 _log.error("Failed to remove persistent message", exception);
433             }
434         }
435     }
436 
437     /***
438      * Return the next {@link ConsumerEndpoint} that can consume the specified
439      * message or null if there is none.
440      *
441      * @param  message - the message to consume
442      * @return the consumer who should receive this message or null
443      */
444     private synchronized QueueConsumerEndpoint getEndpointForMessage(
445         MessageImpl message) {
446         QueueConsumerEndpoint selectedEndpoint = null;
447 
448         if (_consumers.size() > 0) {
449             // roll over the consumer index if it is greater
450             // than the number of registered consumers
451             if ((_lastConsumerIndex + 1) > _consumers.size()) {
452                 _lastConsumerIndex = 0;
453             }
454 
455             // look over the list of consumers and return the
456             // first endpoint that can process this message
457             int index = _lastConsumerIndex;
458             do {
459                 QueueConsumerEndpoint endpoint =
460                     (QueueConsumerEndpoint) _consumers.get(index);
461                 Selector selector = endpoint.getSelector();
462 
463                 // if the endpoint has a message listener registered
464                 // or the endpoint is waiting for a message and the
465                 // message satisfies the selector then return it to
466                 // the client.
467                 if (((endpoint.hasMessageListener()) ||
468                     (endpoint.isWaitingForMessage())) &&
469                     ((selector == null) ||
470                     (selector.selects(message)))) {
471                     _lastConsumerIndex = ++index;
472                     selectedEndpoint = endpoint;
473                     break;
474                 }
475 
476                 // advance to the next consumer
477                 if (++index >= _consumers.size()) {
478                     index = 0;
479                 }
480             } while (index != _lastConsumerIndex);
481         }
482 
483         return selectedEndpoint;
484     }
485 
486     /***
487      * Return the first message of the queue or null if there are no messages
488      * in the cache
489      *
490      * @param QueueConsumerEndpoint - the consumer who will receive the message
491      * @return MessageHandle - handle to the first message
492      */
493     public synchronized MessageHandle getMessage(
494         QueueConsumerEndpoint endpoint) {
495         MessageHandle handle = null;
496         // do not return a message is the endpoint is null;
497         if ((endpoint != null) &&
498             (getMessageCount() > 0)) {
499             Selector selector = endpoint.getSelector();
500             if (selector == null) {
501                 // if no selector has been specified then remove and return
502                 // the first message
503                 handle = removeFirstMessage();
504                 _consumeCount++;
505             } else {
506                 // for non null selector we must find the first matching
507                 Object[] handles = toMessageArray();
508                 for (int i = 0; i < handles.length; ++i) {
509                     MessageHandle hdl = (MessageHandle) handles[i];
510                     MessageImpl message = hdl.getMessage();
511                     if (message != null && selector.selects(message)) {
512                         handle = hdl;
513                         removeMessage(hdl);
514                         _consumeCount++;
515                         break;
516                     }
517                 }
518             }
519         }
520 
521         return handle;
522     }
523 
524     /***
525      * Playback all the messages in the cache to the specified
526      * {@link QueueListener}
527      *
528      * @param listener - the queue listener
529      */
530     public void playbackMessages(QueueListener listener) {
531 
532         Object[] messages = toMessageArray();
533         if ((listener != null) &&
534             (messages.length > 0)) {
535             try {
536                 for (int index = 0; index < messages.length; index++) {
537                     listener.onMessage(((MessageHandle) messages[index]).getMessage());
538                 }
539             } catch (IndexOutOfBoundsException exception) {
540                 // ignore the exception since the list is dynamic and may
541                 // be modified while it is being processed.
542             }
543         }
544     }
545 
546     /***
547      * Return the specified message to top of the queue. This is called to
548      * recover unsent or unacked messages
549      *
550      * @param message - message to return
551      */
552     public synchronized void returnMessage(MessageHandle handle) {
553 
554         // add the message to the destination cache
555         addMessage(handle);
556 
557         // if there are registered consumers then check whether
558         // any of them have registered message listeners
559         if (_consumers.size() > 0) {
560             // roll over the consumer index if it is greater
561             // than the number of registered consumers
562             if ((_lastConsumerIndex + 1) > _consumers.size()) {
563                 _lastConsumerIndex = 0;
564             }
565 
566             int index =
567                 (_lastConsumerIndex >= _consumers.size()) ?
568                 0 : _lastConsumerIndex;
569 
570             do {
571 
572                 QueueConsumerEndpoint endpoint =
573                     (QueueConsumerEndpoint) _consumers.get(index);
574 
575                 // if we find an endpoint with a listener then
576                 // we should reschedule it.
577                 if (endpoint.hasMessageListener()) {
578                     endpoint.schedule();
579                     _lastConsumerIndex = ++index;
580                     break;
581                 }
582 
583                 // advance to the next consumer
584                 if (++index >= _consumers.size()) {
585                     index = 0;
586                 }
587             } while (index != _lastConsumerIndex);
588         }
589     }
590 
591     /***
592      * Notify all the queue listeners, that this message has arrived. This is
593      * ideal for browsers and iterators
594      *
595      * @param message - message to deliver
596      */
597     void notifyQueueListeners(MessageImpl message) {
598         if (!_queueListeners.isEmpty()) {
599             QueueListener[] listeners =
600                 (QueueListener[]) _queueListeners.toArray(
601                     new QueueListener[0]);
602 
603             int size = listeners.length;
604             for (int index = 0; index < size; ++index) {
605                 QueueListener listener = listeners[index];
606                 if (listener instanceof QueueBrowserEndpoint) {
607                     QueueBrowserEndpoint browser =
608                         (QueueBrowserEndpoint) listener;
609                     Selector selector = browser.getSelector();
610 
611                     // if a selector has been specified then apply the filter
612                     // before sending down the message
613                     if ((selector == null) ||
614                         (selector.selects(message))) {
615                         browser.onMessage(message);
616                     }
617                 } else {
618                     // if there is any other type of subscriber then just
619                     // send the message to it.
620                     listener.onMessage(message);
621                 }
622             }
623         }
624     }
625 
626     // implementation of DestinationCache.notifyOnAddMessage
627     boolean notifyOnAddMessage(MessageImpl message) {
628         return true;
629     }
630 
631     // implementation of DestinationCache.notifyOnRemoveMessage
632     void notifyOnRemoveMessage(MessageImpl message) {
633     }
634 
635     // implementation of DestinationCache.hasActiveConsumers
636     boolean hasActiveConsumers() {
637         boolean active = true;
638         if (_queueListeners.isEmpty() && _consumers.isEmpty()) {
639             active = false;
640         }
641         if (_log.isDebugEnabled()) {
642             _log.debug("hasActiveConsumers()[queue=" + _queue + "]=" + active);
643         }
644         return active;
645     }
646 
647     /***
648      * Determines if this cache can be destroyed.
649      * A <code>QueueDestinationCache</code> can be destroyed if there
650      * are no active consumers and:
651      * <ul>
652      *   <li>the queue is persistent and there are no messages</li>
653      *   <li>
654      *     the queue is temporary and the corresponding connection is closed
655      *   </li>
656      * </ul>
657      *
658      * @return <code>true</code> if the cache can be destroyed, otherwise
659      * <code>false</code>
660      */
661     public boolean canDestroy() {
662         boolean destroy = false;
663         if (!hasActiveConsumers()) {
664             JmsDestination queue = getDestination();
665             if (queue.getPersistent() && getMessageCount() == 0) {
666                 destroy = true;
667             } else if (queue.isTemporaryDestination()) {
668                 // check if there is a corresponding connection. If
669                 // not, it has been closed, and the cache can be removed
670                 String connectionId =
671                     ((JmsTemporaryDestination) queue).getConnectionId();
672                 JmsServerConnectionManager manager =
673                     JmsServerConnectionManager.instance();
674                 if (manager.getConnection(connectionId) == null) {
675                     destroy = true;
676                 }
677             }
678         }
679         return destroy;
680     }
681 
682     /***
683      * Destroy this object
684      */
685     synchronized void destroy() {
686         super.destroy();
687         _queueListeners.clear();
688     }
689 
690     // override Object.toString
691     public String toString() {
692         return _queue.toString();
693     }
694 
695     // override Object.hashCode
696     public int hashCode() {
697         return _queue.hashCode();
698     }
699 
700 } //-- QueueDestinationCache