View Javadoc
1 /*** 2 * Redistribution and use of this software and associated documentation 3 * ("Software"), with or without modification, are permitted provided 4 * that the following conditions are met: 5 * 6 * 1. Redistributions of source code must retain copyright 7 * statements and notices. Redistributions must also contain a 8 * copy of this document. 9 * 10 * 2. Redistributions in binary form must reproduce the 11 * above copyright notice, this list of conditions and the 12 * following disclaimer in the documentation and/or other 13 * materials provided with the distribution. 14 * 15 * 3. The name "Exolab" must not be used to endorse or promote 16 * products derived from this Software without prior written 17 * permission of Exoffice Technologies. For written permission, 18 * please contact info@exolab.org. 19 * 20 * 4. Products derived from this Software may not be called "Exolab" 21 * nor may "Exolab" appear in their names without prior written 22 * permission of Exoffice Technologies. Exolab is a registered 23 * trademark of Exoffice Technologies. 24 * 25 * 5. Due credit should be given to the Exolab Project 26 * (http://www.exolab.org/). 27 * 28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS 29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT 30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL 32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED 39 * OF THE POSSIBILITY OF SUCH DAMAGE. 40 * 41 * Copyright 2002 (C) Exoffice Technologies Inc. All Rights Reserved. 42 * 43 * $Id: Importer.java,v 1.4 2001/11/20 20:56:28 tima Exp $ 44 */ 45 46 package org.exolab.jms.tools.migration.proxy; 47 48 import java.io.ByteArrayInputStream; 49 import java.io.ByteArrayOutputStream; 50 import java.io.IOException; 51 import java.io.ObjectInputStream; 52 import java.io.ObjectOutputStream; 53 import java.sql.Connection; 54 import java.sql.PreparedStatement; 55 import java.sql.ResultSet; 56 import java.sql.SQLException; 57 import java.util.Enumeration; 58 import java.util.HashMap; 59 import java.util.Iterator; 60 import java.util.Map; 61 62 import javax.jms.Destination; 63 import javax.jms.JMSException; 64 import javax.jms.Message; 65 66 import org.exolab.jms.client.JmsDestination; 67 import org.exolab.jms.persistence.SQLHelper; 68 69 70 abstract class AbstractMessageHandler { 71 72 private Context _context; 73 74 /*** 75 * Construct a new <code>AbstractMessageHandler</code> 76 * 77 * @param context context information 78 */ 79 public AbstractMessageHandler(Context context) { 80 _context = context; 81 } 82 83 /*** 84 * Retrieve the message associated with the supplied message identifier 85 * 86 * @param messageId the identifier of the message to retrieve 87 * @return the message corresponding to <code>messageId</code> 88 * @throws JMSException if a JMS exception is encountered 89 * @throws SQLException in an SQL exception is encountered 90 */ 91 public abstract Message get(String messageId) 92 throws JMSException, SQLException; 93 94 public abstract void put(Message message) 95 throws JMSException, SQLException; 96 97 protected Message get(String messageId, Message message) 98 throws JMSException, SQLException { 99 100 PreparedStatement select = null; 101 ResultSet set = null; 102 try { 103 select = getConnection().prepareStatement( 104 "select * from exp_message where message_id = ?"); 105 select.setString(1, messageId); 106 set = select.executeQuery(); 107 if (set.next()) { 108 String correlationId = set.getString("correlation_id"); 109 int deliveryMode = set.getInt("delivery_mode"); 110 long destinationId = set.getLong("destination_id"); 111 long expiration = set.getLong("expiration"); 112 int priority = set.getInt("priority"); 113 boolean redelivered = set.getBoolean("redelivered"); 114 long replyToId = set.getLong("reply_to_id"); 115 long timestamp = set.getLong("timestamp"); 116 String type = set.getString("type"); 117 118 Destinations destinations = _context.getDestinations(); 119 Destination destination = destinations.get(destinationId); 120 121 message.setJMSMessageID(messageId); 122 message.setJMSCorrelationID(correlationId); 123 message.setJMSDeliveryMode(deliveryMode); 124 message.setJMSDestination(destination); 125 message.setJMSExpiration(expiration); 126 message.setJMSPriority(priority); 127 message.setJMSRedelivered(redelivered); 128 if (replyToId != 0) { 129 Destination replyTo = destinations.get(replyToId); 130 message.setJMSReplyTo(replyTo); 131 } 132 message.setJMSTimestamp(timestamp); 133 message.setJMSType(type); 134 135 getProperties(messageId, message); 136 } 137 } finally { 138 SQLHelper.close(set); 139 SQLHelper.close(select); 140 } 141 return message; 142 } 143 144 protected void getProperties(String messageId, Message message) 145 throws JMSException, SQLException { 146 147 HashMap properties = getProperties("exp_msg_props", messageId); 148 Iterator iterator = properties.entrySet().iterator(); 149 while (iterator.hasNext()) { 150 Map.Entry entry = (Map.Entry) iterator.next(); 151 String name = (String) entry.getKey(); 152 Object value = entry.getValue(); 153 message.setObjectProperty(name, value); 154 } 155 } 156 157 protected void put(Message message, String type) 158 throws JMSException, SQLException { 159 160 Destinations destinations = _context.getDestinations(); 161 162 PreparedStatement insert = null; 163 try { 164 insert = getConnection().prepareStatement( 165 "insert into exp_message values " + 166 "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); 167 168 String messageId = message.getJMSMessageID(); 169 170 long destinationId = destinations.getId( 171 (JmsDestination) message.getJMSDestination()); 172 173 // insert message header 174 insert.setString(1, messageId); 175 insert.setString(2, type); 176 insert.setString(3, message.getJMSCorrelationID()); 177 insert.setInt(4, message.getJMSDeliveryMode()); 178 insert.setLong(5, destinationId); 179 insert.setLong(6, message.getJMSExpiration()); 180 insert.setInt(7, message.getJMSPriority()); 181 insert.setBoolean(8, message.getJMSRedelivered()); 182 if (message.getJMSReplyTo() != null) { 183 JmsDestination replyTo = 184 (JmsDestination) message.getJMSReplyTo(); 185 long replyToId = destinations.getId(replyTo); 186 insert.setLong(9, replyToId); 187 } 188 insert.setLong(10, message.getJMSTimestamp()); 189 insert.setString(11, message.getJMSType()); 190 191 insert.executeUpdate(); 192 193 // insert header properties 194 Enumeration iterator = message.getPropertyNames(); 195 while (iterator.hasMoreElements()) { 196 String name = (String) iterator.nextElement(); 197 Object value = message.getObjectProperty(name); 198 addProperty("exp_msg_props", messageId, name, value); 199 } 200 } finally { 201 SQLHelper.close(insert); 202 } 203 } 204 205 protected Connection getConnection() { 206 return _context.getConnection(); 207 } 208 209 protected HashMap getProperties(String table, String messageId) 210 throws SQLException { 211 212 HashMap result = new HashMap(); 213 214 PreparedStatement select = null; 215 ResultSet set = null; 216 try { 217 select = getConnection().prepareStatement( 218 "select * from " + table + " where message_id = ?"); 219 select.setString(1, messageId); 220 set = select.executeQuery(); 221 while (set.next()) { 222 String name = set.getString("name"); 223 byte[] blob = set.getBytes("value"); 224 Object value; 225 try { 226 value = deserialize(blob); 227 } catch (Exception exception) { 228 String message = "Failed to destream property for " + 229 "message, JMSMessageID = " + messageId + 230 ", property=" + name + ": " + exception.getMessage(); 231 throw new SQLException(message); 232 } 233 result.put(name, value); 234 } 235 } finally { 236 SQLHelper.close(set); 237 SQLHelper.close(select); 238 } 239 return result; 240 } 241 242 protected void addProperty(String table, String messageId, 243 String name, Object value) 244 throws SQLException { 245 246 byte[] blob; 247 try { 248 blob = serialize(value); 249 } catch (IOException exception) { 250 String message = "Failed to serialize object"; 251 if (value != null) { 252 message += " of type " + value.getClass().getName(); 253 } 254 message += ":" + exception.getMessage(); 255 throw new SQLException(message); 256 } 257 258 PreparedStatement insert = null; 259 try { 260 insert = getConnection().prepareStatement( 261 "insert into " + table + " values (?, ?, ?)"); 262 insert.setString(1, messageId); 263 insert.setString(2, name); 264 insert.setBytes(3, blob); 265 insert.executeUpdate(); 266 } finally { 267 SQLHelper.close(insert); 268 } 269 } 270 271 /*** 272 * Serialize an object to a byte array 273 * 274 * @param object the object to serialize 275 * @return the serialized object 276 * @throws IOException if the object cannot be serialized 277 */ 278 public byte[] serialize(Object object) throws IOException { 279 280 byte[] result = null; 281 ByteArrayOutputStream bstream = new ByteArrayOutputStream(); 282 ObjectOutputStream ostream = new ObjectOutputStream(bstream); 283 ostream.writeObject(object); 284 ostream.close(); 285 result = bstream.toByteArray(); 286 return result; 287 } 288 289 /*** 290 * Deserialize an object from a byte array 291 * 292 * @param blob the array containing object to deserialize 293 * @return the destreamed object 294 * @throws ClassNotFoundException if the class of a serialized object 295 * cannot be found. 296 * @throws IOException if the object cannot be deserialized 297 */ 298 protected Object deserialize(byte[] blob) 299 throws ClassNotFoundException, IOException { 300 Object result = null; 301 302 if (blob != null) { 303 ByteArrayInputStream bstream = new ByteArrayInputStream(blob); 304 ObjectInputStream istream = new ObjectInputStream(bstream); 305 result = istream.readObject(); 306 istream.close(); 307 } 308 return result; 309 } 310 311 } //-- AbstractMessageHandler

This page was automatically generated by Maven