View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: ObjectAdapter.java,v 1.30 2003/08/17 01:32:25 tanderson Exp $
44   *
45   * Date         Author  Changes
46   * $Date	    jimm    Created
47   */
48  
49  
50  package org.exolab.jms.persistence;
51  
52  import java.io.Externalizable;
53  import java.io.IOException;
54  import java.io.ObjectInput;
55  import java.io.ObjectOutput;
56  import java.sql.Connection;
57  import java.util.Enumeration;
58  import java.util.HashMap;
59  import java.util.Vector;
60  
61  import javax.jms.JMSException;
62  
63  import org.apache.commons.logging.Log;
64  import org.apache.commons.logging.LogFactory;
65  
66  import org.exolab.core.database.recman.PMDHandle;
67  import org.exolab.core.database.recman.PMDHashMap;
68  import org.exolab.core.database.recman.PMDSessionManager;
69  import org.exolab.core.database.recman.PMDVector;
70  import org.exolab.core.database.recman.PageManagedDatabase;
71  import org.exolab.core.foundation.DatabaseIOException;
72  import org.exolab.core.foundation.FailedToAcquireLockException;
73  import org.exolab.core.foundation.FailedToCreateCollectionException;
74  import org.exolab.core.foundation.FailedToCreateDatabaseException;
75  import org.exolab.core.foundation.FailedToCreateLockException;
76  import org.exolab.core.foundation.FailedToCreateSessionException;
77  import org.exolab.core.foundation.FailedToDestroySessionException;
78  import org.exolab.core.foundation.HandleIfc;
79  import org.exolab.core.foundation.ObjectNameExistsException;
80  import org.exolab.core.foundation.PersistentObject;
81  import org.exolab.core.foundation.SessionIfc;
82  import org.exolab.core.foundation.TransactionException;
83  import org.exolab.core.foundation.TransactionInProgressException;
84  import org.exolab.core.foundation.TransactionNotInProgressException;
85  import org.exolab.jms.authentication.User;
86  import org.exolab.jms.client.JmsDestination;
87  import org.exolab.jms.client.JmsQueue;
88  import org.exolab.jms.client.JmsTopic;
89  import org.exolab.jms.config.Configuration;
90  import org.exolab.jms.config.ConfigurationManager;
91  import org.exolab.jms.config.DatabaseConfiguration;
92  import org.exolab.jms.events.BasicEventManager;
93  import org.exolab.jms.events.Event;
94  import org.exolab.jms.events.EventHandler;
95  import org.exolab.jms.events.IllegalEventDefinedException;
96  import org.exolab.jms.message.DestinationImpl;
97  import org.exolab.jms.message.MessageImpl;
98  import org.exolab.jms.messagemgr.PersistentMessageHandle;
99  
100 
101 /***
102  * This adapter is a wrapper class around the persistency mechanism.
103  * It isolates the client from the working specifics of the database, by
104  * providing a simple straightforward interface. Future changes to
105  * the database will only require changes to the adapter.
106  *
107  * <P>For this release the PageManager is used as the underlying database.
108  *
109  * @version     $Revision: 1.30 $ $Date: 2003/08/17 01:32:25 $
110  * @author      <a href="mailto:mourikis@exolab.org">Jim Mourikis</a>
111  * @see org.exolab.core.database.recman.PMDSessionManager
112  * @see org.exolab.core.database.recman.PMDHashMap
113  * @see	org.exolab.core.database.recman.PMDVector
114  * @see org.exolab.core.foundation.PersistentObject
115  */
116 public class ObjectAdapter
117     extends PersistenceAdapter
118     implements EventHandler {
119 
120     // the database instance
121     private PageManagedDatabase db_ = null;
122 
123     // The name of this database.
124     private String dbname_ = null;
125 
126     // The max time to wait to acquire a lock
127     static private int MAX_WAIT_TIME = 5000;
128 
129     // The root name for storing messages
130     final static private String MESSAGES = "MESSAGES";
131 
132     // The root name for storing destinations
133     final static private String DESTINATIONS = "DESTINATIONS";
134 
135     // All consumers or destinations have a handle
136     final static private String HANDLES = "HANDLES";
137 
138     // All topics destinations are prefixed with
139     final static private String TOPIC = "T_";
140 
141     // All queue destinations are prefixed with
142     final static private String QUEUE = "Q_";
143 
144     //The root name of the max id storage location
145     final static private String IDSTORAGE = "IDSTORAGE";
146 
147     //The database schema version root
148     final static private String VERSIONID = "VERSIONID";
149 
150     // The schema version number. Note this must be incremented whenever
151     // The schema changes.
152     final static private long VERSIONNUM = 7;
153 
154     // The minimum persistent consumer list size
155     private int minConsumerListSize_ = 10000;
156 
157     // The minimum persistent message list size
158     private int minMessageListSize_ = 20000;
159 
160     /***
161      * Reference to a dummy connection object, used to satisfy the fact
162      * that the object adapter can look like an Connection object
163      */
164     private NullConnection _connection = new NullConnection();
165 
166     /***
167      * The interval at which the automatic garbage collector executes, if
168      * specified. Configured in seconds; the constructor stores it times 1000.
169      */
170     private int _gcInterval = 600;
171 
172     /***
173      * This is the block size that is used during purging.
174      */
175     private int _gcBlockSize = 500;
176 
177     /***
178      * This is the thread priority for the GC Thread
179      */
180     private int _gcThreadPriority = Thread.NORM_PRIORITY;
181 
182     /***
183      * This is the event that is fired to initiate garbage collection
184      * in the database
185      */
186     private static final int COLLECT_DATABASE_GARBAGE_EVENT = 1;
187 
188     /***
189      * The size of the CACHE used by the JDBM
190      */
191     private static final int MIN_CACHE_SIZE = 512;
192 
193     /***
194      * The logger
195      */
196     private static final Log _log = LogFactory.getLog(ObjectAdapter.class);
197 
198 
199     /***
200      * Constructs a databse with the given name, if one does not exist,
201      * otherwise opens the existing database.
202      * The database name is checked to see if it already has a ".db" at the
203      * end and stripped off if it does. The PageManagedDatabase always
204      * prefixes ".db" to any name passed in.
205      *
206      * @throws PersistenceException for any database error
207      *
208      */
209     public ObjectAdapter(String dbName, int minListSize, int minMsgSize,
210                          int minCacheSize)
211         throws PersistenceException {
212 
213         minConsumerListSize_ = Math.max(minConsumerListSize_, minListSize);
214         minMessageListSize_ = Math.max(minMessageListSize_, minMsgSize);
215 
216         dbname_ = fixName(dbName);
217         try {
218             db_ = new PageManagedDatabase(dbname_);
219 
220             // set the cache object size
221             //db_.setObjectCacheSize(Math.max(MIN_CACHE_SIZE, minCacheSize));
222 
223             // initialise the session manager with the created database and
224             // then
225             // check the version to ensure that it is compatible.
226             PMDSessionManager.init(db_);
227         } catch (FailedToCreateDatabaseException exception) {
228             throw new PersistenceException(
229                 "Failed to initialise database adapter", exception);
230         }
231 
232         checkVersion();
233         _log.debug("minConsumerListSize = " + minConsumerListSize_);
234         _log.debug("minMessageListSize = " + minMessageListSize_);
235 
236         // create the roots if they do not already exist
237         // jima - we really need to improve this part of the code
238         try {
239             createIdRoot();
240         } catch (Exception exception) {
241             throw new PersistenceException(
242                 "Failed to create database root", exception);
243         }
244 
245         // check whether we should initiate automatic garbage collection
246         DatabaseConfiguration config =
247             ConfigurationManager.getConfig().getDatabaseConfiguration();
248         if (config.hasGarbageCollectionInterval()) {
249             _gcInterval =
250                 config.getGarbageCollectionInterval() * 1000;
251             registerEvent();
252         }
253 
254         if (config.hasGarbageCollectionBlockSize()) {
255             _gcBlockSize =
256                 config.getGarbageCollectionBlockSize();
257         }
258 
259         if (config.hasGarbageCollectionThreadPriority()) {
260             _gcThreadPriority =
261                 config.getGarbageCollectionBlockSize();
262             if (_gcThreadPriority < Thread.MIN_PRIORITY) {
263                 _gcThreadPriority = Thread.MIN_PRIORITY;
264             } else if (_gcThreadPriority > Thread.MAX_PRIORITY) {
265                 _gcThreadPriority = Thread.MAX_PRIORITY;
266             }
267         }
268     }
269 
270     /***
271      * Fix up the database name, by striping off any ".xx" suffixes.
272      *
273      */
274     private String fixName(String dbName) {
275         String newDbName = dbName;
276         newDbName.trim();
277 
278         int i = newDbName.lastIndexOf('.');
279         if (i > 0 && i < (newDbName.length() - 1)) {
280             newDbName = newDbName.substring(0, i);
281         }
282         return newDbName;
283     }
284 
285     /***
286      * Close the database if open.
287      *
288      */
289     public void close() {
290         if (db_ != null) {
291             db_.close();
292             dbname_ = null;
293             db_ = null;
294         }
295     }
296 
    /**
     * Verifies that the schema version stored in the database matches
     * {@code VERSIONNUM}.
     * <p>
     * If nothing is bound under {@code VERSIONID} (first time the database
     * is opened), a version vector is created, bound, and initialised with
     * {@code VERSIONNUM}. Otherwise the stored vector must contain exactly
     * one element equal to {@code VERSIONNUM}.
     * <p>
     * This method terminates the JVM via {@code System.exit(-1)} when the
     * stored version differs, when the version vector does not hold exactly
     * one element, or when any exception is raised during the check.
     */
    private void checkVersion() {
        try {
            PMDVector vector;
            SessionIfc session = getSession();

            session.getCurrentTransaction().begin();
            if ((vector = (PMDVector) session.lookup(VERSIONID)) == null) {
                // first transaction: create the version root and bind it
                _log.debug("Setting Version Id");
                vector =
                    (PMDVector) session.getCollectionManager().createVector();
                session.createObject(vector);
                session.bind(VERSIONID, vector);
                session.getCurrentTransaction().commit();

                // second transaction: look the root up again, lock it and
                // store the current schema version.
                // NOTE(review): the lock wait here is the literal 10 rather
                // than MAX_WAIT_TIME - confirm whether that is intentional.
                session.getCurrentTransaction().begin();
                vector = (PMDVector) session.lookup(VERSIONID);
                session.acquireLock(vector, 10);
                vector.addElement(new PMDLongInteger(VERSIONNUM));
                session.updateObject(vector);
                session.getCurrentTransaction().commit();
            } else {
                if (vector.size() == 1) {
                    long ver;

                    // a stored version other than VERSIONNUM is fatal
                    if ((ver = ((PMDLongInteger) vector.get(0)).get())
                        != VERSIONNUM) {
                        _log.error(
                            "Incompatible Database version Schema\n" +
                            "Db Vesrsion = " + ver + "\tSchema Version = "
                            + VERSIONNUM + "\nExiting...");
                        System.exit(-1);
                    }
                } else {
                    // the version root must hold exactly one element
                    _log.error("Corrupted Db schema version ID:\n " +
                        "Exiting...");
                    System.exit(-1);
                }
                // nothing was modified on this path, so the transaction is
                // aborted rather than committed
                session.getCurrentTransaction().abort();
            }
            try {
                PMDSessionManager.instance().destroySession();
            } catch (FailedToDestroySessionException exception) {
                // a failure to destroy the session is logged but not fatal
                _log.error("Failed to destroy session", exception);
            }
        } catch (Exception exception) {
            // any failure while verifying the schema terminates the JVM
            _log.error("Error verifying DB schema, exiting", exception);
            System.exit(-1);
        }

    }
353 
354     /***
355      * Check to see if the Id root has been created.
356      * If not create it and initialise it. This is used as a sequencer
357      * to number all Messages that are delivered to OpenJMS server.
358      *
359      */
360     private void createIdRoot()
361         throws FailedToCreateSessionException, TransactionInProgressException,
362         FailedToCreateCollectionException, DatabaseIOException,
363         TransactionNotInProgressException, ObjectNameExistsException,
364         TransactionException, FailedToCreateLockException,
365         FailedToAcquireLockException, FailedToDestroySessionException {
366         PMDVector vector;
367         PMDHashMap map;
368         SessionIfc session = getSession();
369 
370         session.getCurrentTransaction().begin();
371         if ((vector = (PMDVector) session.lookup(IDSTORAGE)) == null) {
372             _log.debug("Creating ID Storage root");
373             vector =
374                 (PMDVector) session.getCollectionManager().createVector();
375             session.createObject(vector);
376             session.bind(IDSTORAGE, vector);
377             session.getCurrentTransaction().commit();
378 
379             session.getCurrentTransaction().begin();
380             vector = (PMDVector) session.lookup(IDSTORAGE);
381             session.acquireLock(vector, MAX_WAIT_TIME);
382             vector.addElement(new PMDLongInteger(0));
383             session.updateObject(vector);
384             session.getCurrentTransaction().commit();
385         } else {
386             session.getCurrentTransaction().abort();
387         }
388 
389         session.getCurrentTransaction().begin();
390         if ((map = (PMDHashMap) session.lookup(MESSAGES)) == null) {
391             _log.debug("Creating Messages root");
392             map =
393                 (PMDHashMap) session.getCollectionManager().createHashMap();
394             session.createObject(map);
395             session.bind(MESSAGES, map);
396             session.getCurrentTransaction().commit();
397         } else {
398             session.getCurrentTransaction().abort();
399         }
400 
401         session.getCurrentTransaction().begin();
402         if ((vector = (PMDVector) session.lookup(DESTINATIONS)) == null) {
403             _log.debug("Creating Destinations root");
404             vector =
405                 (PMDVector) session.getCollectionManager().createVector();
406             session.createObject(vector);
407             session.bind(DESTINATIONS, vector);
408             session.getCurrentTransaction().commit();
409         } else {
410             session.getCurrentTransaction().abort();
411         }
412         PMDSessionManager.instance().destroySession();
413     }
414 
415     // implementation of PersistenceAdapter.getLastIds
416     public long getLastId(Connection connection)
417         throws PersistenceException {
418         long lastId = -1;
419 
420         try {
421             createIdRoot();
422             PMDVector vector;
423             SessionIfc session = getSession();
424 
425             session.getCurrentTransaction().begin();
426             if ((vector = (PMDVector) session.lookup(IDSTORAGE)) != null) {
427                 if (vector.size() == 1) {
428                     lastId = ((PMDLongInteger) vector.get(0)).get();
429                 }
430             }
431             session.getCurrentTransaction().abort();
432         } catch (Exception err) {
433             _log.error("Failed to get Id: " + err.getMessage());
434         }
435         try {
436             PMDSessionManager.instance().destroySession();
437         } catch (FailedToDestroySessionException sessErr) {
438             _log.error("Failed to destroy session: " +
439                 sessErr.getMessage());
440             sessErr.printStackTrace();
441             // Session not destroyed.
442         }
443         return lastId;
444     }
445 
446     // implementation of PersistenceAdapter.updateIds
447     public void updateIds(Connection connection, long id)
448         throws PersistenceException {
449         try {
450             PMDVector vector;
451             SessionIfc session = getSession();
452 
453             session.getCurrentTransaction().begin();
454             if ((vector = (PMDVector) session.lookup(IDSTORAGE)) != null) {
455                 session.acquireLock(vector, MAX_WAIT_TIME);
456                 if (vector.size() == 1) {
457                     vector.set(0, new PMDLongInteger(id));
458                     session.updateObject(vector);
459                 }
460 
461             }
462             session.getCurrentTransaction().commit();
463         } catch (Exception err) {
464             throw new PersistenceException("Failed to get Id: " + err.getMessage());
465         }
466         try {
467             PMDSessionManager.instance().destroySession();
468         } catch (FailedToDestroySessionException sessErr) {
469             _log.error("Failed to destroy session", sessErr);
470         }
471     }
472 
473     // implementation of PersistenceAdapter.addMessage
474     public synchronized void addMessage(Connection connection,
475                                         MessageImpl message)
476         throws PersistenceException {
477 
478         try {
479             DestinationImpl dest =
480                 (DestinationImpl) message.getJMSDestination();
481             String name = dest.getDestination();
482             PMDHashMap messages;
483             SessionIfc session = getSession();
484 
485             session.getCurrentTransaction().begin();
486 
487             messages = messageTable(session);
488 
489             if (messages != null) {
490                 try {
491                     session.acquireLock(messages, MAX_WAIT_TIME);
492 
493                     // check to see that the message does not exist
494                     String id = message.getMessageId().getId();
495                     if (messages.get(id) == null) {
496                         PersistentMessage pm = new PersistentMessage(message);
497                         session.createObject(pm);
498                         // when we initially add a message to the database it
499                         // must go in the unprocessed list
500                         messages.put(id, session.createHandle(pm));
501                         session.updateObject(messages);
502                     } else {
503                         throw new PersistenceException("Mesage with this id: " +
504                             message.getJMSMessageID() +
505                             " already exists: NOT ADDING");
506                     }
507                 } catch (Exception err) {
508                     throw new PersistenceException("Error in addMessage " +
509                         err.toString());
510                 }
511             } else {
512                 _log.error("Attempt to save message for non register" +
513                     " queue/topic, name = " + name);
514             }
515 
516             session.getCurrentTransaction().commit();
517             PMDSessionManager.instance().destroySession();
518         } catch (PersistenceException exception) {
519             // rethrowe it
520             throw exception;
521         } catch (Exception err) {
522             throw new PersistenceException("Error in addMessage " + err.toString());
523         }
524     }
525 
526     // implementation of PersistenceAdapter.updateMessage
527     public synchronized void updateMessage(Connection connection,
528                                            MessageImpl message)
529         throws PersistenceException {
530 
531         try {
532             DestinationImpl dest =
533                 (DestinationImpl) message.getJMSDestination();
534             String name = dest.getDestination();
535             PMDHashMap messages;
536             SessionIfc session = getSession();
537 
538             session.getCurrentTransaction().begin();
539 
540             messages = messageTable(session);
541 
542             if (messages != null) {
543                 try {
544                     session.acquireLock(messages, MAX_WAIT_TIME);
545 
546                     // check to see that the message does not exist
547                     String id = message.getMessageId().getId();
548                     if (messages.get(id) != null) {
549                         // remove the old message and update it
550                         PMDHandle handle = (PMDHandle) messages.get(id);
551                         PersistentMessage pm = (PersistentMessage) handle.resolve();
552                         pm.setMessage(message);
553                         pm.setProcessed(message.getProcessed());
554                         session.updateObject(pm);
555                     } else {
556                         throw new PersistenceException("Mesage with this id: " +
557                             message.getJMSMessageID() +
558                             " doesn't exists: NOT UPDATING");
559                     }
560                 } catch (Exception err) {
561                     throw new PersistenceException("Error in updateMessage " +
562                         err.toString());
563                 }
564             } else {
565                 throw new PersistenceException("Attempt to save message for non register" +
566                     " queue/topic, name = " + name);
567             }
568 
569             session.getCurrentTransaction().commit();
570             PMDSessionManager.instance().destroySession();
571         } catch (PersistenceException pe) {
572             throw pe;
573         } catch (Exception err) {
574             throw new PersistenceException("Error in updateMessage " +
575                 err.toString());
576         }
577     }
578 
579     // implementation of PersistenceAdapter.removeMessage
580     public synchronized void removeMessage(Connection connection, String id)
581         throws PersistenceException {
582 
583         try {
584             PMDHashMap map;
585             SessionIfc session = getSession();
586 
587             session.getCurrentTransaction().begin();
588 
589             if ((map = messageTable(session)) != null) {
590                 try {
591                     session.acquireLock(map, MAX_WAIT_TIME);
592                     PMDHandle handle = (PMDHandle) map.remove(id);
593                     if (handle != null) {
594                         session.deleteObject
595                             ((PersistentMessage) handle.resolve());
596                     }
597 
598                     session.updateObject(map);
599                 } catch (Exception err) {
600                     throw new PersistenceException("Error in removeMessage " +
601                         err.toString());
602                 }
603             } else {
604                 throw new PersistenceException("Error in removeMessage " +
605                     "Cannot retrieve the message talbe ");
606             }
607 
608             session.getCurrentTransaction().commit();
609             PMDSessionManager.instance().destroySession();
610         } catch (PersistenceException pe) {
611             throw pe;
612         } catch (Exception err) {
613             throw new PersistenceException("Error in removeMessage " +
614                 err.toString());
615         }
616     }
617 
618     // implementation of PersistenceAdapter.getMessage
619     public synchronized MessageImpl getMessage(Connection connection, String id)
620         throws PersistenceException {
621         MessageImpl message = null;
622 
623         try {
624             PMDHashMap map;
625             SessionIfc session = getSession();
626 
627             session.getCurrentTransaction().begin();
628 
629             if ((map = messageTable(session)) != null) {
630                 try {
631                     session.acquireLock(map, MAX_WAIT_TIME);
632                     PMDHandle handle = (PMDHandle) map.get(id);
633                     PersistentMessage m = null;
634 
635                     if (handle != null) {
636                         m = (PersistentMessage) handle.resolve();
637                     }
638 
639                     if (m != null) {
640                         message = m.getMessage();
641                     }
642                 } catch (Exception err) {
643                     throw new PersistenceException("Error in getMessage " +
644                         err.toString());
645                 }
646             } else {
647                 throw new PersistenceException("Error in getMessage " +
648                     "Failed to retrieve the message table.");
649             }
650             session.getCurrentTransaction().abort();
651             PMDSessionManager.instance().destroySession();
652         } catch (PersistenceException pe) {
653             throw pe;
654         } catch (Exception err) {
655             throw new PersistenceException("Error in getMessage " +
656                 err.toString());
657         }
658         return message;
659     }
660 
661     // implementation of PersistenceAdapter.getUnprocessedMessages
662     public synchronized Vector getUnprocessedMessages(Connection connection)
663         throws PersistenceException {
664         Vector result = new Vector();
665 
666         try {
667             PMDHashMap map;
668             SessionIfc session = getSession();
669 
670             session.getCurrentTransaction().begin();
671 
672             if ((map = messageTable(session)) != null) {
673                 try {
674                     session.acquireLock(map, MAX_WAIT_TIME);
675                     Enumeration iter = map.elements();
676                     while (iter.hasMoreElements()) {
677                         PMDHandle handle = (PMDHandle) iter.nextElement();
678                         PersistentMessage m =
679                             (PersistentMessage) handle.resolve();
680 
681                         if (!m.getProcessed()) {
682                             result.add(m.getMessage());
683                         }
684                     }
685                 } catch (Exception err) {
686                     throw new PersistenceException(
687                         "Error in getUnprocessedMessages " + err.toString());
688                 }
689             } else {
690                 throw new PersistenceException(
691                     "Error in getUnprocessedMessage. Failed to get message table");
692             }
693             session.getCurrentTransaction().abort();
694             PMDSessionManager.instance().destroySession();
695         } catch (PersistenceException pe) {
696             throw pe;
697         } catch (Exception err) {
698             throw new PersistenceException("Error in getUnprocessedMessages "
699                 + err.toString());
700         }
701         return result;
702     }
703 
704     // implementation of PersistenceAdapter.getMessages
705     public synchronized Vector getMessages(Connection connection,
706                                            PersistentMessageHandle handle)
707         throws PersistenceException {
708         Vector messages = new Vector();
709 
710         // for the jdbm only ever retrieve a single message. We could improve
711         // this at a later date.
712         MessageImpl message = getMessage(connection,
713             handle.getMessageId().getId());
714         if (message != null) {
715             messages.add(message);
716         }
717 
718         return messages;
719     }
720 
721     // implementation of PersistenceAdapter.addMessageHandle
722     public synchronized void addMessageHandle(Connection connection,
723                                               PersistentMessageHandle handle)
724         throws PersistenceException {
725 
726         try {
727             PMDVector vector;
728             SessionIfc session = getSession();
729             String key = getHandlesRootName(handle.getDestination(),
730                 handle.getConsumerName());
731 
732 
733             session.getCurrentTransaction().begin();
734 
735             if ((vector = handleTable(key, session)) != null) {
736                 try {
737                     session.acquireLock(vector, MAX_WAIT_TIME);
738                     vector.addElement(handle);
739                     session.updateObject(vector);
740                 } catch (Exception err) {
741                     throw new PersistenceException("Error in addMessageHandle " +
742                         err.toString());
743                 }
744             } else {
745                 throw new PersistenceException("Error in addMessageHandle " +
746                     "Cannot get handle table for " + key);
747             }
748 
749             session.getCurrentTransaction().commit();
750             PMDSessionManager.instance().destroySession();
751         } catch (PersistenceException pe) {
752             throw pe;
753         } catch (Exception err) {
754             throw new PersistenceException("Error in addMessageHandle " +
755                 err.toString());
756         }
757     }
758 
759     // implementation of PersistenceAdapter.updateMessageHandle
760     public synchronized void updateMessageHandle(Connection connection,
761                                                  PersistentMessageHandle handle)
762         throws PersistenceException {
763 
764         try {
765             PMDVector vector;
766             SessionIfc session = getSession();
767             String key = getHandlesRootName(handle.getDestination(),
768                 handle.getConsumerName());
769 
770 
771             session.getCurrentTransaction().begin();
772 
773             if ((vector = handleTable(key, session)) != null) {
774                 try {
775                     session.acquireLock(vector, MAX_WAIT_TIME);
776 
777                     // linear search for the matching handle
778                     Enumeration handles = vector.elements();
779                     while (handles.hasMoreElements()) {
780                         PersistentMessageHandle phdl =
781                             (PersistentMessageHandle) handles.nextElement();
782 
783                         if (phdl.getMessageId().getId().equals(
784                             handle.getMessageId().getId())) {
785                             phdl.setDelivered(true);
786                             break;
787                         }
788                     }
789                     session.updateObject(vector);
790                 } catch (Exception err) {
791                     throw new PersistenceException("Error in addMessageHandle " +
792                         err.toString());
793                 }
794             } else {
795                 throw new PersistenceException("Error in updateMessageHandle " +
796                     "Failed to get handle table for " + key);
797             }
798 
799             session.getCurrentTransaction().commit();
800             PMDSessionManager.instance().destroySession();
801         } catch (PersistenceException pe) {
802             throw pe;
803         } catch (Exception err) {
804             throw new PersistenceException("Error in updateMessageHandle " +
805                 err.toString());
806         }
807     }
808 
809     // implementation of PersistenceAdapter.removeMessageHandle
810     public synchronized void removeMessageHandle(Connection connection,
811                                                  PersistentMessageHandle handle)
812         throws PersistenceException {
813 
814         try {
815             PMDVector vector;
816             SessionIfc session = getSession();
817             String key = getHandlesRootName(handle.getDestination(),
818                 handle.getConsumerName());
819 
820 
821             session.getCurrentTransaction().begin();
822 
823             if ((vector = handleTable(key, session)) != null) {
824                 try {
825                     session.acquireLock(vector, MAX_WAIT_TIME);
826                     vector.remove(handle);
827                     session.updateObject(vector);
828                 } catch (Exception err) {
829                     throw new PersistenceException("Error in removeMessageHandle " +
830                         err.toString());
831                 }
832             } else {
833                 throw new PersistenceException("Error in removeMessageHandle " +
834                     "Failed to get the handle table for " + key);
835             }
836 
837             session.getCurrentTransaction().commit();
838             PMDSessionManager.instance().destroySession();
839         } catch (PersistenceException pe) {
840             throw pe;
841         } catch (Exception err) {
842             throw new PersistenceException("Error in removeMessageHandle " +
843                 err.toString());
844         }
845     }
846 
847     // implementation of PersistenceAdapter.getMessageHandles
848     public synchronized Vector getMessageHandles(Connection connection,
849                                                  JmsDestination destination, String name)
850         throws PersistenceException {
851         Vector result = new Vector();
852 
853         try {
854             PMDVector vector;
855             SessionIfc session = getSession();
856             String key = getHandlesRootName(destination, name);
857 
858 
859             session.getCurrentTransaction().begin();
860 
861             if ((vector = handleTable(key, session)) != null) {
862                 try {
863                     session.acquireLock(vector, MAX_WAIT_TIME);
864 
865                     Enumeration handles = vector.elements();
866                     while (handles.hasMoreElements()) {
867                         PersistentMessageHandle handle =
868                             (PersistentMessageHandle) handles.nextElement();
869                         result.addElement(handle.clone());
870                     }
871                 } catch (Exception err) {
872                     throw new PersistenceException("Error in getMessageHandles " +
873                         err.toString());
874                 }
875             }
876 
877             session.getCurrentTransaction().commit();
878             PMDSessionManager.instance().destroySession();
879         } catch (PersistenceException pe) {
880             throw pe;
881         } catch (Exception err) {
882             throw new PersistenceException("Error in getMessageHandles " +
883                 err.toString());
884         }
885         return result;
886     }
887 
888     // implementation of PersistenceAdapter.addDurableConsumer
889     public synchronized void addDurableConsumer(Connection connection,
890                                                 String topic, String consumer)
891         throws PersistenceException {
892 
893         try {
894             PMDVector vector;
895             SessionIfc session = getSession();
896 
897             session.getCurrentTransaction().begin();
898 
899             if ((vector = destinationTable(session)) != null) {
900                 try {
901                     session.acquireLock(vector, MAX_WAIT_TIME);
902                     String target = "@" + consumer;
903 
904                     boolean found = false;
905                     Enumeration entries = vector.elements();
906                     while (entries.hasMoreElements()) {
907                         PersistentString entry = (PersistentString) entries.nextElement();
908                         if (entry.toString().endsWith(target)) {
909                             found = true;
910                             break;
911                         }
912                     }
913 
914                     String key = null;
915                     if (!found) {
916                         // add the target and also create a new table
917                         // to hold handles for this consumer
918                         vector.addElement(new PersistentString(TOPIC + topic + target));
919                         session.updateObject(vector);
920 
921                         key = getHandlesRootName(consumer);
922                         PMDVector handles =
923                             (PMDVector) session.getCollectionManager().createVector();
924                         session.createObject(handles);
925                         session.bind(key, handles);
926                     } else {
927                         throw new PersistenceException("Error in addDurableConsumer " +
928                             consumer + " already exists.");
929                     }
930                 } catch (Exception err) {
931                     throw new PersistenceException("Error in addDurableConsumer " +
932                         err.toString());
933                 }
934             } else {
935                 throw new PersistenceException("Error in addDurableConsumer " +
936                     "Failed to get the destination table.");
937             }
938 
939             session.getCurrentTransaction().commit();
940             PMDSessionManager.instance().destroySession();
941         } catch (PersistenceException pe) {
942             throw pe;
943         } catch (Exception err) {
944             throw new PersistenceException("Error in addDurableConsumer " +
945                 err.toString());
946         }
947     }
948 
949     // implementation of PersistenceAdapter.removeDurableConsumer
950     public synchronized void removeDurableConsumer(Connection connection,
951                                                    String consumer)
952         throws PersistenceException {
953 
954         try {
955             PMDVector vector;
956             SessionIfc session = getSession();
957 
958             session.getCurrentTransaction().begin();
959 
960             if ((vector = destinationTable(session)) != null) {
961                 try {
962                     session.acquireLock(vector, MAX_WAIT_TIME);
963                     String target = "@" + consumer;
964 
965                     boolean found = false;
966                     PersistentString entry = null;
967                     Enumeration entries = vector.elements();
968                     while (entries.hasMoreElements()) {
969                         entry = (PersistentString) entries.nextElement();
970                         if (entry.toString().endsWith(target)) {
971                             vector.remove(entry);
972                             found = true;
973                             break;
974                         }
975                     }
976 
977                     String key = null;
978                     if (found) {
979                         // add the target and also create a new table
980                         // to hold handles for this consumer
981                         vector.removeElement(entry);
982                         session.updateObject(vector);
983 
984                         // unbind the table (i.e delete it)
985                         key = getHandlesRootName(consumer);
986                         session.unbind(key);
987                     } else {
988                         throw new PersistenceException("Error in removeDurableConsumer " +
989                             "Cannot find consumer with name " + consumer);
990                     }
991                 } catch (Exception err) {
992                     throw new PersistenceException("Error in removeDurableConsumer " +
993                         err.toString());
994                 }
995             } else {
996                 throw new PersistenceException("Error in removeDurableConsumer " +
997                     "Cannot get access to the destination table");
998             }
999 
1000             session.getCurrentTransaction().commit();
1001             PMDSessionManager.instance().destroySession();
1002         } catch (PersistenceException pe) {
1003             throw pe;
1004         } catch (Exception err) {
1005             throw new PersistenceException("Error in removeDurableConsumer " +
1006                 err.toString());
1007         }
1008     }
1009 
1010     // implementation of PersistenceAdapter.durableConsumerExists
1011     public synchronized boolean durableConsumerExists(Connection connection,
1012                                                       String name)
1013         throws PersistenceException {
1014         boolean exists = false;
1015 
1016         try {
1017             PMDVector vector;
1018             SessionIfc session = getSession();
1019 
1020             session.getCurrentTransaction().begin();
1021 
1022             if ((vector = destinationTable(session)) != null) {
1023                 try {
1024                     session.acquireLock(vector, MAX_WAIT_TIME);
1025                     String target = "@" + name;
1026 
1027                     Enumeration entries = vector.elements();
1028                     while (entries.hasMoreElements()) {
1029                         PersistentString entry = (PersistentString) entries.nextElement();
1030                         if (entry.toString().endsWith(target)) {
1031                             exists = true;
1032                             break;
1033                         }
1034                     }
1035                 } catch (Exception err) {
1036                     throw new PersistenceException("Error in durableConsumerExists " +
1037                         err.toString());
1038                 }
1039             } else {
1040                 throw new PersistenceException("Error in durableConsumerExists " +
1041                     "Cannot get access to the destinationtable.");
1042             }
1043 
1044             session.getCurrentTransaction().commit();
1045             PMDSessionManager.instance().destroySession();
1046         } catch (PersistenceException pe) {
1047             throw pe;
1048         } catch (Exception err) {
1049             throw new PersistenceException("Error in durableConsumerExists " +
1050                 err.toString());
1051         }
1052         return exists;
1053     }
1054 
1055     // implementation of PersistenceAdapter.getDurableConsumers
1056     public synchronized Enumeration getDurableConsumers(Connection connection,
1057                                                         String topic)
1058         throws PersistenceException {
1059         Vector consumers = new Vector();
1060 
1061         try {
1062             PMDVector vector;
1063             SessionIfc session = getSession();
1064 
1065             session.getCurrentTransaction().begin();
1066 
1067             if ((vector = destinationTable(session)) != null) {
1068                 try {
1069                     session.acquireLock(vector, MAX_WAIT_TIME);
1070                     String target = TOPIC + topic + "@";
1071 
1072                     Enumeration entries = vector.elements();
1073                     while (entries.hasMoreElements()) {
1074                         PersistentString entry =
1075                             (PersistentString) entries.nextElement();
1076                         if (entry.toString().startsWith(target)) {
1077                             consumers.addElement(
1078                                 entry.toString().substring(target.length()));
1079                         }
1080                     }
1081                 } catch (Exception err) {
1082                     throw new PersistenceException("Error in getDurableConsumers " +
1083                         err.toString());
1084                 }
1085             } else {
1086                 throw new PersistenceException("Error in getDurableConsumers " +
1087                     "Failed to get access to the destination table.");
1088             }
1089 
1090             session.getCurrentTransaction().commit();
1091             PMDSessionManager.instance().destroySession();
1092         } catch (PersistenceException pe) {
1093             throw pe;
1094         } catch (Exception err) {
1095             throw new PersistenceException("Error in getDurableConsumers " +
1096                 err.toString());
1097         }
1098         return consumers.elements();
1099     }
1100 
1101     // implementation of PersistenceAdapter.getAllDurableConsumers
1102     public HashMap getAllDurableConsumers(Connection connection)
1103         throws PersistenceException {
1104         HashMap consumers = new HashMap();
1105 
1106         try {
1107             PMDVector vector;
1108             SessionIfc session = getSession();
1109 
1110             session.getCurrentTransaction().begin();
1111 
1112             if ((vector = destinationTable(session)) != null) {
1113                 try {
1114                     session.acquireLock(vector, MAX_WAIT_TIME);
1115 
1116                     Enumeration entries = vector.elements();
1117                     while (entries.hasMoreElements()) {
1118                         PersistentString entry =
1119                             (PersistentString) entries.nextElement();
1120                         String temp = entry.toString().substring(TOPIC.length());
1121                         int index = temp.indexOf("@");
1122                         if (!(entry.toString().startsWith(TOPIC)) ||
1123                             (index == -1)) {
1124                             // this is not a durable consumer so continue
1125                             continue;
1126                         }
1127 
1128                         // we have a durable consumer
1129                         consumers.put(temp.substring(index + 1),
1130                             temp.substring(0, index));
1131                     }
1132                 } catch (Exception err) {
1133                     throw new PersistenceException("Error in getAllDurableConsumers " +
1134                         err.toString());
1135                 }
1136             } else {
1137                 throw new PersistenceException("Error in getAllDurableConsumers " +
1138                     "Failed to get access to the destination  table");
1139             }
1140 
1141             session.getCurrentTransaction().commit();
1142             PMDSessionManager.instance().destroySession();
1143         } catch (PersistenceException pe) {
1144             throw pe;
1145         } catch (Exception err) {
1146             throw new PersistenceException("Error in getAllDurableConsumers " +
1147                 err.toString());
1148         }
1149         return consumers;
1150     }
1151 
1152     // implementation of PersistenceAdapter.addDestination
1153     public synchronized void addDestination(Connection connection,
1154                                             String name, boolean queue)
1155         throws PersistenceException {
1156 
1157         try {
1158             PMDVector vector;
1159             SessionIfc session = getSession();
1160 
1161             session.getCurrentTransaction().begin();
1162 
1163             if ((vector = destinationTable(session)) != null) {
1164                 try {
1165                     session.acquireLock(vector, MAX_WAIT_TIME);
1166                     String target;
1167                     if (queue) {
1168                         target = QUEUE + name;
1169                     } else {
1170                         target = TOPIC + name;
1171                     }
1172                     boolean found = false;
1173                     Enumeration entries = vector.elements();
1174                     while (entries.hasMoreElements()) {
1175                         PersistentString entry = (PersistentString) entries.nextElement();
1176                         if (entry.toString().equals(target)) {
1177                             found = true;
1178                             break;
1179                         }
1180                     }
1181 
1182                     String key = null;
1183                     if (!found) {
1184                         // add the target and also create a new table
1185                         // to hold handles for this consumer
1186                         vector.addElement(new PersistentString(target));
1187                         session.updateObject(vector);
1188 
1189                         // if it is a queue then we also need to construct
1190                         // a handles table
1191                         if (queue) {
1192                             key = getHandlesRootName(name);
1193                             PMDVector handles =
1194                                 (PMDVector) session.getCollectionManager().createVector();
1195                             session.createObject(handles);
1196                             session.bind(key, handles);
1197                         }
1198                     }
1199                 } catch (Exception err) {
1200                     throw new PersistenceException("Error in addDestination " +
1201                         err.toString());
1202                 }
1203             } else {
1204                 throw new PersistenceException("Error in addDestination " +
1205                     "Failed to get access to destination table");
1206             }
1207 
1208             session.getCurrentTransaction().commit();
1209             PMDSessionManager.instance().destroySession();
1210         } catch (PersistenceException pe) {
1211             throw pe;
1212         } catch (Exception err) {
1213             throw new PersistenceException("Error in addDestination " +
1214                 err.toString());
1215         }
1216     }
1217 
1218     // implementation of PersistenceAdapter.removeDestination
1219     public synchronized void removeDestination(Connection connection,
1220                                                String name)
1221         throws PersistenceException {
1222         try {
1223             PMDVector vector;
1224             SessionIfc session = getSession();
1225 
1226             session.getCurrentTransaction().begin();
1227 
1228             if ((vector = destinationTable(session)) != null) {
1229                 try {
1230                     session.acquireLock(vector, MAX_WAIT_TIME);
1231 
1232                     // this will remove the destination and all registered
1233                     // consumers.
1234                     Vector to_delete = new Vector();
1235                     Enumeration entries = vector.elements();
1236                     while (entries.hasMoreElements()) {
1237                         PersistentString wrapped_entry =
1238                             (PersistentString) entries.nextElement();
1239                         String entry = wrapped_entry.toString();
1240 
1241                         // now the destination name must appear before
1242                         // the '.' separator
1243                         int name_idx = entry.indexOf("@");
1244                         int dest_idx = entry.indexOf(name);
1245 
1246                         if (entry.substring(TOPIC.length()).equals(name)) {
1247                             // we have found the actual destination entry
1248                             // Firstly mark it for deletion and then if it is a queue
1249                             // unbind the consumer state
1250                             to_delete.addElement(wrapped_entry);
1251                             if (entry.startsWith(QUEUE)) {
1252                                 session.unbind(getHandlesRootName(name));
1253                             }
1254                         } else if ((name_idx >= 0) &&
1255                             (dest_idx >= 0) &&
1256                             (dest_idx < name_idx)) {
1257                             // we have located an entry which describes a
1258                             // durable consumer for a topic
1259                             String consumer = entry.substring(name_idx + 1);
1260                             to_delete.addElement(wrapped_entry);
1261                             session.unbind(getHandlesRootName(consumer));
1262                         }
1263                     }
1264 
1265                     //  now we need to go through and process the to_delete
1266                     // entries
1267                     Enumeration elements = to_delete.elements();
1268                     while (elements.hasMoreElements()) {
1269                         vector.remove((PersistentString) elements.nextElement());
1270                     }
1271 
1272                     // update the vector object
1273                     session.updateObject(vector);
1274                 } catch (Exception err) {
1275                     throw new PersistenceException("Error in removeDestination " +
1276                         err.toString());
1277                 }
1278             } else {
1279                 throw new PersistenceException("Error in removeDestination " +
1280                     "Failed to get access to the destination table.");
1281             }
1282 
1283             session.getCurrentTransaction().commit();
1284             PMDSessionManager.instance().destroySession();
1285         } catch (PersistenceException pe) {
1286             throw pe;
1287         } catch (Exception err) {
1288             throw new PersistenceException("Error in addDestination " +
1289                 err.toString());
1290         }
1291     }
1292 
1293     // implementation of PersistenceAdapter.checkDestination
1294     public boolean checkDestination(Connection connection, String name)
1295         throws PersistenceException {
1296         boolean success = false;
1297 
1298         try {
1299             PMDVector vector;
1300             SessionIfc session = getSession();
1301 
1302             session.getCurrentTransaction().begin();
1303 
1304             if ((vector = destinationTable(session)) != null) {
1305                 try {
1306                     session.acquireLock(vector, MAX_WAIT_TIME);
1307 
1308                     // this will remove the destination and all registered
1309                     // consumers.
1310                     Vector to_delete = new Vector();
1311                     Enumeration entries = vector.elements();
1312                     while (entries.hasMoreElements()) {
1313                         PersistentString entry =
1314                             (PersistentString) entries.nextElement();
1315 
1316                         // check that it is the specified destination
1317                         if (entry.toString().substring(
1318                             TOPIC.length()).equals(name)) {
1319                             success = true;
1320                             break;
1321                         }
1322                     }
1323                 } catch (Exception err) {
1324                     throw new PersistenceException("Error in checkDestination " +
1325                         err.toString());
1326                 }
1327             } else {
1328                 throw new PersistenceException("Error in checkDestination " +
1329                     "Failed to get access to destination table.");
1330             }
1331 
1332             session.getCurrentTransaction().abort();
1333             PMDSessionManager.instance().destroySession();
1334         } catch (PersistenceException pe) {
1335             throw pe;
1336         } catch (Exception err) {
1337             throw new PersistenceException("Error in checkDestination " +
1338                 err.toString());
1339         }
1340         return success;
1341     }
1342 
1343     // implementation of PersistenceAdapter.getAllDestinations
1344     public synchronized Enumeration getAllDestinations(Connection connection)
1345         throws PersistenceException {
1346         Vector destinations = new Vector();
1347 
1348         try {
1349             PMDVector vector;
1350             SessionIfc session = getSession();
1351 
1352             session.getCurrentTransaction().begin();
1353 
1354             if ((vector = destinationTable(session)) != null) {
1355                 try {
1356                     session.acquireLock(vector, MAX_WAIT_TIME);
1357 
1358                     // this will remove the destination and all registered
1359                     // consumers.
1360                     Vector to_delete = new Vector();
1361                     Enumeration entries = vector.elements();
1362                     while (entries.hasMoreElements()) {
1363                         PersistentString entry =
1364                             (PersistentString) entries.nextElement();
1365                         String dest = entry.toString();
1366                         // check that it is the specified destination
1367                         if (dest.indexOf("@") == -1) {
1368                             if (dest.startsWith(QUEUE)) {
1369                                 destinations.addElement(
1370                                     new JmsQueue(dest.substring(QUEUE.length())));
1371                             } else if (dest.startsWith(TOPIC)) {
1372                                 destinations.addElement(
1373                                     new JmsTopic(dest.substring(TOPIC.length())));
1374                             }
1375                         }
1376                     }
1377                 } catch (Exception err) {
1378                     throw new PersistenceException("Error in checkDestination " +
1379                         err.toString());
1380                 }
1381             } else {
1382                 throw new PersistenceException("Error in checkDestination " +
1383                     "Failed to get access to the destination table.");
1384             }
1385 
1386             session.getCurrentTransaction().abort();
1387             PMDSessionManager.instance().destroySession();
1388         } catch (PersistenceException pe) {
1389             throw pe;
1390         } catch (Exception err) {
1391             throw new PersistenceException("Error in checkDestination " +
1392                 err.toString());
1393         }
1394 
1395         return destinations.elements();
1396     }
1397 
1398     // implementation of PersistenceAdapter.getQueueMessageCount
1399     public synchronized int getQueueMessageCount(Connection connection,
1400                                                  String queue)
1401         throws PersistenceException {
1402         int count = -1;
1403 
1404         try {
1405             PMDVector vector;
1406             SessionIfc session = getSession();
1407             String key = getHandlesRootName(queue);
1408 
1409 
1410             session.getCurrentTransaction().begin();
1411 
1412             if ((vector = handleTable(key, session)) != null) {
1413                 try {
1414                     session.acquireLock(vector, MAX_WAIT_TIME);
1415                     count = vector.size();
1416                 } catch (Exception err) {
1417                     throw new PersistenceException("Error in getQueueMessageCount " +
1418                         err.toString());
1419                 }
1420             } else {
1421                 throw new PersistenceException("Error in getQueueMessageCount " +
1422                     "Failed to get access to queue " + queue);
1423             }
1424             session.getCurrentTransaction().abort();
1425             PMDSessionManager.instance().destroySession();
1426         } catch (PersistenceException pe) {
1427             throw pe;
1428         } catch (Exception err) {
1429             throw new PersistenceException("Error in getQueueMessageCount " +
1430                 err.toString());
1431         }
1432 
1433         return count;
1434     }
1435 
1436     // implementation of PersistenceAdapter.getDurableConsumerMessageCount
1437     public synchronized int getDurableConsumerMessageCount(Connection connection,
1438                                                            String topic, String name)
1439         throws PersistenceException {
1440         int count = -1;
1441 
1442         try {
1443             PMDVector vector;
1444             SessionIfc session = getSession();
1445             String key = getHandlesRootName(name);
1446 
1447 
1448             session.getCurrentTransaction().begin();
1449 
1450             if ((vector = handleTable(key, session)) != null) {
1451                 try {
1452                     session.acquireLock(vector, MAX_WAIT_TIME);
1453                     count = vector.size();
1454                 } catch (Exception err) {
1455                     throw new PersistenceException("Error in getDurableConsumerMessageCount " +
1456                         err.toString());
1457                 }
1458             } else {
1459                 throw new PersistenceException("Error in getDurableConsumerMessageCount " +
1460                     "Cannot access table for " + topic + " : " + name);
1461             }
1462 
1463             session.getCurrentTransaction().abort();
1464             PMDSessionManager.instance().destroySession();
1465         } catch (PersistenceException pe) {
1466             throw pe;
1467         } catch (Exception err) {
1468             throw new PersistenceException("Error in getDurableConsumerMessageCount " +
1469                 err.toString());
1470         }
1471 
1472         return count;
1473     }
1474 
1475     // implementation of PersistenceAdapter.getQueueMessageCount
1476     public synchronized void removeExpiredMessages(Connection connection)
1477         throws PersistenceException {
1478 
1479         SessionIfc session = null;
1480         try {
1481             PMDVector vector;
1482             PMDHashMap map;
1483             session = getSession();
1484 
1485             session.getCurrentTransaction().begin();
1486             long now = System.currentTimeMillis();
1487 
1488             if ((map = messageTable(session)) != null) {
1489                 try {
1490                     session.acquireLock(map, MAX_WAIT_TIME);
1491                     Enumeration iter = map.elements();
1492                     Vector to_remove = new Vector();
1493 
1494                     // collect a list of messages that have expired so that
1495                     // they can be deleted later
1496                     while (iter.hasMoreElements()) {
1497                         PMDHandle handle = (PMDHandle) iter.nextElement();
1498                         PersistentMessage msg = (PersistentMessage) handle.resolve();
1499                         if ((msg.getExpiryTime() != 0) &&
1500                             (msg.getExpiryTime() <= now)) {
1501                             session.deleteObject(msg);
1502                             to_remove.add(msg.getMessage().getJMSMessageID());
1503                         }
1504                     }
1505 
1506                     // delete the expired messages
1507                     while (to_remove.size() > 0) {
1508                         map.remove(to_remove.remove(0));
1509                     }
1510                     session.updateObject(map);
1511                 } catch (Exception err) {
1512                     throw new PersistenceException("Error in removeExpiredMessages " +
1513                         err.toString());
1514                 }
1515 
1516                 // now we need to go through all the handle tables and
1517                 // remove expired messages. Very long and tedious
1518                 if ((vector = destinationTable(session)) != null) {
1519                     try {
1520                         session.acquireLock(vector, MAX_WAIT_TIME);
1521                         Enumeration entries = vector.elements();
1522                         Vector to_remove = new Vector();
1523 
1524                         while (entries.hasMoreElements()) {
1525                             to_remove.clear();
1526                             PersistentString entry = (PersistentString) entries.nextElement();
1527                             String name = getHandlesRootNameFromDestination(
1528                                 entry.toString());
1529                             if (name == null) {
1530                                 continue;
1531                             }
1532 
1533                             // retrieve the handle tbale based on the name
1534                             PMDVector handles_vector = handleTable(name, session);
1535                             if (handles_vector == null) {
1536                                 continue;
1537                             }
1538 
1539                             // we have the handle table. Now iterate over all the
1540                             // messages and removed expired messages
1541                             Enumeration handles = handles_vector.elements();
1542                             while (handles.hasMoreElements()) {
1543                                 PersistentMessageHandle handle =
1544                                     (PersistentMessageHandle) handles.nextElement();
1545                                 if ((handle.getExpiryTime() != 0) &&
1546                                     (handle.getExpiryTime() <= now)) {
1547                                     to_remove.add(handle);
1548                                 }
1549                             }
1550 
1551                             // now remove the handles
1552                             while (to_remove.size() > 0) {
1553                                 handles_vector.remove(
1554                                     (PersistentMessageHandle) to_remove.remove(0));
1555                             }
1556                             session.updateObject(handles_vector);
1557                         }
1558                     } catch (Exception err) {
1559                         throw new PersistenceException("Error in removeExpiredMessages " +
1560                             err.toString());
1561                     }
1562                 } else {
1563                     throw new PersistenceException("Error in removeExpiredMessages " +
1564                         "Failed to get the message table");
1565                 }
1566             }
1567             session.getCurrentTransaction().commit();
1568             PMDSessionManager.instance().destroySession();
1569         } catch (PersistenceException pe) {
1570             try {
1571                 session.getCurrentTransaction().abort();
1572             } catch (Exception exception) {
1573                 // ignore
1574             }
1575             throw pe;
1576         } catch (Exception err) {
1577             throw new PersistenceException("Error in removeExpiredMessages " +
1578                 err.toString());
1579         }
1580     }
1581 
1582     // implementation of PersistenceAdapter.removeExpiredMessageHandles
1583     public void removeExpiredMessageHandles(Connection connection,
1584                                             String consumer)
1585         throws PersistenceException {
1586 
1587         // no operation
1588     }
1589 
1590     // implementation of PersistenceAdapter.getQueueMessageCount
1591     public synchronized Vector getNonExpiredMessages(Connection connection,
1592                                                      JmsDestination destination)
1593         throws PersistenceException {
1594         Vector result = new Vector();
1595 
1596         SessionIfc session = null;
1597         try {
1598             PMDVector vector;
1599             session = getSession();
1600 
1601             session.getCurrentTransaction().begin();
1602             long now = System.currentTimeMillis();
1603 
1604             // now we need to go through all the handle tables and
1605             // retrieve messages which have not expired for the specified
1606             // destination
1607             if ((vector = destinationTable(session)) != null) {
1608                 try {
1609                     session.acquireLock(vector, MAX_WAIT_TIME);
1610                     Enumeration entries = vector.elements();
1611                     while (entries.hasMoreElements()) {
1612                         PersistentString entry = (PersistentString) entries.nextElement();
1613 
1614                         // check to see if we are interested in this destination
1615                         if (entry.toString().indexOf(destination.getName()) == -1) {
1616                             continue;
1617                         }
1618 
1619                         // attempt to retrieve the handle name table for the specified
1620                         // destination,
1621                         String name = getHandlesRootNameFromDestination(
1622                             entry.toString());
1623                         if (name == null) {
1624                             continue;
1625                         }
1626 
1627                         // retrieve the handle tbale based on the name
1628                         PMDVector handles_vector = handleTable(name, session);
1629                         if (handles_vector == null) {
1630                             continue;
1631                         }
1632 
1633                         // we have the handle table. Now iterate over all the
1634                         // messages and removed expired messages
1635                         Enumeration handles = handles_vector.elements();
1636                         while (handles.hasMoreElements()) {
1637                             PersistentMessageHandle handle =
1638                                 (PersistentMessageHandle) handles.nextElement();
1639                             if ((handle.getExpiryTime() != 0) &&
1640                                 (handle.getExpiryTime() > now)) {
1641                                 result.add(handle);
1642                             }
1643                         }
1644                     }
1645                 } catch (Exception err) {
1646                     throw new PersistenceException("Error in removeExpiredMessages " +
1647                         err.toString());
1648                 }
1649             } else {
1650                 throw new PersistenceException("Error in removeExpiredMessages " +
1651                     "The destination table does not exist");
1652             }
1653             session.getCurrentTransaction().commit();
1654             PMDSessionManager.instance().destroySession();
1655         } catch (PersistenceException pe) {
1656             try {
1657                 session.getCurrentTransaction().abort();
1658             } catch (Exception exception) {
1659                 // ignore
1660             }
1661             throw pe;
1662         } catch (Exception err) {
1663             throw new PersistenceException("Error in removeExpiredMessages " +
1664                 err.toString());
1665         }
1666 
1667         return result;
1668     }
1669 
1670     // implementation of PersistenceAdapter.purgeMessages
1671     public synchronized int purgeMessages() {
1672         boolean errorSet = false;
1673         Vector keep = new Vector();
1674         int count = -1;
1675 
1676         try {
1677             PMDHashMap map;
1678             PMDVector vector;
1679             SessionIfc session = getSession();
1680 
1681             session.getCurrentTransaction().begin();
1682             long now = System.currentTimeMillis();
1683 
1684             // now we need to go through all the handle tables and
1685             // get a unique list of unacked messages
1686             if ((vector = destinationTable(session)) != null) {
1687                 try {
1688                     session.acquireLock(vector, MAX_WAIT_TIME);
1689                     Enumeration entries = vector.elements();
1690                     while (entries.hasMoreElements()) {
1691                         PersistentString entry = (PersistentString) entries.nextElement();
1692 
1693                         // attempt to retrieve the handle name table for the specified
1694                         // destination,
1695                         String name = getHandlesRootNameFromDestination(
1696                             entry.toString());
1697                         if (name == null) {
1698                             continue;
1699                         }
1700                         // retrieve the handle table based on the name
1701                         PMDVector handles_vector = handleTable(name, session);
1702                         if (handles_vector == null) {
1703                             continue;
1704                         }
1705 
1706                         // we have the handle table. Now iterate over all the
1707                         // messages and removed expired messages
1708                         Enumeration handles = handles_vector.elements();
1709                         while (handles.hasMoreElements()) {
1710                             PersistentMessageHandle handle =
1711                                 (PersistentMessageHandle) handles.nextElement();
1712                             String id = handle.getMessageId().getId();
1713                             if (!keep.contains(id)) {
1714                                 keep.add(id);
1715                             }
1716                         }
1717                     }
1718 
1719 
1720                     // now that we have a definitive list of messages we
1721                     // need to remove messages that are not in the list
1722                     if ((map = messageTable(session)) != null) {
1723                         try {
1724                             session.acquireLock(map, MAX_WAIT_TIME);
1725                             Enumeration iter = map.keys();
1726                             Vector to_remove = new Vector();
1727 
1728                             // collect a list of messages that have expired so that
1729                             // they can be deleted later
1730                             while (iter.hasMoreElements()) {
1731                                 String id = (String) iter.nextElement();
1732                                 if (!keep.contains(id)) {
1733                                     session.deleteObject((PersistentMessage)
1734                                         ((PMDHandle) map.get(id)).resolve());
1735                                     to_remove.add(id);
1736                                 }
1737                             }
1738 
1739                             // delete the expired messages
1740                             count = to_remove.size();
1741                             while (to_remove.size() > 0) {
1742                                 map.remove(to_remove.remove(0));
1743                             }
1744                             session.updateObject(map);
1745                             _log.info("GC removed " +
1746                                 count + " messages from database.");
1747                         } catch (Exception exception) {
1748                             errorSet = true;
1749                             _log.error("Error in purgeMessages", exception);
1750                         }
1751                     } else {
1752                         _log.error("Message table does not exist");
1753                         errorSet = true;
1754                     }
1755                 } catch (Exception err) {
1756                     errorSet = true;
1757                     _log.error("Error in getNonExpiredMessages", err);
1758                 }
1759             } else {
1760                 // topic does not exists, may have been removed already.
1761                 errorSet = true;
1762             }
1763 
1764             if (!errorSet) {
1765                 session.getCurrentTransaction().commit();
1766             } else {
1767                 session.getCurrentTransaction().abort();
1768             }
1769             PMDSessionManager.instance().destroySession();
1770         } catch (FailedToDestroySessionException sessErr) {
1771             // Session not destroyed but message saved.
1772             _log.error("Failed to destroy session", sessErr);
1773         } catch (Exception err) {
1774             _log.error(err, err);
1775         }
1776 
1777         return count;
1778     }
1779 
1780     public void addUser(Connection connection, User user)
1781         throws PersistenceException {
1782         // unsupported
1783     }
1784 
1785     public Enumeration getAllUsers(Connection connection)
1786         throws PersistenceException {
1787         // unsupported
1788         return new Vector().elements();
1789     }
1790 
1791     public User getUser(Connection connection, User user)
1792         throws PersistenceException {
1793         // unsupported
1794         return null;
1795     }
1796 
1797     public void removeUser(Connection connection, User user)
1798         throws PersistenceException {
1799         // unsupported
1800     }
1801 
1802     public void updateUser(Connection connection, User user)
1803         throws PersistenceException {
1804         // unsupported
1805     }
1806 
1807     // implementation of PersistenceAdapter.getConnection
1808     public Connection getConnection()
1809         throws PersistenceException {
1810         return _connection;
1811     }
1812 
1813     // implementation of EventHandler.getHandle
1814     public HandleIfc getHandle() {
1815         return null;
1816     }
1817 
1818     // implementation of EventHandler.handleEvent
1819     public void handleEvent(int event, Object callback, long time) {
1820         if (event == COLLECT_DATABASE_GARBAGE_EVENT) {
1821             // collect garbage now, but before doing so change the thread
1822             // priority to low.
1823             try {
1824                 Thread.currentThread().setPriority(_gcThreadPriority);
1825                 purgeMessages();
1826             } finally {
1827                 Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
1828                 registerEvent();
1829             }
1830         }
1831     }
1832 
1833     /***
1834      * Get the session from the session manager.
1835      *
1836      * @return SessionIfc	The unique session for this thread.
1837      * @exception FailedToCreateSessionException internal error
1838      *
1839      */
1840     private SessionIfc getSession() throws FailedToCreateSessionException {
1841         return PMDSessionManager.instance().getSession();
1842     }
1843 
1844     /***
1845      * Return the map where messages are stored.
1846      *
1847      * @param session - the session to use
1848      * @return PMDHashMap
1849      */
1850     private PMDHashMap messageTable(SessionIfc session) {
1851         return (PMDHashMap) session.lookup(MESSAGES);
1852     }
1853 
1854     /***
1855      * Return the vector used to store destinations and also the
1856      * registered dursble consumers.
1857      *
1858      * @param session - the session to use
1859      * @return PMDHashMap
1860      */
1861     private PMDVector destinationTable(SessionIfc session) {
1862         return (PMDVector) session.lookup(DESTINATIONS);
1863     }
1864 
1865     /***
1866      * Return the message handle table for the specified destination and
1867      * and name.
1868      * <p>
1869      * Assume this is only called from within a transaction
1870      *
1871      * @param key - the key to the consumer
1872      * @param session - the session object.
1873      * @return PMDHashMap the root object if it exists or null.
1874      *
1875      */
1876     private PMDVector handleTable(String key, SessionIfc session) {
1877         return (PMDVector) session.lookup(key);
1878     }
1879 
1880     /***
1881      * Return a generated root name to store consumer handles using the
1882      * destination and name
1883      *
1884      * @param destination - the destination
1885      * @param name - the consumer name
1886      * @return String - the root name
1887      */
1888     private String getHandlesRootName(JmsDestination destination, String name) {
1889         if (destination instanceof JmsQueue) {
1890             return getHandlesRootName(destination.getName());
1891         } else {
1892             return getHandlesRootName(name);
1893         }
1894     }
1895 
1896     /***
1897      * Return the handles root name given an internal string, which is an
1898      * entry from the destination table.
1899      * <p>
1900      * The format of the entries are QUEUE_<queue_name> for queues
1901      * or TOPIC_<topic_name>.<consumer_name> for topics.
1902      *
1903      * @param entry - the destination table entry
1904      * @return String - the associated handle table name
1905      */
1906     private String getHandlesRootNameFromDestination(String entry) {
1907         String name = null;
1908 
1909         if (entry.startsWith(QUEUE)) {
1910             name = HANDLES + "@" + entry.substring(QUEUE.length());
1911         } else if (entry.startsWith(TOPIC)) {
1912             int index = entry.indexOf("@");
1913             if (index != -1) {
1914                 name = HANDLES + "@" + entry.substring(index + 1);
1915             }
1916         }
1917 
1918         return name;
1919     }
1920 
1921     /***
1922      * Return a generated root name to store consumer handles using the
1923      * destination and name. The destination is always the name of a queue
1924      * destination
1925      *
1926      * @param queue - the name of the queue destination
1927      * @return String - the root name
1928      */
1929     private String getHandlesRootName(String name) {
1930         return HANDLES + "@" + name;
1931     }
1932 
1933     /***
1934      * Strip the "ID:" prefix from the string, and increment the long
1935      * by one, then add the prefix again.
1936      *
1937      * @param st The string to increment.
1938      * @return String The string incremented by 1.
1939      *
1940      */
1941     private String increment(String st) {
1942         String newId = null;
1943 
1944         try {
1945             newId = "ID:" + (Long.parseLong(st.substring(3)) + 1);
1946         } catch (NumberFormatException err) {
1947             _log.error("Invalid id: " + st);
1948             System.exit(-1);
1949         }
1950         return newId;
1951     }
1952 
1953     /***
1954      * Register an event to collect and remove processed messages with the
1955      * {@link BasicEventManager}
1956      */
1957     private void registerEvent() {
1958         try {
1959             BasicEventManager.instance().registerEventRelative(
1960                 new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
1961                 _gcInterval);
1962         } catch (IllegalEventDefinedException exception) {
1963             _log.error("registerEvent failed", exception);
1964         }
1965     }
1966 }
1967 
1968 
1969 /***
1970  * A simple wrapper to store long values in the database.
1971  *
1972  */
1973 class PMDLongInteger
1974     extends PersistentObject
1975     implements Externalizable {
1976 
1977     /***
1978      * The actual value
1979      */
1980     private long value_;
1981 
1982 
1983     /***
1984      * Used for serialization
1985      */
1986     static final long serialVersionUID = 1;
1987 
1988     public PMDLongInteger() {
1989     }
1990 
1991     public PMDLongInteger(long value) {
1992         value_ = value;
1993     }
1994 
1995     public long get() {
1996         return value_;
1997     }
1998 
1999     // implementation of Externalizable.writeExternal
2000     public void writeExternal(ObjectOutput stream)
2001         throws IOException {
2002         stream.writeLong(serialVersionUID);
2003         stream.writeLong(value_);
2004         super.writeExternal(stream);
2005     }
2006 
2007     // implementation of Externalizable.writeExternal
2008     public void readExternal(ObjectInput stream)
2009         throws IOException, ClassNotFoundException {
2010         long version = stream.readLong();
2011         if (version == serialVersionUID) {
2012             value_ = stream.readLong();
2013             super.readExternal(stream);
2014         } else {
2015             throw new IOException("PMDLongInteger with version " +
2016                 version + " is not supported.");
2017         }
2018     }
2019 
2020 } //-- ObjectAdapter