View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: DurableConsumerEndpoint.java,v 1.4 2005/11/12 12:27:40 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.sql.Connection;
48  import java.util.Iterator;
49  import java.util.List;
50  import javax.jms.IllegalStateException;
51  import javax.jms.InvalidSelectorException;
52  import javax.jms.JMSException;
53  
54  import org.exolab.jms.client.JmsTopic;
55  import org.exolab.jms.persistence.DatabaseService;
56  import org.exolab.jms.persistence.PersistenceException;
57  
58  
59  /***
60   * A {@link ConsumerEndpoint} for durable topic consumers. The state of durable
61   * topic consumers is maintained across server invocations by the persistent
62   * layer.
63   * <p/>
64   * DurableConsumerEndpoints are always loaded in memory, whether they are active
65   * or inactive. When they are inactive they simply process persistent messages.
66   * Non-persistent message are ignored when the durable consumer is inactive.
67   *
68   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
69   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
70   * @version $Revision: 1.4 $ $Date: 2005/11/12 12:27:40 $
71   */
72  public class DurableConsumerEndpoint
73          extends AbstractTopicConsumerEndpoint {
74  
75      /***
76       * The persistent name of the durable subscriber.
77       */
78      private final String _name;
79  
80      /***
81       * Determines if this active.
82       */
83      private boolean _active = false;
84  
85      /***
86       * Synchronization helper when activating the consumer.
87       */
88      private final Object _activateLock = new Object();
89  
90  
91      /***
92       * Construct a new <code>DurableConsumerEndpoint</code>.
93       * <p/>
94       * The consumer is inactive until made active via {@link #activate}.
95       *
96       * @param consumerId   the identity of this consumer
97       * @param topic        the topic to access
98       * @param destinations the destination manager
99       * @param name         the well known name of the durable subscriber
100      * @throws InvalidSelectorException if the selector is invalid
101      * @throws JMSException             if the destination caches can't be
102      *                                  constructed
103      * @throws PersistenceException     for any persistence error
104      */
105     public DurableConsumerEndpoint(long consumerId, JmsTopic topic,
106                                    String name,
107                                    DestinationManager destinations)
108             throws InvalidSelectorException, JMSException,
109                    PersistenceException {
110         super(consumerId, -1, topic, null, false, destinations);
111         _name = name;
112 
113         // register this with the available caches. Note that the consumer
114         // may begin receiving messages immediately.
115         init();
116 
117         DatabaseService service = DatabaseService.getInstance();
118         Connection connection = service.getConnection();
119 
120         // remove expired messages
121         service.getAdapter().removeExpiredMessageHandles(connection, _name);
122 
123         TopicDestinationCache cache = (TopicDestinationCache)
124                 getDestinationManager().getDestinationCache(topic);
125         // @todo - broken for wildcard subscriptions
126         // getMessageHandles() needs to return all handles for a given
127         // subscription name
128         List handles = cache.getDurableMessageHandles(_name);
129 
130         // iterate over each handle and add them to the list of messages
131         // for the durable consumer
132         Iterator iterator = handles.iterator();
133         while (iterator.hasNext()) {
134             MessageHandle handle = (MessageHandle) iterator.next();
135             TopicConsumerMessageHandle consumer =
136                     new TopicConsumerMessageHandle(handle, this);
137             addMessage(consumer);
138         }
139     }
140 
141     /***
142      * Determines if this is a persistent or non-persistent consumer.
143      * <p/>
144      * If persistent, then the consumer is persistent accross subscriptions and
145      * server restarts, and {@link #getPersistentId} returns a non-null value
146      *
147      * @return <code>true</code>
148      */
149     public boolean isPersistent() {
150         return true;
151     }
152 
153     /***
154      * Returns the persistent identifier for this consumer.
155      * <p/>
156      * This is the identity of the consumer which is persistent across
157      * subscriptions and server restarts.
158      * <p/>
159      * This implementation returns the consumer name.
160      *
161      * @return the persistent identifier for this consumer
162      */
163     public String getPersistentId() {
164         return _name;
165     }
166 
167     /***
168      * Activate this durable consumer.
169      *
170      * @param connectionId the identity of the connection that owns this
171      *                     consumer
172      * @param selector     the message selector. May be <code>null</code>
173      * @param noLocal      if true, inhibits the delivery of messages published
174      *                     by its own connection.
175      * @throws JMSException             if the consumer can't be activated
176      * @throws InvalidSelectorException if the selector is invalid
177      */
178     public void activate(long connectionId, String selector, boolean noLocal)
179             throws JMSException {
180         synchronized (_activateLock) {
181             if (_active) {
182                 throw new IllegalStateException(
183                         "Durable consumer " + _name + " is alrady active");
184             }
185             setConnectionId(connectionId);
186             setSelector(selector);
187             setNoLocal(noLocal);
188             _active = true;
189         }
190     }
191 
192     /***
193      * Deactivate this durable consumer.
194      *
195      * @throws JMSException if the consumer can't be deactivated
196      */
197     public void deactivate() throws JMSException {
198         synchronized (_activateLock) {
199             if (!_active) {
200                 throw new IllegalStateException(
201                         "Durable consumer " + _name + " is alrady inactive");
202             }
203             setConnectionId(-1);
204             setSelector(null);
205             _active = false;
206         }
207     }
208 
209     /***
210      * Determines if the endpoint is active.
211      *
212      * @return <code>true</code> if the endpoint is active, <code>false</code>
213      *         if it is inactive
214      */
215     public boolean isActive() {
216         synchronized (_activateLock) {
217             return _active;
218         }
219     }
220 
221 }