View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2001,2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: HttpJmsSessionStub.java,v 1.16 2003/08/25 03:35:45 tanderson Exp $
44   *
45   * Date             Author      Changes
46   * 12 Oct 2001      mourikis    Created
47   */
48  package org.exolab.jms.client.http;
49  
50  import java.io.Serializable;
51  import java.net.InetAddress;
52  import java.net.URL;
53  import java.net.UnknownHostException;
54  import java.util.Vector;
55  
56  import javax.jms.JMSException;
57  import javax.jms.Message;
58  import javax.jms.MessageListener;
59  import javax.transaction.xa.XAException;
60  import javax.transaction.xa.XAResource;
61  import javax.transaction.xa.Xid;
62  
63  import org.apache.commons.logging.Log;
64  import org.apache.commons.logging.LogFactory;
65  
66  import org.exolab.core.http.HttpClient;
67  import org.exolab.core.ipc.IpcIfc;
68  import org.exolab.core.ipc.NotifierIfc;
69  import org.exolab.core.ipc.Server;
70  import org.exolab.jms.client.JmsMessageListener;
71  import org.exolab.jms.client.JmsQueue;
72  import org.exolab.jms.client.JmsSessionStubIfc;
73  import org.exolab.jms.client.JmsTopic;
74  import org.exolab.jms.jndi.JndiConstants;
75  import org.exolab.jms.message.MessageImpl;
76  
77  
78  /***
79   * The client side stub implementing the JmsServerSession. All session
80   * requests are passed on to the server.
81   *
82   * @version     $Revision: 1.16 $ $Date: 2003/08/25 03:35:45 $
83   * @author      <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
84   * @see         org.exolab.core.http.HttpClient
85   */
86  public class HttpJmsSessionStub implements JmsSessionStubIfc, NotifierIfc {
87  
88      /***
89       * A reference to the client connection
90       */
91      private IpcIfc _connection = null;
92  
93      /***
94       * The client connection id
95       */
96      private String _clientId;
97  
98      /***
99       * The destination connection id
100      */
101     private String _connectionId;
102 
103     /***
104      * The session id
105      */
106     private String _sessionId;
107 
108     /***
109      * The tcp connection for receiving messages
110      */
111     private Server _msgReceiver = null;
112 
113     /***
114      *  The listener
115      */
116     private JmsMessageListener _listener = null;
117 
118     /***
119      * The url of the client's servlet. This gets passed to the server to
120      * allow the server to connect back via client's servlet.
121      * This host and port of the URL default to that of the server's web
122      * This can be overriden by specifying the system property
123      * <em>JndiConstants.HTTP_CLIENT_URL_PROPERTY</em>
124      */
125     private String _url;
126 
127     /***
128      * The host of the machine running the client. Defaults to the local
129      * host address. This can be overriden by specifying the system property
130      * <em>JndiConstants.HTTP_CLIENT_SERVER_ADDRESS_PROPERTY</em>
131      * which is useful if the client machine has multiple NICS.
132      */
133     private String _host = "localhost";
134 
135     /***
136      * The client servlet
137      */
138     private static final String CLIENT_SERVLET = "openjms/OpenJMSClient";
139 
140     /***
141      * The logger
142      */
143     private static final Log _log =
144         LogFactory.getLog(HttpJmsSessionStub.class);
145 
146 
147     /***
148      * A new session has been established with these ids.
149      *
150      * @param connection  The http connection to the server.
151      * @param clientId This clients unique id.
152      * @param connectionId This objects connection identifier.
153      * @param sessionId The unique session id for this object.
154      */
155     public HttpJmsSessionStub(IpcIfc connection, String clientId,
156                               String connectionId, String sessionId) {
157         _connection = connection;
158         _clientId = clientId;
159         _connectionId = connectionId;
160         _sessionId = sessionId;
161         String str = null;
162 
163         String clientHost = System.getProperty(
164             JndiConstants.HTTP_CLIENT_SERVER_ADDRESS_PROPERTY);
165         if (clientHost != null) {
166             _host = clientHost;
167         }
168         if (_host.equals("localhost")) {
169             try {
170                 _host = InetAddress.getLocalHost().getHostAddress();
171             } catch (UnknownHostException ignore) {
172             }
173         }
174 
175         String clientWebServerURL = System.getProperty(
176             JndiConstants.HTTP_CLIENT_URL_PROPERTY);
177 
178         if (clientWebServerURL == null) {
179             // default to the host and port of the server's webserver
180             URL url = ((HttpClient) _connection).getURL();
181             clientWebServerURL = url.getProtocol() + "://" + url.getHost() +
182                 ":" + url.getPort() + "/";
183         }
184 
185         if (!clientWebServerURL.endsWith("/")) {
186             clientWebServerURL += "/";
187         }
188         _url = clientWebServerURL + CLIENT_SERVLET;
189     }
190 
191     /***
192      * Get the client Id
193      *
194      * @return the client id
195      */
196     public String getClientId() {
197         return _clientId;
198     }
199 
200     /***
201      * Get the sessionId
202      *
203      * @return the id of this session
204      */
205     public String getSessionId() {
206         return _sessionId;
207     }
208 
209     // implementation of JmsSessionStubIfc.beforeClose
210     public void beforeClose() throws JMSException {
211     }
212 
213     /***
214      * Close this session.
215      *
216      * @throws JMSException if the close fails
217      */
218     public void close() throws JMSException {
219         Vector v = pack("close", 0);
220         synchronized (_connection) {
221             send(v, true);
222             checkReply("close");
223         }
224 
225         _listener = null;
226         if (_msgReceiver != null) {
227             if (_log.isDebugEnabled()) {
228                 _log.debug("Stopping receiver on port="
229                            + _msgReceiver.getPort());
230             }
231             _msgReceiver.stop();
232             _msgReceiver = null;
233         }
234     }
235 
236     /***
237      * Acknowledge a message
238      *
239      * @param clientId the identity of the client
240      * @param messageId the message identity to ack
241      * @throws JMSException if the message can't be acknowledged
242      */
243     public void acknowledgeMessage(long clientId, String messageId)
244         throws JMSException {
245 
246         Vector v = pack("acknowledgeMessage", 2);
247         //Send back the ack to the consumer endpoint that forwarded the
248         // message.
249         v.add(new Long(clientId));
250         v.add(messageId);
251         synchronized (_connection) {
252             send(v, true);
253             checkReply("acknowledgeMessage");
254         }
255     }
256 
257     // implementation of JmsSessionStubIfc.sendMessage
258     public void sendMessage(Message message) throws JMSException {
259         Vector v = pack("sendMessage", 1);
260         v.add(message);
261         synchronized (_connection) {
262             if (((MessageImpl) message).isPersistent()) {
263                 send(v, true);
264                 checkReply("sendMessage");
265             } else {
266                 send(v, false);
267             }
268 
269         }
270     }
271 
272     // implementation of JmsSessionStubIfc.sendMessages
273     public void sendMessages(Vector messages) throws JMSException {
274         Vector v = pack("sendMessages", 1);
275         v.add(messages);
276         synchronized (_connection) {
277             send(v, true);
278             // always check for a reply
279             checkReply("sendMessages");
280         }
281     }
282 
283     // implementation of JmsSessionStubIfc.receiveMessage
284     public Message receiveMessage(long clientId, long wait)
285         throws JMSException {
286 
287         Message message = null;
288 
289         Vector v = pack("receiveMessage", 2);
290         v.add(new Long(clientId));
291         v.add(new Long(wait));
292         synchronized (_connection) {
293             send(v, true);
294             Vector reply = checkReply("receiveMessage");
295             Boolean result = (Boolean) reply.get(0);
296 
297             // check that the call completed before
298             // extracting the message
299             if (result.booleanValue()) {
300                 message = (Message) reply.get(1);
301             }
302         }
303 
304         return message;
305     }
306 
307     // implementation of JmsSessionStubIfc.receiveMessages
308     public Vector receiveMessages(long clientId, int count)
309         throws JMSException {
310 
311         Vector messages = null;
312 
313         Vector v = pack("receiveMessages", 2);
314         v.add(new Long(clientId));
315         v.add(new Integer(count));
316         synchronized (_connection) {
317             send(v, true);
318             Vector reply = checkReply("receiveMessages");
319             Boolean result = (Boolean) reply.get(0);
320 
321             // check that the call completed before
322             // extracting the message
323             if (result.booleanValue()) {
324                 messages = (Vector) reply.get(1);
325             }
326         }
327 
328         return messages;
329     }
330 
331     /***
332      * Create a new queue
333      *
334      * @param topic the queue to create
335      * @throws JMSException if the queue can't be created
336      */
337     public void createQueue(JmsQueue queue) throws JMSException {
338         Vector v = pack("createQueue", 1);
339         v.add(queue);
340         synchronized (_connection) {
341             send(v, true);
342             checkReply("createQueue");
343         }
344     }
345 
346     /***
347      * Create a new topic
348      *
349      * @param topic the topic to create
350      * @throws JMSException if the topic can't be created
351      */
352     public void createTopic(JmsTopic topic) throws JMSException {
353         Vector v = pack("createTopic", 1);
354         v.add(topic);
355         synchronized (_connection) {
356             send(v, true);
357             checkReply("createTopic");
358         }
359     }
360 
361     /***
362      * Create a receiver. Get the IP address of the machine the consumer runs
363      * on, and the port it is listening to, and pass this to the server, so
364      * it can make a new dedicated connection for sending all messages to
365      * this client.
366      *
367      * @param queue the queue to listen to
368      * @param clientId the session allocated identifier
369      * @param selector the selector to filter messages (may be null)
370      * @throws JMSException if the receiver cannot be created
371      */
372     public void createReceiver(JmsQueue queue, long clientId, String selector)
373         throws JMSException {
374 
375         startReceiver();
376         Vector v = pack("createReceiver", 6);
377         v.add(queue);
378         v.add(new Long(clientId));
379         v.add(selector);
380         v.add(_host);
381         v.add(String.valueOf(_msgReceiver.getPort()));
382         v.add(_url);
383         synchronized (_connection) {
384             send(v, true);
385             checkReply("createReceiver");
386         }
387     }
388 
389     /***
390      * Create a queue sender
391      *
392      * @param queue the queue to send messages to
393      * @throws JMSException if the sender cannot be created
394      */
395     public void createSender(JmsQueue queue) throws JMSException {
396         Vector v = pack("createSender", 1);
397         v.add(queue);
398         synchronized (_connection) {
399             send(v, true);
400             checkReply("createSender");
401         }
402     }
403 
404     /***
405      * Create a queue browser for this session. This allows clients to browse
406      * a queue without removing any messages.
407      *
408      * @param queue the queue to browse
409      * @param clientId the identity of the client
410      * @param selector the message selector. May be null
411      * @throws JMSException if the browser can't be created
412      */
413     public void createBrowser(JmsQueue queue, long clientId, String selector)
414         throws JMSException {
415 
416         startReceiver();
417         Vector v = pack("createBrowser", 6);
418         v.add(queue);
419         v.add(new Long(clientId));
420         v.add(selector);
421         v.add(_host);
422         v.add(String.valueOf(_msgReceiver.getPort()));
423         v.add(_url);
424         synchronized (_connection) {
425             send(v, true);
426             checkReply("createBrowser");
427         }
428     }
429 
430     /***
431      * Delete the receiver for this queue.
432      *
433      * @param clientId the id of the client to delete
434      * @throws JMSException if the receiver cannot be deleted
435      */
436     public void deleteReceiver(long clientId) throws JMSException {
437         Vector v = pack("deleteReceiver", 1);
438         v.add(new Long(clientId));
439         synchronized (_connection) {
440             send(v, true);
441             checkReply("deleteReceiver");
442         }
443     }
444 
445     /***
446      * Delete the queue browser associated with the specified queue from
447      * the session.
448      *
449      * @param clientId the identity of the browser
450      * @throws JMSException if the browser cannot be deleted
451      */
452     public void deleteBrowser(long clientId) throws JMSException {
453         Vector v = pack("deleteBrowser", 1);
454         v.add(new Long(clientId));
455         synchronized (_connection) {
456             send(v, true);
457             checkReply("deleteBrowser");
458         }
459     }
460 
461     /***
462      * Create a new topic subscriber
463      *
464      * @param topic the topic to subscribe to
465      * @param name the subscribers name
466      * @param client the client identity
467      * @param selector the selector to filter messages (may be null)
468      * @throws JMSException if the topic subscriber can't be created
469      */
470     public void createSubscriber(JmsTopic topic, String name, long clientId,
471                                  String selector, boolean noLocal)
472         throws JMSException {
473 
474         startReceiver();
475         Vector v = pack("createSubscriber", 8);
476         v.add(topic);
477         v.add(name);
478         v.add(new Long(clientId));
479         v.add(selector);
480         v.add(new Boolean(noLocal));
481         v.add(_host);
482         v.add(String.valueOf(_msgReceiver.getPort()));
483         v.add(_url);
484         synchronized (_connection) {
485             send(v, true);
486             checkReply("createSubscriber");
487         }
488     }
489 
490     /***
491      * Create a new topic publisher
492      *
493      * @param topic the topic to publish to
494      * @throws JMSException if the publisher can't be created
495      */
496     public void createPublisher(JmsTopic topic) throws JMSException {
497         Vector v = pack("createPublisher", 1);
498         v.add(topic);
499         synchronized (_connection) {
500             send(v, true);
501             checkReply("createPublisher");
502         }
503     }
504 
505     /***
506      * Unsubscribe a durable subscription
507      *
508      * @param name the name used to identify the subscription
509      * @throws JMSException if the subscription cannot be removed
510      */
511     public void unsubscribe(String name) throws JMSException {
512         Vector v = pack("unsubscribe", 1);
513         v.add(name);
514         synchronized (_connection) {
515             send(v, true);
516             checkReply("unsubscribe");
517         }
518     }
519 
520     /***
521      * Delete the subscriber for this topic
522      *
523      * @param clientId the client identity
524      * @throws JMSException for any error
525      */
526     public void deleteSubscriber(long clientId) throws JMSException {
527         Vector v = pack("deleteSubscriber", 1);
528         v.add(new Long(clientId));
529         synchronized (_connection) {
530             send(v, true);
531             checkReply("deleteSubscriber");
532         }
533     }
534 
535     /***
536      * Stop message delivery for this session.
537      *
538      * @throws JMSException for any error
539      */
540     public void stopMessageDelivery() throws JMSException {
541         Vector v = pack("stopMessageDelivery", 0);
542         synchronized (_connection) {
543             send(v, true);
544             checkReply("stopMessageDelivery");
545         }
546     }
547 
548     /***
549      * Start message delivery for this session.
550      *
551      * @throws JMSException for any error
552      */
553     public void startMessageDelivery() throws JMSException {
554         Vector v = pack("startMessageDelivery", 0);
555         synchronized (_connection) {
556             send(v, true);
557             checkReply("startMessageDelivery");
558         }
559     }
560 
561     // implementation of JmsSessionStubIfc.recover
562     public void recover() throws JMSException {
563         Vector v = pack("recover", 0);
564         synchronized (_connection) {
565             send(v, true);
566             checkReply("recover");
567         }
568     }
569 
570     // implementation of JmsSessionStubIfc.commit
571     public void commit() throws JMSException {
572         Vector v = pack("commit", 0);
573         synchronized (_connection) {
574             send(v, true);
575             checkReply("commit");
576         }
577     }
578 
579     // implementation of JmsSessionStubIfc.rollback
580     public void rollback() throws JMSException {
581         Vector v = pack("rollback", 0);
582         synchronized (_connection) {
583             send(v, true);
584             checkReply("rollback");
585         }
586     }
587 
588     // implementation of JmsSessionStubIfc.commit
589     public void commit(Xid xid, boolean onePhase) throws XAException {
590         try {
591             Vector v = pack("xa_commit", 2);
592             v.add(xid);
593             v.add(new Boolean(onePhase));
594             synchronized (_connection) {
595                 send(v, true);
596                 checkReply("xa_commit");
597             }
598         } catch (JMSException exception) {
599             // rethrow as a XAException
600             throw new XAException("Failed to commit session " +
601                 exception);
602         }
603     }
604 
605     // implementation of JmsSessionStubIfc.end
606     public void end(Xid xid, int flags) throws XAException {
607         try {
608             Vector v = pack("xa_end", 2);
609             v.add(xid);
610             v.add(new Integer(flags));
611             synchronized (_connection) {
612                 send(v, true);
613                 checkReply("xa_end");
614             }
615         } catch (JMSException exception) {
616             // rethrow as a XAException
617             throw new XAException("Failed to commit session " +
618                 exception);
619         }
620     }
621 
622     // implementation of JmsSessionStubIfc.forget
623     public void forget(Xid xid) throws XAException {
624         try {
625             Vector v = pack("xa_forget", 1);
626             v.add(xid);
627             synchronized (_connection) {
628                 send(v, true);
629                 checkReply("xa_forget");
630             }
631         } catch (JMSException exception) {
632             // rethrow as a XAException
633             throw new XAException("Failed to commit session " +
634                 exception);
635         }
636     }
637 
638     // implementation of JmsSessionStubIfc.getTransactionTimeout
639     public int getTransactionTimeout() throws XAException {
640         int timeout = 0;
641 
642         try {
643             Vector v = pack("xa_getTransactionTimeout", 0);
644             synchronized (_connection) {
645                 send(v, true);
646                 Vector reply = checkReply("xa_getTransactionTimeout");
647                 Boolean result = (Boolean) reply.get(0);
648 
649                 // check that the call completed before
650                 // extracting the message
651                 if (result.booleanValue()) {
652                     timeout = ((Integer) reply.get(1)).intValue();
653                 }
654             }
655         } catch (JMSException exception) {
656             // rethrow as a XAException
657             throw new XAException("Failed to getTransactionTimeout session " +
658                 exception);
659         }
660 
661         return timeout;
662     }
663 
664     // implementation of JmsSessionStubIfc.prepare
665     public int prepare(Xid xid) throws XAException {
666         int value = 0;
667 
668         try {
669             Vector v = pack("xa_prepare", 1);
670             v.add(xid);
671             synchronized (_connection) {
672                 send(v, true);
673                 Vector reply = checkReply("xa_prepare");
674                 Boolean result = (Boolean) reply.get(0);
675 
676                 // check that the call completed before
677                 // extracting the message
678                 if (result.booleanValue()) {
679                     value = ((Integer) reply.get(1)).intValue();
680                 }
681             }
682         } catch (JMSException exception) {
683             // rethrow as a XAException
684             throw new XAException("Failed to prepare session " +
685                 exception);
686         }
687 
688         return value;
689     }
690 
691     // implementation of JmsSessionStubIfc.recover
692     public Xid[] recover(int flag) throws XAException {
693         Xid[] xids = new Xid[0];
694 
695         try {
696             Vector v = pack("xa_recover", 1);
697             v.add(new Integer(flag));
698             synchronized (_connection) {
699                 send(v, true);
700                 Vector reply = checkReply("xa_recover");
701                 Boolean result = (Boolean) reply.get(0);
702 
703                 // check that the call completed before
704                 // extracting the message
705                 if (result.booleanValue()) {
706                     xids = (Xid[]) reply.get(1);
707                 }
708             }
709         } catch (JMSException exception) {
710             // rethrow as a XAException
711             throw new XAException("Failed to recover session " +
712                 exception);
713         }
714 
715         return xids;
716     }
717 
718     // implementation of JmsSessionStubIfc.rollback
719     public void rollback(Xid xid) throws XAException {
720         try {
721             Vector v = pack("xa_rollback", 1);
722             v.add(xid);
723             synchronized (_connection) {
724                 send(v, true);
725                 checkReply("xa_rollback");
726             }
727         } catch (JMSException exception) {
728             // rethrow as a XAException
729             throw new XAException("Failed to rollback session " +
730                 exception);
731         }
732     }
733 
734     // implementation of JmsSessionStubIfc.setTransactionTimeout
735     public boolean setTransactionTimeout(int seconds) throws XAException {
736         boolean value = false;
737 
738         try {
739             Vector v = pack("xa_setTransactionTimeout", 1);
740             v.add(new Integer(seconds));
741             synchronized (_connection) {
742                 send(v, true);
743                 Vector reply = checkReply("xa_setTransactionTimeout");
744                 Boolean result = (Boolean) reply.get(0);
745 
746                 // check that the call completed before
747                 // extracting the message
748                 if (result.booleanValue()) {
749                     value = ((Boolean) reply.get(1)).booleanValue();
750                 }
751             }
752         } catch (JMSException exception) {
753             // rethrow as a XAException
754             throw new XAException("Failed to setTransactionTimeout " +
755                 exception);
756         }
757 
758         return value;
759     }
760 
761     // implementation of JmsSessionStubIfc.start
762     public void start(Xid xid, int flags) throws XAException {
763         try {
764             Vector v = pack("xa_start", 2);
765             v.add(xid);
766             v.add(new Integer(flags));
767             synchronized (_connection) {
768                 send(v, true);
769                 checkReply("xa_start");
770             }
771         } catch (JMSException exception) {
772             // rethrow as a XAException
773             throw new XAException("Failed to start session " +
774                 exception);
775         }
776     }
777 
778     // implementation of JmsSessionStubIfc.getResourceManagerId
779     public String getResourceManagerId() throws XAException {
780         String rid = null;
781 
782         try {
783             Vector v = pack("xa_getResourceManagerId", 0);
784             synchronized (_connection) {
785                 send(v, true);
786                 Vector reply = checkReply("xa_getResourceManagerId");
787                 Boolean result = (Boolean) reply.get(0);
788 
789                 // check that the call completed before
790                 // extracting the message
791                 if (result.booleanValue()) {
792                     rid = (String) reply.get(1);
793                 }
794             }
795         } catch (JMSException exception) {
796             // rethrow as a XAException
797             throw new XAException("Failed to getResourceManagerId " +
798                 exception);
799         }
800 
801         return rid;
802     }
803 
804     /***
805      * Set a message listener to be called when new Messages arrive from the
806      * server.
807      *
808      * @param listener A reference to the client listener.
809      */
810     public void setMessageListener(JmsMessageListener listener) {
811         _listener = listener;
812     }
813 
814     // implementation of JmsSessionStubIfc.enableAsynchronousDelivery
815     public void enableAsynchronousDelivery(long clientId, String id,
816                                            boolean enable)
817         throws JMSException {
818 
819         Vector v = pack("enableAsynchronousDelivery", 3);
820         v.add(new Long(clientId));
821         v.add(id);
822         v.add(new Boolean(enable));
823         synchronized (_connection) {
824             send(v, true);
825             checkReply("enableAsynchronousDelivery");
826         }
827     }
828 
829     /***
830      * Stop the message receiver thread.
831      */
832     public synchronized void stopReceiver() {
833         _listener = null;
834         if (_msgReceiver != null) {
835             _msgReceiver.stop();
836             _msgReceiver = null;
837         }
838     }
839 
840     /***
841      * Pack all the data that is required by the server in a vector.
842      * Set the size of the vector to be exactly the right size for efficiency.
843      *
844      * @param method The function to activate on the server.
845      * @param numParams The number of paramaters this method will require.
846      * @return Vector The vector containing all the data.
847      */
848     private Vector pack(String method, int numParams) {
849         Vector v = new Vector(5 + numParams);
850         v.add("org.exolab.jms.server.http.HttpJmsSessionConnection");
851         packCommon(v, method);
852         return v;
853     }
854 
855     /***
856      * Pack the common data required by all connection types.
857      *
858      * @param v The vector to add the common items to.
859      * @param method The function to activate on the server.
860      */
861     private void packCommon(Vector v, String method) {
862         v.add(method);
863         v.add(_clientId);
864         v.add(_connectionId);
865         v.add(_sessionId);
866     }
867 
868     /***
869      * A convenience method to check the success of operations which return
870      * a true on sucess.
871      *
872      * @param method The requested server function.
873      * @throws JMSException for any error.
874      */
875     private Vector checkReply(String method) throws JMSException {
876         Vector v = null;
877         try {
878             v = (Vector) _connection.receive();
879         } catch (Exception err) {
880             // rethrow as a JMSException
881             throw new JMSException("Operation " + method + " failed: " + err);
882         }
883 
884         if (v != null) {
885             Boolean b = (Boolean) v.get(0);
886             if (!b.booleanValue()) {
887                 if (v.get(1) instanceof JMSException) {
888                     throw (JMSException) v.get(1);
889                 } else {
890                     throw new JMSException("Operation " + method +
891                         " failed: " + v.get(1));
892                 }
893             }
894         } else {
895             throw new JMSException("Unknown connection error for " + method);
896         }
897 
898         return v;
899     }
900 
901     /***
902      * A message has been received.
903      *
904      *
905      * @param ob The data received,
906      * @param id The connection id this data is received from
907      * @return any requested result, or null, nothing is sent back to the
908      * client
909      */
910     public Serializable notify(Object ob, String id) {
911         Vector v = (Vector) ob;
912         Vector reply = new Vector();
913 
914         if (_listener != null) {
915             if (v.size() == 2) {
916                 if (v.get(1) instanceof Message) {
917                     _listener.onMessage((Message) v.get(1));
918                 } else if (v.get(1) instanceof Vector) {
919                     _listener.onMessages((Vector) v.get(1));
920                 } else if (v.get(1) instanceof Long) {
921                     _listener.onMessageAvailable(((Long) v.get(1)).longValue());
922                 }
923             } else {
924                 // if size == 1, then this is just a ping.
925             }
926         }
927         return reply;
928     }
929 
930     /***
931      * A client has disconnected. Notify the caller.
932      *
933      * @param The unique identifier of this connection.
934      */
935     public void disconnection(String id) {
936     }
937 
938 
939     /***
940      * A convenience method to send a packed command to the server.
941      *
942      * @throws JMSException for any error.
943      */
944     private void send(Vector v, boolean replyExpected) throws JMSException {
945         try {
946             if (replyExpected) {
947                 _connection.send(v);
948             } else {
949                 ((HttpClient) _connection).sendWithoutResponse(v);
950             }
951         } catch (Exception err) {
952             // rethrow as a JMSException
953             throw new JMSException("Operation Failed" + err);
954         }
955     }
956 
957     /***
958      * Start a message receiver thread to receive messages
959      *
960      * @throws JMSException If the IpcConnection cannot be created
961      */
962     private synchronized void startReceiver() throws JMSException {
963         try {
964             if (_msgReceiver == null) {
965                 _msgReceiver = new Server(this);
966                 if (_log.isDebugEnabled()) {
967                     _log.debug("Starting receiver on port="
968                                + _msgReceiver.getPort());
969                 }
970                 new Thread(_msgReceiver).start();
971             }
972         } catch (Exception err) {
973             throw new JMSException(err.getMessage());
974         }
975     }
976 
977 } // --HttpJmsSessionStub