1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43 package org.exolab.jms.messagemgr;
44
45 import javax.jms.InvalidSelectorException;
46 import javax.jms.JMSException;
47
48 import org.exolab.jms.client.JmsDestination;
49 import org.exolab.jms.message.MessageImpl;
50 import org.exolab.jms.selector.Selector;
51
52
53 /***
54 * Abstract implementation of the {@link ConsumerEndpoint} interface.
55 *
56 * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
57 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
58 * @version $Revision: 1.3 $ $Date: 2005/08/30 06:25:47 $
59 */
60 public abstract class AbstractConsumerEndpoint implements ConsumerEndpoint {
61
62 /***
63 * The identity of this consumer.
64 */
65 private final long _id;
66
67 /***
68 * The destination the consumer is acceesing.
69 */
70 private final JmsDestination _destination;
71
72 /***
73 * The message selector associated with this consumer. May be
74 * <code>null</code>.
75 */
76 private Selector _selector;
77
78 /***
79 * If true, and the destination is a topic, inhibits the delivery of
80 * messages published by its own connection.
81 */
82 private boolean _noLocal;
83
84 /***
85 * Determines if this consumer is asynchronous.
86 */
87 private boolean _asynchronous = false;
88
89 /***
90 * The receive timeout, if the client is performing a blocking receive. A
91 * value of <code>0</code> indicates the client is blocking indefinitely.
92 */
93 private Condition _waitingForMessage;
94
95 /***
96 * The listener to notify when a message is available.
97 */
98 private ConsumerEndpointListener _listener = null;
99
100 /***
101 * Determines if this is (or is in the process of being) closed.
102 */
103 private final Flag _closed = new Flag(false);
104
105
106 /***
107 * Construct a new <code>ConsumerEndpoint</code>.
108 * <p/>
109 * The destination and selector determine where it will be sourcing its
110 * messages from, and scheduler is used to asynchronously deliver messages
111 * to the consumer.
112 *
113 * @param consumerId the identity of this consumer
114 * @param destination the destination to access
115 * @param selector the message selector. May be <code>null</code>
116 * @param noLocal if true, and the destination is a topic, inhibits the
117 * delivery of messages published by its own connection.
118 * @throws InvalidSelectorException if the selector is not well formed
119 */
120 public AbstractConsumerEndpoint(long consumerId, JmsDestination destination,
121 String selector, boolean noLocal)
122 throws InvalidSelectorException {
123 if (destination == null) {
124 throw new IllegalArgumentException(
125 "Argument 'destination' is null");
126 }
127 _id = consumerId;
128 _destination = destination;
129 setSelector(selector);
130 _noLocal = noLocal;
131 }
132
133 /***
134 * Returns the identity of this consumer.
135 *
136 * @return the identity of this consumer
137 */
138 public long getId() {
139 return _id;
140 }
141
142 /***
143 * Determines if this is a persistent or non-persistent consumer.
144 * <p/>
145 * If persistent, then the consumer is persistent accross subscriptions and
146 * server restarts, and {@link #getPersistentId} returns a non-null value.
147 *
148 * @return <code>false</code>
149 */
150 public boolean isPersistent() {
151 return false;
152 }
153
154 /***
155 * Returns the persistent identifier for this consumer. This is the identity
156 * of the consumer which is persistent across subscriptions and server
157 * restarts.
158 *
159 * @return <code>null</code>
160 */
161 public String getPersistentId() {
162 return null;
163 }
164
165 /***
166 * Return the destination that this consumer is accessing.
167 *
168 * @return the destination that this consumer is accessing
169 */
170 public JmsDestination getDestination() {
171 return _destination;
172 }
173
174 /***
175 * Determines if this consumer can consume messages from the specified
176 * destination.
177 *
178 * @param destination the destination
179 * @return <code>true</code> if the consumer can consume messages from
180 * <code>destination</code>; otherwise <code>false</code>
181 */
182 public boolean canConsume(JmsDestination destination) {
183 return _destination.equals(destination);
184 }
185
186 /***
187 * Returns the message selector.
188 *
189 * @return the message selector, or <code>null</code> if none was specified
190 * by the client
191 */
192 public Selector getSelector() {
193 return _selector;
194 }
195
196 /***
197 * Determines if a message is selected by the consumer.
198 *
199 * @param message the message to check
200 * @return <code>true</code> if the message is selected; otherwise
201 * <code>false</code>
202 */
203 public boolean selects(MessageImpl message) {
204 return (_selector == null || _selector.selects(message));
205 }
206
207 /***
208 * Returns if locally produced messages are being inhibited.
209 *
210 * @return <code>true</code> if locally published messages are being
211 * inhibited.
212 */
213 public boolean getNoLocal() {
214 return _noLocal;
215 }
216
217 /***
218 * Return the next available message to the client.
219 *
220 * @param cancel
221 * @return the next message, or <code>null</code> if none is available
222 * @throws JMSException for any error
223 */
224 public final synchronized MessageHandle receive(final Condition cancel)
225 throws JMSException {
226 MessageHandle result = null;
227 if (!_closed.get()) {
228 Condition condition = new Condition() {
229 public boolean get() {
230 return _closed.get() || cancel.get();
231 }
232 };
233 result = doReceive(condition);
234 }
235 return result;
236 }
237
238 /***
239 * Indicates if this is an asynchronous consumer.
240 * <p/>
241 * An asynchronous consumer has a client <code>MessageConsumer</code> with
242 * an associated <code>MessageListener</code>.
243 *
244 * @param asynchronous if <code>true</code> marks this as an asynchronous
245 * consumer
246 */
247 public synchronized void setAsynchronous(boolean asynchronous) {
248 _asynchronous = asynchronous;
249 }
250
251 /***
252 * Determines if this is an asynchronous consumer.
253 *
254 * @return <code>true</code> if this is an asynchronous consumer; otherwise
255 * <code>false</code>
256 */
257 public synchronized boolean isAsynchronous() {
258 return _asynchronous;
259 }
260
261 /***
262 * Indicates that the client is currently waiting for a message.
263 *
264 * @param condition the condition to evaluate to determine if the client is
265 * waiting for message. May be <code>null</code>.
266 */
267 public synchronized void setWaitingForMessage(Condition condition) {
268 _waitingForMessage = condition;
269 }
270
271 /***
272 * Determines if the client is currently waiting for a message.
273 *
274 * @return <code>true</code> if the client is waiting for messages;
275 * otherwise <code>false</code>
276 */
277 public synchronized boolean isWaitingForMessage() {
278 return _waitingForMessage != null && _waitingForMessage.get();
279 }
280
281 /***
282 * Set the listener for this consumer. If a listener is set, it is notified
283 * when messages become available.
284 *
285 * @param listener the listener to add, or <code>null</code> to remove an
286 * existing listener
287 */
288 public synchronized void setListener(ConsumerEndpointListener listener) {
289 _listener = listener;
290 }
291
292 /***
293 * Determines if this consumer is closed, or in the process of being
294 * closed.
295 *
296 * @return <code>true</code> if this consumer is closed; otherwise
297 * <code>false</code>
298 */
299 public final boolean isClosed() {
300 return _closed.get();
301 }
302
303 /***
304 * Close this endpoint.
305 */
306 public final void close() {
307 _closed.set(true);
308 synchronized (this) {
309 _listener = null;
310 doClose();
311 }
312 }
313
314 /***
315 * Returns a stringified version of the consumer.
316 *
317 * @return a stringified version of the consumer
318 */
319 public String toString() {
320 return _id + ":" + getDestination();
321 }
322
323 /***
324 * Return the next available message to the client.
325 * <p/>
326 * This method will not be invoked if the consumer is being closed, however
327 * it is possible for {@link #close()} to be invoked while this method is in
328 * progress. Implementations should therefore invoke isClosed() to determine
329 * if the consumer is in the process of being closed, and if so, return
330 * <code>null</code>.
331 *
332 * @param cancel
333 * @return the next message, or <code>null</code> if none is available
334 * @throws JMSException for any error
335 */
336 protected abstract MessageHandle doReceive(Condition cancel)
337 throws JMSException;
338
339 /***
340 * Closes the endpoint.
341 */
342 protected abstract void doClose();
343
344 /***
345 * Notify the listener that a message is available for this consumer.
346 */
347 protected synchronized void notifyMessageAvailable() {
348 if (_listener != null && !_closed.get()) {
349 _listener.messageAvailable(this);
350 }
351 }
352
353 /***
354 * Sets the message selector.
355 *
356 * @param selector the message selector. May be <code>null</code>
357 * @throws InvalidSelectorException if the selector is not well formed
358 */
359 protected void setSelector(String selector)
360 throws InvalidSelectorException {
361 _selector = (selector != null) ? new Selector(selector) : null;
362 }
363
364 /***
365 * Determines if locally produced messages are being inhibited.
366 *
367 * @param noLocal if <code>true</code>, locally published messages are
368 * inhibited.
369 */
370 protected void setNoLocal(boolean noLocal) {
371 _noLocal = noLocal;
372 }
373
374 }