View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsMessageConsumer.java,v 1.4 2007/01/24 12:00:28 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import javax.jms.Destination;
48  import javax.jms.JMSException;
49  import javax.jms.Message;
50  import javax.jms.MessageConsumer;
51  import javax.jms.MessageListener;
52  
53  import org.apache.commons.logging.Log;
54  import org.apache.commons.logging.LogFactory;
55  
56  import org.exolab.jms.message.MessageImpl;
57  
58  import java.rmi.RemoteException;
59  
60  
61  /***
62   * Client implementation of the <code>javax.jms.MessageConsumer</code>
63   * interface.
64   *
65   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
66   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @version $Revision: 1.4 $ $Date: 2007/01/24 12:00:28 $
68   */
69  class JmsMessageConsumer
70          implements JmsMessageListener, MessageConsumer {
71  
72      /***
73       * The session which created this.
74       */
75      private JmsSession _session = null;
76  
77      /***
78       * The consumer's identity, allocated by the server.
79       */
80      private final long _consumerId;
81  
82      /***
83       * The destination to receive messages from.
84       */
85      private final Destination _destination;
86  
87      /***
88       * A message listener may be assigned to this session, for asynchronous
89       * message delivery.
90       */
91      private MessageListener _listener = null;
92  
93      /***
94       * The message selector, for filtering messages. May be <code>null</code>.
95       */
96      private String _selector = null;
97  
98      /***
99       * Indicates if the session is closed.
100      */
101     private volatile boolean _closed = false;
102 
103     /***
104      * The logger.
105      */
106     private static final Log _log =
107             LogFactory.getLog(JmsMessageConsumer.class);
108 
109 
110     /***
111      * Construct a new <code>JmsMessageProducer</code>.
112      *
113      * @param session     the session responsible for the consumer
114      * @param consumerId  the identity of this consumer
115      * @param destination the destination to receive messages from
116      * @param selector    the message selector. May be <code>null</code
117      */
118     public JmsMessageConsumer(JmsSession session, long consumerId,
119                               Destination destination, String selector) {
120         if (session == null) {
121             throw new IllegalArgumentException("Argument 'session' is null");
122         }
123         if (destination == null) {
124             throw new IllegalArgumentException(
125                     "Argument 'destination' is null");
126         }
127         _session = session;
128         _consumerId = consumerId;
129         _destination = destination;
130         _selector = selector;
131     }
132 
133     /***
134      * Return the message consumer's message selector expression.
135      *
136      * @return the selector expression, or <code>null</code> if one isn't set
137      */
138     public String getMessageSelector() {
139         return _selector;
140     }
141 
142     /***
143      * Return the consumer's listener.
144      *
145      * @return the listener for the consumer, or <code>null</code> if there
146      *         isn't one set
147      */
148     public MessageListener getMessageListener() {
149         return _listener;
150     }
151 
152     /***
153      * Set the consumer's listener.
154      *
155      * @param listener the message listener, or <code>null</code> to deregister
156      *                 an existing listener
157      * @throws JMSException if the listener cannot be set
158      */
159     public void setMessageListener(MessageListener listener)
160             throws JMSException {
161         // if listener is not null then enable asynchronous delivery
162         // otherwise disable it
163         if (listener != null) {
164             if (_listener == null) {
165                 // previously asynchronouse messaging was disabled
166                 _listener = listener;
167                 _session.setMessageListener(this);
168             } else {
169                 // asynch message deliver is enabled, just changing the
170                 // client side receiving entity.
171                 _listener = listener;
172             }
173         } else {
174             if (_listener != null) {
175                 _session.removeMessageListener(this);
176                 _listener = listener;
177             }
178         }
179     }
180 
181     /***
182      * Receive the next message produced for this consumer. This call blocks
183      * indefinitely until a message is produced or until this message consumer
184      * is closed.
185      *
186      * @return the next message produced for this consumer, or <code>null</code>
187      *         if this consumer is concurrently closed
188      * @throws JMSException if the next message can't be received
189      */
190     public Message receive() throws JMSException {
191         return receive(0);
192     }
193 
194     /***
195      * Receive the next message that arrives within the specified timeout
196      * interval. This call blocks until a message arrives, the timeout expires,
197      * or this message consumer is closed. A timeout of zero never expires and
198      * the call blocks indefinitely.
199      *
200      * @param timeout the timeout interval, in milliseconds
201      * @return the next message produced for this consumer, or <code>null</code>
202      *         if the timeout expires or the consumer concurrently closed
203      * @throws JMSException if the next message can't be received
204      */
205     public Message receive(long timeout) throws JMSException {
206         checkReceive();
207         return _session.receive(_consumerId, timeout);
208     }
209 
210     /***
211      * Receive the next message if one is immediately available.
212      *
213      * @return the next message produced for this consumer, or <code>null</code>
214      *         if one is not available
215      * @throws JMSException if the next message can't be received
216      */
217     public Message receiveNoWait() throws JMSException {
218         checkReceive();
219         return _session.receiveNoWait(_consumerId);
220     }
221 
222     /***
223      * Close the consumer. This call blocks until a receive or message listener
224      * in progress has completed. A blocked consumer receive call returns
225      * <code>null</code> when this consumer is closed.
226      *
227      * @throws JMSException if this consumer can't be closed
228      */
229     public synchronized void close() throws JMSException {
230         if (!_closed) {
231             try {
232                 _closed = true;
233                 _session.removeConsumer(this);
234 
235                 // wake up any blocked threads and let them complete
236                 notifyAll();
237             } finally {
238                 _listener = null;
239                 _session = null;
240                 _selector = null;
241             }
242         }
243     }
244 
245     /***
246      * Deliver a message.
247      *
248      * @param message the message to deliver
249      * @return <code>true</code> if the message was delivered; otherwise
250      *         <code>false</code>.
251      */
252     public boolean onMessage(MessageImpl message) {
253         boolean delivered = false;
254         try {
255             if (_listener != null) {
256                 _listener.onMessage(message);
257                 delivered = true;
258             } else {
259                 _log.error("NessageListener no longer registered");
260             }
261         } catch (Throwable exception) {
262             _log.error("MessageListener threw exception", exception);
263         }
264         return delivered;
265     }
266 
267     /***
268      * Informs the session that there is a message available for a synchronous
269      * consumer.
270      */
271     public void onMessageAvailable() throws RemoteException {
272         // no-op
273     }
274 
275     /***
276      * Returns the destination to receive messages from.
277      *
278      * @return the destination to receive messages from
279      */
280     protected Destination getDestination() {
281         return _destination;
282     }
283 
284     /***
285      * Returns the identity of this consumer.
286      *
287      * @return the identity of this consumer
288      */
289     protected long getConsumerId() {
290         return _consumerId;
291     }
292 
293     /***
294      * Returns the session that created this consumer.
295      *
296      * @return the session that created this consumer
297      */
298     protected JmsSession getSession() {
299         return _session;
300     }
301 
302     /***
303      * Determines if the consumer can perform receives.
304      *
305      * @throws JMSException if the consumer can't perform a receive
306      */
307     private void checkReceive() throws JMSException {
308         if (_listener != null) {
309             // cannot call this method when a listener is defined
310             throw new JMSException("Can't receive when listener defined");
311         }
312 
313         if (_closed) {
314             throw new JMSException("Can't receive when session closed");
315         }
316     }
317 
318 }