View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: IpcJmsSessionStub.java,v 1.20 2003/08/25 03:30:33 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * $Date        jimm    Created
47   */
48  package org.exolab.jms.client.mipc;
49  
50  import java.util.Vector;
51  
52  import javax.jms.JMSException;
53  import javax.jms.Message;
54  import javax.jms.MessageListener;
55  import javax.transaction.xa.XAException;
56  import javax.transaction.xa.XAResource;
57  import javax.transaction.xa.Xid;
58  
59  import org.apache.commons.logging.Log;
60  import org.apache.commons.logging.LogFactory;
61  
62  import org.exolab.core.ipc.IpcIfc;
63  import org.exolab.core.mipc.ObjectChannel;
64  import org.exolab.jms.client.JmsMessageListener;
65  import org.exolab.jms.client.JmsQueue;
66  import org.exolab.jms.client.JmsSessionStubIfc;
67  import org.exolab.jms.client.JmsTopic;
68  import org.exolab.jms.message.MessageImpl;
69  
70  
71  /***
72   * The client side stub implementing the JmsServerSession. All session
73   * requests are passed on to the server. This class also starts an IPC server
74   * connection when a listener or subscriber is set up.
75   *
76   * <P>Note: There is only one receive connection per client, all
77   * JmsMessages for any queue/topic that the client is interested in
78   * receiving are multiplexed on this single connection. The connection
79   * is not bi-directional, that is no replies are sent back using this
80   * connection. It is assumed that the underlying IPC protocol will confirm
81   * delivery to the client. After that it is the clients responsibility to
82   * ensure correct message processing. Durable messages are acked on a
83   * separate port, reliable messages, are not.
84   *
85   * @version     $Revision: 1.20 $ $Date: 2003/08/25 03:30:33 $
86   * @author      <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
87   * @see         org.exolab.jms.server.mipc.IpcJmsSessionConnection
88   * @see         org.exolab.core.ipc.Server
89   */
90  public class IpcJmsSessionStub implements JmsSessionStubIfc {
91  
92      /***
93       * The client connection
94       */
95      private IpcIfc _connection = null;
96  
97      /***
98       * The client connection id
99       */
100     private String _clientId;
101 
102     /***
103      * The destination connection id
104      */
105     private String _connectionId;
106 
107     /***
108      * The session id
109      */
110     private String _sessionId;
111 
112     /***
113      * The message dispatcher
114      */
115     private IpcJmsMessageListener _listener;
116 
117     /***
118      * The logger
119      */
120     private static final Log _log = LogFactory.getLog(IpcJmsSessionStub.class);
121 
122 
123     /***
124      * A new session has been established with these ids.
125      *
126      * @param connection the connection to the server.
127      * @param clientId this clients unique id.
128      * @param connectionId this objects connection identifier.
129      * @param sessionId the unique session id for this object.
130      * @param listener the message dispatcher
131      */
132     public IpcJmsSessionStub(IpcIfc connection, String clientId,
133                              String connectionId, String sessionId,
134                              IpcJmsMessageListener listener) {
135         _connection = connection;
136         _clientId = clientId;
137         _connectionId = connectionId;
138         _sessionId = sessionId;
139         _listener = listener;
140     }
141 
142 
143     /***
144      * Get the client Id
145      *
146      * @return         String  The client id
147      * @exception      JMSException On error
148      *
149      */
150     public String getClientId() throws JMSException {
151         return _clientId;
152     }
153 
154 
155     /***
156      * Get the sessionId
157      *
158      * @return         String  The session id of this session
159      * @exception      JMSException On error
160      *
161      */
162     public String getSessionId() throws JMSException {
163         return _sessionId;
164     }
165 
166     // implementation of JmsSessionStubIfc.beforeClose
167     public void beforeClose()
168         throws JMSException {
169     }
170 
171     /***
172      * Close this session.
173      *
174      * @exception    JMSException On error
175      *
176      */
177     public void close() throws JMSException {
178         Vector v = pack("close", 0);
179         synchronized (_connection) {
180             send(v);
181             checkReply("close");
182         }
183 
184         _listener.closeSession(_sessionId);
185     }
186 
187 
188     /***
189      * Extract the destination and messageId for the message and send back
190      * an ack.
191      *
192      * @param       clientId        the identity ofthe client
193      * @param       messageId       the message identity to ack
194      * @exception   JMSException
195      */
196     public void acknowledgeMessage(long clientId, String messageId)
197         throws JMSException {
198         Vector v = pack("acknowledgeMessage", 2);
199         //Send back the ack to the consumer endpoint that forwarded the
200         // message.
201         v.add(new Long(clientId));
202         v.add(messageId);
203         synchronized (_connection) {
204             send(v);
205             checkReply("acknowledgeMessage");
206         }
207     }
208 
209 
210     // implementation of JmsSessionStubIfc.sendMessage
211     public void sendMessage(Message message) throws JMSException {
212         Vector v = pack("sendMessage", 1);
213         v.add(message);
214         synchronized (_connection) {
215             send(v);
216             if (((MessageImpl) message).isPersistent()) {
217                 checkReply("sendMessage");
218             }
219         }
220     }
221 
222     // implementation of JmsSessionStubIfc.sendMessages
223     public void sendMessages(Vector messages) throws JMSException {
224         Vector v = pack("sendMessages", 1);
225         v.add(messages);
226         synchronized (_connection) {
227             send(v);
228             // always check for a reply
229             checkReply("sendMessages");
230         }
231     }
232 
233     // implementation of JmsSessionStubIfc.receiveMessage
234     public Message receiveMessage(long clientId, long wait)
235         throws JMSException {
236         Message message = null;
237 
238         Vector v = pack("receiveMessage", 2);
239         v.add(new Long(clientId));
240         v.add(new Long(wait));
241         synchronized (_connection) {
242             send(v);
243             Vector reply = checkReply("receiveMessage");
244             Boolean result = (Boolean) reply.get(0);
245 
246             // check that the call completed before
247             // extracting the message
248             if (result.booleanValue()) {
249                 message = (Message) reply.get(1);
250             }
251         }
252 
253         return message;
254     }
255 
256     // implementation of JmsSessionStubIfc.receiveMessages
257     public Vector receiveMessages(long clientId, int count)
258         throws JMSException {
259         Vector messages = null;
260 
261         Vector v = pack("receiveMessages", 2);
262         v.add(new Long(clientId));
263         v.add(new Integer(count));
264         synchronized (_connection) {
265             send(v);
266             Vector reply = checkReply("receiveMessages");
267             Boolean result = (Boolean) reply.get(0);
268 
269             // check that the call completed before
270             // extracting the message
271             if (result.booleanValue()) {
272                 messages = (Vector) reply.get(1);
273             }
274         }
275 
276         return messages;
277     }
278 
279     /***
280      * Create a new Queue.
281      *
282      * @param         queue  The queue to create.
283      * @exception     JMSException On error
284      *
285      */
286     public void createQueue(JmsQueue queue) throws JMSException {
287         Vector v = pack("createQueue", 1);
288         v.add(queue);
289         synchronized (_connection) {
290             send(v);
291             checkReply("createQueue");
292         }
293     }
294 
295 
296     /***
297      * Create a new topic
298      *
299      * @param         topic  The topic to create.
300      * @exception     JMSException On error
301      *
302      */
303     public void createTopic(JmsTopic topic) throws JMSException {
304         Vector v = pack("createTopic", 1);
305         v.add(topic);
306         synchronized (_connection) {
307             send(v);
308             checkReply("createTopic");
309         }
310     }
311 
312 
313     /***
314      * Create a receiver. Get the IP address of the machine the consumer runs
315      * on, and the port it is listening too, and pass this to the server, so
316      * it can make a new dedicated connection for sending all messages to
317      * this client.
318      *
319      * @param         queue  The queue to listen to
320      * @param         clientId The session allocated identifier
321      * @param         selector The selector to filter messages (may be null)
322      * @exception     JMSException On error
323      *
324      */
325     public void createReceiver(JmsQueue queue, long clientId, String selector)
326         throws JMSException {
327         _listener.start();
328         Vector v = pack("createReceiver", 5);
329         v.add(queue);
330         v.add(new Long(clientId));
331         v.add(selector);
332         v.add(((ObjectChannel) _connection).getConnection().getHost());
333         v.add(String.valueOf
334             (((ObjectChannel) _connection).getConnection().getPort()));
335         v.add("n");
336         synchronized (_connection) {
337             send(v);
338             checkReply("createReceiver");
339         }
340     }
341 
342 
343     /***
344      * Create a queue sender
345      *
346      * @param         queue  The queue to send messages to
347      * @exception     JMSException On error
348      *
349      */
350     public void createSender(JmsQueue queue) throws JMSException {
351         Vector v = pack("createSender", 1);
352         v.add(queue);
353         synchronized (_connection) {
354             send(v);
355             checkReply("createSender");
356         }
357     }
358 
359     /***
360      * Create a queue browser for this session. This allows clients to browse
361      * a queue without removing any messages.
362      * <p>
363      *
364      * You cannot create more than one queue browser for the same queue
365      * in a single session.
366      *
367      * @param       queue               queue to browse
368      * @param       clientId            idenity of the client
369      * @param       selector            message selector. This may be null
370      * @exception   JMSException
371      */
372     public void createBrowser(JmsQueue queue, long clientId, String selector)
373         throws JMSException {
374         _listener.start();
375         Vector v = pack("createBrowser", 3);
376         v.add(queue);
377         v.add(new Long(clientId));
378         v.add(selector);
379         v.add(((ObjectChannel) _connection).getConnection().getHost());
380         v.add(String.valueOf
381             (((ObjectChannel) _connection).getConnection().getPort()));
382         v.add("n");
383         synchronized (_connection) {
384             send(v);
385             checkReply("createBrowser");
386         }
387     }
388 
389     /***
390      * Delete the receiver for this queue.
391      *
392      * @param         clientId  The id of the client to delete
393      * @exception     JMSException On error
394      *
395      */
396     public void deleteReceiver(long clientId) throws JMSException {
397         Vector v = pack("deleteReceiver", 1);
398         v.add(new Long(clientId));
399         synchronized (_connection) {
400             send(v);
401             checkReply("deleteReceiver");
402         }
403     }
404 
405 
406     /***
407      * Delete the queue browser associated with the specified queue from
408      * the session.
409      * If the corresponding queue does not exist or it cannot be deleted,
410      * then throw a JMSException
411      *
412      * @param       clientId            identity of the browser
413      * @exception   JMSException
414      */
415     public void deleteBrowser(long clientId)
416         throws JMSException {
417         Vector v = pack("deleteBrowser", 1);
418         v.add(new Long(clientId));
419         synchronized (_connection) {
420             send(v);
421             checkReply("deleteBrowser");
422         }
423     }
424 
425     /***
426      * Create a new topic subscriber
427      *
428      * @param         topic  The topic to subscribe to
429      * @param         name The subscribers name
430      * @param         client The client identity
431      * @param         selector The selector to filter messages (may be null)
432      * @exception     JMSException On error
433      *
434      */
435     public void createSubscriber(JmsTopic topic, String name, long clientId,
436                                  String selector, boolean noLocal)
437         throws JMSException {
438         _listener.start();
439         Vector v = pack("createSubscriber", 5);
440         v.add(topic);
441         v.add(name);
442         v.add(new Long(clientId));
443         v.add(selector);
444         v.add(new Boolean(noLocal));
445         v.add(((ObjectChannel) _connection).getConnection().getHost());
446         v.add(String.valueOf
447             (((ObjectChannel) _connection).getConnection().getPort()));
448         v.add("n");
449         synchronized (_connection) {
450             send(v);
451             checkReply("createSubscriber");
452         }
453     }
454 
455 
456     /***
457      * Create a new topic publisher
458      *
459      * @param         topic  The topic to publish to
460      * @exception     JMSException On error
461      *
462      */
463     public void createPublisher(JmsTopic topic) throws JMSException {
464         Vector v = pack("createPublisher", 1);
465         v.add(topic);
466         synchronized (_connection) {
467             send(v);
468             checkReply("createPublisher");
469         }
470     }
471 
472 
473     /***
474      * Unsubscribe a durable subscription
475      *
476      * @param       name                the name used to identify the
477      *                                  subscription
478      * @exception   JMSException        if the subscription cannot be removed
479      */
480     public void unsubscribe(String name)
481         throws JMSException {
482         Vector v = pack("unsubscribe", 1);
483         v.add(name);
484         synchronized (_connection) {
485             send(v);
486             checkReply("unsubscribe");
487         }
488     }
489 
490     /***
491      * Delete the subscriber for this topic
492      *
493      * @param         clientId - the client identity
494      * @exception     JMSException On error
495      *
496      */
497     public void deleteSubscriber(long clientId) throws JMSException {
498         Vector v = pack("deleteSubscriber", 1);
499         v.add(new Long(clientId));
500         synchronized (_connection) {
501             send(v);
502             checkReply("deleteSubscriber");
503         }
504     }
505 
506     /***
507      * Stop message delivery for this session.
508      *
509      * @exception     JMSException On error
510      *
511      */
512     public void stopMessageDelivery() throws JMSException {
513         Vector v = pack("stopMessageDelivery", 0);
514         synchronized (_connection) {
515             send(v);
516             checkReply("stopMessageDelivery");
517         }
518     }
519 
520 
521     /***
522      * Start message delivery for this session.
523      *
524      * @exception     JMSException On error
525      *
526      */
527     public void startMessageDelivery() throws JMSException {
528         Vector v = pack("startMessageDelivery", 0);
529         synchronized (_connection) {
530             send(v);
531             checkReply("startMessageDelivery");
532         }
533     }
534 
535     // implementation of JmsSessionStubIfc.recover
536     public void recover() throws JMSException {
537         Vector v = pack("recover", 0);
538         synchronized (_connection) {
539             send(v);
540             checkReply("recover");
541         }
542     }
543 
544     // implementation of JmsSessionStubIfc.commit
545     public void commit() throws JMSException {
546         Vector v = pack("commit", 0);
547         synchronized (_connection) {
548             send(v);
549             checkReply("commit");
550         }
551     }
552 
553     // implementation of JmsSessionStubIfc.rollback
554     public void rollback() throws JMSException {
555         Vector v = pack("rollback", 0);
556         synchronized (_connection) {
557             send(v);
558             checkReply("rollback");
559         }
560     }
561 
562     // implementation of JmsSessionStubIfc.commit
563     public void commit(Xid xid, boolean onePhase)
564         throws XAException {
565         try {
566             Vector v = pack("xa_commit", 2);
567             v.add(xid);
568             v.add(new Boolean(onePhase));
569             synchronized (_connection) {
570                 send(v);
571                 checkReply("xa_commit");
572             }
573         } catch (JMSException exception) {
574             // rethrow as a XAException
575             throw new XAException("Failed to commit session " +
576                 exception);
577         }
578     }
579 
580     // implementation of JmsSessionStubIfc.end
581     public void end(Xid xid, int flags)
582         throws XAException {
583         try {
584             Vector v = pack("xa_end", 2);
585             v.add(xid);
586             v.add(new Integer(flags));
587             synchronized (_connection) {
588                 send(v);
589                 checkReply("xa_end");
590             }
591         } catch (JMSException exception) {
592             // rethrow as a XAException
593             throw new XAException("Failed to commit session " +
594                 exception);
595         }
596     }
597 
598     // implementation of JmsSessionStubIfc.forget
599     public void forget(Xid xid)
600         throws XAException {
601         try {
602             Vector v = pack("xa_forget", 1);
603             v.add(xid);
604             synchronized (_connection) {
605                 send(v);
606                 checkReply("xa_forget");
607             }
608         } catch (JMSException exception) {
609             // rethrow as a XAException
610             throw new XAException("Failed to commit session " +
611                 exception);
612         }
613     }
614 
615     // implementation of JmsSessionStubIfc.getResourceManagerId
616     public String getResourceManagerId() throws XAException {
617         String rid = null;
618 
619         try {
620             Vector v = pack("xa_getResourceManagerId", 0);
621             synchronized (_connection) {
622                 send(v);
623                 Vector reply = checkReply("xa_getResourceManagerId");
624                 Boolean result = (Boolean) reply.get(0);
625 
626                 // check that the call completed before
627                 // extracting the message
628                 if (result.booleanValue()) {
629                     rid = (String) reply.get(1);
630                 }
631             }
632         } catch (JMSException exception) {
633             // rethrow as a XAException
634             throw new XAException("Failed to getResourceManagerId session " +
635                 exception);
636         }
637 
638         return rid;
639     }
640 
641     // implementation of JmsSessionStubIfc.getTransactionTimeout
642     public int getTransactionTimeout()
643         throws XAException {
644         int timeout = 0;
645 
646         try {
647             Vector v = pack("xa_getTransactionTimeout", 0);
648             synchronized (_connection) {
649                 send(v);
650                 Vector reply = checkReply("xa_getTransactionTimeout");
651                 Boolean result = (Boolean) reply.get(0);
652 
653                 // check that the call completed before
654                 // extracting the message
655                 if (result.booleanValue()) {
656                     timeout = ((Integer) reply.get(1)).intValue();
657                 }
658             }
659         } catch (JMSException exception) {
660             // rethrow as a XAException
661             throw new XAException("Failed to getTransactionTimeout session " +
662                 exception);
663         }
664 
665         return timeout;
666     }
667 
668     // implementation of JmsSessionStubIfc.prepare
669     public int prepare(Xid xid)
670         throws XAException {
671         int value = 0;
672 
673         try {
674             Vector v = pack("xa_prepare", 1);
675             v.add(xid);
676             synchronized (_connection) {
677                 send(v);
678                 Vector reply = checkReply("xa_prepare");
679                 Boolean result = (Boolean) reply.get(0);
680 
681                 // check that the call completed before
682                 // extracting the message
683                 if (result.booleanValue()) {
684                     value = ((Integer) reply.get(1)).intValue();
685                 }
686             }
687         } catch (JMSException exception) {
688             // rethrow as a XAException
689             throw new XAException("Failed to prepare session " +
690                 exception);
691         }
692 
693         return value;
694     }
695 
696     // implementation of JmsSessionStubIfc.recover
697     public Xid[] recover(int flag)
698         throws XAException {
699         Xid[] xids = new Xid[0];
700 
701         try {
702             Vector v = pack("xa_recover", 1);
703             v.add(new Integer(flag));
704             synchronized (_connection) {
705                 send(v);
706                 Vector reply = checkReply("xa_recover");
707                 Boolean result = (Boolean) reply.get(0);
708 
709                 // check that the call completed before
710                 // extracting the message
711                 if (result.booleanValue()) {
712                     xids = (Xid[]) reply.get(1);
713                 }
714             }
715         } catch (JMSException exception) {
716             // rethrow as a XAException
717             throw new XAException("Failed to recover session " +
718                 exception);
719         }
720 
721         return xids;
722     }
723 
724     // implementation of JmsSessionStubIfc.rollback
725     public void rollback(Xid xid)
726         throws XAException {
727         try {
728             Vector v = pack("xa_rollback", 1);
729             v.add(xid);
730             synchronized (_connection) {
731                 send(v);
732                 checkReply("xa_rollback");
733             }
734         } catch (JMSException exception) {
735             // rethrow as a XAException
736             throw new XAException("Failed to rollback session " +
737                 exception);
738         }
739     }
740 
741     // implementation of JmsSessionStubIfc.setTransactionTimeout
742     public boolean setTransactionTimeout(int seconds)
743         throws XAException {
744         boolean value = false;
745 
746         try {
747             Vector v = pack("xa_setTransactionTimeout", 1);
748             v.add(new Integer(seconds));
749             synchronized (_connection) {
750                 send(v);
751                 Vector reply = checkReply("xa_setTransactionTimeout");
752                 Boolean result = (Boolean) reply.get(0);
753 
754                 // check that the call completed before
755                 // extracting the message
756                 if (result.booleanValue()) {
757                     value = ((Boolean) reply.get(1)).booleanValue();
758                 }
759             }
760         } catch (JMSException exception) {
761             // rethrow as a XAException
762             throw new XAException("Failed to setTransactionTimeout " +
763                 exception);
764         }
765 
766         return value;
767     }
768 
769     // implementation of JmsSessionStubIfc.start
770     public void start(Xid xid, int flags)
771         throws XAException {
772         try {
773             Vector v = pack("xa_start", 2);
774             v.add(xid);
775             v.add(new Integer(flags));
776             synchronized (_connection) {
777                 send(v);
778                 checkReply("xa_start");
779             }
780         } catch (JMSException exception) {
781             // rethrow as a XAException
782             throw new XAException("Failed to start session " +
783                 exception);
784         }
785     }
786 
787     /***
788      * Set a message listener to be called when new Messages arrive from the
789      * server.
790      *
791      * @param listener A reference to the client listener.
792      */
793     public void setMessageListener(JmsMessageListener listener) {
794         _listener.setListener(_sessionId, listener);
795     }
796 
797     // implementation of JmsSessionStubIfc.enableAsynchronousDelivery
798     public void enableAsynchronousDelivery(long clientId, String id,
799                                            boolean enable)
800         throws JMSException {
801         Vector v = pack("enableAsynchronousDelivery", 3);
802         v.add(new Long(clientId));
803         v.add(id);
804         v.add(new Boolean(enable));
805         synchronized (_connection) {
806             send(v);
807             checkReply("enableAsynchronousDelivery");
808         }
809     }
810 
811     /***
812      * Pack all the data that is required by the server in a vector.
813      * Set the size of the vector to be exactly the right size for efficiency.
814      *
815      * @param method The function to activate on the server.
816      * @param numParams The number of paramaters this method will require.
817      * @return Vector The vector containing all the data.
818      *
819      */
820     private Vector pack(String method, int numParams) {
821         Vector v = new Vector(5 + numParams);
822         v.add("org.exolab.jms.server.mipc.IpcJmsSessionConnection");
823         v.add(method);
824         v.add(_clientId);
825         v.add(_connectionId);
826         v.add(_sessionId);
827         return v;
828     }
829 
830     /***
831      * A convenience method to check the success of operations which return
832      * a true on sucess.
833      *
834      * @param method The requested server function.
835      * @exception JMSException On any failure.
836      *
837      */
838     private Vector checkReply(String method) throws JMSException {
839         Vector v = null;
840         try {
841             v = (Vector) _connection.receive();
842         } catch (Exception err) {
843             // rethrow as a JMSException
844             throw new JMSException("Operation " + method + " failed:\n" + err);
845         }
846 
847         if (v != null) {
848             Boolean b = (Boolean) v.get(0);
849             if (!b.booleanValue()) {
850                 if (v.get(1) instanceof JMSException) {
851                     throw (JMSException) v.get(1);
852                 } else {
853                     throw new JMSException("Operation " + method +
854                         " failed:\n" + v.get(1));
855                 }
856             }
857         } else {
858             throw new JMSException("Unknown connection error for " + method);
859         }
860 
861         return v;
862     }
863 
864     /***
865      * A convenience method to send a packed command to the server.
866      *
867      * @throws JMSException for any failure.
868      */
869     private void send(Vector v) throws JMSException {
870         try {
871             _connection.send(v);
872         } catch (Exception err) {
873             // rethrow as a JMSException
874             throw new JMSException("Operation Failed" + err);
875         }
876     }
877 
878 } //-- IpcJmsSessionStub