1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43 package org.exolab.jms.tools.migration.proxy;
44
45 import java.sql.PreparedStatement;
46 import java.sql.ResultSet;
47 import java.sql.SQLException;
48 import java.sql.Connection;
49 import java.util.ArrayList;
50 import java.util.Iterator;
51 import java.util.List;
52 import javax.jms.JMSException;
53
54 import org.exolab.jms.client.JmsDestination;
55 import org.exolab.jms.client.JmsQueue;
56 import org.exolab.jms.persistence.PersistenceException;
57 import org.exolab.jms.persistence.SQLHelper;
58 import org.exolab.jms.tools.migration.Store;
59 import org.exolab.jms.tools.migration.StoreIterator;
60
61
62 /***
63 * Provides persistency for {@link Consumer} instances.
64 *
65 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
66 * @version $Revision: 1.2 $ $Date: 2005/10/20 14:07:03 $
67 */
68 public class ConsumerStore implements Store, DBConstants {
69
70 /***
71 * The destination store.
72 */
73 private final DestinationStore _destinations;
74
75 /***
76 * The database connection.
77 */
78 private final Connection _connection;
79
80 /***
81 * The seed used to generate identifiers for consumers.
82 */
83 private long _seed = 0;
84
85
86 /***
87 * Construct a new <code>ConsumerStore</code>.
88 *
89 * @param destinations the destination store
90 * @param connection the database connection
91 */
92 public ConsumerStore(DestinationStore destinations, Connection connection) {
93 _destinations = destinations;
94 _connection = connection;
95 }
96
97 /***
98 * Export the consumers.
99 *
100 * @return an iterator over the collection
101 * @throws JMSException for any JMS error
102 * @throws PersistenceException for any persistence error
103 */
104 public StoreIterator exportCollection() throws JMSException,
105 PersistenceException {
106 List consumerIds = getConsumerIds();
107 return new ConsumerIterator(consumerIds);
108 }
109
110 /***
111 * Import consumers into the store.
112 *
113 * @param iterator an iterator over the collection
114 * @throws JMSException for any JMS error
115 * @throws PersistenceException for any persistence error
116 */
117 public void importCollection(StoreIterator iterator) throws JMSException,
118 PersistenceException {
119 while (iterator.hasNext()) {
120 Consumer consumer = (Consumer) iterator.next();
121 add(consumer);
122 }
123 }
124
125 /***
126 * Returns the number of elements in the collection.
127 *
128 * @return the number of elements in the collection
129 * @throws PersistenceException for any persistence error
130 */
131 public int size() throws PersistenceException {
132 return getConsumerIds().size();
133 }
134
135 /***
136 * Add a new consumer.
137 *
138 * @param consumer the consumer to add
139 * @throws PersistenceException for any persistence error
140 */
141 public synchronized void add(Consumer consumer)
142 throws PersistenceException {
143 PreparedStatement insert = null;
144 try {
145 long consumerId = ++_seed;
146
147 insert = _connection.prepareStatement(
148 "insert into " + CONSUMER_TABLE + " values (?, ?, ?, ?)");
149
150 insert.setLong(1, consumerId);
151 insert.setString(2, consumer.getName());
152 insert.setString(3, consumer.getClientID());
153 insert.setBoolean(4, consumer.isQueueConsumer());
154 insert.executeUpdate();
155
156 addSubscriptions(consumerId, consumer);
157 } catch (SQLException exception) {
158 throw new PersistenceException("Failed to add consumer",
159 exception);
160 } finally {
161 SQLHelper.close(insert);
162 }
163 }
164
165 /***
166 * Returns a consumer for a given identifier.
167 *
168 * @param consumerId the identity of the consumer
169 * @return the consumer corresponding to <code>consumerId</code> or
170 * <code>null</code> if no such consumer exists
171 * @throws PersistenceException for any persistence error
172 */
173 public Consumer get(long consumerId) throws PersistenceException {
174 Consumer result = null;
175 PreparedStatement select = null;
176 ResultSet set = null;
177 try {
178 select = _connection.prepareStatement(
179 "select name, client_id, queue_consumer from "
180 + CONSUMER_TABLE + " where consumer_id = ?");
181 select.setLong(1, consumerId);
182 set = select.executeQuery();
183 if (set.next()) {
184 String name = set.getString(1);
185 String clientId = set.getString(2);
186 boolean isQueue = set.getBoolean(3);
187 if (isQueue) {
188 result = new Consumer(new JmsQueue(name));
189 } else {
190 result = new Consumer(name, clientId);
191 }
192 getSubscriptions(consumerId, result);
193 }
194 } catch (SQLException exception) {
195 throw new PersistenceException("Failed to get consumer",
196 exception);
197 } finally {
198 SQLHelper.close(set);
199 SQLHelper.close(select);
200 }
201 return result;
202 }
203
204 /***
205 * Returns all consumer identifiers.
206 *
207 * @return a list of consumer identifiers
208 * @throws PersistenceException for any persistence error
209 */
210 public List getConsumerIds() throws PersistenceException {
211 ArrayList result = new ArrayList();
212
213 PreparedStatement select = null;
214 ResultSet set = null;
215 try {
216 select = _connection.prepareStatement(
217 "select consumer_id from " + CONSUMER_TABLE);
218
219 set = select.executeQuery();
220 while (set.next()) {
221 long consumerId = set.getLong("consumer_id");
222 result.add(new Long(consumerId));
223 }
224 } catch (SQLException exception) {
225 throw new PersistenceException(
226 "Failed to retrieve consumer identifiers", exception);
227 } finally {
228 SQLHelper.close(set);
229 SQLHelper.close(select);
230 }
231 return result;
232 }
233
234 /***
235 * Add subscriptions for a consumer.
236 *
237 * @param consumerId the identity of the consumer
238 * @param consumer the consumer
239 * @throws PersistenceException for any persistence error
240 */
241 protected void addSubscriptions(long consumerId, Consumer consumer)
242 throws PersistenceException {
243 Iterator iterator = consumer.getSubscriptions().iterator();
244 while (iterator.hasNext()) {
245 Subscription subscription = (Subscription) iterator.next();
246 long destinationId = _destinations.getId(
247 subscription.getDestination());
248 if (destinationId == -1) {
249 throw new PersistenceException(
250 "Destination identifier not found for destination="
251 + subscription.getDestination().getName());
252 }
253
254 PreparedStatement insert = null;
255 try {
256 insert = _connection.prepareStatement(
257 "insert into " + SUBSCRIPTION_TABLE + " values (?, ?)");
258
259 insert.setLong(1, consumerId);
260 insert.setLong(2, destinationId);
261 insert.executeUpdate();
262 } catch (SQLException exception) {
263 throw new PersistenceException("Failed to insert subscription",
264 exception);
265 } finally {
266 SQLHelper.close(insert);
267 }
268
269 addMessages(consumerId, destinationId, subscription);
270 }
271 }
272
273 /***
274 * Add messages for a subscription.
275 *
276 * @param consumerId the identity of the consumer
277 * @param destinationId the identify of the destination
278 * @param subscription the consumer subscription
279 * @throws PersistenceException for any persistence error
280 */
281 protected void addMessages(long consumerId, long destinationId,
282 Subscription subscription)
283 throws PersistenceException {
284
285 Iterator iterator = subscription.getMessages().iterator();
286 while (iterator.hasNext()) {
287 MessageState message = (MessageState) iterator.next();
288
289 PreparedStatement insert = null;
290 try {
291 insert = _connection.prepareStatement(
292 "insert into " + MESSAGE_HANDLE_TABLE + " "
293 + "(message_id, destination_id, consumer_id, delivered)"
294 + " values (?, ?, ?, ?)");
295 insert.setString(1, message.getMessageId());
296 insert.setLong(2, destinationId);
297 insert.setLong(3, consumerId);
298 insert.setBoolean(4, message.getDelivered());
299 insert.executeUpdate();
300 } catch (SQLException exception) {
301 throw new PersistenceException(
302 "Failed to insert subscription state", exception);
303 } finally {
304 SQLHelper.close(insert);
305 }
306 }
307 }
308
309 /***
310 * Get subscriptions for a consumer.
311 *
312 * @param consumerId the identity of the consumer
313 * @param consumer the consumer to populate
314 * @throws PersistenceException for any persistence error
315 */
316 protected void getSubscriptions(long consumerId, Consumer consumer)
317 throws PersistenceException {
318
319 PreparedStatement select = null;
320 ResultSet set = null;
321 try {
322 select = _connection.prepareStatement(
323 "select destination_id "
324 + "from " + SUBSCRIPTION_TABLE
325 + " where consumer_id = ?");
326 select.setLong(1, consumerId);
327
328 set = select.executeQuery();
329 while (set.next()) {
330 long destinationId = set.getLong("destination_id");
331 JmsDestination destination = _destinations.get(destinationId);
332 if (destination == null) {
333 throw new PersistenceException(
334 "Failed to locate destination for id="
335 + destinationId);
336 }
337 Subscription subscription = new Subscription(destination);
338 getMessages(consumerId, destinationId, subscription);
339 consumer.addSubscription(subscription);
340 }
341 } catch (SQLException exception) {
342 throw new PersistenceException(
343 "Failed to get subscriptions for consumer=" + consumerId,
344 exception);
345 } finally {
346 SQLHelper.close(set);
347 SQLHelper.close(select);
348 }
349 }
350
351 /***
352 * Get messages for a subscription.
353 *
354 * @param consumerId the identity of the consumer
355 * @param destinationId the identify of the destination
356 * @param subscription the consumer subscription
357 * @throws SQLException if a database error is encountered
358 */
359 protected void getMessages(long consumerId, long destinationId,
360 Subscription subscription)
361 throws SQLException {
362
363 PreparedStatement select = null;
364 ResultSet set = null;
365 try {
366 select = _connection.prepareStatement(
367 "select message_id, delivered "
368 + "from " + MESSAGE_HANDLE_TABLE
369 + " where consumer_id = ? and destination_id = ?");
370 select.setLong(1, consumerId);
371 select.setLong(2, destinationId);
372
373 set = select.executeQuery();
374 while (set.next()) {
375 String messageId = set.getString("message_id");
376 boolean delivered = set.getBoolean("delivered");
377 subscription.addMessage(messageId, delivered);
378 }
379 } finally {
380 SQLHelper.close(set);
381 SQLHelper.close(select);
382 }
383 }
384
385 private class ConsumerIterator implements StoreIterator {
386
387 private Iterator _iterator;
388
389 public ConsumerIterator(List consumerIds) {
390 _iterator = consumerIds.iterator();
391 }
392
393 public boolean hasNext() {
394 return _iterator.hasNext();
395 }
396
397 public Object next() throws PersistenceException {
398 Consumer result = null;
399
400 Long consumerId = (Long) _iterator.next();
401
402 result = get(consumerId.longValue());
403 return result;
404 }
405 }
406
407 }
408
409
410