1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: ServerSessionImpl.java,v 1.2 2005/11/18 03:29:41 tanderson Exp $
44 */
45 package org.exolab.jms.server;
46
47 import java.util.Iterator;
48 import java.util.List;
49 import javax.jms.InvalidDestinationException;
50 import javax.jms.JMSException;
51 import javax.jms.Session;
52 import javax.transaction.xa.XAException;
53 import javax.transaction.xa.XAResource;
54 import javax.transaction.xa.Xid;
55
56 import org.apache.commons.logging.Log;
57 import org.apache.commons.logging.LogFactory;
58
59 import org.exolab.jms.client.JmsDestination;
60 import org.exolab.jms.client.JmsMessageListener;
61 import org.exolab.jms.client.JmsQueue;
62 import org.exolab.jms.client.JmsTopic;
63 import org.exolab.jms.message.MessageImpl;
64 import org.exolab.jms.messagemgr.ConsumerEndpoint;
65 import org.exolab.jms.messagemgr.ConsumerManager;
66 import org.exolab.jms.messagemgr.Flag;
67 import org.exolab.jms.messagemgr.MessageManager;
68 import org.exolab.jms.messagemgr.ResourceManager;
69 import org.exolab.jms.persistence.DatabaseService;
70 import org.exolab.jms.scheduler.Scheduler;
71
72
73 /***
74 * A session represents a server side endpoint to the JMSServer. A client can
75 * create producers, consumers and destinations through the session in addi-
76 * tion to other functions. A session has a unique identifer which is a comb-
77 * ination of clientId-connectionId-sessionId.
78 * <p/>
79 * A session represents a single-threaded context which implies that it cannot
80 * be used with more than one thread concurrently. Threads registered with this
81 * session are synchronized.
82 *
83 * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
84 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
85 * @version $Revision: 1.2 $ $Date: 2005/11/18 03:29:41 $
86 * @see ServerConnectionImpl
87 */
88 class ServerSessionImpl implements ServerSession, XAResource {
89
90 /***
91 * The connection that created this session.
92 */
93 private final ServerConnectionImpl _connection;
94
95 /***
96 * The message manager.
97 */
98 private final MessageManager _messages;
99
100 /***
101 * The consumer manager.
102 */
103 private final ConsumerManager _consumerMgr;
104
105 /***
106 * The resource manager.
107 */
108 private final ResourceManager _resources;
109
110 /***
111 * Holds the current xid that this session is associated with. A session can
112 * olny be associated with one xid at any one time.
113 */
114 private Xid _xid = null;
115
116 /***
117 * Indicates that the session has been closed.
118 */
119 private Flag _closed = new Flag(false);
120
121 /***
122 * The session consumer. All consumers fdr the session are managed by
123 * this.
124 */
125 private final SessionConsumer _consumer;
126
127 /***
128 * The logger.
129 */
130 private static final Log _log = LogFactory.getLog(ServerSessionImpl.class);
131
132
133 /***
134 * Construct a new <code>ServerSessionImpl</code>.
135 *
136 * @param connection the connection that created this session
137 * @param ackMode the acknowledgement mode for the session
138 * @param transacted <code>true</code> if the session is transactional
139 * @param messageMgr the message manager
140 * @param consumerMgr the consumer manager
141 * @param resourceMgr the resource manager
142 * @param database the database service
143 * @param scheduler the scheduler
144 */
145 public ServerSessionImpl(ServerConnectionImpl connection, int ackMode,
146 boolean transacted,
147 MessageManager messageMgr,
148 ConsumerManager consumerMgr,
149 ResourceManager resourceMgr,
150 DatabaseService database,
151 Scheduler scheduler) {
152 _connection = connection;
153 if (transacted) {
154 ackMode = Session.SESSION_TRANSACTED;
155 }
156 _consumer = new SessionConsumer(ackMode, database, scheduler);
157 _messages = messageMgr;
158 _consumerMgr = consumerMgr;
159 _resources = resourceMgr;
160 }
161
162 /***
163 * Returns the identifier of the connection that created this session.
164 *
165 * @return the connection identifier
166 */
167 public long getConnectionId() {
168 return _connection.getConnectionId();
169 }
170
171 /***
172 * Acknowledge that a message has been processed.
173 *
174 * @param consumerId the identity of the consumer performing the ack
175 * @param messageId the message identifier
176 * @throws JMSException for any error
177 */
178 public void acknowledgeMessage(long consumerId, String messageId)
179 throws JMSException {
180 _consumer.acknowledge(consumerId, messageId);
181 }
182
183 /***
184 * Send a message.
185 *
186 * @param message the message to send
187 * @throws JMSException for any error
188 */
189 public void send(MessageImpl message) throws JMSException {
190 if (message == null) {
191 throw new JMSException("Argument 'message' is null");
192 }
193
194 try {
195
196
197 message.setConnectionId(_connection.getConnectionId());
198
199
200
201
202 if (_xid != null) {
203 _resources.logPublishedMessage(_xid, message);
204 } else {
205 _messages.add(message);
206 }
207 } catch (JMSException exception) {
208 _log.error("Failed to process message", exception);
209 throw exception;
210 } catch (OutOfMemoryError exception) {
211 String msg =
212 "Failed to process message due to out-of-memory error";
213 _log.error(msg, exception);
214 throw new JMSException(msg);
215 } catch (Exception exception) {
216 String msg = "Failed to process message";
217 _log.error(msg, exception);
218 throw new JMSException(msg);
219 }
220 }
221
222 /***
223 * Send a set of messages.
224 *
225 * @param messages a list of <code>MessageImpl</code> instances
226 * @throws JMSException for any JMS error
227 */
228 public void send(List messages) throws JMSException {
229 if (messages == null) {
230 throw new JMSException("Argument 'messages' is null");
231 }
232
233 Iterator iterator = messages.iterator();
234 while (iterator.hasNext()) {
235 MessageImpl message = (MessageImpl) iterator.next();
236 send(message);
237 }
238 }
239
240 /***
241 * Return the next available mesage to the specified consumer.
242 * <p/>
243 * This method is non-blocking. If no messages are available, it will return
244 * immediately.
245 *
246 * @param consumerId the consumer identifier
247 * @return the next message or <code>null</code> if none is available
248 * @throws JMSException for any JMS error
249 */
250 public MessageImpl receiveNoWait(long consumerId) throws JMSException {
251 return _consumer.receiveNoWait(consumerId);
252 }
253
254 /***
255 * Return the next available message to the specified consumer.
256 * <p/>
257 * This method is non-blocking. However, clients can specify a
258 * <code>wait</code> interval to indicate how long they are prepared to wait
259 * for a message. If no message is available, and the client indicates that
260 * it will wait, it will be notified via the registered {@link
261 * JmsMessageListener} if one subsequently becomes available.
262 *
263 * @param consumerId the consumer identifier
264 * @param wait number of milliseconds to wait. A value of <code>0
265 * </code> indicates to wait indefinitely
266 * @return the next message or <code>null</code> if none is available
267 * @throws JMSException for any JMS error
268 */
269 public MessageImpl receive(long consumerId, long wait) throws JMSException {
270 return _consumer.receive(consumerId, wait);
271 }
272
273 /***
274 * Browse up to count messages.
275 *
276 * @param consumerId the consumer identifier
277 * @param count the maximum number of messages to receive
278 * @return a list of {@link MessageImpl} instances
279 * @throws JMSException for any JMS error
280 */
281 public List browse(long consumerId, int count) throws JMSException {
282 return _consumer.browse(consumerId, count);
283 }
284
285 /***
286 * Create a new message consumer.
287 *
288 * @param destination the destination to consume messages from
289 * @param selector the message selector. May be <code>null</code>
290 * @param noLocal if true, and the destination is a topic, inhibits the
291 * delivery of messages published by its own connection.
292 * The behavior for <code>noLocal</code> is not specified
293 * if the destination is a queue.
294 * @return the identifty of the message consumer
295 * @throws JMSException for any JMS error
296 */
297 public long createConsumer(JmsDestination destination, String selector,
298 boolean noLocal) throws JMSException {
299 if (_log.isDebugEnabled()) {
300 _log.debug("createConsumer(destination=" + destination
301 + ", selector=" + selector + ", noLocal=" + noLocal
302 + ") [session=" + this + "]");
303 }
304
305 if (destination == null) {
306 throw new InvalidDestinationException(
307 "Cannot create MessageConsumer for null destination");
308 }
309
310 ConsumerEndpoint consumer = _consumerMgr.createConsumer(
311 destination, _connection.getConnectionId(), selector, noLocal);
312 _consumer.addConsumer(consumer);
313 return consumer.getId();
314 }
315
316 /***
317 * Create a new durable consumer. Durable consumers may only consume from
318 * non-temporary <code>Topic</code> destinations.
319 *
320 * @param topic the non-temporary <code>Topic</code> to subscribe to
321 * @param name the name used to identify this subscription
322 * @param selector only messages with properties matching the message
323 * selector expression are delivered. A value of null or an
324 * empty string indicates that there is no message selector
325 * for the message consumer.
326 * @param noLocal if set, inhibits the delivery of messages published by
327 * its own connection
328 * @return the identity of the durable consumer
329 * @throws JMSException for any JMS error
330 */
331 public long createDurableConsumer(JmsTopic topic, String name,
332 String selector, boolean noLocal)
333 throws JMSException {
334 if (_log.isDebugEnabled()) {
335 _log.debug("createDurableConsumer(topic=" + topic + ", name="
336 + name
337 + ", selector=" + selector + ", noLocal=" + noLocal
338 + ") [session=" + this + "]");
339 }
340
341
342
343 ConsumerEndpoint consumer = _consumerMgr.createDurableConsumer(topic,
344 name,
345 _connection.getClientID(),
346 _connection.getConnectionId(),
347 noLocal,
348 selector);
349 _consumer.addConsumer(consumer);
350 return consumer.getId();
351 }
352
353 /***
354 * Create a queue browser for this session. This allows clients to browse a
355 * queue without removing any messages.
356 *
357 * @param queue the queue to browse
358 * @param selector the message selector. May be <code>null</code>
359 * @return the identity of the queue browser
360 * @throws JMSException for any JMS error
361 */
362 public long createBrowser(JmsQueue queue, String selector)
363 throws JMSException {
364 if (_log.isDebugEnabled()) {
365 _log.debug("createBrowser(queue=" + queue + ", selector="
366 + selector
367 + ") [session=" + this + "]");
368 }
369
370 if (queue == null) {
371 throw new JMSException("Cannot create QueueBrowser for null queue");
372 }
373
374 ConsumerEndpoint consumer = _consumerMgr.createQueueBrowser(queue,
375 selector);
376
377 _consumer.addConsumer(consumer);
378 return consumer.getId();
379 }
380
381 /***
382 * Close a message consumer.
383 *
384 * @param consumerId the identity of the consumer to close
385 * @throws JMSException for any JMS error
386 */
387 public void closeConsumer(long consumerId) throws JMSException {
388 if (_log.isDebugEnabled()) {
389 _log.debug("removeConsumer(consumerId=" + consumerId
390 + ") [session="
391 + this + "]");
392 }
393
394 ConsumerEndpoint consumer = _consumer.removeConsumer(consumerId);
395 _consumerMgr.closeConsumer(consumer);
396 }
397
398 /***
399 * Unsubscribe a durable subscription.
400 *
401 * @param name the name used to identify the subscription
402 * @throws JMSException for any JMS error
403 */
404 public void unsubscribe(String name) throws JMSException {
405 if (_log.isDebugEnabled()) {
406 _log.debug("unsubscribe(name=" + name + ") [session=" + this + "]");
407 }
408
409 _consumerMgr.unsubscribe(name, _connection.getClientID());
410 }
411
412 /***
413 * Start the message delivery for the session.
414 *
415 * @throws JMSException for any JMS error
416 */
417 public void start() throws JMSException {
418 if (_log.isDebugEnabled()) {
419 _log.debug("start() [session=" + this + "]");
420 }
421 _consumer.start();
422 }
423
424 /***
425 * Stop message delivery for the session.
426 */
427 public void stop() {
428 if (_log.isDebugEnabled()) {
429 _log.debug("stop() [session=" + this + "]");
430 }
431 _consumer.stop();
432 }
433
434 /***
435 * Set the listener for this session.
436 * <p/>
437 * The listener is notified whenever a message for the session is present.
438 *
439 * @param listener the message listener
440 */
441 public void setMessageListener(JmsMessageListener listener) {
442 _consumer.setMessageListener(listener);
443 }
444
445 /***
446 * Enable or disable asynchronous message delivery for a particular
447 * consumer.
448 *
449 * @param consumerId the consumer identifier
450 * @param enable true to enable; false to disable
451 * @throws JMSException for any JMS error
452 */
453 public void setAsynchronous(long consumerId, boolean enable)
454 throws JMSException {
455 _consumer.setAsynchronous(consumerId, enable);
456 }
457
458 /***
459 * Close and release any resource allocated to this session.
460 *
461 * @throws JMSException if the session cannot be closed
462 */
463 public void close() throws JMSException {
464 boolean closed;
465 synchronized (_closed) {
466 closed = _closed.get();
467 }
468
469 if (!closed) {
470 _closed.set(true);
471 if (_log.isDebugEnabled()) {
472 _log.debug("close() [session=" + this + "]");
473 }
474
475 _consumer.stop();
476 ConsumerEndpoint[] consumers = _consumer.getConsumers();
477 for (int i = 0; i < consumers.length; ++i) {
478 ConsumerEndpoint consumer = consumers[i];
479 _consumer.removeConsumer(consumer.getId());
480 _consumerMgr.closeConsumer(consumer);
481 }
482
483 _consumer.close();
484
485
486 _connection.closed(this);
487 } else {
488 if (_log.isDebugEnabled()) {
489 _log.debug("close() [session=" + this +
490 "]: session already closed");
491 }
492 }
493 }
494
495 /***
496 * Recover the session.
497 * <p/>
498 * All unacknowledged messages are re-delivered with the JMSRedelivered flag
499 * set.
500 *
501 * @throws JMSException if the session cannot be recovered
502 */
503 public void recover() throws JMSException {
504 _consumer.recover();
505 }
506
507 /***
508 * Commit the session.
509 * <p/>
510 * This will acknowledge all delivered messages.
511 *
512 * @throws JMSException if the session cannot be committed
513 */
514 public void commit() throws JMSException {
515 _consumer.commit();
516 }
517
518 /***
519 * Rollback the session.
520 * <p/>
521 * All messages delivered to the client will be redelivered with the
522 * JMSRedelivered flag set.
523 *
524 * @throws JMSException - if there are any problems
525 */
526 public void rollback() throws JMSException {
527 _consumer.rollback();
528 }
529
530 /***
531 * Start work on behalf of a transaction branch specified in xid If TMJOIN
532 * is specified, the start is for joining a transaction previously seen by
533 * the resource manager
534 *
535 * @param xid the xa transaction identity
536 * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
537 * @throws XAException if there is a problem completing the call
538 */
539 public void start(Xid xid, int flags) throws XAException {
540 _resources.start(xid, flags);
541
542
543 _xid = xid;
544 }
545
546 /***
547 * Ask the resource manager to prepare for a transaction commit of the
548 * transaction specified in xid.
549 *
550 * @param xid the xa transaction identity
551 * @return XA_RDONLY or XA_OK
552 * @throws XAException if there is a problem completing the call
553 */
554 public int prepare(Xid xid) throws XAException {
555 return _resources.prepare(xid);
556 }
557
558 /***
559 * Commits an XA transaction that is in progress.
560 *
561 * @param xid the xa transaction identity
562 * @param onePhase true if it is a one phase commit
563 * @throws XAException if there is a problem completing the call
564 */
565 public void commit(Xid xid, boolean onePhase) throws XAException {
566 try {
567 _resources.commit(xid, onePhase);
568 } finally {
569 _xid = null;
570 }
571 }
572
573 /***
574 * Ends the work performed on behalf of a transaction branch. The resource
575 * manager disassociates the XA resource from the transaction branch
576 * specified and let the transaction be completedCommits an XA transaction
577 * that is in progress.
578 *
579 * @param xid the xa transaction identity
580 * @param flags one of TMSUCCESS, TMFAIL, or TMSUSPEND
581 * @throws XAException if there is a problem completing the call
582 */
583 public void end(Xid xid, int flags) throws XAException {
584 try {
585 _resources.end(xid, flags);
586 } finally {
587 _xid = null;
588 }
589 }
590
591 /***
592 * Tell the resource manager to forget about a heuristically completed
593 * transaction branch.
594 *
595 * @param xid the xa transaction identity
596 * @throws XAException if there is a problem completing the call
597 */
598 public void forget(Xid xid) throws XAException {
599 try {
600 _resources.forget(xid);
601 } finally {
602 _xid = null;
603 }
604 }
605
606 /***
607 * Obtain a list of prepared transaction branches from a resource manager.
608 * The transaction manager calls this method during recovery to obtain the
609 * list of transaction branches that are currently in prepared or
610 * heuristically completed states.
611 *
612 * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. TMNOFLAGS
613 * @return the set of Xids to recover
614 * @throws XAException - if there is a problem completing the call
615 */
616 public Xid[] recover(int flag) throws XAException {
617 return _resources.recover(flag);
618 }
619
620 /***
621 * Inform the resource manager to roll back work done on behalf of a
622 * transaction branch
623 *
624 * @param xid the xa transaction identity
625 * @throws XAException if there is a problem completing the call
626 */
627 public void rollback(Xid xid) throws XAException {
628 try {
629 _resources.rollback(xid);
630 } finally {
631
632 _xid = null;
633 }
634 }
635
636 /***
637 * Return the transaction timeout for this instance of the resource
638 * manager.
639 *
640 * @return the timeout in seconds
641 * @throws XAException if there is a problem completing the call
642 */
643 public int getTransactionTimeout() throws XAException {
644 return _resources.getTransactionTimeout();
645 }
646
647 /***
648 * Set the current transaction timeout value for this XAResource instance.
649 *
650 * @param seconds timeout in seconds
651 * @return if the new transaction timeout was accepted
652 * @throws XAException if there is a problem completing the call
653 */
654 public boolean setTransactionTimeout(int seconds) throws XAException {
655 return _resources.setTransactionTimeout(seconds);
656 }
657
658 /***
659 * This method is called to determine if the resource manager instance
660 * represented by the target object is the same as the resouce manager
661 * instance represented by the parameter xares.
662 *
663 * @param xares an XAResource object whose resource manager instance is to
664 * be compared with the resource manager instance of the target
665 * object.
666 * @return true if it's the same RM instance; otherwise false.
667 * @throws XAException for any error
668 */
669 public boolean isSameRM(XAResource xares) throws XAException {
670 boolean result = (xares instanceof ServerSessionImpl);
671 if (result) {
672 ServerSessionImpl other = (ServerSessionImpl) xares;
673 result = (other.getResourceManagerId() == getResourceManagerId());
674 }
675
676 return result;
677 }
678
679 /***
680 * Return the xid that is currently associated with this session or null if
681 * this session is currently not part of a global transactions
682 *
683 * @return Xid
684 */
685 public Xid getXid() {
686 return _xid;
687 }
688
689 /***
690 * Return the identity of the {@link ResourceManager}. The transaction
691 * manager should be the only one to initiating this call.
692 *
693 * @return the identity of the resource manager
694 * @throws XAException - if it cannot retrieve the rid.
695 */
696 public String getResourceManagerId() throws XAException {
697 return _resources.getResourceManagerId();
698 }
699
700 }