View Javadoc

1   /***
2    * Redistribution and use of this software and associated documentation
3    * ("Software"), with or without modification, are permitted provided
4    * that the following conditions are met:
5    *
6    * 1. Redistributions of source code must retain copyright
7    *    statements and notices.  Redistributions must also contain a
8    *    copy of this document.
9    *
10   * 2. Redistributions in binary form must reproduce the
11   *    above copyright notice, this list of conditions and the
12   *    following disclaimer in the documentation and/or other
13   *    materials provided with the distribution.
14   *
15   * 3. The name "Exolab" must not be used to endorse or promote
16   *    products derived from this Software without prior written
17   *    permission of Exoffice Technologies.  For written permission,
18   *    please contact info@exolab.org.
19   *
20   * 4. Products derived from this Software may not be called "Exolab"
21   *    nor may "Exolab" appear in their names without prior written
22   *    permission of Exoffice Technologies. Exolab is a registered
23   *    trademark of Exoffice Technologies.
24   *
25   * 5. Due credit should be given to the Exolab Project
26   *    (http://www.exolab.org/).
27   *
28   * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29   * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30   * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31   * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL
32   * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34   * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35   * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36   * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39   * OF THE POSSIBILITY OF SUCH DAMAGE.
40   *
41   * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42   *
43   * $Id: ResourceManager.java,v 1.3 2005/08/30 07:26:49 tanderson Exp $
44   */
45  package org.exolab.jms.messagemgr;
46  
47  import java.io.File;
48  import java.io.FilenameFilter;
49  import java.util.Comparator;
50  import java.util.HashMap;
51  import java.util.Iterator;
52  import java.util.LinkedList;
53  import java.util.TreeSet;
54  import java.util.Vector;
55  import javax.jms.JMSException;
56  import javax.transaction.xa.XAException;
57  import javax.transaction.xa.XAResource;
58  import javax.transaction.xa.Xid;
59  
60  import org.apache.commons.logging.Log;
61  import org.apache.commons.logging.LogFactory;
62  
63  import org.exolab.jms.client.JmsDestination;
64  import org.exolab.jms.common.uuid.UUID;
65  import org.exolab.jms.message.MessageImpl;
66  import org.exolab.jms.persistence.DatabaseService;
67  import org.exolab.jms.persistence.PersistenceException;
68  import org.exolab.jms.service.Service;
69  import org.exolab.jms.tranlog.DataTransactionLogEntry;
70  import org.exolab.jms.tranlog.ExternalXid;
71  import org.exolab.jms.tranlog.StateTransactionLogEntry;
72  import org.exolab.jms.tranlog.TransactionLog;
73  import org.exolab.jms.tranlog.TransactionLogException;
74  import org.exolab.jms.tranlog.TransactionState;
75  
76  
77  /***
78   * The resource manager provides XA support for the JMS Server.
79   * <p/>
80   * The resource manager is responsible for managing the various transaction
81   * identifiers and managing the association between transaction ids and
82   * connections.
83   * <p/>
84   * The resource manager will store the global XID's and their state in the
85   * database for recovery purposes.
86   * <p/>
87   * Messages that arrive, and are associated with an XID are not processed
88   * through the {@link MessageMgr}. Instead they are routed to this resource
89   * managers where they are cached until the associated XID is committed or
90   * rolled back. If the transaction is successfully committed, through the 2PC
91   * protocol the messages will pass through the system.
92   * <p/>
93   * Similarly, messages that are sent to consumers, either synchronously or
94   * asynchronously are also cached by the resource manager until the global
95   * transaction completes.
96   * <p/>
97   * On startup the resource manager will read all incomplete transactions
98   * into memory. It will then process transactions that have
99   * timed out.
100  * <p/>
101  * The transaction manager will call the {@link #recover} method  and obtain a
102  * list of incomplete transaction for the purpose of completing them where
103  * possible.
104  *
105  * @author <a href="mailto:jima@intalio.com">Jim Alateras</a>
106  * @version $Revision: 1.3 $ $Date: 2005/08/30 07:26:49 $
107  */
108 public class ResourceManager extends Service {
109 
110     /***
111      * The extension for all transaction log files
112      */
113     public final static String RM_LOGFILE_EXTENSION = ".log";
114 
115     /***
116      * This is used to indicate the garbage collection has been disabled and
117      * that the client will take responsibility for all aspects of log file
118      * management. This is useful in situations where the client wants to
119      * archive the transaction log files
120      * <p/>
121      * This is the default mode for GC.
122      */
123     public static final int GC_DISABLED = 0;
124 
125     /***
126      * Synchronous gabrage collection is used to remove processed log files when
127      * the last trnasaction, in that log file, has been successfully processed.
128      * This is more efficient means since the log files does not need to be
129      * scanned asynchronously to determine whether all the transactions have
130      * been processed.
131      */
132     public static final int GC_SYNCHRONOUS = 1;
133 
134     /***
135      * Asynchronous garbage collection is used to remove processed log files
136      * asynchronous (i.e in a different thread context). This is rather
137      * expensive since it must manually scan each log file and determine whether
138      * all transactions, in that file, have been closed. If this is the case
139      * then it will remove the log file.
140      */
141     public static final int GC_ASYNCHRONOUS = 2;
142 
143     /***
144      * The message manager.
145      */
146     private final MessageManager _messages;
147 
148     /***
149      * The destination manager.
150      */
151     private final DestinationManager _destinations;
152 
153     /***
154      * This is the maximum size, in bytes, of each transaction log file. The
155      * value can be overriden by the user
156      */
157     private int _logFileSize = 1000000;
158 
159     /***
160      * Maintains a collection of transaction log files currently in use by this
161      * resource manager
162      */
163     private TreeSet _logs = new TreeSet(new TranLogFileComparator());
164 
165     /***
166      * Maintain a mapping between the TRID (transaction id and the log file it
167      * is associated with.
168      */
169     private HashMap _tridToLogCache = new HashMap();
170 
171     /***
172      * Maintain a list of open TRIDs for a particular  {@link TransactionLog}
173      */
174     private HashMap _logToTridCache = new HashMap();
175 
176     /***
177      * This attribute is used to synchronize the modifications to the _tridToLog
178      * _logToTrid attributes
179      */
180     private final Object _cacheLock = new Object();
181 
182     /***
183      * This maintains a cache of all open transactions and the corresponding
184      * data. The key is the transaction identifier and the object is a
185      * LinkedList transaction entries, which includes both state and data
186      */
187     private HashMap _activeTransactions = new HashMap();
188 
189     /***
190      * The directory where the log files are stored. This can be set by the
191      * client
192      */
193     private String _logDirectory = ".";
194 
195     /***
196      * This is the number of the last log file created by the ResourceManager
197      */
198     private long _lastLogNumber = 0;
199 
200     /***
201      * The expiry time for transaction associated with this resource manager.
202      * This will either be configured or passed in with the transaction context
203      * The value is specified in seconds.
204      */
205     private int _txExpiryTime = 120;
206 
207     /***
208      * This attribute caches the garbage collection mode for the resouce
209      * managers. Valid values are specified by the GC_* constants.
210      * <p/>
211      * By default garbage collection is disabled.
212      */
213     private int _gcMode = GC_SYNCHRONOUS;
214 
215     /***
216      * This is the id associated with this resource...need to work out who or
217      * what sets this.
218      */
219     private String _rid = UUID.next();
220 
221     /***
222      * The name of the service
223      */
224     private final static String RM_SERVICE_NAME = "XAResourceManager";
225 
226     /***
227      * The prefix used for all transaction log files, which are created and
228      * managed by the {@link TransactionLog}
229      */
230     private final static String RM_LOGFILE_PREFIX = "ojmsrm";
231 
232     /***
233      * The logger
234      */
235     private static final Log _log = LogFactory.getLog(ResourceManager.class);
236 
237 
/**
 * Construct a resource manager bound to the supplied message and
 * destination managers.
 * <p/>
 * The start-up sequence that validated the log directory, built the list
 * of existing log files and ran recovery is currently commented out (it is
 * retained in the body for reference), so the enabled statements only name
 * the service and store the two managers.
 *
 * @param messages     the message manager
 * @param destinations the destination manager
 * @param database     the database service; not referenced by the
 *                     statements currently enabled in this constructor
 * @throws ResourceManagerException declared for callers; only the disabled
 *                                  start-up sequence raises it explicitly
 */
public ResourceManager(MessageManager messages,
                       DestinationManager destinations,
                       DatabaseService database)
        throws ResourceManagerException {
    super(RM_SERVICE_NAME);

    this._messages = messages;
    this._destinations = destinations;

    // Disabled start-up sequence, kept for reference:
    //
    // final String dir = "./logs";
    // _logDirectory = dir;
    // File file = new File(dir);
    // if ((!file.exists()) ||
    //         (!file.isDirectory())) {
    //     throw new ResourceManagerException(dir
    //                                        +
    //                                        " does not exist or is not a directory");
    // }
    //
    // // build the list of existing log files.
    // buildLogFileList();
    //
    // // recover the list of log files
    // recover();
}
277 
278     /***
279      * Check whether garbage collection has been disabled
280      *
281      * @return boolean - true if gc is disabled
282      */
283     public boolean gcDisabled() {
284         return (_gcMode == GC_DISABLED) ? true : false;
285     }
286 
287     /***
288      * Log this published message so that it can be passed through the system
289      * when the associated global transaction commits.
290      *
291      * @param xid     - the global transaction identity
292      * @param message - the message published
293      * @throws TransactionLogException  - error adding the entry
294      * @throws ResourceManagerException - error getting the trnasaction log
295      * @throws JMSException             - if there is an issue with prep'ing the
296      *                                  message
297      */
298     public synchronized void logPublishedMessage(Xid xid, MessageImpl message)
299             throws TransactionLogException, ResourceManagerException,
300             JMSException {
301         _messages.prepare(message);
302         logTransactionData(new ExternalXid(xid), _rid,
303                            createPublishedMessageWrapper(message));
304     }
305 
306     /***
307      * Log that this message handle was sent to the consumer within the
308      * specified global transaction identity. The message will be acknowledged
309      * when the global transaction commits. Alternatively, if the global
310      * transaction is rolled back the message handle will be returned to the
311      * destination
312      *
313      * @param xid    the global transaction identity
314      * @param id     the consumer receiving this message
315      * @param handle - the handle of the message received
316      * @throws TransactionLogException  - error adding the entry
317      * @throws ResourceManagerException - error getting the transaction log
318      */
319     public synchronized void logReceivedMessage(Xid xid, long id,
320                                                 MessageHandle handle)
321             throws TransactionLogException, ResourceManagerException {
322         logTransactionData(new ExternalXid(xid), _rid,
323                            createReceivedMessageWrapper(id, handle));
324     }
325 
326     /***
327      * Add an {@link StateTransactionLogEntry} using the specified txid, rid and
328      * state
329      *
330      * @param xid   - the transaction identifier
331      * @param state - the transaction log state
332      * @throws TransactionLogException  - error adding the entry
333      * @throws ResourceManagerException - error getting the trnasaction log
334      */
335     public synchronized void logTransactionState(Xid xid,
336                                                  TransactionState state)
337             throws TransactionLogException, ResourceManagerException {
338         ExternalXid txid = new ExternalXid(xid);
339         switch (state.getOrd()) {
340             case TransactionState.OPENED_ORD:
341                 {
342                     TransactionLog log = getCurrentTransactionLog();
343                     addTridLogEntry(txid, log);
344                     log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
345                                             state);
346 
347                     // cache the transaction state
348                     _activeTransactions.put(txid, new LinkedList());
349                 }
350                 break;
351 
352             case TransactionState.PREPARED_ORD:
353                 // cache the transaction state
354                 LinkedList list = (LinkedList) _activeTransactions.get(txid);
355                 if (list != null) {
356                     list.add(state);
357                 } else {
358                     throw new ResourceManagerException("Trasaction " + txid +
359                                                        " is not active.");
360                 }
361                 break;
362 
363             case TransactionState.CLOSED_ORD:
364                 {
365                     TransactionLog log = getTransactionLog(txid);
366                     log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
367                                             state);
368                     removeTridLogEntry(txid, log);
369 
370                     // check whether this log has anymore open transactions
371                     synchronized (_cacheLock) {
372                         if ((_logToTridCache.get(log) == null) &&
373                                 (!isCurrentTransactionLog(log))) {
374                             log.close();
375 
376                             // now check if gc mode is GC_SYNCHRONOUS. If it is
377                             // remove the log file
378                             if (_gcMode == GC_SYNCHRONOUS) {
379                                 try {
380                                     log.destroy();
381                                 } catch (TransactionLogException exception) {
382                                     exception.printStackTrace();
383                                 }
384                             }
385                         }
386                     }
387 
388                     // we also want to remove this entry from the list
389                     // of active transactions
390                     _activeTransactions.remove(txid);
391                 }
392                 break;
393 
394             default:
395                 throw new ResourceManagerException("Cannot process tx state " +
396                                                    state);
397         }
398     }
399 
400     /***
401      * Add an {@link DataTransactionLogEntry} using the specified txid, rid and
402      * data
403      *
404      * @param txid - the transaction identifier
405      * @param rid  - the resource identifier
406      * @throws TransactionLogException  - error adding the entry
407      * @throws ResourceManagerException - error getting the trnasaction log
408      */
409     synchronized void logTransactionData(ExternalXid txid, String rid,
410                                          Object data)
411             throws ResourceManagerException, TransactionLogException {
412         getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,
413                                                    rid, data);
414 
415         // we also want to add this to the transaction data for that
416         // txid
417         LinkedList list = (LinkedList) _activeTransactions.get(txid);
418         if (list != null) {
419             list.add(data);
420         } else {
421             throw new ResourceManagerException("Trasaction " + txid +
422                                                " is not active.");
423         }
424     }
425 
426     /***
427      * This is the entry point for the garbage collection callback. It scans
428      * through the each transaction log file and determines whether it can be
429      * garbage collected. If it can then it simply destroys the corresponding
430      * TransactionLog.
431      */
432     public void garbageCollect() {
433         try {
434             int gcfiles = 0;
435 
436             // if there are no transaction log files then return
437             if (_logs.size() == 0) {
438                 return;
439             }
440 
441             TreeSet copy = null;
442             synchronized (_logs) {
443                 copy = new TreeSet(_logs);
444             }
445 
446             // remove the current log file, since this is likely to be the
447             // current log file
448             copy.remove(_logs.last());
449 
450             // process each of the remaining log files
451             while (copy.size() > 0) {
452                 TransactionLog log = (TransactionLog) copy.first();
453                 copy.remove(log);
454                 if (log.canGarbageCollect()) {
455                     // destroy the log
456                     log.destroy();
457 
458                     // remove it from the log cache
459                     synchronized (_logs) {
460                         _logs.remove(log);
461                     }
462 
463                     // increment the number of garbafe collected files
464                     ++gcfiles;
465                 }
466             }
467 
468             // print an informative message
469             _log.info("[RMGC] Collected " + gcfiles + " files.");
470         } catch (Exception exception) {
471             exception.printStackTrace();
472         }
473     }
474 
475     /***
476      * Ensure that a transaction with the specified xid is currently active. If
477      * this is the case then commit the transaction based onb the value of the
478      * onePhase flag.
479      * <p/>
480      * This will have the effect of passing all messages through
481      *
482      * @param id       - the xa transaction identity
483      * @param onePhase - treu if it is a one phase commit
484      * @throws XAException - if there is a problem completing the call
485      */
486     public synchronized void commit(Xid id, boolean onePhase)
487             throws XAException {
488         // check that the xid is not null
489         if (id == null) {
490             throw new XAException(XAException.XAER_NOTA);
491         }
492 
493         // covert to our internal representation of an xid
494         ExternalXid xid = new ExternalXid(id);
495 
496         // check to see that the transaction is active and open. We should
497         // not be allowed to commit a committed transaction.
498         if (!isTransactionActive(xid)) {
499             throw new XAException(XAException.XAER_PROTO);
500         }
501 
502         // process all the messages associated with this global transaction
503         // If a message has been  published then sent it to the message mgr
504         // for processing. If a message has been consumed then remove it
505         // from the list of unconsumed messages.
506         try {
507             // retrieve a list of recrods for the specified global transaction
508             // and process them. Ignore the state records and only process the
509             // data records, which are of type TransacitonalObjectWrapper.
510             Object[] records = getTransactionRecords(xid, _rid);
511             for (int index = 0; index < records.length; index++) {
512                 if (records[index] instanceof TransactionalObjectWrapper) {
513                     TransactionalObjectWrapper wrapper =
514                             (TransactionalObjectWrapper) records[index];
515                     if (wrapper.isPublishedMessage()) {
516                         // send the published message to the message manager
517                         MessageImpl message = (MessageImpl) wrapper.getObject();
518                         _messages.add(message);
519 
520                     } else if (wrapper.isReceivedMessage()) {
521                         // if it is a received message handle then simply
522                         // delete it and mark it as acknowledged
523                         MessageHandle handle = ((ReceivedMessageWrapper) (wrapper)).getMessageHandle();
524                         handle.destroy();
525                     }
526                 } else {
527                     // ignore since it is a state records.
528                 }
529             }
530         } catch (Exception exception) {
531             _log.error(exception, exception);
532             throw new XAException("Failed in ResourceManager.commit : " +
533                                   exception.toString());
534         } finally {
535             // and now mark the transaction as closed
536             try {
537                 logTransactionState(xid, TransactionState.CLOSED);
538             } catch (Exception exception) {
539                 throw new XAException("Error processing commit : " + exception);
540             }
541         }
542     }
543 
544     /***
545      * Ends the work performed on behalf of a transaction branch. The resource
546      * manager disassociates the XA resource from the transaction branch
547      * specified and let the transaction be completedCommits an XA transaction
548      * that is in progress.
549      *
550      * @param id    - the xa transaction identity
551      * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND
552      * @throws XAException - if there is a problem completing the call
553      */
554     public synchronized void end(Xid id, int flags)
555             throws XAException {
556         //check the xid is not null
557         if (id == null) {
558             throw new XAException(XAException.XAER_NOTA);
559         }
560 
561         // covert to our internal representation of an xid
562         ExternalXid xid = new ExternalXid(id);
563 
564         // check that the flags are valid for this method
565         if ((flags != XAResource.TMSUSPEND) ||
566                 (flags != XAResource.TMSUCCESS) ||
567                 (flags != XAResource.TMFAIL)) {
568             throw new XAException(XAException.XAER_PROTO);
569         }
570 
571         switch (flags) {
572             case XAResource.TMFAIL:
573                 // check that the transaction exists
574                 if (!isTransactionActive(xid)) {
575                     throw new XAException(XAException.XAER_PROTO);
576                 }
577 
578                 // do not process that associated data, simply rollback
579                 rollback(xid);
580                 break;
581 
582             case XAResource.TMSUSPEND:
583                 // check that the transaction is opened
584                 if (!isTransactionActive(xid)) {
585                     throw new XAException(XAException.XAER_PROTO);
586                 }
587                 break;
588 
589             case XAResource.TMSUCCESS:
590                 // nothing to do here but check that the resource manager is
591                 // in a consistent state wrt to this xid. The xid should not
592                 // be active if it received the commit, forget etc.
593                 if (isTransactionActive(xid)) {
594                     throw new XAException(XAException.XAER_PROTO);
595                 }
596                 break;
597         }
598     }
599 
600     /***
601      * Tell the resource manager to forget about a heuristically completed
602      * transaction branch.
603      *
604      * @param id - the xa transaction identity
605      * @throws XAException - if there is a problem completing the call
606      */
607     public synchronized void forget(Xid id)
608             throws XAException {
609         //check the xid is not null
610         if (id == null) {
611             throw new XAException(XAException.XAER_NOTA);
612         }
613 
614         // covert to our internal representation of an xid
615         ExternalXid xid = new ExternalXid(id);
616 
617         // check to see that the xid actually exists
618         if (!isTransactionActive(xid)) {
619             throw new XAException(XAException.XAER_PROTO);
620         }
621 
622         // call rollback to complete the work
623         rollback(id);
624     }
625 
626     /***
627      * Return the transaction timeout for this instance of the resource
628      * manager.
629      *
630      * @return int - the timeout in seconds
631      * @throws XAException - if there is a problem completing the call
632      */
633     public synchronized int getTransactionTimeout()
634             throws XAException {
635         return _txExpiryTime;
636     }
637 
638     /***
639      * Ask the resource manager to prepare for a transaction commit of the
640      * transaction specified in xid
641      *
642      * @param xares
643      * @return int - XA_RDONLY or XA_OK
644      * @throws XAException - if there is a problem completing the call
645      */
646     public synchronized boolean isSameRM(XAResource xares)
647             throws XAException {
648         boolean result = false;
649 
650         if ((xares == this) ||
651                 ((xares instanceof ResourceManager) &&
652                 (((ResourceManager) xares)._rid.equals(_rid)))) {
653             result = true;
654         }
655 
656         return result;
657     }
658 
659     /***
660      * Obtain a list of prepared transaction branches from a resource manager.
661      * The transaction manager calls this method during recovery to obtain the
662      * list of transaction branches that are currently in prepared or
663      * heuristically completed states.
664      *
665      * @throws XAException - if there is a problem completing the call
666      */
667     public synchronized int prepare(Xid id)
668             throws XAException {
669         //check the xid is not null
670         if (id == null) {
671             throw new XAException(XAException.XAER_NOTA);
672         }
673 
674         // covert to our internal representation of an xid
675         ExternalXid xid = new ExternalXid(id);
676 
677         // check to see that the xid actually exists
678         if (!isTransactionActive(xid)) {
679             throw new XAException(XAException.XAER_PROTO);
680         }
681 
682         // can a prepare for the same resource occur multiple times
683         // ????
684 
685         try {
686             logTransactionState(xid, TransactionState.PREPARED);
687         } catch (Exception exception) {
688             throw new XAException("Error processing prepare : " + exception);
689         }
690 
691         return XAResource.XA_OK;
692     }
693 
694     /***
695      * Inform the resource manager to roll back work done on behalf of a
696      * transaction branch
697      *
698      * @throws XAException - if there is a problem completing the call
699      */
700     public synchronized Xid[] recover(int flag)
701             throws XAException {
702 
703         Xid[] result = new Xid[0];
704 
705         if ((flag == XAResource.TMNOFLAGS) ||
706                 (flag == XAResource.TMSTARTRSCAN) ||
707                 (flag == XAResource.TMENDRSCAN)) {
708             LinkedList xids = new LinkedList();
709             Iterator iter = _activeTransactions.keySet().iterator();
710             while (iter.hasNext()) {
711                 Xid xid = (Xid) iter.next();
712                 LinkedList list = (LinkedList) _activeTransactions.get(xid);
713                 if (list.size() > 1) {
714                     // need at least a start in the chain.
715                     Object last = list.getLast();
716                     if ((last instanceof StateTransactionLogEntry)
717                             &&
718                             (((StateTransactionLogEntry) last).getState()
719                             .isPrepared())) {
720                         xids.add(xid);
721                     }
722                 }
723 
724             }
725             result = (Xid[]) xids.toArray();
726         }
727 
728         return result;
729     }
730 
731     /***
732      * Set the current transaction timeout value for this XAResource instance.
733      *
734      * @throws XAException - if there is a problem completing the call
735      */
736     public synchronized void rollback(Xid id)
737             throws XAException {
738         //check the xid is not null
739         if (id == null) {
740             throw new XAException(XAException.XAER_NOTA);
741         }
742 
743         // covert to our internal representation of an xid
744         ExternalXid xid = new ExternalXid(id);
745 
746         // check to see that the xid actually exists
747         if (!isTransactionActive(xid)) {
748             throw new XAException(XAException.XAER_PROTO);
749         }
750 
751         // process the data in that transaction. If it was a published message
752         // then drop it. If it was a consumed message then return it back to
753         // the destination.
754         try {
755             // retrieve a list of recrods for the specified global transaction
756             // and process them. Ignore the state records and only process the
757             // data records, which are of type TransacitonalObjectWrapper.
758             Object[] records = getTransactionRecords(xid, _rid);
759             for (int index = 0; index < records.length; index++) {
760                 if (records[index] instanceof TransactionalObjectWrapper) {
761                     TransactionalObjectWrapper wrapper =
762                             (TransactionalObjectWrapper) records[index];
763                     if (wrapper.isPublishedMessage()) {
764                         // we don't need to process these messages since the
765                         // global transaction has been rolled back.
766                     } else if (wrapper.isReceivedMessage()) {
767                         ReceivedMessageWrapper rmsg_wrapper =
768                                 (ReceivedMessageWrapper) wrapper;
769                         MessageHandle handle =
770                                 (MessageHandle) rmsg_wrapper.getObject();
771                         JmsDestination dest = handle.getDestination();
772                         DestinationCache cache =
773                                 _destinations.getDestinationCache(dest);
774                         cache.returnMessageHandle(handle);
775                     }
776                 } else {
777                     // ignore since it is a state records.
778                 }
779             }
780         } catch (Exception exception) {
781             throw new XAException("Failed in ResourceManager.rollback : " +
782                                   exception.toString());
783         } finally {
784             // and now mark the transaction as closed
785             try {
786                 logTransactionState(xid, TransactionState.CLOSED);
787             } catch (Exception exception) {
788                 throw new XAException(
789                         "Error processing rollback : " + exception);
790             }
791         }
792     }
793 
794     /***
795      * Start work on behalf of a transaction branch specified in xid If TMJOIN
796      * is specified, the start is for joining a transaction previously seen by
797      * the resource manager
798      *
799      * @throws XAException - if there is a problem completing the call
800      */
801     public synchronized boolean setTransactionTimeout(int seconds)
802             throws XAException {
803         _txExpiryTime = seconds;
804         return true;
805     }
806 
807     // implementation of XAResource.start
808     public synchronized void start(Xid id, int flags)
809             throws XAException {
810 
811         //check the xid is not null
812         if (id == null) {
813             throw new XAException(XAException.XAER_NOTA);
814         }
815 
816         // covert to our internal representation of an xid
817         ExternalXid xid = new ExternalXid(id);
818 
819         // check that the flags are valid for this method
820         if ((flags != XAResource.TMNOFLAGS) ||
821                 (flags != XAResource.TMJOIN) ||
822                 (flags != XAResource.TMRESUME)) {
823             throw new XAException(XAException.XAER_PROTO);
824         }
825 
826         switch (flags) {
827             case XAResource.TMNOFLAGS:
828                 // check to see that the xid does not already exist
829                 if (isTransactionActive(xid)) {
830                     throw new XAException(XAException.XAER_DUPID);
831                 }
832 
833                 // otherwise log the start of the transaction
834                 try {
835                     logTransactionState(xid, TransactionState.OPENED);
836                 } catch (Exception exception) {
837                     throw new XAException(
838                             "Error processing start : " + exception);
839                 }
840                 break;
841 
842             case XAResource.TMJOIN:
843             case XAResource.TMRESUME:
844                 // joining a transaction previously seen by the resource
845                 // manager
846                 if (!isTransactionActive(xid)) {
847                     throw new XAException(XAException.XAER_PROTO);
848                 }
849                 break;
850         }
851     }
852 
853     /***
854      * Return the resource manager identity
855      *
856      * @return the resource manager identity
857      */
858     public String getResourceManagerId() {
859         return _rid;
860     }
861 
862     /***
863      * Create the next {@link TransactionLog} and add it to the list of managed
864      * transaction logs.
865      * <p/>
866      * The method will throw ResourceManagerException if there is a problem
867      * completing the request.
868      *
869      * @throws ResourceManagerException
870      */
871     protected TransactionLog createNextTransactionLog()
872             throws ResourceManagerException {
873         TransactionLog newlog = null;
874 
875         synchronized (_logs) {
876             try {
877                 // get the last log number
878                 long last = 1;
879                 if (!_logs.isEmpty()) {
880                     last
881                             = getSequenceNumber(
882                                     ((TransactionLog) _logs.last()).getName());
883                 }
884 
885                 // now that we have the last log number, increment it and use
886                 // it to build the name of the next log file.
887                 String name = _logDirectory
888                         + System.getProperty("file.separator") +
889                         RM_LOGFILE_PREFIX + Long.toString(++last)
890                         + RM_LOGFILE_EXTENSION;
891 
892                 // create a transaction log and add it to the collection
893                 newlog = new TransactionLog(name, true);
894                 _logs.add(newlog);
895             } catch (TransactionLogException exception) {
896                 throw new ResourceManagerException(
897                         "Error in createNextTransactionLog " + exception);
898             }
899         }
900 
901         return newlog;
902     }
903 
904     /***
905      * Build a list of all log files in the specified log directory
906      *
907      * @throws IllegalArgumentException - if the directory does not exist.
908      */
909     protected void buildLogFileList() {
910         File dir = new File(_logDirectory);
911         if ((!dir.exists()) ||
912                 (!dir.isDirectory())) {
913             throw new IllegalArgumentException(_logDirectory +
914                                                " is not a directory");
915         }
916 
917         try {
918             File[] list = dir.listFiles(new FilenameFilter() {
919 
920                 // implementation of FilenameFilter.accept
921                 public boolean accept(File dir, String name) {
922                     boolean result = false;
923 
924                     if ((name.startsWith(RM_LOGFILE_PREFIX)) &&
925                             (name.endsWith(RM_LOGFILE_EXTENSION))) {
926                         result = true;
927                     }
928 
929                     return result;
930                 }
931             });
932 
933             // add the files to the list
934             synchronized (_logs) {
935                 for (int index = 0; index < list.length; index++) {
936                     _logs.add(new TransactionLog(list[index].getPath(), false));
937                 }
938             }
939         } catch (Exception exception) {
940             // replace this with the exception strategy
941             exception.printStackTrace();
942         }
943 
944     }
945 
946     /***
947      * This method will process all the transaction logs, in the log diretory
948      * and call recover on each of them.
949      *
950      * @throws ResourceManagerException - if there is a problem recovering
951      */
952     private synchronized void recover()
953             throws ResourceManagerException {
954         try {
955             if (!_logs.isEmpty()) {
956                 Iterator iter = _logs.iterator();
957                 while (iter.hasNext()) {
958                     TransactionLog log = (TransactionLog) iter.next();
959                     HashMap records = log.recover();
960                 }
961             }
962         } catch (Exception exception) {
963             throw new ResourceManagerException("Error in recover " +
964                                                exception.toString());
965         }
966     }
967 
968     /***
969      * Retrieve the transaction log for the specified transaction id
970      *
971      * @param txid - the transaction identity
972      * @return TransactionLog
973      * @throws TransactionLogException  - if there is tx log exception
974      * @throws ResourceManagerException - if there is a resource problem.
975      */
976     private TransactionLog getTransactionLog(ExternalXid txid)
977             throws TransactionLogException, ResourceManagerException {
978         TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);
979         if (log == null) {
980             log = getCurrentTransactionLog();
981             addTridLogEntry(txid, log);
982         }
983 
984         return log;
985     }
986 
987     /***
988      * Get the current transaction log. It will check the last transaction log
989      * opened by the resource manager and determine whether there is space
990      * enough to process another transaction.
991      * <p/>
992      * If there is space enough then it will return that transaction, otherwise
993      * it will create a new transaction log for the resource
994      *
995      * @return TransactionLog - the transaction log to use
996      * @throws ResourceManagerException
997      * @throws TransactionLogException
998      */
999     private TransactionLog getCurrentTransactionLog()
1000             throws TransactionLogException, ResourceManagerException {
1001         TransactionLog log = null;
1002 
1003         synchronized (_logs) {
1004             if (_logs.size() > 0) {
1005                 log = (TransactionLog) _logs.last();
1006             }
1007 
1008             if ((log == null) ||
1009                     (log.size() > _logFileSize)) {
1010                 log = createNextTransactionLog();
1011             }
1012         }
1013 
1014         return log;
1015     }
1016 
1017     /***
1018      * Add an entry to the trid log cache table for the specified trid and
1019      * transaction log mapping.
1020      *
1021      * @param trid - the transaction identifier
1022      * @param log  - the transaction log
1023      */
1024     private void addTridLogEntry(ExternalXid trid, TransactionLog log) {
1025         synchronized (_cacheLock) {
1026             // one to one relationship
1027             _tridToLogCache.put(trid, log);
1028 
1029             // one to many relationship
1030             Vector trids = (Vector) _logToTridCache.get(log);
1031             if (trids == null) {
1032                 trids = new Vector();
1033                 _logToTridCache.put(log, trids);
1034             }
1035             trids.addElement(trid);
1036         }
1037     }
1038 
1039     /***
1040      * Check whether the specified log is also the current log
1041      *
1042      * @param log - the log to check
1043      * @return boolean - true if it is
1044      */
1045     private boolean isCurrentTransactionLog(TransactionLog log) {
1046         boolean result = false;
1047 
1048         if (_logs.size() > 0) {
1049             result = log.equals(_logs.last());
1050         }
1051 
1052         return result;
1053     }
1054 
1055     /***
1056      * Remove an entry to the trid log cache table for the specified trid and
1057      * transaction log mapping.
1058      *
1059      * @param trid - the transaction identifier
1060      * @param log  - the transaction log
1061      */
1062     private void removeTridLogEntry(ExternalXid trid, TransactionLog log) {
1063         synchronized (_cacheLock) {
1064 
1065             // one to one relationship
1066             _tridToLogCache.remove(trid);
1067 
1068             // one to many relationship
1069             Vector trids = (Vector) _logToTridCache.get(log);
1070             if (trids != null) {
1071                 trids.remove(trid);
1072                 if (trids.size() == 0) {
1073                     _logToTridCache.remove(log);
1074                 }
1075             }
1076         }
1077     }
1078 
1079     /***
1080      * Return an arrya of records, both state and date, for the specified global
1081      * transaction
1082      *
1083      * @param xid - the global transaction id
1084      * @param rid - the resource id
1085      * @return Object[] - array of records
1086      */
1087     protected Object[] getTransactionRecords(ExternalXid xid, String rid) {
1088         Object[] records;
1089 
1090         // we also want to add this to the transaction data for that
1091         // txid
1092         LinkedList list = (LinkedList) _activeTransactions.get(xid);
1093         if (list != null) {
1094             records = list.toArray();
1095         } else {
1096             records = new Object[0];
1097         }
1098 
1099         return records;
1100     }
1101 
1102 
1103     /***
1104      * Return the sequence number of the file files are associated with a unique
1105      * number
1106      *
1107      * @param name - the file name to investigate
1108      * @return long - the transaction log number
1109      * @throws ResourceManagerException
1110      */
1111     protected long getSequenceNumber(String name)
1112             throws ResourceManagerException {
1113         int start = name.indexOf(RM_LOGFILE_PREFIX) +
1114                 RM_LOGFILE_PREFIX.length();
1115         int end = name.indexOf(RM_LOGFILE_EXTENSION);
1116 
1117         // the number must be between the start and end positions
1118         try {
1119             return Long.parseLong(name.substring(start, end));
1120         } catch (NumberFormatException exception) {
1121             throw new ResourceManagerException(
1122                     "Invalid name assigned to resource manager file " + name);
1123         }
1124     }
1125 
1126     /***
1127      * Return true if the specified transaction is active
1128      *
1129      * @param xid - the gobal transaction identifier
1130      */
1131     private synchronized boolean isTransactionActive(ExternalXid xid) {
1132         return _activeTransactions.containsKey(xid);
1133     }
1134 
1135     /***
1136      * Dump the specified records to the screen
1137      */
1138     private void dumpRecovered(HashMap records) {
1139         Iterator iter = records.keySet().iterator();
1140         while (iter.hasNext()) {
1141             ExternalXid txid = (ExternalXid) iter.next();
1142             LinkedList list = (LinkedList) records.get(txid);
1143             Iterator oiter = list.iterator();
1144             while (oiter.hasNext()) {
1145                 Object object = oiter.next();
1146                 if (object instanceof StateTransactionLogEntry) {
1147                     System.err.println(
1148                             "Recovered [" + txid + "] Class " +
1149                             object.getClass().getName()
1150                             + " ["
1151                             +
1152                             ((StateTransactionLogEntry) object).getState()
1153                             .toString()
1154                             + "]");
1155                 } else {
1156                     System.err.println("Recovered [" + txid + "] Class " +
1157                                        object.getClass().getName());
1158                 }
1159             }
1160         }
1161     }
1162 
1163 
1164     /***
1165      * Helper and type-safe method for creating a wrapper object for published
1166      * messages
1167      *
1168      * @param message - the message published
1169      * @return PublishedMessageWrapper
1170      */
1171     private PublishedMessageWrapper createPublishedMessageWrapper(
1172             MessageImpl message) {
1173         return new PublishedMessageWrapper(message);
1174     }
1175 
1176     /***
1177      * Helper and type-safe method for creating a wrapper object for received
1178      * messages
1179      *
1180      * @param id     - the identity of the consumer receiving the message
1181      * @param handle - the handle of the message received
1182      * @return ReceivedMessageWrapper
1183      */
1184     private ReceivedMessageWrapper createReceivedMessageWrapper(long id,
1185                                                                 MessageHandle handle) {
1186         return new ReceivedMessageWrapper(id, handle);
1187     }
1188 
1189     /***
1190      * This functor is used by various collections to order the transaction log
1191      * files created by this resource manager. The resource manager will create
1192      * log files with sequentially increasing numbers (i.e xxx01.log, xxx2.log
1193      */
1194     private class TranLogFileComparator
1195             implements Comparator {
1196 
1197         // implementation of Comparator.comapre
1198         public int compare(Object o1, Object o2) {
1199             int result = -1;
1200 
1201             try {
1202                 if ((o1 instanceof TransactionLog) &&
1203                         (o2 instanceof TransactionLog)) {
1204                     long seq1 = getSequenceNumber(
1205                             ((TransactionLog) o1).getName());
1206                     long seq2 = getSequenceNumber(
1207                             ((TransactionLog) o2).getName());
1208 
1209                     if (seq1 > seq2) {
1210                         result = 1;
1211                     } else if (seq1 < seq2) {
1212                         result = -1;
1213                     } else {
1214                         result = 0;
1215                     }
1216                 } else {
1217                     throw new ClassCastException("o1 = " +
1218                                                  o1.getClass().getName() + " and o2 = " +
1219                                                  o2.getClass().getName());
1220                 }
1221             } catch (Exception exception) {
1222                 throw new RuntimeException("Error in ResourceManager.compare " +
1223                                            exception.toString());
1224             }
1225 
1226             return result;
1227         }
1228 
1229         // implementation of Comparator.equals
1230         public boolean equals(Object obj) {
1231             if (obj instanceof TranLogFileComparator) {
1232                 return true;
1233             }
1234 
1235             return false;
1236         }
1237     }
1238 
1239 
1240     /***
1241      * This private member class is used to wrap the transactional object, which
1242      * for this particular resource manager is a published message or a received
1243      * message handle.
1244      */
1245     abstract private class TransactionalObjectWrapper {
1246 
1247         /***
1248          * The transactional object instance
1249          */
1250         private Object _object;
1251 
1252         /***
1253          * Create an instance of the wrapper using the type and the object
1254          *
1255          * @param object - the associated object
1256          */
1257         public TransactionalObjectWrapper(Object object) {
1258             _object = object;
1259         }
1260 
1261         /***
1262          * Check whether the wrapper contains a published message. Note that a
1263          * published message has a {@link MessageImpl} a the transactional
1264          * object.
1265          *
1266          * @return boolean - true if it is
1267          */
1268         public boolean isPublishedMessage() {
1269             return this instanceof PublishedMessageWrapper;
1270         }
1271 
1272         /***
1273          * Check whether the wrapper contains a received message handle. Note
1274          * that a received message contains a {@link MessageHandle} as the
1275          * transactional object.
1276          *
1277          * @return boolean - true if it does
1278          */
1279         public boolean isReceivedMessage() {
1280             return this instanceof ReceivedMessageWrapper;
1281         }
1282 
1283         /***
1284          * Return the transaction object
1285          *
1286          * @return Object
1287          */
1288         public Object getObject() {
1289             return _object;
1290         }
1291 
1292     }
1293 
1294 
1295     /***
1296      * This private member class is used to wrap a published message
1297      */
1298     private class PublishedMessageWrapper extends TransactionalObjectWrapper {
1299 
1300         /***
1301          * Create an instance of the wrapper using the specified message
1302          *
1303          * @param message - the message to wrap
1304          */
1305         public PublishedMessageWrapper(MessageImpl message) {
1306             super(message);
1307         }
1308 
1309         /***
1310          * Return an instance of the message object
1311          *
1312          * @return MessageImpl
1313          */
1314         public MessageImpl getMessage() {
1315             return (MessageImpl) super.getObject();
1316         }
1317     }
1318 
1319 
1320     /***
1321      * This private member class is used to wrap a received message
1322      */
1323     private class ReceivedMessageWrapper extends TransactionalObjectWrapper {
1324 
1325         /***
1326          * Caches the id of the {@link ConsumerEndpoint} that is processed this
1327          * handle
1328          */
1329         private long _consumerId;
1330 
1331         /***
1332          * Create an instance of the wrapper using the specified message
1333          *
1334          * @param id     - the identity of the consumer endpoint
1335          * @param handle - the handle to the message
1336          */
1337         public ReceivedMessageWrapper(long id, MessageHandle handle) {
1338             super(handle);
1339             _consumerId = id;
1340         }
1341 
1342         /***
1343          * Return a reference to the  consumer identity
1344          *
1345          * @return String
1346          */
1347         public long getConsumerId() {
1348             return _consumerId;
1349         }
1350 
1351         /***
1352          * Return an instance of the message handle
1353          *
1354          * @return MessageHandle
1355          */
1356         public MessageHandle getMessageHandle() {
1357             return (MessageHandle) super.getObject();
1358         }
1359     }
1360 
1361 }