View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: ServerSessionImpl.java,v 1.2 2005/11/18 03:29:41 tanderson Exp $
44   */
45  package org.exolab.jms.server;
46  
47  import java.util.Iterator;
48  import java.util.List;
49  import javax.jms.InvalidDestinationException;
50  import javax.jms.JMSException;
51  import javax.jms.Session;
52  import javax.transaction.xa.XAException;
53  import javax.transaction.xa.XAResource;
54  import javax.transaction.xa.Xid;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  
59  import org.exolab.jms.client.JmsDestination;
60  import org.exolab.jms.client.JmsMessageListener;
61  import org.exolab.jms.client.JmsQueue;
62  import org.exolab.jms.client.JmsTopic;
63  import org.exolab.jms.message.MessageImpl;
64  import org.exolab.jms.messagemgr.ConsumerEndpoint;
65  import org.exolab.jms.messagemgr.ConsumerManager;
66  import org.exolab.jms.messagemgr.Flag;
67  import org.exolab.jms.messagemgr.MessageManager;
68  import org.exolab.jms.messagemgr.ResourceManager;
69  import org.exolab.jms.persistence.DatabaseService;
70  import org.exolab.jms.scheduler.Scheduler;
71  
72  
73  /***
74   * A session represents a server side endpoint to the JMSServer. A client can
75   * create producers, consumers and destinations through the session in
76   * addition to other functions. A session has a unique identifier which is a
77   * combination of clientId-connectionId-sessionId.
78   * <p/>
79   * A session represents a single-threaded context which implies that it cannot
80   * be used with more than one thread concurrently. Threads registered with this
81   * session are synchronized.
82   *
83   * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
84   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
85   * @version $Revision: 1.2 $ $Date: 2005/11/18 03:29:41 $
86   * @see ServerConnectionImpl
87   */
88  class ServerSessionImpl implements ServerSession, XAResource {
89  
90      /***
91       * The connection that created this session.
92       */
93      private final ServerConnectionImpl _connection;
94  
95      /***
96       * The message manager.
97       */
98      private final MessageManager _messages;
99  
100     /***
101      * The consumer manager.
102      */
103     private final ConsumerManager _consumerMgr;
104 
105     /***
106      * The resource manager.
107      */
108     private final ResourceManager _resources;
109 
110     /***
111      * Holds the current xid that this session is associated with. A session can
112      * olny be associated with one xid at any one time.
113      */
114     private Xid _xid = null;
115 
116     /***
117      * Indicates that the session has been closed.
118      */
119     private Flag _closed = new Flag(false);
120 
121     /***
122      * The session consumer. All consumers fdr the session are managed by
123      * this.
124      */
125     private final SessionConsumer _consumer;
126 
127     /***
128      * The logger.
129      */
130     private static final Log _log = LogFactory.getLog(ServerSessionImpl.class);
131 
132 
133     /***
134      * Construct a new <code>ServerSessionImpl</code>.
135      *
136      * @param connection  the connection that created this session
137      * @param ackMode     the acknowledgement mode for the session
138      * @param transacted  <code>true</code> if the session is transactional
139      * @param messageMgr  the message manager
140      * @param consumerMgr the consumer manager
141      * @param resourceMgr the resource manager
142      * @param database    the database service
143      * @param scheduler   the scheduler
144      */
145     public ServerSessionImpl(ServerConnectionImpl connection, int ackMode,
146                              boolean transacted,
147                              MessageManager messageMgr,
148                              ConsumerManager consumerMgr,
149                              ResourceManager resourceMgr,
150                              DatabaseService database,
151                              Scheduler scheduler) {
152         _connection = connection;
153         if (transacted) {
154             ackMode = Session.SESSION_TRANSACTED;
155         }
156         _consumer = new SessionConsumer(ackMode, database, scheduler);
157         _messages = messageMgr;
158         _consumerMgr = consumerMgr;
159         _resources = resourceMgr;
160     }
161 
162     /***
163      * Returns the identifier of the connection that created this session.
164      *
165      * @return the connection identifier
166      */
167     public long getConnectionId() {
168         return _connection.getConnectionId();
169     }
170 
171     /***
172      * Acknowledge that a message has been processed.
173      *
174      * @param consumerId the identity of the consumer performing the ack
175      * @param messageId  the message identifier
176      * @throws JMSException for any error
177      */
178     public void acknowledgeMessage(long consumerId, String messageId)
179             throws JMSException {
180         _consumer.acknowledge(consumerId, messageId);
181     }
182 
183     /***
184      * Send a message.
185      *
186      * @param message the message to send
187      * @throws JMSException for any error
188      */
189     public void send(MessageImpl message) throws JMSException {
190         if (message == null) {
191             throw new JMSException("Argument 'message' is null");
192         }
193 
194         try {
195             // set the connection identity and then let the message manager
196             // process it
197             message.setConnectionId(_connection.getConnectionId());
198 
199             // if there is a global transaction currently in process then
200             // we must send the message to the resource manager, otherwise
201             // send it directly to the message manager
202             if (_xid != null) {
203                 _resources.logPublishedMessage(_xid, message);
204             } else {
205                 _messages.add(message);
206             }
207         } catch (JMSException exception) {
208             _log.error("Failed to process message", exception);
209             throw exception;
210         } catch (OutOfMemoryError exception) {
211             String msg =
212                     "Failed to process message due to out-of-memory error";
213             _log.error(msg, exception);
214             throw new JMSException(msg);
215         } catch (Exception exception) {
216             String msg = "Failed to process message";
217             _log.error(msg, exception);
218             throw new JMSException(msg);
219         }
220     }
221 
222     /***
223      * Send a set of messages.
224      *
225      * @param messages a list of <code>MessageImpl</code> instances
226      * @throws JMSException for any JMS error
227      */
228     public void send(List messages) throws JMSException {
229         if (messages == null) {
230             throw new JMSException("Argument 'messages' is null");
231         }
232 
233         Iterator iterator = messages.iterator();
234         while (iterator.hasNext()) {
235             MessageImpl message = (MessageImpl) iterator.next();
236             send(message);
237         }
238     }
239 
240     /***
241      * Return the next available mesage to the specified consumer.
242      * <p/>
243      * This method is non-blocking. If no messages are available, it will return
244      * immediately.
245      *
246      * @param consumerId the consumer identifier
247      * @return the next message or <code>null</code> if none is available
248      * @throws JMSException for any JMS error
249      */
250     public MessageImpl receiveNoWait(long consumerId) throws JMSException {
251         return _consumer.receiveNoWait(consumerId);
252     }
253 
254     /***
255      * Return the next available message to the specified consumer.
256      * <p/>
257      * This method is non-blocking. However, clients can specify a
258      * <code>wait</code> interval to indicate how long they are prepared to wait
259      * for a message. If no message is available, and the client indicates that
260      * it will wait, it will be notified via the registered {@link
261      * JmsMessageListener} if one subsequently becomes available.
262      *
263      * @param consumerId the consumer identifier
264      * @param wait       number of milliseconds to wait. A value of <code>0
265      *                   </code> indicates to wait indefinitely
266      * @return the next message or <code>null</code> if none is available
267      * @throws JMSException for any JMS error
268      */
269     public MessageImpl receive(long consumerId, long wait) throws JMSException {
270         return _consumer.receive(consumerId, wait);
271     }
272 
273     /***
274      * Browse up to count messages.
275      *
276      * @param consumerId the consumer identifier
277      * @param count      the maximum number of messages to receive
278      * @return a list of {@link MessageImpl} instances
279      * @throws JMSException for any JMS error
280      */
281     public List browse(long consumerId, int count) throws JMSException {
282         return _consumer.browse(consumerId, count);
283     }
284 
285     /***
286      * Create a new message consumer.
287      *
288      * @param destination the destination to consume messages from
289      * @param selector    the message selector. May be <code>null</code>
290      * @param noLocal     if true, and the destination is a topic, inhibits the
291      *                    delivery of messages published by its own connection.
292      *                    The behavior for <code>noLocal</code> is not specified
293      *                    if the destination is a queue.
294      * @return the identifty of the message consumer
295      * @throws JMSException for any JMS error
296      */
297     public long createConsumer(JmsDestination destination, String selector,
298                                boolean noLocal) throws JMSException {
299         if (_log.isDebugEnabled()) {
300             _log.debug("createConsumer(destination=" + destination
301                        + ", selector=" + selector + ", noLocal=" + noLocal
302                        + ") [session=" + this + "]");
303         }
304 
305         if (destination == null) {
306             throw new InvalidDestinationException(
307                     "Cannot create MessageConsumer for null destination");
308         }
309 
310         ConsumerEndpoint consumer = _consumerMgr.createConsumer(
311                 destination, _connection.getConnectionId(), selector, noLocal);
312         _consumer.addConsumer(consumer);
313         return consumer.getId();
314     }
315 
316     /***
317      * Create a new durable consumer. Durable consumers may only consume from
318      * non-temporary <code>Topic</code> destinations.
319      *
320      * @param topic    the non-temporary <code>Topic</code> to subscribe to
321      * @param name     the name used to identify this subscription
322      * @param selector only messages with properties matching the message
323      *                 selector expression are delivered.  A value of null or an
324      *                 empty string indicates that there is no message selector
325      *                 for the message consumer.
326      * @param noLocal  if set, inhibits the delivery of messages published by
327      *                 its own connection
328      * @return the identity of the durable consumer
329      * @throws JMSException for any JMS error
330      */
331     public long createDurableConsumer(JmsTopic topic, String name,
332                                       String selector, boolean noLocal)
333             throws JMSException {
334         if (_log.isDebugEnabled()) {
335             _log.debug("createDurableConsumer(topic=" + topic + ", name="
336                        + name
337                        + ", selector=" + selector + ", noLocal=" + noLocal
338                        + ") [session=" + this + "]");
339         }
340 
341         // if a durable subscriber with the specified name is
342         // already active then this method will throw an exception.
343         ConsumerEndpoint consumer = _consumerMgr.createDurableConsumer(topic,
344                                                                        name,
345                                                                        _connection.getClientID(),
346                                                                        _connection.getConnectionId(),
347                                                                        noLocal,
348                                                                        selector);
349         _consumer.addConsumer(consumer);
350         return consumer.getId();
351     }
352 
353     /***
354      * Create a queue browser for this session. This allows clients to browse a
355      * queue without removing any messages.
356      *
357      * @param queue    the queue to browse
358      * @param selector the message selector. May be <code>null</code>
359      * @return the identity of the queue browser
360      * @throws JMSException for any JMS error
361      */
362     public long createBrowser(JmsQueue queue, String selector)
363             throws JMSException {
364         if (_log.isDebugEnabled()) {
365             _log.debug("createBrowser(queue=" + queue + ", selector="
366                        + selector
367                        + ") [session=" + this + "]");
368         }
369 
370         if (queue == null) {
371             throw new JMSException("Cannot create QueueBrowser for null queue");
372         }
373 
374         ConsumerEndpoint consumer = _consumerMgr.createQueueBrowser(queue,
375                                                                     selector);
376 
377         _consumer.addConsumer(consumer);
378         return consumer.getId();
379     }
380 
381     /***
382      * Close a message consumer.
383      *
384      * @param consumerId the identity of the consumer to close
385      * @throws JMSException for any JMS error
386      */
387     public void closeConsumer(long consumerId) throws JMSException {
388         if (_log.isDebugEnabled()) {
389             _log.debug("removeConsumer(consumerId=" + consumerId
390                        + ") [session="
391                        + this + "]");
392         }
393 
394         ConsumerEndpoint consumer = _consumer.removeConsumer(consumerId);
395         _consumerMgr.closeConsumer(consumer);
396     }
397 
398     /***
399      * Unsubscribe a durable subscription.
400      *
401      * @param name the name used to identify the subscription
402      * @throws JMSException for any JMS error
403      */
404     public void unsubscribe(String name) throws JMSException {
405         if (_log.isDebugEnabled()) {
406             _log.debug("unsubscribe(name=" + name + ") [session=" + this + "]");
407         }
408 
409         _consumerMgr.unsubscribe(name, _connection.getClientID());
410     }
411 
412     /***
413      * Start the message delivery for the session.
414      *
415      * @throws JMSException for any JMS error
416      */
417     public void start() throws JMSException {
418         if (_log.isDebugEnabled()) {
419             _log.debug("start() [session=" + this + "]");
420         }
421         _consumer.start();
422     }
423 
424     /***
425      * Stop message delivery for the session.
426      */
427     public void stop() {
428         if (_log.isDebugEnabled()) {
429             _log.debug("stop() [session=" + this + "]");
430         }
431         _consumer.stop();
432     }
433 
434     /***
435      * Set the listener for this session.
436      * <p/>
437      * The listener is notified whenever a message for the session is present.
438      *
439      * @param listener the message listener
440      */
441     public void setMessageListener(JmsMessageListener listener) {
442         _consumer.setMessageListener(listener);
443     }
444 
445     /***
446      * Enable or disable asynchronous message delivery for a particular
447      * consumer.
448      *
449      * @param consumerId the consumer identifier
450      * @param enable     true to enable; false to disable
451      * @throws JMSException for any JMS error
452      */
453     public void setAsynchronous(long consumerId, boolean enable)
454             throws JMSException {
455         _consumer.setAsynchronous(consumerId, enable);
456     }
457 
458     /***
459      * Close and release any resource allocated to this session.
460      *
461      * @throws JMSException if the session cannot be closed
462      */
463     public void close() throws JMSException {
464         boolean closed;
465         synchronized (_closed) {
466             closed = _closed.get();
467         }
468 
469         if (!closed) {
470             _closed.set(true);
471             if (_log.isDebugEnabled()) {
472                 _log.debug("close() [session=" + this + "]");
473             }
474 
475             _consumer.stop();
476             ConsumerEndpoint[] consumers = _consumer.getConsumers();
477             for (int i = 0; i < consumers.length; ++i) {
478                 ConsumerEndpoint consumer = consumers[i];
479                 _consumer.removeConsumer(consumer.getId());
480                 _consumerMgr.closeConsumer(consumer);
481             }
482 
483             _consumer.close();
484 
485             // de-register the session from the connection
486             _connection.closed(this);
487         } else {
488             if (_log.isDebugEnabled()) {
489                 _log.debug("close() [session=" + this +
490                            "]: session already closed");
491             }
492         }
493     }
494 
495     /***
496      * Recover the session.
497      * <p/>
498      * All unacknowledged messages are re-delivered with the JMSRedelivered flag
499      * set.
500      *
501      * @throws JMSException if the session cannot be recovered
502      */
503     public void recover() throws JMSException {
504         _consumer.recover();
505     }
506 
507     /***
508      * Commit the session.
509      * <p/>
510      * This will acknowledge all delivered messages.
511      *
512      * @throws JMSException if the session cannot be committed
513      */
514     public void commit() throws JMSException {
515         _consumer.commit();
516     }
517 
518     /***
519      * Rollback the session.
520      * <p/>
521      * All messages delivered to the client will be redelivered with the
522      * JMSRedelivered flag set.
523      *
524      * @throws JMSException - if there are any problems
525      */
526     public void rollback() throws JMSException {
527         _consumer.rollback();
528     }
529 
530     /***
531      * Start work on behalf of a transaction branch specified in xid If TMJOIN
532      * is specified, the start is for joining a transaction previously seen by
533      * the resource manager
534      *
535      * @param xid   the xa transaction identity
536      * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
537      * @throws XAException if there is a problem completing the call
538      */
539     public void start(Xid xid, int flags) throws XAException {
540         _resources.start(xid, flags);
541 
542         // set this as the current xid for this session
543         _xid = xid;
544     }
545 
546     /***
547      * Ask the resource manager to prepare for a transaction commit of the
548      * transaction specified in xid.
549      *
550      * @param xid the xa transaction identity
551      * @return XA_RDONLY or XA_OK
552      * @throws XAException if there is a problem completing the call
553      */
554     public int prepare(Xid xid) throws XAException {
555         return _resources.prepare(xid);
556     }
557 
558     /***
559      * Commits an XA transaction that is in progress.
560      *
561      * @param xid      the xa transaction identity
562      * @param onePhase true if it is a one phase commit
563      * @throws XAException if there is a problem completing the call
564      */
565     public void commit(Xid xid, boolean onePhase) throws XAException {
566         try {
567             _resources.commit(xid, onePhase);
568         } finally {
569             _xid = null;
570         }
571     }
572 
573     /***
574      * Ends the work performed on behalf of a transaction branch. The resource
575      * manager disassociates the XA resource from the transaction branch
576      * specified and let the transaction be completedCommits an XA transaction
577      * that is in progress.
578      *
579      * @param xid   the xa transaction identity
580      * @param flags one of TMSUCCESS, TMFAIL, or TMSUSPEND
581      * @throws XAException if there is a problem completing the call
582      */
583     public void end(Xid xid, int flags) throws XAException {
584         try {
585             _resources.end(xid, flags);
586         } finally {
587             _xid = null;
588         }
589     }
590 
591     /***
592      * Tell the resource manager to forget about a heuristically completed
593      * transaction branch.
594      *
595      * @param xid the xa transaction identity
596      * @throws XAException if there is a problem completing the call
597      */
598     public void forget(Xid xid) throws XAException {
599         try {
600             _resources.forget(xid);
601         } finally {
602             _xid = null;
603         }
604     }
605 
606     /***
607      * Obtain a list of prepared transaction branches from a resource manager.
608      * The transaction manager calls this method during recovery to obtain the
609      * list of transaction branches that are currently in prepared or
610      * heuristically completed states.
611      *
612      * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. TMNOFLAGS
613      * @return the set of Xids to recover
614      * @throws XAException - if there is a problem completing the call
615      */
616     public Xid[] recover(int flag) throws XAException {
617         return _resources.recover(flag);
618     }
619 
620     /***
621      * Inform the resource manager to roll back work done on behalf of a
622      * transaction branch
623      *
624      * @param xid the xa transaction identity
625      * @throws XAException if there is a problem completing the call
626      */
627     public void rollback(Xid xid) throws XAException {
628         try {
629             _resources.rollback(xid);
630         } finally {
631             // clear the current xid
632             _xid = null;
633         }
634     }
635 
636     /***
637      * Return the transaction timeout for this instance of the resource
638      * manager.
639      *
640      * @return the timeout in seconds
641      * @throws XAException if there is a problem completing the call
642      */
643     public int getTransactionTimeout() throws XAException {
644         return _resources.getTransactionTimeout();
645     }
646 
647     /***
648      * Set the current transaction timeout value for this XAResource instance.
649      *
650      * @param seconds timeout in seconds
651      * @return if the new transaction timeout was accepted
652      * @throws XAException if there is a problem completing the call
653      */
654     public boolean setTransactionTimeout(int seconds) throws XAException {
655         return _resources.setTransactionTimeout(seconds);
656     }
657 
658     /***
659      * This method is called to determine if the resource manager instance
660      * represented by the target object is the same as the resouce manager
661      * instance represented by the parameter xares.
662      *
663      * @param xares an XAResource object whose resource manager instance is to
664      *              be compared with the resource manager instance of the target
665      *              object.
666      * @return true if it's the same RM instance; otherwise false.
667      * @throws XAException for any error
668      */
669     public boolean isSameRM(XAResource xares) throws XAException {
670         boolean result = (xares instanceof ServerSessionImpl);
671         if (result) {
672             ServerSessionImpl other = (ServerSessionImpl) xares;
673             result = (other.getResourceManagerId() == getResourceManagerId());
674         }
675 
676         return result;
677     }
678 
679     /***
680      * Return the xid that is currently associated with this session or null if
681      * this session is currently not part of a global transactions
682      *
683      * @return Xid
684      */
685     public Xid getXid() {
686         return _xid;
687     }
688 
689     /***
690      * Return the identity of the {@link ResourceManager}. The transaction
691      * manager should be the only one to initiating this call.
692      *
693      * @return the identity of the resource manager
694      * @throws XAException - if it cannot retrieve the rid.
695      */
696     public String getResourceManagerId() throws XAException {
697         return _resources.getResourceManagerId();
698     }
699 
700 }