View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: IpcJmsSessionSender.java,v 1.10 2003/08/07 13:33:09 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * $Date	    jimm    Created
47   */
48  
49  
50  package org.exolab.jms.server.mipc;
51  
52  import java.util.Vector;
53  
54  import javax.jms.Message;
55  
56  import org.exolab.core.mipc.ObjectChannel;
57  import org.exolab.jms.client.JmsMessageListener;
58  import org.exolab.jms.message.MessageImpl;
59  import org.exolab.jms.scheduler.Scheduler;
60  import org.exolab.jms.server.ClientDisconnectionException;
61  import org.exolab.jms.server.JmsServerSession;
62  
63  
64  /***
65   * This class contains the ipc connection to a receiver or subscriber
66   * for passing the JmsMessages.
67   *
68   * If the connection is detected as closed, the Session will also be shutdown.
69   *
70   * @version     $Revision: 1.10 $ $Date: 2003/08/07 13:33:09 $
71   * @author      <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
72   * @see         org.exolab.jms.server.mipc.IpcJmsSessionConnection
73   * @see		org.exolab.jms.server.JmsServerSession
74   * @see		org.exolab.core.ipc.Client
75   *
76   */
77  public class IpcJmsSessionSender implements JmsMessageListener {
78  
79      /***
80       * The client connection
81       */
82      private ObjectChannel _client;
83  
84      /***
85       * The session
86       */
87      private JmsServerSession _session = null;
88  
89  
90      /***
91       * Create a connection to the client listener for sending JmsMessages.
92       *
93       * @param client The connection to the client.
94       * @param session The session this connection belongs to.
95       *
96       */
97      public IpcJmsSessionSender(ObjectChannel client,
98                                 JmsServerSession session) {
99          _client = client;
100         _session = session;
101         session.setMessageListener(this);
102     }
103 
104     /***
105      * Send a JmsMessage to a listener.
106      *
107      * @param message The message to send.
108      */
109     public synchronized void onMessage(Message message) {
110         try {
111             if (_client != null) {
112                 Vector v = new Vector(2);
113                 v.add(_session.getSessionId());
114                 v.add((MessageImpl) message);
115 
116                 // artw: this was not synchronized before.  In the old code,
117                 // it was "safe" to call into Client by multi-threads.  Now
118                 // with the Mutliplex stuff, each channel reader/writer
119                 // has state in it during the receive/read and write/send
120                 // calls.  I could synchronize the inside of the channel
121                 // but decided to do it right here for now.
122                 _client.send(v);
123                 Vector reply = (Vector) _client.receive();
124             }
125         } catch (Exception err) {
126             throw new ClientDisconnectionException(err.getMessage());
127         }
128     }
129 
130     /***
131      * Send the collection of messages to the client. This is used for async
132      * message delivery.
133      *
134      * @param messages - collection of MessageImpl objects
135      */
136     public synchronized void onMessages(Vector messages) {
137         try {
138             if (_client != null) {
139                 Vector v = new Vector(2);
140                 v.add(_session.getSessionId());
141                 v.add(messages);
142                 // artw: this was not synchronized before.  In the old code,
143                 // it was "safe" to call into Client by multi-threads.  Now
144                 // with the Mutliplex stuff, each channel reader/writer
145                 // has state in it during the receive/read and write/send
146                 // calls.  I could synchronize the inside of the channel
147                 // but decided to do it right here for now.
148                 _client.send(v);
149                 Vector reply = (Vector) _client.receive();
150             }
151         } catch (Exception err) {
152             throw new ClientDisconnectionException(err.getMessage());
153         }
154     }
155 
156     /***
157      * Notify the specified client that a message avaiable.
158      *
159      * @param clinet - the client identity
160      */
161     public synchronized void onMessageAvailable(long clientId) {
162         // Workaround for bug 777419 - Deadlock in the tcp connector
163         Scheduler.instance().add(new Notifier(clientId));
164     }
165 
166     /***
167      * Remove the callback from the JmsServerSession.
168      */
169     public synchronized void close() {
170         if (_session != null) {
171             _session.setMessageListener(null);
172             _session = null;
173         }
174         _client = null;
175     }
176 
177     private class Notifier implements Runnable {
178 
179         private long _clientId;
180 
181         public Notifier(long clientId) {
182             _clientId = clientId;
183         }
184 
185         public void run() {
186             synchronized (IpcJmsSessionSender.this) {
187                 try {
188                     if (_client != null) {
189                         Vector v = new Vector(2);
190                         v.add(_session.getSessionId());
191                         v.add(new Long(_clientId));
192                         _client.send(v);
193                         Vector reply = (Vector) _client.receive();
194                     }
195                 } catch (Exception exception) {
196                     exception.printStackTrace();
197                 }
198             }
199         }
200     }
201 
202 } //-- IpcJmsSessionSender