View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: QueueBrowserEndpoint.java,v 1.24 2003/08/13 13:30:54 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.util.Vector;
51  
52  import javax.jms.InvalidSelectorException;
53  import javax.jms.Session;
54  
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  
58  import org.exolab.jms.client.JmsDestination;
59  import org.exolab.jms.client.JmsQueue;
60  import org.exolab.jms.message.MessageHandle;
61  import org.exolab.jms.message.MessageImpl;
62  import org.exolab.jms.scheduler.Scheduler;
63  import org.exolab.jms.server.JmsServerSession;
64  
65  
66  /***
67   * A QueueBrowserEndpoint is a QueueListener to a QueueDestinationCache. This
68   * enables it to receive all the messages, which it then feeds down to the
69   * client side.
70   *
71   * @version     $Revision: 1.24 $ $Date: 2003/08/13 13:30:54 $
72   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
73   */
74  public class QueueBrowserEndpoint
75      extends ConsumerEndpoint
76      implements QueueListener {
77  
78      /***
79       * The destination that this consumer subscribes too
80       */
81      private QueueDestinationCache _cache = null;
82  
83      /***
84       * The queue that this endpoint is subscribed too
85       */
86      private JmsQueue _queue = null;
87  
88      /***
89       * The logger
90       */
91      private static final Log _log =
92          LogFactory.getLog(QueueBrowserEndpoint.class);
93  
94  
95      /***
96       * Construct a QueueConsumerEndpoint, that extends ConsumerEndpoint and
97       * is used to manage both synchronous and asynchronous message delivery
98       * for this consumer.
99       *
100      * @param session - the owning session
101      * @param clientId - uniquely identifies the remote client within session
102      * @param destination - destination that this object was created for
103      * @param selector - the selector attached to the consumer, if any.
104      * @param scheduler - used to schedule async message delivery.
105      * @throws InvalidSelectorException
106      */
107     QueueBrowserEndpoint(JmsServerSession session, long clientId,
108                          JmsQueue destination, String selector,
109                          Scheduler scheduler)
110         throws InvalidSelectorException {
111         super(session, clientId, selector, scheduler);
112 
113         _cache = (QueueDestinationCache)
114             DestinationManager.instance().getDestinationCache(destination);
115         // if a cache is not active then we need to create one.
116         if (_cache == null) {
117             _cache = (QueueDestinationCache)
118                 DestinationManager.instance().createDestinationCache(
119                     destination);
120         }
121 
122         // cache the destination
123         _queue = destination;
124 
125         // set up the message cache and register itself as a listener to the 
126         // cache
127         _cache.addQueueListener(this);
128         _cache.playbackMessages(this);
129 
130     }
131 
132     /***
133      * Deliver messages in the cache to the consumer.<p>
134      * This is not relevant to QueueBrowsers, and thus shouldn't be invoked.
135      *
136      * @return <code>false</code>
137      */
138     public boolean deliverMessages() {
139         _log.error(
140             "QueueBrowserEndpoint.deliverMessages() should never be called",
141             new Exception());
142         return false;
143     }
144 
145     // implementation of ConsumerEndpoint.receiveMessage
146     public synchronized MessageHandle receiveMessage(long wait) {
147         throw new UnsupportedOperationException(
148             "Cannot call receiveMessage for QueueBrowser");
149     }
150 
151     /***
152      * Return, at most, count messages from the cache.
153      *
154      * @param count - the max number of messages to receive
155      * @return Vector - number of messages
156      */
157     public synchronized Vector receiveMessages(int count) {
158         Vector messages = new Vector();
159         int index = 0;
160         while (index < count) {
161 
162             // check if we should exit the loop
163             if (isStopped() || getMessageCount() == 0) {
164                 break;
165             }
166 
167             // remove the first message from the list and check
168             // that it is not null. Synchronize the removal of
169             // the message but not the sending to the remote
170             // consumer
171             try {
172                 MessageHandle handle = removeFirstMessage();
173 
174                 MessageImpl m = handle.getMessage();
175                 if (m != null) {
176                     // add it to the list of messages to send
177                     // but only deliver messsages that satisfy the
178                     // selection criteria.
179                     if ((_selector == null) ||
180                         (_selector.selects(m))) {
181                         handle.setClientId(getClientId());
182                         messages.addElement(handle);
183                         ++index;
184                     } else {
185                         // drop the message
186                     }
187                 } else {
188                     // message may have been consumed in the interim
189                 }
190             } catch (Exception exception) {
191                 _log.error(exception, exception);
192             }
193         }
194 
195         return messages;
196     }
197 
198 
199     // implementation of ConsumerEndpoint.getDestination
200     public JmsDestination getDestination() {
201         return _queue;
202     }
203 
204     // implementation of ConsumerEndpoint.unregister
205     public void unregister() {
206         _cache.unregisterConsumer(this);
207     }
208 
209     // override implementation of ConsumerEndpoint.returnMessage
210     public void returnMessage(MessageHandle handle) {
211         // no-op at the moment. What happens to a QueueBrowser when
212         // a transaction aborts.
213     }
214 
215     // implementation of QueueListener.onMessage
216     public void onMessage(MessageImpl message) {
217         // delegate to message added, regardless of whether or not
218         // the message is persistent.
219         messageAdded(message);
220     }
221 
222     // override ConsumerEndpoint.setMessageListener
223     public void setMessageListener(InternalMessageListener listener) {
224         _log.error(
225             "QueueBrowserEndpoint.setMessageListener should never be called");
226         Thread.currentThread().dumpStack();
227     }
228 
229     /***
230      * Closes this endpoint
231      */
232     protected void doClose() {
233         // unregister from the DestinationCache
234         _cache.removeQueueListener(this);
235     }
236 
237 }