View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: JmsConnection.java,v 1.4 2005/05/24 13:35:18 tanderson Exp $
44   */
45  package org.exolab.jms.client;
46  
47  import java.util.ArrayList;
48  import java.util.Iterator;
49  import java.util.List;
50  import javax.jms.Connection;
51  import javax.jms.ConnectionConsumer;
52  import javax.jms.ConnectionMetaData;
53  import javax.jms.Destination;
54  import javax.jms.ExceptionListener;
55  import javax.jms.IllegalStateException;
56  import javax.jms.InvalidDestinationException;
57  import javax.jms.InvalidSelectorException;
58  import javax.jms.JMSException;
59  import javax.jms.ServerSessionPool;
60  import javax.jms.Session;
61  import javax.jms.Topic;
62  import javax.jms.InvalidClientIDException;
63  
64  import org.apache.commons.logging.Log;
65  import org.apache.commons.logging.LogFactory;
66  
67  import org.exolab.jms.server.ServerConnection;
68  
69  
70  /***
71   * Client side implementation of the <code>javax.jms.Connection</code>
72   * interface.
73   *
74   * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
75   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
76   * @version $Revision: 1.4 $ $Date: 2005/05/24 13:35:18 $
77   */
78  class JmsConnection implements Connection {
79  
80      /***
81       * The connection factory that constructed this.
82       */
83      private JmsConnectionFactory _factory;
84  
85      /***
86       * The proxy for the remote connection implementation.
87       */
88      private ServerConnection _connection;
89  
90      /***
91       * The connection identifier, assigned by the server.
92       */
93      private final long _connectionId;
94  
95      /***
96       * This flag indicates whether or not the connection is closed.
97       */
98      private boolean _closed = false;
99  
100     /***
101      * This flag indicates whether the connection is in the start or stopped
102      * state.
103      */
104     private boolean _stopped = true;
105 
106     /***
107      * This flag indicates whether the connection has been modified. If so,
108      * subsequent attempts to invoke {@link #setClientID} will cause an
109      * <code>IllegalStateException</code> being thrown
110      */
111     private boolean _modified = false;
112 
113     /***
114      * The identity associated with the client, set via the {@link
115      * JmsConnectionFactory} or {@link #setClientID}
116      */
117     private String _clientId;
118 
119     /***
120      * Gates the setting of the clientId more than once.
121      */
122     private boolean _clientIdSet = false;
123 
124     /***
125      * The exception listener for this connection.
126      */
127     private ExceptionListener _exceptionListener;
128 
129     /***
130      * The active sessions managed by this connection.
131      */
132     private List _sessions = new ArrayList();
133 
134     /***
135      * The connection data is immutable at this stage. This enables us to cache
136      * a single copy in memory.
137      */
138     private static final JmsConnectionMetaData _metaData =
139             new JmsConnectionMetaData();
140 
141     /***
142      * The logger
143      */
144     private static final Log _log = LogFactory.getLog(JmsConnection.class);
145 
146 
147     /***
148      * Construct a new <code>JmsConnection</code>.
149      * <p/>
150      * This attempts to establish a connection to the JMS server
151      *
152      * @param factory  the connection factory responsible for creating this
153      * @param clientID the pre-configured client identifier. May be
154      *                 <code>null</code>
155      * @param username the client username
156      * @param password the client password
157      * @throws JMSException if a connection cannot be established
158      */
159     protected JmsConnection(JmsConnectionFactory factory, String clientID,
160                             String username, String password)
161             throws JMSException {
162 
163         if (factory == null) {
164             throw new IllegalArgumentException("Argument 'factory' is null");
165         }
166         _factory = factory;
167         _clientId = clientID;
168 
169         _stopped = true;
170 
171         // use the factory object to retrieve the proxy that
172         // will be used to get a JmsConnectionStubIfc instance
173         // and cache its identity locally
174         _connection = factory.getProxy().createConnection(_clientId, username,
175                                                           password);
176         _connectionId = _connection.getConnectionId();
177     }
178 
179     /***
180      * Returns the connection identifier.
181      *
182      * @return the connection identifier
183      */
184     public long getConnectionId() {
185         return _connectionId;
186     }
187 
188     /***
189      * Gets the client identifier for this connection.
190      *
191      * @return the unique client identifier
192      * @throws JMSException if the JMS provider fails to return the client ID
193      *                      for this connection due to some internal error.
194      */
195     public String getClientID() throws JMSException {
196         ensureOpen();
197         setModified();
198 
199         return _clientId;
200     }
201 
202     /***
203      * Sets the client identifier for this connection.
204      *
205      * @param clientID the unique client identifier
206      * @throws JMSException             if the JMS provider fails to set the
207      *                                  client ID for this connection due to
208      *                                  some internal error.
209      * @throws InvalidClientIDException if the JMS client specifies an invalid
210      *                                  or duplicate client ID.
211      * @throws IllegalStateException    if the JMS client attempts to set a
212      *                                  connection's client ID at the wrong time
213      *                                  or when it has been administratively
214      *                                  configured.
215      */
216     public void setClientID(String clientID) throws JMSException {
217         ensureOpen();
218 
219         // check if the client id has already been set
220         if (_clientIdSet) {
221             throw new IllegalStateException(
222                     "The client id has already been set");
223         }
224 
225         if (_modified) {
226             throw new IllegalStateException(
227                     "The client identifier must be set before any other "
228                     + "operation is performed");
229         }
230 
231         _connection.setClientID(clientID);
232         _clientId = clientID;
233         _clientIdSet = true; // prevent client id from being set more than once.
234 
235     }
236 
237 
238     /***
239      * Returns the metadata for this connection.
240      *
241      * @return the connection metadata
242      * @throws JMSException if the JMS provider fails to get the connection
243      *                      metadata for this connection.
244      */
245     public ConnectionMetaData getMetaData() throws JMSException {
246         ensureOpen();
247         setModified();
248         return _metaData;
249     }
250 
251     /***
252      * Returns the <code>ExceptionListener</code> for this connection.
253      *
254      * @return the <code>ExceptionListener</code> for this connection, or
255      * <code>null</code> if none is associated with this connection.
256      * @throws JMSException if the JMS provider fails to get the
257      *                      <code>ExceptionListener</code> for this connection.
258      */
259     public ExceptionListener getExceptionListener() throws JMSException {
260         ensureOpen();
261         setModified();
262         return _exceptionListener;
263     }
264 
265     /***
266      * Sets an exception listener for this connection.
267      *
268      * @param listener the exception listener
269      * @throws JMSException if the JMS provider fails to set the exception
270      *                      listener for this connection.
271      */
272     public void setExceptionListener(ExceptionListener listener)
273             throws JMSException {
274         ensureOpen();
275         setModified();
276         _exceptionListener = listener;
277     }
278 
279     /***
280      * Notify the exception listener of a JMSException. If the exception
281      * listener is not set then ignore it.
282      *
283      * @param message message to deliver
284      */
285     public void notifyExceptionListener(JMSException message) {
286         // check the error code
287         if (message.getErrorCode() != null &&
288                 message.getErrorCode().equals(
289                         JmsErrorCodes.CONNECTION_TO_SERVER_DROPPED)) {
290             // the connection to the server has been dropped so we need to
291             // release all local resources.
292             try {
293                 close();
294             } catch (JMSException exception) {
295                 _log.error(exception.getMessage(), exception);
296             }
297         }
298 
299         // finally notify registered exception listener
300         if (_exceptionListener != null) {
301             _exceptionListener.onException(message);
302         }
303     }
304 
305     /***
306      * Starts (or restarts) a connection's delivery of incoming messages. A call
307      * to <code>start</code> on a connection that has already been started is
308      * ignored.
309      *
310      * @throws JMSException if the JMS provider fails to start message delivery
311      *                      due to some internal error.
312      */
313     public synchronized void start() throws JMSException {
314         ensureOpen();
315         setModified();
316 
317         try {
318             if (_stopped) {
319                 // start the associated sessions
320                 Iterator iterator = _sessions.iterator();
321                 while (iterator.hasNext()) {
322                     JmsSession session = (JmsSession) iterator.next();
323                     session.start();
324                 }
325                 // set the state of the connection to start
326                 _stopped = false;
327             }
328         } catch (JMSException exception) {
329             // do we need to change _stopped to true if the any of the
330             // sessions fail to start
331             throw exception;
332         }
333     }
334 
335     /***
336      * Temporarily stops a connection's delivery of incoming messages. Delivery
337      * can be restarted using the connection's <code>start</code> method. When
338      * the connection is stopped, delivery to all the connection's message
339      * consumers is inhibited: synchronous receives block, and messages are not
340      * delivered to message listeners.
341      * <p/>
342      * <P>This call blocks until receives and/or message listeners in progress
343      * have completed.
344      * <p/>
345      * <P>Stopping a connection has no effect on its ability to send messages. A
346      * call to <code>stop</code> on a connection that has already been stopped
347      * is ignored.
348      *
349      * @throws JMSException if the JMS provider fails to stop message delivery
350      *                      due to some internal error.
351      */
352     public synchronized void stop() throws JMSException {
353         ensureOpen();
354         setModified();
355 
356         if (!_stopped) {
357             // stop the associated sessions
358             synchronized (_sessions) {
359                 Iterator iterator = _sessions.iterator();
360                 while (iterator.hasNext()) {
361                     JmsSession session = (JmsSession) iterator.next();
362                     session.stop();
363                 }
364             }
365             // set the state of the connection to stopped
366             _stopped = true;
367         }
368     }
369 
370     /***
371      * Closes the connection.
372      * <P>When this method is invoked, it should not return until message
373      * processing has been shut down in an orderly fashion. This means that all
374      * message listeners that may have been running have returned, and that all
375      * pending receives have returned. A close terminates all pending message
376      * receives on the connection's sessions' consumers. The receives may return
377      * with a message or with null, depending on whether there was a message
378      * available at the time of the close. If one or more of the connection's
379      * sessions' message listeners is processing a message at the time when
380      * connection <code>close</code> is invoked, all the facilities of the
381      * connection and its sessions must remain available to those listeners
382      * until they return control to the JMS provider.
383      * <p/>
384      * <P>Closing a connection causes any of its sessions' transactions in
385      * progress to be rolled back. In the case where a session's work is
386      * coordinated by an external transaction manager, a session's
387      * <code>commit</code> and <code>rollback</code> methods are not used and
388      * the result of a closed session's work is determined later by the
389      * transaction manager.
390      * <p/>
391      * Closing a connection does NOT force an acknowledgment of
392      * client-acknowledged sessions.
393      * <p/>
394      * <P>Invoking the <code>acknowledge</code> method of a received message
395      * from a closed connection's session must throw an
396      * <code>IllegalStateException</code>.
397      * Closing a closed connection must NOT throw an exception.
398      *
399      * @throws JMSException if the JMS provider fails to close the connection
400      *                      due to some internal error.
401      */
402     public synchronized void close() throws JMSException {
403         if (!_closed) {
404             // before we close we should stop the connection and any
405             // associated sessions
406             stop();
407 
408             // close the sessions
409             JmsSession[] sessions = null;
410             synchronized (_sessions) {
411                 sessions = (JmsSession[]) _sessions.toArray(new JmsSession[0]);
412             }
413             for (int i = 0; i < sessions.length; ++i) {
414                 sessions[i].close();
415                 // the session deregisters itself with the connection via
416                 // removeSession()
417             }
418 
419             // notify the server, and null the proxy
420             getServerConnection().close();
421             _connection = null;
422 
423             // remove this from the list of connections managed by the
424             // connection factory and then null the factory.
425             _factory.removeConnection(this);
426             _factory = null;
427 
428             // set the closed flag so calling it multiple times is
429             // cool
430             _closed = true;
431         }
432     }
433 
434     /***
435      * Creates a <code>Session</code> object.
436      *
437      * @param transacted      indicates whether the session is transacted
438      * @param acknowledgeMode indicates whether the consumer or the client will
439      *                        acknowledge any messages it receives; ignored if
440      *                        the session is transacted. Legal values are
441      *                        <code>Session.AUTO_ACKNOWLEDGE</code>,
442      *                        <code>Session.CLIENT_ACKNOWLEDGE</code>, and
443      *                        <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
444      * @return a newly created session
445      * @throws JMSException if the <code>Connection</code> object fails to
446      *                      create a session due to some internal error or lack
447      *                      of support for the specific transaction and
448      *                      acknowledgement mode.
449      * @see Session#AUTO_ACKNOWLEDGE
450      * @see Session#CLIENT_ACKNOWLEDGE
451      * @see Session#DUPS_OK_ACKNOWLEDGE
452      */
453     public Session createSession(boolean transacted, int acknowledgeMode)
454             throws JMSException {
455         ensureOpen();
456         setModified();
457 
458         JmsSession session = new JmsSession(this, transacted, acknowledgeMode);
459 
460         // if the connection is started then also start the session
461         if (!isStopped()) {
462             session.start();
463         }
464 
465         // add it to the list of managed sessions for this connection
466         addSession(session);
467 
468         return session;
469     }
470 
471     /***
472      * Creates a connection consumer for this connection (optional operation).
473      * This is an expert facility not used by regular JMS clients.
474      *
475      * @param destination     the destination to access
476      * @param messageSelector only messages with properties matching the message
477      *                        selector expression are delivered.  A value of
478      *                        null or an empty string indicates that there is no
479      *                        message selector for the message consumer.
480      * @param sessionPool     the server session pool to associate with this
481      *                        connection consumer
482      * @param maxMessages     the maximum number of messages that can be
483      *                        assigned to a server session at one time
484      * @return the connection consumer
485      * @throws JMSException    if the <code>Connection</code> object fails to
486      *                         create a connection consumer due to some internal
487      *                         error or invalid arguments for
488      *                         <code>sessionPool</code> and
489      *                         <code>messageSelector</code>.
490      * @throws InvalidDestinationException if an invalid destination is
491      *                                     specified.
492      * @throws InvalidSelectorException    if the message selector is invalid.
493      */
494     public ConnectionConsumer createConnectionConsumer(
495             Destination destination, String messageSelector,
496             ServerSessionPool sessionPool, int maxMessages)
497             throws JMSException {
498         ensureOpen();
499         setModified();
500         return new JmsConnectionConsumer(this, destination, sessionPool,
501                                          messageSelector, maxMessages);
502     }
503 
504     /***
505      * Create a durable connection consumer for this connection.
506      *
507      * @param topic            topic to access
508      * @param subscriptionName durable subscription name
509      * @param messageSelector  only messages with properties matching the
510      *                         message selector expression are delivered.  A
511      *                         value of null or an empty string indicates that
512      *                         there is no message selector for the message
513      *                         consumer.
514      * @param sessionPool      the server session pool to associate with this
515      *                         durable connection consumer
516      * @param maxMessages      the maximum number of messages that can be
517      *                         assigned to a server session at one time
518      * @return the durable connection consumer
519      * @throws JMSException    if the <code>Connection</code> object fails to
520      *                         create a connection consumer due to some internal
521      *                         error or invalid arguments for
522      *                         <code>sessionPool</code> and
523      *                         <code>messageSelector</code>.
524      * @throws InvalidDestinationException if an invalid destination is
525      *                                     specified.
526      * @throws InvalidSelectorException    if the message selector is invalid.
527      */
528     public ConnectionConsumer createDurableConnectionConsumer(
529             Topic topic, String subscriptionName, String messageSelector,
530             ServerSessionPool sessionPool, int maxMessages)
531             throws JMSException {
532         ensureOpen();
533         setModified();
534         return new JmsConnectionConsumer(this, topic, subscriptionName,
535                                          sessionPool, messageSelector,
536                                          maxMessages);
537     }
538 
539     /***
540      * Returns the server connection.
541      *
542      * @return the server connection
543      * @throws JMSException if the connection is <code>null</code>
544      */
545     protected ServerConnection getServerConnection() throws JMSException {
546         if (_connection == null) {
547             throw new JMSException("Connection closed");
548         }
549 
550         return _connection;
551     }
552 
553     /***
554      * Add the specified session to the list of managed sessions.
555      *
556      * @param session session to register
557      */
558     protected void addSession(JmsSession session) {
559         synchronized (_sessions) {
560             _sessions.add(session);
561         }
562     }
563 
564     /***
565      * Remove the specified session from the list of managed sessions. If it
566      * doesn't exist then fail silently.
567      *
568      * @param session session to remove
569      */
570     protected void removeSession(JmsSession session) {
571         synchronized (_sessions) {
572             _sessions.remove(session);
573         }
574     }
575 
576     /***
577      * Returns the running state of the connection.
578      *
579      * @return <code>true</code> if stopped
580      */
581     protected boolean isStopped() {
582         return _stopped;
583     }
584 
585     /***
586      * Flags this connection as being modified. Subsequent attempts to invoke
587      * {@link #setClientID} will result in an <code>IllegalStateException</code>
588      * being thrown.
589      */
590     protected void setModified() {
591         _modified = true;
592     }
593 
594     /***
595      * Delete the temporary destination and all the registered sessions
596      * consumers waiting to receive messages from this destination will be
597      * stopped.
598      * <p/>
599      * It will throw a JMSException if the specified destination is not
600      * temporary or if the destination is null or if the destination is not
601      * owned by this connection
602      *
603      * @param destination temporary destination to delete
604      * @throws JMSException
605      */
606     protected synchronized void deleteTemporaryDestination(
607             JmsDestination destination)
608             throws JMSException {
609         if ((destination != null) &&
610                 (destination instanceof JmsTemporaryDestination)) {
611             JmsTemporaryDestination temp_dest =
612                     (JmsTemporaryDestination) destination;
613 
614             // check to see that this destination was actually created by
615             // this connection
616             if (temp_dest.getOwningConnection() == this) {
617                 // this is currently a no-op but we probably need a way to
618                 // clean up on the server side
619             } else {
620                 throw new JMSException(
621                         "The temp destination cannot be used outside the scope "
622                         + "of the connection creating it");
623             }
624         } else {
625             throw new JMSException("The destination is not temporary");
626         }
627     }
628 
629     /***
630      * Verifies that the connection is open.
631      *
632      * @throws IllegalStateException if the connection is closed
633      */
634     protected void ensureOpen() throws IllegalStateException {
635         if (_closed) {
636             throw new IllegalStateException(
637                     "Cannot perform operation - session has been closed");
638         }
639     }
640 
641 }
642