View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.messagemgr;
44  
45  import java.sql.Connection;
46  import java.sql.SQLException;
47  import java.util.Date;
48  import java.util.HashMap;
49  import java.util.Iterator;
50  
51  import javax.jms.JMSException;
52  import javax.jms.Message;
53  import javax.transaction.TransactionManager;
54  
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  
58  import org.exolab.core.service.BasicService;
59  import org.exolab.core.service.ServiceException;
60  import org.exolab.jms.client.JmsDestination;
61  import org.exolab.jms.client.JmsQueue;
62  import org.exolab.jms.client.JmsTopic;
63  import org.exolab.jms.config.AdministeredDestinations;
64  import org.exolab.jms.config.AdministeredQueue;
65  import org.exolab.jms.config.AdministeredTopic;
66  import org.exolab.jms.config.Configuration;
67  import org.exolab.jms.config.DatabaseConfiguration;
68  import org.exolab.jms.config.MessageManagerConfiguration;
69  import org.exolab.jms.config.Subscriber;
70  import org.exolab.jms.message.MessageHandle;
71  import org.exolab.jms.message.MessageImpl;
72  import org.exolab.jms.persistence.DatabaseService;
73  import org.exolab.jms.persistence.PersistenceException;
74  
75  
76  /***
77   * This is the active message handling component within the JMS server.
78   * Messages are passed in and added to the appropriate dispatchers for delivery
79   * to the clients.
80   *
81   * @version     $Revision: 1.93 $ $Date: 2003/08/17 01:32:24 $
82   * @author      <a href="mailto:mourikis@intalio.com">Jim Mourikis</a>
83   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
84   */
85  public class MessageMgr extends BasicService {
86  
87      /***
88       * The service name of the message manager
89       */
90      private final static String MM_SERVICE_NAME = "MessageManager";
91  
92      /***
93       * Caches the singleton instance of the message manager.
94       */
95      private static MessageMgr _instance;
96  
97      /***
98       * used to synchronise the singleton construction
99       */
100     private static final Object _block = new Object();
101 
102     /***
103      * Maintain a list of registered MessageManagerEventListener objects, that
104      * get notified when certain events occur in the MessageManager
105      */
106     private transient HashMap _listeners = new HashMap(1023);
107 
108     /***
109      * The sequence number generator is used to differentiate messages arriving
110      * on the same millisecond
111      */
112     private long _sequenceNumberGenerator = 0;
113 
114     /***
115      * This is the maximum size the cache can reach before we are forced to
116      * run garbage collection. Garbage collection will also execute in the
117      * background periodically to remove processed messages from the cache.
118      */
119     private int _maximumSize = 2500;
120 
121     /***
122      * Tracks the number of messages processed
123      */
124     private long _messagesProcessed;
125 
126     /***
127      * The logger
128      */
129     private static final Log _log = LogFactory.getLog(MessageMgr.class);
130 
131 
132     /***
133      * Create and return an instance of the singleton. If the singleton already
134      * exists then simply return it. If there is a problem creating the
135      * singleton then throw an exception
136      *
137      * @return MessageMgr - the singleton instance
138      * @throws MessageMgrException
139      */
140     public static MessageMgr createInstance() throws MessageMgrException {
141         if (_instance == null) {
142             synchronized (_block) {
143                 if (_instance == null) {
144                     _instance = new MessageMgr();
145                 }
146             }
147         }
148         return _instance;
149     }
150 
151     /***
152      * Return an instance to the MessageMgr singleton. This method assumes that
153      * the singleton has already been created with a call to
154      * {@link #createInstance}
155      *
156      * @return MessageMgr
157      */
158     public static MessageMgr instance() {
159         return _instance;
160     }
161 
162     /***
163      * The constructor will register itself with the garbage collection
164      * service.
165      *
166      * @throws MessageMgrException - if there are problems during construction
167      */
168     private MessageMgr() throws MessageMgrException {
169         super(MM_SERVICE_NAME);
170     }
171 
172     // ovverride BasicService.start
173     public void start() throws ServiceException {
174         try {
175             DestinationManager.createInstance();
176             ConsumerManager.createInstance();
177         } catch (ServiceException exception) {
178             throw exception;
179         } catch (Exception exception) {
180             String msg = "Failed to start MessageMgr";
181             _log.error(msg, exception);
182             throw new ServiceException(msg + ":" + exception);
183         }
184     }
185 
186     // implement BasicService.run
187     public void run() {
188         // do nothing
189     }
190 
191     // override BasicService.stop
192     public synchronized void stop() throws ServiceException {
193         try {
194             // destroy the consumer manager.
195             ConsumerManager.instance().destroy();
196 
197             // destroy the destination manager.
198             DestinationManager.instance().destroy();
199 
200             // clear state
201             _listeners.clear();
202         } catch (Exception error) {
203             error.printStackTrace();
204             throw new ServiceException("Failed to stop MessageMgr : " +
205                 error.toString());
206         }
207 
208         // clear the static reference
209         synchronized (_block) {
210             _instance = null;
211 
212         }
213     }
214 
215     /***
216      * Create the specified destination. The destination is a container
217      * for messages and consumers. Consumers listen for messages posted on a
218      * particular destination.
219      * <p>
220      * This can be called multiple times without any side effects. If the
221      * destination is null then it throws a JMSException
222      *
223      * @param destination - create this destination
224      * @throws JMSException - if the params is null
225      */
226     public void addDestination(JmsDestination destination)
227         throws JMSException {
228 
229         // check the methods preconditions
230         if (destination == null) {
231             throw new JMSException("Call to addDestination with null object");
232         }
233 
234         DestinationManager.instance().createDestinationCache(destination);
235     }
236 
237     /***
238      * Remove this destination and all attached consumers. If the destination
239      * is null then throw an exception.
240      *
241      * @param destination - the destination to remove
242      * @throws JMSException
243      */
244     public void removeDestination(JmsDestination destination)
245         throws JMSException {
246 
247         // check the method's preconditions
248         if (destination == null) {
249             throw new JMSException("Call to removeDestination with null object");
250         }
251 
252         DestinationManager.instance().destroyDestinationCache(destination);
253     }
254 
255     /***
256      * Return true if the specified destination exists.
257      *
258      * @param destination - destination to check
259      * @return boolean - true if a it exists
260      */
261     public boolean exists(JmsDestination destination) {
262         return DestinationManager.instance().hasDestinationCache(destination);
263     }
264 
265     /***
266      * Add a message to the message manager for the specified destination.
267      * If the message or the destination are null then throw a JMSException
268      * <p>
269      * If the destination, specified in the message, does not exist then
270      * create it.
271      * destinations
272      *
273      * @param       message             the message to add
274      * @throws      JMSException        if the message cannot be added
275      */
276     public void add(MessageImpl message) throws JMSException {
277         if (message != null) {
278             // if the message is persistent then process it accordingly,
279             // otherwise use the non-persistent quality of service
280             if (message.isPersistent()) {
281                 addPersistentMessage(message);
282             } else {
283                 addNonPersistentMessage(message);
284             }
285         } else {
286             _log.error("Cannot process a null message.");
287         }
288     }
289 
290     /***
291      * Add a message to the message manager for the specified destination.
292      * Note that this method is called exclusively by the 
293      * {@link ResourceManager} and should not be used for any other purpose.
294      *
295      * @param connection - this is the database connection that is used
296      * @param message - the message to add
297      * @throws JMSException - thrown if  there is a problem processing msg
298      */
299     void add(Connection connection, MessageImpl message) throws JMSException {
300         if (message != null) {
301             // if the message is persistent then process it accordingly,
302             // otherwise use the non-persistent quality of service
303             if (message.isPersistent()) {
304                 addPersistentMessage(connection, message);
305             } else {
306                 addNonPersistentMessage(message);
307             }
308         } else {
309             _log.error("Cannot process a null message.");
310         }
311     }
312 
313     /***
314      * This method is used to process non-persistent messages.
315      *
316      * @param message - the message to process
317      * @throws JMSException - if the message cannot be processed
318      */
319     protected void addNonPersistentMessage(MessageImpl message)
320         throws JMSException {
321         // mark the message as accepted and attach a sequence number
322         message.setAcceptedTime((new Date()).getTime());
323         message.setSequenceNumber(++_sequenceNumberGenerator);
324         message.setReadOnly(true);
325 
326         // Use the message to retrieve the corresponding destination object.
327         JmsDestination destination =
328             (JmsDestination) message.getJMSDestination();
329         if (destination != null) {
330             // notify all registered listeners that a new message has arrived
331             // for the specified destination.
332             notifyOnAddMessage(destination, message);
333             _messagesProcessed++;
334         } else {
335             _log.error("Can't locate destination for message");
336         }
337     }
338 
339     /***
340      * This method is used to process persistent messages.
341      *
342      * @param message - the message to process
343      * @throws JMSException - if the message cannot be processed
344      */
345     protected void addPersistentMessage(MessageImpl message)
346         throws JMSException {
347 
348         // mark the message as accepted and attach a sequence number
349         message.setAcceptedTime((new Date()).getTime());
350         message.setSequenceNumber(++_sequenceNumberGenerator);
351         message.setReadOnly(true);
352 
353         JmsDestination destination =
354             (JmsDestination) message.getJMSDestination();
355         if (destination == null) {
356             throw new JMSException(
357                 "Can't process message - JMSDestination is null");
358         }
359 
360         Connection connection = null;
361         TransactionManager tm = null;
362 
363         // do all persistent work in this block
364         try {
365             // get a database connection
366             connection = DatabaseService.getConnection();
367 
368             // add the message to the database
369             DatabaseService.getAdapter().addMessage(connection, message);
370 
371             if (destination instanceof JmsTopic) {
372                 // let the consumer manager handle this
373                 ConsumerManager.instance().persistentMessageAdded(
374                     connection, message);
375             } else {
376                 // if this message is for a queue then simply create a
377                 // persistent handle for the destination
378                 MessageHandleFactory.createPersistentHandle(
379                     connection, destination, null, message);
380             }
381 
382             // commit the persistent message and handle
383             connection.commit();
384 
385             // Notify all listeners that a persistent message has arrived
386             notifyOnAddPersistentMessage(null, destination, message);
387             _messagesProcessed++;
388         } catch (Exception exception) {
389             if (connection != null) {
390                 try {
391                     connection.rollback();
392                 } catch (SQLException ignore) {
393                     // no-op
394                 }
395             }
396             _log.error("Failed to make message persistent", exception);
397             throw new JMSException(
398                 "Failed to make message persistent: " +
399                 exception.toString());
400         } finally {
401             if (connection != null) {
402                 try {
403                     connection.close();
404                 } catch (Exception ignore) {
405                     // no-op
406                 }
407             }
408         }
409     }
410 
411     /***
412      * This method is used to process persistent messages published through
413      * the resource manager.
414      *
415      * @param connection - the database connection to use.
416      * @param message - the message to process
417      * @throws JMSException - if the message cannot be processed
418      */
419     protected void addPersistentMessage(Connection connection,
420                                         MessageImpl message)
421         throws JMSException {
422 
423         // Use the message to retrieve the corresponding destination object.
424         // This method will create the object if one does not already exist.
425         JmsDestination destination = (JmsDestination) message.getJMSDestination();
426         if (destination != null) {
427             try {
428                 // notify all listeensers that a persistent message has arrived
429                 notifyOnAddPersistentMessage(connection, destination, message);
430                 _messagesProcessed++;
431             } catch (PersistenceException exception) {
432                 throw new JMSException("Failed in addPersistentMessage : " +
433                     exception.toString());
434             } catch (Exception exception) {
435                 throw new JMSException("Failed in addPersistentMessage : " +
436                     exception.toString());
437             }
438         } else {
439             // shouldn't really get here, since the message should have been
440             // checked and prepared before passed to this routine.
441             _log.error("Can't locate destination for message");
442         }
443     }
444 
445     /***
446      * Return the message given the specified message handle.
447      * This will delegate to the appropriate {@link DestinationCache} or
448      * {@link ConsumerManager}
449      *
450      * @param handle - the handle
451      * @return MessageImpl - the associated message or null
452      */
453     MessageImpl getMessage(MessageHandle handle) {
454         // precondition; ensure that the handle is not null
455         if (handle == null) {
456             return null;
457         }
458 
459         MessageImpl message = null;
460         if (handle.getDestination() instanceof JmsTopic) {
461             // is for a topic so check the consumer endpoint
462             // cache
463             TopicConsumerEndpoint endpoint = (TopicConsumerEndpoint)
464                 ConsumerManager.instance().getConsumerEndpoint(
465                     handle.getConsumerName());
466             if (endpoint != null) {
467                 message = endpoint.getMessage(handle);
468             }
469         } else {
470             // must be for a queue so check the destination cache
471             DestinationCache cache =
472                 DestinationManager.instance().getDestinationCache(
473                     handle.getDestination());
474             if (cache != null) {
475                 message = cache.getMessage(handle);
476             }
477         }
478 
479         return message;
480     }
481 
482     /***
483      * This method prepares the message without actually passing it through the
484      * system. It is used by the {@link ResourceManager} to process incoming
485      * messages.
486      * <p>
487      * If there are any issues with the message the method will throw an
488      * exception
489      *
490      * @param message - the message
491      * @throws JMSException - if the message is invalid or cannot be prep'ed
492      */
493     void checkAndPrepareMessage(MessageImpl message)
494         throws JMSException {
495         if (message != null) {
496             // mark the message as accepted and attach a sequence number
497             message.setAcceptedTime((new Date()).getTime());
498             message.setSequenceNumber(++_sequenceNumberGenerator);
499             message.setReadOnly(true);
500 
501             if (message.getJMSDestination() == null) {
502                 throw new JMSException("Null destination specified in message");
503             }
504         } else {
505             throw new JMSException("checkAndPrepareMessage failed for null message");
506         }
507     }
508 
509     /***
510      * Returns true if there are any messages for the specified consumer
511      *
512      * @param consumer - the consumer to check
513      * @return boolean - true if messages are queued
514      * @throws JMSException - if the consumer can't be checked
515      */
516     public boolean hasMessages(ConsumerEndpoint consumer) throws JMSException {
517         if (consumer == null) {
518             throw new JMSException(
519                 "Can't call hasMessages with null consumer");
520         }
521         return (consumer.getMessageCount() > 0);
522     }
523 
524     /***
525      * Returns a list of active destinations
526      *
527      * @return List a list of JmsDestination objects
528      */
529     public Iterator getDestinations() {
530         return DestinationManager.instance().destinations();
531     }
532 
533     /***
534      * Returns an iterator of active consumers registered to a given
535      * destination
536      *
537      * @return Iterator - iterator of {@link ConsumerEndpoint} objects.
538      * @throws JMSException
539      */
540     public Iterator getConsumers(JmsDestination destination)
541         throws JMSException {
542         //check to see that the destination is not null
543         if (destination == null) {
544             throw new JMSException("destination is null in getConsumer");
545         }
546 
547         DestinationCache dest =
548             DestinationManager.instance().getDestinationCache(destination);
549 
550         return (dest == null) ? null : dest.getConsumers();
551     }
552 
553     /***
554      * Resolves a destination given its name
555      *
556      * @param       name                the name of the destination
557      * @return      JmsDestination      if an active destination exists for
558      *                                  the given name, else it returns
559      *                                  <tt>null</tt>
560      */
561     public JmsDestination resolve(String name) {
562         return DestinationManager.instance().destinationFromString(name);
563     }
564 
565     /***
566      * Resolves a consumer given its destination and an identity. Should look
567      * removing t from here.
568      *
569      * @param       destination         the destination
570      * @param       name                the name of the consumer
571      * @return      ConsumerIfc         if an active consumer exists for
572      *                                  the given name, else it returns
573      *                                  <tt>null</tt>
574      */
575     public ConsumerEndpoint resolveConsumer(JmsDestination destination,
576                                             String id) {
577         return ConsumerManager.instance().getConsumerEndpoint(id);
578     }
579 
580     /***
581      * Stop/start a consumer. When stopped, the consumer will not receive
582      * messages until the consumer is re-started.
583      * This is invoked when the underlying connection is stopped or started
584      *
585      * @param       consumer            the consumer to stop/start
586      * @param       stop                when <tt>true</tt> stop the consumer
587      *                                  else start it.
588      */
589     public void setStopped(ConsumerEndpoint consumer, boolean stop)
590         throws JMSException {
591         // need to implement this for the consumer
592     }
593 
594     /***
595      * Add a message listener for a specific destination to be informed
596      * when messages, for the destination are added or removed from the
597      * queue. More than one listener can be registered per desitnation
598      * and the same listener can be registered for multiple destinations.
599      * <p>
600      * If a listener is already registered for a particuler destination
601      * then it fails silently.
602      *
603      * @param destination - what messgaes to listen for
604      * @param listener - a JmsMessageListener instance
605      */
606     public void addEventListener(JmsDestination destination,
607                                  MessageManagerEventListener listener) {
608 
609         if ((destination != null) &&
610             (listener != null)) {
611             synchronized (_listeners) {
612                 if (!_listeners.containsKey(destination)) {
613                     _listeners.put(destination, listener);
614                 }
615             }
616         }
617     }
618 
619     /***
620      * Remove the listener for the specified destination. If one is not
621      * registered then ignore it.
622      *
623      * @param destination - destination that it listens for
624      * @param listener - listener for that destination.
625      */
626     public void removeEventListener(JmsDestination destination,
627                                     MessageManagerEventListener listener) {
628         if ((destination != null) &&
629             (listener != null)) {
630             synchronized (_listeners) {
631                 if (_listeners.containsKey(destination)) {
632                     _listeners.remove(destination);
633                 }
634             }
635         }
636     }
637 
638     /***
639      * Notify the listeners, registered for the destination that a message has
640      * been added to the message manager.
641      * <p>
642      * All errors are propagated as JMSException exceptions
643      *
644      * @param destination - destination for which message exits
645      * @param message - message that was added
646      * @return boolean - true if the message was processed
647      * @throws JMSException - for any processing error
648      */
649     boolean notifyOnAddMessage(JmsDestination destination,
650                                MessageImpl message) throws JMSException {
651         boolean result = false;
652         MessageManagerEventListener listener =
653             (MessageManagerEventListener) _listeners.get(destination);
654 
655         if (listener != null) {
656             // if there is a registered destination cache then let the cache
657             // process it.
658             result = listener.messageAdded(destination, message);
659         } else {
660             // let the {@link DestinationManager handle the message
661             result = DestinationManager.instance().messageAdded(destination,
662                 message);
663         }
664 
665         return result;
666     }
667 
668     /***
669      * Notify the listeners, registered for the destination that a message has
670      * been removed from the message manager. There maybe several reason why
671      * this has happened (i.e the message has expired, message has been
672      * purged, message has been consumed etc).
673      *
674      * @param destination - destination for which message exits
675      * @param message - message that was removed
676      * @throws JMSException for any processing error
677      */
678     void notifyOnRemoveMessage(JmsDestination destination,
679                                MessageImpl message) throws JMSException {
680 
681         MessageManagerEventListener listener =
682             (MessageManagerEventListener) _listeners.get(destination);
683 
684         if (listener != null) {
685             // send the notification to the active listener
686             listener.messageRemoved(destination, message);
687         } else {
688             // there is not active listener, send it to the Destination
689             // Manager
690             DestinationManager.instance().messageRemoved(destination, message);
691         }
692     }
693 
694     /***
695      * Notify the listeners, registered for the destination that a persistent
696      * message has been added to the message manager.
697      *
698      * @param connection - the database connection to use.
699      * @param destination - destination for which message exits
700      * @param message - message that was added
701      * @return boolean - true if the message was processed
702      * @throws JMSException - is a processing error occured
703      * @throws PersistenceException - if a persistence error occured
704      */
705     boolean notifyOnAddPersistentMessage(Connection connection,
706                                          JmsDestination destination,
707                                          MessageImpl message)
708         throws JMSException, PersistenceException {
709 
710         boolean result = false;
711         MessageManagerEventListener listener =
712             (MessageManagerEventListener) _listeners.get(destination);
713 
714         if (listener != null) {
715             // if there is a registered destination cache then let the cache
716             // process it.
717             result = listener.persistentMessageAdded(connection,
718                 destination, message);
719         } else {
720             // let the {@link DestinationManager} handle the message
721             result = DestinationManager.instance().persistentMessageAdded(
722                 connection, destination, message);
723         }
724 
725         return result;
726     }
727 
728     /***
729      * Notify the listeners, registered for the destination that a persistent
730      * message has been removed from the message manager. There maybe several
731      * reason why this has happened (i.e the message has expired, message has
732      * been purged, message has been consumed etc).
733      *
734      * @param connection - the database connection to use
735      * @param destination - destination for which message exits
736      * @param message - message that was removed
737      * @throws JMSException - for any processing problem
738      * @throws PersistenceException - for any persistence related problem
739      */
740     void notifyOnRemovePersistentMessage(Connection connection,
741                                          JmsDestination destination,
742                                          MessageImpl message)
743         throws JMSException, PersistenceException {
744 
745         MessageManagerEventListener listener =
746             (MessageManagerEventListener) _listeners.get(destination);
747 
748         if (listener != null) {
749             // send the notification to the active listener
750             listener.persistentMessageRemoved(connection, destination,
751                 message);
752         } else {
753             // there is not active listener, send it to the Destination
754             // Manager
755             DestinationManager.instance().persistentMessageRemoved(connection,
756                 destination, message);
757         }
758     }
759 
760     /***
761      * Return the maximum size of the cache
762      *
763      * @return int - maximum size of cache
764      */
765     public int getMaximumSize() {
766         return _maximumSize;
767     }
768 
769     /***
770      * Notify the destruction of a handle.
771      * <p>
772      * If the handle has been destroyed then we need to do the following
773      * 1. if the handle is for a queue then we can remove the message
774      *    from the cache
775      * 2. if the handle is for a topic then we need to see whether we can
776      *    garbage collect it
777      *
778      * @param handle a TransientMessageHandle
779      */
780     public void handleDestroyed(MessageHandle handle) {
781 
782         // precondition: handle != null
783         if (handle == null) {
784             return;
785         }
786 
787         if (handle.getDestination() instanceof JmsTopic) {
788             TopicConsumerEndpoint endpoint = (TopicConsumerEndpoint)
789                 ConsumerManager.instance().getConsumerEndpoint(
790                     handle.getConsumerName());
791 
792             if (endpoint != null) {
793                 endpoint.deleteMessage(handle);
794             }
795         } else {
796             DestinationCache cache =
797                 DestinationManager.instance().getDestinationCache(
798                     handle.getDestination());
799 
800             if (cache != null) {
801                 cache.deleteMessage(handle);
802             }
803         }
804     }
805 }