001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.transaction.manager;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.IdentityHashMap;
023 import java.util.Iterator;
024 import java.util.LinkedList;
025 import java.util.List;
026 import java.util.Map;
027 import java.util.Set;
028
029 import javax.transaction.HeuristicMixedException;
030 import javax.transaction.HeuristicRollbackException;
031 import javax.transaction.RollbackException;
032 import javax.transaction.Status;
033 import javax.transaction.Synchronization;
034 import javax.transaction.SystemException;
035 import javax.transaction.Transaction;
036 import javax.transaction.xa.XAException;
037 import javax.transaction.xa.XAResource;
038 import javax.transaction.xa.Xid;
039 import javax.ejb.EJBException;
040
041 import org.apache.commons.logging.Log;
042 import org.apache.commons.logging.LogFactory;
043
044 /**
045 * Basic local transaction with support for multiple resources.
046 *
047 * @version $Rev: 487175 $ $Date: 2006-12-14 03:10:31 -0800 (Thu, 14 Dec 2006) $
048 */
049 public class TransactionImpl implements Transaction {
// Shared logger; the category is the literal name "Transaction" rather than the class name.
private static final Log log = LogFactory.getLog("Transaction");

// Used by enlistResource to create branch xids; null when built by the recovery constructor.
private final XidFactory xidFactory;
// Global transaction id; also returned as the transaction key and compared in equals().
private final Xid xid;
// Log that receives the begin/prepare/commit/rollback records for this transaction.
private final TransactionLog txnLog;
// Absolute deadline on the TransactionTimer clock (Long.MAX_VALUE for recovered transactions).
private final long timeout;
// Synchronization callbacks added through registerSynchronization, in registration order.
private final List syncList = new ArrayList(5);
// One TransactionBranch per branch, appended by addBranchXid.
private final LinkedList resourceManagers = new LinkedList();
// XAResource -> TransactionBranch for resources currently started; keyed by identity.
private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
// XAResource -> TransactionBranch for resources delisted with TMSUSPEND; keyed by identity.
private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
// Current javax.transaction.Status value; the constructors overwrite this initial value.
private int status = Status.STATUS_NO_TRANSACTION;
// Value returned by txnLog.prepare; handed back to txnLog.commit / txnLog.rollback.
private Object logMark;

// Arbitrary key/value pairs stored via putResource and read via getResource.
private final Map resources = new HashMap();
// Single interposed synchronization; a later registration replaces an earlier one.
private Synchronization interposedSynchronization;
// Persistence unit name -> entity manager; the values are closed in afterCompletion.
private final Map entityManagers = new HashMap();
066
/**
 * Creates a transaction with a freshly generated xid.
 *
 * Obtains the xid from {@code xidFactory.createXid()} and delegates to the
 * four-argument constructor.
 *
 * @param xidFactory factory used for the global xid and later for branch xids
 * @param txnLog transaction log that records this transaction
 * @param transactionTimeoutMilliseconds timeout relative to the current TransactionTimer time
 * @throws SystemException if the delegated constructor fails to log the begin record
 */
TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
    this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
}
070
071 TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
072 this.xidFactory = xidFactory;
073 this.txnLog = txnLog;
074 this.xid = xid;
075 this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
076 try {
077 txnLog.begin(xid);
078 } catch (LogException e) {
079 status = Status.STATUS_MARKED_ROLLBACK;
080 SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
081 ex.initCause(e);
082 throw ex;
083 }
084 status = Status.STATUS_ACTIVE;
085 }
086
/**
 * Rebuilds a transaction for an external xid found during recovery.
 *
 * The instance starts in STATUS_PREPARED, has no XidFactory, and never times out
 * (its deadline is Long.MAX_VALUE). No begin record is written to the log.
 *
 * @param xid xid of the recovered transaction
 * @param txLog transaction log to use for completion records
 */
public TransactionImpl(Xid xid, TransactionLog txLog) {
    this.xid = xid;
    this.txnLog = txLog;
    // no new branches are created for a recovered transaction here
    this.xidFactory = null;
    //TODO is this a good idea?
    this.timeout = Long.MAX_VALUE;
    this.status = Status.STATUS_PREPARED;
}
096
097 public synchronized int getStatus() {
098 return status;
099 }
100
101 public Object getResource(Object key) {
102 return resources.get(key);
103 }
104
105 public boolean getRollbackOnly() {
106 return status == Status.STATUS_MARKED_ROLLBACK;
107 }
108
109 public Object getTransactionKey() {
110 return xid;
111 }
112
113 public int getTransactionStatus() {
114 return status;
115 }
116
117 public void putResource(Object key, Object value) {
118 if (key == null) {
119 throw new NullPointerException("You must supply a non-null key for putResource");
120 }
121 resources.put(key, value);
122 }
123
124 public void registerInterposedSynchronization(Synchronization synchronization) {
125 interposedSynchronization = synchronization;
126 }
127
128 public synchronized void setRollbackOnly() throws IllegalStateException {
129 switch (status) {
130 case Status.STATUS_ACTIVE:
131 case Status.STATUS_PREPARING:
132 status = Status.STATUS_MARKED_ROLLBACK;
133 break;
134 case Status.STATUS_MARKED_ROLLBACK:
135 case Status.STATUS_ROLLING_BACK:
136 // nothing to do
137 break;
138 default:
139 throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
140 }
141 }
142
143 public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
144 if (synch == null) {
145 throw new IllegalArgumentException("Synchronization is null");
146 }
147 switch (status) {
148 case Status.STATUS_ACTIVE:
149 case Status.STATUS_PREPARING:
150 break;
151 case Status.STATUS_MARKED_ROLLBACK:
152 throw new RollbackException("Transaction is marked for rollback");
153 default:
154 throw new IllegalStateException("Status is " + getStateString(status));
155 }
156 syncList.add(synch);
157 }
158
159 public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
160 if (xaRes == null) {
161 throw new IllegalArgumentException("XAResource is null");
162 }
163 switch (status) {
164 case Status.STATUS_ACTIVE:
165 break;
166 case Status.STATUS_MARKED_ROLLBACK:
167 throw new RollbackException("Transaction is marked for rollback");
168 default:
169 throw new IllegalStateException("Status is " + getStateString(status));
170 }
171
172 if (activeXaResources.containsKey(xaRes)) {
173 throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
174 }
175
176 try {
177 TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
178 if (manager != null) {
179 //we know about this one, it was suspended
180 xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
181 activeXaResources.put(xaRes, manager);
182 return true;
183 }
184 //it is not suspended.
185 for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
186 manager = (TransactionBranch) i.next();
187 boolean sameRM;
188 //if the xares is already known, we must be resuming after a suspend.
189 if (xaRes == manager.getCommitter()) {
190 throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
191 }
192 //Otherwise, see if this is a new xares for the same resource manager
193 try {
194 sameRM = xaRes.isSameRM(manager.getCommitter());
195 } catch (XAException e) {
196 log.warn("Unexpected error checking for same RM", e);
197 continue;
198 }
199 if (sameRM) {
200 xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
201 activeXaResources.put(xaRes, manager);
202 return true;
203 }
204 }
205 //we know nothing about this XAResource or resource manager
206 Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
207 xaRes.start(branchId, XAResource.TMNOFLAGS);
208 activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
209 return true;
210 } catch (XAException e) {
211 log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
212 return false;
213 }
214 }
215
216 public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
217 if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
218 throw new IllegalStateException("invalid flag for delistResource: " + flag);
219 }
220 if (xaRes == null) {
221 throw new IllegalArgumentException("XAResource is null");
222 }
223 switch (status) {
224 case Status.STATUS_ACTIVE:
225 case Status.STATUS_MARKED_ROLLBACK:
226 break;
227 default:
228 throw new IllegalStateException("Status is " + getStateString(status));
229 }
230 TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
231 if (manager == null) {
232 if (flag == XAResource.TMSUSPEND) {
233 throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
234 }
235 //not active, and we are not trying to suspend. We must be ending tx.
236 manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
237 if (manager == null) {
238 throw new IllegalStateException("Resource not known to transaction: " + xaRes);
239 }
240 }
241
242 try {
243 xaRes.end(manager.getBranchId(), flag);
244 if (flag == XAResource.TMSUSPEND) {
245 suspendedXaResources.put(xaRes, manager);
246 }
247 return true;
248 } catch (XAException e) {
249 log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
250 return false;
251 }
252 }
253
//Transaction method, does 2pc
/**
 * Completes the transaction, choosing between no-phase, one-phase and two-phase commit
 * according to the number of entries in resourceManagers.
 *
 * beforePrepare() first validates the status, runs the beforeCompletion callbacks and
 * ends all enlisted resources. Whatever the outcome, the finally block runs the
 * afterCompletion callbacks and then sets the status to STATUS_NO_TRANSACTION.
 *
 * HeuristicMixedException, HeuristicRollbackException and SecurityException are declared
 * but no statement in this body throws them.
 *
 * @throws RollbackException if the transaction timed out, was marked for rollback, the
 *         one-phase commit raised an XAException, or the prepare phase voted to roll back
 * @throws SystemException propagated from internalPrepare, commitResources or rollbackResources
 * @throws IllegalStateException from beforePrepare if the status is neither active nor
 *         marked for rollback
 */
public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
    beforePrepare();

    try {
        // a transaction past its deadline is forced onto the rollback path
        boolean timedout = false;
        if (TransactionTimer.getCurrentTime() > timeout) {
            status = Status.STATUS_MARKED_ROLLBACK;
            timedout = true;
        }

        // marked for rollback (by timeout, a callback, or a failed end): roll back and report
        if (status == Status.STATUS_MARKED_ROLLBACK) {
            rollbackResources(resourceManagers);
            if (timedout) {
                throw new RollbackException("Transaction timout");
            } else {
                throw new RollbackException("Unable to commit: transaction marked for rollback");
            }
        }
        synchronized (this) {
            if (status == Status.STATUS_ACTIVE) {
                if (this.resourceManagers.size() == 0) {
                    // nothing to commit
                    status = Status.STATUS_COMMITTED;
                } else if (this.resourceManagers.size() == 1) {
                    // one-phase commit decision
                    status = Status.STATUS_COMMITTING;
                } else {
                    // start prepare part of two-phase
                    status = Status.STATUS_PREPARING;
                }
            }
            // resourceManagers is now immutable
        }

        // no-phase
        if (resourceManagers.size() == 0) {
            synchronized (this) {
                status = Status.STATUS_COMMITTED;
            }
            return;
        }

        // one-phase
        if (resourceManagers.size() == 1) {
            TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
            try {
                manager.getCommitter().commit(manager.getBranchId(), true);
                synchronized (this) {
                    status = Status.STATUS_COMMITTED;
                }
                return;
            } catch (XAException e) {
                // any XAException from the single resource is reported as a rollback
                synchronized (this) {
                    status = Status.STATUS_ROLLEDBACK;
                }
                throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
            }
        }

        // two-phase
        boolean willCommit = internalPrepare();

        // notify the RMs
        if (willCommit) {
            commitResources(resourceManagers);
        } else {
            rollbackResources(resourceManagers);
            throw new RollbackException("Unable to commit");
        }
    } finally {
        // callbacks see the final status before it is reset to STATUS_NO_TRANSACTION
        afterCompletion();
        synchronized (this) {
            status = Status.STATUS_NO_TRANSACTION;
        }
    }
}
331
//Used from XATerminator for first phase in a remotely controlled tx.
/**
 * Runs the prepare phase and reports the vote.
 *
 * beforePrepare() validates the status, runs the beforeCompletion callbacks and ends all
 * enlisted resources. The transaction is finished here (afterCompletion callbacks run and
 * the status is reset to STATUS_NO_TRANSACTION) whenever the result is still XA_RDONLY
 * on exit, which includes the paths that leave by exception.
 *
 * @return XAResource.XA_OK if at least one branch remains to be committed, otherwise
 *         XAResource.XA_RDONLY
 * @throws RollbackException if the prepare phase decided to roll back
 * @throws SystemException propagated from internalPrepare or rollbackResources
 * @throws IllegalStateException from beforePrepare if the status is neither active nor
 *         marked for rollback
 */
int prepare() throws SystemException, RollbackException {
    beforePrepare();
    int result = XAResource.XA_RDONLY;
    try {
        LinkedList rms;
        synchronized (this) {
            if (status == Status.STATUS_ACTIVE) {
                if (resourceManagers.size() == 0) {
                    // nothing to commit
                    status = Status.STATUS_COMMITTED;
                    return result;
                } else {
                    // start prepare part of two-phase
                    status = Status.STATUS_PREPARING;
                }
            }
            // resourceManagers is now immutable
            rms = resourceManagers;
        }

        // internalPrepare removes read-only and failed branches from the list that rms aliases
        boolean willCommit = internalPrepare();

        // notify the RMs
        if (willCommit) {
            if (!rms.isEmpty()) {
                result = XAResource.XA_OK;
            }
            // NOTE(review): when every branch voted read-only, result stays XA_RDONLY and the
            // finally block calls afterCompletion while status is STATUS_PREPARED -- confirm
            // that callbacks are meant to see PREPARED rather than COMMITTED here.
        } else {
            rollbackResources(rms);
            throw new RollbackException("Unable to commit");
        }
    } finally {
        // only an XA_OK result keeps the transaction alive for preparedCommit
        if (result == XAResource.XA_RDONLY) {
            afterCompletion();
            synchronized (this) {
                status = Status.STATUS_NO_TRANSACTION;
            }
        }
    }
    return result;
}
374
375 //used from XATerminator for commit phase of non-readonly remotely controlled tx.
376 void preparedCommit() throws SystemException {
377 try {
378 commitResources(resourceManagers);
379 } finally {
380 afterCompletion();
381 synchronized (this) {
382 status = Status.STATUS_NO_TRANSACTION;
383 }
384 }
385 }
386
387 //helper method used by Transaction.commit and XATerminator prepare.
388 private void beforePrepare() {
389 synchronized (this) {
390 switch (status) {
391 case Status.STATUS_ACTIVE:
392 case Status.STATUS_MARKED_ROLLBACK:
393 break;
394 default:
395 throw new IllegalStateException("Status is " + getStateString(status));
396 }
397 }
398
399 beforeCompletion();
400 endResources();
401 }
402
403
404 //helper method used by Transaction.commit and XATerminator prepare.
405 private boolean internalPrepare() throws SystemException {
406
407 for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
408 synchronized (this) {
409 if (status != Status.STATUS_PREPARING) {
410 // we were marked for rollback
411 break;
412 }
413 }
414 TransactionBranch manager = (TransactionBranch) rms.next();
415 try {
416 int vote = manager.getCommitter().prepare(manager.getBranchId());
417 if (vote == XAResource.XA_RDONLY) {
418 // we don't need to consider this RM any more
419 rms.remove();
420 }
421 } catch (XAException e) {
422 synchronized (this) {
423 status = Status.STATUS_MARKED_ROLLBACK;
424 //TODO document why this is true from the spec.
425 //XAException during prepare means we can assume resource is rolled back.
426 rms.remove();
427 break;
428 }
429 }
430 }
431
432 // decision time...
433 boolean willCommit;
434 synchronized (this) {
435 willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
436 if (willCommit) {
437 status = Status.STATUS_PREPARED;
438 }
439 }
440 // log our decision
441 if (willCommit && !resourceManagers.isEmpty()) {
442 try {
443 logMark = txnLog.prepare(xid, resourceManagers);
444 } catch (LogException e) {
445 try {
446 rollbackResources(resourceManagers);
447 } catch (Exception se) {
448 log.error("Unable to rollback after failure to log prepare", se.getCause());
449 }
450 throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
451 }
452 }
453 return willCommit;
454 }
455
456 public void rollback() throws IllegalStateException, SystemException {
457 List rms;
458 synchronized (this) {
459 switch (status) {
460 case Status.STATUS_ACTIVE:
461 status = Status.STATUS_MARKED_ROLLBACK;
462 break;
463 case Status.STATUS_MARKED_ROLLBACK:
464 break;
465 default:
466 throw new IllegalStateException("Status is " + getStateString(status));
467 }
468 rms = resourceManagers;
469 }
470
471 beforeCompletion();
472 endResources();
473 try {
474 rollbackResources(rms);
475 //only write rollback record if we have already written prepare record.
476 if (logMark != null) {
477 try {
478 txnLog.rollback(xid, logMark);
479 } catch (LogException e) {
480 try {
481 rollbackResources(rms);
482 } catch (Exception se) {
483 log.error("Unable to rollback after failure to log decision", se.getCause());
484 }
485 throw (SystemException) new SystemException("Error logging rollback").initCause(e);
486 }
487 }
488 } finally {
489 afterCompletion();
490 synchronized (this) {
491 status = Status.STATUS_NO_TRANSACTION;
492 }
493 }
494 }
495
496 private void beforeCompletion() {
497 int i = 0;
498 while (true) {
499 Synchronization synch;
500 synchronized (this) {
501 if (i == syncList.size()) {
502 if (interposedSynchronization != null) {
503 synch = interposedSynchronization;
504 i++;
505 } else {
506 return;
507 }
508 } else if (i == syncList.size() + 1) {
509 return;
510 } else {
511 synch = (Synchronization) syncList.get(i++);
512 }
513 }
514 try {
515 synch.beforeCompletion();
516 } catch (Exception e) {
517 log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
518 synchronized (this) {
519 status = Status.STATUS_MARKED_ROLLBACK;
520 }
521 }
522 }
523 }
524
525 private void afterCompletion() {
526 // this does not synchronize because nothing can modify our state at this time
527 if (interposedSynchronization != null) {
528 try {
529 interposedSynchronization.afterCompletion(status);
530 } catch (Exception e) {
531 log.warn("Unexpected exception from afterCompletion; continuing", e);
532 }
533 }
534 for (Iterator i = syncList.iterator(); i.hasNext();) {
535 Synchronization synch = (Synchronization) i.next();
536 try {
537 synch.afterCompletion(status);
538 } catch (Exception e) {
539 log.warn("Unexpected exception from afterCompletion; continuing", e);
540 continue;
541 }
542 }
543 for (Iterator i = entityManagers.values().iterator(); i.hasNext();) {
544 Closeable entityManager = (Closeable) i.next();
545 entityManager.close();
546 }
547 }
548
549 private void endResources() {
550 endResources(activeXaResources);
551 endResources(suspendedXaResources);
552 }
553
554 private void endResources(IdentityHashMap resourceMap) {
555 while (true) {
556 XAResource xaRes;
557 TransactionBranch manager;
558 int flags;
559 synchronized (this) {
560 Set entrySet = resourceMap.entrySet();
561 if (entrySet.isEmpty()) {
562 return;
563 }
564 Map.Entry entry = (Map.Entry) entrySet.iterator().next();
565 xaRes = (XAResource) entry.getKey();
566 manager = (TransactionBranch) entry.getValue();
567 flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
568 resourceMap.remove(xaRes);
569 }
570 try {
571 xaRes.end(manager.getBranchId(), flags);
572 } catch (XAException e) {
573 log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
574 synchronized (this) {
575 status = Status.STATUS_MARKED_ROLLBACK;
576 }
577 }
578 }
579 }
580
581 private void rollbackResources(List rms) throws SystemException {
582 SystemException cause = null;
583 synchronized (this) {
584 status = Status.STATUS_ROLLING_BACK;
585 }
586 for (Iterator i = rms.iterator(); i.hasNext();) {
587 TransactionBranch manager = (TransactionBranch) i.next();
588 try {
589 manager.getCommitter().rollback(manager.getBranchId());
590 } catch (XAException e) {
591 log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
592 if (cause == null) {
593 cause = new SystemException(e.errorCode);
594 }
595 continue;
596 }
597 }
598 synchronized (this) {
599 status = Status.STATUS_ROLLEDBACK;
600 }
601 if (cause != null) {
602 throw cause;
603 }
604 }
605
606 private void commitResources(List rms) throws SystemException {
607 SystemException cause = null;
608 synchronized (this) {
609 status = Status.STATUS_COMMITTING;
610 }
611 for (Iterator i = rms.iterator(); i.hasNext();) {
612 TransactionBranch manager = (TransactionBranch) i.next();
613 try {
614 manager.getCommitter().commit(manager.getBranchId(), false);
615 } catch (XAException e) {
616 log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
617 if (cause == null) {
618 cause = new SystemException(e.errorCode);
619 }
620 continue;
621 }
622 }
623 //if all resources were read only, we didn't write a prepare record.
624 if (!rms.isEmpty()) {
625 try {
626 txnLog.commit(xid, logMark);
627 } catch (LogException e) {
628 log.error("Unexpected exception logging commit completion for xid " + xid, e);
629 throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
630 }
631 }
632 synchronized (this) {
633 status = Status.STATUS_COMMITTED;
634 }
635 if (cause != null) {
636 throw cause;
637 }
638 }
639
640 private static String getStateString(int status) {
641 switch (status) {
642 case Status.STATUS_ACTIVE:
643 return "STATUS_ACTIVE";
644 case Status.STATUS_PREPARING:
645 return "STATUS_PREPARING";
646 case Status.STATUS_PREPARED:
647 return "STATUS_PREPARED";
648 case Status.STATUS_MARKED_ROLLBACK:
649 return "STATUS_MARKED_ROLLBACK";
650 case Status.STATUS_ROLLING_BACK:
651 return "STATUS_ROLLING_BACK";
652 case Status.STATUS_COMMITTING:
653 return "STATUS_COMMITTING";
654 case Status.STATUS_COMMITTED:
655 return "STATUS_COMMITTED";
656 case Status.STATUS_ROLLEDBACK:
657 return "STATUS_ROLLEDBACK";
658 case Status.STATUS_NO_TRANSACTION:
659 return "STATUS_NO_TRANSACTION";
660 case Status.STATUS_UNKNOWN:
661 return "STATUS_UNKNOWN";
662 default:
663 throw new AssertionError();
664 }
665 }
666
667 public boolean equals(Object obj) {
668 if (obj instanceof TransactionImpl) {
669 TransactionImpl other = (TransactionImpl) obj;
670 return xid.equals(other.xid);
671 } else {
672 return false;
673 }
674 }
675
676 //when used from recovery, do not add manager to active or suspended resource maps.
677 // The xaresources have already been ended with TMSUCCESS.
678 public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
679 TransactionBranch manager = new TransactionBranch(xaRes, branchId);
680 resourceManagers.add(manager);
681 return manager;
682 }
683
684 public Object getEntityManager(String persistenceUnit) {
685 return entityManagers.get(persistenceUnit);
686 }
687
688 public void setEntityManager(String persistenceUnit, Object entityManager) {
689 Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager);
690 if (oldEntityManager != null) {
691 throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid);
692 }
693 }
694
695 private static class TransactionBranch implements TransactionBranchInfo {
696 private final XAResource committer;
697 private final Xid branchId;
698
699 public TransactionBranch(XAResource xaRes, Xid branchId) {
700 committer = xaRes;
701 this.branchId = branchId;
702 }
703
704 public XAResource getCommitter() {
705 return committer;
706 }
707
708 public Xid getBranchId() {
709 return branchId;
710 }
711
712 public String getResourceName() {
713 if (committer instanceof NamedXAResource) {
714 return ((NamedXAResource) committer).getName();
715 } else {
716 throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer);
717 }
718 }
719
720 public Xid getBranchXid() {
721 return branchId;
722 }
723 }
724
725
726 }