1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43
44 package org.exolab.jms.persistence;
45
46 import java.sql.Connection;
47 import java.sql.PreparedStatement;
48 import java.sql.ResultSet;
49 import java.sql.SQLException;
50 import java.util.Date;
51 import java.util.HashMap;
52 import java.util.Iterator;
53 import java.util.Vector;
54
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57
58 import org.exolab.jms.client.JmsDestination;
59 import org.exolab.jms.client.JmsTopic;
60
61
62 /***
63 * This class provides persistency for ConsumerState objects in an RDBMS
64 * database.
65 *
66 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67 * @version $Revision: 1.3 $ $Date: 2005/08/31 05:45:50 $
68 */
69 class Consumers {
70
71 /***
72 * The seed generator.
73 */
74 private final SeedGenerator _seeds;
75
76 /***
77 * The destination manager.
78 */
79 private Destinations _destinations;
80
81 /***
82 * A cache for all durable consumers.
83 */
84 private final HashMap _consumers = new HashMap();
85
86 /***
87 * The name of the column that uniquely identifies the consumer.
88 */
89 private static final String CONSUMER_ID_SEED = "consumerId";
90
91 /***
92 * The name of the table that maintains a list of message handles per
93 * consumer.
94 */
95 private static final String CONSUMER_MESSAGE = "message_handles";
96
97 /***
98 * The logger.
99 */
100 private static final Log _log = LogFactory.getLog(Consumers.class);
101
102
103 /***
104 * Construct a new <code>Consumers</code>.
105 *
106 * @param seeds the seed generator
107 * @param connection the connection to use
108 * @throws PersistenceException if initialisation fails
109 */
110 public Consumers(SeedGenerator seeds,
111 Connection connection) throws PersistenceException {
112 _seeds = seeds;
113 init(connection);
114 }
115
116 /***
117 * Sets the destination manager
118 *
119 * @param destinations the destination manager
120 */
121 public void setDestinations(Destinations destinations) {
122 _destinations = destinations;
123 }
124
125 /***
126 * Add a new durable consumer to the database if it does not already exist.
127 * A durable consumer is specified by a destination name and a consumer
128 * name.
129 * <p/>
130 * The destination must resolve to a valid JmsDestination object
131 *
132 * @param connection the database connection to use
133 * @param dest the name of the destination
134 * @param consumer the name of the consumer
135 * @throws PersistenceException if the consumer cannot be added
136 */
137 public synchronized void add(Connection connection, String dest,
138 String consumer)
139 throws PersistenceException {
140
141 JmsDestination destination = null;
142 long destinationId = 0;
143
144 synchronized (_destinations) {
145 destination = _destinations.get(dest);
146 if (destination == null) {
147 raise("add", consumer, dest, "destination is invalid");
148 }
149 destinationId = _destinations.getId(dest);
150 }
151
152
153
154 if ((destination instanceof JmsTopic) &&
155 (consumer.equals(dest))) {
156 raise("add", consumer, dest,
157 "The consumer name and destination name cannot be the same");
158 }
159
160
161 long consumerId = _seeds.next(connection, CONSUMER_ID_SEED);
162
163 PreparedStatement insert = null;
164 try {
165 insert = connection.prepareStatement(
166 "insert into consumers values (?,?,?,?)");
167
168 long created = (new Date()).getTime();
169 insert.setString(1, consumer);
170 insert.setLong(2, destinationId);
171 insert.setLong(3, consumerId);
172 insert.setLong(4, created);
173 insert.executeUpdate();
174
175 Consumer map = new Consumer(consumer, consumerId,
176 destinationId, created);
177
178
179
180 if (!_consumers.containsKey(consumer)) {
181 _consumers.put(consumer, map);
182 } else {
183 _log.error("Durable consumer with name " + consumer
184 + " already exists.");
185 }
186 } catch (Exception exception) {
187 throw new PersistenceException("Failed to add consumer, destination="
188 + dest +
189 ", name=" + consumer, exception);
190 } finally {
191 SQLHelper.close(insert);
192 }
193 }
194
195 /***
196 * Remove a consumer from the database. If the destination is of type queue
197 * then the destination name and the consumer name are identical.
198 *
199 * @param connection - the connection to use
200 * @param name - the consumer name
201 * @throws PersistenceException - if the consumer cannot be removed
202 */
203 public synchronized void remove(Connection connection, String name)
204 throws PersistenceException {
205
206 PreparedStatement delete = null;
207
208
209 Consumer map = (Consumer) _consumers.get(name);
210 if (map == null) {
211 raise("remove", name, "consumer does not exist");
212 }
213
214 try {
215 delete = connection.prepareStatement(
216 "delete from consumers where name=?");
217 delete.setString(1, name);
218 delete.executeUpdate();
219
220
221
222 remove(CONSUMER_MESSAGE, map.consumerId, connection);
223
224
225 _consumers.remove(name);
226 } catch (SQLException exception) {
227 throw new PersistenceException("Failed to remove consumer=" + name,
228 exception);
229 } finally {
230 SQLHelper.close(delete);
231 }
232 }
233
234 /***
235 * Return the id of the durable consumer.
236 *
237 * @param name the consumer name
238 * @return the consumer identity
239 */
240 public synchronized long getConsumerId(String name) {
241 Consumer map = (Consumer) _consumers.get(name);
242 return (map != null) ? map.consumerId : 0;
243 }
244
245 /***
246 * Return true if a consumer exists
247 *
248 * @param name - the consumer name
249 */
250 public synchronized boolean exists(String name) {
251 return (_consumers.get(name) != null);
252 }
253
254 /***
255 * Returns a list of consumer names associated with a topic.
256 *
257 * @param destination the topic to query
258 */
259 public synchronized Vector getDurableConsumers(String destination) {
260 Vector result = new Vector();
261 long destinationId = _destinations.getId(destination);
262 if (destinationId != 0) {
263 Iterator iter = _consumers.values().iterator();
264 while (iter.hasNext()) {
265 Consumer map = (Consumer) iter.next();
266 if (map.destinationId == destinationId) {
267 result.add(map.name);
268 }
269 }
270 }
271
272 return result;
273 }
274
275 /***
276 * Return a map of consumer names to destinations names.
277 *
278 * @return list of all durable consumers
279 */
280 public synchronized HashMap getAllDurableConsumers() {
281 HashMap result = new HashMap();
282
283 Iterator iter = _consumers.values().iterator();
284 while (iter.hasNext()) {
285 Consumer map = (Consumer) iter.next();
286 JmsDestination dest = _destinations.get(map.destinationId);
287
288 if (dest instanceof JmsTopic) {
289 result.put(map.name, dest.getName());
290 }
291 }
292
293 return result;
294 }
295
296 /***
297 * Return the consumer name corresponding to the specified identity
298 *
299 * @param id - the consumer identity
300 */
301 public synchronized String getConsumerName(long id) {
302 String name = null;
303 Iterator iter = _consumers.values().iterator();
304
305 while (iter.hasNext()) {
306 Consumer map = (Consumer) iter.next();
307 if (map.consumerId == id) {
308 name = map.name;
309 break;
310 }
311 }
312
313 return name;
314 }
315
316 /***
317 * Deallocates resources owned or referenced by the instance
318 */
319 public synchronized void close() {
320 _consumers.clear();
321 }
322
323 /***
324 * Removes all cached consumer details for a given destination
325 *
326 * @param destinationId the Id of the destination
327 */
328 protected synchronized void removeCached(long destinationId) {
329 Object[] list = _consumers.values().toArray();
330 for (int i = 0; i < list.length; i++) {
331 Consumer map = (Consumer) list[i];
332 if (map.destinationId == destinationId) {
333 _consumers.remove(map.name);
334 }
335 }
336 }
337
338 /***
339 * Initialises the cache of consumers.
340 *
341 * @param connection the connection to use
342 * @throws PersistenceException if initialisation fails
343 */
344 private void init(Connection connection) throws PersistenceException {
345 PreparedStatement select = null;
346 ResultSet set = null;
347 try {
348 select = connection.prepareStatement(
349 "select name, consumerid, destinationid, created "
350 + "from consumers");
351 set = select.executeQuery();
352 String name = null;
353 long consumerId = 0;
354 long destinationId = 0;
355 long created = 0;
356 Consumer map = null;
357 while (set.next()) {
358 name = set.getString(1);
359 consumerId = set.getLong(2);
360 destinationId = set.getLong(3);
361 created = set.getLong(4);
362 map = new Consumer(name, consumerId, destinationId,
363 created);
364 _consumers.put(name, map);
365 }
366 } catch (SQLException exception) {
367 throw new PersistenceException("Failed to retrieve consumers",
368 exception);
369 } finally {
370 SQLHelper.close(set);
371 SQLHelper.close(select);
372 }
373 }
374
375 /***
376 * Remove all the rows in the specified table with the corresponding
377 * consumer identity.
378 *
379 * @param table - the table to destroy
380 * @param consumerId - the target consumerId
381 * @param connection - the database connection to use
382 * @throws SQLException - thrown on any error
383 */
384 private void remove(String table, long consumerId, Connection connection)
385 throws SQLException {
386
387 PreparedStatement delete = null;
388 try {
389 delete = connection.prepareStatement(
390 "delete from " + table + " where consumerId=?");
391 delete.setLong(1, consumerId);
392 delete.executeUpdate();
393 } finally {
394 SQLHelper.close(delete);
395 }
396 }
397
398 /***
399 * Raise a PersistenceException with the specified parameters
400 *
401 * @param operation - operation that failed
402 * @param name - corresponding consumert name
403 * @param destination - corresponding destination
404 * @param reason - the reason for the exception
405 */
406 private void raise(String operation, String name, String destination,
407 String reason)
408 throws PersistenceException {
409 throw new PersistenceException(
410 "Cannot " + operation + " consumer=" + name
411 + ", destination=" + destination + ": " + reason);
412 }
413
414 /***
415 * Raise a PersistenceException with the specified parameters
416 *
417 * @param operation - operation that failed
418 * @param name - corresponding consumert name
419 * @param reason - the reasone for the exception
420 */
421 private void raise(String operation, String name, String reason)
422 throws PersistenceException {
423 throw new PersistenceException("Cannot " + operation + " consumer=" +
424 name + ": " + reason);
425 }
426
427 /***
428 * This is an internal class that is used to store consumer entries
429 */
430 private class Consumer {
431
432 /***
433 * The name of the consumer
434 */
435 public String name;
436
437 /***
438 * The unique consumer identity
439 */
440 public long consumerId;
441
442 /***
443 * The identity of the destination that this durable consumer is
444 * subscribed too
445 */
446 public long destinationId;
447
448 /***
449 * The time that this durable consumer was created
450 */
451 public long created;
452
453
454 public Consumer(String name, long consumerId, long destinationId,
455 long created) {
456
457 this.name = name;
458 this.consumerId = consumerId;
459 this.destinationId = destinationId;
460 this.created = created;
461 }
462
463 public String getKey() {
464 return name;
465 }
466 }
467 }