View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: QueueConsumerEndpoint.java,v 1.35 2003/08/17 01:32:24 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.sql.Connection;
51  
52  import javax.jms.InvalidSelectorException;
53  import javax.jms.JMSException;
54  import javax.jms.Session;
55  import javax.transaction.TransactionManager;
56  
57  import org.apache.commons.logging.Log;
58  import org.apache.commons.logging.LogFactory;
59  
60  import org.exolab.jms.JMSErrorCodes;
61  import org.exolab.jms.client.JmsDestination;
62  import org.exolab.jms.client.JmsQueue;
63  import org.exolab.jms.message.MessageHandle;
64  import org.exolab.jms.message.MessageImpl;
65  import org.exolab.jms.persistence.PersistenceException;
66  import org.exolab.jms.scheduler.Scheduler;
67  import org.exolab.jms.server.ClientDisconnectionException;
68  import org.exolab.jms.server.JmsServerSession;
69  
70  
71  /***
72   * A QueueConsumerEndpoint extends {@link ConsumerEndpoint}. This object
73   * shares access to a particular queue with other QueueConsumerEndpoint
74   * instances.
75   *
76   * @version     $Revision: 1.35 $ $Date: 2003/08/17 01:32:24 $
77   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
78   */
79  public class QueueConsumerEndpoint
80      extends ConsumerEndpoint {
81  
82      /***
83       * The destination that this consumer subscribes too
84       */
85      private QueueDestinationCache _cache = null;
86  
87      /***
88       * The queue that this endpoint is subscribed to
89       */
90      private JmsQueue _queue = null;
91  
92      /***
93       * The maximum number of messages that a dispatch can deliver at any one
94       * time
95       */
96      private final int MAX_MESSAGES = 200;
97  
98      /***
99       * The logger
100      */
101     private static final Log _log =
102         LogFactory.getLog(QueueConsumerEndpoint.class);
103 
104 
105     /***
106      * Construct a QueueConsumerEndpoint, that extends ConsumerEndpoint and
107      * is used to manage both synchronous and asynchronous message delivery
108      * for this consumer.
109      *
110      * @param session - the owning session
111      * @param clientId - uniquely identifies the remote client within session
112      * @param destination - destination that this object was created for
113      * @param selector - the selector attached to the consumer, if any.
114      * @param scheduler - used to schedule async message delivery.
115      * @exception InvalidSelectorException
116      */
117     QueueConsumerEndpoint(JmsServerSession session, long clientId,
118                           JmsQueue queue, String selector, Scheduler scheduler)
119         throws InvalidSelectorException {
120         super(session, clientId, selector, scheduler);
121 
122         // now we should register the endpoint with the destination cache. If
123         // the destination cache does not exist then we should create it.
124         // All other methods in this class assume that a non-null cache
125         // exists.
126         if (queue != null) {
127             _queue = queue;
128             _cache = (QueueDestinationCache)
129                 DestinationManager.instance().getDestinationCache(queue);
130             if (_cache == null) {
131                 _cache = (QueueDestinationCache)
132                     DestinationManager.instance().createDestinationCache(queue);
133             }
134             _cache.registerConsumer(this);
135         }
136     }
137 
138     /***
139      * Return the number of unsent messages
140      *
141      * @return the number of unsent messages
142      */
143     public int getMessageCount() {
144         return _cache.getMessageCount();
145     }
146 
147     /***
148      * Deliver messages in the cache to the consumer
149      *
150      * @return <code>true</code> if the endpoint should be rescheduled
151      */
152     public boolean deliverMessages() {
153         boolean reschedule = true;
154 
155         for (int index = 0; index < MAX_MESSAGES; index++) {
156             // check if we should exit the loop
157             if (stopDelivery()) {
158                 reschedule = false;
159                 break;
160             }
161 
162             MessageHandle handle = null;
163             try {
164                 handle = _cache.getMessage(this);
165                 
166                 // if the handle is null then there are no more messages
167                 // to remove so break
168                 if (handle == null) {
169                     reschedule = false;
170                     break;
171                 }
172                 
173                 // set the delivered flag only when the message is
174                 // processed in the session object. Send the message
175                 // handle to the listener.
176                 handle.setClientId(getClientId());
177                 _listener.onMessage(handle, true);
178             } catch (ClientDisconnectionException exception) {
179                 if (handle != null) {
180                     _cache.returnMessage(handle);
181                 }
182                 _listener = null;
183                 _log.error(exception, exception);
184             } catch (JMSException exception) {
185                 if (exception.getErrorCode().equals(
186                         JMSErrorCodes.FailedToResolveHandle)) {
187                     // do not return message back to the cache
188                     _log.error("Dropping handle " + handle
189                                + " since we cannot resolve it.");
190                 } else {
191                     _log.error(exception, exception);
192                     if (handle != null) {
193                         _cache.returnMessage(handle);
194                     }
195                 }
196             } catch (Exception exception) {
197                 if (handle != null) {
198                     _cache.returnMessage(handle);
199                 }
200                 _log.error(exception);
201             }
202         } 
203         return reschedule;
204     }
205 
206     // override ConsumerEndpoint.setMessageListener
207     public void setMessageListener(InternalMessageListener listener) {
208         // if the listener is null we unregister from the destination. If
209         // the listener is not null then we register with it. When we
210         // unregister we need to determine what happens to any messages
211         // in the sent and
212         if (listener == null) {
213             _cache.unregisterConsumer(this);
214         } else {
215             _cache.registerConsumer(this);
216         }
217 
218         super.setMessageListener(listener);
219     }
220 
221     // implementation of ConsumerEndpoint.receiveMessage
222     public MessageHandle receiveMessage(long wait) {
223         MessageHandle handle = getMessageFromCache();
224         if ((handle == null) &&
225             (wait >= 0)) {
226 
227             // set a flag indicating that we are waiting
228             // for a message
229             setWaitingForMessage();
230 
231             // perform a double check and we receive a
232             // message then clear the previous set flag
233             handle = getMessageFromCache();
234             if (handle != null) {
235                 clearWaitingForMessage();
236             }
237         }
238 
239         return handle;
240     }
241 
242     /***
243      * Check whether a listener has been registered with this endpoint to
244      * support async message delivery
245      *
246      * @return boolean - true if it has
247      */
248     public boolean hasMessageListener() {
249         return _listener != null;
250     }
251 
252     // implementation of ConsumerEndpoint.unregister
253     public void unregister() {
254         _cache.unregisterConsumer(this);
255     }
256 
257     // implementation of ConsumerEndpoint.getDestination
258     public JmsDestination getDestination() {
259         return _queue;
260     }
261 
262     // override ConsumerEndpoint.messageAdded
263     public boolean messageAdded(MessageImpl message) {
264         if (_listener != null) {
265             schedule();
266         } else {
267             // check to see whether the consumer is waiting to
268             // be notified
269             notifyMessageAvailable();
270         }
271 
272         return true;
273     }
274 
275     // override ConsumerEndpoint.persistentMessageAdded
276     public boolean persistentMessageAdded(Connection connection,
277                                           MessageImpl message)
278         throws PersistenceException {
279         if (_listener != null) {
280             schedule();
281         } else {
282             // check to see whether the consumer is waiting to
283             // be notified
284             notifyMessageAvailable();
285         }
286 
287         return true;
288     }
289 
290     // override ConsumerEndpoint.messageRemoved
291     public synchronized boolean messageRemoved(MessageImpl message) {
292         return false;
293     }
294 
295     /***
296      * Closes this endpoint
297      */
298     protected void doClose() {
299         // unregister from the DestinationCache
300         _cache.unregisterConsumer(this);
301     }
302 
303     /***
304      * Return a message from the corresponding cache for this consumer or
305      * null if one is not available
306      *
307      * @return MessageHandle - the handle or null
308      */
309     private MessageHandle getMessageFromCache() {
310         MessageHandle handle = _cache.getMessage(this);
311         if (handle != null) {
312             handle.setClientId(getClientId());
313         }
314 
315         return handle;
316     }
317 
318 } //-- QueueConsumerEndpoint
319