View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: RDBMSAdapter.java,v 1.54 2004/01/08 05:55:07 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * $Date	    jimm    Created
47   */
48  package org.exolab.jms.persistence;
49  
50  import java.sql.Connection;
51  import java.sql.Date;
52  import java.sql.PreparedStatement;
53  import java.sql.ResultSet;
54  import java.sql.SQLException;
55  import java.util.Enumeration;
56  import java.util.HashMap;
57  import java.util.Vector;
58  
59  import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
60  import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
61  
62  import org.apache.commons.logging.Log;
63  import org.apache.commons.logging.LogFactory;
64  
65  import org.exolab.core.foundation.HandleIfc;
66  import org.exolab.jms.authentication.User;
67  import org.exolab.jms.client.JmsDestination;
68  import org.exolab.jms.client.JmsQueue;
69  import org.exolab.jms.client.JmsTopic;
70  import org.exolab.jms.config.ConfigurationManager;
71  import org.exolab.jms.config.DatabaseConfiguration;
72  import org.exolab.jms.config.RdbmsDatabaseConfiguration;
73  import org.exolab.jms.events.EventHandler;
74  import org.exolab.jms.message.MessageImpl;
75  import org.exolab.jms.messagemgr.PersistentMessageHandle;
76  
77  
78  /***
79   * This adapter is a wrapper class around the persistency mechanism.
80   * It isolates the client from the working specifics of the database, by
81   * providing a simple straight forward interface. Furure changes to
82   * the database will only require changes to the adapter.
83   *
84   * @version     $Revision: 1.54 $ $Date: 2004/01/08 05:55:07 $
85   * @author      <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
86   */
87  
88  public class RDBMSAdapter
89      extends PersistenceAdapter
90      implements EventHandler {
91  
92      /***
93       * The schema version number. Note this must be incremented whenever
94       * The schema changes.
95       */
96      public static final String SCHEMA_VERSION = "V0.7.6";
97  
98      /***
99       *  The JDBC ConnectionManager
100      */
101     private static DBConnectionManager _connectionManager = null;
102 
103     /***
104      * This is the interval that the automatic garbage collector will
105      * execute, if specified. It is specified in seconds.
106      */
107     private int _gcInterval = 600;
108 
109     /***
110      * This is the block size that is used during purging.
111      */
112     private int _gcBlockSize = 500;
113 
114     /***
115      * This is used to track the incremental garbage collection. It
116      * tracks the last blocks examined.
117      */
118     private long _lastTime = 0;
119 
120     /***
121      * This is the thread priority for the GC Thread
122      */
123     private int _gcThreadPriority = Thread.NORM_PRIORITY;
124 
125     /***
126      * Lock to help prevent deadlocks when administratively removing
127      * destinations, while producers and consumers are actively sending
128      * and receiving messages. It ensures that when a destination is in the
129      * process of being removed, no other changes are occuring on the
130      * messages and message_handles tables.
131      */
132     private ReadWriteLock _destinationLock = new FIFOReadWriteLock();
133 
134     /***
135      * This is the event that is fired to initiate garbage collection
136      * in the database
137      */
138     private static final int COLLECT_DATABASE_GARBAGE_EVENT = 1;
139 
140     /***
141      * The logger
142      */
143     private static final Log _log = LogFactory.getLog(RDBMSAdapter.class);
144 
145 
146     /***
147      * Connects to the given db.
148      *
149      * @throws PersistenceException if a connection cannot be establised to
150      * the database
151      */
152     RDBMSAdapter(String driver, String url, String userName, String password)
153         throws PersistenceException {
154 
155         DatabaseConfiguration dbConfig = 
156             ConfigurationManager.getConfig().getDatabaseConfiguration();
157         RdbmsDatabaseConfiguration config = 
158             dbConfig.getRdbmsDatabaseConfiguration();
159 
160         // create the connection manager, and configure it
161         _connectionManager = getConnectionManager(config.getClazz());
162         _connectionManager.setUser(userName);
163         _connectionManager.setPassword(password);
164         _connectionManager.setDriver(driver);
165         _connectionManager.setURL(url);
166         _connectionManager.setMaxActive(config.getMaxActive());
167         _connectionManager.setMaxIdle(config.getMaxIdle());
168         _connectionManager.setMinIdleTime(config.getMinIdleTime());
169         _connectionManager.setEvictionInterval(config.getEvictionInterval());
170         _connectionManager.setTestQuery(config.getTestQuery());
171         _connectionManager.setTestBeforeUse(config.getTestBeforeUse());
172 
173         // initialisze the connection manager
174         _connectionManager.init();
175 
176         Connection connection = null;
177         try {
178             // initialize the various caches and helper classes used to
179             // execute the various SQL.
180             connection = getConnection();
181 
182             String version = getSchemaVersion(connection);
183             if (version == null) {
184                 initSchemaVersion(connection);
185             } else if (!version.equals(SCHEMA_VERSION)) {
186                 throw new PersistenceException(
187                     "Schema needs to be converted from version=" + version
188                     + " to version=" + SCHEMA_VERSION
189                     + "\nBack up your database, and run 'dbtool -migrate'"
190                     + "to convert the schema");
191             }
192 
193             SeedGenerator.initialise();
194             Destinations.initialise(connection);
195             Consumers.initialise(connection);
196             Messages.initialise();
197             MessageHandles.initialise();
198             connection.commit();
199             Users.initialise(connection);
200         } catch (PersistenceException exception) {
201             SQLHelper.rollback(connection);
202             throw exception;
203         } catch (Exception exception) {
204             throw new PersistenceException(
205                 "Failed to initialise database adapter", exception);
206         } finally {
207             SQLHelper.close(connection);
208             
209         }
210 
211         // check whether we should initiate automatic garbage collection
212         if (dbConfig.hasGarbageCollectionInterval()) {
213             _gcInterval = dbConfig.getGarbageCollectionInterval() * 1000;
214             registerEvent();
215         }
216 
217         if (dbConfig.hasGarbageCollectionBlockSize()) {
218             _gcBlockSize = dbConfig.getGarbageCollectionBlockSize();
219         }
220 
221         if (dbConfig.hasGarbageCollectionThreadPriority()) {
222             _gcThreadPriority = dbConfig.getGarbageCollectionBlockSize();
223             if (_gcThreadPriority < Thread.MIN_PRIORITY) {
224                 _gcThreadPriority = Thread.MIN_PRIORITY;
225             } else if (_gcThreadPriority > Thread.MAX_PRIORITY) {
226                 _gcThreadPriority = Thread.MAX_PRIORITY;
227             }
228         }
229     }
230 
231     /***
232      * Close the database if open.
233      */
234     public void close() {
235         if (SeedGenerator.instance() != null) {
236             SeedGenerator.instance().close();
237         }
238 
239         if (Destinations.instance() != null) {
240             Destinations.instance().close();
241         }
242 
243         if (Consumers.instance() != null) {
244             Consumers.instance().close();
245         }
246 
247         if (Messages.instance() != null) {
248             Messages.instance().close();
249         }
250 
251         if (MessageHandles.instance() != null) {
252             MessageHandles.instance().close();
253         }
254 
255         if (Users.instance() != null) {
256             Users.instance().close();
257         }
258     }
259 
260     // implementation of PersistenceAdapter.getLastId
261     public long getLastId(Connection connection)
262         throws PersistenceException {
263 
264         long lastId = -1;
265         boolean successful = false;
266         PreparedStatement query = null;
267         ResultSet result = null;
268         PreparedStatement insert = null;
269         try {
270             query = connection.prepareStatement(
271                 "select * from message_id where id = 1");
272             result = query.executeQuery();
273 
274             if (result.next()) {
275                 lastId = result.getInt("maxId");
276             } else {
277                 // first entry create.
278                 insert = connection.prepareStatement(
279                     "insert into message_id values (?,?)");
280                 insert.setInt(1, 1);
281                 insert.setLong(2, 0);
282                 insert.executeUpdate();
283                 lastId = 0;
284             }
285         } catch (Exception exception) {
286             throw new PersistenceException("Failed to get last message id",
287                 exception);
288         } finally {
289             SQLHelper.close(result);
290             SQLHelper.close(insert);
291             SQLHelper.close(query);        	
292         }
293 
294         return lastId;
295     }
296 
297     // implementation of PersistenceAdapter.updateIds
298     public void updateIds(Connection connection, long id)
299         throws PersistenceException {
300 		PreparedStatement insert = null;
301         try {
302             insert = connection.prepareStatement(
303                 "update message_id set maxId = ? where id = 1");
304 
305             insert.setLong(1, id);
306             insert.executeUpdate();
307         } catch (Exception exception) {
308             throw new PersistenceException("Failed to update message id",
309                                            exception);
310         } finally {
311             SQLHelper.close(insert);
312         }
313     }
314 
315     // implementation of PersistenceMessage.addMessage
316     public void addMessage(Connection connection, MessageImpl message)
317         throws PersistenceException {
318 
319         long start = 0;
320 
321         if (_log.isDebugEnabled()) {
322             start = System.currentTimeMillis();
323         }
324 
325         try {
326             _destinationLock.readLock().acquire();
327             Messages.instance().add(connection, message);
328         } catch (InterruptedException exception) {
329             throw new PersistenceException("Failed to acquire lock",
330                 exception);
331         } finally {
332             _destinationLock.readLock().release();
333 
334             if (_log.isDebugEnabled()) {
335                 _log.debug("addMessage," +
336                     (System.currentTimeMillis() - start));
337             }
338         }
339     }
340 
341     // implementation of PersistenceMessage.addMessage
342     public void updateMessage(Connection connection, MessageImpl message)
343         throws PersistenceException {
344         long start = 0;
345         if (_log.isDebugEnabled()) {
346             start = System.currentTimeMillis();
347         }
348 
349         try {
350             _destinationLock.readLock().acquire();
351             Messages.instance().update(connection, message);
352         } catch (InterruptedException exception) {
353             throw new PersistenceException("Failed to acquire lock",
354                 exception);
355         } finally {
356             _destinationLock.readLock().release();
357             if (_log.isDebugEnabled()) {
358                 _log.debug("updateMessage," +
359                     (System.currentTimeMillis() - start));
360             }
361         }
362     }
363 
364     // implementation of PersistenceAdapter.getUnprocessedMessages
365     public Vector getUnprocessedMessages(Connection connection)
366         throws PersistenceException {
367         long start = 0;
368         if (_log.isDebugEnabled()) {
369             start = System.currentTimeMillis();
370         }
371 
372         try {
373             return Messages.instance().getUnprocessedMessages(connection);
374         } finally {
375             if (_log.isDebugEnabled()) {
376                 _log.debug("getUnprocessedMessages," + (System.currentTimeMillis() - start));
377             }
378         }
379     }
380 
381 
382     // implementation of PersistenceAdapter.removeMessage
383     public void removeMessage(Connection connection, String id)
384         throws PersistenceException {
385         long start = 0;
386         if (_log.isDebugEnabled()) {
387             start = System.currentTimeMillis();
388         }
389 
390         try {
391             _destinationLock.readLock().acquire();
392             Messages.instance().remove(connection, id);
393         } catch (InterruptedException exception) {
394             throw new PersistenceException("Failed to acquire lock",
395                 exception);
396         } finally {
397             _destinationLock.readLock().release();
398             if (_log.isDebugEnabled()) {
399                 _log.debug("removeMessage," +
400                     (System.currentTimeMillis() - start));
401             }
402         }
403     }
404 
405     // implementation of PersistenceAdapter.getMessage
406     public MessageImpl getMessage(Connection connection, String id)
407         throws PersistenceException {
408         long start = 0;
409         if (_log.isDebugEnabled()) {
410             start = System.currentTimeMillis();
411         }
412 
413         try {
414             return Messages.instance().get(connection, id);
415         } finally {
416             if (_log.isDebugEnabled()) {
417                 _log.debug("getMessage," + (System.currentTimeMillis() - start));
418             }
419         }
420     }
421 
422     // implementation of PersistenceAdapter.getMessages
423     public Vector getMessages(Connection connection, PersistentMessageHandle handle)
424         throws PersistenceException {
425         long start = 0;
426         if (_log.isDebugEnabled()) {
427             start = System.currentTimeMillis();
428         }
429 
430         try {
431             return Messages.instance().getMessages(connection,
432                 handle.getDestination().getName(), handle.getPriority(),
433                 handle.getAcceptedTime());
434         } finally {
435             if (_log.isDebugEnabled()) {
436                 _log.debug("getMessages," + (System.currentTimeMillis() - start));
437             }
438         }
439     }
440 
441     // implementation of PersistenceAdapter.addMessageHandle
442     public void addMessageHandle(Connection connection, PersistentMessageHandle handle)
443         throws PersistenceException {
444         long start = 0;
445         if (_log.isDebugEnabled()) {
446             start = System.currentTimeMillis();
447         }
448 
449         try {
450             _destinationLock.readLock().acquire();
451             MessageHandles.instance().addMessageHandle(connection, handle);
452         } catch (InterruptedException exception) {
453             throw new PersistenceException("Failed to acquire lock",
454                 exception);
455         } finally {
456             _destinationLock.readLock().release();
457             if (_log.isDebugEnabled()) {
458                 _log.debug("addMessageHandle," + (System.currentTimeMillis() - start));
459             }
460         }
461     }
462 
463     // implementation of PersistenceAdapter.updateMessageHandle
464     public void updateMessageHandle(Connection connection, PersistentMessageHandle handle)
465         throws PersistenceException {
466         long start = 0;
467         if (_log.isDebugEnabled()) {
468             start = System.currentTimeMillis();
469         }
470 
471         try {
472             _destinationLock.readLock().acquire();
473             MessageHandles.instance().updateMessageHandle(connection, handle);
474         } catch (InterruptedException exception) {
475             throw new PersistenceException("Failed to acquire lock",
476                 exception);
477         } finally {
478             _destinationLock.readLock().release();
479             if (_log.isDebugEnabled()) {
480                 _log.debug("updateMessageHandle," + (System.currentTimeMillis() - start));
481             }
482         }
483     }
484 
485     // implementation of PersistenceAdapter.removeMessageHandle
486     public void removeMessageHandle(Connection connection, PersistentMessageHandle handle)
487         throws PersistenceException {
488         long start = 0;
489         if (_log.isDebugEnabled()) {
490             start = System.currentTimeMillis();
491         }
492 
493         try {
494             _destinationLock.readLock().acquire();
495             MessageHandles.instance().removeMessageHandle(connection, handle);
496         } catch (InterruptedException exception) {
497             throw new PersistenceException("Failed to acquire lock",
498                 exception);
499         } finally {
500             _destinationLock.readLock().release();
501             if (_log.isDebugEnabled()) {
502                 _log.debug("removeMessageHandle," + (System.currentTimeMillis() - start));
503             }
504         }
505     }
506 
507     // implementation of PersistenceAdapter.getMessageHandles
508     public Vector getMessageHandles(Connection connection,
509                                     JmsDestination destination, String name)
510         throws PersistenceException {
511         long start = 0;
512         if (_log.isDebugEnabled()) {
513             start = System.currentTimeMillis();
514         }
515 
516         try {
517             return MessageHandles.instance().getMessageHandles(connection,
518                 destination.getName(), name);
519         } finally {
520             if (_log.isDebugEnabled()) {
521                 _log.debug("getMessageHandles," 
522                           + (System.currentTimeMillis() - start));
523             }
524         }
525     }
526 
527     // implementation of PersistenceAdapter.addDurableConsumer
528     public void addDurableConsumer(Connection connection, String topic,
529                                    String consumer)
530         throws PersistenceException {
531 
532         try {
533             _destinationLock.readLock().acquire();
534             Consumers.instance().add(connection, topic, consumer);
535         } catch (InterruptedException exception) {
536             throw new PersistenceException("Failed to acquire lock",
537                 exception);
538         } finally {
539             _destinationLock.readLock().release();
540         }
541     }
542 
543     // implementation of PersistenceAdapter.removeDurableConsumer
544     public void removeDurableConsumer(Connection connection, String consumer)
545         throws PersistenceException {
546 
547         try {
548             _destinationLock.readLock().acquire();
549             Consumers.instance().remove(connection, consumer);
550         } catch (InterruptedException exception) {
551             throw new PersistenceException("Failed to acquire lock",
552                 exception);
553         } finally {
554             _destinationLock.readLock().release();
555         }
556     }
557 
558     // implementation of PersistenceAdapter.getDurableConsumers
559     public Enumeration getDurableConsumers(Connection connection, String topic)
560         throws PersistenceException {
561         return Consumers.instance().getDurableConsumers(topic).elements();
562     }
563 
564     // implementation of PersistenceAdapter.getAllDurableConsumers
565     public HashMap getAllDurableConsumers(Connection connection)
566         throws PersistenceException {
567 
568         return Consumers.instance().getAllDurableConsumers();
569     }
570 
571     // implementation of PersistenceAdapter.durableConsumerExists
572     public boolean durableConsumerExists(Connection connection, String name)
573         throws PersistenceException {
574 
575         return Consumers.instance().exists(name);
576     }
577 
578     // implementation of PersistenceAdapter.addDestination
579     public void addDestination(Connection connection, String name,
580                                boolean queue)
581         throws PersistenceException {
582 
583         JmsDestination destination = (queue)
584             ? (JmsDestination) new JmsQueue(name)
585             : (JmsDestination) new JmsTopic(name);
586 
587         // create the destination. If the destination is also
588         // a queue create a special consumer for it.
589         try {
590             _destinationLock.readLock().acquire();
591             Destinations.instance().add(connection, destination);
592             if (queue) {
593                 Consumers.instance().add(connection, name, name);
594             }
595         } catch (InterruptedException exception) {
596             throw new PersistenceException("Failed to acquire lock",
597                 exception);
598         } finally {
599             _destinationLock.readLock().release();
600         }
601     }
602 
603     // implementation of PersistenceAdapter.removeDestination
604     public void removeDestination(Connection connection, String name)
605         throws PersistenceException {
606 
607         JmsDestination destination = Destinations.instance().get(name);
608         if (destination != null) {
609             try {
610                 _destinationLock.writeLock().acquire();
611                 Destinations.instance().remove(connection, destination);
612             } catch (InterruptedException exception) {
613                 throw new PersistenceException("Failed to acquire lock",
614                     exception);
615             } finally {
616                 _destinationLock.writeLock().release();
617             }
618         }
619     }
620 
621     // implementation of PersistenceAdapter.getAllDestinations
622     public Enumeration getAllDestinations(Connection connection)
623         throws PersistenceException {
624 
625         return Destinations.instance().getDestinations().elements();
626     }
627 
628     // implementation of PersistenceAdapter.checkDestination
629     public boolean checkDestination(Connection connection, String name)
630         throws PersistenceException {
631 
632         return (Destinations.instance().get(name) != null);
633     }
634 
635     // implementation of getQueueMessageCount
636     public int getQueueMessageCount(Connection connection, String name)
637         throws PersistenceException {
638 
639         return MessageHandles.instance().getMessageCount(
640             connection, name, name);
641     }
642 
643     // implementation of PersistenceAdapter.getQueueMessageCount
644     public int getDurableConsumerMessageCount(Connection connection,
645                                               String destination, String name)
646         throws PersistenceException {
647 
648         return MessageHandles.instance().getMessageCount(connection,
649             destination, name);
650     }
651 
652     // implementation of PersistenceAdapter.getQueueMessageCount
653     public void removeExpiredMessages(Connection connection)
654         throws PersistenceException {
655 
656         Messages.instance().removeExpiredMessages(connection);
657     }
658 
659     // implementation of PersistenceAdapter.removeExpiredMessageHandles
660     public void removeExpiredMessageHandles(Connection connection,
661                                             String consumer)
662         throws PersistenceException {
663 
664         MessageHandles.instance().removeExpiredMessageHandles(connection,
665             consumer);
666     }
667 
668     // implementation of PersistenceAdapter.getNonExpiredMessages
669     public Vector getNonExpiredMessages(Connection connection,
670                                         JmsDestination destination)
671         throws PersistenceException {
672 
673         return Messages.instance().getNonExpiredMessages(
674             connection, destination);
675     }
676 
677     // implementation of EventHandler.getHandle
678     public HandleIfc getHandle() {
679         return null;
680     }
681 
682     // implementation of EventHandler.handleEvent
683     public void handleEvent(int event, Object callback, long time) {
684         // disabled, as per bug 816895 - Exception in purgeMessages
685 //          if (event == COLLECT_DATABASE_GARBAGE_EVENT) {
686 //              // collect garbage now, but before doing so change the thread
687 //              // priority to low.
688 //              try {
689 //                  Thread.currentThread().setPriority(_gcThreadPriority);
690 //                  purgeMessages();
691 //              } finally {
692 //                  Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
693 //                  registerEvent();
694 //              }
695 //          }
696     }
697 
698     /***
699      * Return a connection to the database from the pool of connections. It
700      * will throw an PersistenceException if it cannot retrieve a connection.
701      * The client should close the connection normally, since the pool is a
702      * connection event listener.
703      *
704      * @return Connection - a pooled connection or null
705      * @exception PersistenceException - if it cannot retrieve a connection
706      */
707     public Connection getConnection()
708         throws PersistenceException {
709         return _connectionManager.getConnection();
710     }
711 
712     /***
713      * Return a reference to the DBConnectionManager
714      *
715      * @return DBConnectionManager
716      */
717     public DBConnectionManager getDBConnectionManager() {
718         return _connectionManager;
719     }
720 
721     public void addUser(Connection connection, User user)
722         throws PersistenceException {
723         Users.instance().add(connection, user);
724     }
725 
726     public Enumeration getAllUsers(Connection connection)
727         throws PersistenceException {
728         return Users.instance().getAllUsers(connection).elements();
729     }
730 
731     public User getUser(Connection connection, User user)
732         throws PersistenceException {
733         return Users.instance().get(connection, user);
734     }
735 
736     public void removeUser(Connection connection, User user)
737         throws PersistenceException {
738         Users.instance().remove(connection, user);
739     }
740 
741     public void updateUser(Connection connection, User user)
742         throws PersistenceException {
743         Users.instance().update(connection, user);
744     }
745 
746     /***
747      * Incrementally purge all processed messages from the database.
748      * @TODO this needs to be revisited. See bug 816895
749      * - existing expired messages are purged at startup
750      * - messages received that subsequently expire while the server is 
751      *   running are removed individually.
752      * - not clear how the previous implementation ever worked.
753      *   The Messages.getMessageIds() method returns all messages, not
754      *   just those processed, nor is it clear that the processed flag
755      *   is ever non-zero.
756      *   The current implementation (as a fix for bug 816895 - Exception in 
757      *   purgeMessages) simply delegates to removeExpiredMessages()
758      *
759      * @return the number of messages deleted
760      */
761     public synchronized int purgeMessages() {
762         int deleted = 0;
763 
764         Connection connection = null;
765         try {
766             connection = getConnection();
767             removeExpiredMessages(connection);
768             connection.commit();
769         } catch (Exception exception) {
770             _log.error("Exception in purgeMessages", exception);
771         } finally {
772             SQLHelper.close(connection);
773         }
774         return 0;
775 
776 //          if (connection == null) {
777 //              return 0;
778 //          }
779 
780 //          // we have a valid connection so we can proceed
781 //          try {
782 //              long stime = System.currentTimeMillis();
783 //              HashMap msgids = Messages.instance().getMessageIds(
784 //                  connection, _lastTime, _gcBlockSize);
785 
786 //              // if there are no messages then reset the last time to
787 //              // 0 and break;
788 //              if (msgids.size() > 0) {
789 //                  // find the minimum and maximum..we can improve the way we
790 //                  // do this.
791 //                  Iterator iter = msgids.values().iterator();
792 //                  long min = -1;
793 //                  long max = -1;
794 
795 //                  while (iter.hasNext()) {
796 //                      Long id = (Long) iter.next();
797 //                      if ((min == -1) &&
798 //                          (max == -1)) {
799 //                          min = id.longValue();
800 //                          max = id.longValue();
801 //                      }
802 
803 //                      if (id.longValue() < min) {
804 //                          min = id.longValue();
805 //                      } else if (id.longValue() > max) {
806 //                          max = id.longValue();
807 //                      }
808 //                  }
809 
810 //                  // set the last time for the next iteration unless the
811 //                  // the size of the msgids is less than the gcBlockSize.
812 //                  // If the later is the case then reset the last time.
813 //                  // This is in preparation for the next pass through this
814 //                  // method.
815 //                  if (msgids.size() < _gcBlockSize) {
816 //                      _lastTime = 0;
817 //                  } else {
818 //                      _lastTime = max;
819 //                  }
820 
821 //                  // now iterate through the message list and delete the
822 //                  // messages that do not have corresponding handles.
823 //                  Vector hdlids = MessageHandles.instance().getMessageIds(connection, min, max);
824 //                  iter = msgids.keySet().iterator();
825 //                  while (iter.hasNext()) {
826 //                      String id = (String) iter.next();
827 //                      if (!hdlids.contains(id)) {
828 //                          // this message is not referenced by anyone so we can
829 //                          // delete it
830 //                          Messages.instance().remove(connection, id);
831 //                          deleted++;
832 //                      }
833 //                  }
834 //                  connection.commit();
835 //              } else {
836 //                  // reset the lastTime
837 //                  _lastTime = 0;
838 //              }
839 //              _log.debug("DBGC Deleted " + deleted + " messages and took "
840 //                  + (System.currentTimeMillis() - stime) +
841 //                  "ms to complete.");
842 //          } catch (Exception exception) {
843 //              try {
844 //                  connection.rollback();
845 //              } catch (Exception nested) {
846 //                  // ignore this exception
847 //              }
848 //              _log.error("Exception in purgeMessages", exception);
849 //              deleted = 0;
850 //          } finally {
851 //              try {
852 //                  connection.close();
853 //              } catch (Exception nested) {
854 //                  // ignore
855 //              }
856 //          }
857 //
858 //        return deleted;
859     }
860 
861     /***
862      * Get the schema version
863      *
864      * @param connection the connection to use
865      * @return the schema version, or null, if no version has been initialised
866      * @throws PersistenceException for any related persistence exception
867      */
868     private String getSchemaVersion(Connection connection)
869         throws PersistenceException {
870 
871         String version = null;
872         PreparedStatement query = null;
873         ResultSet result = null;
874         try {
875             query = connection.prepareStatement(
876                 "select * from system_data where id = 1");
877             result = query.executeQuery();
878             if (result.next()) {
879                 version = result.getString("version");
880             }
881         } catch (SQLException exception) {
882             throw new PersistenceException(
883                 "Failed to get the schema version", exception);
884         } finally {
885             SQLHelper.close(result);
886             SQLHelper.close(query);
887         	
888         }
889         return version;
890     }
891 
892     /***
893      * Initialise the schema version
894      *
895      * @param connection the connection to use
896      */
897     private void initSchemaVersion(Connection connection)
898         throws PersistenceException {
899 
900         _log.info("Initialising schema version " + SCHEMA_VERSION);
901         PreparedStatement insert = null;
902         try {
903             insert = connection.prepareStatement(
904                 "insert into system_data values (?,?,?)");
905             insert.setInt(1, 1);
906             insert.setString(2, SCHEMA_VERSION);
907             insert.setDate(3, new Date(System.currentTimeMillis()));
908             insert.executeUpdate();
909             
910         } catch (SQLException exception) {
911             throw new PersistenceException(
912                 "Failed to initialise schema version", exception);
913         } finally{
914             SQLHelper.close(insert);
915         }
916     }
917 
918     /***
919      * Register an event to collect and remove processed messages with the
920      * {@link BasicEventManager}
921      */
922     private void registerEvent() {
923 //        try {
924             // disabled, as per bug 816895 - Exception in purgeMessages
925 //              BasicEventManager.instance().registerEventRelative(
926 //                  new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
927 //                  _gcInterval);
928 //          } catch (IllegalEventDefinedException exception) {
929 //              _log.error("registerEvent failed", exception);
930 //          }
931     }
932 
933     /***
934      * Creates a {@link DBConnectionManager} using its fully qualified class
935      * name
936      *
937      * @param className the fully qualified class name
938      * @throws PersistenceException if it cannot be created
939      */
940     private DBConnectionManager getConnectionManager(String className) 
941         throws PersistenceException {
942 
943         DBConnectionManager result = null;
944         Class clazz = null;
945         ClassLoader loader = Thread.currentThread().getContextClassLoader();
946         try {
947             if (loader != null) {
948                 clazz = loader.loadClass(className);
949             }
950         } catch (ClassNotFoundException ignore) {
951         }
952         try {
953             if (clazz == null) {
954                 clazz = Class.forName(className);
955             }
956         } catch (ClassNotFoundException exception) {
957             throw new PersistenceException(
958                 "Failed to locate connection manager implementation: "
959                 + className, exception);
960         }
961 
962         try {
963             result = (DBConnectionManager) clazz.newInstance();
964         } catch (Exception exception) {
965             throw new PersistenceException(
966                 "Failed to create connection manager", exception);
967         }
968 
969         return result;
970     }
971 
972 }