1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: RDBMSAdapter.java,v 1.54 2004/01/08 05:55:07 tanderson Exp $
44 *
45 * Date Author Changes
46 * $Date jimm Created
47 */
48 package org.exolab.jms.persistence;
49
50 import java.sql.Connection;
51 import java.sql.Date;
52 import java.sql.PreparedStatement;
53 import java.sql.ResultSet;
54 import java.sql.SQLException;
55 import java.util.Enumeration;
56 import java.util.HashMap;
57 import java.util.Vector;
58
59 import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
60 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
61
62 import org.apache.commons.logging.Log;
63 import org.apache.commons.logging.LogFactory;
64
65 import org.exolab.core.foundation.HandleIfc;
66 import org.exolab.jms.authentication.User;
67 import org.exolab.jms.client.JmsDestination;
68 import org.exolab.jms.client.JmsQueue;
69 import org.exolab.jms.client.JmsTopic;
70 import org.exolab.jms.config.ConfigurationManager;
71 import org.exolab.jms.config.DatabaseConfiguration;
72 import org.exolab.jms.config.RdbmsDatabaseConfiguration;
73 import org.exolab.jms.events.EventHandler;
74 import org.exolab.jms.message.MessageImpl;
75 import org.exolab.jms.messagemgr.PersistentMessageHandle;
76
77
78 /***
79 * This adapter is a wrapper class around the persistency mechanism.
80 * It isolates the client from the working specifics of the database, by
81 * providing a simple straight forward interface. Furure changes to
82 * the database will only require changes to the adapter.
83 *
84 * @version $Revision: 1.54 $ $Date: 2004/01/08 05:55:07 $
85 * @author <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
86 */
87
88 public class RDBMSAdapter
89 extends PersistenceAdapter
90 implements EventHandler {
91
92 /***
93 * The schema version number. Note this must be incremented whenever
94 * The schema changes.
95 */
96 public static final String SCHEMA_VERSION = "V0.7.6";
97
98 /***
99 * The JDBC ConnectionManager
100 */
101 private static DBConnectionManager _connectionManager = null;
102
103 /***
104 * This is the interval that the automatic garbage collector will
105 * execute, if specified. It is specified in seconds.
106 */
107 private int _gcInterval = 600;
108
109 /***
110 * This is the block size that is used during purging.
111 */
112 private int _gcBlockSize = 500;
113
114 /***
115 * This is used to track the incremental garbage collection. It
116 * tracks the last blocks examined.
117 */
118 private long _lastTime = 0;
119
120 /***
121 * This is the thread priority for the GC Thread
122 */
123 private int _gcThreadPriority = Thread.NORM_PRIORITY;
124
125 /***
126 * Lock to help prevent deadlocks when administratively removing
127 * destinations, while producers and consumers are actively sending
128 * and receiving messages. It ensures that when a destination is in the
129 * process of being removed, no other changes are occuring on the
130 * messages and message_handles tables.
131 */
132 private ReadWriteLock _destinationLock = new FIFOReadWriteLock();
133
134 /***
135 * This is the event that is fired to initiate garbage collection
136 * in the database
137 */
138 private static final int COLLECT_DATABASE_GARBAGE_EVENT = 1;
139
140 /***
141 * The logger
142 */
143 private static final Log _log = LogFactory.getLog(RDBMSAdapter.class);
144
145
146 /***
147 * Connects to the given db.
148 *
149 * @throws PersistenceException if a connection cannot be establised to
150 * the database
151 */
152 RDBMSAdapter(String driver, String url, String userName, String password)
153 throws PersistenceException {
154
155 DatabaseConfiguration dbConfig =
156 ConfigurationManager.getConfig().getDatabaseConfiguration();
157 RdbmsDatabaseConfiguration config =
158 dbConfig.getRdbmsDatabaseConfiguration();
159
160
161 _connectionManager = getConnectionManager(config.getClazz());
162 _connectionManager.setUser(userName);
163 _connectionManager.setPassword(password);
164 _connectionManager.setDriver(driver);
165 _connectionManager.setURL(url);
166 _connectionManager.setMaxActive(config.getMaxActive());
167 _connectionManager.setMaxIdle(config.getMaxIdle());
168 _connectionManager.setMinIdleTime(config.getMinIdleTime());
169 _connectionManager.setEvictionInterval(config.getEvictionInterval());
170 _connectionManager.setTestQuery(config.getTestQuery());
171 _connectionManager.setTestBeforeUse(config.getTestBeforeUse());
172
173
174 _connectionManager.init();
175
176 Connection connection = null;
177 try {
178
179
180 connection = getConnection();
181
182 String version = getSchemaVersion(connection);
183 if (version == null) {
184 initSchemaVersion(connection);
185 } else if (!version.equals(SCHEMA_VERSION)) {
186 throw new PersistenceException(
187 "Schema needs to be converted from version=" + version
188 + " to version=" + SCHEMA_VERSION
189 + "\nBack up your database, and run 'dbtool -migrate'"
190 + "to convert the schema");
191 }
192
193 SeedGenerator.initialise();
194 Destinations.initialise(connection);
195 Consumers.initialise(connection);
196 Messages.initialise();
197 MessageHandles.initialise();
198 connection.commit();
199 Users.initialise(connection);
200 } catch (PersistenceException exception) {
201 SQLHelper.rollback(connection);
202 throw exception;
203 } catch (Exception exception) {
204 throw new PersistenceException(
205 "Failed to initialise database adapter", exception);
206 } finally {
207 SQLHelper.close(connection);
208
209 }
210
211
212 if (dbConfig.hasGarbageCollectionInterval()) {
213 _gcInterval = dbConfig.getGarbageCollectionInterval() * 1000;
214 registerEvent();
215 }
216
217 if (dbConfig.hasGarbageCollectionBlockSize()) {
218 _gcBlockSize = dbConfig.getGarbageCollectionBlockSize();
219 }
220
221 if (dbConfig.hasGarbageCollectionThreadPriority()) {
222 _gcThreadPriority = dbConfig.getGarbageCollectionBlockSize();
223 if (_gcThreadPriority < Thread.MIN_PRIORITY) {
224 _gcThreadPriority = Thread.MIN_PRIORITY;
225 } else if (_gcThreadPriority > Thread.MAX_PRIORITY) {
226 _gcThreadPriority = Thread.MAX_PRIORITY;
227 }
228 }
229 }
230
231 /***
232 * Close the database if open.
233 */
234 public void close() {
235 if (SeedGenerator.instance() != null) {
236 SeedGenerator.instance().close();
237 }
238
239 if (Destinations.instance() != null) {
240 Destinations.instance().close();
241 }
242
243 if (Consumers.instance() != null) {
244 Consumers.instance().close();
245 }
246
247 if (Messages.instance() != null) {
248 Messages.instance().close();
249 }
250
251 if (MessageHandles.instance() != null) {
252 MessageHandles.instance().close();
253 }
254
255 if (Users.instance() != null) {
256 Users.instance().close();
257 }
258 }
259
260
261 public long getLastId(Connection connection)
262 throws PersistenceException {
263
264 long lastId = -1;
265 boolean successful = false;
266 PreparedStatement query = null;
267 ResultSet result = null;
268 PreparedStatement insert = null;
269 try {
270 query = connection.prepareStatement(
271 "select * from message_id where id = 1");
272 result = query.executeQuery();
273
274 if (result.next()) {
275 lastId = result.getInt("maxId");
276 } else {
277
278 insert = connection.prepareStatement(
279 "insert into message_id values (?,?)");
280 insert.setInt(1, 1);
281 insert.setLong(2, 0);
282 insert.executeUpdate();
283 lastId = 0;
284 }
285 } catch (Exception exception) {
286 throw new PersistenceException("Failed to get last message id",
287 exception);
288 } finally {
289 SQLHelper.close(result);
290 SQLHelper.close(insert);
291 SQLHelper.close(query);
292 }
293
294 return lastId;
295 }
296
297
298 public void updateIds(Connection connection, long id)
299 throws PersistenceException {
300 PreparedStatement insert = null;
301 try {
302 insert = connection.prepareStatement(
303 "update message_id set maxId = ? where id = 1");
304
305 insert.setLong(1, id);
306 insert.executeUpdate();
307 } catch (Exception exception) {
308 throw new PersistenceException("Failed to update message id",
309 exception);
310 } finally {
311 SQLHelper.close(insert);
312 }
313 }
314
315
316 public void addMessage(Connection connection, MessageImpl message)
317 throws PersistenceException {
318
319 long start = 0;
320
321 if (_log.isDebugEnabled()) {
322 start = System.currentTimeMillis();
323 }
324
325 try {
326 _destinationLock.readLock().acquire();
327 Messages.instance().add(connection, message);
328 } catch (InterruptedException exception) {
329 throw new PersistenceException("Failed to acquire lock",
330 exception);
331 } finally {
332 _destinationLock.readLock().release();
333
334 if (_log.isDebugEnabled()) {
335 _log.debug("addMessage," +
336 (System.currentTimeMillis() - start));
337 }
338 }
339 }
340
341
342 public void updateMessage(Connection connection, MessageImpl message)
343 throws PersistenceException {
344 long start = 0;
345 if (_log.isDebugEnabled()) {
346 start = System.currentTimeMillis();
347 }
348
349 try {
350 _destinationLock.readLock().acquire();
351 Messages.instance().update(connection, message);
352 } catch (InterruptedException exception) {
353 throw new PersistenceException("Failed to acquire lock",
354 exception);
355 } finally {
356 _destinationLock.readLock().release();
357 if (_log.isDebugEnabled()) {
358 _log.debug("updateMessage," +
359 (System.currentTimeMillis() - start));
360 }
361 }
362 }
363
364
365 public Vector getUnprocessedMessages(Connection connection)
366 throws PersistenceException {
367 long start = 0;
368 if (_log.isDebugEnabled()) {
369 start = System.currentTimeMillis();
370 }
371
372 try {
373 return Messages.instance().getUnprocessedMessages(connection);
374 } finally {
375 if (_log.isDebugEnabled()) {
376 _log.debug("getUnprocessedMessages," + (System.currentTimeMillis() - start));
377 }
378 }
379 }
380
381
382
383 public void removeMessage(Connection connection, String id)
384 throws PersistenceException {
385 long start = 0;
386 if (_log.isDebugEnabled()) {
387 start = System.currentTimeMillis();
388 }
389
390 try {
391 _destinationLock.readLock().acquire();
392 Messages.instance().remove(connection, id);
393 } catch (InterruptedException exception) {
394 throw new PersistenceException("Failed to acquire lock",
395 exception);
396 } finally {
397 _destinationLock.readLock().release();
398 if (_log.isDebugEnabled()) {
399 _log.debug("removeMessage," +
400 (System.currentTimeMillis() - start));
401 }
402 }
403 }
404
405
406 public MessageImpl getMessage(Connection connection, String id)
407 throws PersistenceException {
408 long start = 0;
409 if (_log.isDebugEnabled()) {
410 start = System.currentTimeMillis();
411 }
412
413 try {
414 return Messages.instance().get(connection, id);
415 } finally {
416 if (_log.isDebugEnabled()) {
417 _log.debug("getMessage," + (System.currentTimeMillis() - start));
418 }
419 }
420 }
421
422
423 public Vector getMessages(Connection connection, PersistentMessageHandle handle)
424 throws PersistenceException {
425 long start = 0;
426 if (_log.isDebugEnabled()) {
427 start = System.currentTimeMillis();
428 }
429
430 try {
431 return Messages.instance().getMessages(connection,
432 handle.getDestination().getName(), handle.getPriority(),
433 handle.getAcceptedTime());
434 } finally {
435 if (_log.isDebugEnabled()) {
436 _log.debug("getMessages," + (System.currentTimeMillis() - start));
437 }
438 }
439 }
440
441
442 public void addMessageHandle(Connection connection, PersistentMessageHandle handle)
443 throws PersistenceException {
444 long start = 0;
445 if (_log.isDebugEnabled()) {
446 start = System.currentTimeMillis();
447 }
448
449 try {
450 _destinationLock.readLock().acquire();
451 MessageHandles.instance().addMessageHandle(connection, handle);
452 } catch (InterruptedException exception) {
453 throw new PersistenceException("Failed to acquire lock",
454 exception);
455 } finally {
456 _destinationLock.readLock().release();
457 if (_log.isDebugEnabled()) {
458 _log.debug("addMessageHandle," + (System.currentTimeMillis() - start));
459 }
460 }
461 }
462
463
464 public void updateMessageHandle(Connection connection, PersistentMessageHandle handle)
465 throws PersistenceException {
466 long start = 0;
467 if (_log.isDebugEnabled()) {
468 start = System.currentTimeMillis();
469 }
470
471 try {
472 _destinationLock.readLock().acquire();
473 MessageHandles.instance().updateMessageHandle(connection, handle);
474 } catch (InterruptedException exception) {
475 throw new PersistenceException("Failed to acquire lock",
476 exception);
477 } finally {
478 _destinationLock.readLock().release();
479 if (_log.isDebugEnabled()) {
480 _log.debug("updateMessageHandle," + (System.currentTimeMillis() - start));
481 }
482 }
483 }
484
485
486 public void removeMessageHandle(Connection connection, PersistentMessageHandle handle)
487 throws PersistenceException {
488 long start = 0;
489 if (_log.isDebugEnabled()) {
490 start = System.currentTimeMillis();
491 }
492
493 try {
494 _destinationLock.readLock().acquire();
495 MessageHandles.instance().removeMessageHandle(connection, handle);
496 } catch (InterruptedException exception) {
497 throw new PersistenceException("Failed to acquire lock",
498 exception);
499 } finally {
500 _destinationLock.readLock().release();
501 if (_log.isDebugEnabled()) {
502 _log.debug("removeMessageHandle," + (System.currentTimeMillis() - start));
503 }
504 }
505 }
506
507
508 public Vector getMessageHandles(Connection connection,
509 JmsDestination destination, String name)
510 throws PersistenceException {
511 long start = 0;
512 if (_log.isDebugEnabled()) {
513 start = System.currentTimeMillis();
514 }
515
516 try {
517 return MessageHandles.instance().getMessageHandles(connection,
518 destination.getName(), name);
519 } finally {
520 if (_log.isDebugEnabled()) {
521 _log.debug("getMessageHandles,"
522 + (System.currentTimeMillis() - start));
523 }
524 }
525 }
526
527
528 public void addDurableConsumer(Connection connection, String topic,
529 String consumer)
530 throws PersistenceException {
531
532 try {
533 _destinationLock.readLock().acquire();
534 Consumers.instance().add(connection, topic, consumer);
535 } catch (InterruptedException exception) {
536 throw new PersistenceException("Failed to acquire lock",
537 exception);
538 } finally {
539 _destinationLock.readLock().release();
540 }
541 }
542
543
544 public void removeDurableConsumer(Connection connection, String consumer)
545 throws PersistenceException {
546
547 try {
548 _destinationLock.readLock().acquire();
549 Consumers.instance().remove(connection, consumer);
550 } catch (InterruptedException exception) {
551 throw new PersistenceException("Failed to acquire lock",
552 exception);
553 } finally {
554 _destinationLock.readLock().release();
555 }
556 }
557
558
559 public Enumeration getDurableConsumers(Connection connection, String topic)
560 throws PersistenceException {
561 return Consumers.instance().getDurableConsumers(topic).elements();
562 }
563
564
565 public HashMap getAllDurableConsumers(Connection connection)
566 throws PersistenceException {
567
568 return Consumers.instance().getAllDurableConsumers();
569 }
570
571
572 public boolean durableConsumerExists(Connection connection, String name)
573 throws PersistenceException {
574
575 return Consumers.instance().exists(name);
576 }
577
578
579 public void addDestination(Connection connection, String name,
580 boolean queue)
581 throws PersistenceException {
582
583 JmsDestination destination = (queue)
584 ? (JmsDestination) new JmsQueue(name)
585 : (JmsDestination) new JmsTopic(name);
586
587
588
589 try {
590 _destinationLock.readLock().acquire();
591 Destinations.instance().add(connection, destination);
592 if (queue) {
593 Consumers.instance().add(connection, name, name);
594 }
595 } catch (InterruptedException exception) {
596 throw new PersistenceException("Failed to acquire lock",
597 exception);
598 } finally {
599 _destinationLock.readLock().release();
600 }
601 }
602
603
604 public void removeDestination(Connection connection, String name)
605 throws PersistenceException {
606
607 JmsDestination destination = Destinations.instance().get(name);
608 if (destination != null) {
609 try {
610 _destinationLock.writeLock().acquire();
611 Destinations.instance().remove(connection, destination);
612 } catch (InterruptedException exception) {
613 throw new PersistenceException("Failed to acquire lock",
614 exception);
615 } finally {
616 _destinationLock.writeLock().release();
617 }
618 }
619 }
620
621
622 public Enumeration getAllDestinations(Connection connection)
623 throws PersistenceException {
624
625 return Destinations.instance().getDestinations().elements();
626 }
627
628
629 public boolean checkDestination(Connection connection, String name)
630 throws PersistenceException {
631
632 return (Destinations.instance().get(name) != null);
633 }
634
635
636 public int getQueueMessageCount(Connection connection, String name)
637 throws PersistenceException {
638
639 return MessageHandles.instance().getMessageCount(
640 connection, name, name);
641 }
642
643
644 public int getDurableConsumerMessageCount(Connection connection,
645 String destination, String name)
646 throws PersistenceException {
647
648 return MessageHandles.instance().getMessageCount(connection,
649 destination, name);
650 }
651
652
653 public void removeExpiredMessages(Connection connection)
654 throws PersistenceException {
655
656 Messages.instance().removeExpiredMessages(connection);
657 }
658
659
660 public void removeExpiredMessageHandles(Connection connection,
661 String consumer)
662 throws PersistenceException {
663
664 MessageHandles.instance().removeExpiredMessageHandles(connection,
665 consumer);
666 }
667
668
669 public Vector getNonExpiredMessages(Connection connection,
670 JmsDestination destination)
671 throws PersistenceException {
672
673 return Messages.instance().getNonExpiredMessages(
674 connection, destination);
675 }
676
677
678 public HandleIfc getHandle() {
679 return null;
680 }
681
682
683 public void handleEvent(int event, Object callback, long time) {
684
685
686
687
688
689
690
691
692
693
694
695
696 }
697
698 /***
699 * Return a connection to the database from the pool of connections. It
700 * will throw an PersistenceException if it cannot retrieve a connection.
701 * The client should close the connection normally, since the pool is a
702 * connection event listener.
703 *
704 * @return Connection - a pooled connection or null
705 * @exception PersistenceException - if it cannot retrieve a connection
706 */
707 public Connection getConnection()
708 throws PersistenceException {
709 return _connectionManager.getConnection();
710 }
711
712 /***
713 * Return a reference to the DBConnectionManager
714 *
715 * @return DBConnectionManager
716 */
717 public DBConnectionManager getDBConnectionManager() {
718 return _connectionManager;
719 }
720
721 public void addUser(Connection connection, User user)
722 throws PersistenceException {
723 Users.instance().add(connection, user);
724 }
725
726 public Enumeration getAllUsers(Connection connection)
727 throws PersistenceException {
728 return Users.instance().getAllUsers(connection).elements();
729 }
730
731 public User getUser(Connection connection, User user)
732 throws PersistenceException {
733 return Users.instance().get(connection, user);
734 }
735
736 public void removeUser(Connection connection, User user)
737 throws PersistenceException {
738 Users.instance().remove(connection, user);
739 }
740
741 public void updateUser(Connection connection, User user)
742 throws PersistenceException {
743 Users.instance().update(connection, user);
744 }
745
746 /***
747 * Incrementally purge all processed messages from the database.
748 * @TODO this needs to be revisited. See bug 816895
749 * - existing expired messages are purged at startup
750 * - messages received that subsequently expire while the server is
751 * running are removed individually.
752 * - not clear how the previous implementation ever worked.
753 * The Messages.getMessageIds() method returns all messages, not
754 * just those processed, nor is it clear that the processed flag
755 * is ever non-zero.
756 * The current implementation (as a fix for bug 816895 - Exception in
757 * purgeMessages) simply delegates to removeExpiredMessages()
758 *
759 * @return the number of messages deleted
760 */
761 public synchronized int purgeMessages() {
762 int deleted = 0;
763
764 Connection connection = null;
765 try {
766 connection = getConnection();
767 removeExpiredMessages(connection);
768 connection.commit();
769 } catch (Exception exception) {
770 _log.error("Exception in purgeMessages", exception);
771 } finally {
772 SQLHelper.close(connection);
773 }
774 return 0;
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859 }
860
861 /***
862 * Get the schema version
863 *
864 * @param connection the connection to use
865 * @return the schema version, or null, if no version has been initialised
866 * @throws PersistenceException for any related persistence exception
867 */
868 private String getSchemaVersion(Connection connection)
869 throws PersistenceException {
870
871 String version = null;
872 PreparedStatement query = null;
873 ResultSet result = null;
874 try {
875 query = connection.prepareStatement(
876 "select * from system_data where id = 1");
877 result = query.executeQuery();
878 if (result.next()) {
879 version = result.getString("version");
880 }
881 } catch (SQLException exception) {
882 throw new PersistenceException(
883 "Failed to get the schema version", exception);
884 } finally {
885 SQLHelper.close(result);
886 SQLHelper.close(query);
887
888 }
889 return version;
890 }
891
892 /***
893 * Initialise the schema version
894 *
895 * @param connection the connection to use
896 */
897 private void initSchemaVersion(Connection connection)
898 throws PersistenceException {
899
900 _log.info("Initialising schema version " + SCHEMA_VERSION);
901 PreparedStatement insert = null;
902 try {
903 insert = connection.prepareStatement(
904 "insert into system_data values (?,?,?)");
905 insert.setInt(1, 1);
906 insert.setString(2, SCHEMA_VERSION);
907 insert.setDate(3, new Date(System.currentTimeMillis()));
908 insert.executeUpdate();
909
910 } catch (SQLException exception) {
911 throw new PersistenceException(
912 "Failed to initialise schema version", exception);
913 } finally{
914 SQLHelper.close(insert);
915 }
916 }
917
918 /***
919 * Register an event to collect and remove processed messages with the
920 * {@link BasicEventManager}
921 */
922 private void registerEvent() {
923
924
925
926
927
928
929
930
931 }
932
933 /***
934 * Creates a {@link DBConnectionManager} using its fully qualified class
935 * name
936 *
937 * @param className the fully qualified class name
938 * @throws PersistenceException if it cannot be created
939 */
940 private DBConnectionManager getConnectionManager(String className)
941 throws PersistenceException {
942
943 DBConnectionManager result = null;
944 Class clazz = null;
945 ClassLoader loader = Thread.currentThread().getContextClassLoader();
946 try {
947 if (loader != null) {
948 clazz = loader.loadClass(className);
949 }
950 } catch (ClassNotFoundException ignore) {
951 }
952 try {
953 if (clazz == null) {
954 clazz = Class.forName(className);
955 }
956 } catch (ClassNotFoundException exception) {
957 throw new PersistenceException(
958 "Failed to locate connection manager implementation: "
959 + className, exception);
960 }
961
962 try {
963 result = (DBConnectionManager) clazz.newInstance();
964 } catch (Exception exception) {
965 throw new PersistenceException(
966 "Failed to create connection manager", exception);
967 }
968
969 return result;
970 }
971
972 }