1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: AbstractTopicConsumerEndpoint.java,v 1.3 2007/01/24 12:00:28 tanderson Exp $
44 */
45
46 package org.exolab.jms.messagemgr;
47
48 import java.util.Collections;
49 import java.util.HashMap;
50 import java.util.Map;
51 import javax.jms.InvalidSelectorException;
52 import javax.jms.JMSException;
53
54 import org.apache.commons.logging.Log;
55 import org.apache.commons.logging.LogFactory;
56
57 import org.exolab.jms.client.JmsDestination;
58 import org.exolab.jms.client.JmsTopic;
59 import org.exolab.jms.message.MessageImpl;
60 import org.exolab.jms.persistence.PersistenceException;
61 import org.exolab.jms.server.ServerConnection;
62
63
64 /***
65 * A {@link ConsumerEndpoint} for topics.
66 *
67 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
68 * @version $Revision: 1.3 $ $Date: 2007/01/24 12:00:28 $
69 */
70 abstract class AbstractTopicConsumerEndpoint extends AbstractConsumerEndpoint
71 implements DestinationEventListener {
72
73 /***
74 * The identity of the connection that owns this consumer, or
75 * <code>-1</code> if this consumer isn't currently associated with a
76 * connection.
77 */
78 private long _connectionId;
79
80 /***
81 * The destination manager.
82 */
83 private final DestinationManager _destinations;
84
85 /***
86 * Cache of all handles for this consumer.
87 */
88 private MessageQueue _handles = new MessageQueue();
89
90 /***
91 * Maintains a map of TopicDestinationCache that this endpoint subscribes
92 * to, keyed on JmsTopic. A wildcard subscription may point to more than
93 * one.
94 */
95 protected Map _caches = Collections.synchronizedMap(new HashMap());
96
97 /***
98 * The logger.
99 */
100 private static final Log _log =
101 LogFactory.getLog(AbstractTopicConsumerEndpoint.class);
102
103
104 /***
105 * Construct a new <code>TopicConsumerEndpoint</code>.
106 * <p/>
107 * The destination and selector determine where it will be sourcing its
108 * messages from, and scheduler is used to asynchronously deliver messages
109 * to the consumer.
110 *
111 * @param consumerId the identity of this consumer
112 * @param connectionId the identity of the connection that owns this
113 * consumer
114 * @param topic the topic(s) to access. May be a wildcarded topic.
115 * @param selector the message selector. May be <code>null</code>
116 * @param noLocal if true, inhibits the delivery of messages published
117 * by its own connection.
118 * @param destinations the destination manager
119 * @throws InvalidSelectorException if the selector is invalid
120 * @throws JMSException if the destination caches can't be
121 * constructed
122 */
123 public AbstractTopicConsumerEndpoint(long consumerId, long connectionId,
124 JmsTopic topic,
125 String selector, boolean noLocal,
126 DestinationManager destinations)
127 throws JMSException {
128 super(consumerId, topic, selector, noLocal);
129 _connectionId = connectionId;
130 _destinations = destinations;
131 }
132
133 /***
134 * Returns the identity of the connection that owns this consumer.
135 *
136 * @return the identity of the connection, or <code>-1</code> if this is not
137 * currently associated with a connection.
138 * @see ServerConnection#getConnectionId
139 */
140 public long getConnectionId() {
141 return _connectionId;
142 }
143
144 /***
145 * Determines if this consumer can consume messages from the specified
146 * destination.
147 *
148 * @param destination the destination
149 * @return <code>true</code> if the consumer can consume messages from
150 * <code>destination</code>; otherwise <code>false</code>
151 */
152 public boolean canConsume(JmsDestination destination) {
153 boolean result = false;
154 if (destination instanceof JmsTopic) {
155 JmsTopic topic = (JmsTopic) getDestination();
156 if (!topic.isWildCard()) {
157 result = super.canConsume(destination);
158 } else {
159 result = topic.match((JmsTopic) destination);
160 }
161 }
162 return result;
163 }
164
165 /***
166 * Return a delivered, but unacknowledged message to the cache.
167 *
168 * @param handle the handle of the message to return
169 */
170 public void returnMessage(MessageHandle handle) {
171 addMessage(handle);
172 }
173
174 /***
175 * Return the number of unsent messages in the cache for this consumer.
176 *
177 * @return the number of unsent messages
178 */
179 public int getMessageCount() {
180 return _handles.size();
181 }
182
183 /***
184 * This event is called when a non-persistent message is added to the
185 * <code>DestinationCache</code>.
186 *
187 * @param handle a handle to the message
188 * @param message the added message
189 * @return <code>true</code> if the listener accepted the message; otherwise
190 * <code>false</ode>
191 * @throws JMSException if the listener fails to handle the message
192 */
193 public boolean messageAdded(MessageHandle handle, MessageImpl message)
194 throws JMSException {
195 boolean accepted = true;
196
197
198
199 if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
200 accepted = false;
201 } else {
202
203 handle = new TopicConsumerMessageHandle(handle, this);
204
205 if (!_handles.contains(handle)) {
206
207 addMessage(handle);
208 } else {
209 accepted = false;
210 _log.warn("Endpoint=" + this + " already has message cached: " +
211 handle);
212 }
213 }
214 return accepted;
215 }
216
217 /***
218 * This event is called when a message is removed from the
219 * <code>DestinationCache</code>.
220 *
221 * @param messageId the identifier of the removed message
222 * @throws JMSException if the listener fails to handle the message
223 */
224 public void messageRemoved(String messageId) throws JMSException {
225 MessageHandle handle = _handles.remove(messageId);
226 if (handle != null) {
227 handle.destroy();
228 }
229 }
230
231 /***
232 * This event is called when a persistent message is added to the
233 * <code>DestinationCache</code>.
234 *
235 * @param handle a handle to the added message
236 * @param message the added message
237 * @return <code>true</code> if the listener accepted the message;
238 * @throws JMSException if the listener fails to handle the message
239 * @throws PersistenceException if there is a persistence related problem
240 */
241 public boolean persistentMessageAdded(MessageHandle handle,
242 MessageImpl message)
243 throws JMSException, PersistenceException {
244 boolean accepted = true;
245
246
247
248 if (getNoLocal() && message.getConnectionId() == getConnectionId()) {
249 accepted = false;
250 } else {
251
252 handle = new TopicConsumerMessageHandle(handle, this);
253 if (isPersistent()) {
254
255 handle.add();
256 }
257
258 if (!_handles.contains(handle)) {
259
260 addMessage(handle);
261 } else {
262 accepted = false;
263 _log.warn("Endpoint=" + this + " already has message cached: " +
264 handle);
265 }
266 }
267 return accepted;
268 }
269
270 /***
271 * This event is called when a message is removed from the
272 * <code>DestinationCache</code>.
273 *
274 * @param messageId the identifier of the removed message
275 * @throws JMSException if the listener fails to handle the message
276 * @throws PersistenceException if there is a persistence related problem
277 */
278 public void persistentMessageRemoved(String messageId)
279 throws JMSException, PersistenceException {
280 MessageHandle handle = _handles.remove(messageId);
281 if (handle != null) {
282 handle.destroy();
283 }
284 }
285
286
287 /***
288 * Invoked when a destination is created.
289 *
290 * @param destination the destination that was added
291 */
292 public void destinationAdded(JmsDestination destination) {
293
294 }
295
296 /***
297 * Invoked when a destination is removed.
298 *
299 * @param destination the destination that was removed
300 */
301 public void destinationRemoved(JmsDestination destination) {
302
303 }
304
305 /***
306 * Invoked when a message cache is created.
307 *
308 * @param destination the destination that messages are being cached for
309 * @param cache the corresponding cache
310 */
311 public void cacheAdded(JmsDestination destination,
312 DestinationCache cache) {
313 if (destination instanceof JmsTopic) {
314 JmsTopic myTopic = (JmsTopic) getDestination();
315 JmsTopic topic = (JmsTopic) destination;
316 if (myTopic.match(topic) && !_caches.containsKey(topic)) {
317 _caches.put(topic, cache);
318 cache.addConsumer(this);
319 }
320 }
321 }
322
323 /***
324 * Invoked when a message cache is removed.
325 *
326 * @param destination the destination that messages are no longer being
327 * cached for
328 * @param cache the corresponding cache
329 */
330 public void cacheRemoved(JmsDestination destination,
331 DestinationCache cache) {
332 if (destination instanceof JmsTopic) {
333 _caches.remove(destination);
334 }
335 }
336
337 /***
338 * Registers this with the associated {@link DestinationCache}s. The
339 * consumer may receive messages immediately.
340 *
341 * @throws JMSException for any JMS error
342 */
343 protected void init() throws JMSException {
344 JmsTopic topic = (JmsTopic) getDestination();
345
346
347 if (topic.isWildCard()) {
348
349
350 _caches = _destinations.getTopicDestinationCaches(topic);
351
352
353
354 _destinations.addDestinationEventListener(this);
355 DestinationCache[] caches = getDestinationCaches();
356 for (int i = 0; i < caches.length; ++i) {
357 caches[i].addConsumer(this);
358 }
359 } else {
360
361
362
363 DestinationCache cache = _destinations.getDestinationCache(topic);
364 _caches.put(topic, cache);
365 cache.addConsumer(this);
366 }
367 }
368
369 /***
370 * Set the connection identifier.
371 *
372 * @param connectionId the identity of the connection that owns this
373 * consumer
374 * @see #getConnectionId
375 */
376 protected void setConnectionId(long connectionId) {
377 _connectionId = connectionId;
378 }
379
380 /***
381 * Add the handle to the cache.
382 *
383 * @param handle the message handle to add
384 */
385 protected void addMessage(MessageHandle handle) {
386 _handles.add(handle);
387 notifyMessageAvailable();
388 }
389
390 /***
391 * Return the next available message to the client.
392 *
393 * @return the next message, or <code>null</code> if none is available
394 * @throws JMSException for any error
395 * @param cancel
396 */
397 protected MessageHandle doReceive(Condition cancel) throws JMSException {
398 MessageHandle result = null;
399 MessageHandle handle;
400 while (!cancel.get() && (handle = _handles.removeFirst()) != null) {
401 if (_log.isDebugEnabled()) {
402 _log.debug("doReceive() - next available=" + handle.getMessageId());
403 }
404
405 MessageImpl message = handle.getMessage();
406 if (message != null) {
407 if (selects(message)) {
408
409 result = handle;
410 break;
411 } else {
412
413 handle.destroy();
414 }
415 }
416 }
417 if (_log.isDebugEnabled()) {
418 _log.debug("doReceive() - result=" + (result != null ? result.getMessageId() : null));
419 }
420 return result;
421 }
422
423 /***
424 * Closes this endpoint.
425 */
426 protected void doClose() {
427
428 _destinations.removeDestinationEventListener(this);
429
430
431 DestinationCache[] caches = getDestinationCaches();
432 for (int i = 0; i < caches.length; ++i) {
433 caches[i].removeConsumer(this);
434 }
435 _caches.clear();
436
437 if (!isPersistent()) {
438
439
440 MessageHandle[] handles = _handles.toArray();
441 for (int i = 0; i < handles.length; ++i) {
442 MessageHandle handle = handles[i];
443 try {
444 handle.destroy();
445 } catch (JMSException exception) {
446 _log.error(exception, exception);
447 }
448 }
449 }
450 }
451
452 /***
453 * Returns the destination manager.
454 *
455 * @return the destination manager
456 */
457 protected DestinationManager getDestinationManager() {
458 return _destinations;
459 }
460
461 /***
462 * Returns the destination caches.
463 *
464 * @return the destination caches
465 */
466 protected DestinationCache[] getDestinationCaches() {
467 return (DestinationCache[]) _caches.values().toArray(
468 new DestinationCache[0]);
469 }
470
471 }