1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: JmsConnection.java,v 1.4 2005/05/24 13:35:18 tanderson Exp $
44 */
45 package org.exolab.jms.client;
46
47 import java.util.ArrayList;
48 import java.util.Iterator;
49 import java.util.List;
50 import javax.jms.Connection;
51 import javax.jms.ConnectionConsumer;
52 import javax.jms.ConnectionMetaData;
53 import javax.jms.Destination;
54 import javax.jms.ExceptionListener;
55 import javax.jms.IllegalStateException;
56 import javax.jms.InvalidDestinationException;
57 import javax.jms.InvalidSelectorException;
58 import javax.jms.JMSException;
59 import javax.jms.ServerSessionPool;
60 import javax.jms.Session;
61 import javax.jms.Topic;
62 import javax.jms.InvalidClientIDException;
63
64 import org.apache.commons.logging.Log;
65 import org.apache.commons.logging.LogFactory;
66
67 import org.exolab.jms.server.ServerConnection;
68
69
70 /***
71 * Client side implementation of the <code>javax.jms.Connection</code>
72 * interface.
73 *
74 * @author <a href="mailto:jima@comware.com.au">Jim Alateras</a>
75 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
76 * @version $Revision: 1.4 $ $Date: 2005/05/24 13:35:18 $
77 */
78 class JmsConnection implements Connection {
79
80 /***
81 * The connection factory that constructed this.
82 */
83 private JmsConnectionFactory _factory;
84
85 /***
86 * The proxy for the remote connection implementation.
87 */
88 private ServerConnection _connection;
89
90 /***
91 * The connection identifier, assigned by the server.
92 */
93 private final long _connectionId;
94
95 /***
96 * This flag indicates whether or not the connection is closed.
97 */
98 private boolean _closed = false;
99
100 /***
101 * This flag indicates whether the connection is in the start or stopped
102 * state.
103 */
104 private boolean _stopped = true;
105
106 /***
107 * This flag indicates whether the connection has been modified. If so,
108 * subsequent attempts to invoke {@link #setClientID} will cause an
109 * <code>IllegalStateException</code> being thrown
110 */
111 private boolean _modified = false;
112
113 /***
114 * The identity associated with the client, set via the {@link
115 * JmsConnectionFactory} or {@link #setClientID}
116 */
117 private String _clientId;
118
119 /***
120 * Gates the setting of the clientId more than once.
121 */
122 private boolean _clientIdSet = false;
123
124 /***
125 * The exception listener for this connection.
126 */
127 private ExceptionListener _exceptionListener;
128
129 /***
130 * The active sessions managed by this connection.
131 */
132 private List _sessions = new ArrayList();
133
134 /***
135 * The connection data is immutable at this stage. This enables us to cache
136 * a single copy in memory.
137 */
138 private static final JmsConnectionMetaData _metaData =
139 new JmsConnectionMetaData();
140
141 /***
142 * The logger
143 */
144 private static final Log _log = LogFactory.getLog(JmsConnection.class);
145
146
147 /***
148 * Construct a new <code>JmsConnection</code>.
149 * <p/>
150 * This attempts to establish a connection to the JMS server
151 *
152 * @param factory the connection factory responsible for creating this
153 * @param clientID the pre-configured client identifier. May be
154 * <code>null</code>
155 * @param username the client username
156 * @param password the client password
157 * @throws JMSException if a connection cannot be established
158 */
159 protected JmsConnection(JmsConnectionFactory factory, String clientID,
160 String username, String password)
161 throws JMSException {
162
163 if (factory == null) {
164 throw new IllegalArgumentException("Argument 'factory' is null");
165 }
166 _factory = factory;
167 _clientId = clientID;
168
169 _stopped = true;
170
171
172
173
174 _connection = factory.getProxy().createConnection(_clientId, username,
175 password);
176 _connectionId = _connection.getConnectionId();
177 }
178
179 /***
180 * Returns the connection identifier.
181 *
182 * @return the connection identifier
183 */
184 public long getConnectionId() {
185 return _connectionId;
186 }
187
188 /***
189 * Gets the client identifier for this connection.
190 *
191 * @return the unique client identifier
192 * @throws JMSException if the JMS provider fails to return the client ID
193 * for this connection due to some internal error.
194 */
195 public String getClientID() throws JMSException {
196 ensureOpen();
197 setModified();
198
199 return _clientId;
200 }
201
202 /***
203 * Sets the client identifier for this connection.
204 *
205 * @param clientID the unique client identifier
206 * @throws JMSException if the JMS provider fails to set the
207 * client ID for this connection due to
208 * some internal error.
209 * @throws InvalidClientIDException if the JMS client specifies an invalid
210 * or duplicate client ID.
211 * @throws IllegalStateException if the JMS client attempts to set a
212 * connection's client ID at the wrong time
213 * or when it has been administratively
214 * configured.
215 */
216 public void setClientID(String clientID) throws JMSException {
217 ensureOpen();
218
219
220 if (_clientIdSet) {
221 throw new IllegalStateException(
222 "The client id has already been set");
223 }
224
225 if (_modified) {
226 throw new IllegalStateException(
227 "The client identifier must be set before any other "
228 + "operation is performed");
229 }
230
231 _connection.setClientID(clientID);
232 _clientId = clientID;
233 _clientIdSet = true;
234
235 }
236
237
238 /***
239 * Returns the metadata for this connection.
240 *
241 * @return the connection metadata
242 * @throws JMSException if the JMS provider fails to get the connection
243 * metadata for this connection.
244 */
245 public ConnectionMetaData getMetaData() throws JMSException {
246 ensureOpen();
247 setModified();
248 return _metaData;
249 }
250
251 /***
252 * Returns the <code>ExceptionListener</code> for this connection.
253 *
254 * @return the <code>ExceptionListener</code> for this connection, or
255 * <code>null</code> if none is associated with this connection.
256 * @throws JMSException if the JMS provider fails to get the
257 * <code>ExceptionListener</code> for this connection.
258 */
259 public ExceptionListener getExceptionListener() throws JMSException {
260 ensureOpen();
261 setModified();
262 return _exceptionListener;
263 }
264
265 /***
266 * Sets an exception listener for this connection.
267 *
268 * @param listener the exception listener
269 * @throws JMSException if the JMS provider fails to set the exception
270 * listener for this connection.
271 */
272 public void setExceptionListener(ExceptionListener listener)
273 throws JMSException {
274 ensureOpen();
275 setModified();
276 _exceptionListener = listener;
277 }
278
279 /***
280 * Notify the exception listener of a JMSException. If the exception
281 * listener is not set then ignore it.
282 *
283 * @param message message to deliver
284 */
285 public void notifyExceptionListener(JMSException message) {
286
287 if (message.getErrorCode() != null &&
288 message.getErrorCode().equals(
289 JmsErrorCodes.CONNECTION_TO_SERVER_DROPPED)) {
290
291
292 try {
293 close();
294 } catch (JMSException exception) {
295 _log.error(exception.getMessage(), exception);
296 }
297 }
298
299
300 if (_exceptionListener != null) {
301 _exceptionListener.onException(message);
302 }
303 }
304
305 /***
306 * Starts (or restarts) a connection's delivery of incoming messages. A call
307 * to <code>start</code> on a connection that has already been started is
308 * ignored.
309 *
310 * @throws JMSException if the JMS provider fails to start message delivery
311 * due to some internal error.
312 */
313 public synchronized void start() throws JMSException {
314 ensureOpen();
315 setModified();
316
317 try {
318 if (_stopped) {
319
320 Iterator iterator = _sessions.iterator();
321 while (iterator.hasNext()) {
322 JmsSession session = (JmsSession) iterator.next();
323 session.start();
324 }
325
326 _stopped = false;
327 }
328 } catch (JMSException exception) {
329
330
331 throw exception;
332 }
333 }
334
335 /***
336 * Temporarily stops a connection's delivery of incoming messages. Delivery
337 * can be restarted using the connection's <code>start</code> method. When
338 * the connection is stopped, delivery to all the connection's message
339 * consumers is inhibited: synchronous receives block, and messages are not
340 * delivered to message listeners.
341 * <p/>
342 * <P>This call blocks until receives and/or message listeners in progress
343 * have completed.
344 * <p/>
345 * <P>Stopping a connection has no effect on its ability to send messages. A
346 * call to <code>stop</code> on a connection that has already been stopped
347 * is ignored.
348 *
349 * @throws JMSException if the JMS provider fails to stop message delivery
350 * due to some internal error.
351 */
352 public synchronized void stop() throws JMSException {
353 ensureOpen();
354 setModified();
355
356 if (!_stopped) {
357
358 synchronized (_sessions) {
359 Iterator iterator = _sessions.iterator();
360 while (iterator.hasNext()) {
361 JmsSession session = (JmsSession) iterator.next();
362 session.stop();
363 }
364 }
365
366 _stopped = true;
367 }
368 }
369
370 /***
371 * Closes the connection.
372 * <P>When this method is invoked, it should not return until message
373 * processing has been shut down in an orderly fashion. This means that all
374 * message listeners that may have been running have returned, and that all
375 * pending receives have returned. A close terminates all pending message
376 * receives on the connection's sessions' consumers. The receives may return
377 * with a message or with null, depending on whether there was a message
378 * available at the time of the close. If one or more of the connection's
379 * sessions' message listeners is processing a message at the time when
380 * connection <code>close</code> is invoked, all the facilities of the
381 * connection and its sessions must remain available to those listeners
382 * until they return control to the JMS provider.
383 * <p/>
384 * <P>Closing a connection causes any of its sessions' transactions in
385 * progress to be rolled back. In the case where a session's work is
386 * coordinated by an external transaction manager, a session's
387 * <code>commit</code> and <code>rollback</code> methods are not used and
388 * the result of a closed session's work is determined later by the
389 * transaction manager.
390 * <p/>
391 * Closing a connection does NOT force an acknowledgment of
392 * client-acknowledged sessions.
393 * <p/>
394 * <P>Invoking the <code>acknowledge</code> method of a received message
395 * from a closed connection's session must throw an
396 * <code>IllegalStateException</code>.
397 * Closing a closed connection must NOT throw an exception.
398 *
399 * @throws JMSException if the JMS provider fails to close the connection
400 * due to some internal error.
401 */
402 public synchronized void close() throws JMSException {
403 if (!_closed) {
404
405
406 stop();
407
408
409 JmsSession[] sessions = null;
410 synchronized (_sessions) {
411 sessions = (JmsSession[]) _sessions.toArray(new JmsSession[0]);
412 }
413 for (int i = 0; i < sessions.length; ++i) {
414 sessions[i].close();
415
416
417 }
418
419
420 getServerConnection().close();
421 _connection = null;
422
423
424
425 _factory.removeConnection(this);
426 _factory = null;
427
428
429
430 _closed = true;
431 }
432 }
433
434 /***
435 * Creates a <code>Session</code> object.
436 *
437 * @param transacted indicates whether the session is transacted
438 * @param acknowledgeMode indicates whether the consumer or the client will
439 * acknowledge any messages it receives; ignored if
440 * the session is transacted. Legal values are
441 * <code>Session.AUTO_ACKNOWLEDGE</code>,
442 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
443 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
444 * @return a newly created session
445 * @throws JMSException if the <code>Connection</code> object fails to
446 * create a session due to some internal error or lack
447 * of support for the specific transaction and
448 * acknowledgement mode.
449 * @see Session#AUTO_ACKNOWLEDGE
450 * @see Session#CLIENT_ACKNOWLEDGE
451 * @see Session#DUPS_OK_ACKNOWLEDGE
452 */
453 public Session createSession(boolean transacted, int acknowledgeMode)
454 throws JMSException {
455 ensureOpen();
456 setModified();
457
458 JmsSession session = new JmsSession(this, transacted, acknowledgeMode);
459
460
461 if (!isStopped()) {
462 session.start();
463 }
464
465
466 addSession(session);
467
468 return session;
469 }
470
471 /***
472 * Creates a connection consumer for this connection (optional operation).
473 * This is an expert facility not used by regular JMS clients.
474 *
475 * @param destination the destination to access
476 * @param messageSelector only messages with properties matching the message
477 * selector expression are delivered. A value of
478 * null or an empty string indicates that there is no
479 * message selector for the message consumer.
480 * @param sessionPool the server session pool to associate with this
481 * connection consumer
482 * @param maxMessages the maximum number of messages that can be
483 * assigned to a server session at one time
484 * @return the connection consumer
485 * @throws JMSException if the <code>Connection</code> object fails to
486 * create a connection consumer due to some internal
487 * error or invalid arguments for
488 * <code>sessionPool</code> and
489 * <code>messageSelector</code>.
490 * @throws InvalidDestinationException if an invalid destination is
491 * specified.
492 * @throws InvalidSelectorException if the message selector is invalid.
493 */
494 public ConnectionConsumer createConnectionConsumer(
495 Destination destination, String messageSelector,
496 ServerSessionPool sessionPool, int maxMessages)
497 throws JMSException {
498 ensureOpen();
499 setModified();
500 return new JmsConnectionConsumer(this, destination, sessionPool,
501 messageSelector, maxMessages);
502 }
503
504 /***
505 * Create a durable connection consumer for this connection.
506 *
507 * @param topic topic to access
508 * @param subscriptionName durable subscription name
509 * @param messageSelector only messages with properties matching the
510 * message selector expression are delivered. A
511 * value of null or an empty string indicates that
512 * there is no message selector for the message
513 * consumer.
514 * @param sessionPool the server session pool to associate with this
515 * durable connection consumer
516 * @param maxMessages the maximum number of messages that can be
517 * assigned to a server session at one time
518 * @return the durable connection consumer
519 * @throws JMSException if the <code>Connection</code> object fails to
520 * create a connection consumer due to some internal
521 * error or invalid arguments for
522 * <code>sessionPool</code> and
523 * <code>messageSelector</code>.
524 * @throws InvalidDestinationException if an invalid destination is
525 * specified.
526 * @throws InvalidSelectorException if the message selector is invalid.
527 */
528 public ConnectionConsumer createDurableConnectionConsumer(
529 Topic topic, String subscriptionName, String messageSelector,
530 ServerSessionPool sessionPool, int maxMessages)
531 throws JMSException {
532 ensureOpen();
533 setModified();
534 return new JmsConnectionConsumer(this, topic, subscriptionName,
535 sessionPool, messageSelector,
536 maxMessages);
537 }
538
539 /***
540 * Returns the server connection.
541 *
542 * @return the server connection
543 * @throws JMSException if the connection is <code>null</code>
544 */
545 protected ServerConnection getServerConnection() throws JMSException {
546 if (_connection == null) {
547 throw new JMSException("Connection closed");
548 }
549
550 return _connection;
551 }
552
553 /***
554 * Add the specified session to the list of managed sessions.
555 *
556 * @param session session to register
557 */
558 protected void addSession(JmsSession session) {
559 synchronized (_sessions) {
560 _sessions.add(session);
561 }
562 }
563
564 /***
565 * Remove the specified session from the list of managed sessions. If it
566 * doesn't exist then fail silently.
567 *
568 * @param session session to remove
569 */
570 protected void removeSession(JmsSession session) {
571 synchronized (_sessions) {
572 _sessions.remove(session);
573 }
574 }
575
576 /***
577 * Returns the running state of the connection.
578 *
579 * @return <code>true</code> if stopped
580 */
581 protected boolean isStopped() {
582 return _stopped;
583 }
584
585 /***
586 * Flags this connection as being modified. Subsequent attempts to invoke
587 * {@link #setClientID} will result in an <code>IllegalStateException</code>
588 * being thrown.
589 */
590 protected void setModified() {
591 _modified = true;
592 }
593
594 /***
595 * Delete the temporary destination and all the registered sessions
596 * consumers waiting to receive messages from this destination will be
597 * stopped.
598 * <p/>
599 * It will throw a JMSException if the specified destination is not
600 * temporary or if the destination is null or if the destination is not
601 * owned by this connection
602 *
603 * @param destination temporary destination to delete
604 * @throws JMSException
605 */
606 protected synchronized void deleteTemporaryDestination(
607 JmsDestination destination)
608 throws JMSException {
609 if ((destination != null) &&
610 (destination instanceof JmsTemporaryDestination)) {
611 JmsTemporaryDestination temp_dest =
612 (JmsTemporaryDestination) destination;
613
614
615
616 if (temp_dest.getOwningConnection() == this) {
617
618
619 } else {
620 throw new JMSException(
621 "The temp destination cannot be used outside the scope "
622 + "of the connection creating it");
623 }
624 } else {
625 throw new JMSException("The destination is not temporary");
626 }
627 }
628
629 /***
630 * Verifies that the connection is open.
631 *
632 * @throws IllegalStateException if the connection is closed
633 */
634 protected void ensureOpen() throws IllegalStateException {
635 if (_closed) {
636 throw new IllegalStateException(
637 "Cannot perform operation - session has been closed");
638 }
639 }
640
641 }
642