View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2003-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: Channel.java,v 1.4 2006/12/16 12:37:17 tanderson Exp $
44   */
45  package org.exolab.jms.net.multiplexer;
46  
47  import org.apache.commons.logging.Log;
48  import org.apache.commons.logging.LogFactory;
49  import org.exolab.jms.net.connector.Request;
50  import org.exolab.jms.net.connector.Response;
51  
52  import java.io.IOException;
53  import java.io.ObjectInputStream;
54  import java.io.ObjectOutputStream;
55  import java.rmi.MarshalException;
56  import java.rmi.RemoteException;
57  import java.rmi.UnmarshalException;
58  
59  
60  /***
61   * A <code>Channel</code> represents a single-threaded virtual connection over a
62   * physical connection.
63   *
64   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
65   * @version $Revision: 1.4 $ $Date: 2006/12/16 12:37:17 $
66   */
67  class Channel implements Constants {
68  
69      /***
70       * The channel identifier
71       */
72      private int _id;
73  
74      /***
75       * The multiplexer
76       */
77      private Multiplexer _multiplexer;
78  
79      /***
80       * The input stream
81       */
82      private MultiplexInputStream _in;
83  
84      /***
85       * The output stream
86       */
87      private MultiplexOutputStream _out;
88  
89      /***
90       * The logger
91       */
92      private static final Log _log = LogFactory.getLog(Channel.class);
93  
94  
95      /***
96       * Construct a new <code>Channel</code>
97       *
98       * @param id          the identifier for this channel
99       * @param multiplexer the multiplexer
100      * @param in          the stream to receive data on
101      * @param out         the stream to send data on
102      */
103     public Channel(int id, Multiplexer multiplexer,
104                    MultiplexInputStream in, MultiplexOutputStream out) {
105         _id = id;
106         _multiplexer = multiplexer;
107         _in = in;
108         _out = out;
109     }
110 
111     /***
112      * Returns the channel identifier
113      *
114      * @return the channel identifier
115      */
116     public int getId() {
117         return _id;
118     }
119 
120     /***
121      * Invoke a method on a remote object.
122      *
123      * @param request the request
124      * @return the result of the invocation
125      * @throws RemoteException if the distributed call cannot be made
126      */
127     public Response invoke(Request request) throws RemoteException {
128         if (_log.isDebugEnabled()) {
129             _log.debug("invoke() [channel=" + _id + "]");
130         }
131         Response response;
132         ObjectOutputStream out = null;
133         try {
134             // set the packet type
135             _out.setType(REQUEST);
136 
137             // write the request
138             out = new ObjectOutputStream(_out);
139             request.write(out);
140         } catch (IOException exception) {
141             throw new MarshalException("Failed to marshal call", exception);
142         } catch (Exception exception) {
143             throw new MarshalException("Failed to marshal call", exception);
144         } finally {
145             if (out != null) {
146                 try {
147                     out.close();
148                 } catch (IOException ignore) {
149                     // no-op
150                 }
151             }
152         }
153 
154         // read the response
155         ObjectInputStream in = null;
156         try {
157             in = new ObjectInputStream(_in);
158             response = Response.read(in, request.getMethod());
159         } catch (ClassNotFoundException exception) {
160             throw new UnmarshalException("Failed to unmarshal response",
161                                          exception);
162         } catch (IOException exception) {
163             throw new UnmarshalException("Failed to unmarshal response",
164                                          exception);
165         } finally {
166             try {
167                 if (in != null) {
168                     in.close();
169                 }
170             } catch (IOException ignore) {
171                 // no-ops
172             }
173         }
174         if (_log.isDebugEnabled()) {
175             _log.debug("invoke() [channel=" + _id + "] - end");
176         }
177         return response;
178     }
179 
180     /***
181      * Read a request from the channel.
182      * todo synchronization required due to scheduling in Multiplexer?
183      *
184      * @return the request
185      * @throws IOException if the request can't be read
186      */
187     public synchronized Request readRequest() throws IOException {
188         Request request;
189         ObjectInputStream in = new ObjectInputStream(_in);
190         request = Request.read(in);
191         return request;
192     }
193 
194     /***
195      * Write a response to a request.
196      * todo synchronization required due to scheduling in Multiplexer?
197      *
198      * @param response the response to write
199      * @throws IOException for any I/O error
200      */
201     public synchronized void writeResponse(Response response)
202             throws IOException {
203         // set the packet type
204         _out.setType(RESPONSE);
205 
206         // write the response
207         ObjectOutputStream out = new ObjectOutputStream(_out);
208         try {
209             response.write(out);
210         } finally {
211             out.close();
212         }
213     }
214 
215     /***
216      * Invoked when the underlying physical connection is closed.
217      */
218     public void disconnected() {
219         if (_log.isDebugEnabled()) {
220             _log.debug("disconnected [channel=" + _id + "]");
221         }
222         _in.disconnected();
223         _out.disconnected();
224     }
225 
226     /***
227      * Returns the underlying input stream.
228      *
229      * @return the underlying input stream
230      */
231     public MultiplexInputStream getMultiplexInputStream() {
232         return _in;
233     }
234 
235     /***
236      * Returns the underlying output stream.
237      *
238      * @return the underlying output stream
239      */
240     public MultiplexOutputStream getMultiplexOutputStream() {
241         return _out;
242     }
243 
244     /***
245      * Releases this channel for re-use.
246      */
247     public void release() {
248         _multiplexer.release(this);
249     }
250 
251     /***
252      * Closes this channel.
253      *
254      * @throws IOException for any I/O error
255      */
256     public void close() throws IOException {
257         if (_multiplexer != null) {
258             try {
259                 _multiplexer.close(this);
260             } finally {
261                 _multiplexer = null;
262 
263                 try {
264                     _in.destroy();
265                 } catch (IOException ignore) {
266                     // no need to propagate
267                 }
268                 try {
269                     _out.close();
270                 } catch (IOException ignore) {
271                     // no need to propagate
272                 }
273             }
274         }
275     }
276 
277     /***
278      * Destroy this channel.
279      */
280     public void destroy() {
281         try {
282             close();
283         } catch (IOException exception) {
284             _log.debug("close() failed", exception);
285         }
286     }
287 
288     /***
289      * Returns a string representation of this.
290      *
291      * @return a string representation of this
292      */
293     public String toString() {
294         return "Channel[id=" + _id + ", out=" + _out + ", in=" + _in + " ]";
295     }
296 
297 }