View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: RDBMSAdapter.java,v 1.6 2005/08/31 05:45:50 tanderson Exp $
44   */
45  package org.exolab.jms.persistence;
46  
47  import java.sql.Connection;
48  import java.sql.Date;
49  import java.sql.PreparedStatement;
50  import java.sql.ResultSet;
51  import java.sql.SQLException;
52  import java.util.Enumeration;
53  import java.util.HashMap;
54  import java.util.Vector;
55  
56  import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
57  import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
58  
59  import org.apache.commons.logging.Log;
60  import org.apache.commons.logging.LogFactory;
61  
62  import org.exolab.jms.authentication.User;
63  import org.exolab.jms.client.JmsDestination;
64  import org.exolab.jms.client.JmsQueue;
65  import org.exolab.jms.client.JmsTopic;
66  import org.exolab.jms.config.DatabaseConfiguration;
67  import org.exolab.jms.config.RdbmsDatabaseConfiguration;
68  import org.exolab.jms.events.BasicEventManager;
69  import org.exolab.jms.events.EventHandler;
70  import org.exolab.jms.message.MessageImpl;
71  import org.exolab.jms.messagemgr.MessageHandle;
72  
73  
74  /***
75   * This adapter is a wrapper class around the persistency mechanism.
76   * It isolates the client from the working specifics of the database, by
77   * providing a simple straight forward interface. Furure changes to
78   * the database will only require changes to the adapter.
79   *
80   * @author <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
81   * @version $Revision: 1.6 $ $Date: 2005/08/31 05:45:50 $
82   */
83  
84  public class RDBMSAdapter
85      extends PersistenceAdapter
86      implements EventHandler {
87  
88      /***
89       * The seed generator.
90       */
91      private final SeedGenerator _seeds;
92  
93      /***
94       * The destination manager.
95       */
96      private final Destinations _destinations;
97  
98      /***
99       * The consumer manager.
100      */
101     private final Consumers _consumers;
102 
103     /***
104      * The message manager.
105      */
106     private final Messages _messages;
107 
108     /***
109      * The message handles manager.
110      */
111     private final MessageHandles _handles;
112 
113     /***
114      * The user manager.
115      */
116     private final Users _users;
117 
118     /***
119      * The schema version number. Note this must be incremented whenever
120      * The schema changes.
121      */
122     public static final String SCHEMA_VERSION = "V0.7.6";
123 
124     /***
125      *  The JDBC ConnectionManager.
126      */
127     private DBConnectionManager _connectionManager = null;
128 
129     /***
130      * Lock to help prevent deadlocks when administratively removing
131      * destinations, while producers and consumers are actively sending
132      * and receiving messages. It ensures that when a destination is in the
133      * process of being removed, no other changes are occuring on the
134      * messages and message_handles tables.
135      */
136     private ReadWriteLock _destinationLock = new FIFOReadWriteLock();
137 
138     /***
139      * The logger.
140      */
141     private static final Log _log = LogFactory.getLog(RDBMSAdapter.class);
142 
143 
144     /***
145      * Connects to the given db.
146      *
147      * @throws PersistenceException if a connection cannot be establised to the
148      *                              database
149      */
150     public RDBMSAdapter(DatabaseConfiguration dbConfig, String driver, String url,
151                  String userName, String password)
152             throws PersistenceException {
153 
154         RdbmsDatabaseConfiguration config =
155                 dbConfig.getRdbmsDatabaseConfiguration();
156 
157         // create the connection manager, and configure it
158         _connectionManager = getConnectionManager(config.getClazz());
159         _connectionManager.setUser(userName);
160         _connectionManager.setPassword(password);
161         _connectionManager.setDriver(driver);
162         _connectionManager.setURL(url);
163         _connectionManager.setMaxActive(config.getMaxActive());
164         _connectionManager.setMaxIdle(config.getMaxIdle());
165         _connectionManager.setMinIdleTime(config.getMinIdleTime());
166         _connectionManager.setEvictionInterval(config.getEvictionInterval());
167         _connectionManager.setTestQuery(config.getTestQuery());
168         _connectionManager.setTestBeforeUse(config.getTestBeforeUse());
169 
170         // initialisze the connection manager
171         _connectionManager.init();
172 
173         Connection connection = null;
174         try {
175             // initialize the various caches and helper classes used to
176             // execute the various SQL.
177             connection = getConnection();
178 
179             String version = getSchemaVersion(connection);
180             if (version == null) {
181                 initSchemaVersion(connection);
182             } else if (!version.equals(SCHEMA_VERSION)) {
183                 throw new PersistenceException(
184                     "Schema needs to be converted from version=" + version
185                     + " to version=" + SCHEMA_VERSION
186                     + "\nBack up your database, and run 'dbtool -migrate'"
187                     + "to convert the schema");
188             }
189 
190             _seeds = new SeedGenerator();
191             _consumers = new Consumers(_seeds, connection);
192             _destinations = new Destinations(_seeds, _consumers, connection);
193             _consumers.setDestinations(_destinations);
194             _messages = new Messages(_destinations);
195             _handles = new MessageHandles(_destinations, _consumers);
196             _users = new Users();
197             connection.commit();
198         } catch (PersistenceException exception) {
199             SQLHelper.rollback(connection);
200             throw exception;
201         } catch (Exception exception) {
202             throw new PersistenceException(
203                     "Failed to initialise database adapter", exception);
204         } finally {
205             SQLHelper.close(connection);
206 
207         }
208 
209 /*
210         // check whether we should initiate automatic garbage collection
211         if (dbConfig.hasGarbageCollectionInterval()) {
212             _gcInterval = dbConfig.getGarbageCollectionInterval() * 1000;
213             registerEvent();
214         }
215 
216         if (dbConfig.hasGarbageCollectionBlockSize()) {
217             _gcBlockSize = dbConfig.getGarbageCollectionBlockSize();
218         }
219 
220         if (dbConfig.hasGarbageCollectionThreadPriority()) {
221             _gcThreadPriority = dbConfig.getGarbageCollectionBlockSize();
222             if (_gcThreadPriority < Thread.MIN_PRIORITY) {
223                 _gcThreadPriority = Thread.MIN_PRIORITY;
224             } else if (_gcThreadPriority > Thread.MAX_PRIORITY) {
225                 _gcThreadPriority = Thread.MAX_PRIORITY;
226             }
227         }
228 */
229     }
230 
231     /***
232      * Close the database.
233      */
234     public void close() {
235         _consumers.close();
236         _destinations.close();
237     }
238 
239     // implementation of PersistenceAdapter.getLastId
240     public long getLastId(Connection connection)
241         throws PersistenceException {
242 
243         long lastId = -1;
244         PreparedStatement query = null;
245         ResultSet result = null;
246         PreparedStatement insert = null;
247         try {
248             query = connection.prepareStatement(
249                     "select maxid from message_id where id = 1");
250             result = query.executeQuery();
251 
252             if (result.next()) {
253                 lastId = result.getInt(1);
254             } else {
255                 // first entry create.
256                 insert = connection.prepareStatement(
257                         "insert into message_id values (?,?)");
258                 insert.setInt(1, 1);
259                 insert.setLong(2, 0);
260                 insert.executeUpdate();
261                 lastId = 0;
262             }
263         } catch (Exception exception) {
264             throw new PersistenceException("Failed to get last message id",
265                                            exception);
266         } finally {
267             SQLHelper.close(result);
268             SQLHelper.close(insert);
269             SQLHelper.close(query);
270         }
271 
272         return lastId;
273     }
274 
275     // implementation of PersistenceAdapter.updateIds
276     public void updateIds(Connection connection, long id)
277             throws PersistenceException {
278         PreparedStatement insert = null;
279         try {
280             insert = connection.prepareStatement(
281                     "update message_id set maxId = ? where id = 1");
282 
283             insert.setLong(1, id);
284             insert.executeUpdate();
285         } catch (Exception exception) {
286             throw new PersistenceException("Failed to update message id",
287                                            exception);
288         } finally {
289             SQLHelper.close(insert);
290         }
291     }
292 
293     // implementation of PersistenceMessage.addMessage
294     public void addMessage(Connection connection, MessageImpl message)
295             throws PersistenceException {
296 
297         long start = 0;
298 
299         if (_log.isDebugEnabled()) {
300             start = System.currentTimeMillis();
301         }
302 
303         try {
304             _destinationLock.readLock().acquire();
305             _messages.add(connection, message);
306         } catch (InterruptedException exception) {
307             throw new PersistenceException("Failed to acquire lock",
308                                            exception);
309         } finally {
310             _destinationLock.readLock().release();
311 
312             if (_log.isDebugEnabled()) {
313                 _log.debug("addMessage," +
314                            (System.currentTimeMillis() - start));
315             }
316         }
317     }
318 
319     // implementation of PersistenceMessage.addMessage
320     public void updateMessage(Connection connection, MessageImpl message)
321             throws PersistenceException {
322         long start = 0;
323         if (_log.isDebugEnabled()) {
324             start = System.currentTimeMillis();
325         }
326 
327         try {
328             _destinationLock.readLock().acquire();
329             _messages.update(connection, message);
330         } catch (InterruptedException exception) {
331             throw new PersistenceException("Failed to acquire lock",
332                                            exception);
333         } finally {
334             _destinationLock.readLock().release();
335             if (_log.isDebugEnabled()) {
336                 _log.debug("updateMessage," +
337                            (System.currentTimeMillis() - start));
338             }
339         }
340     }
341 
342     // implementation of PersistenceAdapter.getUnprocessedMessages
343     public Vector getUnprocessedMessages(Connection connection)
344             throws PersistenceException {
345         long start = 0;
346         if (_log.isDebugEnabled()) {
347             start = System.currentTimeMillis();
348         }
349 
350         try {
351             return _messages.getUnprocessedMessages(connection);
352         } finally {
353             if (_log.isDebugEnabled()) {
354                 _log.debug(
355                         "getUnprocessedMessages,"
356                         + (System.currentTimeMillis() - start));
357             }
358         }
359     }
360 
361 
362     // implementation of PersistenceAdapter.removeMessage
363     public void removeMessage(Connection connection, String id)
364             throws PersistenceException {
365         long start = 0;
366         if (_log.isDebugEnabled()) {
367             start = System.currentTimeMillis();
368         }
369 
370         try {
371             _destinationLock.readLock().acquire();
372             _messages.remove(connection, id);
373         } catch (InterruptedException exception) {
374             throw new PersistenceException("Failed to acquire lock",
375                                            exception);
376         } finally {
377             _destinationLock.readLock().release();
378             if (_log.isDebugEnabled()) {
379                 _log.debug("removeMessage," +
380                            (System.currentTimeMillis() - start));
381             }
382         }
383     }
384 
385     // implementation of PersistenceAdapter.getMessage
386     public MessageImpl getMessage(Connection connection, String id)
387             throws PersistenceException {
388         long start = 0;
389         if (_log.isDebugEnabled()) {
390             start = System.currentTimeMillis();
391         }
392 
393         try {
394             return _messages.get(connection, id);
395         } finally {
396             if (_log.isDebugEnabled()) {
397                 _log.debug(
398                         "getMessage," + (System.currentTimeMillis() - start));
399             }
400         }
401     }
402 
403     // implementation of PersistenceAdapter.getMessages
404     public Vector getMessages(Connection connection, MessageHandle handle)
405             throws PersistenceException {
406         long start = 0;
407         if (_log.isDebugEnabled()) {
408             start = System.currentTimeMillis();
409         }
410 
411         try {
412             return _messages.getMessages(connection,
413                                                    handle.getDestination()
414                                                    .getName(), handle.getPriority(),
415                                                    handle.getAcceptedTime());
416         } finally {
417             if (_log.isDebugEnabled()) {
418                 _log.debug(
419                         "getMessages," + (System.currentTimeMillis() - start));
420             }
421         }
422     }
423 
424     // implementation of PersistenceAdapter.addMessageHandle
425     public void addMessageHandle(Connection connection, MessageHandle handle)
426             throws PersistenceException {
427         long start = 0;
428         if (_log.isDebugEnabled()) {
429             start = System.currentTimeMillis();
430         }
431 
432         try {
433             _destinationLock.readLock().acquire();
434             _handles.addMessageHandle(connection, handle);
435         } catch (InterruptedException exception) {
436             throw new PersistenceException("Failed to acquire lock",
437                                            exception);
438         } finally {
439             _destinationLock.readLock().release();
440             if (_log.isDebugEnabled()) {
441                 _log.debug(
442                         "addMessageHandle,"
443                         + (System.currentTimeMillis() - start));
444             }
445         }
446     }
447 
448     // implementation of PersistenceAdapter.updateMessageHandle
449     public void updateMessageHandle(Connection connection,
450                                     MessageHandle handle)
451             throws PersistenceException {
452         long start = 0;
453         if (_log.isDebugEnabled()) {
454             start = System.currentTimeMillis();
455         }
456 
457         try {
458             _destinationLock.readLock().acquire();
459             _handles.updateMessageHandle(connection, handle);
460         } catch (InterruptedException exception) {
461             throw new PersistenceException("Failed to acquire lock",
462                                            exception);
463         } finally {
464             _destinationLock.readLock().release();
465             if (_log.isDebugEnabled()) {
466                 _log.debug(
467                         "updateMessageHandle,"
468                         + (System.currentTimeMillis() - start));
469             }
470         }
471     }
472 
473     // implementation of PersistenceAdapter.removeMessageHandle
474     public void removeMessageHandle(Connection connection,
475                                     MessageHandle handle)
476             throws PersistenceException {
477         long start = 0;
478         if (_log.isDebugEnabled()) {
479             start = System.currentTimeMillis();
480         }
481 
482         try {
483             _destinationLock.readLock().acquire();
484             _handles.removeMessageHandle(connection, handle);
485         } catch (InterruptedException exception) {
486             throw new PersistenceException("Failed to acquire lock",
487                                            exception);
488         } finally {
489             _destinationLock.readLock().release();
490             if (_log.isDebugEnabled()) {
491                 _log.debug(
492                         "removeMessageHandle,"
493                         + (System.currentTimeMillis() - start));
494             }
495         }
496     }
497 
498     // implementation of PersistenceAdapter.getMessageHandles
499     public Vector getMessageHandles(Connection connection,
500                                     JmsDestination destination, String name)
501             throws PersistenceException {
502         long start = 0;
503         if (_log.isDebugEnabled()) {
504             start = System.currentTimeMillis();
505         }
506 
507         try {
508             return _handles.getMessageHandles(connection,
509                                                                destination.getName(),
510                                                                name);
511         } finally {
512             if (_log.isDebugEnabled()) {
513                 _log.debug("getMessageHandles,"
514                            + (System.currentTimeMillis() - start));
515             }
516         }
517     }
518 
519     // implementation of PersistenceAdapter.addDurableConsumer
520     public void addDurableConsumer(Connection connection, String topic,
521                                    String consumer)
522             throws PersistenceException {
523 
524         try {
525             _destinationLock.readLock().acquire();
526             _consumers.add(connection, topic, consumer);
527         } catch (InterruptedException exception) {
528             throw new PersistenceException("Failed to acquire lock",
529                                            exception);
530         } finally {
531             _destinationLock.readLock().release();
532         }
533     }
534 
535     // implementation of PersistenceAdapter.removeDurableConsumer
536     public void removeDurableConsumer(Connection connection, String consumer)
537             throws PersistenceException {
538 
539         try {
540             _destinationLock.readLock().acquire();
541             _consumers.remove(connection, consumer);
542         } catch (InterruptedException exception) {
543             throw new PersistenceException("Failed to acquire lock",
544                                            exception);
545         } finally {
546             _destinationLock.readLock().release();
547         }
548     }
549 
550     // implementation of PersistenceAdapter.getDurableConsumers
551     public Enumeration getDurableConsumers(Connection connection, String topic)
552             throws PersistenceException {
553         return _consumers.getDurableConsumers(topic).elements();
554     }
555 
556     // implementation of PersistenceAdapter.getAllDurableConsumers
557     public HashMap getAllDurableConsumers(Connection connection)
558             throws PersistenceException {
559 
560         return _consumers.getAllDurableConsumers();
561     }
562 
563     // implementation of PersistenceAdapter.durableConsumerExists
564     public boolean durableConsumerExists(Connection connection, String name)
565             throws PersistenceException {
566 
567         return _consumers.exists(name);
568     }
569 
570     // implementation of PersistenceAdapter.addDestination
571     public void addDestination(Connection connection, String name,
572                                boolean queue)
573             throws PersistenceException {
574 
575         JmsDestination destination = (queue)
576                 ? (JmsDestination) new JmsQueue(name)
577                 : (JmsDestination) new JmsTopic(name);
578 
579         // create the destination. If the destination is also
580         // a queue create a special consumer for it.
581         try {
582             _destinationLock.readLock().acquire();
583             _destinations.add(connection, destination);
584             if (queue) {
585                 _consumers.add(connection, name, name);
586             }
587         } catch (InterruptedException exception) {
588             throw new PersistenceException("Failed to acquire lock",
589                                            exception);
590         } finally {
591             _destinationLock.readLock().release();
592         }
593     }
594 
595     // implementation of PersistenceAdapter.removeDestination
596     public void removeDestination(Connection connection, String name)
597             throws PersistenceException {
598 
599         JmsDestination destination = _destinations.get(name);
600         if (destination != null) {
601             try {
602                 _destinationLock.writeLock().acquire();
603                 _destinations.remove(connection, destination);
604             } catch (InterruptedException exception) {
605                 throw new PersistenceException("Failed to acquire lock",
606                                                exception);
607             } finally {
608                 _destinationLock.writeLock().release();
609             }
610         }
611     }
612 
613     // implementation of PersistenceAdapter.getAllDestinations
614     public Enumeration getAllDestinations(Connection connection)
615             throws PersistenceException {
616 
617         return _destinations.getDestinations().elements();
618     }
619 
620     // implementation of PersistenceAdapter.checkDestination
621     public boolean checkDestination(Connection connection, String name)
622             throws PersistenceException {
623 
624         return (_destinations.get(name) != null);
625     }
626 
627     // implementation of getQueueMessageCount
628     public int getQueueMessageCount(Connection connection, String name)
629             throws PersistenceException {
630 
631         return _handles.getMessageCount(connection, name,
632                                                          name);
633     }
634 
635     // implementation of PersistenceAdapter.getQueueMessageCount
636     public int getDurableConsumerMessageCount(Connection connection,
637                                               String destination, String name)
638             throws PersistenceException {
639 
640         return _handles.getMessageCount(connection,
641                                                          destination, name);
642     }
643 
644     // implementation of PersistenceAdapter.getQueueMessageCount
645     public void removeExpiredMessages(Connection connection)
646             throws PersistenceException {
647 
648         _messages.removeExpiredMessages(connection);
649     }
650 
651     // implementation of PersistenceAdapter.removeExpiredMessageHandles
652     public void removeExpiredMessageHandles(Connection connection,
653                                             String consumer)
654             throws PersistenceException {
655 
656         _handles.removeExpiredMessageHandles(connection,
657                                                               consumer);
658     }
659 
660     // implementation of PersistenceAdapter.getNonExpiredMessages
661     public Vector getNonExpiredMessages(Connection connection,
662                                         JmsDestination destination)
663             throws PersistenceException {
664 
665         return _messages.getNonExpiredMessages(connection,
666                                                          destination);
667     }
668 
669     // implementation of EventHandler.handleEvent
670     public void handleEvent(int event, Object callback, long time) {
671         // disabled, as per bug 816895 - Exception in purgeMessages
672 //          if (event == COLLECT_DATABASE_GARBAGE_EVENT) {
673 //              // collect garbage now, but before doing so change the thread
674 //              // priority to low.
675 //              try {
676 //                  Thread.currentThread().setPriority(_gcThreadPriority);
677 //                  purgeMessages();
678 //              } finally {
679 //                  Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
680 //                  registerEvent();
681 //              }
682 //          }
683     }
684 
685     /***
686      * Return a connection to the database from the pool of connections. It will
687      * throw an PersistenceException if it cannot retrieve a connection. The
688      * client should close the connection normally, since the pool is a
689      * connection event listener.
690      *
691      * @return Connection - a pooled connection or null
692      * @throws PersistenceException - if it cannot retrieve a connection
693      */
694     public Connection getConnection()
695             throws PersistenceException {
696         return _connectionManager.getConnection();
697     }
698 
699     /***
700      * Return a reference to the DBConnectionManager
701      *
702      * @return DBConnectionManager
703      */
704     public DBConnectionManager getDBConnectionManager() {
705         return _connectionManager;
706     }
707 
708     public void addUser(Connection connection, User user)
709             throws PersistenceException {
710         _users.add(connection, user);
711     }
712 
713     public Enumeration getAllUsers(Connection connection)
714             throws PersistenceException {
715         return _users.getAllUsers(connection).elements();
716     }
717 
718     public User getUser(Connection connection, User user)
719             throws PersistenceException {
720         return _users.get(connection, user);
721     }
722 
723     public void removeUser(Connection connection, User user)
724             throws PersistenceException {
725         _users.remove(connection, user);
726     }
727 
728     public void updateUser(Connection connection, User user)
729             throws PersistenceException {
730         _users.update(connection, user);
731     }
732 
733     /***
734      * Incrementally purge all processed messages from the database.
735      * @todo this needs to be revisited. See bug 816895
736      * - existing expired messages are purged at startup
737      * - messages received that subsequently expire while the server is
738      *   running are removed individually.
739      * - not clear how the previous implementation ever worked.
740      *   The Messages.getMessageIds() method returns all messages, not
741      *   just those processed, nor is it clear that the processed flag
742      *   is ever non-zero.
743      *   The current implementation (as a fix for bug 816895 - Exception in
744      *   purgeMessages) simply delegates to removeExpiredMessages()
745      *
746      * @return the number of messages deleted
747      */
748     public synchronized int purgeMessages() {
749         // int deleted = 0;
750 
751         Connection connection = null;
752         try {
753             connection = getConnection();
754             removeExpiredMessages(connection);
755             connection.commit();
756         } catch (Exception exception) {
757             _log.error("Exception in purgeMessages", exception);
758         } finally {
759             SQLHelper.close(connection);
760         }
761         return 0;
762 
763 //          if (connection == null) {
764 //              return 0;
765 //          }
766 
767 //          // we have a valid connection so we can proceed
768 //          try {
769 //              long stime = System.currentTimeMillis();
770 //              HashMap msgids = _messages.getMessageIds(
771 //                  connection, _lastTime, _gcBlockSize);
772 
773 //              // if there are no messages then reset the last time to
774 //              // 0 and break;
775 //              if (msgids.size() > 0) {
776 //                  // find the minimum and maximum..we can improve the way we
777 //                  // do this.
778 //                  Iterator iter = msgids.values().iterator();
779 //                  long min = -1;
780 //                  long max = -1;
781 
782 //                  while (iter.hasNext()) {
783 //                      Long id = (Long) iter.next();
784 //                      if ((min == -1) &&
785 //                          (max == -1)) {
786 //                          min = id.longValue();
787 //                          max = id.longValue();
788 //                      }
789 
790 //                      if (id.longValue() < min) {
791 //                          min = id.longValue();
792 //                      } else if (id.longValue() > max) {
793 //                          max = id.longValue();
794 //                      }
795 //                  }
796 
797 //                  // set the last time for the next iteration unless the
798 //                  // the size of the msgids is less than the gcBlockSize.
799 //                  // If the later is the case then reset the last time.
800 //                  // This is in preparation for the next pass through this
801 //                  // method.
802 //                  if (msgids.size() < _gcBlockSize) {
803 //                      _lastTime = 0;
804 //                  } else {
805 //                      _lastTime = max;
806 //                  }
807 
808 //                  // now iterate through the message list and delete the
809 //                  // messages that do not have corresponding handles.
810 //                  Vector hdlids = _handles.getMessageIds(connection, min, max);
811 //                  iter = msgids.keySet().iterator();
812 //                  while (iter.hasNext()) {
813 //                      String id = (String) iter.next();
814 //                      if (!hdlids.contains(id)) {
815 //                          // this message is not referenced by anyone so we can
816 //                          // delete it
817 //                          _messages.remove(connection, id);
818 //                          deleted++;
819 //                      }
820 //                  }
821 //                  connection.commit();
822 //              } else {
823 //                  // reset the lastTime
824 //                  _lastTime = 0;
825 //              }
826 //              _log.debug("DBGC Deleted " + deleted + " messages and took "
827 //                  + (System.currentTimeMillis() - stime) +
828 //                  "ms to complete.");
829 //          } catch (Exception exception) {
830 //              try {
831 //                  connection.rollback();
832 //              } catch (Exception nested) {
833 //                  // ignore this exception
834 //              }
835 //              _log.error("Exception in purgeMessages", exception);
836 //              deleted = 0;
837 //          } finally {
838 //              try {
839 //                  connection.close();
840 //              } catch (Exception nested) {
841 //                  // ignore
842 //              }
843 //          }
844 //
845 //        return deleted;
846     }
847 
848     /***
849      * Get the schema version
850      *
851      * @param connection the connection to use
852      * @return the schema version, or null, if no version has been initialised
853      * @throws PersistenceException for any related persistence exception
854      */
855     private String getSchemaVersion(Connection connection)
856             throws PersistenceException {
857 
858         String version = null;
859         PreparedStatement query = null;
860         ResultSet result = null;
861         try {
862             query = connection.prepareStatement(
863                     "select version from system_data where id = 1");
864             result = query.executeQuery();
865             if (result.next()) {
866                 version = result.getString(1);
867             }
868         } catch (SQLException exception) {
869             throw new PersistenceException("Failed to get the schema version",
870                                            exception);
871         } finally {
872             SQLHelper.close(result);
873             SQLHelper.close(query);
874 
875         }
876         return version;
877     }
878 
879     /***
880      * Initialise the schema version
881      *
882      * @param connection the connection to use
883      */
884     private void initSchemaVersion(Connection connection)
885             throws PersistenceException {
886 
887         _log.info("Initialising schema version " + SCHEMA_VERSION);
888         PreparedStatement insert = null;
889         try {
890             insert = connection.prepareStatement("insert into system_data (id, version, creationdate) "
891                                                  + "values (?,?,?)");
892             insert.setInt(1, 1);
893             insert.setString(2, SCHEMA_VERSION);
894             insert.setDate(3, new Date(System.currentTimeMillis()));
895             insert.executeUpdate();
896 
897         } catch (SQLException exception) {
898             throw new PersistenceException(
899                     "Failed to initialise schema version", exception);
900         } finally {
901             SQLHelper.close(insert);
902         }
903     }
904 
905     /***
906      * Register an event to collect and remove processed messages with the
907      * {@link BasicEventManager}
908      */
909 //   private void registerEvent() {
910 //        try {
911         // disabled, as per bug 816895 - Exception in purgeMessages
912 //              BasicEventManager.instance().registerEventRelative(
913 //                  new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
914 //                  _gcInterval);
915 //          } catch (IllegalEventDefinedException exception) {
916 //              _log.error("registerEvent failed", exception);
917 //          }
918 //   }
919 
920     /***
921      * Creates a {@link DBConnectionManager} using its fully qualified class
922      * name
923      *
924      * @param className the fully qualified class name
925      * @throws PersistenceException if it cannot be created
926      */
927     private DBConnectionManager getConnectionManager(String className)
928             throws PersistenceException {
929 
930         DBConnectionManager result = null;
931         Class clazz = null;
932         ClassLoader loader = Thread.currentThread().getContextClassLoader();
933         try {
934             if (loader != null) {
935                 clazz = loader.loadClass(className);
936             }
937         } catch (ClassNotFoundException ignore) {
938         }
939         try {
940             if (clazz == null) {
941                 clazz = Class.forName(className);
942             }
943         } catch (ClassNotFoundException exception) {
944             throw new PersistenceException("Failed to locate connection manager implementation: "
945                                            + className, exception);
946         }
947 
948         try {
949             result = (DBConnectionManager) clazz.newInstance();
950         } catch (Exception exception) {
951             throw new PersistenceException(
952                     "Failed to create connection manager", exception);
953         }
954 
955         return result;
956     }
957 
958 }