1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2002 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: Importer.java,v 1.4 2001/11/20 20:56:28 tima Exp $
44 */
45
46 package org.exolab.jms.tools.migration.proxy;
47
48 import java.io.ByteArrayInputStream;
49 import java.io.ByteArrayOutputStream;
50 import java.io.IOException;
51 import java.io.ObjectInputStream;
52 import java.io.ObjectOutputStream;
53 import java.sql.Connection;
54 import java.sql.PreparedStatement;
55 import java.sql.ResultSet;
56 import java.sql.SQLException;
57 import java.util.Enumeration;
58 import java.util.HashMap;
59 import java.util.Iterator;
60 import java.util.Map;
61
62 import javax.jms.Destination;
63 import javax.jms.JMSException;
64 import javax.jms.Message;
65
66 import org.exolab.jms.client.JmsDestination;
67 import org.exolab.jms.persistence.SQLHelper;
68
69
70 abstract class AbstractMessageHandler {
71
72 private Context _context;
73
74 /***
75 * Construct a new <code>AbstractMessageHandler</code>
76 *
77 * @param context context information
78 */
79 public AbstractMessageHandler(Context context) {
80 _context = context;
81 }
82
83 /***
84 * Retrieve the message associated with the supplied message identifier
85 *
86 * @param messageId the identifier of the message to retrieve
87 * @return the message corresponding to <code>messageId</code>
88 * @throws JMSException if a JMS exception is encountered
89 * @throws SQLException in an SQL exception is encountered
90 */
91 public abstract Message get(String messageId)
92 throws JMSException, SQLException;
93
94 public abstract void put(Message message)
95 throws JMSException, SQLException;
96
97 protected Message get(String messageId, Message message)
98 throws JMSException, SQLException {
99
100 PreparedStatement select = null;
101 ResultSet set = null;
102 try {
103 select = getConnection().prepareStatement(
104 "select * from exp_message where message_id = ?");
105 select.setString(1, messageId);
106 set = select.executeQuery();
107 if (set.next()) {
108 String correlationId = set.getString("correlation_id");
109 int deliveryMode = set.getInt("delivery_mode");
110 long destinationId = set.getLong("destination_id");
111 long expiration = set.getLong("expiration");
112 int priority = set.getInt("priority");
113 boolean redelivered = set.getBoolean("redelivered");
114 long replyToId = set.getLong("reply_to_id");
115 long timestamp = set.getLong("timestamp");
116 String type = set.getString("type");
117
118 Destinations destinations = _context.getDestinations();
119 Destination destination = destinations.get(destinationId);
120
121 message.setJMSMessageID(messageId);
122 message.setJMSCorrelationID(correlationId);
123 message.setJMSDeliveryMode(deliveryMode);
124 message.setJMSDestination(destination);
125 message.setJMSExpiration(expiration);
126 message.setJMSPriority(priority);
127 message.setJMSRedelivered(redelivered);
128 if (replyToId != 0) {
129 Destination replyTo = destinations.get(replyToId);
130 message.setJMSReplyTo(replyTo);
131 }
132 message.setJMSTimestamp(timestamp);
133 message.setJMSType(type);
134
135 getProperties(messageId, message);
136 }
137 } finally {
138 SQLHelper.close(set);
139 SQLHelper.close(select);
140 }
141 return message;
142 }
143
144 protected void getProperties(String messageId, Message message)
145 throws JMSException, SQLException {
146
147 HashMap properties = getProperties("exp_msg_props", messageId);
148 Iterator iterator = properties.entrySet().iterator();
149 while (iterator.hasNext()) {
150 Map.Entry entry = (Map.Entry) iterator.next();
151 String name = (String) entry.getKey();
152 Object value = entry.getValue();
153 message.setObjectProperty(name, value);
154 }
155 }
156
157 protected void put(Message message, String type)
158 throws JMSException, SQLException {
159
160 Destinations destinations = _context.getDestinations();
161
162 PreparedStatement insert = null;
163 try {
164 insert = getConnection().prepareStatement(
165 "insert into exp_message values " +
166 "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
167
168 String messageId = message.getJMSMessageID();
169
170 long destinationId = destinations.getId(
171 (JmsDestination) message.getJMSDestination());
172
173 // insert message header
174 insert.setString(1, messageId);
175 insert.setString(2, type);
176 insert.setString(3, message.getJMSCorrelationID());
177 insert.setInt(4, message.getJMSDeliveryMode());
178 insert.setLong(5, destinationId);
179 insert.setLong(6, message.getJMSExpiration());
180 insert.setInt(7, message.getJMSPriority());
181 insert.setBoolean(8, message.getJMSRedelivered());
182 if (message.getJMSReplyTo() != null) {
183 JmsDestination replyTo =
184 (JmsDestination) message.getJMSReplyTo();
185 long replyToId = destinations.getId(replyTo);
186 insert.setLong(9, replyToId);
187 }
188 insert.setLong(10, message.getJMSTimestamp());
189 insert.setString(11, message.getJMSType());
190
191 insert.executeUpdate();
192
193 // insert header properties
194 Enumeration iterator = message.getPropertyNames();
195 while (iterator.hasMoreElements()) {
196 String name = (String) iterator.nextElement();
197 Object value = message.getObjectProperty(name);
198 addProperty("exp_msg_props", messageId, name, value);
199 }
200 } finally {
201 SQLHelper.close(insert);
202 }
203 }
204
205 protected Connection getConnection() {
206 return _context.getConnection();
207 }
208
209 protected HashMap getProperties(String table, String messageId)
210 throws SQLException {
211
212 HashMap result = new HashMap();
213
214 PreparedStatement select = null;
215 ResultSet set = null;
216 try {
217 select = getConnection().prepareStatement(
218 "select * from " + table + " where message_id = ?");
219 select.setString(1, messageId);
220 set = select.executeQuery();
221 while (set.next()) {
222 String name = set.getString("name");
223 byte[] blob = set.getBytes("value");
224 Object value;
225 try {
226 value = deserialize(blob);
227 } catch (Exception exception) {
228 String message = "Failed to destream property for " +
229 "message, JMSMessageID = " + messageId +
230 ", property=" + name + ": " + exception.getMessage();
231 throw new SQLException(message);
232 }
233 result.put(name, value);
234 }
235 } finally {
236 SQLHelper.close(set);
237 SQLHelper.close(select);
238 }
239 return result;
240 }
241
242 protected void addProperty(String table, String messageId,
243 String name, Object value)
244 throws SQLException {
245
246 byte[] blob;
247 try {
248 blob = serialize(value);
249 } catch (IOException exception) {
250 String message = "Failed to serialize object";
251 if (value != null) {
252 message += " of type " + value.getClass().getName();
253 }
254 message += ":" + exception.getMessage();
255 throw new SQLException(message);
256 }
257
258 PreparedStatement insert = null;
259 try {
260 insert = getConnection().prepareStatement(
261 "insert into " + table + " values (?, ?, ?)");
262 insert.setString(1, messageId);
263 insert.setString(2, name);
264 insert.setBytes(3, blob);
265 insert.executeUpdate();
266 } finally {
267 SQLHelper.close(insert);
268 }
269 }
270
271 /***
272 * Serialize an object to a byte array
273 *
274 * @param object the object to serialize
275 * @return the serialized object
276 * @throws IOException if the object cannot be serialized
277 */
278 public byte[] serialize(Object object) throws IOException {
279
280 byte[] result = null;
281 ByteArrayOutputStream bstream = new ByteArrayOutputStream();
282 ObjectOutputStream ostream = new ObjectOutputStream(bstream);
283 ostream.writeObject(object);
284 ostream.close();
285 result = bstream.toByteArray();
286 return result;
287 }
288
289 /***
290 * Deserialize an object from a byte array
291 *
292 * @param blob the array containing object to deserialize
293 * @return the destreamed object
294 * @throws ClassNotFoundException if the class of a serialized object
295 * cannot be found.
296 * @throws IOException if the object cannot be deserialized
297 */
298 protected Object deserialize(byte[] blob)
299 throws ClassNotFoundException, IOException {
300 Object result = null;
301
302 if (blob != null) {
303 ByteArrayInputStream bstream = new ByteArrayInputStream(blob);
304 ObjectInputStream istream = new ObjectInputStream(bstream);
305 result = istream.readObject();
306 istream.close();
307 }
308 return result;
309 }
310
311 } //-- AbstractMessageHandler
This page was automatically generated by Maven