View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: AbstractMessageHandler.java,v 1.2 2005/10/20 14:07:03 tanderson Exp $
44   */
45  package org.exolab.jms.tools.migration.proxy;
46  
47  import java.io.ByteArrayOutputStream;
48  import java.io.IOException;
49  import java.io.ObjectInputStream;
50  import java.io.ObjectOutputStream;
51  import java.sql.Blob;
52  import java.sql.Connection;
53  import java.sql.PreparedStatement;
54  import java.sql.ResultSet;
55  import java.sql.SQLException;
56  import java.util.Enumeration;
57  import java.util.HashMap;
58  import java.util.Iterator;
59  import java.util.Map;
60  import javax.jms.Destination;
61  import javax.jms.JMSException;
62  import javax.jms.Message;
63  
64  import org.exolab.jms.client.JmsDestination;
65  import org.exolab.jms.persistence.PersistenceException;
66  import org.exolab.jms.persistence.SQLHelper;
67  
68  
69  /***
70   * Abstract implementation of the  <code>MessageHandler</code> interface.
71   *
72   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73   * @version $Revision: 1.2 $ $Date: 2005/10/20 14:07:03 $
74   */
75  abstract class AbstractMessageHandler implements MessageHandler, DBConstants {
76  
77      /***
78       * The destination store.
79       */
80      private final DestinationStore _destinations;
81  
82      /***
83       * The database connection.
84       */
85      private Connection _connection;
86  
87  
88      /***
89       * Construct a new <code>AbstractMessageHandler</code>.
90       *
91       * @param destinations the destination store
92       * @param connection   the database connection
93       */
94      public AbstractMessageHandler(DestinationStore destinations,
95                                    Connection connection) {
96          _destinations = destinations;
97          _connection = connection;
98      }
99  
100     /***
101      * Add a message.
102      *
103      * @param message the message to add
104      * @throws JMSException         for any JMS error
105      * @throws PersistenceException for any persistence error
106      */
107     public void add(Message message) throws JMSException, PersistenceException {
108         add(message, getType());
109     }
110 
111     /***
112      * Returns a message given its identifier.
113      *
114      * @param messageId the identifier of the message to retrieve
115      * @return the message corresponding to <code>messageId</code>
116      * @throws JMSException         for any JMS error
117      * @throws PersistenceException for any persistence error
118      */
119     public Message get(String messageId) throws JMSException,
120             PersistenceException {
121         Message message = newMessage();
122         get(messageId, message);
123         return message;
124     }
125 
126     /***
127      * Returns the type of message that this handler supports.
128      *
129      * @return the type of message
130      */
131     protected abstract String getType();
132 
133     /***
134      * Create a new message.
135      *
136      * @return a new message
137      * @throws JMSException for any JMS error
138      */
139     protected abstract Message newMessage() throws JMSException;
140 
141     /***
142      * Populate the message body.
143      *
144      * @param body    the message body
145      * @param message the message to populate
146      * @throws JMSException         for any JMS error
147      * @throws PersistenceException for any persistence error
148      */
149     protected abstract void setBody(Object body, Message message)
150             throws JMSException, PersistenceException;
151 
152     /***
153      * Returns the body of the message.
154      *
155      * @param message the message
156      * @return the body of the message
157      * @throws JMSException for any JMS error
158      */
159     protected abstract Object getBody(Message message) throws JMSException;
160 
161     /***
162      * Populate a message.
163      *
164      * @param messageId the message identifier
165      * @param message   the message to populate
166      * @throws JMSException         for any JMS error
167      * @throws PersistenceException for any persistence error
168      */
169     protected void get(String messageId, Message message)
170             throws JMSException, PersistenceException {
171         PreparedStatement select = null;
172         ResultSet set = null;
173         try {
174             select = _connection.prepareStatement(
175                     "select * from " + MESSAGE_TABLE + " where message_id = ?");
176             select.setString(1, messageId);
177             set = select.executeQuery();
178             if (!set.next()) {
179                 throw new PersistenceException(
180                         "Message not found, JMSMessageID=" + messageId);
181             }
182             String correlationId = set.getString("correlation_id");
183             int deliveryMode = set.getInt("delivery_mode");
184             long destinationId = set.getLong("destination_id");
185             long expiration = set.getLong("expiration");
186             int priority = set.getInt("priority");
187             boolean redelivered = set.getBoolean("redelivered");
188             long replyToId = set.getLong("reply_to_id");
189             long timestamp = set.getLong("timestamp");
190             String type = set.getString("type");
191 
192             Destination destination = _destinations.get(destinationId);
193 
194             message.setJMSMessageID(messageId);
195             message.setJMSCorrelationID(correlationId);
196             message.setJMSDeliveryMode(deliveryMode);
197             message.setJMSDestination(destination);
198             message.setJMSExpiration(expiration);
199             message.setJMSPriority(priority);
200             message.setJMSRedelivered(redelivered);
201             if (replyToId != 0) {
202                 Destination replyTo = _destinations.get(replyToId);
203                 message.setJMSReplyTo(replyTo);
204             }
205             message.setJMSTimestamp(timestamp);
206             message.setJMSType(type);
207 
208             Blob blob = set.getBlob("body");
209             Object body;
210             try {
211                 body = deserialize(blob);
212             } catch (Exception exception) {
213                 throw new PersistenceException(
214                         "Failed to deserialize message body, JMSMessageID="
215                         + messageId, exception);
216             }
217             setBody(body, message);
218         } catch (SQLException exception) {
219             throw new PersistenceException(
220                     "Failed to populate message, JMSMessageID="
221                     + messageId, exception);
222         } finally {
223             SQLHelper.close(set);
224             SQLHelper.close(select);
225         }
226 
227         getProperties(messageId, message);
228     }
229 
230     /***
231      * Populate message properties.
232      *
233      * @param messageId the message identifier
234      * @param message   the message to populate
235      * @throws JMSException         for any JMS error
236      * @throws PersistenceException for any persistence error
237      */
238     protected void getProperties(String messageId, Message message)
239             throws JMSException, PersistenceException {
240 
241         Map properties = getProperties(messageId);
242         Iterator iterator = properties.entrySet().iterator();
243         while (iterator.hasNext()) {
244             Map.Entry entry = (Map.Entry) iterator.next();
245             String name = (String) entry.getKey();
246             Object value = entry.getValue();
247             message.setObjectProperty(name, value);
248         }
249     }
250 
251     /***
252      * Add a message.
253      *
254      * @param message the message to add
255      * @param type    the type of the message
256      * @throws JMSException         for any JMS error
257      * @throws PersistenceException for any persistence error
258      */
259     protected void add(Message message, String type)
260             throws JMSException, PersistenceException {
261 
262         PreparedStatement insert = null;
263         String messageId = null;
264         try {
265             insert = _connection.prepareStatement(
266                     "insert into " + MESSAGE_TABLE
267                     + " values  (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
268 
269             messageId = message.getJMSMessageID();
270 
271             long destinationId = _destinations.getId(
272                     (JmsDestination) message.getJMSDestination());
273             
274             // insert message header
275             insert.setString(1, messageId);
276             insert.setString(2, type);
277             insert.setString(3, message.getJMSCorrelationID());
278             insert.setInt(4, message.getJMSDeliveryMode());
279             insert.setLong(5, destinationId);
280             insert.setLong(6, message.getJMSExpiration());
281             insert.setInt(7, message.getJMSPriority());
282             insert.setBoolean(8, message.getJMSRedelivered());
283             long replyToId = 0;
284             if (message.getJMSReplyTo() != null) {
285                 JmsDestination replyTo =
286                         (JmsDestination) message.getJMSReplyTo();
287                 replyToId = _destinations.getId(replyTo);
288             }
289             insert.setLong(9, replyToId);
290             insert.setLong(10, message.getJMSTimestamp());
291             insert.setString(11, message.getJMSType());
292             Object body = getBody(message);
293             byte[] blob;
294             try {
295                 blob = serialize(body);
296             } catch (Exception exception) {
297                 throw new PersistenceException(
298                         "Failed to serialize message body, JMSMessageID="
299                         + messageId, exception);
300             }
301             insert.setObject(12, blob);
302 
303             insert.executeUpdate();
304             
305             // insert header properties
306             Enumeration iterator = message.getPropertyNames();
307             while (iterator.hasMoreElements()) {
308                 String name = (String) iterator.nextElement();
309                 Object value = message.getObjectProperty(name);
310                 addProperty(messageId, name, value);
311             }
312             _connection.commit();
313         } catch (SQLException exception) {
314             throw new PersistenceException(
315                     "Failed to add message, JMSMessageID=" + messageId,
316                     exception);
317         } finally {
318             SQLHelper.close(insert);
319         }
320     }
321 
322     /***
323      * Returns properties for a message.
324      *
325      * @param messageId the message identifier
326      * @return a map of properties
327      * @throws PersistenceException for any persistence error
328      */
329     protected Map getProperties(String messageId)
330             throws PersistenceException {
331 
332         HashMap result = new HashMap();
333 
334         PreparedStatement select = null;
335         ResultSet set = null;
336         try {
337             select = _connection.prepareStatement(
338                     "select name, value from " + MESSAGE_PROPERTIES_TABLE
339                     + " where message_id = ?");
340             select.setString(1, messageId);
341             set = select.executeQuery();
342             while (set.next()) {
343                 String name = set.getString("name");
344                 Blob blob = set.getBlob("value");
345                 Object value;
346                 try {
347                     value = deserialize(blob);
348                 } catch (Exception exception) {
349                     String message = "Failed to destream property for "
350                             + "message, JMSMessageID=" + messageId
351                             + ", property=" + name;
352                     throw new PersistenceException(message, exception);
353                 }
354                 result.put(name, value);
355             }
356         } catch (SQLException exception) {
357             throw new PersistenceException(
358                     "Failed to get properties for message, JMSMessageID="
359                     + messageId, exception);
360         } finally {
361             SQLHelper.close(set);
362             SQLHelper.close(select);
363         }
364         return result;
365     }
366 
367     /***
368      * Add a property.
369      *
370      * @param messageId the message identifier
371      * @param name      the property name
372      * @param value     the property value
373      * @throws PersistenceException for any persistence error
374      */
375     protected void addProperty(String messageId,
376                                String name, Object value)
377             throws PersistenceException {
378 
379         byte[] blob;
380         try {
381             blob = serialize(value);
382         } catch (IOException exception) {
383             String message = "Failed to serialize property for message, "
384                     + "JMSMessageID=" + messageId + ", name=" + name;
385             if (value != null) {
386                 message += " of type " + value.getClass().getName();
387             }
388             throw new PersistenceException(message, exception);
389         }
390 
391         PreparedStatement insert = null;
392         try {
393             insert = _connection.prepareStatement(
394                     "insert into " + MESSAGE_PROPERTIES_TABLE
395                     + " values (?, ?, ?)");
396             insert.setString(1, messageId);
397             insert.setString(2, name);
398             insert.setObject(3, blob);
399             insert.executeUpdate();
400         } catch (SQLException exception) {
401             throw new PersistenceException(
402                     "Failed to add property for message, JMSMessageID="
403                     + messageId + ", name=" + name + ", value=" + value,
404                     exception);
405         } finally {
406             SQLHelper.close(insert);
407         }
408     }
409 
410     /***
411      * Helper to serialize an object to a byte array.
412      *
413      * @param object the object to serialize
414      * @return the serialized object
415      * @throws IOException if the object cannot be serialized
416      */
417     public byte[] serialize(Object object) throws IOException {
418         byte[] result;
419         ByteArrayOutputStream bstream = new ByteArrayOutputStream();
420         ObjectOutputStream ostream = new ObjectOutputStream(bstream);
421         ostream.writeObject(object);
422         ostream.close();
423         result = bstream.toByteArray();
424         return result;
425     }
426 
427     /***
428      * Helper to deserialize an object from a byte array.
429      *
430      * @param blob the blob containing object to deserialize
431      * @return the destreamed object
432      * @throws ClassNotFoundException if the class of a serialized object cannot
433      *                                be found.
434      * @throws IOException            if the object cannot be deserialized
435      * @throws SQLException           if there is an error accessing the
436      *                                <code>blob</code>
437      */
438     protected Object deserialize(Blob blob)
439             throws ClassNotFoundException, IOException, SQLException {
440         Object result = null;
441 
442         if (blob != null) {
443             ObjectInputStream istream = new ObjectInputStream(
444                     blob.getBinaryStream());
445             result = istream.readObject();
446             istream.close();
447         }
448         return result;
449     }
450 
451     /***
452      * Returns the database connection.
453      *
454      * @return the connection to the database
455      */
456     protected Connection getConnection() {
457         return _connection;
458     }
459 
460 }