View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: TopicDestinationCache.java,v 1.6 2005/12/20 20:39:45 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.sql.Connection;
48  import java.util.ArrayList;
49  import java.util.Iterator;
50  import java.util.List;
51  import java.util.Vector;
52  import javax.jms.JMSException;
53  
54  import org.exolab.jms.client.JmsDestination;
55  import org.exolab.jms.client.JmsTopic;
56  import org.exolab.jms.lease.LeaseManager;
57  import org.exolab.jms.message.MessageImpl;
58  import org.exolab.jms.persistence.DatabaseService;
59  import org.exolab.jms.persistence.PersistenceException;
60  
61  
62  /***
63   * A {@link DestinationCache} for topics.
64   *
65   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
66   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @version $Revision: 1.6 $ $Date: 2005/12/20 20:39:45 $
68   */
69  class TopicDestinationCache extends AbstractDestinationCache {
70  
71      /***
72       * Construct a new <code>TopicDestinationCache</code>.
73       *
74       * @param topic    the topic to cache messages for
75       * @param database the database service
76       * @param leases   the lease manager
77       */
78      public TopicDestinationCache(JmsTopic topic, DatabaseService database,
79                                   LeaseManager leases) {
80          super(topic, database, leases);
81      }
82  
83      /***
84       * Register a consumer with this cache.
85       *
86       * @param consumer the message consumer for this destination
87       * @return <code>true</code> if registered; otherwise <code>false</code>
88       */
89      public boolean addConsumer(ConsumerEndpoint consumer) {
90  
91          boolean result = false;
92  
93          // check to see that the consumer can actually subscribe to
94          // this destination
95          JmsTopic cdest = (JmsTopic) consumer.getDestination();
96          JmsTopic ddest = (JmsTopic) getDestination();
97  
98          if (cdest.match(ddest)) {
99              result = super.addConsumer(consumer);
100         }
101 
102         return result;
103     }
104 
105     /***
106      * Invoked when the {@link MessageMgr} receives a non-persistent message.
107      *
108      * @param destination the message's destination
109      * @param message     the message
110      * @throws JMSException if the listener fails to handle the message
111      */
112     public void messageAdded(JmsDestination destination, MessageImpl message)
113             throws JMSException {
114         boolean processed = false;
115         MessageRef reference =
116                 new CachedMessageRef(message, false, getMessageCache());
117 
118         reference.reference(); // temporary reference to ensure the message has
119                                // a non-zero reference while passing it to each
120                                // of the consumers, to avoid premature
121                                // destruction
122         addMessage(reference, message);
123         MessageHandle handle = new SharedMessageHandle(this, reference,
124                                                        message);
125 
126         ConsumerEndpoint[] consumers = getConsumerArray();
127         for (int index = 0; index < consumers.length; index++) {
128             ConsumerEndpoint consumer = consumers[index];
129             processed |= consumer.messageAdded(handle, message);
130         }
131 
132         // create a lease iff one is required and the message has actually
133         // been accepted by at least one endpoint
134         if (processed) {
135             checkMessageExpiry(reference, message);
136             reference.dereference(); // remove temporary reference
137         } else {
138             // no consumer picked up the message, so toss it
139             reference.destroy();
140             // @todo - inefficient. Don't really want to add the message
141             // just to remove it again if there are no consumers for it
142         }
143     }
144 
145     /***
146      * Invoked when the {@link MessageMgr} receives a persistent message.
147      *
148      * @param destination the message's destination
149      * @param message     the message
150      * @throws JMSException         if the listener fails to handle the message
151      * @throws PersistenceException if there is a persistence related problem
152      */
153     public void persistentMessageAdded(JmsDestination destination,
154                                        MessageImpl message)
155             throws JMSException, PersistenceException {
156         boolean processed = false;
157         MessageRef reference = new CachedMessageRef(message, true,
158                                                     getMessageCache());
159         reference.reference(); // temporary reference to ensure the message has
160                                // a non-zero reference while passing it to each
161                                // of the consumers, to avoid premature
162                                // destruction
163         addMessage(reference, message);
164         SharedMessageHandle handle = new SharedMessageHandle(this, reference,
165                                                              message);
166 
167         // now send the message to all active consumers
168         ConsumerEndpoint[] consumers = getConsumerArray();
169         for (int index = 0; index < consumers.length; index++) {
170             ConsumerEndpoint consumer = consumers[index];
171             processed |= consumer.persistentMessageAdded(handle, message);
172         }
173 
174         // for each inactive durable consumer, add a persistent handle
175         // @todo - possible race condition between inactive subscription
176         // becoming active again - potential for message loss?
177 /*
178         JmsTopic topic = (JmsTopic) getDestination();
179         List inactive = _consumers.getInactiveSubscriptions(
180                 topic);
181         if (!inactive.isEmpty()) {
182             Iterator iterator = inactive.iterator();
183             while (iterator.hasNext()) {
184                 String name = (String) iterator.next();
185                 TopicConsumerMessageHandle durable
186                         = new TopicConsumerMessageHandle(handle, name);
187                 durable.add(connection);
188             }
189             processed = true;
190         }
191 */
192 
193         // create a lease iff one is required and the message has actually
194         // been accepted by at least one endpoint
195         if (processed) {
196             checkMessageExpiry(reference, message);
197             reference.dereference(); // remove temporary reference
198         } else {
199             // no consumer picked up the message, so toss it
200             handle.destroy();
201             // @todo - inefficient. Don't really want to make the message
202             // persistent, just to remove it again if there are no consumers
203             // for it
204         }
205 
206     }
207 
208     /***
209      * Return a message handle back to the cache, to recover unsent or
210      * unacknowledged messages.
211      *
212      * @param handle the message handle to return
213      */
214     public void returnMessageHandle(MessageHandle handle) {
215         long consumerId = handle.getConsumerId();
216         AbstractTopicConsumerEndpoint endpoint =
217                 (AbstractTopicConsumerEndpoint) getConsumerEndpoint(consumerId);
218         // if the endpoint is still active then return the message
219         // back to it
220         if (endpoint != null) {
221             endpoint.returnMessage(handle);
222         } else {
223             // @todo - need to destroy the handle?
224         }
225     }
226 
227     /***
228      * Load the state of a durable consumer.
229      *
230      * @param name       the durable subscription name
231      * @return a list of {@link MessageHandle} instances
232      * @throws JMSException         for any JMS error
233      */
234     public List getDurableMessageHandles(String name)
235             throws JMSException, PersistenceException {
236         DatabaseService service = DatabaseService.getInstance();
237         Connection connection = service.getConnection();
238         Vector handles = service.getAdapter().getMessageHandles(
239                 connection, getDestination(), name);
240         List result = new ArrayList(handles.size());
241 
242         MessageCache cache = getMessageCache();
243 
244         Iterator iterator = handles.iterator();
245         while (iterator.hasNext()) {
246             PersistentMessageHandle handle =
247                     (PersistentMessageHandle) iterator.next();
248             String messageId = handle.getMessageId();
249             MessageRef reference = cache.getMessageRef(messageId);
250             if (reference == null) {
251                 reference = new CachedMessageRef(messageId, true, cache);
252             }
253             cache.addMessageRef(reference);
254             handle.reference(reference);
255             handle.setDestinationCache(this);
256             result.add(handle);
257 
258             checkMessageExpiry(reference, handle.getExpiryTime());
259         }
260         return result;
261     }
262 
263     /***
264      * Remove an expired persistent message, and notify any listeners.
265      *
266      * @param reference  a handle to the expired message
267      * @throws JMSException         if a listener fails to handle the
268      *                              expiration
269      * @throws PersistenceException if there is a persistence related problem
270      */
271     protected void persistentMessageExpired(MessageRef reference)
272             throws JMSException, PersistenceException {
273         String messageId = reference.getMessageId();
274         ConsumerEndpoint[] consumers = getConsumerArray();
275 
276         for (int i = 0; i < consumers.length; ++i) {
277             consumers[i].persistentMessageRemoved(messageId);
278         }
279     }
280 
281 }
282