View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsSession.java,v 1.59 2004/01/21 04:08:13 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import java.io.Serializable;
48  import java.util.Enumeration;
49  import java.util.Hashtable;
50  import java.util.Vector;
51  
52  import javax.jms.BytesMessage;
53  import javax.jms.IllegalStateException;
54  import javax.jms.JMSException;
55  import javax.jms.MapMessage;
56  import javax.jms.Message;
57  import javax.jms.MessageListener;
58  import javax.jms.ObjectMessage;
59  import javax.jms.Session;
60  import javax.jms.StreamMessage;
61  import javax.jms.TextMessage;
62  
63  import org.apache.commons.logging.Log;
64  import org.apache.commons.logging.LogFactory;
65  
66  import org.exolab.jms.message.BytesMessageImpl;
67  import org.exolab.jms.message.MapMessageImpl;
68  import org.exolab.jms.message.MessageConverter;
69  import org.exolab.jms.message.MessageConverterFactory;
70  import org.exolab.jms.message.MessageImpl;
71  import org.exolab.jms.message.MessageSessionIfc;
72  import org.exolab.jms.message.ObjectMessageImpl;
73  import org.exolab.jms.message.StreamMessageImpl;
74  import org.exolab.jms.message.TextMessageImpl;
75  
76  
77  /***
78   * This class implements a Session interface and supports a single threaded
79   * context. A session supports multiple consumers and producers but only within
80   * the context of a single thread.
81   *
82   * @version     $Revision: 1.59 $ $Date: 2004/01/21 04:08:13 $
83   * @author      <a href="mailto:jima@exoffice.com">Jim Alateras</a>
84   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
85   */
86  abstract class JmsSession
87      implements Session, JmsMessageListener, MessageSessionIfc {
88  
89      /***
90       * If true, indicates that the session has been closed
91       */
92      private volatile boolean _closed = false;
93  
94      /***
95       * If true, indicates that the session is in the process of being closed
96       */
97      private volatile boolean _closing = false;
98  
99      /***
100      * This flag determines whether message delivery is enabled or disabled.
101      * Message delivery if disabled if the enclosing connection is stopped.
102      */
103     private volatile boolean _stopped = true;
104 
105     /***
106      * If true, indicates that the session has been stopped. When started,
107      * messages may be sent
108      */
109     private volatile boolean _started = false;
110 
111     /***
112      * A transacted session is bounded by successive commit. If this variable
113      * set to true then this session is transacted. This implies that the
114      * session is always in a transaction and transactions are demarcated by
115      * commit or rollback.
116      */
117     private final boolean _transacted;
118 
119     /***
120      * Indicates whether the consumer or the client will
121      * acknowledge any messages it receives. Ignored if the session is
122      * transacted. Legal values are
123      * <code>Session.AUTO_ACKNOWLEDGE</code>,
124      * <code>Session.CLIENT_ACKNOWLEDGE</code> and
125      * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
126      */
127     private final int _ackMode;
128 
129     /***
130      * This is the owner of the session. It also maintains a remote stub to
131      * the server which it can use for its remote work.
132      */
133     private JmsConnection _connection;
134 
135     /***
136      * Maintains the a map of JmsMessageConsumer.getClientId() ->
137      * JmsMessageConsumer objects
138      */
139     private Hashtable _consumers = new Hashtable();
140 
141     /***
142      * Maintains a list of producers for the session
143      */
144     private Vector _producers = new Vector();
145 
146     /***
147      * Maintain a collection of acked messages for this transacted
148      * session. These messages are only sent to the server on commit.
149      */
150     private Vector _messagesToSend = new Vector();
151 
152     /***
153      * The identifier of the session
154      */
155     private final String _sessionId;
156 
157     /***
158      * Maintain a copy of the stub that connects this machine to the remote
159      * server.
160      */
161     private JmsSessionStubIfc _stub = null;
162 
163     /***
164      * This is the session's session listener which is used to receive all
165      * messages associated with all consumers registered with this session.
166      * Even if consumers have registered listeners messages for those consumers
167      * will go to the session's message listener instead.
168      */
169     private MessageListener _listener = null;
170 
171     /***
172      * The consumer Id seed to allocate to new consumers
173      */
174     private long _consumerIdSeed = 0;
175 
176     /***
177      * Tracks the number of messages sent by this session
178      */
179     private long _publishCount;
180 
181     /***
182      * The message cache holds all messages for the session, allocated by
183      * a JmsConnectionConsumer.
184      */
185     private Vector _messageCache = new Vector();
186 
187     /***
188      * Monitor used to block consumers, if the session has been
189      * stopped, or no messages are available
190      */
191     private final Object _receiveLock = new Object();
192 
193     /***
194      * The logger
195      */
196     private static final Log _log = LogFactory.getLog(JmsSession.class);
197 
198 
199     /***
200      * Construct a new <code>JmsSession</code>
201      *
202      * @param connection the owner of the session
203      * @param transacted if <code>true</code>, the session is transacted.
204      * @param ackMode indicates whether the consumer or the client will
205      * acknowledge any messages it receives. This parameter will be ignored if
206      * the session is transacted. Legal values are
207      * <code>Session.AUTO_ACKNOWLEDGE</code>,
208      * <code>Session.CLIENT_ACKNOWLEDGE</code> and
209      * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
210      * @throws JMSException if the session cannot be created
211      */
212     public JmsSession(JmsConnection connection, boolean transacted,
213                       int ackMode) throws JMSException {
214 
215         if (connection == null) {
216             throw new IllegalArgumentException(
217                 "Argument 'connection' is null");
218         }
219 
220         _connection = connection;
221         _transacted = transacted;
222         _ackMode = ackMode;
223 
224         // construct the remote stub
225         _stub = connection.getJmsConnectionStub().createSession(
226             _ackMode, transacted);
227         _sessionId = _stub.getSessionId();
228 
229         // set up this instance to be a message listener
230         _stub.setMessageListener(this);
231 
232         // now we need to check whether we should start the session
233         if (!connection.isStopped()) {
234             start();
235         }
236     }
237 
238     /***
239      * Create a <code>BytesMessage</code>
240      *
241      * @return a new <code>BytesMessage</code>
242      * @throws JMSException if the message can't be created
243      */
244     public BytesMessage createBytesMessage() throws JMSException {
245         ensureOpen();
246         return new BytesMessageImpl();
247     }
248 
249     /***
250      * Create a <code>MapMessage</code>
251      *
252      * @return a new <code>MapMessage</code>
253      * @throws JMSException if the message can't be created
254      */
255     public MapMessage createMapMessage() throws JMSException {
256         ensureOpen();
257         return new MapMessageImpl();
258     }
259 
260     /***
261      * Create a <code>Message</code>
262      *
263      * @return a new <code>Message</code>
264      * @throws JMSException if the message can't be created
265      */
266     public Message createMessage() throws JMSException {
267         ensureOpen();
268         return new MessageImpl();
269     }
270 
271     /***
272      * Create an <code>ObjectMessage</code>
273      *
274      * @return a new <code>ObjectMessage</code>
275      * @throws JMSException if the message can't be created
276      */
277     public ObjectMessage createObjectMessage() throws JMSException {
278         ensureOpen();
279         return new ObjectMessageImpl();
280     }
281 
282     /***
283      * Create an <code>ObjectMessage</code>
284      *
285      * @param object the object to use to initialise the message
286      * @return a new <code>ObjectMessage</code>
287      * @throws JMSException if the message can't be created
288      */
289     public ObjectMessage createObjectMessage(Serializable object)
290         throws JMSException {
291         ensureOpen();
292         ObjectMessageImpl result = new ObjectMessageImpl();
293         result.setObject(object);
294         return result;
295     }
296 
297     /***
298      * Create a <code>StreamMessage</code>
299      *
300      * @return a new <code>StreamMessage</code>
301      * @throws JMSException if the message can't be created
302      */
303     public StreamMessage createStreamMessage() throws JMSException {
304         ensureOpen();
305         return new StreamMessageImpl();
306     }
307 
308     /***
309      * Create a <code>TextMessage</code>
310      *
311      * @return a new <code>TextMessage</code>
312      * @throws JMSException if the message can't be created
313      */
314     public TextMessage createTextMessage() throws JMSException {
315         ensureOpen();
316         return new TextMessageImpl();
317     }
318 
319     /***
320      * Create a <code>TextMessage</code>
321      *
322      * @param text the text to use to initialise the message
323      * @return a new <code>TextMessage</code>
324      * @throws JMSException if the message can't be created
325      */
326     public TextMessage createTextMessage(String text) throws JMSException {
327         ensureOpen();
328         TextMessageImpl result = new TextMessageImpl();
329         result.setText(text);
330         return result;
331     }
332 
333     /***
334      * Determines if the session is transacted
335      *
336      * @return <code>true</code> if the session is transacted
337      * @throws JMSException if the session is closed
338      */
339     public boolean getTransacted() throws JMSException {
340         ensureOpen();
341         return _transacted;
342     }
343 
344     /***
345      * Commit all messages done in this transaction
346      *
347      * @throws JMSException if the transaction cannot be committed
348      */
349     public synchronized void commit() throws JMSException {
350         ensureOpen();
351         ensureTransactional();
352 
353         // send all the cached messages to the server
354         getJmsSessionStub().sendMessages(_messagesToSend);
355         _publishCount += _messagesToSend.size();
356         _messagesToSend.clear();
357 
358         // commit the session
359         getJmsSessionStub().commit();
360     }
361 
362     /***
363      * Rollback any messages done in this transaction
364      *
365      * @throws JMSException if the transaction cannot be rolled back
366      */
367     public synchronized void rollback() throws JMSException {
368         ensureOpen();
369         ensureTransactional();
370 
371         // clear all the cached messages
372         _messagesToSend.clear();
373 
374         // rollback the session
375         getJmsSessionStub().rollback();
376     }
377 
378     /***
379      * Close the session. This call will block until a receive or message
380      * listener in progress has completed. A blocked message consumer receive
381      * call returns <code>null</code> when this session is closed.
382      *
383      * @throws JMSException if the session can't be closed
384      */
385     public synchronized void close() throws JMSException {
386         if (!_closed) {
387             _closing = true;
388 
389             // signal the stub that we are preparing to close the
390             // connection.
391             getJmsSessionStub().beforeClose();
392 
393             // must stop first before we close
394             stop();
395 
396             // wake up any blocking consumers
397             notifyConsumers();
398 
399             // go through all the producer and call close on them
400             // respectively
401             Enumeration producers = getProducers();
402             while (producers.hasMoreElements()) {
403                 JmsMessageProducer producer =
404                     (JmsMessageProducer) producers.nextElement();
405                 producer.close();
406             }
407 
408             // go through all the consumer and call close on them
409             // respectively
410             Enumeration consumers = getConsumers();
411             while (consumers.hasMoreElements()) {
412                 JmsMessageConsumer consumer =
413                     (JmsMessageConsumer) consumers.nextElement();
414                 consumer.close();
415             }
416 
417             // deregister this with the connection
418             _connection.removeSession(this);
419             _connection = null;
420 
421             // clear any cached messages or acks
422             _messagesToSend.clear();
423 
424             // issue a close to the remote session. This will release any
425             // allocated remote resources
426             getJmsSessionStub().close();
427             _stub = null;
428 
429             // update the session state
430             _closed = true;
431             _closing = false;
432         }
433     }
434 
435     /***
436      * Stop message delivery in this session, and restart sending messages with
437      * the oldest unacknowledged message
438      *
439      * @throws JMSException if the session can't be recovered
440      */
441     public synchronized void recover() throws JMSException {
442         ensureOpen();
443         if (!_transacted) {
444             // let the server handle the recovery
445             getJmsSessionStub().recover();
446         } else {
447             throw new IllegalStateException(
448                 "Cannot recover from a transacted session");
449         }
450     }
451 
452     /***
453      * Returns the message listener associated with the session
454      *
455      * @return the message listener associated with the session, or
456      * <code>null</code> if no listener is registered
457      * @throws JMSException if the session is closed
458      */
459     public MessageListener getMessageListener() throws JMSException {
460         ensureOpen();
461         return _listener;
462     }
463 
464     /***
465      * Sets the session's message listener.
466      *
467      * @param listener the session's message listener
468      * @throws JMSException if the session is closed
469      */
470     public void setMessageListener(MessageListener listener)
471         throws JMSException {
472         ensureOpen();
473         _listener = listener;
474     }
475 
476     /***
477      * Iterates through the list of messages added by an
478      * {@link JmsConnectionConsumer}, sending them to the registered listener
479      */
480     public void run() {
481         try {
482             while (!_messageCache.isEmpty()) {
483                 Message message = (Message) _messageCache.remove(0);
484                 _listener.onMessage(message);
485             }
486         } catch (Exception exception) {
487             _log.error("Error in the Session.run()", exception);
488         } finally {
489             // Clear message cache
490             _messageCache.clear();
491         }
492     }
493 
494     /***
495      * Set the message listener for a particular consumer.
496      * <p>
497      * If a listener is already registered for the consumer, it will be
498      * automatically overwritten
499      *
500      * @param listener the message listener
501      * @throws JMSException if the listener can't be set
502      */
503     public void setMessageListener(JmsMessageConsumer listener)
504         throws JMSException {
505         ensureOpen();
506         enableAsynchronousDelivery(listener.getClientId(),
507                                    listener.getLastMessageDelivered(), true);
508     }
509 
510     /***
511      * Remove a message listener
512      *
513      * @param listener the message listener to remove
514      * @throws JMSException if the listener can't be removed
515      */
516     public void removeMessageListener(JmsMessageConsumer listener)
517         throws JMSException {
518 
519         ensureOpen();
520         enableAsynchronousDelivery(listener.getClientId(),
521             listener.getLastMessageDelivered(), false);
522     }
523 
524     /***
525      * This will start message delivery to this session. If message delivery
526      * has already started then this is a no-op.
527      *
528      * @throws JMSException if message delivery can't be started
529      */
530     public void start() throws JMSException {
531         ensureOpen();
532         if (_stopped) {
533             getJmsSessionStub().startMessageDelivery();
534             _stopped = false;
535 
536             // wake up any blocking consumers
537             notifyConsumers();
538         }
539     }
540 
541     /***
542      * This will stop message delivery to this session. If message delivery
543      * has already stoped then this is a no-op.
544      *
545      * @throws JMSException if message delivery can't be stopped
546      */
547     public void stop() throws JMSException {
548         ensureOpen();
549         if (!_stopped) {
550             getJmsSessionStub().stopMessageDelivery();
551             _stopped = true;
552 
553             // wake up any blocking consumers
554             notifyConsumers();
555         }
556     }
557 
558     /***
559      * Acknowledge the specified message. This is only applicable for
560      * CLIENT_ACKNOWLEDGE sessions. For other session types, the request
561      * is ignored.
562      * <p>
563      * Acking a message automatically acks all those that have come
564      * before it.
565      *
566      * @param message the message to acknowledge
567      * @throws JMSException if the message can't be acknowledged
568      */
569     public void acknowledgeMessage(Message message) throws JMSException {
570         ensureOpen();
571         if (_ackMode == Session.CLIENT_ACKNOWLEDGE) {
572             MessageImpl impl = (MessageImpl) message;
573             getJmsSessionStub().acknowledgeMessage(impl.getClientId(),
574                                                    impl.getAckMessageID());
575         }
576     }
577 
578     /***
579      * Enable or disable asynchronous message delivery for the specified
580      * client.
581      *
582      * @param clientId - the client identity
583      * @param id - the last message delivered asynchronously
584      * @param enable - <code>true</code> to enable; <code>false</code> to
585      * disable
586      * @throws JMSException if message delivery cannot be enabled or disabled
587      */
588     public void enableAsynchronousDelivery(long clientId, String id,
589                                            boolean enable)
590         throws JMSException {
591 
592         ensureOpen();
593         getJmsSessionStub().enableAsynchronousDelivery(clientId, id, enable);
594     }
595 
596     /***
597      * Asynchronously deliver a message to a <code>MessageConsumer</code>
598      *
599      * @param message the message to deliver
600      */
601     public void onMessage(Message message) {
602         if (message != null) {
603             MessageImpl impl = (MessageImpl) message;
604             impl.setJMSXRcvTimestamp(System.currentTimeMillis());
605 
606             // dispatch the message;
607             execute(message);
608         }
609     }
610 
611     /***
612      * Asynchronously deliver a set of message to a
613      * <code>MessageConsumer</code>
614      *
615      * @param messages the messages to deliver
616      */
617     public void onMessages(Vector messages) {
618         while (messages.size() > 0) {
619             onMessage((Message) messages.remove(0));
620         }
621     }
622 
623     /***
624      * Inform the session that there is a message available
625      * for the message consumer with the specified identity
626      *
627      * @param clientId the identity of the client
628      */
629     public void onMessageAvailable(long clientId) {
630         // wake up any blocking consumers
631         notifyConsumers();
632     }
633 
634     /***
635      * This is the called to process messages asynchronously delivered by the
636      * server. The session is then responsible for delivering it to the
637      * appropriate registered consumer. If it cannot resolve the consumer then
638      * it must log an exception
639      * <p>
640      * If the session has a registered listener then all messages will be
641      * delivered to the session's listener instead of the individual consumer
642      * message listeners.
643      *
644      * @param       object         received message
645      */
646     public synchronized void execute(Object object) {
647         // if the session is closed then drop the object
648         if (_closed) {
649             _log.error("Received a message for a closed session");
650             return;
651         }
652 
653         MessageImpl message = (MessageImpl) object;
654         long clientId = message.getClientId();
655         JmsMessageConsumer consumer =
656             (JmsMessageConsumer) _consumers.get(new Long(clientId));
657 
658         // tag the session that received this message
659         message.setSession(this);
660         if (consumer != null) {
661             // if a listener is defined for the session then send all the
662             // messages to that listener regardless if any consumers are
663             // have registered listeners...bit confusing but this is what
664             // I believe it should do
665             if (_listener != null) {
666                 _listener.onMessage(message);
667             } else {
668                 // send it to the appropriate consumer
669                 consumer.onMessage(message);
670             }
671         } else {
672             // consumer no longer active...so drop the message
673             _log.error("Received a message for an inactive consumer");
674         }
675     }
676 
677     /***
678      * Returns the session identifier
679      *
680      * @return the session identifier
681      */
682     public String getSessionId() {
683         return _sessionId;
684     }
685 
686     /***
687      * Return the acknowledgement mode for the session
688      *
689      * @return the acknowledgement mode for the session
690      */
691     public int getAckMode() {
692         return _ackMode;
693     }
694 
695     /***
696      * Fetch the next message for this client. If the session's ackMode is
697      * client acknowledge then set the session for the message, othwerwise
698      * ack the message before returning it.
699      *
700      * @param clientId the consumer identififer.
701      * @param wait the maximum time to wait for a message, in milliseconds.
702      * If <code>-1</code>, don't wait, if <code>0</code> wait indefinitely,
703      * otherwise wait the specified time.
704      * @return the received message, or <code>null</code>, if no message is
705      * available
706      * @throws JMSException if an error occurs retrieving the message
707      */
708     public Message retrieveMessage(long clientId, long wait)
709         throws JMSException {
710 
711         ensureOpen();
712 
713         boolean breakOnNextRead = false;
714         long start = System.currentTimeMillis();
715         long end = start + wait;
716         MessageImpl message = null;
717         while (true) {
718             synchronized (_receiveLock) {
719                 if (_closing || _closed) {
720                     // session is in the process of closing, or has been
721                     // closed. Need to return null.
722                     break;
723                 } else if (_stopped) {
724                     // connection has been stopped. No message can be returned,
725                     // but receives continue to time out
726                 } else {
727                     // connection is started. Messages may be returned.
728                     message = (MessageImpl) getJmsSessionStub().receiveMessage(
729                         clientId, wait);
730                 }
731                 if (message != null) {
732                     message.setSession(this);
733                     break;
734                 } else {
735                     // if we have instructed to break, then exit the loop.
736                     if (breakOnNextRead) {
737                         break;
738                     }
739 
740                     // no message was received. Block for the specified time
741                     // until one of the following occurs:
742                     // . a message is received
743                     // . the receive times out
744                     // . the session is closed
745                     if (wait >= 0) {
746                         try {
747                             if (wait > 0) {
748                                 // wait for a specific period of time
749                                 _receiveLock.wait(wait);
750                                 long current = System.currentTimeMillis();
751                                 if (current >= end) {
752                                     breakOnNextRead = true;
753                                 } else {
754                                     // update the time to wait. If the value
755                                     // is zero then break on the next read
756                                     wait = end - current;
757                                     if (wait == 0) {
758                                         breakOnNextRead = true;
759                                     }
760                                 }
761                             } else {
762                                 // wait indefinitely
763                                 _receiveLock.wait();
764                             }
765                         } catch (InterruptedException ignore) {
766                             // no-op
767                         }
768                     } else {
769                         // exit the loop since the client is performing a non
770                         // blocking read
771                         break;
772                     }
773                 }
774             }
775         }
776 
777         return message;
778     }
779 
780     /***
781      * Fetch up to count messages from the endpoint. This should only
782      * be called via a {@link JmsQueueBrowser}.
783      *
784      * @param clientId scoped to the session
785      * @param count the max messages to retrieve.
786      * @return the set of retrieve messages
787      * @throws JMSException if messages can't be retrieved
788      */
789     public synchronized Vector retrieveMessages(long clientId, int count)
790         throws JMSException {
791         ensureOpen();
792         return getJmsSessionStub().receiveMessages(clientId, count);
793     }
794 
795     /***
796      * Release local resources used by this session object
797      *
798      * @throws JMSException - if there is a problem completing this request
799      */
800     public void destroy() throws JMSException {
801         if (!_closed) {
802             _closing = true;
803 
804             // wake up any blocking consumers
805             notifyConsumers();
806 
807             // go through all the producer and call close on them
808             // respectively
809             Enumeration producers = getProducers();
810             while (producers.hasMoreElements()) {
811                 JmsMessageProducer producer =
812                     (JmsMessageProducer) producers.nextElement();
813                 producer.destroy();
814             }
815 
816             // go through all the consumer and call close on them
817             // respectively
818             Enumeration consumers = getConsumers();
819             while (consumers.hasMoreElements()) {
820                 JmsMessageConsumer consumer =
821                     (JmsMessageConsumer) consumers.nextElement();
822                 consumer.destroy();
823             }
824 
825             // deregister this with the connection
826             _connection.removeSession(this);
827             _connection = null;
828 
829             // clear any cached messages or acks
830             _messagesToSend.clear();
831 
832             // simply release the reference to the sever session
833             _stub = null;
834 
835             // update the session state
836             _closed = true;
837             _closing = false;
838         }
839     }
840 
841     /***
842      * Send the specified message to the server.
843      *
844      * @param message the message to send
845      * @throws JMSException if the message can't be sent
846      */
847     protected synchronized void sendMessage(Message message)
848         throws JMSException {
849 
850         if (_transacted) {
851             // if the session is transacted then cache the message locally.
852             // and wait for a commit or a rollback
853             if (message instanceof MessageImpl) {
854                 try {
855                     message = (Message) ((MessageImpl) message).clone();
856                 } catch (CloneNotSupportedException error) {
857                     throw new JMSException(error.getMessage());
858                 }
859             } else {
860                 message = convert(message);
861             }
862             _messagesToSend.addElement(message);
863         } else {
864             if (!(message instanceof MessageImpl)) {
865                 message = convert(message);
866             }
867             getJmsSessionStub().sendMessage(message);
868             _publishCount++;
869         }
870     }
871 
872     /***
873      * Return an instance of the remote stub. This is set during object
874      * creation time
875      *
876      * @return the remote stub
877      */
878     protected JmsSessionStubIfc getJmsSessionStub() {
879         return _stub;
880     }
881 
882     /***
883      * Return a reference to the connection that created this session
884      *
885      * @return the owning connection
886      */
887     protected JmsConnection getConnection() {
888         return _connection;
889     }
890 
891     /***
892      * This method checks the destination. If the destination is not temporary
893      * then return true. If it is a temporary destination and it is owned by
894      * this session's connection then it returns true. If it is a tmeporary
895      * destination and it is owned by another connection then it returns false
896      *
897      * @param destination the destination to check
898      * @return <code>true</code> if the destination is valid
899      */
900     protected boolean checkForValidTemporaryDestination(
901         JmsDestination destination) {
902         boolean result = false;
903 
904         if (destination.isTemporaryDestination()) {
905             JmsTemporaryDestination temp =
906                 (JmsTemporaryDestination) destination;
907 
908             // check  that this temp destination is owned by the session's
909             // connection.
910             if (temp.validForConnection(getConnection())) {
911                 result = true;
912             }
913         } else {
914             result = true;
915         }
916 
917         return result;
918     }
919 
920     /***
921      * Returns a list of registered producers for the session
922      *
923      * @return an enumeration of the producers managed by the session
924      */
925     protected Enumeration getProducers() {
926         return _producers.elements();
927     }
928 
929     /***
930      * Returns a list of registered consumers for the session
931      *
932      * @return an enumeration of the consumers managed by the session
933      */
934     protected Enumeration getConsumers() {
935         return _consumers.elements();
936     }
937 
938     /***
939      * Returns the next seed value to be allocated to a new consumer
940      *
941      * @return  a unique identifier for a consumer for this session
942      */
943     protected long getNextConsumerId() {
944         return ++_consumerIdSeed;
945     }
946 
947     /***
948      * Add a consumer to the list of consumers managed by this session
949      *
950      * @param consumer the consumer to add
951      */
952     protected void addConsumer(JmsMessageConsumer consumer) {
953         _consumers.put(new Long(consumer.getClientId()), consumer);
954     }
955 
956     /***
957      * Remove the consumer with the specified id from the list of managed
958      * consumers
959      *
960      * @param consumer the consumer to remove
961      */
962     protected void removeConsumer(JmsMessageConsumer consumer) {
963         _consumers.remove(new Long(consumer.getClientId()));
964     }
965 
966     /***
967      * Add a producer to the list of producers managed by this session
968      *
969      * @param producer the producer to add
970      */
971     protected void addProducer(JmsMessageProducer producer) {
972         _producers.addElement(producer);
973     }
974 
975     /***
976      * Remove the producer from the list of managed producers
977      *
978      * @param producer the producer to remove
979      */
980     protected void removeProducer(JmsMessageProducer producer) {
981         _producers.remove(producer);
982     }
983 
984     /***
985      * Check if the session is closed
986      *
987      * @return <code>true</code> if the session is closed
988      */
989     protected final boolean isClosed() {
990         return _closed;
991     }
992 
993     /***
994      * Add a message to the message cache. This message will be processed
995      * when the run() method is called.
996      *
997      * @param message the message to add.
998      */
999     protected void addMessage(Message message) {
1000         _messageCache.addElement(message);
1001     }
1002 
1003     /***
1004      * Verifies that the session isn't closed
1005      *
1006      * @throws IllegalStateException if the session is closed
1007      */
1008     protected void ensureOpen() throws IllegalStateException {
1009         if (_closed) {
1010             throw new IllegalStateException(
1011                 "Cannot perform operation - session has been closed");
1012         }
1013     }
1014 
1015     /***
1016      * Verifies that the session is transactional
1017      *
1018      * @throws IllegalStateException if the session isn't transactional
1019      */
1020     private void ensureTransactional() throws IllegalStateException {
1021         if (!_transacted) {
1022             throw new IllegalStateException(
1023                 "Cannot perform operatiorn - session is not transactional");
1024         }
1025     }
1026 
1027     /***
1028      * Notifies any blocking synchronous consumers
1029      */
1030     private void notifyConsumers() {
1031         synchronized (_receiveLock) {
1032             _receiveLock.notifyAll();
1033         }
1034     }
1035 
1036     /***
1037      * Convert a message to its corresponding OpenJMS implementation
1038      *
1039      * @param message the message to convert
1040      * @return the OpenJMS implementation of the message
1041      * @throws JMSException for any error
1042      */
1043     private Message convert(Message message) throws JMSException {
1044         MessageConverter converter =
1045             MessageConverterFactory.create(message);
1046         return converter.convert(message);
1047     }
1048 
1049 }
1050