View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsTopicSession.java,v 1.25 2004/01/20 14:14:21 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import java.util.HashSet;
48  import javax.jms.InvalidDestinationException;
49  import javax.jms.JMSException;
50  import javax.jms.TopicSession;
51  import javax.jms.Topic;
52  import javax.jms.TopicSubscriber;
53  import javax.jms.TopicPublisher;
54  import javax.jms.TemporaryTopic;
55  
56  
57  /***
58   * Client implementation of the <code>javax.jms.TopicSession</code> interface
59   *
60   * @version     $Revision: 1.25 $ $Date: 2004/01/20 14:14:21 $
61   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
62   */
63  class JmsTopicSession
64      extends JmsSession
65      implements TopicSession {
66  
67      /***
68       * This set maintains the list of active durable subscriber names for the
69       * session. Durable subscriber names must be unique within the session
70       */
71      private HashSet _durableNames = new HashSet();
72  
73  
74      /***
75       * Construct a new <code>JmsTopicSession</code>
76       *
77       * @param connection the owner of the session
78       * @param transacted if <code>true</code>, the session is transacted.
79       * @param ackMode indicates whether the consumer or the client will
80       * acknowledge any messages it receives. This parameter will be ignored if
81       * the session is transacted. Legal values are
82       * <code>Session.AUTO_ACKNOWLEDGE</code>,
83       * <code>Session.CLIENT_ACKNOWLEDGE</code> and
84       * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
85       * @throws JMSException if the session cannot be created
86       */
87      public JmsTopicSession(JmsTopicConnection connection, boolean transacted,
88                             int ackMode) throws JMSException {
89          super(connection, transacted, ackMode);
90      }
91  
92      /***
93       * Create a topic identity given its name
94       *
95       * @param topicName the topic name
96       * @return a topic with the given name.
97       * @throws JMSException if the topic can't be created
98       */
99      public synchronized Topic createTopic(String topicName)
100         throws JMSException {
101 
102         ensureOpen();
103 
104         JmsTopic topic = null;
105 
106         if (topicName != null && topicName.length() > 0) {
107             topic = new JmsTopic(topicName);
108         } else {
109             throw new JMSException("Invalid or null topic name specified");
110         }
111 
112         return topic;
113     }
114 
115     /***
116      * Create a non-durable subscriber for the specified topic
117      *
118      * @param topic the topic to subscriber to
119      * @return the new subscriber
120      * @throws JMSException if the subscriber cannot be created
121      */
122     public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
123         return createSubscriber(topic, null, false);
124     }
125 
126     /***
127      * Create a non-durable subscriber for the specified topic
128      *
129      * @param topic the topic to subscriber to
130      * @param selector the message selector to filter messages.
131      * May be <code>null</code>
132      * @param noLocal if <code>true</code>, inhibits the delivery of messages
133      * published by its own connection
134      * @return the new subscriber
135      * @throws JMSException if the subscriber cannot be created
136      */
137     public synchronized TopicSubscriber createSubscriber(
138         Topic topic, String selector, boolean noLocal) throws JMSException {
139 
140         JmsTopicSubscriber subscriber = null;
141 
142         ensureOpen();
143 
144         if (topic == null) {
145             throw new InvalidDestinationException(
146                 "Cannot create subscriber: argument 'topic' is null");
147         }
148 
149         // check to see if the topic is temporary. A temporary topic
150         // can only be used within the context of the owning connection
151         if (!checkForValidTemporaryDestination((JmsDestination) topic)) {
152             throw new InvalidDestinationException(
153                 "Trying to create a subscriber for a temp topic "
154                 + "that is not bound to this connection");
155         }
156 
157         subscriber = new JmsTopicSubscriber(
158             this, getNextConsumerId(), (JmsTopic) topic, selector, noLocal,
159             null);
160         addSubscriber(subscriber, null);
161 
162         return subscriber;
163     }
164 
165     /***
166      * Create a durable subscriber for the specified topic
167      *
168      * @param topic the topic to subscribe to
169      * @param name the durable subscription name
170      * @return a new durable subscriber
171      * @throws JMSException if the subscriber can't be created
172      */
173     public TopicSubscriber createDurableSubscriber(Topic topic, String name)
174         throws JMSException {
175         return createDurableSubscriber(topic, name, null, false);
176     }
177 
178     /***
179      * Create a durable subscriber for the specified topic
180      *
181      * @param topic the topic to subscribe to
182      * @param name the durable subscription name
183      * @param selector the message selector to filter messages.
184      * May be <code>null</code>
185      * @param noLocal if <code>true</code>, inhibits the delivery of messages
186      * published by its own connection
187      * @return a new durable subscriber
188      * @throws JMSException if the subscriber can't be created
189      */
190     public synchronized TopicSubscriber createDurableSubscriber(
191         Topic topic, String name, String selector, boolean noLocal)
192         throws JMSException {
193 
194         JmsTopicSubscriber subscriber = null;
195 
196         ensureOpen();
197 
198         if (topic == null) {
199             throw new InvalidDestinationException(
200                 "Cannot create durable subscriber: argument 'topic' is null");
201         }
202         if (name == null || name.trim().length() == 0) {
203             throw new JMSException("Invalid subscription name specified");
204         }
205 
206         // ensure that no durable subscriber has already been created
207         // with the same name
208         if (_durableNames.contains(name)) {
209             throw new JMSException("A durable subscriber already exists "
210                                    + "with name=" + name);
211         }
212 
213         // check to see if the topic is a temporary topic. You cannot
214         // create a durable subscriber for a temporary topic
215         if (((JmsTopic) topic).isTemporaryDestination()) {
216             throw new InvalidDestinationException(
217                 "Cannot create a durable subscriber for a temporary topic");
218         }
219 
220         // since it is a durable subscriber, the persistent flag
221         // must be set within the topic.
222         JmsTopic durable = null;
223         try {
224             durable = (JmsTopic) ((JmsTopic) topic).clone();
225         } catch (CloneNotSupportedException error) {
226             throw new JMSException("Failed to clone JmsTopic");
227         }
228         durable.setPersistent(true);
229         subscriber = new JmsTopicSubscriber(
230             this, getNextConsumerId(), durable, selector, noLocal, name);
231         addSubscriber(subscriber, name);
232         _durableNames.add(name);
233 
234         return subscriber;
235     }
236 
237     /***
238      * Create a publisher for the specified topic.
239      *
240      * @param topic the topic to publish to, or <code>null</code> if this is an
241      * unidentified producer
242      * @return the new publisher
243      * @throws JMSException if the publisher can't be created
244      */
245     public synchronized TopicPublisher createPublisher(Topic topic)
246         throws JMSException {
247 
248         ensureOpen();
249 
250         if (topic != null && ((JmsTopic) topic).isWildCard()) {
251             throw new JMSException(
252                 "Cannot create a publisher using a wildcard topic");
253         }
254 
255         JmsTopicPublisher publisher =
256             new JmsTopicPublisher(this, (JmsTopic) topic);
257         addPublisher(publisher);
258 
259         return publisher;
260     }
261 
262     /***
263      * Create a temporary topic. It's lifetime is that of the TopicConnection,
264      * unless deleted earlier.
265      *
266      * @return a new temporary topic
267      * @throws JMSException if the topic cannot be created
268      */
269     public synchronized TemporaryTopic createTemporaryTopic()
270         throws JMSException {
271 
272         ensureOpen();
273 
274         JmsTemporaryTopic topic = new JmsTemporaryTopic();
275         topic.setOwningConnection(getConnection());
276         return topic;
277     }
278 
279     /***
280      * Unsubscribe a durable subscription.
281      *
282      * @param name the durable subscription name
283      * @throws JMSException if the durable subscription can't be removed
284      */
285     public synchronized void unsubscribe(String name) throws JMSException {
286         ensureOpen();
287         if (name != null && name.length() > 0) {
288             if (_durableNames.contains(name)) {
289                 throw new JMSException("Cannot unsubscribe while an active "
290                                        + "TopicSubscriber exists");
291             }
292             getJmsSessionStub().unsubscribe(name);
293             _durableNames.remove(name);
294         } else {
295             throw new JMSException("Cannot unsubscribe with a null name");
296         }
297     }
298 
299     /***
300      * Register a subscriber
301      *
302      * @param subscriber the subscriber to register
303      * @param name durable subscription name, for durable subscribers,
304      * <code>null</code> otherwise.
305      * @throws JMSException if the subscriber cannot be registered with the
306      * server
307      */
308     protected synchronized void addSubscriber(JmsTopicSubscriber subscriber,
309                                               String name)
310         throws JMSException {
311 
312         // create it on the server
313         getJmsSessionStub().createSubscriber(
314             (JmsTopic) subscriber.getTopic(), name,
315             subscriber.getClientId(),
316             subscriber.getMessageSelector(),
317             subscriber.getNoLocal());
318 
319         // register locally
320         addConsumer(subscriber);
321     }
322 
323     /***
324      * Register a publisher
325      *
326      * @param publisher the publisher to register
327      * @throws JMSException if the publisher cannot be registered with the
328      * server
329      */
330     protected synchronized void addPublisher(JmsTopicPublisher publisher)
331         throws JMSException {
332         getJmsSessionStub().createPublisher((JmsTopic) publisher.getTopic());
333         addProducer(publisher);
334     }
335 
336     /***
337      * Deregister azsubscriber
338      *
339      * @param subscriber the subscriber to deregister
340      * @throws JMSException if the subscriber cannot be deregistered from the
341      * server
342      */
343     protected synchronized void removeSubscriber(JmsTopicSubscriber subscriber)
344         throws JMSException {
345 
346         if (!isClosed()) {
347             // remove the listener before deleting the subscriber. This is
348             // the correct order for clean up.
349             removeMessageListener(subscriber);
350             getJmsSessionStub().deleteSubscriber(subscriber.getClientId());
351         }
352 
353         // local clean up
354         removeConsumer(subscriber);
355 
356         if (subscriber.isDurableSubscriber()) {
357             _durableNames.remove(subscriber.getName());
358         }
359     }
360 
361     /***
362      * Deregister a publisher
363      *
364      * @param publisher the publisher to deregister
365      */
366     protected synchronized void removePublisher(JmsTopicPublisher publisher) {
367         removeProducer(publisher);
368     }
369 
370 }
371