View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2003 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   *
44   * $Id: ResourceManager.java,v 1.6 2003/08/17 01:32:25 tanderson Exp $
45   *
46   * Date			Author  Changes
47   * 20/11/2001   jima    Created
48   * 12/02/2002   jima    Changed the package name from .xa to .messagemgr
49   */
50  package org.exolab.jms.messagemgr;
51  
52  import java.io.File;
53  import java.io.FilenameFilter;
54  import java.sql.Connection;
55  import java.util.Comparator;
56  import java.util.HashMap;
57  import java.util.Iterator;
58  import java.util.LinkedList;
59  import java.util.TreeSet;
60  import java.util.Vector;
61  
62  import javax.jms.JMSException;
63  import javax.transaction.xa.XAException;
64  import javax.transaction.xa.XAResource;
65  import javax.transaction.xa.Xid;
66  
67  import org.apache.commons.logging.Log;
68  import org.apache.commons.logging.LogFactory;
69  
70  import org.exolab.core.service.BasicService;
71  import org.exolab.core.service.ServiceException;
72  import org.exolab.core.service.ServiceState;
73  import org.exolab.jms.config.Configuration;
74  import org.exolab.jms.message.MessageHandle;
75  import org.exolab.jms.message.MessageImpl;
76  import org.exolab.jms.persistence.DatabaseService;
77  import org.exolab.jms.persistence.PersistenceException;
78  import org.exolab.jms.tranlog.ExternalXid;
79  import org.exolab.jms.tranlog.StateTransactionLogEntry;
80  import org.exolab.jms.tranlog.TransactionLog;
81  import org.exolab.jms.tranlog.TransactionLogException;
82  import org.exolab.jms.tranlog.TransactionState;
83  import org.exolab.jms.util.UUID;
84  
85  
86  /***
87   * The resource manager provides XA support for the JMS Server.
88   * <p>
89   * The resource manager is responsible for managing the various transaction
90   * identifiers and managing the association between transaction ids and
91   * connections.
92   * <p>
93   * The resource manager will store the global XID's and their state in the
94   * database for recovery purposes.
95   * <p>
96   * Messages that arrive, and are associated with an XID are not processed
97   * through the {@link MessageMgr}. Instead they are routed to this resource
98   * managers where they are cached until the associated XID is committed or
99   * rolled back. If the transaction is successfully committed, through the 2PC
100  * protocol the messages will pass through the system.
101  * <p>
102  * Similarly, messages that are sent to consumers, either synchronously or
103  * asynchronously are also cached by the resource manager until the global
104  * transaction completes.
105  * <p>
106  * On startup the resource manager will read all incomplete transactions, which
107  * are incompleted into memory. It will then process trnasactions that have
108  * timed out.
109  * <p>
110  * The transaction manager will call the {@link #recover} method  and obtain a
111  * list of incomplete transaction for the purpose of completing them where
112  * possible.
113  *
114  * @version   $Revision: 1.6 $ $Date: 2003/08/17 01:32:25 $
115  * @author    <a href="mailto:jima@intalio.com">Jim Alateras</a>
116  */
117 public class ResourceManager
118     extends BasicService {
119 
120     /***
121      * The name of the service
122      */
123     private final static String RM_SERVICE_NAME = "XAResourceManager";
124 
125     /***
126      * The prefix used for all transaction log files, which are created and
127      * managed by the {@link TransactionLog}
128      */
129     private final static String RM_LOGFILE_PREFIX = "ojmsrm";
130 
131     /***
132      * The extension for all transaction log files
133      */
134     public final static String RM_LOGFILE_EXTENSION = ".log";
135 
136     /***
137      * This is used to indicate the garbage collection has been disabled and
138      * that the client will take responsibility for all aspects of log file
139      * management. This is useful in situations where the client wants to
140      * archive the transaction log files
141      * <p>
142      * This is the default mode for GC.
143      */
144     public static final int GC_DISABLED = 0;
145 
146     /***
147      * Synchronous gabrage collection is used to remove processed log files
148      * when the last trnasaction, in that log file, has been successfully
149      * processed. This is more efficient means since the log files does not
150      * need to be scanned asynchronously to determine whether all the
151      * transactions have been processed.
152      */
153     public static final int GC_SYNCHRONOUS = 1;
154 
155     /***
156      * Asynchronous garbage collection is used to remove processed log files
157      * asynchronous (i.e in a different thread context). This is rather
158      * expensive since it must manually scan each log file and determine
159      * whether all transactions, in that file, have been closed. If this is
160      * the case then it will remove the log file.
161      */
162     public static final int GC_ASYNCHRONOUS = 2;
163 
164     /***
165      * Maintains a singleton instance of the gc service
166      */
167     private static ResourceManager _instance = null;
168 
169     /***
170      * Used to synchronize the creation of the transaction manager
171      */
172     private static final Object _initializer = new Object();
173 
174     /***
175      * This is the maximum size, in bytes, of each transaction log file. The
176      * value can be overriden by the user
177      */
178     private int _logFileSize = 1000000;
179 
180     /***
181      * Maintains a collection of transaction log files currently in use by this
182      * resource manager
183      */
184     private TreeSet _logs = new TreeSet(new TranLogFileComparator());
185 
186     /***
187      * Maintain a mapping between the TRID (transaction id and the log file it
188      * is associated with.
189      */
190     private HashMap _tridToLogCache = new HashMap();
191 
192     /***
193      * Maintain a list of open TRIDs for a particular  {@link TransactionLog}
194      */
195     private HashMap _logToTridCache = new HashMap();
196 
197     /***
198      * This attribute is used to synchronize the modifications to the _tridToLog
199      * _logToTrid attributes
200      */
201     private final Object _cacheLock = new Object();
202 
203     /***
204      * This maintains a cache of all open transactions and the corresponding
205      * data. The key is the transaction identifier and the object is a LinkedList
206      * transaction entries, which includes both state and data
207      */
208     private HashMap _activeTransactions = new HashMap();
209 
210     /***
211      * The directory where the log files are stored. This can be set by the
212      * client
213      */
214     private String _logDirectory = ".";
215 
216     /***
217      * This is the number of the last log file created by the ResourceManager
218      */
219     private long _lastLogNumber = 0;
220 
221     /***
222      * The expiry time for transaction associated with this resource manager.
223      * This will either be configured or passed in with the transaction context
224      * The value is specified in seconds.
225      */
226     private int _txExpiryTime = 120;
227 
228     /***
229      * This attribute caches the garbage collection mode for the resouce
230      * managers. Valid values are specified by the GC_* constants.
231      * <p>
232      * By default garbage collection is disabled.
233      */
234     private int _gcMode = GC_SYNCHRONOUS;
235 
236     /***
237      * This is the id associated with this resource...need to work out who
238      * or what sets this.
239      */
240     private String _rid = UUID.next();
241 
242     /***
243      * The logger
244      */
245     private static final Log _log = LogFactory.getLog(ResourceManager.class);
246 
247 
248     /***
249      * Return the singleton instance of the ResourceManager
250      *
251      * @return ResourceManager
252      * @throws ResourceManagerException
253      */
254     public static ResourceManager instance()
255         throws ResourceManagerException {
256         if (_instance == null) {
257             synchronized (_initializer) {
258                 // we need to check again if multiple threads
259                 // have blocked on the creation of the singleton
260                 if (_instance == null) {
261                     _instance = new ResourceManager();
262                 }
263             }
264         }
265 
266         return _instance;
267     }
268 
269     /***
270      * Construct a resource manager using the default directory for its log
271      * files.
272      * <p>
273      * If there is a problem constructing this instances then throw a
274      * {@link ResourceManagerException} exception.
275      *
276      * @throws ResourceManagerException
277      */
278     private ResourceManager()
279         throws ResourceManagerException {
280 
281         // build the list of existing log files.
282         this("./logs");
283     }
284 
285     /***
286      * Construct a resource manager using the specified directory, which
287      * must already exist. If the directory does not exist or there is
288      * no permisssion to access it then throw a ResourceManagerException
289      *
290      * @param dir - the base directory
291      * @throws ResourceManagerException
292      */
293     public ResourceManager(String dir)
294         throws ResourceManagerException {
295         super(RM_SERVICE_NAME);
296         _logDirectory = dir;
297         File file = new File(dir);
298         if ((!file.exists()) ||
299             (!file.isDirectory())) {
300             throw new ResourceManagerException(dir +
301                 " does not exist or is not a directory");
302         }
303 
304         // build the list of existing log files.
305         buildLogFileList();
306 
307         // recover te list of log files
308         recover();
309     }
310 
311     /***
312      * Set the log directory. This is the directory where all the log files
313      * are stored
314      *
315      * @param dir - the name of the directory (absolute or relative)
316      * @throws IllegalArgumentException if the string is not a directory
317      */
318     public void setLogDirectory(String dir)
319         throws IllegalArgumentException {
320         if (!(new File(dir)).isDirectory()) {
321             throw new IllegalArgumentException(dir + " is not a directory");
322         } else {
323             _logDirectory = dir;
324         }
325     }
326 
327     /***
328      * Retrieve the name of the log directory
329      *
330      * @return String - log dir name
331      */
332     public String getLogDirectory() {
333         return _logDirectory;
334     }
335 
336     /***
337      * Set the maximum size of each log file. When this size is reached a
338      * new log file is created. The size of the log file can be changed
339      * dynamically during runtime. If it is not specified it will default
340      * to 1MB.
341      * <p>
342      * The size is specified in bytes
343      *
344      * @param size - the max size of each log file
345      */
346     public void setLogFileSize(int size) {
347         _logFileSize = size;
348     }
349 
350     /***
351      * Return the maximum size that each log file can grow too.
352      *
353      * @return int - log file size
354      */
355     public int getLogFileSize() {
356         return _logFileSize;
357     }
358 
359     /***
360      * Set the GC mode for the resource manager. Valid values are GC_SYNCHRONOUS,
361      * GC_ASYNCHRONOUS and GC_DISABLED.
362      * <p>
363      * @param mode - one of GC_*
364      * @return boolean - if the specified mode has been correctly set
365      */
366     public boolean setGCMode(int mode) {
367         boolean result = false;
368 
369         if ((mode == GC_DISABLED) ||
370             (mode == GC_SYNCHRONOUS) ||
371             (mode == GC_ASYNCHRONOUS)) {
372             _gcMode = mode;
373             result = true;
374         }
375 
376         return result;
377     }
378 
379     /***
380      * Return the garbage collection mode for the resource manager
381      *
382      * @return int
383      */
384     public int getGCMode() {
385         return _gcMode;
386     }
387 
388     /***
389      * Check whether garbage collection has been disabled
390      *
391      * @return boolean - true if gc is disabled
392      */
393     public boolean gcDisabled() {
394         return (_gcMode == GC_DISABLED) ? true : false;
395     }
396 
397     /***
398      * Log this published message so that it can be passed through the system
399      * when the associated global transaction commits.
400      *
401      * @param xid - the global transaction identity
402      * @param message - the message published
403      * @throws TransactionLogException - error adding the entry
404      * @throws ResourceManagerException - error getting the trnasaction log
405      * @throws JMSException - if there is an issue with prep'ing the message
406      */
407     public synchronized void logPublishedMessage(Xid xid, MessageImpl message)
408         throws TransactionLogException, ResourceManagerException, JMSException {
409         MessageMgr.instance().checkAndPrepareMessage(message);
410         logTransactionData(new ExternalXid(xid), _rid,
411             createPublishedMessageWrapper(message));
412     }
413 
414     /***
415      * Log that this message handle was sent to the consumer within the specified
416      * global transaction identity. The message will be acknowledged when the
417      * global transaction commits. Alternatively, if the global transaction is
418      * rolled back the message handle will be returned to the destination
419      *
420      * @param txid - the global transaction identity
421      * @param id - the consumer receiving this message
422      * @param handle - the handle of the message received
423      * @throws TransactionLogException - error adding the entry
424      * @throws ResourceManagerException - error getting the transaction log
425      */
426     public synchronized void logReceivedMessage(Xid xid, String id, MessageHandle handle)
427         throws TransactionLogException, ResourceManagerException {
428         logTransactionData(new ExternalXid(xid), _rid,
429             createReceivedMessageWrapper(id, handle));
430     }
431 
432     /***
433      * Add an {@link StateTransactionLogEntry} using the specified txid,
434      * rid and state
435      *
436      * @param xid - the transaction identifier
437      * @param state - the transaction log state
438      * @throws TransactionLogException - error adding the entry
439      * @throws ResourceManagerException - error getting the trnasaction log
440      */
441     public synchronized void logTransactionState(Xid xid, TransactionState state)
442         throws TransactionLogException, ResourceManagerException {
443         ExternalXid txid = new ExternalXid(xid);
444         switch (state.getOrd()) {
445             case TransactionState.OPENED_ORD:
446                 {
447                     TransactionLog log = getCurrentTransactionLog();
448                     addTridLogEntry(txid, log);
449                     log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
450                         state);
451 
452                     // cache the transaction state
453                     _activeTransactions.put(txid, new LinkedList());
454                 }
455                 break;
456 
457             case TransactionState.PREPARED_ORD:
458                 // cache the transaction state
459                 LinkedList list = (LinkedList) _activeTransactions.get(txid);
460                 if (list != null) {
461                     list.add(state);
462                 } else {
463                     throw new ResourceManagerException("Trasaction " + txid +
464                         " is not active.");
465                 }
466                 break;
467 
468             case TransactionState.CLOSED_ORD:
469                 {
470                     TransactionLog log = getTransactionLog(txid);
471                     log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
472                         state);
473                     removeTridLogEntry(txid, log);
474 
475                     // check whether this log has anymore open transactions
476                     synchronized (_cacheLock) {
477                         if ((_logToTridCache.get(log) == null) &&
478                             (!isCurrentTransactionLog(log))) {
479                             log.close();
480 
481                             // now check if gc mode is GC_SYNCHRONOUS. If it is
482                             // remove the log file
483                             if (_gcMode == GC_SYNCHRONOUS) {
484                                 try {
485                                     log.destroy();
486                                 } catch (TransactionLogException exception) {
487                                     exception.printStackTrace();
488                                 }
489                             }
490                         }
491                     }
492 
493                     // we also want to remove this entry from the list
494                     // of active transactions
495                     _activeTransactions.remove(txid);
496                 }
497                 break;
498 
499             default:
500                 throw new ResourceManagerException("Cannot process tx state " +
501                     state);
502         }
503     }
504 
505     /***
506      * Add an {@link DataTransactionLogEntry} using the specified txid,
507      * rid and data
508      *
509      * @param txid - the transaction identifier
510      * @param rid - the resource identifier
511      * @param state - the transaction log state
512      * @throws TransactionLogException - error adding the entry
513      * @throws ResourceManagerException - error getting the trnasaction log
514      */
515     synchronized void logTransactionData(ExternalXid txid, String rid,
516                                          Object data)
517         throws ResourceManagerException, TransactionLogException {
518         getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,
519             rid, data);
520 
521         // we also want to add this to the transaction data for that
522         // txid
523         LinkedList list = (LinkedList) _activeTransactions.get(txid);
524         if (list != null) {
525             list.add(data);
526         } else {
527             throw new ResourceManagerException("Trasaction " + txid +
528                 " is not active.");
529         }
530     }
531 
532     /***
533      * This is the entry point for the garbage collection callback. It scans
534      * through the each transaction log file and determines whether it can
535      * be garbage collected. If it can then it simply destroys the corresponding
536      * TransactionLog.
537      */
538     public void garbageCollect() {
539         try {
540             int gcfiles = 0;
541 
542             // if there are no transaction log files then return
543             if (_logs.size() == 0) {
544                 return;
545             }
546 
547             TreeSet copy = null;
548             synchronized (_logs) {
549                 copy = new TreeSet(_logs);
550             }
551 
552             // remove the current log file, since this is likely to be the
553             // current log file
554             copy.remove(_logs.last());
555 
556             // process each of the remaining log files
557             while (copy.size() > 0) {
558                 TransactionLog log = (TransactionLog) copy.first();
559                 copy.remove(log);
560                 if (log.canGarbageCollect()) {
561                     // destroy the log
562                     log.destroy();
563 
564                     // remove it from the log cache
565                     synchronized (_logs) {
566                         _logs.remove(log);
567                     }
568 
569                     // increment the number of garbafe collected files
570                     ++gcfiles;
571                 }
572             }
573 
574             // print an informative message
575             _log.info("[RMGC] Collected " + gcfiles + " files.");
576         } catch (Exception exception) {
577             exception.printStackTrace();
578         }
579     }
580 
581     /***
582      * Ensure that a transaction with the specified xid is currently active.
583      * If this is the case then commit the transaction based onb the value
584      * of the onePhase flag.
585      * <p>
586      * This will have the effect of passing all messages through
587      *
588      * @param xid - the xa transaction identity
589      * @param onePhase - treu if it is a one phase commit
590      * @throws XAException - if there is a problem completing the call
591      */
592     public synchronized void commit(Xid id, boolean onePhase)
593         throws XAException {
594         // check that the xid is not null
595         if (id == null) {
596             throw new XAException(XAException.XAER_NOTA);
597         }
598 
599         // covert to our internal representation of an xid
600         ExternalXid xid = new ExternalXid(id);
601 
602         // check to see that the transaction is active and open. We should
603         // not be allowed to commit a committed transaction.
604         if (!isTransactionActive(xid)) {
605             throw new XAException(XAException.XAER_PROTO);
606         }
607 
608         // process all the messages associated with this global transaction
609         // If a message has been  published then sent it to the message mgr
610         // for processing. If a message has been consumed then remove it
611         // from the list of unconsumed messages.
612         Connection connection = null;
613         try {
614             // get a connection to the database
615             connection = DatabaseService.getConnection();
616 
617             // retrieve a list of recrods for the specified global transaction
618             // and process them. Ignore the state records and only process the
619             // data records, which are of type TransacitonalObjectWrapper.
620             Object[] records = getTransactionRecords(xid, _rid);
621             for (int index = 0; index < records.length; index++) {
622                 if (records[index] instanceof TransactionalObjectWrapper) {
623                     TransactionalObjectWrapper wrapper =
624                         (TransactionalObjectWrapper) records[index];
625                     if (wrapper.isPublishedMessage()) {
626                         // send the published message to the message manager
627                         MessageMgr.instance().add(connection,
628                             (MessageImpl) wrapper.getObject());
629 
630                     } else if (wrapper.isReceivedMessage()) {
631                         // if it is a received message handle then simply
632                         // delete it and mark it as acknowledged
633                         MessageHandle handle = ((ReceivedMessageWrapper) (wrapper)).getMessageHandle();
634                         if (handle instanceof PersistentMessageHandle) {
635                             MessageHandleFactory.destroyPersistentHandle(connection,
636                                 (PersistentMessageHandle) handle);
637                         } else {
638                             handle.destroy();
639                         }
640                     }
641                 } else {
642                     // ignore since it is a state records.
643                 }
644             }
645             connection.commit();
646         } catch (PersistenceException exception) {
647             if (connection != null) {
648                 try {
649                     connection.rollback();
650                 } catch (Exception nested) {
651                     // ignore
652                 }
653             }
654             throw new XAException("Failed in ResourceManager.commit : " +
655                 exception.toString());
656         } catch (Exception exception) {
657             throw new XAException("Failed in ResourceManager.commit : " +
658                 exception.toString());
659         } finally {
660             if (connection != null) {
661                 try {
662                     connection.close();
663                 } catch (Exception nested) {
664                     // ignore
665                 }
666             }
667 
668             // and now mark the transaction as closed
669             try {
670                 logTransactionState(xid, TransactionState.CLOSED);
671             } catch (Exception exception) {
672                 throw new XAException("Error processing commit : " + exception);
673             }
674         }
675     }
676 
677     /***
678      * Ends the work performed on behalf of a transaction branch. The resource
679      * manager disassociates the XA resource from the transaction branch
680      * specified and let the transaction be completedCommits an XA transaction
681      * that is in progress.
682      *
683      * @param xid - the xa transaction identity
684      * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND
685      * @throws XAException - if there is a problem completing the call
686      */
687     public synchronized void end(Xid id, int flags)
688         throws XAException {
689         //check the xid is not null
690         if (id == null) {
691             throw new XAException(XAException.XAER_NOTA);
692         }
693 
694         // covert to our internal representation of an xid
695         ExternalXid xid = new ExternalXid(id);
696 
697         // check that the flags are valid for this method
698         if ((flags != XAResource.TMSUSPEND) ||
699             (flags != XAResource.TMSUCCESS) ||
700             (flags != XAResource.TMFAIL)) {
701             throw new XAException(XAException.XAER_PROTO);
702         }
703 
704         switch (flags) {
705             case XAResource.TMFAIL:
706                 // check that the transaction exists
707                 if (!isTransactionActive(xid)) {
708                     throw new XAException(XAException.XAER_PROTO);
709                 }
710 
711                 // do not process that associated data, simply rollback
712                 rollback(xid);
713                 break;
714 
715             case XAResource.TMSUSPEND:
716                 // check that the transaction is opened
717                 if (!isTransactionActive(xid)) {
718                     throw new XAException(XAException.XAER_PROTO);
719                 }
720                 break;
721 
722             case XAResource.TMSUCCESS:
723                 // nothing to do here but check that the resource manager is
724                 // in a consistent state wrt to this xid. The xid should not
725                 // be active if it received the commit, forget etc.
726                 if (isTransactionActive(xid)) {
727                     throw new XAException(XAException.XAER_PROTO);
728                 }
729                 break;
730         }
731     }
732 
733     /***
734      * Tell the resource manager to forget about a heuristically completed
735      * transaction branch.
736      *
737      * @param xid - the xa transaction identity
738      * @throws XAException - if there is a problem completing the call
739      */
740     public synchronized void forget(Xid id)
741         throws XAException {
742         //check the xid is not null
743         if (id == null) {
744             throw new XAException(XAException.XAER_NOTA);
745         }
746 
747         // covert to our internal representation of an xid
748         ExternalXid xid = new ExternalXid(id);
749 
750         // check to see that the xid actually exists
751         if (!isTransactionActive(xid)) {
752             throw new XAException(XAException.XAER_PROTO);
753         }
754 
755         // call rollback to complete the work
756         rollback(id);
757     }
758 
759     /***
760      * Return the transaction timeout for this instance of the resource
761      * manager.
762      *
763      * @return int - the timeout in seconds
764      * @throws XAException - if there is a problem completing the call
765      */
766     public synchronized int getTransactionTimeout()
767         throws XAException {
768         return _txExpiryTime;
769     }
770 
771     /***
772      * Ask the resource manager to prepare for a transaction commit of the
773      * transaction specified in xid
774      *
775      * @param xid - the xa transaction identity
776      * @return int - XA_RDONLY or XA_OK
777      * @throws XAException - if there is a problem completing the call
778      */
779     public synchronized boolean isSameRM(XAResource xares)
780         throws XAException {
781         boolean result = false;
782 
783         if ((xares == this) ||
784             ((xares instanceof ResourceManager) &&
785             (((ResourceManager) xares)._rid.equals(_rid)))) {
786             result = true;
787         }
788 
789         return result;
790     }
791 
792     /***
793      * Obtain a list of prepared transaction branches from a resource manager.
794      * The transaction manager calls this method during recovery to obtain the
795      * list of transaction branches that are currently in prepared or
796      * heuristically completed states.
797      *
798      * @param flag - One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. TMNOFLAGS
799      * @param Xid[] - the set of Xids to recover
800      * @throws XAException - if there is a problem completing the call
801      */
802     public synchronized int prepare(Xid id)
803         throws XAException {
804         //check the xid is not null
805         if (id == null) {
806             throw new XAException(XAException.XAER_NOTA);
807         }
808 
809         // covert to our internal representation of an xid
810         ExternalXid xid = new ExternalXid(id);
811 
812         // check to see that the xid actually exists
813         if (!isTransactionActive(xid)) {
814             throw new XAException(XAException.XAER_PROTO);
815         }
816 
817         // can a prepare for the same resource occur multiple times
818         // ????
819 
820         try {
821             logTransactionState(xid, TransactionState.PREPARED);
822         } catch (Exception exception) {
823             throw new XAException("Error processing prepare : " + exception);
824         }
825 
826         return XAResource.XA_OK;
827     }
828 
829     /***
830      * Inform the resource manager to roll back work done on behalf of a
831      * transaction branch
832      *
833      * @param xid - the xa transaction identity
834      * @throws XAException - if there is a problem completing the call
835      */
836     public synchronized Xid[] recover(int flag)
837         throws XAException {
838 
839         Xid[] result = new Xid[0];
840 
841         if ((flag == XAResource.TMNOFLAGS) ||
842             (flag == XAResource.TMSTARTRSCAN) ||
843             (flag == XAResource.TMENDRSCAN)) {
844             LinkedList xids = new LinkedList();
845             Iterator iter = _activeTransactions.keySet().iterator();
846             while (iter.hasNext()) {
847                 Xid xid = (Xid) iter.next();
848                 LinkedList list = (LinkedList) _activeTransactions.get(xid);
849                 if (list.size() > 1) {
850                     // need at least a start in the chain.
851                     Object last = list.getLast();
852                     if ((last instanceof StateTransactionLogEntry) &&
853                         (((StateTransactionLogEntry) last).getState().isPrepared())) {
854                         xids.add(xid);
855                     }
856                 }
857 
858             }
859             result = (Xid[]) xids.toArray();
860         }
861 
862         return result;
863     }
864 
865     /***
866      * Set the current transaction timeout value for this XAResource instance.
867      *
868      * @param seconds - timeout in seconds
869      * @return boolean - true if the new transaction timeout was accepted
870      * @throws XAException - if there is a problem completing the call
871      */
872     public synchronized void rollback(Xid id)
873         throws XAException {
874         //check the xid is not null
875         if (id == null) {
876             throw new XAException(XAException.XAER_NOTA);
877         }
878 
879         // covert to our internal representation of an xid
880         ExternalXid xid = new ExternalXid(id);
881 
882         // check to see that the xid actually exists
883         if (!isTransactionActive(xid)) {
884             throw new XAException(XAException.XAER_PROTO);
885         }
886 
887         // process the data in that transaction. If it was a published message
888         // then drop it. If it was a consumed message then return it back to
889         // the destination.
890         Connection connection = null;
891         try {
892             // get a connection to the database
893             connection = DatabaseService.getConnection();
894 
895             // retrieve a list of recrods for the specified global transaction
896             // and process them. Ignore the state records and only process the
897             // data records, which are of type TransacitonalObjectWrapper.
898             Object[] records = getTransactionRecords(xid, _rid);
899             for (int index = 0; index < records.length; index++) {
900                 if (records[index] instanceof TransactionalObjectWrapper) {
901                     TransactionalObjectWrapper wrapper =
902                         (TransactionalObjectWrapper) records[index];
903                     if (wrapper.isPublishedMessage()) {
904                         // we don't need to process these messages since the
905                         // global transaction has been rolled back.
906                     } else if (wrapper.isReceivedMessage()) {
907                         // use the ConsumerManager to retrieve an instance of
908                         // of the ConsumerEndpoint and then return the message
909                         // back to the consumer.
910                         ReceivedMessageWrapper rmsg_wrapper =
911                             (ReceivedMessageWrapper) wrapper;
912                         ConsumerEndpoint endpoint =
913                             ConsumerManager.instance().getConsumerEndpoint(
914                                 rmsg_wrapper.getConsumerId());
915                         if (endpoint != null) {
916                             // the endpoint exists so we must return the message
917                             // back to it.
918                             endpoint.returnMessage(
919                                 (MessageHandle) rmsg_wrapper.getObject());
920                         } else {
921                             // endpoint no longer exists so we can ignore this
922                         }
923                     }
924                 } else {
925                     // ignore since it is a state records.
926                 }
927             }
928 
929             connection.commit();
930         } catch (PersistenceException exception) {
931             if (connection != null) {
932                 try {
933                     connection.rollback();
934                 } catch (Exception nested) {
935                     // ignore
936                 }
937             }
938             throw new XAException("Failed in ResourceManager.rollback : " +
939                 exception.toString());
940         } catch (Exception exception) {
941             throw new XAException("Failed in ResourceManager.rollback : " +
942                 exception.toString());
943         } finally {
944             if (connection != null) {
945                 try {
946                     connection.close();
947                 } catch (Exception nested) {
948                     // ignore
949                 }
950             }
951 
952             // and now mark the transaction as closed
953             try {
954                 logTransactionState(xid, TransactionState.CLOSED);
955             } catch (Exception exception) {
956                 throw new XAException("Error processing rollback : " + exception);
957             }
958         }
959     }
960 
961     /***
962      * Start work on behalf of a transaction branch specified in xid If TMJOIN
963      * is specified, the start is for joining a transaction previously seen by
964      * the resource manager
965      *
966      * @param xid - the xa transaction identity
967      * @param flags - One of TMNOFLAGS, TMJOIN, or TMRESUME
968      * @throws XAException - if there is a problem completing the call
969      */
970     public synchronized boolean setTransactionTimeout(int seconds)
971         throws XAException {
972         _txExpiryTime = seconds;
973         return true;
974     }
975 
976     // implementation of XAResource.start
977     public synchronized void start(Xid id, int flags)
978         throws XAException {
979 
980         //check the xid is not null
981         if (id == null) {
982             throw new XAException(XAException.XAER_NOTA);
983         }
984 
985         // covert to our internal representation of an xid
986         ExternalXid xid = new ExternalXid(id);
987 
988         // check that the flags are valid for this method
989         if ((flags != XAResource.TMNOFLAGS) ||
990             (flags != XAResource.TMJOIN) ||
991             (flags != XAResource.TMRESUME)) {
992             throw new XAException(XAException.XAER_PROTO);
993         }
994 
995         switch (flags) {
996             case XAResource.TMNOFLAGS:
997                 // check to see that the xid does not already exist
998                 if (isTransactionActive(xid)) {
999                     throw new XAException(XAException.XAER_DUPID);
1000                 }
1001 
1002                 // otherwise log the start of the transaction
1003                 try {
1004                     logTransactionState(xid, TransactionState.OPENED);
1005                 } catch (Exception exception) {
1006                     throw new XAException("Error processing start : " + exception);
1007                 }
1008                 break;
1009 
1010             case XAResource.TMJOIN:
1011             case XAResource.TMRESUME:
1012                 // joining a transaction previously seen by the resource
1013                 // manager
1014                 if (!isTransactionActive(xid)) {
1015                     throw new XAException(XAException.XAER_PROTO);
1016                 }
1017                 break;
1018         }
1019     }
1020 
1021     // override ServiceManager.start
1022     public void start()
1023         throws ServiceException {
1024         this.setState(ServiceState.RUNNING);
1025     }
1026 
1027     // override ServiceManager.stop
1028     public void stop()
1029         throws ServiceException {
1030         this.setState(ServiceState.STOPPED);
1031     }
1032 
1033     // override ServiceManager.run
1034     public void run() {
1035         // do nothing
1036     }
1037 
1038     /***
1039      * Return the resource manager identity
1040      *
1041      * @return the resource manager identity
1042      */
1043     public String getResourceManagerId() {
1044         return _rid;
1045     }
1046 
1047     /***
1048      * Create the next {@link TransactionLog} and add it to the list of
1049      * managed transaction logs.
1050      * <p>
1051      * The method will throw ResourceManagerException if there is a
1052      * problem completing the request.
1053      *
1054      * @throws ResourceManagerException
1055      */
1056     protected TransactionLog createNextTransactionLog()
1057         throws ResourceManagerException {
1058         TransactionLog newlog = null;
1059 
1060         synchronized (_logs) {
1061             try {
1062                 // get the last log number
1063                 long last = 1;
1064                 if (!_logs.isEmpty()) {
1065                     last = getSequenceNumber(((TransactionLog) _logs.last()).getName());
1066                 }
1067 
1068                 // now that we have the last log number, increment it and use
1069                 // it to build the name of the next log file.
1070                 String name = _logDirectory + System.getProperty("file.separator") +
1071                     RM_LOGFILE_PREFIX + Long.toString(++last) + RM_LOGFILE_EXTENSION;
1072 
1073                 // create a transaction log and add it to the collection
1074                 newlog = new TransactionLog(name, true);
1075                 _logs.add(newlog);
1076             } catch (TransactionLogException exception) {
1077                 throw new ResourceManagerException(
1078                     "Error in createNextTransactionLog " + exception);
1079             }
1080         }
1081 
1082         return newlog;
1083     }
1084 
1085     /***
1086      * Build a list of all log files in the specified log directory
1087      *
1088      * @throws IllegalArgumentException - if the directory does not exist.
1089      */
1090     protected void buildLogFileList() {
1091         File dir = new File(_logDirectory);
1092         if ((!dir.exists()) ||
1093             (!dir.isDirectory())) {
1094             throw new IllegalArgumentException(_logDirectory +
1095                 " is not a directory");
1096         }
1097 
1098         try {
1099             File[] list = dir.listFiles(new FilenameFilter() {
1100 
1101                 // implementation of FilenameFilter.accept
1102                 public boolean accept(File dir, String name) {
1103                     boolean result = false;
1104 
1105                     if ((name.startsWith(RM_LOGFILE_PREFIX)) &&
1106                         (name.endsWith(RM_LOGFILE_EXTENSION))) {
1107                         result = true;
1108                     }
1109 
1110                     return result;
1111                 }
1112             });
1113 
1114             // add the files to the list
1115             synchronized (_logs) {
1116                 for (int index = 0; index < list.length; index++) {
1117                     _logs.add(new TransactionLog(list[index].getPath(), false));
1118                 }
1119             }
1120         } catch (Exception exception) {
1121             // replace this with the exception strategy
1122             exception.printStackTrace();
1123         }
1124 
1125     }
1126 
1127     /***
1128      * This method will process all the transaction logs, in the log diretory
1129      * and call recover on each of them.
1130      *
1131      * @throws ResourceManagerException - if there is a problem recovering
1132      */
1133     private synchronized void recover()
1134         throws ResourceManagerException {
1135         try {
1136             if (!_logs.isEmpty()) {
1137                 Iterator iter = _logs.iterator();
1138                 while (iter.hasNext()) {
1139                     TransactionLog log = (TransactionLog) iter.next();
1140                     HashMap records = log.recover();
1141                 }
1142             }
1143         } catch (Exception exception) {
1144             throw new ResourceManagerException("Error in recover " +
1145                 exception.toString());
1146         }
1147     }
1148 
1149     /***
1150      * Retrieve the transaction log for the specified transaction id
1151      *
1152      * @param txid - the transaction identity
1153      * @return TransactionLog
1154      * @throws TransactionLogException - if there is tx log exception
1155      * @throws ResourceManagerException - if there is a resource problem.
1156      */
1157     private TransactionLog getTransactionLog(ExternalXid txid)
1158         throws TransactionLogException, ResourceManagerException {
1159         TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);
1160         if (log == null) {
1161             log = getCurrentTransactionLog();
1162             addTridLogEntry(txid, log);
1163         }
1164 
1165         return log;
1166     }
1167 
1168     /***
1169      * Get the current transaction log. It will check the last transaction
1170      * log opened by the resource manager and determine whether there is
1171      * space enough to process another transaction.
1172      * <p>
1173      * If there is space enough then it will return that transaction,
1174      * otherwise it will create a new transaction log for the resource
1175      *
1176      * @return TransactionLog - the transaction log to use
1177      * @throws ResourceManagerException
1178      * @throws TransactionLogException
1179      */
1180     private TransactionLog getCurrentTransactionLog()
1181         throws TransactionLogException, ResourceManagerException {
1182         TransactionLog log = null;
1183 
1184         synchronized (_logs) {
1185             if (_logs.size() > 0) {
1186                 log = (TransactionLog) _logs.last();
1187             }
1188 
1189             if ((log == null) ||
1190                 (log.size() > _logFileSize)) {
1191                 log = createNextTransactionLog();
1192             }
1193         }
1194 
1195         return log;
1196     }
1197 
1198     /***
1199      * Add an entry to the trid log cache table for the specified trid and
1200      * transaction log mapping.
1201      *
1202      * @param trid - the transaction identifier
1203      * @param log - the transaction log
1204      */
1205     private void addTridLogEntry(ExternalXid trid, TransactionLog log) {
1206         synchronized (_cacheLock) {
1207             // one to one relationship
1208             _tridToLogCache.put(trid, log);
1209 
1210             // one to many relationship
1211             Vector trids = (Vector) _logToTridCache.get(log);
1212             if (trids == null) {
1213                 trids = new Vector();
1214                 _logToTridCache.put(log, trids);
1215             }
1216             trids.addElement(trid);
1217         }
1218     }
1219 
1220     /***
1221      * Check whether the specified log is also the current log
1222      *
1223      * @param log - the log to check
1224      * @return boolean - true if it is
1225      */
1226     private boolean isCurrentTransactionLog(TransactionLog log) {
1227         boolean result = false;
1228 
1229         if (_logs.size() > 0) {
1230             result = log.equals(_logs.last());
1231         }
1232 
1233         return result;
1234     }
1235 
1236     /***
1237      * Remove an entry to the trid log cache table for the specified trid and
1238      * transaction log mapping.
1239      *
1240      * @param trid - the transaction identifier
1241      * @param log - the transaction log
1242      */
1243     private void removeTridLogEntry(ExternalXid trid, TransactionLog log) {
1244         synchronized (_cacheLock) {
1245 
1246             // one to one relationship
1247             _tridToLogCache.remove(trid);
1248 
1249             // one to many relationship
1250             Vector trids = (Vector) _logToTridCache.get(log);
1251             if (trids != null) {
1252                 trids.remove(trid);
1253                 if (trids.size() == 0) {
1254                     _logToTridCache.remove(log);
1255                 }
1256             }
1257         }
1258     }
1259 
1260     /***
1261      * Return an arrya of records, both state and date, for the specified
1262      * global transaction
1263      *
1264      * @param xid - the global transaction id
1265      * @param rid - the resource id
1266      * @return Object[] - array of records
1267      */
1268     protected Object[] getTransactionRecords(ExternalXid xid, String rid) {
1269         Object[] records;
1270 
1271         // we also want to add this to the transaction data for that
1272         // txid
1273         LinkedList list = (LinkedList) _activeTransactions.get(xid);
1274         if (list != null) {
1275             records = list.toArray();
1276         } else {
1277             records = new Object[0];
1278         }
1279 
1280         return records;
1281     }
1282 
1283 
1284     /***
1285      * Return the sequence number of the file
1286      * files are associated with a unique number
1287      *
1288      * @param name - the file name to investigate
1289      * @return long - the transaction log number
1290      * @throws ResourceManagerException
1291      */
1292     protected long getSequenceNumber(String name)
1293         throws ResourceManagerException {
1294         int start = name.indexOf(RM_LOGFILE_PREFIX) +
1295             RM_LOGFILE_PREFIX.length();
1296         int end = name.indexOf(RM_LOGFILE_EXTENSION);
1297 
1298         // the number must be between the start and end positions
1299         try {
1300             return Long.parseLong(name.substring(start, end));
1301         } catch (NumberFormatException exception) {
1302             throw new ResourceManagerException(
1303                 "Invalid name assigned to resource manager file " + name);
1304         }
1305     }
1306 
1307     /***
1308      * Return true if the specified transaction is active
1309      *
1310      * @param xid - the gobal transaction identifier
1311      */
1312     private synchronized boolean isTransactionActive(ExternalXid xid) {
1313         return _activeTransactions.containsKey(xid);
1314     }
1315 
1316     /***
1317      * Dump the specified records to the screen
1318      */
1319     private void dumpRecovered(HashMap records) {
1320         Iterator iter = records.keySet().iterator();
1321         while (iter.hasNext()) {
1322             ExternalXid txid = (ExternalXid) iter.next();
1323             LinkedList list = (LinkedList) records.get(txid);
1324             Iterator oiter = list.iterator();
1325             while (oiter.hasNext()) {
1326                 Object object = oiter.next();
1327                 if (object instanceof StateTransactionLogEntry) {
1328                     System.err.println("Recovered [" + txid + "] Class " +
1329                         object.getClass().getName() + " [" +
1330                         ((StateTransactionLogEntry) object).getState().toString() + "]");
1331                 } else {
1332                     System.err.println("Recovered [" + txid + "] Class " +
1333                         object.getClass().getName());
1334                 }
1335             }
1336         }
1337     }
1338 
1339 
1340     /***
1341      * Helper and type-safe method for creating a wrapper object for published
1342      * messages
1343      *
1344      * @param message - the message published
1345      * @return PublishedMessageWrapper
1346      */
1347     private PublishedMessageWrapper createPublishedMessageWrapper(
1348         MessageImpl message) {
1349         return new PublishedMessageWrapper(message);
1350     }
1351 
1352     /***
1353      * Helper and type-safe method for creating a wrapper object for received
1354      * messages
1355      *
1356      * @param id - the identity of the consumer receiving the message
1357      * @param handle - the handle of the message received
1358      * @return ReceivedMessageWrapper
1359      */
1360     private ReceivedMessageWrapper createReceivedMessageWrapper(
1361         String id, MessageHandle handle) {
1362         return new ReceivedMessageWrapper(id, handle);
1363     }
1364 
1365     /***
1366      * This functor is used by various collections to order the transaction log
1367      * files created by this resource manager. The resource manager will create
1368      * log files with sequentially increasing numbers (i.e xxx01.log, xxx2.log
1369      */
1370     private class TranLogFileComparator
1371         implements Comparator {
1372 
1373         // implementation of Comparator.comapre
1374         public int compare(Object o1, Object o2) {
1375             int result = -1;
1376 
1377             try {
1378                 if ((o1 instanceof TransactionLog) &&
1379                     (o2 instanceof TransactionLog)) {
1380                     long seq1 = getSequenceNumber(((TransactionLog) o1).getName());
1381                     long seq2 = getSequenceNumber(((TransactionLog) o2).getName());
1382 
1383                     if (seq1 > seq2) {
1384                         result = 1;
1385                     } else if (seq1 < seq2) {
1386                         result = -1;
1387                     } else {
1388                         result = 0;
1389                     }
1390                 } else {
1391                     throw new ClassCastException("o1 = " +
1392                         o1.getClass().getName() + " and o2 = " +
1393                         o2.getClass().getName());
1394                 }
1395             } catch (Exception exception) {
1396                 throw new RuntimeException("Error in ResourceManager.compare " +
1397                     exception.toString());
1398             }
1399 
1400             return result;
1401         }
1402 
1403         // implementation of Comparator.equals
1404         public boolean equals(Object obj) {
1405             if (obj instanceof TranLogFileComparator) {
1406                 return true;
1407             }
1408 
1409             return false;
1410         }
1411     }
1412 
1413 
1414     /***
1415      * This private member class is used to wrap the transactional object,
1416      * which for this particular resource manager is a published message or
1417      * a received message handle.
1418      */
1419     abstract private class TransactionalObjectWrapper {
1420 
1421         /***
1422          * The transactional object instance
1423          */
1424         private Object _object;
1425 
1426         /***
1427          * Create an instance of the wrapper using the type and the object
1428          *
1429          * @param object - the associated object
1430          */
1431         public TransactionalObjectWrapper(Object object) {
1432             _object = object;
1433         }
1434 
1435         /***
1436          * Check whether the wrapper contains a published message. Note that a
1437          * published message has a {@link MessageImpl} a the transactional
1438          * object.
1439          *
1440          * @return boolean - true if it is
1441          */
1442         public boolean isPublishedMessage() {
1443             return this instanceof PublishedMessageWrapper;
1444         }
1445 
1446         /***
1447          * Check whether the wrapper contains a received message handle. Note
1448          * that a received message contains a {@link MessageHandle} as the
1449          * transactional object.
1450          *
1451          * @return boolean - true if it does
1452          */
1453         public boolean isReceivedMessage() {
1454             return this instanceof ReceivedMessageWrapper;
1455         }
1456 
1457         /***
1458          * Return the transaction object
1459          *
1460          * @return Object
1461          */
1462         public Object getObject() {
1463             return _object;
1464         }
1465 
1466     }
1467 
1468 
1469     /***
1470      * This private member class is used to wrap a published message
1471      */
1472     private class PublishedMessageWrapper extends TransactionalObjectWrapper {
1473 
1474         /***
1475          * Create an instance of the wrapper using the specified message
1476          *
1477          * @param message - the message to wrap
1478          */
1479         public PublishedMessageWrapper(MessageImpl message) {
1480             super(message);
1481         }
1482 
1483         /***
1484          * Return an instance of the message object
1485          *
1486          * @return MessageImpl
1487          */
1488         public MessageImpl getMessage() {
1489             return (MessageImpl) super.getObject();
1490         }
1491     }
1492 
1493 
1494     /***
1495      * This private member class is used to wrap a received message
1496      */
1497     private class ReceivedMessageWrapper extends TransactionalObjectWrapper {
1498 
1499         /***
1500          * Caches the id of the {@link ConsumerEndpoint} that is processed
1501          * this handle
1502          */
1503         private String _consumerId;
1504 
1505         /***
1506          * Create an instance of the wrapper using the specified message
1507          *
1508          * @param id - the identity of the consumer endpoint
1509          * @param handle - the handle to the message
1510          */
1511         public ReceivedMessageWrapper(String id, MessageHandle handle) {
1512             super(handle);
1513             _consumerId = id;
1514         }
1515 
1516         /***
1517          * Return a reference to the  consumer identity
1518          *
1519          * @return String
1520          */
1521         public String getConsumerId() {
1522             return _consumerId;
1523         }
1524 
1525         /***
1526          * Return an instance of the message handle
1527          *
1528          * @return MessageHandle
1529          */
1530         public MessageHandle getMessageHandle() {
1531             return (MessageHandle) super.getObject();
1532         }
1533     }
1534 
1535 } //-- ResourceManager