1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: QueueDestinationCache.java,v 1.34.2.1 2004/05/01 12:05:26 tanderson Exp $
44 *
45 * Date Author Changes
46 * 3/1/2001 jima Created
47 */
48 package org.exolab.jms.messagemgr;
49
50 import java.sql.Connection;
51 import java.util.Collections;
52 import java.util.Enumeration;
53 import java.util.Iterator;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Vector;
57
58 import javax.jms.JMSException;
59 import javax.transaction.TransactionManager;
60
61 import org.apache.commons.logging.Log;
62 import org.apache.commons.logging.LogFactory;
63
64 import org.exolab.jms.client.JmsDestination;
65 import org.exolab.jms.client.JmsQueue;
66 import org.exolab.jms.client.JmsTemporaryDestination;
67 import org.exolab.jms.message.MessageHandle;
68 import org.exolab.jms.message.MessageImpl;
69 import org.exolab.jms.persistence.DatabaseService;
70 import org.exolab.jms.persistence.PersistenceException;
71 import org.exolab.jms.selector.Selector;
72 import org.exolab.jms.server.JmsServerConnectionManager;
73
74
75 /***
76 * A DestinationCache for Queues
77 *
78 * @version $Revision: 1.34.2.1 $ $Date: 2004/05/01 12:05:26 $
79 * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
80 */
81 public class QueueDestinationCache
82 extends DestinationCache {
83
84 /***
85 * Maintains a list of queue listeners for this cache
86 */
87 protected List _queueListeners =
88 Collections.synchronizedList(new LinkedList());
89
90 /***
91 * Underlying destination
92 */
93 private JmsQueue _queue = null;
94
95 /***
96 * Index of the last listener that received a message from this
97 * destination. If multiple listeners are attached to this queue then
98 * messages will be sent to each in a round robin fashion
99 */
100 private int _lastConsumerIndex = 0;
101
102 /***
103 * Tracks the number of messages added to the destination cache
104 */
105 private long _publishCount;
106
107 /***
108 * Tracks the number of messages consumed from the destination cache
109 */
110 private long _consumeCount;
111
112 /***
113 * The logger
114 */
115 private static final Log _log = LogFactory.getLog(
116 QueueDestinationCache.class);
117
118
119 /***
120 * Construct a message cache for a queue destination. This cache will
121 * receive all messages published under the specified destination.
122 * <p>
123 * the constructor will also attempt to load any persistent messages
124 * from the database.
125 * <p>
126 *
127 * @param destination the queue
128 * @throws FailedToInitializeException
129 */
130 QueueDestinationCache(JmsQueue destination)
131 throws FailedToInitializeException {
132
133 _queue = destination;
134
135 init();
136 if (DestinationManager.instance().isAdministeredDestination(destination.getName())) {
137
138
139
140 Connection connection = null;
141 TransactionManager tm = null;
142 try {
143 connection = DatabaseService.getConnection();
144
145
146 init(destination, connection);
147
148
149 connection.commit();
150 } catch (PersistenceException exception) {
151 if (connection != null) {
152 try {
153 connection.rollback();
154 } catch (Exception nested) {
155
156 }
157 }
158 throw new FailedToInitializeException(
159 "QueueDestinationCache init failed " + exception);
160 } catch (Exception exception) {
161
162 throw new FailedToInitializeException(
163 "QueueDestinationCache init failed " + exception);
164 } finally {
165 if (connection != null) {
166 try {
167 connection.close();
168 } catch (Exception nested) {
169
170 }
171 }
172 }
173 }
174 }
175
176
177 /***
178 * Construct a message cache for a queue destination using the specified
179 * database connection. This cache will receive all messages published
180 * under the destination.
181 * <p>
182 * the constructor will also attempt to load any persistent messages
183 * from the database using the specified connection
184 * <p>
185 * If there is any problem during construction a FailedToInitializeException
186 * will be raised.
187 *
188 * @param destination - the queue
189 * @throws FialedToInitializeException
190 */
191 QueueDestinationCache(Connection connection, JmsQueue destination)
192 throws FailedToInitializeException {
193 super();
194 _queue = destination;
195
196
197 init(connection);
198 if (DestinationManager.instance().isAdministeredDestination(destination.getName())) {
199
200
201
202 try {
203
204 init(destination, connection);
205 } catch (Exception exception) {
206
207 throw new FailedToInitializeException(
208 "QueueDestinationCache init failed " + exception);
209 }
210 }
211 }
212
213
214 /***
215 * This common method is used to help initialise the cache. It basically
216 * removes all the expired messages and then retrieves all unacked messages
217 * from the database and stores them locally.
218 * <p>
219 * It will throw a PersistenceException if there is a database related
220 * problem.
221 *
222 * @param destination - the queue
223 * @param connection - the database connection to use
224 * @throws PersistenceException
225 */
226 void init(JmsQueue destination, Connection connection)
227 throws PersistenceException {
228
229 DatabaseService.getAdapter().removeExpiredMessageHandles(
230 connection, destination.getName());
231 Vector handles = DatabaseService.getAdapter().getMessageHandles(
232 connection, destination, destination.getName());
233 if (handles != null) {
234 Enumeration iter = handles.elements();
235 while (iter.hasMoreElements()) {
236 addMessage((MessageHandle) iter.nextElement());
237 }
238 }
239 }
240
241
242 public JmsDestination getDestination() {
243 return _queue;
244 }
245
246 /***
247 * A Queue can also hav a queue listener, which simply gets informed of all
248 * messages that arrive at this destination
249 *
250 * @param listener - queue listener
251 */
252 public void addQueueListener(QueueListener listener) {
253
254 if (!_queueListeners.contains(listener)) {
255 _queueListeners.add(listener);
256 }
257 }
258
259 /***
260 * Remove the queue listener associated with this cache
261 *
262 * @param listener - queue listener to remove
263 */
264 public void removeQueueListener(QueueListener listener) {
265
266 if (_queueListeners.contains(listener)) {
267 _queueListeners.remove(listener);
268 }
269 }
270
271
272 public boolean messageAdded(JmsDestination destination,
273 MessageImpl message) {
274 boolean processed = false;
275
276 if ((destination != null) &&
277 (message != null)) {
278
279
280 if (destination.equals(_queue)) {
281
282
283 try {
284 MessageHandle handle =
285 MessageHandleFactory.createHandle(this, message);
286
287
288
289
290 addMessage(handle, message);
291
292
293 _publishCount++;
294
295
296
297
298 QueueConsumerEndpoint endpoint =
299 getEndpointForMessage(message);
300 if (endpoint != null) {
301 endpoint.messageAdded(message);
302 }
303
304
305 notifyQueueListeners(message);
306
307
308 checkMessageExpiry(message);
309
310
311 processed = true;
312 } catch (JMSException exception) {
313 _log.error("Failed to add message", exception);
314 }
315 } else {
316
317
318 _log.error("Dropping message " + message.getMessageId()
319 + " for destination " + destination.getName());
320 }
321 }
322
323 return processed;
324 }
325
326 /***
327 * This method is called when the {@link MessageMgr} removes a message
328 * from the cache.
329 *
330 * @param destination the message destination
331 * @param message the message removed from cache
332 */
333 public void messageRemoved(JmsDestination destination,
334 MessageImpl message) {
335
336 if ((destination != null) &&
337 (message != null)) {
338
339 try {
340 MessageHandle handle =
341 MessageHandleFactory.getHandle(this, message);
342
343
344 if (destination.equals(_queue)) {
345 removeMessage(handle);
346 notifyOnRemoveMessage(message);
347 handle.destroy();
348 }
349 } catch (JMSException exception) {
350 _log.error("Failed to remove message", exception);
351 }
352 }
353 }
354
355
356 public boolean persistentMessageAdded(Connection connection,
357 JmsDestination destination,
358 MessageImpl message)
359 throws PersistenceException {
360
361 boolean processed = false;
362
363 if ((destination != null) &&
364 (message != null)) {
365
366
367 if (destination.equals(_queue)) {
368
369
370 try {
371
372
373
374 MessageHandle handle =
375 MessageHandleFactory.getHandle(this, message);
376 addMessage(handle, message);
377
378
379 _publishCount++;
380
381
382
383
384 QueueConsumerEndpoint endpoint =
385 getEndpointForMessage(message);
386 if (endpoint != null) {
387 endpoint.persistentMessageAdded(connection, message);
388 }
389
390
391 notifyQueueListeners(message);
392
393
394 checkMessageExpiry(message);
395
396
397 processed = true;
398 } catch (JMSException exception) {
399 _log.error("Failed to add persistent message",
400 exception);
401 }
402 } else {
403
404
405 }
406 }
407
408 return processed;
409 }
410
411
412 public synchronized void persistentMessageRemoved(
413 Connection connection, JmsDestination destination,
414 MessageImpl message)
415 throws PersistenceException {
416
417 if ((destination != null) &&
418 (message != null)) {
419
420 try {
421 PersistentMessageHandle handle = (PersistentMessageHandle)
422 MessageHandleFactory.getHandle(this, message);
423
424
425 if (destination.equals(_queue)) {
426 removeMessage(handle);
427 notifyOnRemovePersistentMessage(connection, message);
428 MessageHandleFactory.destroyPersistentHandle(connection,
429 handle);
430 }
431 } catch (JMSException exception) {
432 _log.error("Failed to remove persistent message", exception);
433 }
434 }
435 }
436
437 /***
438 * Return the next {@link ConsumerEndpoint} that can consume the specified
439 * message or null if there is none.
440 *
441 * @param message - the message to consume
442 * @return the consumer who should receive this message or null
443 */
444 private synchronized QueueConsumerEndpoint getEndpointForMessage(
445 MessageImpl message) {
446 QueueConsumerEndpoint selectedEndpoint = null;
447
448 if (_consumers.size() > 0) {
449
450
451 if ((_lastConsumerIndex + 1) > _consumers.size()) {
452 _lastConsumerIndex = 0;
453 }
454
455
456
457 int index = _lastConsumerIndex;
458 do {
459 QueueConsumerEndpoint endpoint =
460 (QueueConsumerEndpoint) _consumers.get(index);
461 Selector selector = endpoint.getSelector();
462
463
464
465
466
467 if (((endpoint.hasMessageListener()) ||
468 (endpoint.isWaitingForMessage())) &&
469 ((selector == null) ||
470 (selector.selects(message)))) {
471 _lastConsumerIndex = ++index;
472 selectedEndpoint = endpoint;
473 break;
474 }
475
476
477 if (++index >= _consumers.size()) {
478 index = 0;
479 }
480 } while (index != _lastConsumerIndex);
481 }
482
483 return selectedEndpoint;
484 }
485
486 /***
487 * Return the first message of the queue or null if there are no messages
488 * in the cache
489 *
490 * @param QueueConsumerEndpoint - the consumer who will receive the message
491 * @return MessageHandle - handle to the first message
492 */
493 public synchronized MessageHandle getMessage(
494 QueueConsumerEndpoint endpoint) {
495 MessageHandle handle = null;
496
497 if ((endpoint != null) &&
498 (getMessageCount() > 0)) {
499 Selector selector = endpoint.getSelector();
500 if (selector == null) {
501
502
503 handle = removeFirstMessage();
504 _consumeCount++;
505 } else {
506
507 Object[] handles = toMessageArray();
508 for (int i = 0; i < handles.length; ++i) {
509 MessageHandle hdl = (MessageHandle) handles[i];
510 MessageImpl message = hdl.getMessage();
511 if (message != null && selector.selects(message)) {
512 handle = hdl;
513 removeMessage(hdl);
514 _consumeCount++;
515 break;
516 }
517 }
518 }
519 }
520
521 return handle;
522 }
523
524 /***
525 * Playback all the messages in the cache to the specified
526 * {@link QueueListener}
527 *
528 * @param listener - the queue listener
529 */
530 public void playbackMessages(QueueListener listener) {
531
532 Object[] messages = toMessageArray();
533 if ((listener != null) &&
534 (messages.length > 0)) {
535 try {
536 for (int index = 0; index < messages.length; index++) {
537 listener.onMessage(((MessageHandle) messages[index]).getMessage());
538 }
539 } catch (IndexOutOfBoundsException exception) {
540
541
542 }
543 }
544 }
545
546 /***
547 * Return the specified message to top of the queue. This is called to
548 * recover unsent or unacked messages
549 *
550 * @param message - message to return
551 */
552 public synchronized void returnMessage(MessageHandle handle) {
553
554
555 addMessage(handle);
556
557
558
559 if (_consumers.size() > 0) {
560
561
562 if ((_lastConsumerIndex + 1) > _consumers.size()) {
563 _lastConsumerIndex = 0;
564 }
565
566 int index =
567 (_lastConsumerIndex >= _consumers.size()) ?
568 0 : _lastConsumerIndex;
569
570 do {
571
572 QueueConsumerEndpoint endpoint =
573 (QueueConsumerEndpoint) _consumers.get(index);
574
575
576
577 if (endpoint.hasMessageListener()) {
578 endpoint.schedule();
579 _lastConsumerIndex = ++index;
580 break;
581 }
582
583
584 if (++index >= _consumers.size()) {
585 index = 0;
586 }
587 } while (index != _lastConsumerIndex);
588 }
589 }
590
591 /***
592 * Notify all the queue listeners, that this message has arrived. This is
593 * ideal for browsers and iterators
594 *
595 * @param message - message to deliver
596 */
597 void notifyQueueListeners(MessageImpl message) {
598 if (!_queueListeners.isEmpty()) {
599 QueueListener[] listeners =
600 (QueueListener[]) _queueListeners.toArray(
601 new QueueListener[0]);
602
603 int size = listeners.length;
604 for (int index = 0; index < size; ++index) {
605 QueueListener listener = listeners[index];
606 if (listener instanceof QueueBrowserEndpoint) {
607 QueueBrowserEndpoint browser =
608 (QueueBrowserEndpoint) listener;
609 Selector selector = browser.getSelector();
610
611
612
613 if ((selector == null) ||
614 (selector.selects(message))) {
615 browser.onMessage(message);
616 }
617 } else {
618
619
620 listener.onMessage(message);
621 }
622 }
623 }
624 }
625
626
627 boolean notifyOnAddMessage(MessageImpl message) {
628 return true;
629 }
630
631
632 void notifyOnRemoveMessage(MessageImpl message) {
633 }
634
635
636 boolean hasActiveConsumers() {
637 boolean active = true;
638 if (_queueListeners.isEmpty() && _consumers.isEmpty()) {
639 active = false;
640 }
641 if (_log.isDebugEnabled()) {
642 _log.debug("hasActiveConsumers()[queue=" + _queue + "]=" + active);
643 }
644 return active;
645 }
646
647 /***
648 * Determines if this cache can be destroyed.
649 * A <code>QueueDestinationCache</code> can be destroyed if there
650 * are no active consumers and:
651 * <ul>
652 * <li>the queue is persistent and there are no messages</li>
653 * <li>
654 * the queue is temporary and the corresponding connection is closed
655 * </li>
656 * </ul>
657 *
658 * @return <code>true</code> if the cache can be destroyed, otherwise
659 * <code>false</code>
660 */
661 public boolean canDestroy() {
662 boolean destroy = false;
663 if (!hasActiveConsumers()) {
664 JmsDestination queue = getDestination();
665 if (queue.getPersistent() && getMessageCount() == 0) {
666 destroy = true;
667 } else if (queue.isTemporaryDestination()) {
668
669
670 String connectionId =
671 ((JmsTemporaryDestination) queue).getConnectionId();
672 JmsServerConnectionManager manager =
673 JmsServerConnectionManager.instance();
674 if (manager.getConnection(connectionId) == null) {
675 destroy = true;
676 }
677 }
678 }
679 return destroy;
680 }
681
682 /***
683 * Destroy this object
684 */
685 synchronized void destroy() {
686 super.destroy();
687 _queueListeners.clear();
688 }
689
690
691 public String toString() {
692 return _queue.toString();
693 }
694
695
696 public int hashCode() {
697 return _queue.hashCode();
698 }
699
700 }