View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsSessionStubImpl.java,v 1.6 2007/01/24 12:00:27 tanderson Exp $
44   */
45  package org.exolab.jms.client.net;
46  
47  import java.rmi.RemoteException;
48  import java.util.List;
49  import javax.jms.JMSException;
50  import javax.transaction.xa.XAException;
51  import javax.transaction.xa.Xid;
52  
53  import org.exolab.jms.client.JmsDestination;
54  import org.exolab.jms.client.JmsMessageListener;
55  import org.exolab.jms.client.JmsQueue;
56  import org.exolab.jms.client.JmsTopic;
57  import org.exolab.jms.message.MessageImpl;
58  import org.exolab.jms.net.orb.ORB;
59  import org.exolab.jms.net.proxy.Proxy;
60  import org.exolab.jms.server.ServerSession;
61  
62  
63  /***
64   * Wraps an {@link ServerSession}.
65   *
66   * @author <a href="mailto:jima@exoffice.com">Jim Alateras</a>
67   * @version $Revision: 1.6 $ $Date: 2007/01/24 12:00:27 $
68   */
69  public class JmsSessionStubImpl
70          implements ServerSession, JmsMessageListener {
71  
72      /***
73       * This is the message listener for JMS messages. The MessageListener is
74       * then responsible for delivering it to the actual consumer.
75       */
76      private JmsMessageListener _listener;
77  
78      /***
79       * The ORB to export this with.
80       */
81      private ORB _orb;
82  
83      /***
84       * The reference to the server session.
85       */
86      private ServerSession _session;
87  
88  
89      /***
90       * Construct a new <code>JmsSessionStubImpl</code>.
91       *
92       * @param session     the reference to the server session
93       * @param orb         the ORB to export this with
94       * @param uri         the URI to export this on
95       * @param principal   the security principal. May be <code>null</code>
96       * @param credentials the security credentials. May be <code>null</code>
97       */
98      protected JmsSessionStubImpl(ServerSession session, ORB orb,
99                                   String uri, String principal,
100                                  String credentials)
101             throws RemoteException {
102         if (session == null) {
103             throw new IllegalArgumentException("Argument 'session' is null");
104         }
105         if (orb == null) {
106             throw new IllegalArgumentException("Argument 'orb' is null");
107         }
108         _session = session;
109         _orb = orb;
110         Proxy proxy = orb.exportObjectTo(this, uri, principal, credentials);
111         _session.setMessageListener((JmsMessageListener) proxy);
112     }
113 
114     /***
115      * Close and release any resource allocated to this session.
116      *
117      * @throws JMSException for any JMS error
118      */
119     public void close() throws JMSException {
120         try {
121             _session.close();
122             _orb.unexportObject(this);
123         } catch (RemoteException exception) {
124             rethrow(exception);
125         } finally {
126             if (_session  instanceof Proxy) {
127                 ((Proxy) _session).disposeProxy();
128             }
129             _session = null;
130             _listener = null;
131         }
132     }
133 
134     /***
135      * Acknowledge that a message has been processed.
136      *
137      * @param consumerId the identity of the consumer performing the ack
138      * @param messageId  the message identifier
139      * @throws JMSException for any error
140      */
141     public void acknowledgeMessage(long consumerId, String messageId)
142             throws JMSException {
143         _session.acknowledgeMessage(consumerId, messageId);
144     }
145 
146     /***
147      * Send a message.
148      *
149      * @param message the message to send
150      * @throws JMSException for any error
151      */
152     public void send(MessageImpl message) throws JMSException {
153         _session.send(message);
154     }
155 
156     /***
157      * Send a set of messages.
158      *
159      * @param messages a list of <code>MessageImpl</code> instances
160      * @throws JMSException for any JMS error
161      */
162     public void send(List messages) throws JMSException {
163         _session.send(messages);
164     }
165 
166     /***
167      * Return the next available mesage to the specified consumer.
168      * <p/>
169      * This method is non-blocking. If no messages are available, it will return
170      * immediately.
171      *
172      * @param consumerId the consumer identifier
173      * @return the next message or <code>null</code> if none is available
174      * @throws JMSException for any JMS error
175      */
176     public MessageImpl receiveNoWait(long consumerId) throws JMSException {
177         return _session.receiveNoWait(consumerId);
178     }
179 
180     /***
181      * Return the next available message to the specified consumer.
182      * <p/>
183      * This method is non-blocking. However, clients can specify a
184      * <code>wait</code> interval to indicate how long they are prepared to wait
185      * for a message. If no message is available, and the client indicates that
186      * it will wait, it will be notified via the registered {@link
187      * JmsMessageListener} if one subsequently becomes available.
188      *
189      * @param consumerId the consumer identifier
190      * @param wait       number of milliseconds to wait. A value of <code>0
191      *                   </code> indicates to wait indefinitely
192      * @return the next message or <code>null</code> if none is available
193      * @throws JMSException for any JMS error
194      */
195     public MessageImpl receive(long consumerId, long wait) throws JMSException {
196         return _session.receive(consumerId, wait);
197     }
198 
199     /***
200      * Browse up to count messages.
201      *
202      * @param consumerId the consumer identifier
203      * @param count      the maximum number of messages to receive
204      * @return a list of {@link MessageImpl} instances
205      * @throws JMSException for any JMS error
206      */
207     public List browse(long consumerId, int count) throws JMSException {
208         return _session.browse(consumerId, count);
209     }
210 
211     /***
212      * Create a new message consumer.
213      *
214      * @param destination the destination to consume messages from
215      * @param selector    the message selector. May be <code>null</code>
216      * @param noLocal     if true, and the destination is a topic, inhibits the
217      *                    delivery of messages published by its own connection.
218      *                    The behavior for <code>noLocal</code> is not specified
219      *                    if the destination is a queue.
220      * @return the identifty of the message consumer
221      * @throws JMSException for any JMS error
222      */
223     public long createConsumer(JmsDestination destination, String selector,
224                                boolean noLocal) throws JMSException {
225         return _session.createConsumer(destination, selector, noLocal);
226     }
227 
228     /***
229      * Create a new durable consumer. Durable consumers may only consume from
230      * non-temporary <code>Topic</code> destinations.
231      *
232      * @param topic    the non-temporary <code>Topic</code> to subscribe to
233      * @param name     the name used to identify this subscription
234      * @param selector only messages with properties matching the message
235      *                 selector expression are delivered.  A value of null or an
236      *                 empty string indicates that there is no message selector
237      *                 for the message consumer.
238      * @param noLocal  if set, inhibits the delivery of messages published by
239      *                 its own connection
240      * @return the identity of the durable consumer
241      * @throws JMSException for any JMS error
242      */
243     public long createDurableConsumer(JmsTopic topic, String name,
244                                       String selector, boolean noLocal)
245             throws JMSException {
246         return _session.createDurableConsumer(topic, name, selector, noLocal);
247     }
248 
249     /***
250      * Create a queue browser for this session. This allows clients to browse a
251      * queue without removing any messages.
252      *
253      * @param queue    the queue to browse
254      * @param selector the message selector. May be <code>null</code>
255      * @return the identity of the queue browser
256      * @throws JMSException for any JMS error
257      */
258     public long createBrowser(JmsQueue queue, String selector)
259             throws JMSException {
260         return _session.createBrowser(queue, selector);
261     }
262 
263     /***
264      * Close a message consumer.
265      *
266      * @param consumerId the identity of the consumer to close
267      * @throws JMSException for any JMS error
268      */
269     public void closeConsumer(long consumerId) throws JMSException {
270         _session.closeConsumer(consumerId);
271     }
272 
273     /***
274      * Unsubscribe a durable subscription.
275      *
276      * @param name the name used to identify the subscription
277      * @throws JMSException for any JMS error
278      */
279     public void unsubscribe(String name) throws JMSException {
280         _session.unsubscribe(name);
281     }
282 
283     /***
284      * Start message delivery to this session.
285      *
286      * @throws JMSException for any JMS error
287      */
288     public void start() throws JMSException {
289         _session.start();
290     }
291 
292     /***
293      * Stop message delivery to this session.
294      *
295      * @throws JMSException for any JMS error
296      */
297     public void stop() throws JMSException {
298         _session.stop();
299     }
300 
301     /***
302      * Set the listener for this session.
303      * <p/>
304      * The listener is notified whenever a message for the session is present.
305      *
306      * @param listener the message listener
307      */
308     public void setMessageListener(JmsMessageListener listener) {
309         _listener = listener;
310     }
311 
312     /***
313      * Enable or disable asynchronous message delivery for a particular
314      * consumer.
315      *
316      * @param consumerId the consumer identifier
317      * @param enable     true to enable; false to disable
318      * @throws JMSException for any JMS error
319      */
320     public void setAsynchronous(long consumerId, boolean enable)
321             throws JMSException {
322         _session.setAsynchronous(consumerId, enable);
323     }
324 
325     /***
326      * Recover the session. This means all unacknowledged messages are resent
327      * with the redelivery flag set
328      *
329      * @throws JMSException if the session cannot be recovered
330      */
331     public void recover() throws JMSException {
332         _session.recover();
333     }
334 
335     /***
336      * Commit the session.
337      *
338      * @throws JMSException if the session cannot be committed
339      */
340     public void commit() throws JMSException {
341         _session.commit();
342     }
343 
344     /***
345      * Rollback the session.
346      *
347      * @throws JMSException if the session cannot be rolled back
348      */
349     public void rollback() throws JMSException {
350         _session.rollback();
351     }
352 
353     /***
354      * Start work on behalf of a transaction branch specified in xid. If TMJOIN
355      * is specified, the start is for joining a transaction previously seen by
356      * the resource manager.
357      *
358      * @param xid   the xa transaction identity
359      * @param flags One of TMNOFLAGS, TMJOIN, or TMRESUME
360      * @throws XAException if there is a problem completing the call
361      */
362     public void start(Xid xid, int flags) throws XAException {
363         _session.start(xid, flags);
364     }
365 
366     /***
367      * Ask the resource manager to prepare for a transaction commit of the
368      * transaction specified in xid.
369      *
370      * @param xid the xa transaction identity
371      * @return XA_RDONLY or XA_OK
372      * @throws XAException if there is a problem completing the call
373      */
374     public int prepare(Xid xid) throws XAException {
375         return _session.prepare(xid);
376     }
377 
378     /***
379      * Commits an XA transaction that is in progress.
380      *
381      * @param xid      the xa transaction identity
382      * @param onePhase true if it is a one phase commit
383      * @throws XAException if there is a problem completing the call
384      */
385     public void commit(Xid xid, boolean onePhase) throws XAException {
386         _session.commit(xid, onePhase);
387     }
388 
389     /***
390      * Ends the work performed on behalf of a transaction branch. The resource
391      * manager disassociates the XA resource from the transaction branch
392      * specified and let the transaction be completedCommits an XA transaction
393      * that is in progress.
394      *
395      * @param xid   the xa transaction identity
396      * @param flags one of TMSUCCESS, TMFAIL, or TMSUSPEND
397      * @throws XAException if there is a problem completing the call
398      */
399     public void end(Xid xid, int flags) throws XAException {
400         _session.end(xid, flags);
401     }
402 
403     /***
404      * Tell the resource manager to forget about a heuristically completed
405      * transaction branch.
406      *
407      * @param xid the xa transaction identity
408      * @throws XAException if there is a problem completing the call
409      */
410     public void forget(Xid xid) throws XAException {
411         _session.forget(xid);
412     }
413 
414     /***
415      * Inform the resource manager to roll back work done on behalf of a
416      * transaction branch.
417      *
418      * @param xid the xa transaction identity
419      * @throws XAException if there is a problem completing the call
420      */
421     public void rollback(Xid xid) throws XAException {
422         _session.rollback(xid);
423     }
424 
425     /***
426      * Obtain a list of prepared transaction branches from a resource manager.
427      * The transaction manager calls this method during recovery to obtain the
428      * list of transaction branches that are currently in prepared or
429      * heuristically completed states.
430      *
431      * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. TMNOFLAGS
432      * @return the set of Xids to recover
433      * @throws XAException - if there is a problem completing the call
434      */
435     public Xid[] recover(int flag) throws XAException {
436         return _session.recover(flag);
437     }
438 
439     /***
440      * Return the transaction timeout for this instance of the resource
441      * manager.
442      *
443      * @return the timeout in seconds
444      * @throws XAException if there is a problem completing the call
445      */
446     public int getTransactionTimeout() throws XAException {
447         return _session.getTransactionTimeout();
448     }
449 
450     /***
451      * Set the current transaction timeout value for this XAResource instance.
452      *
453      * @param seconds timeout in seconds
454      * @return if the new transaction timeout was accepted
455      * @throws XAException if there is a problem completing the call
456      */
457     public boolean setTransactionTimeout(int seconds) throws XAException {
458         return _session.setTransactionTimeout(seconds);
459     }
460 
461     /***
462      * Return the identity of the associated resource manager.
463      *
464      * @return the identity of the resource manager
465      * @throws XAException if there is a problem completing the call
466      */
467     public String getResourceManagerId() throws XAException {
468         return _session.getResourceManagerId();
469     }
470 
471     /***
472      * Deliver a message.
473      *
474      * @param message the message to deliver
475      * @throws RemoteException if the message can't be delivered
476      */
477     public boolean onMessage(MessageImpl message) throws RemoteException {
478         return _listener.onMessage(message);
479     }
480 
481     /***
482      * Inform the session that there is a message available for a synchronous
483      * consumer.
484      */
485     public void onMessageAvailable() throws RemoteException {
486         _listener.onMessageAvailable();
487     }
488 
489     /***
490      * Rethrows a <code>RemoteException</code> as a <code>JMSException</code>.
491      *
492      * @param exception the exception to rethrow
493      * @throws JMSException the rethrown exception
494      */
495     private void rethrow(RemoteException exception) throws JMSException {
496         JMSException error = new JMSException(exception.getMessage());
497         error.setLinkedException(exception);
498         throw error;
499     }
500 
501 }