View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.messagemgr;
44  
45  import javax.jms.InvalidSelectorException;
46  import javax.jms.JMSException;
47  
48  import org.exolab.jms.client.JmsDestination;
49  import org.exolab.jms.message.MessageImpl;
50  import org.exolab.jms.selector.Selector;
51  
52  
53  /***
54   * Abstract implementation of the {@link ConsumerEndpoint} interface.
55   *
56   * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
57   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
58   * @version $Revision: 1.3 $ $Date: 2005/08/30 06:25:47 $
59   */
60  public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint {
61  
62      /***
63       * The identity of this consumer.
64       */
65      private final long _id;
66  
67      /***
68       * The destination the consumer is acceesing.
69       */
70      private final JmsDestination _destination;
71  
72      /***
73       * The message selector associated with this consumer. May be
74       * <code>null</code>.
75       */
76      private Selector _selector;
77  
78      /***
79       * If true, and the destination is a topic, inhibits the delivery of
80       * messages published by its own connection.
81       */
82      private boolean _noLocal;
83  
84      /***
85       * Determines if this consumer is asynchronous.
86       */
87      private boolean _asynchronous = false;
88  
89      /***
90       * The receive timeout, if the client is performing a blocking receive. A
91       * value of <code>0</code> indicates the client is blocking indefinitely.
92       */
93      private Condition _waitingForMessage;
94  
95      /***
96       * The listener to notify when a message is available.
97       */
98      private ConsumerEndpointListener _listener = null;
99  
100     /***
101      * Determines if this is (or is in the process of being) closed.
102      */
103     private final Flag _closed = new Flag(false);
104 
105 
106     /***
107      * Construct a new <code>ConsumerEndpoint</code>.
108      * <p/>
109      * The destination and selector determine where it will be sourcing its
110      * messages from, and scheduler is used to asynchronously deliver messages
111      * to the consumer.
112      *
113      * @param consumerId  the identity of this consumer
114      * @param destination the destination to access
115      * @param selector    the message selector. May be <code>null</code>
116      * @param noLocal     if true, and the destination is a topic, inhibits the
117      *                    delivery of messages published by its own connection.
118      * @throws InvalidSelectorException if the selector is not well formed
119      */
120     public AbstractConsumerEndpoint(long consumerId, JmsDestination destination,
121                                     String selector, boolean noLocal)
122             throws InvalidSelectorException {
123         if (destination == null) {
124             throw new IllegalArgumentException(
125                     "Argument 'destination' is null");
126         }
127         _id = consumerId;
128         _destination = destination;
129         setSelector(selector);
130         _noLocal = noLocal;
131     }
132 
133     /***
134      * Returns the identity of this consumer.
135      *
136      * @return the identity of this consumer
137      */
138     public long getId() {
139         return _id;
140     }
141 
142     /***
143      * Determines if this is a persistent or non-persistent consumer.
144      * <p/>
145      * If persistent, then the consumer is persistent accross subscriptions and
146      * server restarts, and {@link #getPersistentId} returns a non-null value.
147      *
148      * @return <code>false</code>
149      */
150     public boolean isPersistent() {
151         return false;
152     }
153 
154     /***
155      * Returns the persistent identifier for this consumer. This is the identity
156      * of the consumer which is persistent across subscriptions and server
157      * restarts.
158      *
159      * @return <code>null</code>
160      */
161     public String getPersistentId() {
162         return null;
163     }
164 
165     /***
166      * Return the destination that this consumer is accessing.
167      *
168      * @return the destination that this consumer is accessing
169      */
170     public JmsDestination getDestination() {
171         return _destination;
172     }
173 
174     /***
175      * Determines if this consumer can consume messages from the specified
176      * destination.
177      *
178      * @param destination the destination
179      * @return <code>true</code> if the consumer can consume messages from
180      *         <code>destination</code>; otherwise <code>false</code>
181      */
182     public boolean canConsume(JmsDestination destination) {
183         return _destination.equals(destination);
184     }
185 
186     /***
187      * Returns the message selector.
188      *
189      * @return the message selector, or <code>null</code> if none was specified
190      *         by the client
191      */
192     public Selector getSelector() {
193         return _selector;
194     }
195 
196     /***
197      * Determines if a message is selected by the consumer.
198      *
199      * @param message the message to check
200      * @return <code>true</code> if the message is selected; otherwise
201      *         <code>false</code>
202      */
203     public boolean selects(MessageImpl message) {
204         return (_selector == null || _selector.selects(message));
205     }
206 
207     /***
208      * Returns if locally produced messages are being inhibited.
209      *
210      * @return <code>true</code> if locally published messages are being
211      *         inhibited.
212      */
213     public boolean getNoLocal() {
214         return _noLocal;
215     }
216 
217     /***
218      * Return the next available message to the client.
219      *
220      * @param cancel
221      * @return the next message, or <code>null</code> if none is available
222      * @throws JMSException for any error
223      */
224     public final synchronized MessageHandle receive(final Condition cancel)
225             throws JMSException {
226         MessageHandle result = null;
227         if (!_closed.get()) {
228             Condition condition = new Condition() {
229                 public boolean get() {
230                     return _closed.get() || cancel.get();
231                 }
232             };
233             result = doReceive(condition);
234         }
235         return result;
236     }
237 
238     /***
239      * Indicates if this is an asynchronous consumer.
240      * <p/>
241      * An asynchronous consumer has a client <code>MessageConsumer</code> with
242      * an associated <code>MessageListener</code>.
243      *
244      * @param asynchronous if <code>true</code> marks this as an asynchronous
245      *                     consumer
246      */
247     public synchronized void setAsynchronous(boolean asynchronous) {
248         _asynchronous = asynchronous;
249     }
250 
251     /***
252      * Determines if this is an asynchronous consumer.
253      *
254      * @return <code>true</code> if this is an asynchronous consumer; otherwise
255      *         <code>false</code>
256      */
257     public synchronized boolean isAsynchronous() {
258         return _asynchronous;
259     }
260 
261     /***
262      * Indicates that the client is currently waiting for a message.
263      *
264      * @param condition the condition to evaluate to determine if the client is
265      *                  waiting for message. May be <code>null</code>.
266      */
267     public synchronized void setWaitingForMessage(Condition condition) {
268         _waitingForMessage = condition;
269     }
270 
271     /***
272      * Determines if the client is currently waiting for a message.
273      *
274      * @return <code>true</code> if the client is waiting for messages;
275      *         otherwise <code>false</code>
276      */
277     public synchronized boolean isWaitingForMessage() {
278         return _waitingForMessage != null && _waitingForMessage.get();
279     }
280 
281     /***
282      * Set the listener for this consumer. If a listener is set, it is notified
283      * when messages become available.
284      *
285      * @param listener the listener to add, or <code>null</code> to remove an
286      *                 existing listener
287      */
288     public synchronized void setListener(ConsumerEndpointListener listener) {
289         _listener = listener;
290     }
291 
292     /***
293      * Determines if this consumer is closed, or in the process of being
294      * closed.
295      *
296      * @return <code>true</code> if this consumer is closed; otherwise
297      *         <code>false</code>
298      */
299     public final boolean isClosed() {
300         return _closed.get();
301     }
302 
303     /***
304      * Close this endpoint.
305      */
306     public final void close() {
307         _closed.set(true);
308         synchronized (this) {
309             _listener = null;
310             doClose();
311         }
312     }
313 
314     /***
315      * Returns a stringified version of the consumer.
316      *
317      * @return a stringified version of the consumer
318      */
319     public String toString() {
320         return _id + ":" + getDestination();
321     }
322 
323     /***
324      * Return the next available message to the client.
325      * <p/>
326      * This method will not be invoked if the consumer is being closed, however
327      * it is possible for {@link #close()} to be invoked while this method is in
328      * progress. Implementations should therefore invoke isClosed() to determine
329      * if the consumer is in the process of being closed, and if so, return
330      * <code>null</code>.
331      *
332      * @param cancel
333      * @return the next message, or <code>null</code> if none is available
334      * @throws JMSException for any error
335      */
336     protected abstract MessageHandle doReceive(Condition cancel)
337             throws JMSException;
338 
339     /***
340      * Closes the endpoint.
341      */
342     protected abstract void doClose();
343 
344     /***
345      * Notify the listener that a message is available for this consumer.
346      */
347     protected synchronized void notifyMessageAvailable() {
348         if (_listener != null && !_closed.get()) {
349             _listener.messageAvailable(this);
350         }
351     }
352 
353     /***
354      * Sets the message selector.
355      *
356      * @param selector the message selector. May be <code>null</code>
357      * @throws InvalidSelectorException if the selector is not well formed
358      */
359     protected void setSelector(String selector)
360             throws InvalidSelectorException {
361         _selector = (selector != null) ? new Selector(selector) : null;
362     }
363 
364     /***
365      * Determines if locally produced messages are being inhibited.
366      *
367      * @param noLocal if <code>true</code>, locally published messages are
368      *                inhibited.
369      */
370     protected void setNoLocal(boolean noLocal) {
371         _noLocal = noLocal;
372     }
373 
374 }