1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: QueueDestinationCache.java,v 1.9 2006/06/09 12:58:56 tanderson Exp $
44 */
45 package org.exolab.jms.messagemgr;
46
47 import java.sql.Connection;
48 import java.util.Collections;
49 import java.util.Iterator;
50 import java.util.LinkedList;
51 import java.util.List;
52 import javax.jms.JMSException;
53
54 import org.apache.commons.logging.Log;
55 import org.apache.commons.logging.LogFactory;
56
57 import org.exolab.jms.client.JmsDestination;
58 import org.exolab.jms.client.JmsQueue;
59 import org.exolab.jms.client.JmsTemporaryDestination;
60 import org.exolab.jms.lease.LeaseManager;
61 import org.exolab.jms.message.MessageImpl;
62 import org.exolab.jms.persistence.DatabaseService;
63 import org.exolab.jms.persistence.PersistenceException;
64 import org.exolab.jms.selector.Selector;
65 import org.exolab.jms.server.ServerConnectionManager;
66
67
68 /***
69 * A {@link DestinationCache} for queues.
70 *
71 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
72 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73 * @version $Revision: 1.9 $ $Date: 2006/06/09 12:58:56 $
74 */
75 public class QueueDestinationCache extends AbstractDestinationCache {
76
77 /***
78 * Maintains a list of {@link QueueConsumerMessageHandle} instances.
79 */
80 private final MessageQueue _handles = new MessageQueue();
81
82 /***
83 * Maintains a list of queue browsers for this cache.
84 */
85 private final List _browsers
86 = Collections.synchronizedList(new LinkedList());
87
88 /***
89 * The connection manager.
90 */
91 private final ServerConnectionManager _connections;
92
93 /***
94 * Synchronization helper.
95 */
96 private final Object _lock = new Object();
97
98 /***
99 * Index of the last {@link QueueConsumerEndpoint} that received a message
100 * from this destination. If multiple consumers are attached to this queue
101 * then messages will be sent to each in a round robin fashion
102 */
103 private int _lastConsumerIndex = 0;
104
105 /***
106 * The logger.
107 */
108 private static final Log _log = LogFactory.getLog(
109 QueueDestinationCache.class);
110
111
112 /***
113 * Construct a new <code>QueueDestinationCache</code>.
114 *
115 * @param queue the queue to cache messages for
116 * @param database the database service
117 * @param leases the lease manager
118 * @param connections the connection manager
119 * @throws JMSException if the cache can't be initialised
120 */
121 public QueueDestinationCache(JmsQueue queue,
122 DatabaseService database,
123 LeaseManager leases,
124 ServerConnectionManager connections)
125 throws JMSException {
126 super(queue, database, leases);
127 if (connections == null) {
128 throw new IllegalArgumentException(
129 "Argument 'connections' is null");
130 }
131 _connections = connections;
132
133 if (queue.getPersistent()) {
134 init();
135 }
136 }
137
138 /***
139 * A Queue can also hav a queue listener, which simply gets informed of all
140 * messages that arrive at this destination.
141 *
142 * @param listener - queue listener
143 */
144 public void addQueueListener(QueueBrowserEndpoint listener) {
145
146 if (!_browsers.contains(listener)) {
147 _browsers.add(listener);
148 }
149 }
150
151 /***
152 * Remove the queue listener associated with this cache
153 *
154 * @param listener - queue listener to remove
155 */
156 public void removeQueueListener(QueueBrowserEndpoint listener) {
157
158 if (_browsers.contains(listener)) {
159 _browsers.remove(listener);
160 }
161 }
162
163 /***
164 * Invoked when the {@link MessageMgr} receives a non-persistent message.
165 *
166 * @param destination the message's destination
167 * @param message the message
168 * @throws JMSException if the listener fails to handle the message
169 */
170 public void messageAdded(JmsDestination destination, MessageImpl message)
171 throws JMSException {
172 MessageRef reference = new CachedMessageRef(message, false,
173 getMessageCache());
174 MessageHandle shared = new SharedMessageHandle(this, reference,
175 message);
176 MessageHandle handle = new QueueConsumerMessageHandle(shared);
177
178
179
180 addMessage(reference, message, handle);
181
182
183
184 ConsumerEndpoint consumer = getConsumerForMessage(message);
185 if (consumer != null) {
186 consumer.messageAdded(handle, message);
187 }
188 }
189
190 /***
191 * Invoked when the {@link MessageMgr} receives a persistent message.
192 *
193 * @param destination the message's destination
194 * @param message the message
195 * @throws JMSException if the listener fails to handle the message
196 * @throws PersistenceException if there is a persistence related problem
197 */
198 public void persistentMessageAdded(JmsDestination destination,
199 MessageImpl message)
200 throws JMSException, PersistenceException {
201 MessageRef reference = new CachedMessageRef(message, true,
202 getMessageCache());
203 MessageHandle shared = new SharedMessageHandle(this, reference,
204 message);
205 MessageHandle handle = new QueueConsumerMessageHandle(shared);
206 handle.add();
207
208 addMessage(reference, message, handle);
209
210
211
212 ConsumerEndpoint consumer = getConsumerForMessage(message);
213 if (consumer != null) {
214 consumer.persistentMessageAdded(handle, message);
215 }
216 }
217
218 /***
219 * Returns the first available message matching the supplied message
220 * selector.
221 *
222 * @param selector the message selector to use. May be <code>null</code>
223 * @param cancel
224 * @return handle to the first message, or <code>null</code> if there are no
225 * messages, or none matching <code>selector</code>
226 * @throws JMSException for any error
227 */
228 public synchronized MessageHandle getMessage(Selector selector,
229 Condition cancel)
230 throws JMSException {
231 QueueConsumerMessageHandle handle = null;
232 if (selector == null) {
233
234
235 handle = (QueueConsumerMessageHandle) _handles.removeFirst();
236 } else {
237
238 MessageHandle[] handles = _handles.toArray();
239 for (int i = 0; i < handles.length && !cancel.get(); ++i) {
240 MessageHandle hdl = handles[i];
241 MessageImpl message = hdl.getMessage();
242 if (message != null && selector.selects(message)) {
243 handle = (QueueConsumerMessageHandle) hdl;
244 _handles.remove(handle);
245 break;
246 }
247 }
248 }
249 return handle;
250 }
251
252 /***
253 * Playback all the messages in the cache to the specified {@link
254 * QueueBrowserEndpoint}.
255 *
256 * @param browser the queue browser
257 * @throws JMSException for any error
258 */
259 public void playbackMessages(QueueBrowserEndpoint browser)
260 throws JMSException {
261 MessageHandle[] handles = _handles.toArray();
262 for (int i = 0; i < handles.length; ++i) {
263 MessageHandle handle = handles[i];
264 MessageImpl message = handle.getMessage();
265 if (message != null) {
266 browser.messageAdded(handle, message);
267 }
268 }
269 }
270
271 /***
272 * Return a message handle back to the cache, to recover unsent or
273 * unacknowledged messages.
274 *
275 * @param handle the message handle to return
276 */
277 public void returnMessageHandle(MessageHandle handle) {
278
279 _handles.add(handle);
280 try {
281 MessageImpl message = handle.getMessage();
282 if (message != null) {
283
284
285 ConsumerEndpoint consumer = getConsumerForMessage(message);
286 if (consumer != null) {
287 consumer.messageAdded(handle, message);
288 }
289 }
290 } catch (JMSException exception) {
291 _log.debug(exception, exception);
292 }
293 }
294
295 /***
296 * Determines if there are any registered consumers.
297 *
298 * @return <code>true</code> if there are registered consumers
299 */
300 public boolean hasConsumers() {
301 boolean active = super.hasConsumers();
302 if (!active && !_browsers.isEmpty()) {
303 active = true;
304 }
305 if (_log.isDebugEnabled()) {
306 _log.debug("hasActiveConsumers()[queue=" + getDestination() + "]="
307 + active);
308 }
309 return active;
310 }
311
312 /***
313 * Returns the number of messages in the cache.
314 *
315 * @return the number of messages in the cache
316 */
317 public int getMessageCount() {
318 return _handles.size();
319 }
320
321 /***
322 * Determines if this cache can be destroyed. A <code>QueueDestinationCache</code>
323 * can be destroyed if there are no active consumers and: <ul> <li>the queue
324 * is persistent and there are no messages</li> <li> the queue is temporary
325 * and the corresponding connection is closed </li> </ul>
326 *
327 * @return <code>true</code> if the cache can be destroyed, otherwise
328 * <code>false</code>
329 */
330 public boolean canDestroy() {
331 boolean destroy = false;
332 if (!hasConsumers()) {
333 JmsDestination queue = getDestination();
334 if (queue.getPersistent() && getMessageCount() == 0) {
335 destroy = true;
336 } else if (queue.isTemporaryDestination()) {
337
338
339 long connectionId =
340 ((JmsTemporaryDestination) queue).getConnectionId();
341 if (_connections.getConnection(connectionId) == null) {
342 destroy = true;
343 }
344 }
345 }
346 return destroy;
347 }
348
349 /***
350 * Destroy this object.
351 */
352 public void destroy() {
353 super.destroy();
354 _browsers.clear();
355 }
356
357 /***
358 * Initialise the cache. This removes all the expired messages, and then
359 * retrieves all unacked messages from the database and stores them
360 * locally.
361 *
362 * @throws JMSException if the cache can't be initialised
363 */
364 protected void init() throws JMSException {
365 JmsDestination queue = getDestination();
366
367 List handles;
368 DatabaseService service = null;
369 try {
370 service = DatabaseService.getInstance();
371 Connection connection = service.getConnection();
372 service.getAdapter().removeExpiredMessageHandles(connection,
373 queue.getName());
374 handles = service.getAdapter().getMessageHandles(connection, queue,
375 queue.getName());
376 } catch (PersistenceException exception) {
377 _log.error(exception, exception);
378 try {
379 if (service != null) {
380 service.rollback();
381 }
382 } catch (PersistenceException error) {
383 _log.error(error, error);
384 }
385 throw new JMSException(exception.getMessage());
386 }
387
388 Iterator iterator = handles.iterator();
389 DefaultMessageCache cache = getMessageCache();
390 while (iterator.hasNext()) {
391 PersistentMessageHandle handle = (PersistentMessageHandle) iterator.next();
392 String messageId = handle.getMessageId();
393 MessageRef reference = cache.getMessageRef(messageId);
394 if (reference == null) {
395 reference = new CachedMessageRef(messageId, true, cache);
396 }
397 cache.addMessageRef(reference);
398 handle.reference(reference);
399 handle.setDestinationCache(this);
400 _handles.add(new QueueConsumerMessageHandle(handle));
401
402 checkMessageExpiry(reference, handle.getExpiryTime());
403 }
404 }
405
406 /***
407 * Add a message, and notify any listeners.
408 *
409 * @param reference a reference to the message
410 * @param message the message
411 * @param handle the handle to add
412 * @throws JMSException for any error
413 */
414 protected void addMessage(MessageRef reference, MessageImpl message,
415 MessageHandle handle) throws JMSException {
416 addMessage(reference, message);
417 _handles.add(handle);
418
419
420 notifyQueueListeners(handle, message);
421
422
423 checkMessageExpiry(reference, message);
424 }
425
426
427 /***
428 * Notify queue browsers that a message has arrived.
429 *
430 * @param handle a handle to the message
431 * @param message the message
432 * @throws JMSException if a browser fails to handle the message
433 */
434 protected void notifyQueueListeners(MessageHandle handle,
435 MessageImpl message)
436 throws JMSException {
437 QueueBrowserEndpoint[] browsers =
438 (QueueBrowserEndpoint[]) _browsers.toArray(
439 new QueueBrowserEndpoint[0]);
440
441 for (int index = 0; index < browsers.length; ++index) {
442 QueueBrowserEndpoint browser = browsers[index];
443 browser.messageAdded(handle, message);
444 }
445 }
446
447 /***
448 * Remove an expired non-peristent message, and notify any listeners.
449 *
450 * @param reference the reference to the expired message
451 * @throws JMSException for any error
452 */
453 protected void messageExpired(MessageRef reference) throws JMSException {
454 _handles.remove(reference.getMessageId());
455
456 super.messageExpired(reference);
457 }
458
459 /***
460 * Remove an expired persistent message, and notify any listeners.
461 *
462 * @param reference the reference to the expired message
463 * @throws JMSException if a listener fails to handle the
464 * expiration
465 * @throws PersistenceException if there is a persistence related problem
466 */
467 protected void persistentMessageExpired(MessageRef reference)
468 throws JMSException, PersistenceException {
469 _handles.remove(reference.getMessageId());
470
471 super.messageExpired(reference);
472 }
473
474 /***
475 * Return the next QueueConsumerEndpoint that can consume the specified
476 * message or null if there is none.
477 *
478 * @param message - the message to consume
479 * @return the consumer who should receive this message, or null
480 */
481 private ConsumerEndpoint getConsumerForMessage(MessageImpl message) {
482 ConsumerEndpoint result = null;
483
484 ConsumerEndpoint[] consumers = getConsumerArray();
485 final int size = consumers.length;
486 if (size > 0) {
487 synchronized (_lock) {
488
489
490 if ((_lastConsumerIndex + 1) > size) {
491 _lastConsumerIndex = 0;
492 }
493
494
495
496 int index = _lastConsumerIndex;
497 do {
498 ConsumerEndpoint consumer = consumers[index];
499
500
501
502
503 if ((consumer.isAsynchronous()
504 || consumer.isWaitingForMessage())
505 && consumer.selects(message)) {
506 _lastConsumerIndex = ++index;
507 result = consumer;
508 break;
509 }
510
511
512 if (++index >= size) {
513 index = 0;
514 }
515 } while (index != _lastConsumerIndex);
516 }
517 }
518
519 return result;
520 }
521
522 }