1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: AbstractMessageHandler.java,v 1.2 2005/10/20 14:07:03 tanderson Exp $
44 */
45 package org.exolab.jms.tools.migration.proxy;
46
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;

import org.exolab.jms.client.JmsDestination;
import org.exolab.jms.persistence.PersistenceException;
import org.exolab.jms.persistence.SQLHelper;
67
68
69 /***
70 * Abstract implementation of the <code>MessageHandler</code> interface.
71 *
72 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
73 * @version $Revision: 1.2 $ $Date: 2005/10/20 14:07:03 $
74 */
75 abstract class AbstractMessageHandler implements MessageHandler, DBConstants {
76
/**
 * The destination store, used to translate between destinations and the
 * numeric identifiers stored in the message table.
 */
private final DestinationStore _destinations;

/**
 * The database connection. It is assigned once in the constructor and
 * never replaced, so it is declared <code>final</code>.
 */
private final Connection _connection;


/**
 * Construct a new <code>AbstractMessageHandler</code>.
 *
 * @param destinations the destination store
 * @param connection   the database connection
 */
public AbstractMessageHandler(DestinationStore destinations,
                              Connection connection) {
    _destinations = destinations;
    _connection = connection;
}
99
100 /***
101 * Add a message.
102 *
103 * @param message the message to add
104 * @throws JMSException for any JMS error
105 * @throws PersistenceException for any persistence error
106 */
107 public void add(Message message) throws JMSException, PersistenceException {
108 add(message, getType());
109 }
110
111 /***
112 * Returns a message given its identifier.
113 *
114 * @param messageId the identifier of the message to retrieve
115 * @return the message corresponding to <code>messageId</code>
116 * @throws JMSException for any JMS error
117 * @throws PersistenceException for any persistence error
118 */
119 public Message get(String messageId) throws JMSException,
120 PersistenceException {
121 Message message = newMessage();
122 get(messageId, message);
123 return message;
124 }
125
126 /***
127 * Returns the type of message that this handler supports.
128 *
129 * @return the type of message
130 */
131 protected abstract String getType();
132
133 /***
134 * Create a new message.
135 *
136 * @return a new message
137 * @throws JMSException for any JMS error
138 */
139 protected abstract Message newMessage() throws JMSException;
140
141 /***
142 * Populate the message body.
143 *
144 * @param body the message body
145 * @param message the message to populate
146 * @throws JMSException for any JMS error
147 * @throws PersistenceException for any persistence error
148 */
149 protected abstract void setBody(Object body, Message message)
150 throws JMSException, PersistenceException;
151
152 /***
153 * Returns the body of the message.
154 *
155 * @param message the message
156 * @return the body of the message
157 * @throws JMSException for any JMS error
158 */
159 protected abstract Object getBody(Message message) throws JMSException;
160
161 /***
162 * Populate a message.
163 *
164 * @param messageId the message identifier
165 * @param message the message to populate
166 * @throws JMSException for any JMS error
167 * @throws PersistenceException for any persistence error
168 */
169 protected void get(String messageId, Message message)
170 throws JMSException, PersistenceException {
171 PreparedStatement select = null;
172 ResultSet set = null;
173 try {
174 select = _connection.prepareStatement(
175 "select * from " + MESSAGE_TABLE + " where message_id = ?");
176 select.setString(1, messageId);
177 set = select.executeQuery();
178 if (!set.next()) {
179 throw new PersistenceException(
180 "Message not found, JMSMessageID=" + messageId);
181 }
182 String correlationId = set.getString("correlation_id");
183 int deliveryMode = set.getInt("delivery_mode");
184 long destinationId = set.getLong("destination_id");
185 long expiration = set.getLong("expiration");
186 int priority = set.getInt("priority");
187 boolean redelivered = set.getBoolean("redelivered");
188 long replyToId = set.getLong("reply_to_id");
189 long timestamp = set.getLong("timestamp");
190 String type = set.getString("type");
191
192 Destination destination = _destinations.get(destinationId);
193
194 message.setJMSMessageID(messageId);
195 message.setJMSCorrelationID(correlationId);
196 message.setJMSDeliveryMode(deliveryMode);
197 message.setJMSDestination(destination);
198 message.setJMSExpiration(expiration);
199 message.setJMSPriority(priority);
200 message.setJMSRedelivered(redelivered);
201 if (replyToId != 0) {
202 Destination replyTo = _destinations.get(replyToId);
203 message.setJMSReplyTo(replyTo);
204 }
205 message.setJMSTimestamp(timestamp);
206 message.setJMSType(type);
207
208 Blob blob = set.getBlob("body");
209 Object body;
210 try {
211 body = deserialize(blob);
212 } catch (Exception exception) {
213 throw new PersistenceException(
214 "Failed to deserialize message body, JMSMessageID="
215 + messageId, exception);
216 }
217 setBody(body, message);
218 } catch (SQLException exception) {
219 throw new PersistenceException(
220 "Failed to populate message, JMSMessageID="
221 + messageId, exception);
222 } finally {
223 SQLHelper.close(set);
224 SQLHelper.close(select);
225 }
226
227 getProperties(messageId, message);
228 }
229
230 /***
231 * Populate message properties.
232 *
233 * @param messageId the message identifier
234 * @param message the message to populate
235 * @throws JMSException for any JMS error
236 * @throws PersistenceException for any persistence error
237 */
238 protected void getProperties(String messageId, Message message)
239 throws JMSException, PersistenceException {
240
241 Map properties = getProperties(messageId);
242 Iterator iterator = properties.entrySet().iterator();
243 while (iterator.hasNext()) {
244 Map.Entry entry = (Map.Entry) iterator.next();
245 String name = (String) entry.getKey();
246 Object value = entry.getValue();
247 message.setObjectProperty(name, value);
248 }
249 }
250
251 /***
252 * Add a message.
253 *
254 * @param message the message to add
255 * @param type the type of the message
256 * @throws JMSException for any JMS error
257 * @throws PersistenceException for any persistence error
258 */
259 protected void add(Message message, String type)
260 throws JMSException, PersistenceException {
261
262 PreparedStatement insert = null;
263 String messageId = null;
264 try {
265 insert = _connection.prepareStatement(
266 "insert into " + MESSAGE_TABLE
267 + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
268
269 messageId = message.getJMSMessageID();
270
271 long destinationId = _destinations.getId(
272 (JmsDestination) message.getJMSDestination());
273
274
275 insert.setString(1, messageId);
276 insert.setString(2, type);
277 insert.setString(3, message.getJMSCorrelationID());
278 insert.setInt(4, message.getJMSDeliveryMode());
279 insert.setLong(5, destinationId);
280 insert.setLong(6, message.getJMSExpiration());
281 insert.setInt(7, message.getJMSPriority());
282 insert.setBoolean(8, message.getJMSRedelivered());
283 long replyToId = 0;
284 if (message.getJMSReplyTo() != null) {
285 JmsDestination replyTo =
286 (JmsDestination) message.getJMSReplyTo();
287 replyToId = _destinations.getId(replyTo);
288 }
289 insert.setLong(9, replyToId);
290 insert.setLong(10, message.getJMSTimestamp());
291 insert.setString(11, message.getJMSType());
292 Object body = getBody(message);
293 byte[] blob;
294 try {
295 blob = serialize(body);
296 } catch (Exception exception) {
297 throw new PersistenceException(
298 "Failed to serialize message body, JMSMessageID="
299 + messageId, exception);
300 }
301 insert.setObject(12, blob);
302
303 insert.executeUpdate();
304
305
306 Enumeration iterator = message.getPropertyNames();
307 while (iterator.hasMoreElements()) {
308 String name = (String) iterator.nextElement();
309 Object value = message.getObjectProperty(name);
310 addProperty(messageId, name, value);
311 }
312 _connection.commit();
313 } catch (SQLException exception) {
314 throw new PersistenceException(
315 "Failed to add message, JMSMessageID=" + messageId,
316 exception);
317 } finally {
318 SQLHelper.close(insert);
319 }
320 }
321
322 /***
323 * Returns properties for a message.
324 *
325 * @param messageId the message identifier
326 * @return a map of properties
327 * @throws PersistenceException for any persistence error
328 */
329 protected Map getProperties(String messageId)
330 throws PersistenceException {
331
332 HashMap result = new HashMap();
333
334 PreparedStatement select = null;
335 ResultSet set = null;
336 try {
337 select = _connection.prepareStatement(
338 "select name, value from " + MESSAGE_PROPERTIES_TABLE
339 + " where message_id = ?");
340 select.setString(1, messageId);
341 set = select.executeQuery();
342 while (set.next()) {
343 String name = set.getString("name");
344 Blob blob = set.getBlob("value");
345 Object value;
346 try {
347 value = deserialize(blob);
348 } catch (Exception exception) {
349 String message = "Failed to destream property for "
350 + "message, JMSMessageID=" + messageId
351 + ", property=" + name;
352 throw new PersistenceException(message, exception);
353 }
354 result.put(name, value);
355 }
356 } catch (SQLException exception) {
357 throw new PersistenceException(
358 "Failed to get properties for message, JMSMessageID="
359 + messageId, exception);
360 } finally {
361 SQLHelper.close(set);
362 SQLHelper.close(select);
363 }
364 return result;
365 }
366
367 /***
368 * Add a property.
369 *
370 * @param messageId the message identifier
371 * @param name the property name
372 * @param value the property value
373 * @throws PersistenceException for any persistence error
374 */
375 protected void addProperty(String messageId,
376 String name, Object value)
377 throws PersistenceException {
378
379 byte[] blob;
380 try {
381 blob = serialize(value);
382 } catch (IOException exception) {
383 String message = "Failed to serialize property for message, "
384 + "JMSMessageID=" + messageId + ", name=" + name;
385 if (value != null) {
386 message += " of type " + value.getClass().getName();
387 }
388 throw new PersistenceException(message, exception);
389 }
390
391 PreparedStatement insert = null;
392 try {
393 insert = _connection.prepareStatement(
394 "insert into " + MESSAGE_PROPERTIES_TABLE
395 + " values (?, ?, ?)");
396 insert.setString(1, messageId);
397 insert.setString(2, name);
398 insert.setObject(3, blob);
399 insert.executeUpdate();
400 } catch (SQLException exception) {
401 throw new PersistenceException(
402 "Failed to add property for message, JMSMessageID="
403 + messageId + ", name=" + name + ", value=" + value,
404 exception);
405 } finally {
406 SQLHelper.close(insert);
407 }
408 }
409
410 /***
411 * Helper to serialize an object to a byte array.
412 *
413 * @param object the object to serialize
414 * @return the serialized object
415 * @throws IOException if the object cannot be serialized
416 */
417 public byte[] serialize(Object object) throws IOException {
418 byte[] result;
419 ByteArrayOutputStream bstream = new ByteArrayOutputStream();
420 ObjectOutputStream ostream = new ObjectOutputStream(bstream);
421 ostream.writeObject(object);
422 ostream.close();
423 result = bstream.toByteArray();
424 return result;
425 }
426
427 /***
428 * Helper to deserialize an object from a byte array.
429 *
430 * @param blob the blob containing object to deserialize
431 * @return the destreamed object
432 * @throws ClassNotFoundException if the class of a serialized object cannot
433 * be found.
434 * @throws IOException if the object cannot be deserialized
435 * @throws SQLException if there is an error accessing the
436 * <code>blob</code>
437 */
438 protected Object deserialize(Blob blob)
439 throws ClassNotFoundException, IOException, SQLException {
440 Object result = null;
441
442 if (blob != null) {
443 ObjectInputStream istream = new ObjectInputStream(
444 blob.getBinaryStream());
445 result = istream.readObject();
446 istream.close();
447 }
448 return result;
449 }
450
451 /***
452 * Returns the database connection.
453 *
454 * @return the connection to the database
455 */
456 protected Connection getConnection() {
457 return _connection;
458 }
459
460 }