View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2001,2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  
44  package org.exolab.jms.persistence;
45  
46  import java.sql.Connection;
47  import java.sql.PreparedStatement;
48  import java.sql.ResultSet;
49  import java.sql.SQLException;
50  import java.util.Date;
51  import java.util.HashMap;
52  import java.util.Iterator;
53  import java.util.Vector;
54  
55  import javax.jms.JMSException;
56  import javax.sql.DataSource;
57  
58  import org.apache.commons.logging.Log;
59  import org.apache.commons.logging.LogFactory;
60  
61  import org.exolab.jms.client.JmsDestination;
62  import org.exolab.jms.client.JmsTopic;
63  
64  
65  /***
66   * This class provides persistency for ConsumerState objects
67   * in an RDBMS database
68   *
69   * @version     $Revision: 1.9 $ $Date: 2003/08/07 13:33:06 $
70   * @author      <a href="mailto:tima@intalio.com">Tim Anderson</a>
71   * @see         org.exolab.jms.persistence.ConsumerState
72   * @see         org.exolab.jms.persistence.RDBMSAdapter
73   */
74  class Consumers {
75  
76      /***
77       * A cache for all durable consumers
78       */
79      private HashMap _consumers;
80  
81      /***
82       * A refernce to the singleton instance of this class
83       */
84      private static Consumers _instance;
85  
86      /***
87       * Monitor used to synchronize access to the initialization of
88       * the singleton
89       */
90      private static final Boolean block = new Boolean(true);
91  
92      /***
93       * The name of the column that uniquely identifies the consumer
94       */
95      private static final String CONSUMER_ID_SEED = "consumerId";
96  
97      /***
98       * The name of the table that maintains a list of message handles
99       * per consumer
100      */
101     private static final String CONSUMER_MESSAGE = "message_handles";
102 
103     /***
104      * The logger
105      */
106     private static final Log _log = LogFactory.getLog(Consumers.class);
107 
108 
109     /***
110      * Returns the singleton instance.
111      *
112      * Note that initialise() must have been invoked first for this
113      * to return a valid instance.
114      *
115      * @return      Consumers           the singleton instance
116      */
117     public static Consumers instance() {
118         return _instance;
119     }
120 
121     /***
122      * Initialise the singleton instance
123      *
124      * @param Connection - the connection to use
125      * @return Consumers -  singleton instance
126      * @throws PersistenceException - if the call cannot complete
127      */
128     public static Consumers initialise(Connection connection)
129         throws PersistenceException {
130 
131         if (_instance == null) {
132             synchronized (block) {
133                 if (_instance == null) {
134                     _instance = new Consumers();
135                     _instance.load(connection);
136                 }
137             }
138         }
139         return _instance;
140     }
141 
142     /***
143      * Add a new durable consumer to the database if it does not already
144      * exist. A durable consumer is specified by a destination name and
145      * a consumer name.
146      * <p>
147      * The destination must resolve to a valid JmsDestination object
148      *
149      * @param connection - the database connection to use
150      * @param destination - the name of the destination
151      * @param consumer - the name of the consumer
152      * @throws PersistenceException - if the consumer cannot be added
153      */
154     public synchronized void add(Connection connection, String dest,
155                                  String consumer)
156         throws PersistenceException {
157 
158         JmsDestination destination = null;
159         Destinations singleton = Destinations.instance();
160         long destinationId = 0;
161 
162         synchronized (singleton) {
163             destination = singleton.get(dest);
164             if (destination == null) {
165                 raise("add", consumer, dest, "destination is invalid");
166             }
167             destinationId = singleton.getId(dest);
168         }
169 
170         // check that for a topic the consumer name is not the same as the
171         // destination name
172         if ((destination instanceof JmsTopic) &&
173             (consumer.equals(dest))) {
174             raise("add", consumer, dest,
175                 "The consumer name and destination name cannot be the same");
176         }
177 
178         // get the next id from the seed table
179         long consumerId = SeedGenerator.instance().next(connection,
180             CONSUMER_ID_SEED);
181 
182         PreparedStatement insert = null;
183         try {
184             insert = connection.prepareStatement(
185                 "insert into consumers values (?,?,?,?)");
186 
187             long created = (new Date()).getTime();
188             insert.setString(1, consumer);
189             insert.setLong(2, destinationId);
190             insert.setLong(3, consumerId);
191             insert.setLong(4, created);
192             insert.executeUpdate();
193 
194             Consumer map = new Consumer(consumer, consumerId,
195                 destinationId, created);
196 
197             // check to see if the durable consumer already exists. If it
198             // does then do not add it but signal and error
199             if (!_consumers.containsKey(consumer)) {
200                 _consumers.put(consumer, map);
201             } else {
202                 _log.error("Durable consumer with name " + consumer
203                     + " already exists.");
204             }
205         } catch (Exception exception) {
206             throw new PersistenceException(
207                 "Failed to add consumer, destination=" + dest +
208                 ", name=" + consumer, exception);
209         } finally {
210             SQLHelper.close(insert);
211         }
212     }
213 
214     /***
215      * Remove a consumer from the database. If the destination is of
216      * type queue then the destination name and the consumer name are
217      * identical.
218      *
219      * @param connection - the connection to use
220      * @param name - the consumer name
221      * @throws PersistenceException - if the consumer cannot be removed
222      */
223     public synchronized void remove(Connection connection, String name)
224         throws PersistenceException {
225 
226         PreparedStatement delete = null;
227 
228         // locate the consumer
229         Consumer map = (Consumer) _consumers.get(name);
230         if (map == null) {
231             raise("remove", name, "consumer does not exist");
232         }
233 
234         try {
235             delete = connection.prepareStatement(
236                 "delete from consumers where name=?");
237             delete.setString(1, name);
238             delete.executeUpdate();
239 
240             // now delete all the corresponding handles in the consumer
241             // message table
242             remove(CONSUMER_MESSAGE, map.consumerId, connection);
243 
244             // remove the consumer from the local cache
245             _consumers.remove(name);
246         } catch (SQLException exception) {
247             throw new PersistenceException(
248                 "Failed to remove consumer=" + name, exception);
249         } finally {
250             SQLHelper.close(delete);
251         }
252     }
253 
254     /***
255      * Return the id of the durable consumer.
256      *
257      * @param connection - the database connection to use
258      * @param name - consumer name
259      * @return the consumer identity
260      */
261     public synchronized long getConsumerId(String name) {
262         Consumer map = (Consumer) _consumers.get(name);
263         return (map != null) ? map.consumerId : 0;
264     }
265 
266     /***
267      * Return true if a consumer exists
268      *
269      * @param name - the consumer name
270      */
271     public synchronized boolean exists(String name) {
272         return (_consumers.get(name) != null);
273     }
274 
275     /***
276      * Returns a list of consumer names associated with a topic
277      *
278      * @param topic - the topic to query
279      */
280     public synchronized Vector getDurableConsumers(String destination) {
281         Vector result = new Vector(); // vector for legacy reasons
282         long destinationId = Destinations.instance().getId(destination);
283         if (destinationId != 0) {
284             Iterator iter = _consumers.values().iterator();
285             while (iter.hasNext()) {
286                 Consumer map = (Consumer) iter.next();
287                 if (map.destinationId == destinationId) {
288                     result.add(map.name);
289                 }
290             }
291         }
292 
293         return result;
294     }
295 
296     /***
297      * Return a map of consumer names to destinations names.
298      *
299      * @return HashMap - list of all durable consumers
300      */
301     public synchronized HashMap getAllDurableConsumers() {
302         HashMap result = new HashMap();
303 
304         Iterator iter = _consumers.values().iterator();
305         while (iter.hasNext()) {
306             Consumer map = (Consumer) iter.next();
307             JmsDestination dest = Destinations.instance().get(
308                 map.destinationId);
309 
310             if (dest instanceof JmsTopic) {
311                 result.put(map.name, dest.getName());
312             }
313         }
314 
315         return result;
316     }
317 
318     /***
319      * Return the consumer name corresponding to the specified identity
320      *
321      * @param id - the consumer identity
322      */
323     public synchronized String getConsumerName(long id) {
324         String name = null;
325         Iterator iter = _consumers.values().iterator();
326 
327         while (iter.hasNext()) {
328             Consumer map = (Consumer) iter.next();
329             if (map.consumerId == id) {
330                 name = map.name;
331                 break;
332             }
333         }
334 
335         return name;
336     }
337 
338     /***
339      * Deallocates resources owned or referenced by the instance
340      */
341     public synchronized void close() {
342         _consumers.clear();
343         _consumers = null;
344 
345         _instance = null;
346     }
347 
348     /***
349      * Removes all cached consumer details for a given destination
350      *
351      * @param       destinationId       the Id of the destination
352      */
353     protected synchronized void removeCached(long destinationId) {
354         Object[] list = _consumers.values().toArray();
355         for (int i = 0; i < list.length; i++) {
356             Consumer map = (Consumer) list[i];
357             if (map.destinationId == destinationId) {
358                 _consumers.remove(map.name);
359             }
360         }
361     }
362 
363     /***
364      * Constructor
365      */
366     private Consumers() {
367         _consumers = new HashMap();
368     }
369 
370     /***
371      * Load the cache during init time. It needs to get access to the
372      * TransactionService and the DatabaseService so that it can get
373      * get access to a transaction and a database connection. This method
374      * reads <b>all the consumers</b> into memory.
375      * <p>
376      * If there is any problem completing this operation then the method
377      * will throw a PersistenceException
378      *
379      * @param connection - the connection to use
380      * @throws PersistenceException - if the load fails
381      */
382     private void load(Connection connection)
383         throws PersistenceException {
384 
385         PreparedStatement select = null;
386         ResultSet set = null;
387         try {
388             select = connection.prepareStatement("select * from consumers");
389             set = select.executeQuery();
390 
391             while (set.next()) {
392                 String name = set.getString("name");
393                 long consumerId = set.getLong("consumerId");
394                 long destinationId = set.getLong("destinationId");
395                 long created = set.getLong("created");
396                 Consumer map = new Consumer(name, consumerId, destinationId,
397                     created);
398                 _consumers.put(name, map);
399             }
400         } catch (SQLException exception) {
401             throw new PersistenceException("Failed to retrieve consumers",
402                 exception);
403         } finally {
404             SQLHelper.close(set);
405             SQLHelper.close(select);
406         }
407     }
408 
409     /***
410      * Remove all the rows in the specified table with the corresponding
411      * consumer identity.
412      *
413      * @param table - the table to destroy
414      * @param consumerId - the target consumerId
415      * @param connection - the database connection to use
416      * @throws SQLException - thrown on any error
417      */
418     private void remove(String table, long consumerId, Connection connection)
419         throws SQLException {
420 
421         PreparedStatement delete = null;
422         try {
423             delete = connection.prepareStatement(
424                 "delete from " + table + " where consumerId=?");
425             delete.setLong(1, consumerId);
426             delete.executeUpdate();
427         } finally {
428             SQLHelper.close(delete);
429         }
430     }
431 
432     /***
433      * Raise a PersistenceException with the specified parameters
434      *
435      * @param operation - operation that failed
436      * @param name - corresponding consumert name
437      * @param destination - corresponding destination
438      * @param reason - the reason for the exception
439      */
440     private void raise(String operation, String name, String destination,
441                        String reason)
442         throws PersistenceException {
443         throw new PersistenceException("Cannot " + operation + " consumer=" +
444             name + ", destination=" + destination + ": " + reason);
445     }
446 
447     /***
448      * Raise a PersistenceException with the specified parameters
449      *
450      * @param operation - operation that failed
451      * @param name - corresponding consumert name
452      * @param reason - the reasone for the exception
453      */
454     private void raise(String operation, String name, String reason)
455         throws PersistenceException {
456         throw new PersistenceException("Cannot " + operation + " consumer=" +
457             name + ": " + reason);
458     }
459 
460     /***
461      * This is an internal class that is used to store consumer entries
462      */
463     private class Consumer {
464 
465         /***
466          * The name of the consumer
467          */
468         public String name;
469 
470         /***
471          * The unique consumer identity
472          */
473         public long consumerId;
474 
475         /***
476          * The identity of the destination that this durable consumer is
477          * subscribed too
478          */
479         public long destinationId;
480 
481         /***
482          * The time that this durable consumer was created
483          */
484         public long created;
485 
486 
487         public Consumer(String name, long consumerId, long destinationId,
488                         long created) {
489 
490             this.name = name;
491             this.consumerId = consumerId;
492             this.destinationId = destinationId;
493             this.created = created;
494         }
495 
496         public String getKey() {
497             return name;
498         }
499     }
500 }