View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: ConsumerEndpoint.java,v 1.36 2003/09/25 11:23:13 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.io.Serializable;
51  import java.sql.Connection;
52  import java.util.Enumeration;
53  import java.util.Vector;
54  
55  import javax.jms.InvalidSelectorException;
56  import javax.jms.JMSException;
57  import javax.jms.Session;
58  import javax.transaction.TransactionManager;
59  
60  import org.apache.commons.logging.Log;
61  import org.apache.commons.logging.LogFactory;
62  
63  import org.exolab.jms.Identifiable;
64  import org.exolab.jms.client.JmsDestination;
65  import org.exolab.jms.message.MessageHandle;
66  import org.exolab.jms.message.MessageImpl;
67  import org.exolab.jms.persistence.PersistenceException;
68  import org.exolab.jms.scheduler.Scheduler;
69  import org.exolab.jms.selector.Selector;
70  import org.exolab.jms.server.JmsServerSession;
71  import org.exolab.jms.util.UUID;
72  
73  
74  /***
75   * A Consumer is a message subscriber with a unique identity
76   *
77   * @version     $Revision: 1.36 $ $Date: 2003/09/25 11:23:13 $
78   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
79   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
80   */
81  abstract public class ConsumerEndpoint
82      implements Serializable, Identifiable, DestinationCacheEventListener,
83      Runnable {
84  
85      /***
86       * The identity of the consumer
87       */
88      private String _id;
89  
90      /***
91       * The client identity, which uniquely identifies the remote client
92       * within a session. This is used to tag messages when they are
93       * asynchronously delivered to the client
94       */
95      private long _clientId = -1;
96  
97      /***
98       * The selector assoicated with this consumer. A selector is used
99       * to filter messages.
100      */
101     protected Selector _selector = null;
102 
103     /***
104      * Serializes access to the {@link #_waitingForMessage} flag. This is
105      * only required when it is changed
106      */
107     protected final Object _waitingForMessageMonitor = new Object();
108 
109     /***
110      * Used to block consumer until there is a message available
111      */
112     protected boolean _waitingForMessage = false;;
113 
114     /***
115      * Holds the consumer's message listener. This means that messages
116      * will be pushed down
117      */
118     protected InternalMessageListener _listener = null;
119 
120     /***
121      * Maintains the maximum size of this cache
122      */
123     protected int _size = 1000;
124 
125     /***
126      * This is the scheduler that is used to deliver messages if a consumer
127      * has a registered listener
128      */
129     protected transient Scheduler _scheduler = null;
130 
131     /***
132      * The acknowledgement mode for this endpoint.
133      */
134     protected transient int _ackMode = Session.AUTO_ACKNOWLEDGE;
135 
136     /***
137      * The nolocal indicator, if set, inhibits consuming messages that have
138      * been published on the same connection
139      */
140     protected transient boolean _nolocal = false;
141 
142     /***
143      * Indicates whether this endpoint belongs to a transacted session
144      */
145     protected transient boolean _transacted = false;
146 
147     /***
148      * Holds the connection id of the connection that the endpoint belongs too
149      */
150     protected transient int _connectionId = -1;
151 
152     /***
153      * caches the session that created this endpoint
154      */
155     protected JmsServerSession _session = null;
156 
157     /***
158      * This determines whether message delivery to the registered listener
159      * is enabled or disabled.
160      */
161     private volatile boolean _stopped = true;
162 
163     /***
164      * Identifies this endpoint as being closed
165      */
166     private volatile boolean _closed = false;
167 
168     /***
169      * Indicates whether the this cache has been scheduled with the dispatcher
170      * for asynchronous message delivery.
171      */
172     private boolean _scheduled = false;
173 
174     /***
175      * Cache of all messages and handles for this consumer.
176      */
177     private MessageCache _cache = new MessageCache();
178 
179     /***
180      * Synchronization helper for close() and deliverMessages()
181      */
182     private final Object _lock = new Object();
183 
184     /***
185      * The logger
186      */
187     private static final Log _log = LogFactory.getLog(ConsumerEndpoint.class);
188 
189 
190     /***
191      * Construct a <code>ConsumerEndpoint</code>.</p>
192      * The destination and selector determine where it will be sourcing
193      * its messages from, and scheduler is used to asynchronously deliver
194      * messages to the consumer.
195      *
196      * @param session - the owning session
197      * @param clientId - uniquely identifies the remote client within session
198      * @param selector - the selector attached to the consumer, if any.
199      * @param scheduler - used to schedule async message delivery.
200      * @throws InvalidSelectorException if the selector is not well formed
201      */
202     ConsumerEndpoint(JmsServerSession session, long clientId,
203                      String selector, Scheduler scheduler)
204         throws InvalidSelectorException {
205         _id = UUID.next();
206         _selector = (selector != null) ? new Selector(selector) : null;
207         _clientId = clientId;
208         _scheduler = scheduler;
209         _session = session;
210     }
211 
212     /***
213      * Return the destination that this consumer is subscribed to
214      *
215      * @return the destination that this consumer is subscribed to
216      */
217     public abstract JmsDestination getDestination();
218 
219     // implementation of Identifiable.getId
220     public String getId() {
221         return _id;
222     }
223 
224     /***
225      * Returns the persistent identifier for this consumer.<p/>
226      * This is the identity of the consumer which is persistent across
227      * subscriptions and server restarts. <p/>
228      *
229      * This implementation simply returns the transient identifier, i.e,
230      * {@link #getId}
231      *
232      * @return the persistent identifier for this consumer
233      */
234     public String getPersistentId() {
235         return getId();
236     }
237 
238     // implementation of Object.hashCode
239     public int hashCode() {
240         return _id.hashCode();
241     }
242 
243     /***
244      * Return a stringified version of the consumer
245      *
246      * @return String
247      */
248     public String toString() {
249         return _id + ":" + getDestination();
250     }
251 
252     /***
253      * Unregister this consumer for the specified destination cache, so that it
254      * will stop receiving messages from it.
255      */
256     public abstract void unregister();
257 
258     /***
259      * Return a reference to the client identity. This is an indirect reference
260      * back to the remote client, which can asynchronously receive messages
261      *
262      * @return identity of the client scoped to the session
263      */
264     public long getClientId() {
265         return _clientId;
266     }
267 
268     /***
269      * Return the selector for this endpoint or null if one is not specified
270      *
271      * @return String - the endpoint's selector
272      */
273     public Selector getSelector() {
274         return _selector;
275     }
276 
277     /***
278      * Set the selector for this endpoint
279      *
280      * @param selector - message selector as a string
281      * @exception InvalidSelectorException - raised when selector is not
282      *                                       well-formed
283      */
284     public void setSelector(String selector)
285         throws InvalidSelectorException {
286         _selector = (selector != null) ? new Selector(selector) : null;
287     }
288 
289     /***
290      * Return the ackmode for this endpoint
291      *
292      * @return int - acknowledgement mode
293      */
294     public int getAckMode() {
295         return _ackMode;
296     }
297 
298     /***
299      * Set the ackMode for this endpoint
300      *
301      * @param ackmode - the new ack mode for the endpoint
302      */
303     public void setAckMode(int ackmode) {
304         _ackMode = ackmode;
305     }
306 
307     /***
308      * Return the connection id that this endpoint belongs to
309      *
310      * @return the connection id
311      */
312     public int getConnectionId() {
313         return _connectionId;
314     }
315 
316     /***
317      * Set the connection that this endpooint belongs too
318      *
319      * @param id - connection identity
320      */
321     public void setConnectionId(int id) {
322         _connectionId = id;
323     }
324 
325     /***
326      * Return the value of the nolocal indicator
327      *
328      * @return boolean
329      */
330     public boolean getNoLocal() {
331         return _nolocal;
332     }
333 
334     /***
335      * Set the no local indicator for this endpoint
336      *
337      * @param nolocal - true to inhibit messages published on this connection
338      */
339     public void setNoLocal(boolean nolocal) {
340         _nolocal = nolocal;
341     }
342 
343     /***
344      * Check whether this endpoint belongs to a transacted session
345      *
346      * @return boolean - true if it does
347      */
348     public boolean getTransacted() {
349         return _transacted;
350     }
351 
352     /***
353      * Set the state if the transacted flag for this endpoint
354      *
355      * @param transacted - true if it is transacted
356      */
357     public void setTransacted(boolean transacted) {
358         _transacted = transacted;
359     }
360 
361     /***
362      * Set the maximum size of the cache. If there are more than this number
363      * of messages in the cache the {@link CacheEvictionPolicy} is enforced
364      * to remove messages.
365      *
366      * @param size - maximum number of messages a destination can hold
367      */
368     public void setMaximumSize(int size) {
369         _size = size;
370     }
371 
372     /***
373      * Return the cache's maximum size
374      *
375      * @return int - size of cache
376      */
377     public int getMaximumSize() {
378         return _size;
379     }
380 
381     /***
382      * Set the {@link CacheEvictionPolicy} for this object. This determines
383      * how messages are removed when the cache's upper limit is reached.
384      *
385      * @param policy the eviction policy
386      */
387     public void setCacheEvictionPolicy(CacheEvictionPolicy policy) {
388         // not implemented
389     }
390 
391     /***
392      * Return the number of unsent messages in the cache for this consumer
393      *
394      * @return the number of unsent messages
395      */
396     public int getMessageCount() {
397         return _cache.getHandleCount();
398     }
399 
400     /***
401      * Return a reference to the session owning this endpoint
402      *
403      * @return JmsServerSession - the owning session
404      */
405     public JmsServerSession getSession() {
406         return _session;
407     }
408    
409     /***
410      * Deliver messages in the cache to the consumer
411      *
412      * @return <code>true</code> if the endpoint should be rescheduled
413      */
414     public abstract boolean deliverMessages();
415 
416     /***
417      * The run method is used to asynchronously deliver the messages in the
418      * cache to the consumer, by invoking {@link #deliverMessages}. 
419      * <p>
420      * It is scheduled by the {@link Scheduler}.
421      */
422     public void run() {
423         synchronized (_lock) {
424             if (!_closed) {
425                 boolean reschedule = deliverMessages();
426                 _scheduled = false;
427                 if (reschedule) {
428                     schedule();
429                 }
430             }
431         }
432     }
433 
434     // implementation of DestinationCacheEventListener.messageAdded
435     public synchronized boolean messageAdded(MessageImpl message) {
436         boolean added = false;
437 
438         // create a message handle
439         try {
440             // if the nolocal indicator is set and the message arrived on
441             // the same connection, add this consumer then mark it as
442             // received, but do not add it to the queue
443             if (getNoLocal()
444                 && message.getConnectionId() == getConnectionId()) {
445                 // inform them that we have processed the message
446                 return true;
447             }
448 
449             MessageHandle handle =
450                 MessageHandleFactory.getHandle(this, message);
451 
452             if (!_cache.containsHandle(handle)) {
453                 // if the message is not already in the cache then add it
454                 // and flag that we have added the message to the cache
455                 addMessage(handle, message);
456                 added = true;
457 
458                 schedule();
459             }
460         } catch (JMSException exception) {
461             _log.error("Failed to add message to endpoint", exception);
462         }
463 
464         return added;
465     }
466 
467     // implementation of DestinationCacheEventListener.messageRemoved
468     public synchronized boolean messageRemoved(MessageImpl message) {
469         boolean removed = false;
470 
471         try {
472             //retrieve the message handle
473             MessageHandle handle = 
474                 MessageHandleFactory.getHandle(this, message);
475 
476             if (_cache.containsHandle(handle)) {
477                 // call remove regardless whether it exists
478                 removeMessage(handle);
479                 removed = true;
480             }
481         } catch (JMSException exception) {
482             _log.error("Failed to remove message from endpoint", exception);
483         }
484 
485         return removed;
486     }
487 
488     // implementation of DestinationCacheEventListener.persistentMessageAdded
489     public synchronized boolean persistentMessageAdded(Connection connection,
490                                                        MessageImpl message)
491         throws PersistenceException {
492 
493         return messageAdded(message);
494     }
495 
496     // implementation of DestinationCacheEventListener.persistentMessageRemoved
497     public synchronized boolean persistentMessageRemoved(Connection connection,
498                                                          MessageImpl message)
499         throws PersistenceException {
500 
501         return messageRemoved(message);
502     }
503 
504     /***
505      * Stop/start message delivery
506      *
507      * @param stop if <code>true</code> to stop message delivery, otherwise
508      * start it
509      */
510     public synchronized void setStopped(boolean stop) {
511         if (stop) {
512             _stopped = true;
513         } else {
514             _stopped = false;
515             // schedule message delivery if needed
516             schedule();
517         }
518     }
519 
520     /***
521      * This message will return all unacked messages to the queue and allow
522      * them to be resent to the consumer with the redelivery flag on.
523      */
524     public synchronized void recover() {
525         // default behaviour is to do nothing
526     }
527 
528     /***
529      * Close this endpoint.
530      * <p>
531      * This synchronizes with {@link #deliverMessages} before invoking
532      * @link {#doClose}
533      */
534     public final void close() {
535         _stopped = true;
536 
537         synchronized (_lock) {
538             // synchronize with deliverMessages()            
539             _scheduler.remove(this); // remove this, if it is scheduled
540             _scheduled = false;
541 
542         }
543 
544         synchronized (this) {
545             doClose();
546 
547             // clear all messages in the cache
548             if (_cache != null) {
549                 _cache.clear();
550             }
551 
552             _closed = true;
553         }
554     }
555 
556     /***
557      * Set the message listener for this consmer. If a message listener is set
558      * then messages will be scheduled to be sent to it when they are available
559      * <p>
560      * Each consumer cache can only have a single message listener. To remove
561      * the message listener call this method with null argument
562      *
563      * @param listener - the message listener to add.
564      */
565     public synchronized void setMessageListener(
566         InternalMessageListener listener) {
567         _listener = listener;
568         if (listener == null) {
569             // remove this from the scheduler
570             _scheduler.remove(this);
571             _scheduled = false;
572         } else {
573             // scheduler for it to run
574             schedule();
575         }
576     }
577 
578     /***
579      * Return the specified message to the cache.
580      *
581      * @param handle - handle to return
582      */
583     public synchronized void returnMessage(MessageHandle handle) {
584         if (_cache != null) {
585             addMessage(handle);
586             schedule();
587         }
588     }
589 
590     /***
591      * Return the next message to the client. This will also mark the message as
592      * sent and move it to the sent queue
593      *
594      * @param wait - the number of milliseconds to wait
595      * @return MessageHandle - handle to the next message in the list
596      */
597     abstract public MessageHandle receiveMessage(long wait);
598 
599     // implementation of GarbageCollectable.collectGarbage
600     public void collectGarbage(boolean aggressive) {
601         if (aggressive) {
602             // clear all persistent messages in the cache
603             _cache.clearPersistentMessages();
604             if (_log.isDebugEnabled()) {
605                 _log.debug("Evicted all persistent messages from dest "
606                     + getDestination().getName() + " and name "
607                     + getId());
608             }
609         }
610 
611         if (_log.isDebugEnabled()) {
612             _log.debug("ENDPOINT- " + getDestination().getName() + ":"
613                 + getPersistentId() + " Messages: P["
614                 + _cache.getPersistentCount() + "] T["
615                 + _cache.getTransientCount() + "] Handles: ["
616                 + _cache.getHandleCount() + "]");
617         }
618     }
619 
620     /***
621      * Closes the endpoint
622      */
623     protected abstract void doClose();
624 
625     /***
626      * Schedule asynchronouse message delivery
627      */
628     protected void schedule() {
629         if (!_stopped && !_closed && _listener != null && !_scheduled) {
630             _scheduled = true;
631             _scheduler.add(this);
632         }
633     }
634 
635     /***
636      * Clear all messages in the cache, regardless of whether they are
637      * persistent or non-persistent
638      */
639     protected void clearMessages() {
640         _cache.clear();
641     }
642 
643     /***
644      * Check whether the vector of handles contains one or more persistent
645      * handles
646      *
647      * @param handles - collection of {@link MessageHandle} objects
648      * @return true if there is one or more persistent handles
649      */
650     protected boolean collectionHasPersistentHandles(Vector collection) {
651         boolean result = false;
652         Enumeration enum = collection.elements();
653 
654         while (enum.hasMoreElements()) {
655             if (enum.nextElement() instanceof PersistentMessageHandle) {
656                 result = true;
657                 break;
658             }
659         }
660 
661         return result;
662     }
663 
664     /***
665      * Add the handle to the cache
666      *
667      * @param handle - the message handle to add
668      */
669     protected void addMessage(MessageHandle handle) {
670         handle.setConsumerName(getPersistentId());
671         _cache.addHandle(handle);
672 
673         // check to see whether the consumer is waiting to
674         // be notified
675         if (isWaitingForMessage()) {
676             notifyMessageAvailable();
677         }
678     }
679 
680     /***
681      * Cache a handle and its corresponding message
682      *
683      * @param handle the handle to cache
684      * @param message the corresponding message to cache
685      */
686     protected void addMessage(MessageHandle handle, MessageImpl message) {
687         handle.setConsumerName(getPersistentId());
688         _cache.addMessage(handle, message);
689 
690         // check to see whether the consumer is waiting to
691         // be notified
692         if (isWaitingForMessage()) {
693             notifyMessageAvailable();
694         }
695     }
696 
697     /***
698      * Return the message for the specified handle
699      *
700      * @param handle - the handle
701      * @return MessageImpl - the associated message
702      */
703     protected MessageImpl getMessage(MessageHandle handle) {
704         return _cache.getMessage(handle);
705     }
706 
707     /***
708      * Remove the handle from the cache
709      *
710      * @param handle the handle to remove
711      * @return <code>true</code> if the message was removed
712      */
713     protected boolean removeMessage(MessageHandle handle) {
714         return _cache.removeHandle(handle);
715     }
716 
717     /***
718      * Determines if a message handle is already cached
719      *
720      * @return <code>true</code> if it is cached
721      */
722     protected boolean containsMessage(MessageHandle handle) {
723         return _cache.containsHandle(handle);
724     }
725 
726     /***
727      * Return the first message handle in the cache
728      *
729      * @return the first message or null if cache is empty
730      */
731     protected MessageHandle removeFirstMessage() {
732         return _cache.removeFirstHandle();
733     }
734 
735     /***
736      * Delete the message with the specified handle from the cache
737      *
738      * @param handle the handle
739      */
740     protected void deleteMessage(MessageHandle handle) {
741         _cache.removeMessage(handle.getMessageId());
742     }
743 
744     /***
745      * Determines if this endpoint has been stopped
746      *
747      * @return <code>true</code> if this endpoint has been stopped
748      */
749     protected final boolean isStopped() {
750         return _stopped;
751     }
752 
753     /***
754      * Check if the consumer is waiting for a message. If it is then
755      * notify it that a message has arrived
756      */
757     protected void notifyMessageAvailable() {
758         // if we need to notify then send out the request
759         if (isWaitingForMessage()) {
760             clearWaitingForMessage();
761 
762             try {
763                 _session.onMessageAvailable(getClientId());
764             } catch (Exception exception) {
765                 //getLogger().logError("Error in notifyMessageAvailable " +
766                 //    getDestination().getName() + " " + exception.toString());
767             }
768         }
769     }
770 
771     /***
772      * Check whether the endpoint is waiting for a message
773      *
774      * @return boolean
775      */
776     protected final boolean isWaitingForMessage() {
777         return _waitingForMessage;
778     }
779 
780     /***
781      * Set the waiting for message flag
782      */
783     protected final void setWaitingForMessage() {
784         _waitingForMessage = true;
785     }
786 
787     /***
788      * Clear the waiting for message flag
789      */
790     protected final void clearWaitingForMessage() {
791         _waitingForMessage = false;
792     }
793 
794     /***
795      * Helper for {@link #deliverMessages} implementations, to determines if 
796      * asynchronous message delivery should be stopped
797      *
798      * @return <code>true</code> if asynchronous message delivery should be 
799      * stopped
800      */
801     protected boolean stopDelivery() {
802         return (_stopped || getMessageCount() == 0 || _listener == null);
803     }
804 
805 }