1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 */
43 package org.exolab.jms.persistence;
44
45 import java.sql.Connection;
46 import java.sql.PreparedStatement;
47 import java.sql.ResultSet;
48 import java.sql.SQLException;
49 import java.util.Vector;
50
51 import org.apache.commons.logging.Log;
52 import org.apache.commons.logging.LogFactory;
53
54 import org.exolab.jms.client.JmsDestination;
55 import org.exolab.jms.client.JmsTopic;
56 import org.exolab.jms.messagemgr.PersistentMessageHandle;
57 import org.exolab.jms.messagemgr.MessageHandle;
58
59
/**
 * Provides persistence for {@link MessageHandle} objects in a relational
 * database, via the <code>message_handles</code> table.
 *
 * @version $Revision: 1.4 $ $Date: 2005/08/31 05:45:50 $
 * @author <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
 */
67 class MessageHandles {
68
/**
 * The destination manager. Used by this class to translate destination
 * names to numeric ids and ids back to destinations.
 */
private final Destinations _destinations;

/**
 * The consumer manager. Used by this class to translate consumer names
 * to numeric ids and ids back to consumer names.
 */
private final Consumers _consumers;

/**
 * SQL for inserting a single message handle row.
 */
private static final String INSERT_MSG_HANDLE_STMT =
    "insert into message_handles (messageid, destinationid, consumerid, "
    + "priority, acceptedtime, sequencenumber, expirytime, delivered) "
    + "values (?,?,?,?,?,?,?,?)";

/**
 * SQL for deleting a message handle. The first form identifies the handle
 * by message id and consumer id only; the second additionally restricts
 * the delete to a destination id.
 */
private static final String DELETE_MSG_HANDLE_STMT1 =
    "delete from message_handles where messageId=? and consumerId=?";
private static final String DELETE_MSG_HANDLE_STMT2 =
    "delete from message_handles where messageId=? and destinationId=? " +
    "and consumerId=?";

/**
 * SQL for deleting all message handles with the specified message id.
 */
private static final String DELETE_MSG_HANDLES_STMT =
    "delete from message_handles where messageId=?";

/**
 * SQL for updating the delivered flag of a single message handle row.
 */
private static final String UPDATE_MSG_HANDLE_STMT =
    "update message_handles set delivered=? where messageId=? and " +
    "destinationId=? and consumerId=?";

/**
 * SQL for deleting all message handles for a destination.
 */
private static final String DELETE_MSG_HANDLES_FOR_DEST =
    "delete from message_handles where destinationId=?";

/**
 * SQL for retrieving all message handles for a particular consumer,
 * ordered by ascending accepted time. Despite the constant's name, the
 * statement filters on the consumer id only.
 */
private static final String GET_MSG_HANDLES_FOR_DEST =
    "select messageid, destinationid, consumerid, priority, acceptedtime, "
    + "sequencenumber, expirytime, delivered from message_handles "
    + "where consumerId=? order by acceptedTime asc";

/**
 * SQL for retrieving the distinct message ids whose accepted time lies
 * between two times, inclusive.
 */
private static final String GET_MESSAGE_HANDLES_IN_RANGE =
    "select distinct messageId from message_handles where " +
    " acceptedTime >= ? and acceptedTime <=?";

/**
 * SQL for determining whether any handle exists for the specified
 * message id.
 */
private static final String GET_MESSAGE_HANDLE_WITH_ID =
    "select distinct messageId from message_handles where messageId=?";

/**
 * SQL for counting the message handles for a specified destination and
 * consumer.
 */
private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
    "select count(messageId) from message_handles where destinationId=? " +
    "and consumerId=?";

/**
 * SQL for counting the message handles for a specified consumer, across
 * all destinations.
 */
private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
    "select count(messageId) from message_handles where consumerId=?";

/**
 * SQL for deleting a consumer's expired message handles: those with a
 * non-zero expiry time that is earlier than the supplied time.
 */
private static final String DELETE_EXPIRED_MESSAGES =
    "delete from message_handles where consumerId=? and expiryTime != 0 " +
    "and expiryTime<?";

/**
 * The logger.
 */
private static final Log _log = LogFactory.getLog(MessageHandles.class);
160
161
/**
 * Create a <code>MessageHandles</code> instance that resolves names and
 * ids through the supplied managers.
 *
 * @param destinations the manager used to resolve destinations
 * @param consumers the manager used to resolve consumers
 */
public MessageHandles(Destinations destinations, Consumers consumers) {
    this._consumers = consumers;
    this._destinations = destinations;
}
172
173 /***
174 * Add the specified message handle to the database.
175 *
176 * @param connection - the connection to use
177 * @param handle - message handle to add
178 * @throws PersistenceException - if add does not complete
179 */
180 public void addMessageHandle(Connection connection,
181 MessageHandle handle)
182 throws PersistenceException {
183
184 if (_log.isDebugEnabled()) {
185 _log.debug("addMessageHandle(handle=[consumer="
186 + handle.getConsumerPersistentId()
187 + ", destination=" + handle.getDestination()
188 + ", id=" + handle.getMessageId() + "])");
189 }
190
191 PreparedStatement insert = null;
192 try {
193
194 long destinationId = _destinations.getId(
195 handle.getDestination().getName());
196 if (destinationId == 0) {
197 throw new PersistenceException(
198 "Cannot add message handle id=" + handle.getMessageId() +
199 " for destination=" + handle.getDestination().getName() +
200 " and consumer=" + handle.getConsumerPersistentId() +
201 " since the destination cannot be mapped to an id");
202 }
203
204
205 long consumerId = _consumers.getConsumerId(
206 handle.getConsumerPersistentId());
207 if (consumerId == 0) {
208 throw new PersistenceException(
209 "Cannot add message handle id=" + handle.getMessageId() +
210 " for destination=" + handle.getDestination().getName() +
211 " and consumer=" + handle.getConsumerPersistentId() +
212 " since the consumer cannot be mapped to an id");
213 }
214
215 insert = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
216 insert.setString(1, handle.getMessageId());
217 insert.setLong(2, destinationId);
218 insert.setLong(3, consumerId);
219 insert.setInt(4, handle.getPriority());
220 insert.setLong(5, handle.getAcceptedTime());
221 insert.setLong(6, handle.getSequenceNumber());
222 insert.setLong(7, handle.getExpiryTime());
223 insert.setInt(8, (handle.getDelivered()) ? 1 : 0);
224
225
226 if (insert.executeUpdate() != 1) {
227 _log.error(
228 "Failed to execute addMessageHandle for handle="
229 + handle.getMessageId() + ", destination Id="
230 + destinationId);
231 }
232 } catch (SQLException exception) {
233 throw new PersistenceException("Failed to add message handle=" +
234 handle, exception);
235 } finally {
236 SQLHelper.close(insert);
237 }
238 }
239
240 /***
241 * Remove the specified message handle from the database. Once the handle
242 * has been removed check to see whether there are any more message handles
243 * referencing the same message. If there are not then remove the
244 * corresponding message from the messages tables.
245 *
246 * @param connection - the connection to use
247 * @param handle - the handle to remove
248 * @throws PersistenceException - sql releated exception
249 */
250 public void removeMessageHandle(Connection connection,
251 MessageHandle handle)
252 throws PersistenceException {
253
254 if (_log.isDebugEnabled()) {
255 _log.debug("removeMessageHandle(handle=[consumer="
256 + handle.getConsumerPersistentId()
257 + ", destination=" + handle.getDestination()
258 + ", id=" + handle.getMessageId() + "])");
259 }
260
261 PreparedStatement delete = null;
262 PreparedStatement select = null;
263 ResultSet rs = null;
264
265 try {
266
267
268 long consumerId = _consumers.getConsumerId(
269 handle.getConsumerPersistentId());
270 if (consumerId != 0) {
271
272 String id = handle.getMessageId();
273
274
275
276
277 long destinationId = _destinations.getId(
278 handle.getDestination().getName());
279
280 if (destinationId == 0) {
281 delete = connection.prepareStatement(
282 DELETE_MSG_HANDLE_STMT1);
283 delete.setString(1, id);
284 delete.setLong(2, consumerId);
285
286 } else {
287 delete = connection.prepareStatement(
288 DELETE_MSG_HANDLE_STMT2);
289 delete.setString(1, id);
290 delete.setLong(2, destinationId);
291 delete.setLong(3, consumerId);
292 }
293
294
295 if (delete.executeUpdate() != 1 && !handle.hasExpired()) {
296
297
298 _log.error("Failed to execute removeMessageHandle for "
299 + "handle=" + id + " destination id="
300 + destinationId + " consumer id=" + consumerId);
301 }
302 }
303 } catch (SQLException exception) {
304 throw new PersistenceException("Failed to remove message handle=" +
305 handle, exception);
306 } finally {
307 SQLHelper.close(rs);
308 SQLHelper.close(delete);
309 SQLHelper.close(select);
310 }
311 }
312
313 /***
314 * Update the specified message handle from the database
315 *
316 * @param connection - the connection to use
317 * @param handle - the handle to update
318 * @throws PersistenceException - sql releated exception
319 */
320 public void updateMessageHandle(Connection connection,
321 MessageHandle handle)
322 throws PersistenceException {
323 PreparedStatement update = null;
324
325 if (_log.isDebugEnabled()) {
326 _log.debug("updateMessageHandle(handle=[consumer="
327 + handle.getConsumerPersistentId()
328 + ", destination=" + handle.getDestination()
329 + ", id=" + handle.getMessageId() + "])");
330 }
331
332 try {
333
334 String id = handle.getMessageId();
335
336
337 long destinationId = _destinations.getId(
338 handle.getDestination().getName());
339 if (destinationId == 0) {
340 throw new PersistenceException(
341 "Cannot update message handle id=" +
342 handle.getMessageId() + " for destination=" +
343 handle.getDestination().getName() + " and consumer=" +
344 handle.getConsumerPersistentId() +
345 " since the destination cannot be mapped to an id");
346 }
347
348
349 long consumerId = _consumers.getConsumerId(
350 handle.getConsumerPersistentId());
351 if (consumerId == 0) {
352 throw new PersistenceException(
353 "Cannot update message handle id=" +
354 handle.getMessageId() + " for destination=" +
355 handle.getDestination().getName() + " and consumer=" +
356 handle.getConsumerPersistentId() +
357 " since the consumer cannot be mapped to an id");
358 }
359
360 update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT);
361 update.setInt(1, handle.getDelivered() ? 1 : 0);
362 update.setString(2, id);
363 update.setLong(3, destinationId);
364 update.setLong(4, consumerId);
365
366
367 if (update.executeUpdate() != 1 && !handle.hasExpired()) {
368
369 _log.error(
370 "Failed to execute updateMessageHandle for handle=" +
371 id + ", destination id=" + destinationId +
372 ", consumer id=" + consumerId);
373 }
374 } catch (SQLException exception) {
375 throw new PersistenceException("Failed to update message handle=" +
376 handle, exception);
377 } finally {
378 SQLHelper.close(update);
379 }
380 }
381
382 /***
383 * Remove all the message handles associated with the specified destination
384 *
385 * @param connection - the connection to use
386 * @param destination the name of the destination
387 * @throws PersistenceException - sql releated exception
388 */
389 public void removeMessageHandles(Connection connection, String destination)
390 throws PersistenceException {
391
392 PreparedStatement delete = null;
393
394 try {
395
396 long destinationId = _destinations.getId(destination);
397 if (destinationId == 0) {
398 throw new PersistenceException(
399 "Cannot remove message handles for destination=" +
400 destination + " since the destination cannot be " +
401 "mapped to an id");
402 }
403
404 delete = connection.prepareStatement(DELETE_MSG_HANDLES_FOR_DEST);
405 delete.setLong(1, destinationId);
406 delete.executeUpdate();
407 } catch (SQLException exception) {
408 throw new PersistenceException(
409 "Failed to remove message handles for destination=" +
410 destination, exception);
411 } finally {
412 SQLHelper.close(delete);
413 }
414 }
415
416 /***
417 * Remove all the message handles for the specified messageid
418 *
419 * @param connection - the connection to use
420 * @param messageId the message identity
421 * @throws PersistenceException - sql releated exception
422 */
423 public void removeMessageHandles(Connection connection, long messageId)
424 throws PersistenceException {
425
426 PreparedStatement delete = null;
427
428 try {
429 delete = connection.prepareStatement(DELETE_MSG_HANDLES_STMT);
430 delete.setLong(1, messageId);
431 delete.executeUpdate();
432 } catch (SQLException exception) {
433 throw new PersistenceException(
434 "Failed to remove message handles for message id=" + messageId,
435 exception);
436 } finally {
437 SQLHelper.close(delete);
438 }
439 }
440
441 /***
442 * Retrieve the message handle for the specified desitation and consumer
443 * name
444 *
445 * @param connection - the connection to use
446 * @param destination - destination name
447 * @param name - consumer name
448 * @return Vector - collection of MessageHandle objects
449 * @throws PersistenceException - sql releated exception
450 */
451 public Vector getMessageHandles(Connection connection, String destination,
452 String name)
453 throws PersistenceException {
454
455 Vector result = new Vector();
456 PreparedStatement select = null;
457 ResultSet set = null;
458
459
460
461 long destinationId = _destinations.getId(destination);
462 long consumerId = _consumers.getConsumerId(name);
463 if ((consumerId == 0) ||
464 (destinationId == 0)) {
465 return result;
466 }
467
468
469
470 try {
471 select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST);
472 select.setLong(1, consumerId);
473
474
475
476 set = select.executeQuery();
477 while (set.next()) {
478
479 JmsDestination dest = _destinations.get(set.getLong(2));
480 if (dest == null) {
481 throw new PersistenceException(
482 "Cannot create persistent handle, because " +
483 "destination mapping failed for " + set.getLong(2));
484 }
485
486 String consumer = _consumers.getConsumerName(set.getLong(3));
487 if (name == null) {
488 throw new PersistenceException(
489 "Cannot create persistent handle because " +
490 "consumer mapping failed for " + set.getLong(3));
491 }
492
493 String messageId = set.getString(1);
494 int priority = set.getInt(4);
495 long acceptedTime = set.getLong(5);
496 long sequenceNumber = set.getLong(6);
497 long expiryTime = set.getLong(7);
498 boolean delivered = (set.getInt(8) == 0) ? false : true;
499 MessageHandle handle = new PersistentMessageHandle(
500 messageId, priority, acceptedTime, sequenceNumber,
501 expiryTime, dest, consumer);
502 handle.setDelivered(delivered);
503 result.add(handle);
504 }
505 } catch (SQLException exception) {
506 throw new PersistenceException(
507 "Failed to get message handles for destination=" +
508 destination + ", consumer=" + name, exception);
509 } finally {
510 SQLHelper.close(set);
511 SQLHelper.close(select);
512 }
513
514 return result;
515 }
516
517 /***
518 * Retrieve a distint list of message ids, in this table, between the min
519 * and max times inclusive.
520 *
521 * @param connection - the connection to use
522 * @param min - the minimum time in milliseconds
523 * @param max - the maximum time in milliseconds
524 * @return Vector - collection of String objects
525 * @throws PersistenceException - sql related exception
526 */
527 public Vector getMessageIds(Connection connection, long min, long max)
528 throws PersistenceException {
529
530 Vector result = new Vector();
531 PreparedStatement select = null;
532 ResultSet set = null;
533
534 try {
535 select = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE);
536 select.setLong(1, min);
537 select.setLong(2, max);
538
539
540
541 set = select.executeQuery();
542 while (set.next()) {
543 result.add(set.getString(1));
544 }
545
546
547 } catch (SQLException exception) {
548 throw new PersistenceException("Failed to retrieve message ids",
549 exception);
550 } finally {
551 SQLHelper.close(set);
552 SQLHelper.close(select);
553 }
554
555 return result;
556 }
557
558 /***
559 * Check if a message with the specified messageId exists in the
560 * table
561 *
562 * @param connection - the connection to use
563 * @param messageId the message Identifier
564 * @return Vector - collection of MessageHandle objects
565 * @throws PersistenceException - sql releated exception
566 */
567 public boolean messageExists(Connection connection, long messageId)
568 throws PersistenceException {
569
570 boolean result = false;
571 PreparedStatement select = null;
572 ResultSet set = null;
573
574 try {
575 select = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID);
576 select.setLong(1, messageId);
577 set = select.executeQuery();
578
579 if (set.next()) {
580 result = true;
581 }
582
583 } catch (SQLException exception) {
584 throw new PersistenceException(
585 "Failed to determine if message exists, id=" + messageId,
586 exception);
587 } finally {
588 SQLHelper.close(set);
589 SQLHelper.close(select);
590 }
591 return result;
592 }
593
594 /***
595 * Returns the number of messages for the specified destination and
596 * consumer
597 *
598 * @param connection - the connection to use
599 * @param destination - destination name
600 * @param name - consumer name
601 * @return Vector - collection of MessageHandle objects
602 * @throws PersistenceException - sql releated exception
603 */
604 public int getMessageCount(Connection connection, String destination,
605 String name)
606 throws PersistenceException {
607
608 int result = -1;
609 boolean destinationIsWildCard = false;
610
611
612 long destinationId = _destinations.getId(destination);
613 if (destinationId == 0) {
614 if (JmsTopic.isWildCard(destination)) {
615 destinationIsWildCard = true;
616 } else {
617 throw new PersistenceException(
618 "Cannot get message handle count for destination=" +
619 destination + " and consumer=" + name +
620 " since the destination cannot be mapped to an id");
621 }
622 }
623
624
625 long consumerId = _consumers.getConsumerId(name);
626 if (consumerId == 0) {
627 throw new PersistenceException(
628 "Cannot get message handle count for destination=" +
629 destination + " and consumer=" + name +
630 " since the consumer cannot be mapped to an id");
631 }
632
633 PreparedStatement select = null;
634 ResultSet set = null;
635
636 try {
637 if (destinationIsWildCard) {
638 select = connection.prepareStatement(
639 GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER);
640 select.setLong(1, destinationId);
641 select.setLong(2, consumerId);
642 } else {
643 select = connection.prepareStatement(
644 GET_MSG_HANDLE_COUNT_FOR_CONSUMER);
645 select.setLong(1, consumerId);
646 }
647
648 set = select.executeQuery();
649 if (set.next()) {
650 result = set.getInt(1);
651 }
652 } catch (SQLException exception) {
653 throw new PersistenceException(
654 "Failed to count messages for destination=" + destination +
655 ", consumer=" + name, exception);
656 } finally {
657 SQLHelper.close(set);
658 SQLHelper.close(select);
659 }
660
661 return result;
662 }
663
664 /***
665 * Remove all expired handles for the specified consumer
666 *
667 * @param connection - the connection to use
668 * @param consumer - consumer name
669 * @throws PersistenceException - sql releated exception
670 */
671 public void removeExpiredMessageHandles(Connection connection,
672 String consumer)
673 throws PersistenceException {
674
675 PreparedStatement delete = null;
676
677
678 long consumerId = _consumers.getConsumerId(consumer);
679 if (consumerId != 0) {
680 try {
681 delete = connection.prepareStatement(DELETE_EXPIRED_MESSAGES);
682 delete.setLong(1, consumerId);
683 delete.setLong(2, System.currentTimeMillis());
684 delete.executeUpdate();
685 } catch (SQLException exception) {
686 throw new PersistenceException(
687 "Failed to remove expired message handles",
688 exception);
689 } finally {
690 SQLHelper.close(delete);
691 }
692 }
693 }
694
695 }