1 /*** 2 * Redistribution and use of this software and associated documentation 3 * ("Software"), with or without modification, are permitted provided 4 * that the following conditions are met: 5 * 6 * 1. Redistributions of source code must retain copyright 7 * statements and notices. Redistributions must also contain a 8 * copy of this document. 9 * 10 * 2. Redistributions in binary form must reproduce the 11 * above copyright notice, this list of conditions and the 12 * following disclaimer in the documentation and/or other 13 * materials provided with the distribution. 14 * 15 * 3. The name "Exolab" must not be used to endorse or promote 16 * products derived from this Software without prior written 17 * permission of Exoffice Technologies. For written permission, 18 * please contact info@exolab.org. 19 * 20 * 4. Products derived from this Software may not be called "Exolab" 21 * nor may "Exolab" appear in their names without prior written 22 * permission of Exoffice Technologies. Exolab is a registered 23 * trademark of Exoffice Technologies. 24 * 25 * 5. Due credit should be given to the Exolab Project 26 * (http://www.exolab.org/). 27 * 28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS 29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT 30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND 31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL 32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED 39 * OF THE POSSIBILITY OF SUCH DAMAGE. 40 * 41 * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved. 42 */ 43 package org.exolab.jms.tools.migration.proxy; 44 45 import java.sql.PreparedStatement; 46 import java.sql.ResultSet; 47 import java.sql.SQLException; 48 import java.sql.Connection; 49 import java.util.ArrayList; 50 import java.util.Iterator; 51 import java.util.List; 52 import javax.jms.JMSException; 53 54 import org.exolab.jms.client.JmsDestination; 55 import org.exolab.jms.client.JmsQueue; 56 import org.exolab.jms.persistence.PersistenceException; 57 import org.exolab.jms.persistence.SQLHelper; 58 import org.exolab.jms.tools.migration.Store; 59 import org.exolab.jms.tools.migration.StoreIterator; 60 61 62 /*** 63 * Provides persistency for {@link Consumer} instances. 64 * 65 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a> 66 * @version $Revision: 1.2 $ $Date: 2005/10/20 14:07:03 $ 67 */ 68 public class ConsumerStore implements Store, DBConstants { 69 70 /*** 71 * The destination store. 72 */ 73 private final DestinationStore _destinations; 74 75 /*** 76 * The database connection. 77 */ 78 private final Connection _connection; 79 80 /*** 81 * The seed used to generate identifiers for consumers. 82 */ 83 private long _seed = 0; 84 85 86 /*** 87 * Construct a new <code>ConsumerStore</code>. 88 * 89 * @param destinations the destination store 90 * @param connection the database connection 91 */ 92 public ConsumerStore(DestinationStore destinations, Connection connection) { 93 _destinations = destinations; 94 _connection = connection; 95 } 96 97 /*** 98 * Export the consumers. 99 * 100 * @return an iterator over the collection 101 * @throws JMSException for any JMS error 102 * @throws PersistenceException for any persistence error 103 */ 104 public StoreIterator exportCollection() throws JMSException, 105 PersistenceException { 106 List consumerIds = getConsumerIds(); 107 return new ConsumerIterator(consumerIds); 108 } 109 110 /*** 111 * Import consumers into the store. 112 * 113 * @param iterator an iterator over the collection 114 * @throws JMSException for any JMS error 115 * @throws PersistenceException for any persistence error 116 */ 117 public void importCollection(StoreIterator iterator) throws JMSException, 118 PersistenceException { 119 while (iterator.hasNext()) { 120 Consumer consumer = (Consumer) iterator.next(); 121 add(consumer); 122 } 123 } 124 125 /*** 126 * Returns the number of elements in the collection. 127 * 128 * @return the number of elements in the collection 129 * @throws PersistenceException for any persistence error 130 */ 131 public int size() throws PersistenceException { 132 return getConsumerIds().size(); 133 } 134 135 /*** 136 * Add a new consumer. 137 * 138 * @param consumer the consumer to add 139 * @throws PersistenceException for any persistence error 140 */ 141 public synchronized void add(Consumer consumer) 142 throws PersistenceException { 143 PreparedStatement insert = null; 144 try { 145 long consumerId = ++_seed; 146 147 insert = _connection.prepareStatement( 148 "insert into " + CONSUMER_TABLE + " values (?, ?, ?, ?)"); 149 150 insert.setLong(1, consumerId); 151 insert.setString(2, consumer.getName()); 152 insert.setString(3, consumer.getClientID()); 153 insert.setBoolean(4, consumer.isQueueConsumer()); 154 insert.executeUpdate(); 155 156 addSubscriptions(consumerId, consumer); 157 } catch (SQLException exception) { 158 throw new PersistenceException("Failed to add consumer", 159 exception); 160 } finally { 161 SQLHelper.close(insert); 162 } 163 } 164 165 /*** 166 * Returns a consumer for a given identifier. 167 * 168 * @param consumerId the identity of the consumer 169 * @return the consumer corresponding to <code>consumerId</code> or 170 * <code>null</code> if no such consumer exists 171 * @throws PersistenceException for any persistence error 172 */ 173 public Consumer get(long consumerId) throws PersistenceException { 174 Consumer result = null; 175 PreparedStatement select = null; 176 ResultSet set = null; 177 try { 178 select = _connection.prepareStatement( 179 "select name, client_id, queue_consumer from " 180 + CONSUMER_TABLE + " where consumer_id = ?"); 181 select.setLong(1, consumerId); 182 set = select.executeQuery(); 183 if (set.next()) { 184 String name = set.getString(1); 185 String clientId = set.getString(2); 186 boolean isQueue = set.getBoolean(3); 187 if (isQueue) { 188 result = new Consumer(new JmsQueue(name)); 189 } else { 190 result = new Consumer(name, clientId); 191 } 192 getSubscriptions(consumerId, result); 193 } 194 } catch (SQLException exception) { 195 throw new PersistenceException("Failed to get consumer", 196 exception); 197 } finally { 198 SQLHelper.close(set); 199 SQLHelper.close(select); 200 } 201 return result; 202 } 203 204 /*** 205 * Returns all consumer identifiers. 206 * 207 * @return a list of consumer identifiers 208 * @throws PersistenceException for any persistence error 209 */ 210 public List getConsumerIds() throws PersistenceException { 211 ArrayList result = new ArrayList(); 212 213 PreparedStatement select = null; 214 ResultSet set = null; 215 try { 216 select = _connection.prepareStatement( 217 "select consumer_id from " + CONSUMER_TABLE); 218 219 set = select.executeQuery(); 220 while (set.next()) { 221 long consumerId = set.getLong("consumer_id"); 222 result.add(new Long(consumerId)); 223 } 224 } catch (SQLException exception) { 225 throw new PersistenceException( 226 "Failed to retrieve consumer identifiers", exception); 227 } finally { 228 SQLHelper.close(set); 229 SQLHelper.close(select); 230 } 231 return result; 232 } 233 234 /*** 235 * Add subscriptions for a consumer. 236 * 237 * @param consumerId the identity of the consumer 238 * @param consumer the consumer 239 * @throws PersistenceException for any persistence error 240 */ 241 protected void addSubscriptions(long consumerId, Consumer consumer) 242 throws PersistenceException { 243 Iterator iterator = consumer.getSubscriptions().iterator(); 244 while (iterator.hasNext()) { 245 Subscription subscription = (Subscription) iterator.next(); 246 long destinationId = _destinations.getId( 247 subscription.getDestination()); 248 if (destinationId == -1) { 249 throw new PersistenceException( 250 "Destination identifier not found for destination=" 251 + subscription.getDestination().getName()); 252 } 253 254 PreparedStatement insert = null; 255 try { 256 insert = _connection.prepareStatement( 257 "insert into " + SUBSCRIPTION_TABLE + " values (?, ?)"); 258 259 insert.setLong(1, consumerId); 260 insert.setLong(2, destinationId); 261 insert.executeUpdate(); 262 } catch (SQLException exception) { 263 throw new PersistenceException("Failed to insert subscription", 264 exception); 265 } finally { 266 SQLHelper.close(insert); 267 } 268 269 addMessages(consumerId, destinationId, subscription); 270 } 271 } 272 273 /*** 274 * Add messages for a subscription. 275 * 276 * @param consumerId the identity of the consumer 277 * @param destinationId the identify of the destination 278 * @param subscription the consumer subscription 279 * @throws PersistenceException for any persistence error 280 */ 281 protected void addMessages(long consumerId, long destinationId, 282 Subscription subscription) 283 throws PersistenceException { 284 285 Iterator iterator = subscription.getMessages().iterator(); 286 while (iterator.hasNext()) { 287 MessageState message = (MessageState) iterator.next(); 288 289 PreparedStatement insert = null; 290 try { 291 insert = _connection.prepareStatement( 292 "insert into " + MESSAGE_HANDLE_TABLE + " " 293 + "(message_id, destination_id, consumer_id, delivered)" 294 + " values (?, ?, ?, ?)"); 295 insert.setString(1, message.getMessageId()); 296 insert.setLong(2, destinationId); 297 insert.setLong(3, consumerId); 298 insert.setBoolean(4, message.getDelivered()); 299 insert.executeUpdate(); 300 } catch (SQLException exception) { 301 throw new PersistenceException( 302 "Failed to insert subscription state", exception); 303 } finally { 304 SQLHelper.close(insert); 305 } 306 } 307 } 308 309 /*** 310 * Get subscriptions for a consumer. 311 * 312 * @param consumerId the identity of the consumer 313 * @param consumer the consumer to populate 314 * @throws PersistenceException for any persistence error 315 */ 316 protected void getSubscriptions(long consumerId, Consumer consumer) 317 throws PersistenceException { 318 319 PreparedStatement select = null; 320 ResultSet set = null; 321 try { 322 select = _connection.prepareStatement( 323 "select destination_id " 324 + "from " + SUBSCRIPTION_TABLE 325 + " where consumer_id = ?"); 326 select.setLong(1, consumerId); 327 328 set = select.executeQuery(); 329 while (set.next()) { 330 long destinationId = set.getLong("destination_id"); 331 JmsDestination destination = _destinations.get(destinationId); 332 if (destination == null) { 333 throw new PersistenceException( 334 "Failed to locate destination for id=" 335 + destinationId); 336 } 337 Subscription subscription = new Subscription(destination); 338 getMessages(consumerId, destinationId, subscription); 339 consumer.addSubscription(subscription); 340 } 341 } catch (SQLException exception) { 342 throw new PersistenceException( 343 "Failed to get subscriptions for consumer=" + consumerId, 344 exception); 345 } finally { 346 SQLHelper.close(set); 347 SQLHelper.close(select); 348 } 349 } 350 351 /*** 352 * Get messages for a subscription. 353 * 354 * @param consumerId the identity of the consumer 355 * @param destinationId the identify of the destination 356 * @param subscription the consumer subscription 357 * @throws SQLException if a database error is encountered 358 */ 359 protected void getMessages(long consumerId, long destinationId, 360 Subscription subscription) 361 throws SQLException { 362 363 PreparedStatement select = null; 364 ResultSet set = null; 365 try { 366 select = _connection.prepareStatement( 367 "select message_id, delivered " 368 + "from " + MESSAGE_HANDLE_TABLE 369 + " where consumer_id = ? and destination_id = ?"); 370 select.setLong(1, consumerId); 371 select.setLong(2, destinationId); 372 373 set = select.executeQuery(); 374 while (set.next()) { 375 String messageId = set.getString("message_id"); 376 boolean delivered = set.getBoolean("delivered"); 377 subscription.addMessage(messageId, delivered); 378 } 379 } finally { 380 SQLHelper.close(set); 381 SQLHelper.close(select); 382 } 383 } 384 385 private class ConsumerIterator implements StoreIterator { 386 387 private Iterator _iterator; 388 389 public ConsumerIterator(List consumerIds) { 390 _iterator = consumerIds.iterator(); 391 } 392 393 public boolean hasNext() { 394 return _iterator.hasNext(); 395 } 396 397 public Object next() throws PersistenceException { 398 Consumer result = null; 399 400 Long consumerId = (Long) _iterator.next(); 401 402 result = get(consumerId.longValue()); 403 return result; 404 } 405 } 406 407 } 408 409 410