View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: IpcJmsMessageListener.java,v 1.8 2003/08/07 13:32:54 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * $Date	    jimm    Created
47   */
48  
49  package org.exolab.jms.client.mipc;
50  
51  import java.util.Collections;
52  import java.util.HashMap;
53  import java.util.Map;
54  import java.util.Vector;
55  
56  import javax.jms.Message;
57  
58  import org.exolab.core.mipc.MultiplexConnectionIfc;
59  import org.exolab.core.mipc.ObjectChannel;
60  import org.exolab.jms.client.JmsMessageListener;
61  
62  
63  /***
64   * All callbacks for ipc message delivery are registered with the single
65   * instance of this object, keyed by their session ids.
66   * Arriving messages also contain a session id, which is used to look up
67   * the callback the message is destined for.
68   *
69   * @version     $Revision: 1.8 $ $Date: 2003/08/07 13:32:54 $
70   * @author      <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
71   * @see         org.exolab.jms.client.mipc.IpcJmsSessionStub
72   */
73  class IpcJmsMessageListener implements Runnable {
74  
75      /***
76       * The set of MessageListener instances, keyed on session id
77       */
78      private Map _listeners = Collections.synchronizedMap(new HashMap(10));
79  
80      /***
81       * The channel to receive messages on
82       */
83      private ObjectChannel _channel;
84  
85      /***
86       * Thread for receiving and dispatching messages
87       */
88      private Thread _thread;
89  
90      /***
91       * If <code>true</code> indicates that the thread should stop
92       */
93      private boolean _interrupted = false;
94  
95      /***
96       * Synchronization helper
97       */
98      private final Object _lock = new Object();
99  
100 
101     /***
102      * Construct a new <code>IpcJmsMessageListener</code>
103      *
104      * @param connection the connection to the server
105      */
106     public IpcJmsMessageListener(MultiplexConnectionIfc connection) {
107         if (connection == null) {
108             throw new IllegalArgumentException(
109                 "Argument 'connection' is null");
110         }
111         synchronized (connection) {
112             _channel = new ObjectChannel("message", connection);
113         }
114     }
115 
116     /***
117      * Set a new callback for a session.
118      *
119      * @param sessionId the unique session id
120      * @param listener the listener callback to call when new messages arrive.
121      */
122     public void setListener(String sessionId, JmsMessageListener listener) {
123         _listeners.put(sessionId, listener);
124     }
125 
126     /***
127      * A session has been closed. Remove its callback.
128      *
129      * @param sessionId the unique session id
130      */
131     public void closeSession(String sessionId) {
132         _listeners.remove(sessionId);
133     }
134 
135     /***
136      * Close all session callbacks
137      */
138     public void closeAllSessions() {
139         _listeners.clear();
140     }
141 
142     /***
143      * This method is called back by the server whenever it has a message for
144      * this session.
145      *
146      * @param sessionId the session this message is for
147      * @param message the message being sent to this session.
148      */
149     public void onMessage(String sessionId, Message message) {
150         JmsMessageListener listener =
151             (JmsMessageListener) _listeners.get(sessionId);
152 
153         if (listener != null) {
154             listener.onMessage(message);
155         }
156     }
157 
158     /***
159      * This method is called back by the server whenever it has one or more
160      * messages for this session.
161      *
162      * @param sessionId the session this message is for.
163      * @param messages the messages
164      */
165     public void onMessages(String sessionId, Vector messages) {
166         JmsMessageListener listener =
167             (JmsMessageListener) _listeners.get(sessionId);
168         if (listener != null) {
169             listener.onMessages(messages);
170         }
171     }
172 
173     /***
174      * This method is called to notify the client that a message is available.
175      *
176      * @param sessionId the session this message is for.
177      * @param clientId the client id to notify
178      */
179     public void onMessageAvailable(String sessionId, long clientId) {
180         JmsMessageListener listener =
181             (JmsMessageListener) _listeners.get(sessionId);
182         if (listener != null) {
183             listener.onMessageAvailable(clientId);
184         }
185     }
186 
187     /***
188      * Start a thread to receive messages
189      */
190     public void start() {
191         synchronized (_lock) {
192             if (_thread == null) {
193                 _interrupted = false;
194                 _thread = new Thread(this);
195                 _thread.start();
196             }
197         }
198     }
199 
200     /***
201      * Stop the message receiver thread
202      */
203     public void stop() {
204         closeAllSessions();
205         synchronized (_lock) {
206             if (_thread != null) {
207                 _interrupted = true;
208                 _thread.interrupt();
209             }
210         }
211     }
212 
213     /***
214      * This is called when we start up an inbound message pump in a
215      * a separate thread. The method sits in a loop and consumes messages
216      * coming from the server.
217      */
218     public void run() {
219         try {
220             while (!_interrupted) {
221                 Vector v = (Vector) _channel.receive();
222 
223                 String sessionId = (String) v.get(0);
224                 if (v.get(1) instanceof Message) {
225                     onMessage(sessionId, (Message) v.get(1));
226                 } else if (v.get(1) instanceof Vector) {
227                     onMessages(sessionId, (Vector) v.get(1));
228                 } else if (v.get(1) instanceof Long) {
229                     onMessageAvailable(sessionId,
230                         ((Long) v.get(1)).longValue());
231                 }
232                 _channel.send(new Vector());
233             }
234         } catch (Exception ignore) {
235         } finally {
236             try {
237                 _channel.close();
238             } catch (Exception ignore) {
239             }
240 
241             synchronized (_lock) {
242                 _thread = null;
243             }
244         }
245     }
246 
247 } //-- IpcJmsMessageListener