1 /***
2 * Redistribution and use of this software and associated documentation
3 * ("Software"), with or without modification, are permitted provided
4 * that the following conditions are met:
5 *
6 * 1. Redistributions of source code must retain copyright
7 * statements and notices. Redistributions must also contain a
8 * copy of this document.
9 *
10 * 2. Redistributions in binary form must reproduce the
11 * above copyright notice, this list of conditions and the
12 * following disclaimer in the documentation and/or other
13 * materials provided with the distribution.
14 *
15 * 3. The name "Exolab" must not be used to endorse or promote
16 * products derived from this Software without prior written
17 * permission of Exoffice Technologies. For written permission,
18 * please contact info@exolab.org.
19 *
20 * 4. Products derived from this Software may not be called "Exolab"
21 * nor may "Exolab" appear in their names without prior written
22 * permission of Exoffice Technologies. Exolab is a registered
23 * trademark of Exoffice Technologies.
24 *
25 * 5. Due credit should be given to the Exolab Project
26 * (http://www.exolab.org/).
27 *
28 * THIS SOFTWARE IS PROVIDED BY EXOFFICE TECHNOLOGIES AND CONTRIBUTORS
29 * ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
30 * NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
31 * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
32 * EXOFFICE TECHNOLOGIES OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
33 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
34 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
35 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
37 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
38 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
39 * OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * Copyright 2000-2004 (C) Exoffice Technologies Inc. All Rights Reserved.
42 *
43 * $Id: ResourceManager.java,v 1.3 2005/08/30 07:26:49 tanderson Exp $
44 */
45 package org.exolab.jms.messagemgr;
46
47 import java.io.File;
48 import java.io.FilenameFilter;
49 import java.util.Comparator;
50 import java.util.HashMap;
51 import java.util.Iterator;
52 import java.util.LinkedList;
53 import java.util.TreeSet;
54 import java.util.Vector;
55 import javax.jms.JMSException;
56 import javax.transaction.xa.XAException;
57 import javax.transaction.xa.XAResource;
58 import javax.transaction.xa.Xid;
59
60 import org.apache.commons.logging.Log;
61 import org.apache.commons.logging.LogFactory;
62
63 import org.exolab.jms.client.JmsDestination;
64 import org.exolab.jms.common.uuid.UUID;
65 import org.exolab.jms.message.MessageImpl;
66 import org.exolab.jms.persistence.DatabaseService;
67 import org.exolab.jms.persistence.PersistenceException;
68 import org.exolab.jms.service.Service;
69 import org.exolab.jms.tranlog.DataTransactionLogEntry;
70 import org.exolab.jms.tranlog.ExternalXid;
71 import org.exolab.jms.tranlog.StateTransactionLogEntry;
72 import org.exolab.jms.tranlog.TransactionLog;
73 import org.exolab.jms.tranlog.TransactionLogException;
74 import org.exolab.jms.tranlog.TransactionState;
75
76
77 /***
78 * The resource manager provides XA support for the JMS Server.
79 * <p/>
80 * The resource manager is responsible for managing the various transaction
81 * identifiers and managing the association between transaction ids and
82 * connections.
83 * <p/>
84 * The resource manager will store the global XID's and their state in the
85 * database for recovery purposes.
86 * <p/>
87 * Messages that arrive, and are associated with an XID are not processed
88 * through the {@link MessageMgr}. Instead they are routed to this resource
89 * managers where they are cached until the associated XID is committed or
90 * rolled back. If the transaction is successfully committed, through the 2PC
91 * protocol the messages will pass through the system.
92 * <p/>
93 * Similarly, messages that are sent to consumers, either synchronously or
94 * asynchronously are also cached by the resource manager until the global
95 * transaction completes.
96 * <p/>
97 * On startup the resource manager will read all incomplete transactions, which
98 * are incompleted into memory. It will then process trnasactions that have
99 * timed out.
100 * <p/>
101 * The transaction manager will call the {@link #recover} method and obtain a
102 * list of incomplete transaction for the purpose of completing them where
103 * possible.
104 *
105 * @author <a href="mailto:jima@intalio.com">Jim Alateras</a>
106 * @version $Revision: 1.3 $ $Date: 2005/08/30 07:26:49 $
107 */
108 public class ResourceManager extends Service {
109
110 /***
111 * The extension for all transaction log files
112 */
113 public final static String RM_LOGFILE_EXTENSION = ".log";
114
115 /***
116 * This is used to indicate the garbage collection has been disabled and
117 * that the client will take responsibility for all aspects of log file
118 * management. This is useful in situations where the client wants to
119 * archive the transaction log files
120 * <p/>
121 * This is the default mode for GC.
122 */
123 public static final int GC_DISABLED = 0;
124
125 /***
126 * Synchronous gabrage collection is used to remove processed log files when
127 * the last trnasaction, in that log file, has been successfully processed.
128 * This is more efficient means since the log files does not need to be
129 * scanned asynchronously to determine whether all the transactions have
130 * been processed.
131 */
132 public static final int GC_SYNCHRONOUS = 1;
133
134 /***
135 * Asynchronous garbage collection is used to remove processed log files
136 * asynchronous (i.e in a different thread context). This is rather
137 * expensive since it must manually scan each log file and determine whether
138 * all transactions, in that file, have been closed. If this is the case
139 * then it will remove the log file.
140 */
141 public static final int GC_ASYNCHRONOUS = 2;
142
143 /***
144 * The message manager.
145 */
146 private final MessageManager _messages;
147
148 /***
149 * The destination manager.
150 */
151 private final DestinationManager _destinations;
152
153 /***
154 * This is the maximum size, in bytes, of each transaction log file. The
155 * value can be overriden by the user
156 */
157 private int _logFileSize = 1000000;
158
159 /***
160 * Maintains a collection of transaction log files currently in use by this
161 * resource manager
162 */
163 private TreeSet _logs = new TreeSet(new TranLogFileComparator());
164
165 /***
166 * Maintain a mapping between the TRID (transaction id and the log file it
167 * is associated with.
168 */
169 private HashMap _tridToLogCache = new HashMap();
170
171 /***
172 * Maintain a list of open TRIDs for a particular {@link TransactionLog}
173 */
174 private HashMap _logToTridCache = new HashMap();
175
176 /***
177 * This attribute is used to synchronize the modifications to the _tridToLog
178 * _logToTrid attributes
179 */
180 private final Object _cacheLock = new Object();
181
182 /***
183 * This maintains a cache of all open transactions and the corresponding
184 * data. The key is the transaction identifier and the object is a
185 * LinkedList transaction entries, which includes both state and data
186 */
187 private HashMap _activeTransactions = new HashMap();
188
189 /***
190 * The directory where the log files are stored. This can be set by the
191 * client
192 */
193 private String _logDirectory = ".";
194
195 /***
196 * This is the number of the last log file created by the ResourceManager
197 */
198 private long _lastLogNumber = 0;
199
200 /***
201 * The expiry time for transaction associated with this resource manager.
202 * This will either be configured or passed in with the transaction context
203 * The value is specified in seconds.
204 */
205 private int _txExpiryTime = 120;
206
207 /***
208 * This attribute caches the garbage collection mode for the resouce
209 * managers. Valid values are specified by the GC_* constants.
210 * <p/>
211 * By default garbage collection is disabled.
212 */
213 private int _gcMode = GC_SYNCHRONOUS;
214
215 /***
216 * This is the id associated with this resource...need to work out who or
217 * what sets this.
218 */
219 private String _rid = UUID.next();
220
221 /***
222 * The name of the service
223 */
224 private final static String RM_SERVICE_NAME = "XAResourceManager";
225
226 /***
227 * The prefix used for all transaction log files, which are created and
228 * managed by the {@link TransactionLog}
229 */
230 private final static String RM_LOGFILE_PREFIX = "ojmsrm";
231
232 /***
233 * The logger
234 */
235 private static final Log _log = LogFactory.getLog(ResourceManager.class);
236
237
238 /***
239 * Construct a resource manager using the default directory for its log
240 * files.
241 * <p/>
242 * If the directory does not exist or there is no permisssion to access it,
243 * then throw a ResourceManagerException.
244 *
245 * @param messages the message manager
246 * @param destinations the destination manager
247 * @param database the database service
248 * @throws ResourceManagerException
249 */
250 public ResourceManager(MessageManager messages,
251 DestinationManager destinations,
252 DatabaseService database)
253 throws ResourceManagerException {
254 super(RM_SERVICE_NAME);
255
256 _messages = messages;
257 _destinations = destinations;
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276 }
277
278 /***
279 * Check whether garbage collection has been disabled
280 *
281 * @return boolean - true if gc is disabled
282 */
283 public boolean gcDisabled() {
284 return (_gcMode == GC_DISABLED) ? true : false;
285 }
286
287 /***
288 * Log this published message so that it can be passed through the system
289 * when the associated global transaction commits.
290 *
291 * @param xid - the global transaction identity
292 * @param message - the message published
293 * @throws TransactionLogException - error adding the entry
294 * @throws ResourceManagerException - error getting the trnasaction log
295 * @throws JMSException - if there is an issue with prep'ing the
296 * message
297 */
298 public synchronized void logPublishedMessage(Xid xid, MessageImpl message)
299 throws TransactionLogException, ResourceManagerException,
300 JMSException {
301 _messages.prepare(message);
302 logTransactionData(new ExternalXid(xid), _rid,
303 createPublishedMessageWrapper(message));
304 }
305
306 /***
307 * Log that this message handle was sent to the consumer within the
308 * specified global transaction identity. The message will be acknowledged
309 * when the global transaction commits. Alternatively, if the global
310 * transaction is rolled back the message handle will be returned to the
311 * destination
312 *
313 * @param xid the global transaction identity
314 * @param id the consumer receiving this message
315 * @param handle - the handle of the message received
316 * @throws TransactionLogException - error adding the entry
317 * @throws ResourceManagerException - error getting the transaction log
318 */
319 public synchronized void logReceivedMessage(Xid xid, long id,
320 MessageHandle handle)
321 throws TransactionLogException, ResourceManagerException {
322 logTransactionData(new ExternalXid(xid), _rid,
323 createReceivedMessageWrapper(id, handle));
324 }
325
326 /***
327 * Add an {@link StateTransactionLogEntry} using the specified txid, rid and
328 * state
329 *
330 * @param xid - the transaction identifier
331 * @param state - the transaction log state
332 * @throws TransactionLogException - error adding the entry
333 * @throws ResourceManagerException - error getting the trnasaction log
334 */
335 public synchronized void logTransactionState(Xid xid,
336 TransactionState state)
337 throws TransactionLogException, ResourceManagerException {
338 ExternalXid txid = new ExternalXid(xid);
339 switch (state.getOrd()) {
340 case TransactionState.OPENED_ORD:
341 {
342 TransactionLog log = getCurrentTransactionLog();
343 addTridLogEntry(txid, log);
344 log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
345 state);
346
347
348 _activeTransactions.put(txid, new LinkedList());
349 }
350 break;
351
352 case TransactionState.PREPARED_ORD:
353
354 LinkedList list = (LinkedList) _activeTransactions.get(txid);
355 if (list != null) {
356 list.add(state);
357 } else {
358 throw new ResourceManagerException("Trasaction " + txid +
359 " is not active.");
360 }
361 break;
362
363 case TransactionState.CLOSED_ORD:
364 {
365 TransactionLog log = getTransactionLog(txid);
366 log.logTransactionState(txid, _txExpiryTime * 1000, _rid,
367 state);
368 removeTridLogEntry(txid, log);
369
370
371 synchronized (_cacheLock) {
372 if ((_logToTridCache.get(log) == null) &&
373 (!isCurrentTransactionLog(log))) {
374 log.close();
375
376
377
378 if (_gcMode == GC_SYNCHRONOUS) {
379 try {
380 log.destroy();
381 } catch (TransactionLogException exception) {
382 exception.printStackTrace();
383 }
384 }
385 }
386 }
387
388
389
390 _activeTransactions.remove(txid);
391 }
392 break;
393
394 default:
395 throw new ResourceManagerException("Cannot process tx state " +
396 state);
397 }
398 }
399
400 /***
401 * Add an {@link DataTransactionLogEntry} using the specified txid, rid and
402 * data
403 *
404 * @param txid - the transaction identifier
405 * @param rid - the resource identifier
406 * @throws TransactionLogException - error adding the entry
407 * @throws ResourceManagerException - error getting the trnasaction log
408 */
409 synchronized void logTransactionData(ExternalXid txid, String rid,
410 Object data)
411 throws ResourceManagerException, TransactionLogException {
412 getTransactionLog(txid).logTransactionData(txid, _txExpiryTime * 1000,
413 rid, data);
414
415
416
417 LinkedList list = (LinkedList) _activeTransactions.get(txid);
418 if (list != null) {
419 list.add(data);
420 } else {
421 throw new ResourceManagerException("Trasaction " + txid +
422 " is not active.");
423 }
424 }
425
426 /***
427 * This is the entry point for the garbage collection callback. It scans
428 * through the each transaction log file and determines whether it can be
429 * garbage collected. If it can then it simply destroys the corresponding
430 * TransactionLog.
431 */
432 public void garbageCollect() {
433 try {
434 int gcfiles = 0;
435
436
437 if (_logs.size() == 0) {
438 return;
439 }
440
441 TreeSet copy = null;
442 synchronized (_logs) {
443 copy = new TreeSet(_logs);
444 }
445
446
447
448 copy.remove(_logs.last());
449
450
451 while (copy.size() > 0) {
452 TransactionLog log = (TransactionLog) copy.first();
453 copy.remove(log);
454 if (log.canGarbageCollect()) {
455
456 log.destroy();
457
458
459 synchronized (_logs) {
460 _logs.remove(log);
461 }
462
463
464 ++gcfiles;
465 }
466 }
467
468
469 _log.info("[RMGC] Collected " + gcfiles + " files.");
470 } catch (Exception exception) {
471 exception.printStackTrace();
472 }
473 }
474
475 /***
476 * Ensure that a transaction with the specified xid is currently active. If
477 * this is the case then commit the transaction based onb the value of the
478 * onePhase flag.
479 * <p/>
480 * This will have the effect of passing all messages through
481 *
482 * @param id - the xa transaction identity
483 * @param onePhase - treu if it is a one phase commit
484 * @throws XAException - if there is a problem completing the call
485 */
486 public synchronized void commit(Xid id, boolean onePhase)
487 throws XAException {
488
489 if (id == null) {
490 throw new XAException(XAException.XAER_NOTA);
491 }
492
493
494 ExternalXid xid = new ExternalXid(id);
495
496
497
498 if (!isTransactionActive(xid)) {
499 throw new XAException(XAException.XAER_PROTO);
500 }
501
502
503
504
505
506 try {
507
508
509
510 Object[] records = getTransactionRecords(xid, _rid);
511 for (int index = 0; index < records.length; index++) {
512 if (records[index] instanceof TransactionalObjectWrapper) {
513 TransactionalObjectWrapper wrapper =
514 (TransactionalObjectWrapper) records[index];
515 if (wrapper.isPublishedMessage()) {
516
517 MessageImpl message = (MessageImpl) wrapper.getObject();
518 _messages.add(message);
519
520 } else if (wrapper.isReceivedMessage()) {
521
522
523 MessageHandle handle = ((ReceivedMessageWrapper) (wrapper)).getMessageHandle();
524 handle.destroy();
525 }
526 } else {
527
528 }
529 }
530 } catch (Exception exception) {
531 _log.error(exception, exception);
532 throw new XAException("Failed in ResourceManager.commit : " +
533 exception.toString());
534 } finally {
535
536 try {
537 logTransactionState(xid, TransactionState.CLOSED);
538 } catch (Exception exception) {
539 throw new XAException("Error processing commit : " + exception);
540 }
541 }
542 }
543
544 /***
545 * Ends the work performed on behalf of a transaction branch. The resource
546 * manager disassociates the XA resource from the transaction branch
547 * specified and let the transaction be completedCommits an XA transaction
548 * that is in progress.
549 *
550 * @param id - the xa transaction identity
551 * @param flags - one of TMSUCCESS, TMFAIL, or TMSUSPEND
552 * @throws XAException - if there is a problem completing the call
553 */
554 public synchronized void end(Xid id, int flags)
555 throws XAException {
556
557 if (id == null) {
558 throw new XAException(XAException.XAER_NOTA);
559 }
560
561
562 ExternalXid xid = new ExternalXid(id);
563
564
565 if ((flags != XAResource.TMSUSPEND) ||
566 (flags != XAResource.TMSUCCESS) ||
567 (flags != XAResource.TMFAIL)) {
568 throw new XAException(XAException.XAER_PROTO);
569 }
570
571 switch (flags) {
572 case XAResource.TMFAIL:
573
574 if (!isTransactionActive(xid)) {
575 throw new XAException(XAException.XAER_PROTO);
576 }
577
578
579 rollback(xid);
580 break;
581
582 case XAResource.TMSUSPEND:
583
584 if (!isTransactionActive(xid)) {
585 throw new XAException(XAException.XAER_PROTO);
586 }
587 break;
588
589 case XAResource.TMSUCCESS:
590
591
592
593 if (isTransactionActive(xid)) {
594 throw new XAException(XAException.XAER_PROTO);
595 }
596 break;
597 }
598 }
599
600 /***
601 * Tell the resource manager to forget about a heuristically completed
602 * transaction branch.
603 *
604 * @param id - the xa transaction identity
605 * @throws XAException - if there is a problem completing the call
606 */
607 public synchronized void forget(Xid id)
608 throws XAException {
609
610 if (id == null) {
611 throw new XAException(XAException.XAER_NOTA);
612 }
613
614
615 ExternalXid xid = new ExternalXid(id);
616
617
618 if (!isTransactionActive(xid)) {
619 throw new XAException(XAException.XAER_PROTO);
620 }
621
622
623 rollback(id);
624 }
625
626 /***
627 * Return the transaction timeout for this instance of the resource
628 * manager.
629 *
630 * @return int - the timeout in seconds
631 * @throws XAException - if there is a problem completing the call
632 */
633 public synchronized int getTransactionTimeout()
634 throws XAException {
635 return _txExpiryTime;
636 }
637
638 /***
639 * Ask the resource manager to prepare for a transaction commit of the
640 * transaction specified in xid
641 *
642 * @param xares
643 * @return int - XA_RDONLY or XA_OK
644 * @throws XAException - if there is a problem completing the call
645 */
646 public synchronized boolean isSameRM(XAResource xares)
647 throws XAException {
648 boolean result = false;
649
650 if ((xares == this) ||
651 ((xares instanceof ResourceManager) &&
652 (((ResourceManager) xares)._rid.equals(_rid)))) {
653 result = true;
654 }
655
656 return result;
657 }
658
659 /***
660 * Obtain a list of prepared transaction branches from a resource manager.
661 * The transaction manager calls this method during recovery to obtain the
662 * list of transaction branches that are currently in prepared or
663 * heuristically completed states.
664 *
665 * @throws XAException - if there is a problem completing the call
666 */
667 public synchronized int prepare(Xid id)
668 throws XAException {
669
670 if (id == null) {
671 throw new XAException(XAException.XAER_NOTA);
672 }
673
674
675 ExternalXid xid = new ExternalXid(id);
676
677
678 if (!isTransactionActive(xid)) {
679 throw new XAException(XAException.XAER_PROTO);
680 }
681
682
683
684
685 try {
686 logTransactionState(xid, TransactionState.PREPARED);
687 } catch (Exception exception) {
688 throw new XAException("Error processing prepare : " + exception);
689 }
690
691 return XAResource.XA_OK;
692 }
693
694 /***
695 * Inform the resource manager to roll back work done on behalf of a
696 * transaction branch
697 *
698 * @throws XAException - if there is a problem completing the call
699 */
700 public synchronized Xid[] recover(int flag)
701 throws XAException {
702
703 Xid[] result = new Xid[0];
704
705 if ((flag == XAResource.TMNOFLAGS) ||
706 (flag == XAResource.TMSTARTRSCAN) ||
707 (flag == XAResource.TMENDRSCAN)) {
708 LinkedList xids = new LinkedList();
709 Iterator iter = _activeTransactions.keySet().iterator();
710 while (iter.hasNext()) {
711 Xid xid = (Xid) iter.next();
712 LinkedList list = (LinkedList) _activeTransactions.get(xid);
713 if (list.size() > 1) {
714
715 Object last = list.getLast();
716 if ((last instanceof StateTransactionLogEntry)
717 &&
718 (((StateTransactionLogEntry) last).getState()
719 .isPrepared())) {
720 xids.add(xid);
721 }
722 }
723
724 }
725 result = (Xid[]) xids.toArray();
726 }
727
728 return result;
729 }
730
731 /***
732 * Set the current transaction timeout value for this XAResource instance.
733 *
734 * @throws XAException - if there is a problem completing the call
735 */
736 public synchronized void rollback(Xid id)
737 throws XAException {
738
739 if (id == null) {
740 throw new XAException(XAException.XAER_NOTA);
741 }
742
743
744 ExternalXid xid = new ExternalXid(id);
745
746
747 if (!isTransactionActive(xid)) {
748 throw new XAException(XAException.XAER_PROTO);
749 }
750
751
752
753
754 try {
755
756
757
758 Object[] records = getTransactionRecords(xid, _rid);
759 for (int index = 0; index < records.length; index++) {
760 if (records[index] instanceof TransactionalObjectWrapper) {
761 TransactionalObjectWrapper wrapper =
762 (TransactionalObjectWrapper) records[index];
763 if (wrapper.isPublishedMessage()) {
764
765
766 } else if (wrapper.isReceivedMessage()) {
767 ReceivedMessageWrapper rmsg_wrapper =
768 (ReceivedMessageWrapper) wrapper;
769 MessageHandle handle =
770 (MessageHandle) rmsg_wrapper.getObject();
771 JmsDestination dest = handle.getDestination();
772 DestinationCache cache =
773 _destinations.getDestinationCache(dest);
774 cache.returnMessageHandle(handle);
775 }
776 } else {
777
778 }
779 }
780 } catch (Exception exception) {
781 throw new XAException("Failed in ResourceManager.rollback : " +
782 exception.toString());
783 } finally {
784
785 try {
786 logTransactionState(xid, TransactionState.CLOSED);
787 } catch (Exception exception) {
788 throw new XAException(
789 "Error processing rollback : " + exception);
790 }
791 }
792 }
793
794 /***
795 * Start work on behalf of a transaction branch specified in xid If TMJOIN
796 * is specified, the start is for joining a transaction previously seen by
797 * the resource manager
798 *
799 * @throws XAException - if there is a problem completing the call
800 */
801 public synchronized boolean setTransactionTimeout(int seconds)
802 throws XAException {
803 _txExpiryTime = seconds;
804 return true;
805 }
806
807
808 public synchronized void start(Xid id, int flags)
809 throws XAException {
810
811
812 if (id == null) {
813 throw new XAException(XAException.XAER_NOTA);
814 }
815
816
817 ExternalXid xid = new ExternalXid(id);
818
819
820 if ((flags != XAResource.TMNOFLAGS) ||
821 (flags != XAResource.TMJOIN) ||
822 (flags != XAResource.TMRESUME)) {
823 throw new XAException(XAException.XAER_PROTO);
824 }
825
826 switch (flags) {
827 case XAResource.TMNOFLAGS:
828
829 if (isTransactionActive(xid)) {
830 throw new XAException(XAException.XAER_DUPID);
831 }
832
833
834 try {
835 logTransactionState(xid, TransactionState.OPENED);
836 } catch (Exception exception) {
837 throw new XAException(
838 "Error processing start : " + exception);
839 }
840 break;
841
842 case XAResource.TMJOIN:
843 case XAResource.TMRESUME:
844
845
846 if (!isTransactionActive(xid)) {
847 throw new XAException(XAException.XAER_PROTO);
848 }
849 break;
850 }
851 }
852
853 /***
854 * Return the resource manager identity
855 *
856 * @return the resource manager identity
857 */
858 public String getResourceManagerId() {
859 return _rid;
860 }
861
862 /***
863 * Create the next {@link TransactionLog} and add it to the list of managed
864 * transaction logs.
865 * <p/>
866 * The method will throw ResourceManagerException if there is a problem
867 * completing the request.
868 *
869 * @throws ResourceManagerException
870 */
871 protected TransactionLog createNextTransactionLog()
872 throws ResourceManagerException {
873 TransactionLog newlog = null;
874
875 synchronized (_logs) {
876 try {
877
878 long last = 1;
879 if (!_logs.isEmpty()) {
880 last
881 = getSequenceNumber(
882 ((TransactionLog) _logs.last()).getName());
883 }
884
885
886
887 String name = _logDirectory
888 + System.getProperty("file.separator") +
889 RM_LOGFILE_PREFIX + Long.toString(++last)
890 + RM_LOGFILE_EXTENSION;
891
892
893 newlog = new TransactionLog(name, true);
894 _logs.add(newlog);
895 } catch (TransactionLogException exception) {
896 throw new ResourceManagerException(
897 "Error in createNextTransactionLog " + exception);
898 }
899 }
900
901 return newlog;
902 }
903
904 /***
905 * Build a list of all log files in the specified log directory
906 *
907 * @throws IllegalArgumentException - if the directory does not exist.
908 */
909 protected void buildLogFileList() {
910 File dir = new File(_logDirectory);
911 if ((!dir.exists()) ||
912 (!dir.isDirectory())) {
913 throw new IllegalArgumentException(_logDirectory +
914 " is not a directory");
915 }
916
917 try {
918 File[] list = dir.listFiles(new FilenameFilter() {
919
920
921 public boolean accept(File dir, String name) {
922 boolean result = false;
923
924 if ((name.startsWith(RM_LOGFILE_PREFIX)) &&
925 (name.endsWith(RM_LOGFILE_EXTENSION))) {
926 result = true;
927 }
928
929 return result;
930 }
931 });
932
933
934 synchronized (_logs) {
935 for (int index = 0; index < list.length; index++) {
936 _logs.add(new TransactionLog(list[index].getPath(), false));
937 }
938 }
939 } catch (Exception exception) {
940
941 exception.printStackTrace();
942 }
943
944 }
945
946 /***
947 * This method will process all the transaction logs, in the log diretory
948 * and call recover on each of them.
949 *
950 * @throws ResourceManagerException - if there is a problem recovering
951 */
952 private synchronized void recover()
953 throws ResourceManagerException {
954 try {
955 if (!_logs.isEmpty()) {
956 Iterator iter = _logs.iterator();
957 while (iter.hasNext()) {
958 TransactionLog log = (TransactionLog) iter.next();
959 HashMap records = log.recover();
960 }
961 }
962 } catch (Exception exception) {
963 throw new ResourceManagerException("Error in recover " +
964 exception.toString());
965 }
966 }
967
968 /***
969 * Retrieve the transaction log for the specified transaction id
970 *
971 * @param txid - the transaction identity
972 * @return TransactionLog
973 * @throws TransactionLogException - if there is tx log exception
974 * @throws ResourceManagerException - if there is a resource problem.
975 */
976 private TransactionLog getTransactionLog(ExternalXid txid)
977 throws TransactionLogException, ResourceManagerException {
978 TransactionLog log = (TransactionLog) _tridToLogCache.get(txid);
979 if (log == null) {
980 log = getCurrentTransactionLog();
981 addTridLogEntry(txid, log);
982 }
983
984 return log;
985 }
986
987 /***
988 * Get the current transaction log. It will check the last transaction log
989 * opened by the resource manager and determine whether there is space
990 * enough to process another transaction.
991 * <p/>
992 * If there is space enough then it will return that transaction, otherwise
993 * it will create a new transaction log for the resource
994 *
995 * @return TransactionLog - the transaction log to use
996 * @throws ResourceManagerException
997 * @throws TransactionLogException
998 */
999 private TransactionLog getCurrentTransactionLog()
1000 throws TransactionLogException, ResourceManagerException {
1001 TransactionLog log = null;
1002
1003 synchronized (_logs) {
1004 if (_logs.size() > 0) {
1005 log = (TransactionLog) _logs.last();
1006 }
1007
1008 if ((log == null) ||
1009 (log.size() > _logFileSize)) {
1010 log = createNextTransactionLog();
1011 }
1012 }
1013
1014 return log;
1015 }
1016
1017 /***
1018 * Add an entry to the trid log cache table for the specified trid and
1019 * transaction log mapping.
1020 *
1021 * @param trid - the transaction identifier
1022 * @param log - the transaction log
1023 */
1024 private void addTridLogEntry(ExternalXid trid, TransactionLog log) {
1025 synchronized (_cacheLock) {
1026
1027 _tridToLogCache.put(trid, log);
1028
1029
1030 Vector trids = (Vector) _logToTridCache.get(log);
1031 if (trids == null) {
1032 trids = new Vector();
1033 _logToTridCache.put(log, trids);
1034 }
1035 trids.addElement(trid);
1036 }
1037 }
1038
1039 /***
1040 * Check whether the specified log is also the current log
1041 *
1042 * @param log - the log to check
1043 * @return boolean - true if it is
1044 */
1045 private boolean isCurrentTransactionLog(TransactionLog log) {
1046 boolean result = false;
1047
1048 if (_logs.size() > 0) {
1049 result = log.equals(_logs.last());
1050 }
1051
1052 return result;
1053 }
1054
1055 /***
1056 * Remove an entry to the trid log cache table for the specified trid and
1057 * transaction log mapping.
1058 *
1059 * @param trid - the transaction identifier
1060 * @param log - the transaction log
1061 */
1062 private void removeTridLogEntry(ExternalXid trid, TransactionLog log) {
1063 synchronized (_cacheLock) {
1064
1065
1066 _tridToLogCache.remove(trid);
1067
1068
1069 Vector trids = (Vector) _logToTridCache.get(log);
1070 if (trids != null) {
1071 trids.remove(trid);
1072 if (trids.size() == 0) {
1073 _logToTridCache.remove(log);
1074 }
1075 }
1076 }
1077 }
1078
1079 /***
1080 * Return an arrya of records, both state and date, for the specified global
1081 * transaction
1082 *
1083 * @param xid - the global transaction id
1084 * @param rid - the resource id
1085 * @return Object[] - array of records
1086 */
1087 protected Object[] getTransactionRecords(ExternalXid xid, String rid) {
1088 Object[] records;
1089
1090
1091
1092 LinkedList list = (LinkedList) _activeTransactions.get(xid);
1093 if (list != null) {
1094 records = list.toArray();
1095 } else {
1096 records = new Object[0];
1097 }
1098
1099 return records;
1100 }
1101
1102
1103 /***
1104 * Return the sequence number of the file files are associated with a unique
1105 * number
1106 *
1107 * @param name - the file name to investigate
1108 * @return long - the transaction log number
1109 * @throws ResourceManagerException
1110 */
1111 protected long getSequenceNumber(String name)
1112 throws ResourceManagerException {
1113 int start = name.indexOf(RM_LOGFILE_PREFIX) +
1114 RM_LOGFILE_PREFIX.length();
1115 int end = name.indexOf(RM_LOGFILE_EXTENSION);
1116
1117
1118 try {
1119 return Long.parseLong(name.substring(start, end));
1120 } catch (NumberFormatException exception) {
1121 throw new ResourceManagerException(
1122 "Invalid name assigned to resource manager file " + name);
1123 }
1124 }
1125
1126 /***
1127 * Return true if the specified transaction is active
1128 *
1129 * @param xid - the gobal transaction identifier
1130 */
1131 private synchronized boolean isTransactionActive(ExternalXid xid) {
1132 return _activeTransactions.containsKey(xid);
1133 }
1134
1135 /***
1136 * Dump the specified records to the screen
1137 */
1138 private void dumpRecovered(HashMap records) {
1139 Iterator iter = records.keySet().iterator();
1140 while (iter.hasNext()) {
1141 ExternalXid txid = (ExternalXid) iter.next();
1142 LinkedList list = (LinkedList) records.get(txid);
1143 Iterator oiter = list.iterator();
1144 while (oiter.hasNext()) {
1145 Object object = oiter.next();
1146 if (object instanceof StateTransactionLogEntry) {
1147 System.err.println(
1148 "Recovered [" + txid + "] Class " +
1149 object.getClass().getName()
1150 + " ["
1151 +
1152 ((StateTransactionLogEntry) object).getState()
1153 .toString()
1154 + "]");
1155 } else {
1156 System.err.println("Recovered [" + txid + "] Class " +
1157 object.getClass().getName());
1158 }
1159 }
1160 }
1161 }
1162
1163
1164 /***
1165 * Helper and type-safe method for creating a wrapper object for published
1166 * messages
1167 *
1168 * @param message - the message published
1169 * @return PublishedMessageWrapper
1170 */
1171 private PublishedMessageWrapper createPublishedMessageWrapper(
1172 MessageImpl message) {
1173 return new PublishedMessageWrapper(message);
1174 }
1175
1176 /***
1177 * Helper and type-safe method for creating a wrapper object for received
1178 * messages
1179 *
1180 * @param id - the identity of the consumer receiving the message
1181 * @param handle - the handle of the message received
1182 * @return ReceivedMessageWrapper
1183 */
1184 private ReceivedMessageWrapper createReceivedMessageWrapper(long id,
1185 MessageHandle handle) {
1186 return new ReceivedMessageWrapper(id, handle);
1187 }
1188
1189 /***
1190 * This functor is used by various collections to order the transaction log
1191 * files created by this resource manager. The resource manager will create
1192 * log files with sequentially increasing numbers (i.e xxx01.log, xxx2.log
1193 */
1194 private class TranLogFileComparator
1195 implements Comparator {
1196
1197
1198 public int compare(Object o1, Object o2) {
1199 int result = -1;
1200
1201 try {
1202 if ((o1 instanceof TransactionLog) &&
1203 (o2 instanceof TransactionLog)) {
1204 long seq1 = getSequenceNumber(
1205 ((TransactionLog) o1).getName());
1206 long seq2 = getSequenceNumber(
1207 ((TransactionLog) o2).getName());
1208
1209 if (seq1 > seq2) {
1210 result = 1;
1211 } else if (seq1 < seq2) {
1212 result = -1;
1213 } else {
1214 result = 0;
1215 }
1216 } else {
1217 throw new ClassCastException("o1 = " +
1218 o1.getClass().getName() + " and o2 = " +
1219 o2.getClass().getName());
1220 }
1221 } catch (Exception exception) {
1222 throw new RuntimeException("Error in ResourceManager.compare " +
1223 exception.toString());
1224 }
1225
1226 return result;
1227 }
1228
1229
1230 public boolean equals(Object obj) {
1231 if (obj instanceof TranLogFileComparator) {
1232 return true;
1233 }
1234
1235 return false;
1236 }
1237 }
1238
1239
1240 /***
1241 * This private member class is used to wrap the transactional object, which
1242 * for this particular resource manager is a published message or a received
1243 * message handle.
1244 */
1245 abstract private class TransactionalObjectWrapper {
1246
1247 /***
1248 * The transactional object instance
1249 */
1250 private Object _object;
1251
1252 /***
1253 * Create an instance of the wrapper using the type and the object
1254 *
1255 * @param object - the associated object
1256 */
1257 public TransactionalObjectWrapper(Object object) {
1258 _object = object;
1259 }
1260
1261 /***
1262 * Check whether the wrapper contains a published message. Note that a
1263 * published message has a {@link MessageImpl} a the transactional
1264 * object.
1265 *
1266 * @return boolean - true if it is
1267 */
1268 public boolean isPublishedMessage() {
1269 return this instanceof PublishedMessageWrapper;
1270 }
1271
1272 /***
1273 * Check whether the wrapper contains a received message handle. Note
1274 * that a received message contains a {@link MessageHandle} as the
1275 * transactional object.
1276 *
1277 * @return boolean - true if it does
1278 */
1279 public boolean isReceivedMessage() {
1280 return this instanceof ReceivedMessageWrapper;
1281 }
1282
1283 /***
1284 * Return the transaction object
1285 *
1286 * @return Object
1287 */
1288 public Object getObject() {
1289 return _object;
1290 }
1291
1292 }
1293
1294
1295 /***
1296 * This private member class is used to wrap a published message
1297 */
1298 private class PublishedMessageWrapper extends TransactionalObjectWrapper {
1299
1300 /***
1301 * Create an instance of the wrapper using the specified message
1302 *
1303 * @param message - the message to wrap
1304 */
1305 public PublishedMessageWrapper(MessageImpl message) {
1306 super(message);
1307 }
1308
1309 /***
1310 * Return an instance of the message object
1311 *
1312 * @return MessageImpl
1313 */
1314 public MessageImpl getMessage() {
1315 return (MessageImpl) super.getObject();
1316 }
1317 }
1318
1319
1320 /***
1321 * This private member class is used to wrap a received message
1322 */
1323 private class ReceivedMessageWrapper extends TransactionalObjectWrapper {
1324
1325 /***
1326 * Caches the id of the {@link ConsumerEndpoint} that is processed this
1327 * handle
1328 */
1329 private long _consumerId;
1330
1331 /***
1332 * Create an instance of the wrapper using the specified message
1333 *
1334 * @param id - the identity of the consumer endpoint
1335 * @param handle - the handle to the message
1336 */
1337 public ReceivedMessageWrapper(long id, MessageHandle handle) {
1338 super(handle);
1339 _consumerId = id;
1340 }
1341
1342 /***
1343 * Return a reference to the consumer identity
1344 *
1345 * @return String
1346 */
1347 public long getConsumerId() {
1348 return _consumerId;
1349 }
1350
1351 /***
1352 * Return an instance of the message handle
1353 *
1354 * @return MessageHandle
1355 */
1356 public MessageHandle getMessageHandle() {
1357 return (MessageHandle) super.getObject();
1358 }
1359 }
1360
1361 }