View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: TopicDestinationCache.java,v 1.22 2003/10/21 14:41:24 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 3/1/2001     jima    Created
47   */
48  package org.exolab.jms.messagemgr;
49  
50  import java.sql.Connection;
51  import java.util.Iterator;
52  import java.util.List;
53  
54  import javax.jms.DeliveryMode;
55  import javax.jms.JMSException;
56  import javax.transaction.TransactionManager;
57  
58  import org.apache.commons.logging.Log;
59  import org.apache.commons.logging.LogFactory;
60  
61  import org.exolab.jms.client.JmsDestination;
62  import org.exolab.jms.client.JmsTopic;
63  import org.exolab.jms.message.MessageHandle;
64  import org.exolab.jms.message.MessageImpl;
65  import org.exolab.jms.persistence.PersistenceException;
66  
67  
68  /***
69   * A DestinationCache for Topics. This cache extends DestinationCache but does
70   * not actually hold a reference to the messages. Instead it forwards them on
71   * to registered consumers.
72   * <p>
73   * We may need to build the cache for clients that fail over to the new server
74   * so we should maintain a cache of at least persistent messages handles. This
75   * is something that needs to be considered. As for non-persistent messages well
76   * that is the penalty for using them. If you want to ensure that you get every
77   * message, even during failover then you best publsih them using persistent
78   * delivery mode.
79   *
80   * @version     $Revision: 1.22 $ $Date: 2003/10/21 14:41:24 $
81   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
82   **/
83  public class TopicDestinationCache
84      extends DestinationCache {
85  
86      /***
87       * Underlying destination
88       */
89      private JmsTopic _topic = null;
90  
91      /***
92       * The logger
93       */
94      private static final Log _log =
95          LogFactory.getLog(TopicDestinationCache.class);
96  
97  
98      /***
99       * Construct a message cache for a particular destination. This object
100      * does not cache any messages.
101      *
102      * @param destination - the destination that owns this cache
103      * @throws FailedToInitializeException
104      */
105     TopicDestinationCache(JmsTopic destination)
106         throws FailedToInitializeException {
107         super();
108 
109         // don't need the local messages cache for the initial release, will need
110         // it for clustering
111         _topic = destination;
112 
113         // call init on the base class
114         try {
115             init();
116         } catch (FailedToInitializeException exception) {
117             // rethrow
118             throw exception;
119         } catch (Exception exception) {
120             throw new FailedToInitializeException(
121                 "Failed to construct TopicDestinationCache " +
122                 exception.toString());
123         }
124     }
125 
126     /***
127      * Construct a message cache for a particular destination using the specified
128      * {@link Connection} to complete any database access.
129      *
130      * @param connection - the connection to use.
131      * @param destination - the destination that owns this cache
132      * @throws FailedToInitializeException
133      */
134     TopicDestinationCache(Connection connection, JmsTopic destination)
135         throws FailedToInitializeException {
136         super();
137 
138         // don't need the local messages cache for the initial release, will need
139         // it for clustering
140         _topic = destination;
141 
142         // call init on the base class
143         try {
144             init(connection);
145         } catch (FailedToInitializeException exception) {
146             // rethrow
147             throw exception;
148         } catch (Exception exception) {
149             throw new FailedToInitializeException(
150                 "Failed to construct TopicDestinationCache " +
151                 exception.toString());
152         }
153     }
154 
155     // implementation of DestinationCache.getDestination
156     public JmsDestination getDestination() {
157         return _topic;
158     }
159 
160     // implementation of MessageMgr.messageAdded
161     public boolean messageAdded(JmsDestination destination, MessageImpl message) {
162         boolean processed = false;
163 
164         if ((destination != null) &&
165             (message != null)) {
166             // check that it is not already present before adding it.
167             if (destination.equals(_topic)) {
168                 processed = notifyOnAddMessage(message);
169                 // create a lease iff one is required and the message has actually
170                 // been accepted by at least one endpoint
171                 if (processed) {
172                     checkMessageExpiry(message);
173                 }
174             }
175         }
176 
177         return processed;
178     }
179 
180     // implementation of MessageMgr.messageRemoved
181     public void messageRemoved(JmsDestination destination, MessageImpl message) {
182         if ((destination != null) &&
183             (message != null)) {
184             // call remove regardless whether it exists
185             if (destination.equals(_topic)) {
186                 notifyOnRemoveMessage(message);
187             }
188         }
189     }
190 
191     // implementation of MessageMgr.persistentMessageAdded
192     public boolean persistentMessageAdded(Connection connection,
193                                           JmsDestination destination, MessageImpl message)
194         throws PersistenceException {
195         boolean processed = false;
196 
197         if ((destination != null) &&
198             (message != null)) {
199             // check that it is not already present before adding it.
200             if (destination.equals(_topic)) {
201                 processed = notifyOnAddPersistentMessage(connection, message);
202 
203                 // create a lease iff one is required and the message has actually
204                 // been accepted by at least one endpoint
205                 if (processed) {
206                     checkMessageExpiry(message);
207                 }
208             }
209         }
210 
211         return processed;
212     }
213 
214     // implementation of MessageMgr.persistentMessageRemoved
215     public void persistentMessageRemoved(Connection connection,
216                                          JmsDestination destination, MessageImpl message)
217         throws PersistenceException {
218         if ((destination != null) &&
219             (message != null)) {
220             // call remove regardless whether it exists
221             if (destination.equals(_topic)) {
222                 notifyOnRemovePersistentMessage(connection, message);
223             }
224         }
225     }
226 
227     // implementation of DestinationCache.notifyOnAddMessage
228     boolean notifyOnAddMessage(MessageImpl message) {
229         boolean processed = true;
230 
231         // process for all the active consumers.
232         Object[] iter = getConsumersByArray();
233         for (int index = 0; index < iter.length; index++) {
234             DestinationCacheEventListener listener =
235                 (DestinationCacheEventListener) iter[index];
236             processed |= listener.messageAdded(message);
237         }
238 
239         return processed;
240     }
241 
242     // implementation of DestinationCache.notifyOnRemoveMessage
243     void notifyOnRemoveMessage(MessageImpl message) {
244         Object[] iter = getConsumersByArray();
245         for (int index = 0; index < iter.length; index++) {
246             if (((DestinationCacheEventListener)
247                 iter[index]).messageRemoved(message)) {
248             }
249         }
250     }
251 
252     // implementation of DestinationCache.notifyOnAddPersistentMessage
253     boolean notifyOnAddPersistentMessage(Connection connection,
254                                          MessageImpl message)
255         throws PersistenceException {
256         boolean processed = true;
257 
258         // This is not done in MessageMgr
259         // first let's create a persistent message handle for all registered
260         // durable consumers
261         //try {
262         //    processed |= ConsumerManager.instance().persistentMessageAdded(
263         //        connection, message);
264         //} catch (Exception exception) {
265         //    throw new PersistenceException("Error in notifyOnAddPersistentMessage " +
266         //                                   exception);
267         //}
268 
269         // now send the message to all active durable and non-durable consumers
270         Object[] iter = getConsumersByArray();
271         for (int index = 0; index < iter.length; index++) {
272             DestinationCacheEventListener listener =
273                 (DestinationCacheEventListener) iter[index];
274             processed |= listener.persistentMessageAdded(connection, message);
275         }
276 
277         return processed;
278     }
279 
280     // implementation of DestinationCache.notifyOnRemovePersistentMessage
281     void notifyOnRemovePersistentMessage(Connection connection,
282                                          MessageImpl message)
283         throws PersistenceException {
284 
285         Object[] iter = getConsumersByArray();
286         for (int index = 0; index < iter.length; index++) {
287             DestinationCacheEventListener listener =
288                 (DestinationCacheEventListener) iter[index];
289             // if the listener is of type {@link DurableConsumerEndpoint} then
290             // indicate that a persistentMessage has been removed. If the listener
291             // is a {@link TopicConsumerEndpoint} then indicatw that non-
292             // persistent message has been removed.
293             if (listener instanceof DurableConsumerEndpoint) {
294                 listener.persistentMessageRemoved(connection, message);
295             } else if (listener instanceof TopicConsumerEndpoint) {
296                 listener.messageRemoved(message);
297             }
298         }
299 
300         // since it is a persistent message we need to send it
301         // to inactive durable consumers subscribing to the destination
302         try {
303             ConsumerManager.instance().persistentMessageRemoved(connection,
304                 message);
305         } catch (Exception exception) {
306             _log.error("Error in notifyOnRemovePersistentMessage",
307                 exception);
308         }
309     }
310 
311     // override implementation of DestinationCache.registerConsumer
312     public boolean registerConsumer(ConsumerEndpoint consumer) {
313 
314         boolean result = false;
315 
316         // check to see that the consumer can actually subscribe to
317         // this destination
318         JmsTopic cdest = (JmsTopic) consumer.getDestination();
319         JmsTopic ddest = (JmsTopic) getDestination();
320 
321         if (cdest.match(ddest)) {
322             if (!_consumers.contains(consumer)) {
323                 _consumers.add(consumer);
324                 consumer.setMaximumSize(this.getMaximumSize());
325                 result = true;
326             }
327         }
328 
329         return result;
330     }
331 
332     // implementation of DestinationCache.hasActiveConsumers
333     boolean hasActiveConsumers() {
334         return (_consumers.size() > 0);
335     }
336 
337     // implementation of DestinationCache.getMessageCount
338     public int getMessageCount() {
339         return 0;
340     }
341 
342     /***
343      * Determines if this cache can be destroyed.
344      * A <code>TopicDestinationCache</code> can be destroyed if there
345      * are no active consumers.
346      *
347      * @return <code>true</code> if the cache can be destroyed, otherwise
348      * <code>false</code>
349      */
350     public boolean canDestroy() {
351         return !hasActiveConsumers();
352     }
353 
354     // override Object.toString
355     public String toString() {
356         return _topic.toString();
357     }
358 
359     // override Object.hashCode
360     public int hashCode() {
361         return _topic.hashCode();
362     }
363 
364     /***
365      * Resolve an expired message through its handle
366      *
367      * @param handle the expired message's handle
368      * @return the expired message. May be null.
369      */
370     protected MessageImpl resolveExpiredMessage(MessageHandle handle) {
371         MessageImpl message = null;
372 
373         if (handle.getConsumerName() != null) {
374             message = super.resolveExpiredMessage(handle);
375         } else {
376             // can't resolve the message through the handle 
377             // Need to find the first consumer which has it.
378             // @todo - design flaw!
379             Iterator iterator = _consumers.iterator();
380             while (iterator.hasNext()) {
381                 ConsumerEndpoint endpoint = (ConsumerEndpoint) iterator.next();
382                 message = endpoint.getMessage(handle);
383                 if (message != null) {
384                     break;
385                 }
386             }
387         }
388         return message;
389     }
390 
391 }
392