View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsMessageConsumer.java,v 1.37 2004/01/20 14:14:21 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import javax.jms.JMSException;
48  import javax.jms.Message;
49  import javax.jms.MessageConsumer;
50  import javax.jms.MessageListener;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  
55  import org.exolab.jms.message.MessageImpl;
56  
57  
58  /***
59   * Client implementation of the <code>javax.jms.MessageConsumer</code>
60   * interface
61   *
62   * @version     $Revision: 1.37 $ $Date: 2004/01/20 14:14:21 $
63   * @author      <a href="mailto:jima@comware.com.au">Jim Alateras</a>
64   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
65   */
66  abstract class JmsMessageConsumer
67      implements MessageListener, MessageConsumer {
68  
69      /***
70       * A message listener may be assigned to this session, for
71       * asynchronous message delivery.
72       */
73      private MessageListener _messageListener = null;
74  
75      /***
76       * The session which created this
77       */
78      private JmsSession _session = null;
79  
80      /***
81       * The message selector, for filtering messages. May be <code>null</code>
82       */
83      private String _selector = null;
84  
85      /***
86       * Indicates if the session is closed
87       */
88      private volatile boolean _closed = false;
89  
90      /***
91       * The consumer's identity, allocated by the session
92       */
93      private long _clientId = -1;
94  
95      /***
96       * This is the last time that the listener had been set through
97       * {@link #setMessageListener}
98       */
99      private long _listenerSetTimestamp = 0;
100 
101     /***
102      * This is the last message id asynchronously delivered to the listener.
103      */
104     private String _lastMessageDelivered;
105 
106     /***
107      * The logger
108      */
109     private static final Log _log =
110         LogFactory.getLog(JmsMessageConsumer.class);
111 
112 
113     /***
114      * Construct a new <code>JmsMessageProducer</code>.
115      *
116      * @param session the session responsible for the consumer
117      * @param clientId the session allocated consumer identifier
118      * @param selector the message selector. May be <code>null</code
119      */
120     public JmsMessageConsumer(JmsSession session, long clientId,
121                               String selector) {
122         if (session ==  null) {
123             throw new IllegalArgumentException("Argument 'session' is null");
124         }
125         _session = session;
126         _clientId = clientId;
127         _selector = selector;
128     }
129 
130     /***
131      * Return the session allocated consumer identifier
132      *
133      * @return the identifier allocated to this by the session
134      */
135     public long getClientId() {
136         return _clientId;
137     }
138 
139     /***
140      * Return the message consumer's message selector expression
141      *
142      * @return the selector expression, or <code>null</code> if one isn't set
143      */
144     public String getMessageSelector() {
145         return _selector;
146     }
147 
148     /***
149      * Return the consumer's listener
150      *
151      * @return the listener for the consumer, or <code>null</code> if there
152      * isn't one set
153      */
154     public MessageListener getMessageListener() {
155         return _messageListener;
156     }
157 
158     /***
159      * Set the consumer's listener
160      *
161      * @param listener the message listener, or <code>null</code> to deregister
162      * an existing listener
163      * @throws JMSException if the listener cannot be set
164      */
165     public void setMessageListener(MessageListener listener)
166         throws JMSException {
167         // if listener is not null then enable asynchronous delivery
168         // otherwise disable it
169         if (listener != null) {
170             if (_messageListener == null) {
171                 // previously asynchronouse messaging was disabled
172                 _listenerSetTimestamp = System.currentTimeMillis();
173                 _messageListener = listener;
174                 _session.setMessageListener(this);
175             } else {
176                 // asynch message deliver is enabled, just changing the
177                 // client side receiving entity.
178                 _messageListener = listener;
179             }
180         } else {
181             if (_messageListener != null) {
182                 _session.removeMessageListener(this);
183                 _messageListener = listener;
184             }
185         }
186 
187         // reset the lastMessageDelivered regardless what the value
188         // of the listener is.
189         _lastMessageDelivered = null;
190     }
191 
192     /***
193      * Receive the next message produced for this consumer.
194      * This call blocks indefinitely until a message is produced or until
195      * this message consumer is closed.
196      *
197      * @return the next message produced for this consumer, or
198      * <code>null</code> if this consumer is concurrently closed
199      * @throws JMSException if the next message can't be received
200      */
201     public Message receive() throws JMSException {
202         return retrieveMessage(0);
203     }
204 
205     /***
206      * Receive the next message that arrives within the specified
207      * timeout interval.
208      * This call blocks until a message arrives, the timeout expires, or this
209      * message consumer is closed. A timeout of zero never expires and the call
210      * blocks indefinitely.
211      *
212      * @param timeout the timeout interval, in milliseconds
213      * @return the next message produced for this consumer, or
214      * <code>null</code> if the timeout expires or the consumer concurrently
215      * closed
216      * @throws JMSException if the next message can't be received
217      */
218     public Message receive(long timeout) throws JMSException {
219         return retrieveMessage(timeout);
220     }
221 
222     /***
223      * Receive the next message if one is immediately available
224      *
225      * @return the next message produced for this consumer, or
226      * <code>null</code> if one is not available
227      * @throws JMSException if the next message can't be received
228      */
229     public Message receiveNoWait() throws JMSException {
230         return retrieveMessage(-1);
231     }
232 
233     /***
234      * Close the consumer.
235      * This call blocks until a receive or message listener in progress has
236      * completed. A blocked consumer receive call returns <code>null</code>
237      * when this consumer is closed.
238      *
239      * @throws JMSException if this consumer can't be closed
240      */
241     public synchronized void close() throws JMSException {
242         _closed = true;
243 
244         // wake up any blocked threads and let them complete
245         notifyAll();
246 
247         _messageListener = null;
248         _session = null;
249         _selector = null;
250     }
251 
252     /***
253      * Release all resources used by this consumer
254      *
255      * @throws JMSException if this consumer can't be destroyed
256      */
257     public synchronized void destroy() throws JMSException {
258         _closed = true;
259 
260         // wake up any blocked threads and let them complete
261         notifyAll();
262 
263         _messageListener = null;
264         _session = null;
265         _selector = null;
266     }
267 
268     /***
269      * Handles messages received asynchronously via the owning session,
270      * passing them to the registered listener
271      *
272      * @param message the message received
273      */
274     public synchronized void onMessage(Message message) {
275         try {
276             if (_messageListener != null) {
277                 // drop all messages if they were received before the listener
278                 // had been set.
279                 long rcvd = message.getLongProperty("JMSXRcvTimestamp");
280                 if (rcvd < _listenerSetTimestamp) {
281                     return;
282                 }
283 
284                 // According to section 4.5.2 Asynchronous Delivery messages
285                 // delivered to consumers, through the MessageListener
286                 // interface in a transacted session must be treated the same
287                 // as synchronous delivery.
288                 // Need to set this field before we actually deliver the
289                 // message since the client can actually call
290                 // setMessageListener in onMessage()
291                 _lastMessageDelivered = ((MessageImpl) message).getId();
292                 _messageListener.onMessage(message);
293             }
294         } catch (JMSException exception) {
295             //report the exception
296             _log.error("Error in onMessage", exception);
297         }
298     }
299 
300     /***
301      * Retrieve the next message for the consumer.
302      *
303      * @param wait the maximum time to wait for a message, in milliseconds.
304      * If <code>-1</code>, don't wait, if <code>0</code> wait indefinitely,
305      * otherwise wait the specified time.
306      * @return the received message, or <code>null</code>, if no message is
307      * available
308      * @throws JMSException if an error occurs retrieving the message,
309      * the session is closed, or a message listener is set.
310      */
311     public Message retrieveMessage(long wait) throws JMSException {
312         if (_messageListener != null) {
313             // cannot call this method when a listener is defined
314             throw new JMSException("Can't receive when listener defined");
315         }
316 
317         if (_closed) {
318             // cannot call this method when a listener is defined
319             throw new JMSException("Can't receive when session closed");
320         }
321 
322         MessageImpl message =
323             (MessageImpl) _session.retrieveMessage(_clientId, wait);
324         if (message != null) {
325             _lastMessageDelivered = message.getId();
326         }
327 
328         return message;
329     }
330 
331     /***
332      * Return the last message asynchronously delivered to the consumer
333      *
334      * @return the last message delivered
335      */
336     public String getLastMessageDelivered() {
337         return _lastMessageDelivered;
338     }
339 
340     /***
341      * Determines if the consumer is closed
342      *
343      * @return <code>true</code> if the consumer is closed
344      */
345     public boolean isClosed() {
346         return _closed;
347     }
348 
349     /***
350      * Returns the session that created this consumer
351      *
352      * @return the session that created this consumer
353      */
354     protected JmsSession getSession() {
355         return _session;
356     }
357 
358 }