View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   */
43  
44  package org.exolab.jms.persistence;
45  
46  import java.sql.Connection;
47  import java.sql.PreparedStatement;
48  import java.sql.ResultSet;
49  import java.sql.SQLException;
50  import java.util.Vector;
51  
52  import org.apache.commons.logging.Log;
53  import org.apache.commons.logging.LogFactory;
54  
55  import org.exolab.jms.client.JmsDestination;
56  import org.exolab.jms.client.JmsTopic;
57  import org.exolab.jms.message.MessageId;
58  import org.exolab.jms.messagemgr.PersistentMessageHandle;
59  
60  
61  /***
62   * This class provides persistency for PersistentMessageHandle objects
63   * in an RDBMS database
64   *
65   * @version     $Revision: 1.30 $ $Date: 2004/01/08 05:55:07 $
66   * @author      <a href="mailto:tma@netspace.net.au">Tim Anderson</a>
67   * @see         org.exolab.jms.persistence.RDBMSAdapter
68   */
69  class MessageHandles {
70  
71      /***
72       * prepared statement for inserting a message handle
73       */
74      private static final String INSERT_MSG_HANDLE_STMT =
75          "insert into message_handles values " + "(?,?,?,?,?,?,?,?)";
76  
77      /***
78       * prepared statements for deleting message handle
79       */
80      private static final String DELETE_MSG_HANDLE_STMT1 =
81          "delete from message_handles where messageId=? and consumerId=?";
82      private static final String DELETE_MSG_HANDLE_STMT2 =
83          "delete from message_handles where messageId=? and destinationId=? " +
84          "and consumerId=?";
85  
86      /***
87       * Delete all message handles with the specified message id
88       */
89      private static final String DELETE_MSG_HANDLES_STMT =
90          "delete from message_handles where messageId=?";
91  
92      /***
93       * determine the number of handles with the specified message identity
94       */
95      private static final String GET_MSG_HANDLE_COUNT =
96          "select count(messageId) from message_handles where messageId=?";
97  
98      /***
99       * Delete a message from the message table
100      */
101     private static final String DELETE_MESSAGE =
102         "delete from messages where messageId=?";
103 
104     /***
105      * Update a row in the message handles table
106      */
107     private static final String UPDATE_MSG_HANDLE_STMT =
108         "update message_handles set delivered=? where messageId=? and " +
109         "destinationId=? and consumerId=?";
110 
111     /***
112      * Delete all message handles for a destination
113      */
114     private static final String DELETE_MSG_HANDLES_FOR_DEST =
115         "delete from message_handles where destinationId=?";
116 
117     /***
118      * Retrieve all message handles for a particular consumer
119      */
120     private static final String GET_MSG_HANDLES_FOR_DEST =
121         "select * from message_handles where consumerId=? order by " +
122         "acceptedTime asc";
123 
124     /***
125      * Retrieve a range of message handles between the specified times
126      */
127     private static final String GET_MESSAGE_HANDLES_IN_RANGE =
128         "select distinct messageId from message_handles where " +
129         " acceptedTime >= ? and acceptedTime <=?";
130 
131     /***
132      * Retrieve a handle with the specified id
133      */
134     private static final String GET_MESSAGE_HANDLE_WITH_ID =
135         "select distinct messageId from message_handles where messageId=?";
136 
137     /***
138      * Return the number of messages and a specified destination and cousmer
139      */
140     private static final String GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER =
141         "select count(messageId) from message_handles where destinationId=? " +
142         "and consumerId=?";
143 
144     /***
145      * Return the number of messages and a specified consumer
146      */
147     private static final String GET_MSG_HANDLE_COUNT_FOR_CONSUMER =
148         "select count(messageId) from message_handles where consumerId=?";
149 
150     /***
151      * Delete all expired messages
152      */
153     private static final String DELETE_EXPIRED_MESSAGES =
154         "delete from message_handles where consumerId=? and expiryTime != 0 " +
155         "and expiryTime<?";
156 
157     /***
158      * Singleton to this class
159      */
160     private static MessageHandles _instance;
161 
162     /***
163      * Cache the transaction retry count
164      */
165     private static int _retryCount = 10;
166 
167     /***
168      * Cache the retry interval in milliseconds
169      */
170     private static long _retryInterval = 50;
171 
172     /***
173      * Used to ensure that only one thread initialises the class
174      */
175     private static final Object _block = new Object();
176 
177     /***
178      * The logger
179      */
180     private static final Log _log = LogFactory.getLog(MessageHandles.class);
181 
182 
183     /***
184      * Returns the singleton instance.
185      *
186      * Note that initialise() must have been invoked first for this
187      * to return a valid instance.
188      *
189      * @return MessageHandles
190      */
191     public static MessageHandles instance() {
192         return _instance;
193     }
194 
195     /***
196      * Constructor
197      */
198     protected MessageHandles() {
199     }
200 
201     /***
202      * Initialise the singleton _instance
203      *
204      * @return      MessageHandles
205      */
206     public static MessageHandles initialise() {
207         if (_instance == null) {
208             synchronized (_block) {
209                 if (_instance == null) {
210                     _instance = new MessageHandles();
211                 }
212             }
213         }
214 
215         return _instance;
216     }
217 
218     /***
219      * Add the specified message handle to the database
220      *
221      * @param connection - the connection to use
222      * @param handle - message handle to add
223      * @throws PersistenceException - if add does not complete
224      */
225     public void addMessageHandle(Connection connection,
226                                  PersistentMessageHandle handle)
227         throws PersistenceException {
228 
229         if (_log.isDebugEnabled()) {
230             _log.debug("addMessageHandle(handle=[consumer="
231                        + handle.getConsumerName()
232                        + ", destination=" + handle.getDestination() 
233                        + ", id=" + handle.getMessageId().getId() + "])");
234         }
235 
236         PreparedStatement insert = null;
237         try {
238             // map the destination name to an actual identity
239             long destinationId = Destinations.instance().getId(
240                 handle.getDestination().getName());
241             if (destinationId == 0) {
242                 throw new PersistenceException(
243                     "Cannot add message handle id=" + handle.getMessageId() +
244                     " for destination=" + handle.getDestination().getName() +
245                     " and consumer=" + handle.getConsumerName() +
246                     " since the destination cannot be mapped to an id");
247             }
248 
249             // map the consumer name ot an identity
250             long consumerId = Consumers.instance().getConsumerId(
251                 handle.getConsumerName());
252             if (consumerId == 0) {
253                 throw new PersistenceException(
254                     "Cannot add message handle id=" + handle.getMessageId() +
255                     " for destination=" + handle.getDestination().getName() +
256                     " and consumer=" + handle.getConsumerName() +
257                     " since the consumer cannot be mapped to an id");
258             }
259 
260             insert = connection.prepareStatement(INSERT_MSG_HANDLE_STMT);
261             insert.setString(1, handle.getMessageId().getId());
262             insert.setLong(2, destinationId);
263             insert.setLong(3, consumerId);
264             insert.setInt(4, handle.getPriority());
265             insert.setLong(5, handle.getAcceptedTime());
266             insert.setLong(6, handle.getSequenceNumber());
267             insert.setLong(7, handle.getExpiryTime());
268             insert.setInt(8, (handle.getDelivered()) ? 1 : 0);
269 
270             // execute the insert
271             if (insert.executeUpdate() != 1) {
272                 _log.error(
273                     "Failed to execute addMessageHandle for handle="
274                     + handle.getMessageId().getId() + ", destination Id="
275                     + destinationId);
276             }
277         } catch (SQLException exception) {
278             throw new PersistenceException("Failed to add message handle=" +
279                 handle, exception);
280         } finally {
281             SQLHelper.close(insert);
282         }
283     }
284 
285     /***
286      * Remove the specified message handle from the database. Once the handle
287      * has been removed check to see whether there are any more message handles
288      * referencing the same message. If there are not then remove the
289      * corresponding message from the messages tables.
290      *
291      * @param connection - the connection to use
292      * @param  handle - the handle to remove
293      * @throws  PersistenceException - sql releated exception
294      */
295     public void removeMessageHandle(Connection connection,
296                                     PersistentMessageHandle handle)
297         throws PersistenceException {
298 
299         if (_log.isDebugEnabled()) {
300             _log.debug("removeMessageHandle(handle=[consumer="
301                        + handle.getConsumerName()
302                        + ", destination=" + handle.getDestination() 
303                        + ", id=" + handle.getMessageId().getId() + "])");
304         }
305 
306         PreparedStatement delete = null;
307         PreparedStatement select = null;
308         ResultSet rs = null;
309 
310         try {
311             // first check to see that the consumer exists and only
312             // proceed if it non-zero.
313             long consumerId = Consumers.instance().getConsumerId(
314                 handle.getConsumerName());
315             if (consumerId != 0) {
316                 // get the message id
317                 String id = handle.getMessageId().getId();
318 
319                 // map the destination name to an actual identity. If it is
320                 // null then the destination does not currently exist but we
321                 // may need to delete orphaned handles
322                 long destinationId = Destinations.instance().getId(
323                     handle.getDestination().getName());
324 
325                 if (destinationId == 0) {
326                     delete = connection.prepareStatement(
327                         DELETE_MSG_HANDLE_STMT1);
328                     delete.setString(1, id);
329                     delete.setLong(2, consumerId);
330 
331                 } else {
332                     delete = connection.prepareStatement(
333                         DELETE_MSG_HANDLE_STMT2);
334                     delete.setString(1, id);
335                     delete.setLong(2, destinationId);
336                     delete.setLong(3, consumerId);
337                 }
338 
339                 // execute the delete
340                 if (delete.executeUpdate() != 1 && !handle.hasExpired()) {
341                     // only log if the message hasn't been garbage
342                     // collected
343                     _log.error("Failed to execute removeMessageHandle for "
344                         + "handle=" + id + " destination id="
345                         + destinationId + " consumer id=" + consumerId);
346                 }
347 
348                 // if there are no more handles with the specified message id
349                 // then delete the corresponding message from the message table
350                 select = connection.prepareStatement(GET_MSG_HANDLE_COUNT);
351                 select.setString(1, id);
352                 rs = select.executeQuery();
353                 if (rs.next() && (rs.getInt(1) == 0)) {
354                     delete.close();
355                     delete = connection.prepareStatement(DELETE_MESSAGE);
356                     delete.setString(1, id);
357                     if (delete.executeUpdate() != 1 && !handle.hasExpired()) {
358                         // can get 2 durable consumers trying to do this
359                         // simultaneously, so don't log an error
360                         // See bug 819212.
361                         if (_log.isDebugEnabled()) {
362                             _log.debug(
363                                 "Failed to delete the message with id=" + id
364                                 + " in a call to removeMessageHandle.");
365                         }
366                     }
367                 }
368             }
369         } catch (SQLException exception) {
370             throw new PersistenceException("Failed to remove message handle=" +
371                 handle, exception);
372         } finally {
373             SQLHelper.close(rs);
374             SQLHelper.close(delete);
375             SQLHelper.close(select);
376         }
377     }
378 
379     /***
380      * Update the specified message handle from the database
381      *
382      * @param connection - the connection to use
383      * @param  handle - the handle to update
384      * @throws  PersistenceException - sql releated exception
385      */
386     public void updateMessageHandle(Connection connection,
387                                     PersistentMessageHandle handle)
388         throws PersistenceException {
389         PreparedStatement update = null;
390 
391         if (_log.isDebugEnabled()) {
392             _log.debug("updateMessageHandle(handle=[consumer="
393                        + handle.getConsumerName()
394                        + ", destination=" + handle.getDestination() 
395                        + ", id=" + handle.getMessageId().getId() + "])");
396         }
397 
398         try {
399             // get the message id
400             String id = handle.getMessageId().getId();
401 
402             // map the destination name to an actual identity
403             long destinationId = Destinations.instance().getId(
404                 handle.getDestination().getName());
405             if (destinationId == 0) {
406                 throw new PersistenceException(
407                     "Cannot update message handle id=" +
408                     handle.getMessageId() + " for destination=" +
409                     handle.getDestination().getName() + " and consumer=" +
410                     handle.getConsumerName() +
411                     " since the destination cannot be mapped to an id");
412             }
413 
414             // map the consumer name to an identity
415             long consumerId = Consumers.instance().getConsumerId(
416                 handle.getConsumerName());
417             if (consumerId == 0) {
418                 throw new PersistenceException(
419                     "Cannot update message handle id=" +
420                     handle.getMessageId() + " for destination=" +
421                     handle.getDestination().getName() + " and consumer=" +
422                     handle.getConsumerName() +
423                     " since the consumer cannot be mapped to an id");
424             }
425 
426             update = connection.prepareStatement(UPDATE_MSG_HANDLE_STMT);
427             update.setInt(1, handle.getDelivered() ? 1 : 0);
428             update.setString(2, id);
429             update.setLong(3, destinationId);
430             update.setLong(4, consumerId);
431 
432             // execute the delete
433             if (update.executeUpdate() != 1 && !handle.hasExpired()) {
434                 // only log if the message hasn't been garbage collected
435                 _log.error(
436                     "Failed to execute updateMessageHandle for handle=" +
437                     id + ", destination id=" + destinationId +
438                     ", consumer id=" + consumerId);
439             }
440         } catch (SQLException exception) {
441             throw new PersistenceException("Failed to update message handle=" +
442                 handle, exception);
443         } finally {
444             SQLHelper.close(update);
445         }
446     }
447 
448     /***
449      * Remove all the message handles associated with the specified destination
450      *
451      * @param connection - the connection to use
452      * @param  String name of destination
453      * @throws  PersistenceException - sql releated exception
454      */
455     public void removeMessageHandles(Connection connection, String destination)
456         throws PersistenceException {
457 
458         PreparedStatement delete = null;
459 
460         try {
461             // map the destination name to an actual identity
462             long destinationId = Destinations.instance().getId(destination);
463             if (destinationId == 0) {
464                 throw new PersistenceException(
465                     "Cannot remove message handles for destination=" +
466                     destination + " since the destination cannot be " +
467                     "mapped to an id");
468             }
469 
470             delete = connection.prepareStatement(DELETE_MSG_HANDLES_FOR_DEST);
471             delete.setLong(1, destinationId);
472             delete.executeUpdate();
473         } catch (SQLException exception) {
474             throw new PersistenceException(
475                 "Failed to remove message handles for destination=" +
476                 destination, exception);
477         } finally {
478             SQLHelper.close(delete);
479         }
480     }
481 
482     /***
483      * Remove all the message handles for the specified messageid
484      *
485      * @param connection - the connection to use
486      * @param id - message identity
487      * @throws  PersistenceException - sql releated exception
488      */
489     public void removeMessageHandles(Connection connection, long messageId)
490         throws PersistenceException {
491 
492         PreparedStatement delete = null;
493 
494         try {
495             delete = connection.prepareStatement(DELETE_MSG_HANDLES_STMT);
496             delete.setLong(1, messageId);
497             delete.executeUpdate();
498         } catch (SQLException exception) {
499             throw new PersistenceException(
500                 "Failed to remove message handles for message id=" + messageId,
501                 exception);
502         } finally {
503             SQLHelper.close(delete);
504         }
505     }
506 
507     /***
508      * Retrieve the message handle for the specified desitation and consumer
509      * name
510      *
511      * @param connection - the connection to use
512      * @param destination - destination name
513      * @param name - consumer name
514      * @return Vector - collection of PersistentMessageHandle objects
515      * @throws  PersistenceException - sql releated exception
516      */
517     public Vector getMessageHandles(Connection connection, String destination,
518                                     String name)
519         throws PersistenceException {
520 
521         Vector result = new Vector();
522         PreparedStatement select = null;
523         ResultSet set = null;
524 
525         // if the consumer and/or destination cannot be mapped then
526         // return an empty vector
527         long destinationId = Destinations.instance().getId(destination);
528         long consumerId = Consumers.instance().getConsumerId(name);
529         if ((consumerId == 0) ||
530             (destinationId == 0)) {
531             return result;
532         }
533 
534         // all preprequisites have been met so continue processing the
535         // request.
536         try {
537             select = connection.prepareStatement(GET_MSG_HANDLES_FOR_DEST);
538             select.setLong(1, consumerId);
539 
540             // iterate through the result set and construct the corresponding
541             // PersistentMessageHandles
542             set = select.executeQuery();
543             while (set.next()) {
544                 // Attempt to retrieve the corresponding destination
545                 JmsDestination dest = Destinations.instance().get(
546                     set.getLong(2));
547                 if (dest == null) {
548                     throw new PersistenceException(
549                         "Cannot create persistent handle, because " +
550                         "destination mapping failed for " + set.getLong(2));
551                 }
552 
553                 String consumer = Consumers.instance().getConsumerName(
554                     set.getLong(3));
555                 if (name == null) {
556                     throw new PersistenceException(
557                         "Cannot create persistent handle because " +
558                         "consumer mapping failed for " + set.getLong(3));
559                 }
560 
561                 PersistentMessageHandle handle = new PersistentMessageHandle();
562                 handle.setMessageId(new MessageId(set.getString(1)));
563                 handle.setDestination(dest);
564                 handle.setConsumerName(consumer);
565                 handle.setPriority(set.getInt(4));
566                 handle.setAcceptedTime(set.getLong(5));
567                 handle.setSequenceNumber(set.getLong(6));
568                 handle.setExpiryTime(set.getLong(7));
569                 handle.setDelivered((set.getInt(8) == 0) ? false : true);
570                 result.add(handle);
571             }            
572         } catch (SQLException exception) {
573             throw new PersistenceException(
574                 "Failed to get message handles for destination=" +
575                 destination + ", consumer=" + name, exception);
576         } finally {
577             SQLHelper.close(set);
578             SQLHelper.close(select);
579         }
580 
581         return result;
582     }
583 
584     /***
585      * Retrieve a distint list of message ids, in this table, between the min
586      * and max times inclusive.
587      *
588      * @param connection - the connection to use
589      * @param min - the minimum time in milliseconds
590      * @param max - the maximum time in milliseconds
591      * @return Vector - collection of String objects
592      * @throws  PersistenceException - sql related exception
593      */
594     public Vector getMessageIds(Connection connection, long min, long max)
595         throws PersistenceException {
596 
597         Vector result = new Vector();
598         PreparedStatement select = null;
599         ResultSet set = null;
600 
601         try {
602             select = connection.prepareStatement(GET_MESSAGE_HANDLES_IN_RANGE);
603             select.setLong(1, min);
604             select.setLong(2, max);
605 
606             // iterate through the result set and construct the corresponding
607             // PersistentMessageHandles
608             set = select.executeQuery();
609             while (set.next()) {
610                 result.add(set.getString(1));
611             }
612 
613            
614         } catch (SQLException exception) {
615             throw new PersistenceException("Failed to retrieve message ids",
616                 exception);
617         } finally {
618             SQLHelper.close(set);
619             SQLHelper.close(select);
620         }
621 
622         return result;
623     }
624 
625     /***
626      * Check if a message with the specified messageId exists in the
627      * table
628      *
629      * @param connection - the connection to use
630      * @param id - message Id
631      * @return Vector - collection of PersistentMessageHandle objects
632      * @throws  PersistenceException - sql releated exception
633      */
634     public boolean messageExists(Connection connection, long messageId)
635         throws PersistenceException {
636 
637         boolean result = false;
638         PreparedStatement select = null;
639         ResultSet set = null;
640 
641         try {
642             select = connection.prepareStatement(GET_MESSAGE_HANDLE_WITH_ID);
643             select.setLong(1, messageId);
644             set = select.executeQuery();
645 
646             if (set.next()) {
647                 result = true;
648             }
649             
650         } catch (SQLException exception) {
651             throw new PersistenceException(
652                 "Failed to determine if message exists, id=" + messageId,
653                 exception);
654         } finally {
655             SQLHelper.close(set);
656             SQLHelper.close(select);
657         }
658         return result;
659     }
660 
661     /***
662      * Returns the number of messages for the specified destination and
663      * consumer
664      *
665      * @param connection - the connection to use
666      * @param destination - destination name
667      * @param name - consumer name
668      * @return Vector - collection of PersistentMessageHandle objects
669      * @throws  PersistenceException - sql releated exception
670      */
671     public int getMessageCount(Connection connection, String destination,
672                                String name)
673         throws PersistenceException {
674 
675         int result = -1;
676         boolean destinationIsWildCard = false;
677 
678         // map the destination name to an actual identity
679         long destinationId = Destinations.instance().getId(destination);
680         if (destinationId == 0) {
681             if (JmsTopic.isWildCard(destination)) {
682                 destinationIsWildCard = true;
683             } else {
684                 throw new PersistenceException(
685                     "Cannot get message handle count for destination=" +
686                     destination + " and consumer=" + name +
687                     " since the destination cannot be mapped to an id");
688             }
689         }
690 
691         // map the consumer name to an identity
692         long consumerId = Consumers.instance().getConsumerId(name);
693         if (consumerId == 0) {
694             throw new PersistenceException(
695                 "Cannot get message handle count for destination=" +
696                 destination + " and consumer=" + name +
697                 " since the consumer cannot be mapped to an id");
698         }
699 
700         PreparedStatement select = null;
701         ResultSet set = null;
702 
703         try {
704             if (destinationIsWildCard) {
705                 select = connection.prepareStatement(
706                     GET_MSG_HANDLE_COUNT_FOR_DEST_AND_CONSUMER);
707                 select.setLong(1, destinationId);
708                 select.setLong(2, consumerId);
709             } else {
710                 select = connection.prepareStatement(
711                     GET_MSG_HANDLE_COUNT_FOR_CONSUMER);
712                 select.setLong(1, consumerId);
713             }
714 
715             set = select.executeQuery();
716             if (set.next()) {
717                 result = set.getInt(1);
718             }
719         } catch (SQLException exception) {
720             throw new PersistenceException(
721                 "Failed to count messages for destination=" + destination +
722                 ", consumer=" + name, exception);
723         } finally {
724             SQLHelper.close(set);
725             SQLHelper.close(select);
726         }
727 
728         return result;
729     }
730 
731     /***
732      * Remove all expired handles for the specified consumer
733      *
734      * @param connection - the connection to use
735      * @param consumer - consumer name
736      * @throws  PersistenceException - sql releated exception
737      */
738     public void removeExpiredMessageHandles(Connection connection,
739                                             String consumer)
740         throws PersistenceException {
741 
742         PreparedStatement delete = null;
743 
744         // map the consumer name ot an identity
745         long consumerId = Consumers.instance().getConsumerId(consumer);
746         if (consumerId != 0) {
747             try {
748                 delete = connection.prepareStatement(DELETE_EXPIRED_MESSAGES);
749                 delete.setLong(1, consumerId);
750                 delete.setLong(2, System.currentTimeMillis());
751                 delete.executeUpdate();
752             } catch (SQLException exception) {
753                 throw new PersistenceException(
754                     "Failed to remove expired message handles",
755                     exception);
756             } finally {
757                 SQLHelper.close(delete);
758             }
759         }
760     }
761 
762     /***
763      * Deallocates resources owned or referenced by the instance
764      */
765     public void close() {
766         _instance = null;
767     }
768 
769 } //-- MessageHandles