1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2005 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: RDBMSAdapter.java,v 1.6 2005/08/31 05:45:50 tanderson Exp $
44 */
45 package org.exolab.jms.persistence;
46
47 import java.sql.Connection;
48 import java.sql.Date;
49 import java.sql.PreparedStatement;
50 import java.sql.ResultSet;
51 import java.sql.SQLException;
52 import java.util.Enumeration;
53 import java.util.HashMap;
54 import java.util.Vector;
55
56 import EDU.oswego.cs.dl.util.concurrent.FIFOReadWriteLock;
57 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
58
59 import org.apache.commons.logging.Log;
60 import org.apache.commons.logging.LogFactory;
61
62 import org.exolab.jms.authentication.User;
63 import org.exolab.jms.client.JmsDestination;
64 import org.exolab.jms.client.JmsQueue;
65 import org.exolab.jms.client.JmsTopic;
66 import org.exolab.jms.config.DatabaseConfiguration;
67 import org.exolab.jms.config.RdbmsDatabaseConfiguration;
68 import org.exolab.jms.events.BasicEventManager;
69 import org.exolab.jms.events.EventHandler;
70 import org.exolab.jms.message.MessageImpl;
71 import org.exolab.jms.messagemgr.MessageHandle;
72
73
74 /***
75 * This adapter is a wrapper class around the persistency mechanism.
76 * It isolates the client from the working specifics of the database, by
 * providing a simple, straightforward interface. Future changes to
78 * the database will only require changes to the adapter.
79 *
80 * @author <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
81 * @version $Revision: 1.6 $ $Date: 2005/08/31 05:45:50 $
82 */
83
84 public class RDBMSAdapter
85 extends PersistenceAdapter
86 implements EventHandler {
87
/**
 * The seed generator. It is handed to the consumer and destination
 * managers when they are constructed.
 */
private final SeedGenerator _seeds;

/**
 * The destination manager.
 */
private final Destinations _destinations;

/**
 * The consumer manager.
 */
private final Consumers _consumers;

/**
 * The message manager.
 */
private final Messages _messages;

/**
 * The message handles manager.
 */
private final MessageHandles _handles;

/**
 * The user manager.
 */
private final Users _users;

/**
 * The schema version number. Note that this must be incremented whenever
 * the schema changes. The constructor compares it with the version
 * stored in the system_data table.
 */
public static final String SCHEMA_VERSION = "V0.7.6";

/**
 * The JDBC connection manager. Created and initialised by the constructor.
 */
private DBConnectionManager _connectionManager = null;

/**
 * Lock to help prevent deadlocks when administratively removing
 * destinations, while producers and consumers are actively sending
 * and receiving messages. It ensures that when a destination is in the
 * process of being removed, no other changes are occurring on the
 * messages and message_handles tables.
 * Destination removal takes the write lock; the other modifying
 * operations in this class take the read lock.
 */
private ReadWriteLock _destinationLock = new FIFOReadWriteLock();

/**
 * The logger.
 */
private static final Log _log = LogFactory.getLog(RDBMSAdapter.class);
142
143
144 /***
145 * Connects to the given db.
146 *
147 * @throws PersistenceException if a connection cannot be establised to the
148 * database
149 */
150 public RDBMSAdapter(DatabaseConfiguration dbConfig, String driver, String url,
151 String userName, String password)
152 throws PersistenceException {
153
154 RdbmsDatabaseConfiguration config =
155 dbConfig.getRdbmsDatabaseConfiguration();
156
157
158 _connectionManager = getConnectionManager(config.getClazz());
159 _connectionManager.setUser(userName);
160 _connectionManager.setPassword(password);
161 _connectionManager.setDriver(driver);
162 _connectionManager.setURL(url);
163 _connectionManager.setMaxActive(config.getMaxActive());
164 _connectionManager.setMaxIdle(config.getMaxIdle());
165 _connectionManager.setMinIdleTime(config.getMinIdleTime());
166 _connectionManager.setEvictionInterval(config.getEvictionInterval());
167 _connectionManager.setTestQuery(config.getTestQuery());
168 _connectionManager.setTestBeforeUse(config.getTestBeforeUse());
169
170
171 _connectionManager.init();
172
173 Connection connection = null;
174 try {
175
176
177 connection = getConnection();
178
179 String version = getSchemaVersion(connection);
180 if (version == null) {
181 initSchemaVersion(connection);
182 } else if (!version.equals(SCHEMA_VERSION)) {
183 throw new PersistenceException(
184 "Schema needs to be converted from version=" + version
185 + " to version=" + SCHEMA_VERSION
186 + "\nBack up your database, and run 'dbtool -migrate'"
187 + "to convert the schema");
188 }
189
190 _seeds = new SeedGenerator();
191 _consumers = new Consumers(_seeds, connection);
192 _destinations = new Destinations(_seeds, _consumers, connection);
193 _consumers.setDestinations(_destinations);
194 _messages = new Messages(_destinations);
195 _handles = new MessageHandles(_destinations, _consumers);
196 _users = new Users();
197 connection.commit();
198 } catch (PersistenceException exception) {
199 SQLHelper.rollback(connection);
200 throw exception;
201 } catch (Exception exception) {
202 throw new PersistenceException(
203 "Failed to initialise database adapter", exception);
204 } finally {
205 SQLHelper.close(connection);
206
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 }
230
231 /***
232 * Close the database.
233 */
234 public void close() {
235 _consumers.close();
236 _destinations.close();
237 }
238
239
240 public long getLastId(Connection connection)
241 throws PersistenceException {
242
243 long lastId = -1;
244 PreparedStatement query = null;
245 ResultSet result = null;
246 PreparedStatement insert = null;
247 try {
248 query = connection.prepareStatement(
249 "select maxid from message_id where id = 1");
250 result = query.executeQuery();
251
252 if (result.next()) {
253 lastId = result.getInt(1);
254 } else {
255
256 insert = connection.prepareStatement(
257 "insert into message_id values (?,?)");
258 insert.setInt(1, 1);
259 insert.setLong(2, 0);
260 insert.executeUpdate();
261 lastId = 0;
262 }
263 } catch (Exception exception) {
264 throw new PersistenceException("Failed to get last message id",
265 exception);
266 } finally {
267 SQLHelper.close(result);
268 SQLHelper.close(insert);
269 SQLHelper.close(query);
270 }
271
272 return lastId;
273 }
274
275
276 public void updateIds(Connection connection, long id)
277 throws PersistenceException {
278 PreparedStatement insert = null;
279 try {
280 insert = connection.prepareStatement(
281 "update message_id set maxId = ? where id = 1");
282
283 insert.setLong(1, id);
284 insert.executeUpdate();
285 } catch (Exception exception) {
286 throw new PersistenceException("Failed to update message id",
287 exception);
288 } finally {
289 SQLHelper.close(insert);
290 }
291 }
292
293
294 public void addMessage(Connection connection, MessageImpl message)
295 throws PersistenceException {
296
297 long start = 0;
298
299 if (_log.isDebugEnabled()) {
300 start = System.currentTimeMillis();
301 }
302
303 try {
304 _destinationLock.readLock().acquire();
305 _messages.add(connection, message);
306 } catch (InterruptedException exception) {
307 throw new PersistenceException("Failed to acquire lock",
308 exception);
309 } finally {
310 _destinationLock.readLock().release();
311
312 if (_log.isDebugEnabled()) {
313 _log.debug("addMessage," +
314 (System.currentTimeMillis() - start));
315 }
316 }
317 }
318
319
320 public void updateMessage(Connection connection, MessageImpl message)
321 throws PersistenceException {
322 long start = 0;
323 if (_log.isDebugEnabled()) {
324 start = System.currentTimeMillis();
325 }
326
327 try {
328 _destinationLock.readLock().acquire();
329 _messages.update(connection, message);
330 } catch (InterruptedException exception) {
331 throw new PersistenceException("Failed to acquire lock",
332 exception);
333 } finally {
334 _destinationLock.readLock().release();
335 if (_log.isDebugEnabled()) {
336 _log.debug("updateMessage," +
337 (System.currentTimeMillis() - start));
338 }
339 }
340 }
341
342
343 public Vector getUnprocessedMessages(Connection connection)
344 throws PersistenceException {
345 long start = 0;
346 if (_log.isDebugEnabled()) {
347 start = System.currentTimeMillis();
348 }
349
350 try {
351 return _messages.getUnprocessedMessages(connection);
352 } finally {
353 if (_log.isDebugEnabled()) {
354 _log.debug(
355 "getUnprocessedMessages,"
356 + (System.currentTimeMillis() - start));
357 }
358 }
359 }
360
361
362
363 public void removeMessage(Connection connection, String id)
364 throws PersistenceException {
365 long start = 0;
366 if (_log.isDebugEnabled()) {
367 start = System.currentTimeMillis();
368 }
369
370 try {
371 _destinationLock.readLock().acquire();
372 _messages.remove(connection, id);
373 } catch (InterruptedException exception) {
374 throw new PersistenceException("Failed to acquire lock",
375 exception);
376 } finally {
377 _destinationLock.readLock().release();
378 if (_log.isDebugEnabled()) {
379 _log.debug("removeMessage," +
380 (System.currentTimeMillis() - start));
381 }
382 }
383 }
384
385
386 public MessageImpl getMessage(Connection connection, String id)
387 throws PersistenceException {
388 long start = 0;
389 if (_log.isDebugEnabled()) {
390 start = System.currentTimeMillis();
391 }
392
393 try {
394 return _messages.get(connection, id);
395 } finally {
396 if (_log.isDebugEnabled()) {
397 _log.debug(
398 "getMessage," + (System.currentTimeMillis() - start));
399 }
400 }
401 }
402
403
404 public Vector getMessages(Connection connection, MessageHandle handle)
405 throws PersistenceException {
406 long start = 0;
407 if (_log.isDebugEnabled()) {
408 start = System.currentTimeMillis();
409 }
410
411 try {
412 return _messages.getMessages(connection,
413 handle.getDestination()
414 .getName(), handle.getPriority(),
415 handle.getAcceptedTime());
416 } finally {
417 if (_log.isDebugEnabled()) {
418 _log.debug(
419 "getMessages," + (System.currentTimeMillis() - start));
420 }
421 }
422 }
423
424
425 public void addMessageHandle(Connection connection, MessageHandle handle)
426 throws PersistenceException {
427 long start = 0;
428 if (_log.isDebugEnabled()) {
429 start = System.currentTimeMillis();
430 }
431
432 try {
433 _destinationLock.readLock().acquire();
434 _handles.addMessageHandle(connection, handle);
435 } catch (InterruptedException exception) {
436 throw new PersistenceException("Failed to acquire lock",
437 exception);
438 } finally {
439 _destinationLock.readLock().release();
440 if (_log.isDebugEnabled()) {
441 _log.debug(
442 "addMessageHandle,"
443 + (System.currentTimeMillis() - start));
444 }
445 }
446 }
447
448
449 public void updateMessageHandle(Connection connection,
450 MessageHandle handle)
451 throws PersistenceException {
452 long start = 0;
453 if (_log.isDebugEnabled()) {
454 start = System.currentTimeMillis();
455 }
456
457 try {
458 _destinationLock.readLock().acquire();
459 _handles.updateMessageHandle(connection, handle);
460 } catch (InterruptedException exception) {
461 throw new PersistenceException("Failed to acquire lock",
462 exception);
463 } finally {
464 _destinationLock.readLock().release();
465 if (_log.isDebugEnabled()) {
466 _log.debug(
467 "updateMessageHandle,"
468 + (System.currentTimeMillis() - start));
469 }
470 }
471 }
472
473
474 public void removeMessageHandle(Connection connection,
475 MessageHandle handle)
476 throws PersistenceException {
477 long start = 0;
478 if (_log.isDebugEnabled()) {
479 start = System.currentTimeMillis();
480 }
481
482 try {
483 _destinationLock.readLock().acquire();
484 _handles.removeMessageHandle(connection, handle);
485 } catch (InterruptedException exception) {
486 throw new PersistenceException("Failed to acquire lock",
487 exception);
488 } finally {
489 _destinationLock.readLock().release();
490 if (_log.isDebugEnabled()) {
491 _log.debug(
492 "removeMessageHandle,"
493 + (System.currentTimeMillis() - start));
494 }
495 }
496 }
497
498
499 public Vector getMessageHandles(Connection connection,
500 JmsDestination destination, String name)
501 throws PersistenceException {
502 long start = 0;
503 if (_log.isDebugEnabled()) {
504 start = System.currentTimeMillis();
505 }
506
507 try {
508 return _handles.getMessageHandles(connection,
509 destination.getName(),
510 name);
511 } finally {
512 if (_log.isDebugEnabled()) {
513 _log.debug("getMessageHandles,"
514 + (System.currentTimeMillis() - start));
515 }
516 }
517 }
518
519
520 public void addDurableConsumer(Connection connection, String topic,
521 String consumer)
522 throws PersistenceException {
523
524 try {
525 _destinationLock.readLock().acquire();
526 _consumers.add(connection, topic, consumer);
527 } catch (InterruptedException exception) {
528 throw new PersistenceException("Failed to acquire lock",
529 exception);
530 } finally {
531 _destinationLock.readLock().release();
532 }
533 }
534
535
536 public void removeDurableConsumer(Connection connection, String consumer)
537 throws PersistenceException {
538
539 try {
540 _destinationLock.readLock().acquire();
541 _consumers.remove(connection, consumer);
542 } catch (InterruptedException exception) {
543 throw new PersistenceException("Failed to acquire lock",
544 exception);
545 } finally {
546 _destinationLock.readLock().release();
547 }
548 }
549
550
551 public Enumeration getDurableConsumers(Connection connection, String topic)
552 throws PersistenceException {
553 return _consumers.getDurableConsumers(topic).elements();
554 }
555
556
557 public HashMap getAllDurableConsumers(Connection connection)
558 throws PersistenceException {
559
560 return _consumers.getAllDurableConsumers();
561 }
562
563
564 public boolean durableConsumerExists(Connection connection, String name)
565 throws PersistenceException {
566
567 return _consumers.exists(name);
568 }
569
570
571 public void addDestination(Connection connection, String name,
572 boolean queue)
573 throws PersistenceException {
574
575 JmsDestination destination = (queue)
576 ? (JmsDestination) new JmsQueue(name)
577 : (JmsDestination) new JmsTopic(name);
578
579
580
581 try {
582 _destinationLock.readLock().acquire();
583 _destinations.add(connection, destination);
584 if (queue) {
585 _consumers.add(connection, name, name);
586 }
587 } catch (InterruptedException exception) {
588 throw new PersistenceException("Failed to acquire lock",
589 exception);
590 } finally {
591 _destinationLock.readLock().release();
592 }
593 }
594
595
596 public void removeDestination(Connection connection, String name)
597 throws PersistenceException {
598
599 JmsDestination destination = _destinations.get(name);
600 if (destination != null) {
601 try {
602 _destinationLock.writeLock().acquire();
603 _destinations.remove(connection, destination);
604 } catch (InterruptedException exception) {
605 throw new PersistenceException("Failed to acquire lock",
606 exception);
607 } finally {
608 _destinationLock.writeLock().release();
609 }
610 }
611 }
612
613
614 public Enumeration getAllDestinations(Connection connection)
615 throws PersistenceException {
616
617 return _destinations.getDestinations().elements();
618 }
619
620
621 public boolean checkDestination(Connection connection, String name)
622 throws PersistenceException {
623
624 return (_destinations.get(name) != null);
625 }
626
627
628 public int getQueueMessageCount(Connection connection, String name)
629 throws PersistenceException {
630
631 return _handles.getMessageCount(connection, name,
632 name);
633 }
634
635
636 public int getDurableConsumerMessageCount(Connection connection,
637 String destination, String name)
638 throws PersistenceException {
639
640 return _handles.getMessageCount(connection,
641 destination, name);
642 }
643
644
645 public void removeExpiredMessages(Connection connection)
646 throws PersistenceException {
647
648 _messages.removeExpiredMessages(connection);
649 }
650
651
652 public void removeExpiredMessageHandles(Connection connection,
653 String consumer)
654 throws PersistenceException {
655
656 _handles.removeExpiredMessageHandles(connection,
657 consumer);
658 }
659
660
661 public Vector getNonExpiredMessages(Connection connection,
662 JmsDestination destination)
663 throws PersistenceException {
664
665 return _messages.getNonExpiredMessages(connection,
666 destination);
667 }
668
669
670 public void handleEvent(int event, Object callback, long time) {
671
672
673
674
675
676
677
678
679
680
681
682
683 }
684
685 /***
686 * Return a connection to the database from the pool of connections. It will
687 * throw an PersistenceException if it cannot retrieve a connection. The
688 * client should close the connection normally, since the pool is a
689 * connection event listener.
690 *
691 * @return Connection - a pooled connection or null
692 * @throws PersistenceException - if it cannot retrieve a connection
693 */
694 public Connection getConnection()
695 throws PersistenceException {
696 return _connectionManager.getConnection();
697 }
698
699 /***
700 * Return a reference to the DBConnectionManager
701 *
702 * @return DBConnectionManager
703 */
704 public DBConnectionManager getDBConnectionManager() {
705 return _connectionManager;
706 }
707
708 public void addUser(Connection connection, User user)
709 throws PersistenceException {
710 _users.add(connection, user);
711 }
712
713 public Enumeration getAllUsers(Connection connection)
714 throws PersistenceException {
715 return _users.getAllUsers(connection).elements();
716 }
717
718 public User getUser(Connection connection, User user)
719 throws PersistenceException {
720 return _users.get(connection, user);
721 }
722
723 public void removeUser(Connection connection, User user)
724 throws PersistenceException {
725 _users.remove(connection, user);
726 }
727
728 public void updateUser(Connection connection, User user)
729 throws PersistenceException {
730 _users.update(connection, user);
731 }
732
733 /***
734 * Incrementally purge all processed messages from the database.
735 * @todo this needs to be revisited. See bug 816895
736 * - existing expired messages are purged at startup
737 * - messages received that subsequently expire while the server is
738 * running are removed individually.
739 * - not clear how the previous implementation ever worked.
740 * The Messages.getMessageIds() method returns all messages, not
741 * just those processed, nor is it clear that the processed flag
742 * is ever non-zero.
743 * The current implementation (as a fix for bug 816895 - Exception in
744 * purgeMessages) simply delegates to removeExpiredMessages()
745 *
746 * @return the number of messages deleted
747 */
748 public synchronized int purgeMessages() {
749
750
751 Connection connection = null;
752 try {
753 connection = getConnection();
754 removeExpiredMessages(connection);
755 connection.commit();
756 } catch (Exception exception) {
757 _log.error("Exception in purgeMessages", exception);
758 } finally {
759 SQLHelper.close(connection);
760 }
761 return 0;
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846 }
847
848 /***
849 * Get the schema version
850 *
851 * @param connection the connection to use
852 * @return the schema version, or null, if no version has been initialised
853 * @throws PersistenceException for any related persistence exception
854 */
855 private String getSchemaVersion(Connection connection)
856 throws PersistenceException {
857
858 String version = null;
859 PreparedStatement query = null;
860 ResultSet result = null;
861 try {
862 query = connection.prepareStatement(
863 "select version from system_data where id = 1");
864 result = query.executeQuery();
865 if (result.next()) {
866 version = result.getString(1);
867 }
868 } catch (SQLException exception) {
869 throw new PersistenceException("Failed to get the schema version",
870 exception);
871 } finally {
872 SQLHelper.close(result);
873 SQLHelper.close(query);
874
875 }
876 return version;
877 }
878
879 /***
880 * Initialise the schema version
881 *
882 * @param connection the connection to use
883 */
884 private void initSchemaVersion(Connection connection)
885 throws PersistenceException {
886
887 _log.info("Initialising schema version " + SCHEMA_VERSION);
888 PreparedStatement insert = null;
889 try {
890 insert = connection.prepareStatement("insert into system_data (id, version, creationdate) "
891 + "values (?,?,?)");
892 insert.setInt(1, 1);
893 insert.setString(2, SCHEMA_VERSION);
894 insert.setDate(3, new Date(System.currentTimeMillis()));
895 insert.executeUpdate();
896
897 } catch (SQLException exception) {
898 throw new PersistenceException(
899 "Failed to initialise schema version", exception);
900 } finally {
901 SQLHelper.close(insert);
902 }
903 }
904
905 /***
906 * Register an event to collect and remove processed messages with the
907 * {@link BasicEventManager}
908 */
909
910
911
912
913
914
915
916
917
918
919
920 /***
921 * Creates a {@link DBConnectionManager} using its fully qualified class
922 * name
923 *
924 * @param className the fully qualified class name
925 * @throws PersistenceException if it cannot be created
926 */
927 private DBConnectionManager getConnectionManager(String className)
928 throws PersistenceException {
929
930 DBConnectionManager result = null;
931 Class clazz = null;
932 ClassLoader loader = Thread.currentThread().getContextClassLoader();
933 try {
934 if (loader != null) {
935 clazz = loader.loadClass(className);
936 }
937 } catch (ClassNotFoundException ignore) {
938 }
939 try {
940 if (clazz == null) {
941 clazz = Class.forName(className);
942 }
943 } catch (ClassNotFoundException exception) {
944 throw new PersistenceException("Failed to locate connection manager implementation: "
945 + className, exception);
946 }
947
948 try {
949 result = (DBConnectionManager) clazz.newInstance();
950 } catch (Exception exception) {
951 throw new PersistenceException(
952 "Failed to create connection manager", exception);
953 }
954
955 return result;
956 }
957
958 }