View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsServerSession.java,v 1.89 2003/08/17 01:32:26 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * 04/07/2000   jima    Created
47   * 04/25/2000   jima    Changes to the interface
48   */
49  package org.exolab.jms.server;
50  
51  import java.util.HashMap;
52  import java.util.Iterator;
53  import java.util.Vector;
54  
55  import javax.jms.DeliveryMode;
56  import javax.jms.Destination;
57  import javax.jms.InvalidDestinationException;
58  import javax.jms.InvalidSelectorException;
59  import javax.jms.JMSException;
60  import javax.jms.Message;
61  import javax.jms.MessageConsumer;
62  import javax.jms.Session;
63  import javax.transaction.TransactionManager;
64  import javax.transaction.xa.XAException;
65  import javax.transaction.xa.XAResource;
66  import javax.transaction.xa.Xid;
67  
68  import org.apache.commons.logging.Log;
69  import org.apache.commons.logging.LogFactory;
70  
71  import org.exolab.jms.JMSErrorCodes;
72  import org.exolab.jms.client.JmsMessageListener;
73  import org.exolab.jms.client.JmsQueue;
74  import org.exolab.jms.client.JmsTopic;
75  import org.exolab.jms.message.MessageHandle;
76  import org.exolab.jms.message.MessageId;
77  import org.exolab.jms.message.MessageImpl;
78  import org.exolab.jms.messagemgr.ConsumerEndpoint;
79  import org.exolab.jms.messagemgr.ConsumerManager;
80  import org.exolab.jms.messagemgr.DestinationManager;
81  import org.exolab.jms.messagemgr.InternalMessageListener;
82  import org.exolab.jms.messagemgr.MessageMgr;
83  import org.exolab.jms.messagemgr.QueueBrowserEndpoint;
84  import org.exolab.jms.messagemgr.ResourceManager;
85  import org.exolab.jms.messagemgr.ResourceManagerException;
86  
87  
88  /***
89   * A session represents a server side endpoint to the JMSServer. A client can
90   * create producers, consumers and destinations through the session in addi-
91   * tion to other functions. A session has a unique identifer which is a comb-
92   * ination of clientId-connectionId-sessionId.
93   * <p>
94   * A session represents a single-threaded context which implies that it cannot
95   * be used with more than one thread concurrently. Threads registered with this
96   * session are synchronized.
97   * <p>
98   * Finally, instances of this object can only be created by classes within the
99   * same package.
100  *
101  * @version     $Revision: 1.89 $ $Date: 2003/08/17 01:32:26 $
102  * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
103  * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
104  * @see         JmsServerConnection
105  */
106 public class JmsServerSession
107     implements InternalMessageListener, XAResource {
108 
109     /***
110      * The client identifies the owner of the session.
111      */
112     private String _clientId = null;
113 
114     /***
115      * The session identifier uniquely distinguishes this session from any
116      * other session. No two sessions can have the same session identifier.
117      * This is allocated by the connection that created this session
118      */
119     private String _sessionId = null;
120 
121     /***
122      * Back pointer to the connection that created this session. This
123      * is set during object creation time
124      */
125     private JmsServerConnection _connection = null;
126 
127     /***
128      * Maintain a list of consumers along with their associated destinations
129      * that have been created through this session
130      */
131     private HashMap _consumers = new HashMap();
132 
133     /***
134      * The message listener is the reference to a remote client that wull
135      * receive the messages
136      */
137     private JmsMessageListener _listener = null;
138 
139     /***
140      * This is the acknowledgement mode for the session
141      */
142     private int _ackMode = Session.AUTO_ACKNOWLEDGE;
143 
144     /***
145      * Indicates whether the session is transactional
146      */
147     private boolean _transacted = false;
148 
149     /***
150      * Holds the current xid that this session is associated with. A session
151      * can olny be associated with one xid at any one time.
152      */
153     private Xid _xid = null;
154 
155     /***
156      * Indicates if the underlying connection of this session has been stopped
157      */
158     private boolean _stopped = true;
159 
160     /***
161      * Indicated that the session has been closed
162      */
163     private boolean _closed = false;
164 
165     /***
166      * Holds the number of messages published by this session
167      */
168     private long _publishCount;
169 
170     /***
171      * Holds the number of messages consumed by this session
172      */
173     private long _consumeCount;
174 
175     /***
176      * Caches all sent messages
177      */
178     private SentMessageCache _sentMessageCache;
179 
180     /***
181      * The logger
182      */
183     private static final Log _log = LogFactory.getLog(JmsServerSession.class);
184 
185 
186     /***
187      * Create an instance of the session using the specified client identifier.
188      * also pass a back pointer to the connection that created this session.
189      * By default, the session is stopped. The start() method must be invoked
190      * before messages will be dispatched to consumers.
191      *
192      * @param       connection  connection that created this session
193      * @param       id          client identity
194      * @param       int         acknowledgement mode for the session
195      * @param       transacted  true if the session is transactional
196      */
197     JmsServerSession(JmsServerConnection connection, String id, int ackmode,
198                      boolean transacted) {
199         _connection = connection;
200         _clientId = id;
201         _ackMode = ackmode;
202         _transacted = transacted;
203         _stopped = true;
204         _sentMessageCache = new SentMessageCache(this);
205     }
206 
207     /***
208      * Set the session id for this object. The Connection is responsible for
209      * setting the session id once it has been created
210      *
211      * @param       id          session id
212      */
213     void setSessionId(String id) {
214         _sessionId = id;
215     }
216 
217     /***
218      * Return a reference to the client id
219      *
220      * @return      String      client id
221      */
222     public String getClientId() {
223         return _clientId;
224     }
225 
226     /***
227      * Return a reference to the session id
228      *
229      * @return      String      session id
230      */
231     public String getSessionId() {
232         return _sessionId;
233     }
234 
235     /***
236      * Start the message delivery for the session. If session delivery has
237      * already started then treat the operation as an no-op
238      */
239     public void start() {
240         if (_log.isDebugEnabled()) {
241             _log.debug("start() [sessionId=" + _sessionId + "]");
242         }
243 
244         if (_stopped) {
245             pause(false);
246             _stopped = false;
247         }
248     }
249 
250     /***
251      * Stop message delivery for the session
252      */
253     public void stop() {
254         if (_log.isDebugEnabled()) {
255             _log.debug("stop() [sessionId=" + _sessionId + "]");
256         }
257         if (!_stopped) {
258             pause(true);
259             _stopped = true;
260         }
261     }
262 
263     /***
264      * Close and release any resource allocated to this session.
265      * This method may be called by multiple threads.
266      *
267      * @throws JMSException if the session cannot be closed
268      */
269     public void close() throws JMSException {
270         boolean closed = false;
271 
272         synchronized (this) {
273             closed = _closed;
274             if (!closed) {
275                 _closed = true;
276             }
277         }
278 
279         if (!closed) {
280             if (_log.isDebugEnabled()) {
281                 _log.debug("close() [sessionId=" + _sessionId + "]");
282             }
283 
284             // reset the listener
285             setMessageListener(null);
286 
287             // iterate over the list of consumers and deregister the
288             // associated endpoints and then remove all the entries
289             Iterator consumers = _consumers.values().iterator();
290             while (consumers.hasNext()) {
291                 ConsumerEndpoint consumer =
292                     (ConsumerEndpoint) consumers.next();
293                 ConsumerManager.instance().deleteConsumerEndpoint(consumer);
294             }
295 
296             // clear the unacked message cache
297             _sentMessageCache.clear();
298 
299             // clear the consumers
300             _consumers.clear();
301 
302             // de-register the session from the connection
303             _connection.deleteSession(this);
304         } else {
305             if (_log.isDebugEnabled()) {
306                 _log.debug("close() [sessionId=" + _sessionId +
307                     "]: session already closed");
308             }
309         }
310     }
311 
312     /***
313      * Acknowledge that the message with the following id has been processed
314      *
315      * @param clientId the clientId that sent the message to the client
316      * @param id the message to ack
317      * @throws JMSException if message cannot be acknowledged
318      */
319     public void acknowledgeMessage(long clientId, String id)
320         throws JMSException {
321         _sentMessageCache.acknowledgeMessage(new MessageId(id), clientId);
322     }
323 
324     /***
325      * Send the specified message to the server
326      *
327      * @param message the message to send
328      * @throws JMSException if the message can't be sent
329      */
330     public void sendMessage(Message message) throws JMSException {
331         if (message == null) {
332             throw new JMSException("Message is null");
333         }
334 
335         try {
336             // check the delivery mode of the message
337             checkDeliveryMode((MessageImpl) message);
338 
339             // set the connection identity and then let the message meanager
340             // process it
341             ((MessageImpl) message).setConnectionId(_connection.hashCode());
342 
343             // if there is a global transaction currently in process then
344             // we must send the message to the resource manager, otherwise
345             // send it directly to the message manager
346             if (_xid != null) {
347                 ResourceManager.instance().logPublishedMessage(_xid,
348                     (MessageImpl) message);
349             } else {
350                 MessageMgr.instance().add((MessageImpl) message);
351                 _publishCount++;
352             }
353         } catch (JMSException exception) {
354             _log.error("Failed to process message", exception);
355             throw exception;
356         } catch (OutOfMemoryError exception) {
357             String msg =
358                 "Failed to process message due to out-of-memory error";
359             _log.error(msg, exception);
360             throw new JMSException(msg);
361         } catch (Exception exception) {
362             String msg = "Failed to process message";
363             _log.error(msg, exception);
364             throw new JMSException(msg);
365         }
366     }
367 
368     /***
369      * Send the specified messages to the server.
370      *
371      * @param messages the messages to send
372      * @throws JMSException if the messages can't be sent
373      */
374     public void sendMessages(Vector messages) throws JMSException {
375         if (messages == null) {
376             throw new JMSException("No messages to send");
377         }
378 
379         MessageImpl message = null;
380         while ((messages.size() > 0) &&
381             ((message = (MessageImpl) messages.remove(0)) != null)) {
382             try {
383                 // check the delivery mode of the message
384                 checkDeliveryMode((MessageImpl) message);
385 
386                 // set the connection identity and then let the message manager
387                 // process it
388                 message.setConnectionId(_connection.hashCode());
389 
390                 // if there is a global transaction in progress then send the
391                 // message to the resource manager, otherwise send it to the
392                 // message manager
393                 if (_xid != null) {
394                     ResourceManager.instance().logPublishedMessage(_xid,
395                         message);
396                 } else {
397                     MessageMgr.instance().add(message);
398                     _publishCount++;
399                 }
400             } catch (JMSException exception) {
401                 _log.error("Failed to process message", exception);
402                 throw exception;
403             } catch (OutOfMemoryError exception) {
404                 String msg =
405                     "Failed to process message due to out-of-memory error";
406                 _log.error(msg, exception);
407                 throw new JMSException(msg);
408             } catch (Exception exception) {
409                 String msg = "Failed to process messages";
410                 _log.error(msg, exception);
411                 throw new JMSException(msg);
412             }
413         }
414     }
415 
416     /***
417      * Return the next message for the specified client. The <code>wait</code>
418      * parameter indicates how long many milliseconds to wait for a message
419      * before returning. If <code>wait</code> is 0 then do not wait at all. If
420      * <code>wait</code> is -1 then wait indefinitely for the next message
421      *
422      * @param clientId the client identity
423      * @param wait number of ms to wait
424      * @return the next message or <code>null</code> if there is no message
425      * @throws JMSException if the message can't be received
426      */
427     public Message receiveMessage(long clientId, long wait)
428         throws JMSException {
429         MessageImpl message = null;
430         ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
431         if (consumer == null) {
432             throw new JMSException(
433                 "Can't receive message: no consumer registered with "
434                 + "identifier " + clientId + " on session " + _sessionId);
435         }
436 
437         // we have a valid consumer, now we need retrieve a handle.
438         MessageHandle handle = consumer.receiveMessage(wait);
439         if (handle != null) {
440             // if we get a non-null handle the retrieve the message,
441             // clone
442             MessageImpl orig = handle.getMessage();
443             if (orig != null) {
444                 try {
445                     message = (MessageImpl) orig.clone();
446                     message.setJMSRedelivered(handle.getDelivered());
447                     message.setClientId(handle.getClientId());
448                     _consumeCount++;
449                 } catch (Exception exception) {
450                     _log.error(exception);
451                     message = null;
452                 }
453             }
454         }
455 
456         // if we have a non-null message then add it to the sent message
457         // cache. Additionally, if we are part of a global transaction then
458         // we must also sent it to the ResourceManager for recovery.
459         if (handle != null) {
460             _sentMessageCache.process(handle);
461 
462             if (_xid != null) {
463                 try {
464                     ResourceManager.instance().logReceivedMessage(
465                         _xid, consumer.getId(), handle);
466                 } catch (Exception exception) {
467                     _log.error(exception);
468                     JMSException jms_exception = new JMSException(
469                         "Error in receiveMessage");
470                     jms_exception.setLinkedException(exception);
471                     throw jms_exception;
472                 }
473             }
474         }
475 
476         return message;
477     }
478 
479     /***
480      * Return up to count messages from the endpoint with the specified
481      * client identity. The client must be a QueueBrowser.
482      *
483      * @param       clientId            the client identity
484      * @param       count               the maximum number of messages retrieve
485      * @return      Message             the next message or null
486      * @throws JMSException if the endpoint does not exist, or is not a
487      * {@link QueueBrowserEndpoint}
488      */
489     public Vector receiveMessages(long clientId, int count)
490         throws JMSException {
491 
492         ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
493         if (consumer == null) {
494             throw new JMSException(
495                 "Can't receive messages: no consumer registered with "
496                 + "identifier " + clientId + " on session " + _sessionId);
497         }
498 
499         if (!(consumer instanceof QueueBrowserEndpoint)) {
500             throw new JMSException(
501                 "Can't receive messages: consumer with identifier "
502                 + "identifier " + clientId + " is not a QueueBrowser");
503         }
504 
505         // we have a valid consumer, now we need retrieve upto count
506         // handles
507         Vector handles = ((QueueBrowserEndpoint) consumer).receiveMessages(
508             count);
509         Vector messages = new Vector();
510         if (handles.size() > 0) {
511 
512             // process the handles
513             int max = handles.size();
514             for (int index = 0; index < max; index++) {
515                 MessageHandle handle = (MessageHandle) handles.elementAt(index);
516                 MessageImpl orig = handle.getMessage();
517                 MessageImpl message = null;
518                 if (orig != null) {
519                     try {
520                         message = (MessageImpl) orig.clone();
521                         message.setJMSRedelivered(handle.getDelivered());
522                         message.setClientId(handle.getClientId());
523                         messages.addElement(message);
524                     } catch (Exception exception) {
525                         _log.error(exception);
526                         message = null;
527                     }
528                 }
529             }
530         }
531 
532         return messages;
533     }
534 
535     /***
536      * Create an amdinistered queue, through the message manager admin
537      * interface.
538      *
539      * @param queue administered queue to create
540      * @throws JMSException if the queue can't be created
541      */
542     public void createQueue(JmsQueue queue) throws JMSException {
543         if (!DestinationManager.instance().createAdministeredDestination(
544             queue)) {
545             throw new JMSException("Failed to create queue: " +
546                 queue.getName());
547         }
548     }
549 
550     /***
551      * Create an administered topic, through the message manager admin
552      * interface.
553      *
554      * @param topic administered topic to create
555      * @throws JMSException if the topic can't be created
556      */
557     public void createTopic(JmsTopic topic) throws JMSException {
558         if (!DestinationManager.instance().createAdministeredDestination(
559             topic)) {
560             throw new JMSException("Failed to create topic: " +
561                 topic.getName());
562         }
563     }
564 
565     /***
566      * Create a receiver endpoint for this session. A receiver is a message
567      * consumer specific to the queue message model. The receiver is
568      * associated with a queue.
569      * <p>
570      * You cannot create more than one receiver for the same destination
571      *
572      * @param queue the receiver destination
573      * @param consumerId the client session allocated identifier of the
574      * consumer
575      * @param selector the selector to filter messages. May be
576      * <code>null</code>
577      * @throws JMSException if the receiver can't be created
578      */
579     public void createReceiver(JmsQueue queue, long clientId, String selector)
580         throws JMSException {
581         if (_log.isDebugEnabled()) {
582             _log.debug("createReceiver(queue=" + queue + ", clientId="
583                 + clientId + ", selector=" + selector
584                 + ") [sessionId=" + _sessionId + "]");
585         }
586 
587         if (queue == null) {
588             throw new JMSException("Cannot create receiver for null queue");
589         }
590 
591         // Retrieve the destination from the destination manager and use
592         // it to create the consumer
593         ConsumerEndpoint consumer =
594             ConsumerManager.instance().createConsumerEndpoint(this,
595                 clientId, queue, selector);
596         consumer.setAckMode(_ackMode);
597         consumer.setConnectionId(_connection.hashCode());
598         consumer.setTransacted(_transacted);
599         // if the session is stopped then we should also stop the
600         // consumer, so that it doesn't deliver messages and then
601         // cache it for future reference.
602         consumer.setStopped(_stopped);
603         _consumers.put(Long.toString(clientId), consumer);
604     }
605 
606     /***
607      * This is a no-op
608      */
609     public void createSender(JmsQueue queue) throws JMSException {
610     }
611 
612     /***
613      * Create a queue browser for this session. This allows clients to browse
614      * a queue without removing any messages.
615      * <p>
616      *
617      * You cannot create more than one queue browser for the same queue
618      * in a single session.
619      *
620      * @param       queue               queue to browse
621      * @param       clientId            the client identity
622      * @param       selector            message selector. This may be null
623      * @throws   JMSException.
624      */
625     public void createBrowser(JmsQueue queue, long clientId, String selector)
626         throws JMSException {
627         if (_log.isDebugEnabled()) {
628             _log.debug("createBrowser(queue=" + queue + ", clientId="
629                 + clientId + ", selector=" + selector
630                 + ") [sessionId=" + _sessionId + "]");
631         }
632 
633         // check to see that we have a valid queue
634         if (queue == null) {
635             throw new JMSException("Cannot create browser for null queue");
636         }
637 
638         // Retrieve the destination from the destination manager and use
639         // it to create the consumer
640         ConsumerEndpoint consumer =
641             ConsumerManager.instance().createQueueBrowserEndpoint(this,
642                 clientId, queue, selector);
643 
644         // if the session is stopped then we should also stop the
645         // consumer, so that it doesn't deliver messages and then
646         // cache it for future reference.
647         consumer.setStopped(_stopped);
648         _consumers.put(Long.toString(clientId), consumer);
649     }
650 
651     /***
652      * Delete the receiver with the specified identity and clean up all
653      * associated resources.
654      *
655      * @param clientId the identity of the receiver
656      * @throws JMSException if the receiver cannot be deleted
657      */
658     public void deleteReceiver(long clientId) throws JMSException {
659         if (_log.isDebugEnabled()) {
660             _log.debug("deleteReceiver(clientId=" + clientId + ") [sessionId="
661                 + _sessionId + "]");
662         }
663 
664         ConsumerEndpoint consumer =
665             (ConsumerEndpoint) _consumers.remove(Long.toString(clientId));
666         if (consumer == null) {
667             throw new JMSException("No receiver with id " + clientId);
668         }
669 
670         // destroy the consumer endpoint
671         ConsumerManager.instance().deleteConsumerEndpoint(consumer);
672     }
673 
674     /***
675      * Delete the sender associated with the specified queue from the session
676      * If the corresponding sender does not exist or it cannot delete it then
677      * throw the JMSException.
678      *
679      * @param clientId the identity of the sender
680      * @throws JMSException if the sender cannot be deleted
681      */
682     public void deleteSender(long clientId) throws JMSException {
683         // no-op
684     }
685 
686     /***
687      * Delete the queue browser associated with the specified queue from
688      * the session.
689      *
690      * @param clientId the identity of the browser
691      * @throws JMSException if the browser cannot be deleted
692      */
693     public void deleteBrowser(long clientId) throws JMSException {
694         ConsumerEndpoint consumer =
695             (ConsumerEndpoint) _consumers.remove(Long.toString(clientId));
696         if (consumer == null) {
697             throw new JMSException("No browser with id " + clientId);
698         }
699         // destroy the consumer endpoint
700         ConsumerManager.instance().deleteConsumerEndpoint(consumer);
701     }
702 
703     /***
704      * Create a subscriber endpoint for this session. A subscriber is a message
705      * consumer specific to the topic message model. The subscriber is
706      * associated with a topic. Register the consumer with the message
707      * manager so that a queue can be set up for it. Finally add the consumer
708      * to the list of consumers managed by this session.
709      * <p>
710      * Note that the message manager manages consumers  for all server sessions
711      * <p>
712      * You cannot create more than one subscriber for the same destination.
713      * Currently we don't check this
714      *
715      * @param       topic               subscriber destination
716      * @param       name                consumer name
717      * @param       clientId            the client session allocated
718      *                                  identifier of the consumer
719      * @param       selector            the selector to filter messages.
720      *                                  This may be null.
721      * @param       noLocal             true to inhibit consumption of messages
722      *                                  published on this connection.
723      * @throws   JMSException.
724      */
725     public void createSubscriber(JmsTopic topic, String name, long clientId,
726                                  String selector, boolean noLocal)
727         throws JMSException {
728 
729         if (_log.isDebugEnabled()) {
730             _log.debug("createSubscriber(topic=" + topic + ", name=" + name
731                 + ", clientId=" + clientId + ", selector=" + selector
732                 + ", noLocal=" + noLocal + ") [sessionId="
733                 + _sessionId + "]");
734         }
735 
736         // check to ensure that the methods preconditions have been met
737         if (topic == null) {
738             throw new JMSException("Cannot create subscriber for null topic");
739         }
740 
741         // Retrieve the destination from the destination manager and
742         // use it to create the consumer through the consumer manager.
743         ConsumerEndpoint consumer = null;
744 
745         if (name != null) {
746             if (name.length() > 0) {
747 
748                 // for a durable consumer the topic must be
749                 ConsumerManager manager = ConsumerManager.instance();
750 
751                 if (manager.durableConsumerExists(name)) {
752                     // if the durable consumer exists then validate that
753                     // it was the specified topic that it was registered
754                     // under. If it is not registered for the topic then
755                     // we must delete the existing entry and recreate it
756                     // against the new topic
757                     if (!manager.validSubscription(topic.getName(), name)) {
758                         unsubscribe(name);
759                         manager.createDurableConsumer(topic, name);
760                     }
761                 } else {
762                     // if the durable consumer does not exist then create
763                     // it
764                     manager.createDurableConsumer(topic, name);
765                 }
766 
767                 // if a durable subscriber with the specified name is
768                 // alreayd active then this method will throw an
769                 // exception.
770                 // attempt to create a durable consuinmer
771                 consumer = manager.createDurableConsumerEndpoint(this,
772                     topic, name, clientId, selector);
773                 consumer.setConnectionId(_connection.hashCode());
774                 consumer.setTransacted(_transacted);
775                 consumer.setAckMode(_ackMode);
776                 consumer.setNoLocal(noLocal);
777             } else {
778                 throw new JMSException("Name in createSubscriber was null");
779             }
780         } else {
781             // Create a non-durable subscriber for the specified destination
782             // and using the required selector.
783             consumer = ConsumerManager.instance().createConsumerEndpoint(this,
784                 clientId, topic, selector);
785             consumer.setConnectionId(_connection.hashCode());
786             consumer.setTransacted(_transacted);
787             consumer.setAckMode(_ackMode);
788             consumer.setNoLocal(noLocal);
789         }
790 
791         // once the consumer has been created then set it to the same state
792         // as the session and add it to the list on consumers to manage
793         consumer.setStopped(_stopped);
794         _consumers.put(Long.toString(clientId), consumer);
795     }
796 
797     /***
798      * This should be a no operation. Do we need to maintain state information
799      * that a publisher has been created.
800      *
801      * @param       topic               receiver destination
802      * @throws   JMSException.
803      */
804     public void createPublisher(JmsTopic topic)
805         throws JMSException {
806     }
807 
808     /***
809      * This function deletes a persistent subsrciber and its history from
810      * the database. It his subscriber re-connects it get everything available
811      * for the queue topic. If the subscriber is reliable, this is a no op.
812      * See UnregisterSubscriber below for just unregistering the subscriber
813      * but leaving its persistent data in the db.
814      * <p>
815      * The data contains information necessary to delete the subscriber
816      *
817      * @param       clientId            the client identity
818      * @throws   JMSException.
819      */
820     public void deleteSubscriber(long clientId) throws JMSException {
821         if (_log.isDebugEnabled()) {
822             _log.debug("deleteSubscriber(clientId=" + clientId
823                 + ") [sessionId=" + _sessionId + "]");
824         }
825 
826         // retrieve the endpoint corresponding to the client id and
827         // then acknowledge the messsage
828         ConsumerEndpoint consumer =
829             (ConsumerEndpoint) _consumers.remove(Long.toString(clientId));
830         if (consumer == null) {
831             throw new JMSException("Failed to close consumer with id " +
832                 "[" + hashCode() + ":" + clientId + "]");
833         }
834 
835         ConsumerManager.instance().deleteConsumerEndpoint(consumer);
836     }
837 
838     /***
839      * Delete the publisher associated with the specified topic from the
840      * session. If the corresponding publisher does not exist or it cannot
841      * delete it then throw the JMSException.
842      *
843      * @param       topic               sender destination
844      * @throws   JMSException.
845      */
846     public void deletePublisher(JmsTopic topic) throws JMSException {
847         // no-op
848     }
849 
850     /***
851      * Unsubscribe a durable subscription. This will delete the state of
852      * the durable subscriber maintained by the server. A durable subscriber
853      * is uniquely identifiable and the same subscriber cannot be associated
854      * with more than topic.
855      *
856      * @param       name                the name used to uniquely identify the
857      *                                  subscription
858      * @throws   JMSException        if the subscription cannot be removed
859      *                                  or any other problem.
860      */
861     public void unsubscribe(String name) throws JMSException {
862         if (_log.isDebugEnabled()) {
863             _log.debug("unsubscribe(name=" + name + ") [sessionId="
864                 + _sessionId + "]");
865         }
866 
867         ConsumerManager manager = ConsumerManager.instance();
868 
869         // check that the durable consumer actually exists. If it doesn't then
870         // throw an exception
871         if (!manager.durableConsumerExists(name)) {
872             throw new InvalidDestinationException(name +
873                 " is not a durable subscriber name");
874         }
875 
876         // check that the durable consumer is not active before removing it. If
877         // it is then throw an exception
878         if (!manager.isDurableConsumerActive(name)) {
879             manager.removeDurableConsumer(name);
880         } else {
881             throw new JMSException("Failed to unsubscribe subscriber "
882                 + name + " since is still active");
883         }
884     }
885 
886     /***
887      * Stop message delivery to this session. If there are any problems
888      * completing the request then throw the JMSException exception
889      *
890      * @throws   JMSException
891      */
892     public void stopMessageDelivery() throws JMSException {
893         stop();
894     }
895 
896     /***
897      * Start message delivery to this session. If there are any problems
898      * completing this request then throw the JMSException exception
899      *
900      * @throws   JMSException
901      */
902     public void startMessageDelivery() throws JMSException {
903         start();
904     }
905 
906     /***
907      * Check if the specified message handle is in the session's list
908      * of unacked messages
909      *
910      * @param handle - the handle to query
911      * @return boolean - true if it is and false otherwise
912      */
913     public boolean containsUnackedHandle(MessageHandle handle) {
914         return _sentMessageCache.handleInCache(handle);
915     }
916 
917     // implementation of InternalMessageListener.onMessage
918     public void onMessage(MessageHandle handle, boolean ignore)
919         throws Exception {
920 
921         if ((handle != null) &&
922             (_listener != null)) {
923             MessageImpl message = handle.getMessage();
924             MessageImpl m = null;
925             if (message != null) {
926                 m = (MessageImpl) message.clone();
927                 m.setClientId(handle.getClientId());
928                 m.setJMSRedelivered(handle.getDelivered());
929 
930                 // if we are acking the message and the session is
931                 // transacted and the acknowledge mode is
932                 // CLIENT_ACKNOWLEDGE then send it to the cache before
933                 // we send it to the listener. This will enable clients
934                 // to ack the message while in the onMessage method
935                 if ((_transacted) ||
936                     (_ackMode == Session.CLIENT_ACKNOWLEDGE)) {
937                     _sentMessageCache.process(handle);
938                 }
939 
940                 try {
941                     // send the message to the listener.
942                     _listener.onMessage(m);
943 
944                     // if the session is not transacted or the acknowledge mode
945                     // is not CLIENT_ACKNOWLEDGE then process it through the
946                     // sent message cache now.
947                     if ((!_transacted) &&
948                         (_ackMode != Session.CLIENT_ACKNOWLEDGE)) {
949                         _sentMessageCache.process(handle);
950                     }
951                 } catch (ClientDisconnectionException exception) {
952                     // close all resources and rethrow it
953                     close();
954                     throw exception;
955                 }
956             } else {
957                 throw new JMSException(
958                     "Could not get message for handle " + handle,
959                     JMSErrorCodes.FailedToResolveHandle);
960             }
961         }
962     }
963 
964     // implementation of InternalMessageListener.onMessage
965     public void onMessages(Vector handles) throws Exception {
966         _log.error("Illegal to call onMessage");
967         Thread.currentThread().dumpStack();
968     }
969 
970     // implementation of InternalMessageListener.onMessageAvailable
971     public void onMessageAvailable(long clientId) throws Exception {
972         _listener.onMessageAvailable(clientId);
973     }
974 
975     /***
976      * This will send a null message down the connection to the client to
977      * test whether the client endpoint is alive.
978      *
979      * @return <code>true</code> if it is active, otherwise <code>false</code>
980      */
981     public boolean isClientEndpointActive() {
982         boolean active = true;
983         if (_listener != null) {
984             try {
985                 // send the message to the listener.
986                 _listener.onMessage(null);
987             } catch (ClientDisconnectionException exception) {
988                 _log.info("Failed to verify that session " + _sessionId
989                     + " is active.");
990                 active = false;
991                 // ignore the exception
992             }
993         }
994 
995         return active;
996     }
997 
998     /***
999      * Set a message listener for the session. This is the channel used
1000      * to asynchronously deliver messages to consumers created on this
1001      * session.
1002      *
1003      * @param listener the message listener
1004      */
1005     public void setMessageListener(JmsMessageListener listener) {
1006         _listener = listener;
1007     }
1008 
1009     /***
1010      * Check whether to enable asynchronous message delivery for a particular
1011      * consumer
1012      *
1013      * @param clientId the id of the client to check
1014      * @param id the last processed message
1015      * @param enable <code>true</code> to enable; <code>false</code> to disable
1016      */
1017     public void enableAsynchronousDelivery(long clientId, String id,
1018                                            boolean enable)
1019         throws JMSException {
1020         ConsumerEndpoint consumer = getConsumerEndpoint(clientId);
1021         if (consumer == null) {
1022             throw new JMSException(clientId + " is not registered");
1023         }
1024 
1025         if (enable) {
1026             consumer.setMessageListener(this);
1027         } else {
1028             consumer.setMessageListener(null);
1029         }
1030     }
1031 
1032     /***
1033      * Call recover on all registered consumers. This will cause all
1034      * unacknowledged messages to be redelivered. Before we recover we
1035      * need to stop messages delivery. We then need to start redelivery
1036      * when the recovery has been completed
1037      *
1038      * @throws JMSException if the session can't be recovered
1039      */
1040     public void recover() throws JMSException {
1041         // stop message delivery
1042         stop();
1043 
1044         // iterate over the list of consumers recover them
1045         Iterator consumers = _consumers.values().iterator();
1046         while (consumers.hasNext()) {
1047             ((ConsumerEndpoint) consumers.next()).recover();
1048         }
1049 
1050         // clear the messages in the sent message cache
1051         _sentMessageCache.clear();
1052 
1053         // restart message delivery
1054         start();
1055 
1056 
1057     }
1058 
1059     /***
1060      * Commit this session, which will acknowledge all sent messages for
1061      * all consumers.
1062      *
1063      * @throws JMSException - if there are any problems
1064      */
1065     public void commit() throws JMSException {
1066         try {
1067             _sentMessageCache.acknowledgeAllMessages();
1068         } catch (OutOfMemoryError exception) {
1069             String msg =
1070                 "Failed to commit transaction due to out-of-memory error";
1071             _log.error(msg, exception);
1072             throw new JMSException(msg);
1073         }
1074     }
1075 
1076     /***
1077      * Abort, will return all unacked messages to their respective endpoints,
1078      * if they are still active.
1079      *
1080      * @throws JMSException - if there are any problems
1081      */
1082     public void rollback()
1083         throws JMSException {
1084         _sentMessageCache.clear();
1085     }
1086 
1087     // implementation XAResource.commit
1088     public void commit(Xid xid, boolean onePhase) throws XAException {
1089         try {
1090             ResourceManager.instance().commit(xid, onePhase);
1091         } catch (ResourceManagerException exception) {
1092             throw new XAException("Failed in commit " + exception);
1093         } finally {
1094             _xid = null;
1095         }
1096     }
1097 
1098     // implementation of XAResource.end
1099     public void end(Xid xid, int flags) throws XAException {
1100         try {
1101             ResourceManager.instance().end(xid, flags);
1102         } catch (ResourceManagerException exception) {
1103             throw new XAException("Failed in end " + exception);
1104         } finally {
1105             _xid = null;
1106         }
1107     }
1108 
1109     // implementation of XAResource.forget
1110     public void forget(Xid xid) throws XAException {
1111         try {
1112             ResourceManager.instance().forget(xid);
1113         } catch (ResourceManagerException exception) {
1114             throw new XAException("Failed in forget " + exception);
1115         } finally {
1116             _xid = null;
1117         }
1118     }
1119 
1120     // implementation of XAResource.getTransactionTimeout
1121     public int getTransactionTimeout() throws XAException {
1122         try {
1123             return ResourceManager.instance().getTransactionTimeout();
1124         } catch (ResourceManagerException exception) {
1125             throw new XAException("Failed in getTransactionTimeout " +
1126                 exception);
1127         }
1128     }
1129 
1130     // implementation of XAResource.isSameRM
1131     public boolean isSameRM(XAResource xares) throws XAException {
1132         return true;
1133     }
1134 
1135     // implementation XAResource.isSame
1136     public int prepare(Xid xid) throws XAException {
1137         try {
1138             return ResourceManager.instance().prepare(xid);
1139         } catch (ResourceManagerException exception) {
1140             throw new XAException("Failed in prepare " + exception);
1141         }
1142     }
1143 
1144     // implementation of XAResource.prepare
1145     public Xid[] recover(int flag) throws XAException {
1146         try {
1147             return ResourceManager.instance().recover(flag);
1148         } catch (ResourceManagerException exception) {
1149             throw new XAException("Failed in recover " + exception);
1150         }
1151     }
1152 
1153     // implementation of XAResource.recover
1154     public void rollback(Xid xid) throws XAException {
1155         try {
1156             ResourceManager.instance().rollback(xid);
1157         } catch (ResourceManagerException exception) {
1158             throw new XAException("Failed in rollback " + exception);
1159         } finally {
1160             // clear the current xid
1161             _xid = null;
1162         }
1163     }
1164 
1165     // implementation of XAResource.rollback
1166     public boolean setTransactionTimeout(int seconds) throws XAException {
1167         try {
1168             return ResourceManager.instance().setTransactionTimeout(seconds);
1169         } catch (ResourceManagerException exception) {
1170             throw new XAException("Failed in setTransactionTimeout "
1171                 + exception);
1172         }
1173     }
1174 
1175     // implementation of XAResource.setTransactionTimeout
1176     public void start(Xid xid, int flags) throws XAException {
1177         try {
1178             ResourceManager.instance().start(xid, flags);
1179 
1180             // set this as the current xid for this session
1181             _xid = xid;
1182         } catch (ResourceManagerException exception) {
1183             throw new XAException("Failed in start " + exception);
1184         }
1185     }
1186 
1187     /***
1188      * Return the xid that is currently associated with this session or null
1189      * if this session is currently not part of a global transactions
1190      *
1191      * @return Xid
1192      */
1193     public Xid getXid() {
1194         return _xid;
1195     }
1196 
1197     /***
1198      * Return the identity of the {@link ResourceManager}. The transaction
1199      * manager should be the only one to initiating this call.
1200      *
1201      * @return the identity of the resource manager
1202      * @throws XAException - if it cannot retrieve the rid.
1203      */
1204     public String getResourceManagerId() throws XAException {
1205         try {
1206             return ResourceManager.instance().getResourceManagerId();
1207         } catch (ResourceManagerException exception) {
1208             throw new XAException("Failed in getResourceManagerId "
1209                 + exception);
1210         }
1211     }
1212 
1213     /***
1214      * Determines if the session is transacted
1215      *
1216      * @return <code>true</code> if the session is transacted
1217      */
1218     public boolean isTransacted() {
1219         return _transacted;
1220     }
1221 
1222     /***
1223      * Returns the message acknowledgement mode for the session
1224      */
1225     public int getAckMode() {
1226         return _ackMode;
1227     }
1228 
1229     /***
1230      * Returns the consumer endpoint for the supplied client id
1231      *
1232      * @param clientId the identity of the consumer endpoint
1233      * @return the consumer endpoint corresponding to <code>clientId</code>,
1234      * or <code>null</code> if none exists
1235      */
1236     public ConsumerEndpoint getConsumerEndpoint(long clientId) {
1237         String identity = Long.toString(clientId);
1238         return (ConsumerEndpoint) _consumers.get(identity);
1239     }
1240 
1241     /***
1242      * This method is used to stop and restart the session. Stopping the
1243      * session should stop all message delivery to session consumers
1244      *
1245      * @param stop - true if we need to stop the session, false otherwise
1246      */
1247     private void pause(boolean stop) {
1248         Iterator iter = _consumers.values().iterator();
1249         while (iter.hasNext()) {
1250             ((ConsumerEndpoint) iter.next()).setStopped(stop);
1251         }
1252     }
1253 
1254     /***
1255      * Check the delivery mode of the message. If the delivery mode is
1256      * persistent and the destination is non-administered then change the
1257      * delivery mode to non-persistent so that it can be processed correctly
1258      * by the server
1259      *
1260      * @param message - the message to check
1261      * @throws JMSException - propagate JMSException to client
1262      */
1263     private void checkDeliveryMode(MessageImpl message) throws JMSException {
1264         if ((message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) &&
1265             (!DestinationManager.instance().isMessageForAdministeredDestination(message))) {
1266             message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
1267         }
1268     }
1269 
1270 } //-- JmsServerSession