View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  
44  package org.exolab.jms.persistence;
45  
46  import java.sql.Connection;
47  import java.sql.PreparedStatement;
48  import java.sql.ResultSet;
49  import java.sql.SQLException;
50  import java.util.Date;
51  import java.util.HashMap;
52  import java.util.Iterator;
53  import java.util.Vector;
54  
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  
58  import org.exolab.jms.client.JmsDestination;
59  import org.exolab.jms.client.JmsTopic;
60  
61  
62  /***
63   * This class provides persistency for ConsumerState objects in an RDBMS
64   * database.
65   *
66   * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @version $Revision: 1.3 $ $Date: 2005/08/31 05:45:50 $
68   */
69  class Consumers {
70  
71      /***
72       * The seed generator.
73       */
74      private final SeedGenerator _seeds;
75  
76      /***
77       * The destination manager.
78       */
79      private Destinations _destinations;
80  
81      /***
82       * A cache for all durable consumers.
83       */
84      private final HashMap _consumers = new HashMap();
85  
86      /***
87       * The name of the column that uniquely identifies the consumer.
88       */
89      private static final String CONSUMER_ID_SEED = "consumerId";
90  
91      /***
92       * The name of the table that maintains a list of message handles per
93       * consumer.
94       */
95      private static final String CONSUMER_MESSAGE = "message_handles";
96  
97      /***
98       * The logger.
99       */
100     private static final Log _log = LogFactory.getLog(Consumers.class);
101 
102 
103     /***
104      * Construct a new <code>Consumers</code>.
105      *
106      * @param seeds the seed generator
107      * @param connection the connection to use
108      * @throws PersistenceException if initialisation fails
109      */
110     public Consumers(SeedGenerator seeds,
111                      Connection connection) throws PersistenceException {
112         _seeds = seeds;
113         init(connection);
114     }
115 
116     /***
117      * Sets the destination manager
118      *
119      * @param destinations the destination manager
120      */
121     public void setDestinations(Destinations destinations) {
122         _destinations = destinations;
123     }
124 
125     /***
126      * Add a new durable consumer to the database if it does not already exist.
127      * A durable consumer is specified by a destination name and a consumer
128      * name.
129      * <p/>
130      * The destination must resolve to a valid JmsDestination object
131      *
132      * @param connection the database connection to use
133      * @param dest       the name of the destination
134      * @param consumer   the name of the consumer
135      * @throws PersistenceException if the consumer cannot be added
136      */
137     public synchronized void add(Connection connection, String dest,
138                                  String consumer)
139             throws PersistenceException {
140 
141         JmsDestination destination = null;
142         long destinationId = 0;
143 
144         synchronized (_destinations) {
145             destination = _destinations.get(dest);
146             if (destination == null) {
147                 raise("add", consumer, dest, "destination is invalid");
148             }
149             destinationId = _destinations.getId(dest);
150         }
151 
152         // check that for a topic the consumer name is not the same as the
153         // destination name
154         if ((destination instanceof JmsTopic) &&
155                 (consumer.equals(dest))) {
156             raise("add", consumer, dest,
157                   "The consumer name and destination name cannot be the same");
158         }
159 
160         // get the next id from the seed table
161         long consumerId = _seeds.next(connection, CONSUMER_ID_SEED);
162 
163         PreparedStatement insert = null;
164         try {
165             insert = connection.prepareStatement(
166                     "insert into consumers values (?,?,?,?)");
167 
168             long created = (new Date()).getTime();
169             insert.setString(1, consumer);
170             insert.setLong(2, destinationId);
171             insert.setLong(3, consumerId);
172             insert.setLong(4, created);
173             insert.executeUpdate();
174 
175             Consumer map = new Consumer(consumer, consumerId,
176                                         destinationId, created);
177 
178             // check to see if the durable consumer already exists. If it
179             // does then do not add it but signal and error
180             if (!_consumers.containsKey(consumer)) {
181                 _consumers.put(consumer, map);
182             } else {
183                 _log.error("Durable consumer with name " + consumer
184                            + " already exists.");
185             }
186         } catch (Exception exception) {
187             throw new PersistenceException("Failed to add consumer, destination="
188                                            + dest +
189                                            ", name=" + consumer, exception);
190         } finally {
191             SQLHelper.close(insert);
192         }
193     }
194 
195     /***
196      * Remove a consumer from the database. If the destination is of type queue
197      * then the destination name and the consumer name are identical.
198      *
199      * @param connection - the connection to use
200      * @param name       - the consumer name
201      * @throws PersistenceException - if the consumer cannot be removed
202      */
203     public synchronized void remove(Connection connection, String name)
204             throws PersistenceException {
205 
206         PreparedStatement delete = null;
207 
208         // locate the consumer
209         Consumer map = (Consumer) _consumers.get(name);
210         if (map == null) {
211             raise("remove", name, "consumer does not exist");
212         }
213 
214         try {
215             delete = connection.prepareStatement(
216                     "delete from consumers where name=?");
217             delete.setString(1, name);
218             delete.executeUpdate();
219 
220             // now delete all the corresponding handles in the consumer
221             // message table
222             remove(CONSUMER_MESSAGE, map.consumerId, connection);
223 
224             // remove the consumer from the local cache
225             _consumers.remove(name);
226         } catch (SQLException exception) {
227             throw new PersistenceException("Failed to remove consumer=" + name,
228                                            exception);
229         } finally {
230             SQLHelper.close(delete);
231         }
232     }
233 
234     /***
235      * Return the id of the durable consumer.
236      *
237      * @param name the consumer name
238      * @return the consumer identity
239      */
240     public synchronized long getConsumerId(String name) {
241         Consumer map = (Consumer) _consumers.get(name);
242         return (map != null) ? map.consumerId : 0;
243     }
244 
245     /***
246      * Return true if a consumer exists
247      *
248      * @param name - the consumer name
249      */
250     public synchronized boolean exists(String name) {
251         return (_consumers.get(name) != null);
252     }
253 
254     /***
255      * Returns a list of consumer names associated with a topic.
256      *
257      * @param destination the topic to query
258      */
259     public synchronized Vector getDurableConsumers(String destination) {
260         Vector result = new Vector();
261         long destinationId = _destinations.getId(destination);
262         if (destinationId != 0) {
263             Iterator iter = _consumers.values().iterator();
264             while (iter.hasNext()) {
265                 Consumer map = (Consumer) iter.next();
266                 if (map.destinationId == destinationId) {
267                     result.add(map.name);
268                 }
269             }
270         }
271 
272         return result;
273     }
274 
275     /***
276      * Return a map of consumer names to destinations names.
277      *
278      * @return list of all durable consumers
279      */
280     public synchronized HashMap getAllDurableConsumers() {
281         HashMap result = new HashMap();
282 
283         Iterator iter = _consumers.values().iterator();
284         while (iter.hasNext()) {
285             Consumer map = (Consumer) iter.next();
286             JmsDestination dest = _destinations.get(map.destinationId);
287 
288             if (dest instanceof JmsTopic) {
289                 result.put(map.name, dest.getName());
290             }
291         }
292 
293         return result;
294     }
295 
296     /***
297      * Return the consumer name corresponding to the specified identity
298      *
299      * @param id - the consumer identity
300      */
301     public synchronized String getConsumerName(long id) {
302         String name = null;
303         Iterator iter = _consumers.values().iterator();
304 
305         while (iter.hasNext()) {
306             Consumer map = (Consumer) iter.next();
307             if (map.consumerId == id) {
308                 name = map.name;
309                 break;
310             }
311         }
312 
313         return name;
314     }
315 
316     /***
317      * Deallocates resources owned or referenced by the instance
318      */
319     public synchronized void close() {
320         _consumers.clear();
321     }
322 
323     /***
324      * Removes all cached consumer details for a given destination
325      *
326      * @param destinationId the Id of the destination
327      */
328     protected synchronized void removeCached(long destinationId) {
329         Object[] list = _consumers.values().toArray();
330         for (int i = 0; i < list.length; i++) {
331             Consumer map = (Consumer) list[i];
332             if (map.destinationId == destinationId) {
333                 _consumers.remove(map.name);
334             }
335         }
336     }
337 
338     /***
339      * Initialises the cache of consumers.
340      *
341      * @param connection the connection to use
342      * @throws PersistenceException if initialisation fails
343      */
344     private void init(Connection connection) throws PersistenceException {
345         PreparedStatement select = null;
346         ResultSet set = null;
347         try {
348             select = connection.prepareStatement(
349                     "select name, consumerid, destinationid, created "
350                     + "from consumers");
351             set = select.executeQuery();
352             String name = null;
353             long consumerId = 0;
354             long destinationId = 0;
355             long created = 0;
356             Consumer map = null;
357             while (set.next()) {
358                 name = set.getString(1);
359                 consumerId = set.getLong(2);
360                 destinationId = set.getLong(3);
361                 created = set.getLong(4);
362                 map = new Consumer(name, consumerId, destinationId,
363                                    created);
364                 _consumers.put(name, map);
365             }
366         } catch (SQLException exception) {
367             throw new PersistenceException("Failed to retrieve consumers",
368                                            exception);
369         } finally {
370             SQLHelper.close(set);
371             SQLHelper.close(select);
372         }
373     }
374 
375     /***
376      * Remove all the rows in the specified table with the corresponding
377      * consumer identity.
378      *
379      * @param table      - the table to destroy
380      * @param consumerId - the target consumerId
381      * @param connection - the database connection to use
382      * @throws SQLException - thrown on any error
383      */
384     private void remove(String table, long consumerId, Connection connection)
385             throws SQLException {
386 
387         PreparedStatement delete = null;
388         try {
389             delete = connection.prepareStatement(
390                     "delete from " + table + " where consumerId=?");
391             delete.setLong(1, consumerId);
392             delete.executeUpdate();
393         } finally {
394             SQLHelper.close(delete);
395         }
396     }
397 
398     /***
399      * Raise a PersistenceException with the specified parameters
400      *
401      * @param operation   - operation that failed
402      * @param name        - corresponding consumert name
403      * @param destination - corresponding destination
404      * @param reason      - the reason for the exception
405      */
406     private void raise(String operation, String name, String destination,
407                        String reason)
408             throws PersistenceException {
409         throw new PersistenceException(
410                 "Cannot " + operation + " consumer=" + name
411                 + ", destination=" + destination + ": " + reason);
412     }
413 
414     /***
415      * Raise a PersistenceException with the specified parameters
416      *
417      * @param operation - operation that failed
418      * @param name      - corresponding consumert name
419      * @param reason    - the reasone for the exception
420      */
421     private void raise(String operation, String name, String reason)
422             throws PersistenceException {
423         throw new PersistenceException("Cannot " + operation + " consumer=" +
424                                        name + ": " + reason);
425     }
426 
427     /***
428      * This is an internal class that is used to store consumer entries
429      */
430     private class Consumer {
431 
432         /***
433          * The name of the consumer
434          */
435         public String name;
436 
437         /***
438          * The unique consumer identity
439          */
440         public long consumerId;
441 
442         /***
443          * The identity of the destination that this durable consumer is
444          * subscribed too
445          */
446         public long destinationId;
447 
448         /***
449          * The time that this durable consumer was created
450          */
451         public long created;
452 
453 
454         public Consumer(String name, long consumerId, long destinationId,
455                         long created) {
456 
457             this.name = name;
458             this.consumerId = consumerId;
459             this.destinationId = destinationId;
460             this.created = created;
461         }
462 
463         public String getKey() {
464             return name;
465         }
466     }
467 }