View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  package org.exolab.jms.tools.migration.proxy;
44  
45  import java.sql.PreparedStatement;
46  import java.sql.ResultSet;
47  import java.sql.SQLException;
48  import java.sql.Connection;
49  import java.util.ArrayList;
50  import java.util.Iterator;
51  import java.util.List;
52  import javax.jms.JMSException;
53  
54  import org.exolab.jms.client.JmsDestination;
55  import org.exolab.jms.client.JmsQueue;
56  import org.exolab.jms.persistence.PersistenceException;
57  import org.exolab.jms.persistence.SQLHelper;
58  import org.exolab.jms.tools.migration.Store;
59  import org.exolab.jms.tools.migration.StoreIterator;
60  
61  
62  /***
63   * Provides persistency for {@link Consumer} instances.
64   *
65   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
66   * @version $Revision: 1.2 $ $Date: 2005/10/20 14:07:03 $
67   */
68  public class ConsumerStore implements Store, DBConstants {
69  
70      /***
71       * The destination store.
72       */
73      private final DestinationStore _destinations;
74  
75      /***
76       * The database connection.
77       */
78      private final Connection _connection;
79  
80      /***
81       * The seed used to generate identifiers for consumers.
82       */
83      private long _seed = 0;
84  
85  
86      /***
87       * Construct a new <code>ConsumerStore</code>.
88       *
89       * @param destinations the destination store
90       * @param connection the database connection
91       */
92      public ConsumerStore(DestinationStore destinations, Connection connection) {
93          _destinations = destinations;
94          _connection = connection;
95      }
96  
97      /***
98       * Export the consumers.
99       *
100      * @return an iterator over the collection
101      * @throws JMSException         for any JMS error
102      * @throws PersistenceException for any persistence error
103      */
104     public StoreIterator exportCollection() throws JMSException,
105             PersistenceException {
106         List consumerIds = getConsumerIds();
107         return new ConsumerIterator(consumerIds);
108     }
109 
110     /***
111      * Import consumers into the store.
112      *
113      * @param iterator an iterator over the collection
114      * @throws JMSException         for any JMS error
115      * @throws PersistenceException for any persistence error
116      */
117     public void importCollection(StoreIterator iterator) throws JMSException,
118             PersistenceException {
119         while (iterator.hasNext()) {
120             Consumer consumer = (Consumer) iterator.next();
121             add(consumer);
122         }
123     }
124 
125     /***
126      * Returns the number of elements in the collection.
127      *
128      * @return the number of elements in the collection
129      * @throws PersistenceException for any persistence error
130      */
131     public int size() throws PersistenceException {
132         return getConsumerIds().size();
133     }
134 
135     /***
136      * Add a new consumer.
137      *
138      * @param consumer the consumer to add
139      * @throws PersistenceException for any persistence error
140      */
141     public synchronized void add(Consumer consumer)
142             throws PersistenceException {
143         PreparedStatement insert = null;
144         try {
145             long consumerId = ++_seed;
146 
147             insert = _connection.prepareStatement(
148                     "insert into " + CONSUMER_TABLE + " values (?, ?, ?, ?)");
149 
150             insert.setLong(1, consumerId);
151             insert.setString(2, consumer.getName());
152             insert.setString(3, consumer.getClientID());
153             insert.setBoolean(4, consumer.isQueueConsumer());
154             insert.executeUpdate();
155 
156             addSubscriptions(consumerId, consumer);
157         } catch (SQLException exception) {
158             throw new PersistenceException("Failed to add consumer",
159                                            exception);
160         } finally {
161             SQLHelper.close(insert);
162         }
163     }
164 
165     /***
166      * Returns a consumer for a given identifier.
167      *
168      * @param consumerId the identity of the consumer
169      * @return the consumer corresponding to <code>consumerId</code> or
170      *         <code>null</code> if no such consumer exists
171      * @throws PersistenceException for any persistence error
172      */
173     public Consumer get(long consumerId) throws PersistenceException {
174         Consumer result = null;
175         PreparedStatement select = null;
176         ResultSet set = null;
177         try {
178             select = _connection.prepareStatement(
179                     "select name, client_id, queue_consumer from "
180                     + CONSUMER_TABLE + " where consumer_id = ?");
181             select.setLong(1, consumerId);
182             set = select.executeQuery();
183             if (set.next()) {
184                 String name = set.getString(1);
185                 String clientId = set.getString(2);
186                 boolean isQueue = set.getBoolean(3);
187                 if (isQueue) {
188                     result = new Consumer(new JmsQueue(name));
189                 } else {
190                     result = new Consumer(name, clientId);
191                 }
192                 getSubscriptions(consumerId, result);
193             }
194         } catch (SQLException exception) {
195             throw new PersistenceException("Failed to get consumer",
196                                            exception);
197         } finally {
198             SQLHelper.close(set);
199             SQLHelper.close(select);
200         }
201         return result;
202     }
203 
204     /***
205      * Returns all consumer identifiers.
206      *
207      * @return a list of consumer identifiers
208      * @throws PersistenceException for any persistence error
209      */
210     public List getConsumerIds() throws PersistenceException {
211         ArrayList result = new ArrayList();
212 
213         PreparedStatement select = null;
214         ResultSet set = null;
215         try {
216             select = _connection.prepareStatement(
217                     "select consumer_id from " + CONSUMER_TABLE);
218 
219             set = select.executeQuery();
220             while (set.next()) {
221                 long consumerId = set.getLong("consumer_id");
222                 result.add(new Long(consumerId));
223             }
224         } catch (SQLException exception) {
225             throw new PersistenceException(
226                     "Failed to retrieve consumer identifiers", exception);
227         } finally {
228             SQLHelper.close(set);
229             SQLHelper.close(select);
230         }
231         return result;
232     }
233 
234     /***
235      * Add subscriptions for a consumer.
236      *
237      * @param consumerId the identity of the consumer
238      * @param consumer   the consumer
239      * @throws PersistenceException for any persistence error
240      */
241     protected void addSubscriptions(long consumerId, Consumer consumer)
242             throws PersistenceException {
243         Iterator iterator = consumer.getSubscriptions().iterator();
244         while (iterator.hasNext()) {
245             Subscription subscription = (Subscription) iterator.next();
246             long destinationId = _destinations.getId(
247                     subscription.getDestination());
248             if (destinationId == -1) {
249                 throw new PersistenceException(
250                         "Destination identifier not found for destination="
251                         + subscription.getDestination().getName());
252             }
253 
254             PreparedStatement insert = null;
255             try {
256                 insert = _connection.prepareStatement(
257                         "insert into " + SUBSCRIPTION_TABLE + " values (?, ?)");
258 
259                 insert.setLong(1, consumerId);
260                 insert.setLong(2, destinationId);
261                 insert.executeUpdate();
262             } catch (SQLException exception) {
263                 throw new PersistenceException("Failed to insert subscription",
264                                                exception);
265             } finally {
266                 SQLHelper.close(insert);
267             }
268 
269             addMessages(consumerId, destinationId, subscription);
270         }
271     }
272 
273     /***
274      * Add messages for a subscription.
275      *
276      * @param consumerId    the identity of the consumer
277      * @param destinationId the identify of the destination
278      * @param subscription  the consumer subscription
279      * @throws PersistenceException for any persistence error
280      */
281     protected void addMessages(long consumerId, long destinationId,
282                                Subscription subscription)
283             throws PersistenceException {
284 
285         Iterator iterator = subscription.getMessages().iterator();
286         while (iterator.hasNext()) {
287             MessageState message = (MessageState) iterator.next();
288 
289             PreparedStatement insert = null;
290             try {
291                 insert = _connection.prepareStatement(
292                         "insert into " + MESSAGE_HANDLE_TABLE + " "
293                         + "(message_id, destination_id, consumer_id, delivered)"
294                         + " values (?, ?, ?, ?)");
295                 insert.setString(1, message.getMessageId());
296                 insert.setLong(2, destinationId);
297                 insert.setLong(3, consumerId);
298                 insert.setBoolean(4, message.getDelivered());
299                 insert.executeUpdate();
300             } catch (SQLException exception) {
301                 throw new PersistenceException(
302                         "Failed to insert subscription state", exception);
303             } finally {
304                 SQLHelper.close(insert);
305             }
306         }
307     }
308 
309     /***
310      * Get subscriptions for a consumer.
311      *
312      * @param consumerId the identity of the consumer
313      * @param consumer   the consumer to populate
314      * @throws PersistenceException for any persistence error
315      */
316     protected void getSubscriptions(long consumerId, Consumer consumer)
317             throws PersistenceException {
318 
319         PreparedStatement select = null;
320         ResultSet set = null;
321         try {
322             select = _connection.prepareStatement(
323                     "select destination_id "
324                     + "from " + SUBSCRIPTION_TABLE
325                     + " where consumer_id = ?");
326             select.setLong(1, consumerId);
327 
328             set = select.executeQuery();
329             while (set.next()) {
330                 long destinationId = set.getLong("destination_id");
331                 JmsDestination destination = _destinations.get(destinationId);
332                 if (destination == null) {
333                     throw new PersistenceException(
334                             "Failed to locate destination for id="
335                             + destinationId);
336                 }
337                 Subscription subscription = new Subscription(destination);
338                 getMessages(consumerId, destinationId, subscription);
339                 consumer.addSubscription(subscription);
340             }
341         } catch (SQLException exception) {
342             throw new PersistenceException(
343                     "Failed to get subscriptions for consumer=" + consumerId,
344                     exception);
345         } finally {
346             SQLHelper.close(set);
347             SQLHelper.close(select);
348         }
349     }
350 
351     /***
352      * Get messages for a subscription.
353      *
354      * @param consumerId    the identity of the consumer
355      * @param destinationId the identify of the destination
356      * @param subscription  the consumer subscription
357      * @throws SQLException if a database error is encountered
358      */
359     protected void getMessages(long consumerId, long destinationId,
360                                Subscription subscription)
361             throws SQLException {
362 
363         PreparedStatement select = null;
364         ResultSet set = null;
365         try {
366             select = _connection.prepareStatement(
367                     "select message_id, delivered "
368                     + "from " + MESSAGE_HANDLE_TABLE
369                     + " where consumer_id = ? and destination_id = ?");
370             select.setLong(1, consumerId);
371             select.setLong(2, destinationId);
372 
373             set = select.executeQuery();
374             while (set.next()) {
375                 String messageId = set.getString("message_id");
376                 boolean delivered = set.getBoolean("delivered");
377                 subscription.addMessage(messageId, delivered);
378             }
379         } finally {
380             SQLHelper.close(set);
381             SQLHelper.close(select);
382         }
383     }
384 
385     private class ConsumerIterator implements StoreIterator {
386 
387         private Iterator _iterator;
388 
389         public ConsumerIterator(List consumerIds) {
390             _iterator = consumerIds.iterator();
391         }
392 
393         public boolean hasNext() {
394             return _iterator.hasNext();
395         }
396 
397         public Object next() throws PersistenceException {
398             Consumer result = null;
399 
400             Long consumerId = (Long) _iterator.next();
401 
402             result = get(consumerId.longValue());
403             return result;
404         }
405     }
406 
407 }
408 
409 
410