001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.transaction.manager;
019    
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.IdentityHashMap;
023    import java.util.Iterator;
024    import java.util.LinkedList;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import javax.transaction.HeuristicMixedException;
030    import javax.transaction.HeuristicRollbackException;
031    import javax.transaction.RollbackException;
032    import javax.transaction.Status;
033    import javax.transaction.Synchronization;
034    import javax.transaction.SystemException;
035    import javax.transaction.Transaction;
036    import javax.transaction.xa.XAException;
037    import javax.transaction.xa.XAResource;
038    import javax.transaction.xa.Xid;
039    import javax.ejb.EJBException;
040    
041    import org.apache.commons.logging.Log;
042    import org.apache.commons.logging.LogFactory;
043    
044    /**
045     * Basic local transaction with support for multiple resources.
046     *
047     * @version $Rev: 487175 $ $Date: 2006-12-14 03:10:31 -0800 (Thu, 14 Dec 2006) $
048     */
049    public class TransactionImpl implements Transaction {
050        private static final Log log = LogFactory.getLog("Transaction");
051    
052        private final XidFactory xidFactory;
053        private final Xid xid;
054        private final TransactionLog txnLog;
055        private final long timeout;
056        private final List syncList = new ArrayList(5);
057        private final LinkedList resourceManagers = new LinkedList();
058        private final IdentityHashMap activeXaResources = new IdentityHashMap(3);
059        private final IdentityHashMap suspendedXaResources = new IdentityHashMap(3);
060        private int status = Status.STATUS_NO_TRANSACTION;
061        private Object logMark;
062    
063        private final Map resources = new HashMap();
064        private Synchronization interposedSynchronization;
065        private final Map entityManagers = new HashMap();
066    
067        TransactionImpl(XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
068            this(xidFactory.createXid(), xidFactory, txnLog, transactionTimeoutMilliseconds);
069        }
070    
071        TransactionImpl(Xid xid, XidFactory xidFactory, TransactionLog txnLog, long transactionTimeoutMilliseconds) throws SystemException {
072            this.xidFactory = xidFactory;
073            this.txnLog = txnLog;
074            this.xid = xid;
075            this.timeout = transactionTimeoutMilliseconds + TransactionTimer.getCurrentTime();
076            try {
077                txnLog.begin(xid);
078            } catch (LogException e) {
079                status = Status.STATUS_MARKED_ROLLBACK;
080                SystemException ex = new SystemException("Error logging begin; transaction marked for roll back)");
081                ex.initCause(e);
082                throw ex;
083            }
084            status = Status.STATUS_ACTIVE;
085        }
086    
087        //reconstruct a tx for an external tx found in recovery
088        public TransactionImpl(Xid xid, TransactionLog txLog) {
089            this.xidFactory = null;
090            this.txnLog = txLog;
091            this.xid = xid;
092            status = Status.STATUS_PREPARED;
093            //TODO is this a good idea?
094            this.timeout = Long.MAX_VALUE;
095        }
096    
097        public synchronized int getStatus() {
098            return status;
099        }
100    
101        public Object getResource(Object key) {
102            return resources.get(key);
103        }
104    
105        public boolean getRollbackOnly() {
106            return status == Status.STATUS_MARKED_ROLLBACK;
107        }
108    
109        public Object getTransactionKey() {
110            return xid;
111        }
112    
113        public int getTransactionStatus() {
114            return status;
115        }
116    
117        public void putResource(Object key, Object value) {
118            if (key == null) {
119                throw new NullPointerException("You must supply a non-null key for putResource");
120            }
121            resources.put(key, value);
122        }
123    
124        public void registerInterposedSynchronization(Synchronization synchronization) {
125            interposedSynchronization = synchronization;
126        }
127    
128        public synchronized void setRollbackOnly() throws IllegalStateException {
129            switch (status) {
130                case Status.STATUS_ACTIVE:
131                case Status.STATUS_PREPARING:
132                    status = Status.STATUS_MARKED_ROLLBACK;
133                    break;
134                case Status.STATUS_MARKED_ROLLBACK:
135                case Status.STATUS_ROLLING_BACK:
136                    // nothing to do
137                    break;
138                default:
139                    throw new IllegalStateException("Cannot set rollback only, status is " + getStateString(status));
140            }
141        }
142    
143        public synchronized void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
144            if (synch == null) {
145                throw new IllegalArgumentException("Synchronization is null");
146            }
147            switch (status) {
148                case Status.STATUS_ACTIVE:
149                case Status.STATUS_PREPARING:
150                    break;
151                case Status.STATUS_MARKED_ROLLBACK:
152                    throw new RollbackException("Transaction is marked for rollback");
153                default:
154                    throw new IllegalStateException("Status is " + getStateString(status));
155            }
156            syncList.add(synch);
157        }
158    
159        public synchronized boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
160            if (xaRes == null) {
161                throw new IllegalArgumentException("XAResource is null");
162            }
163            switch (status) {
164                case Status.STATUS_ACTIVE:
165                    break;
166                case Status.STATUS_MARKED_ROLLBACK:
167                    throw new RollbackException("Transaction is marked for rollback");
168                default:
169                    throw new IllegalStateException("Status is " + getStateString(status));
170            }
171    
172            if (activeXaResources.containsKey(xaRes)) {
173                throw new IllegalStateException("xaresource: " + xaRes + " is already enlisted!");
174            }
175    
176            try {
177                TransactionBranch manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
178                if (manager != null) {
179                    //we know about this one, it was suspended
180                    xaRes.start(manager.getBranchId(), XAResource.TMRESUME);
181                    activeXaResources.put(xaRes, manager);
182                    return true;
183                }
184                //it is not suspended.
185                for (Iterator i = resourceManagers.iterator(); i.hasNext();) {
186                    manager = (TransactionBranch) i.next();
187                    boolean sameRM;
188                    //if the xares is already known, we must be resuming after a suspend.
189                    if (xaRes == manager.getCommitter()) {
190                        throw new IllegalStateException("xaRes " + xaRes + " is a committer but is not active or suspended");
191                    }
192                    //Otherwise, see if this is a new xares for the same resource manager
193                    try {
194                        sameRM = xaRes.isSameRM(manager.getCommitter());
195                    } catch (XAException e) {
196                        log.warn("Unexpected error checking for same RM", e);
197                        continue;
198                    }
199                    if (sameRM) {
200                        xaRes.start(manager.getBranchId(), XAResource.TMJOIN);
201                        activeXaResources.put(xaRes, manager);
202                        return true;
203                    }
204                }
205                //we know nothing about this XAResource or resource manager
206                Xid branchId = xidFactory.createBranch(xid, resourceManagers.size() + 1);
207                xaRes.start(branchId, XAResource.TMNOFLAGS);
208                activeXaResources.put(xaRes, addBranchXid(xaRes, branchId));
209                return true;
210            } catch (XAException e) {
211                log.warn("Unable to enlist XAResource " + xaRes + ", errorCode: " + e.errorCode, e);
212                return false;
213            }
214        }
215    
216        public synchronized boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
217            if (!(flag == XAResource.TMFAIL || flag == XAResource.TMSUCCESS || flag == XAResource.TMSUSPEND)) {
218                throw new IllegalStateException("invalid flag for delistResource: " + flag);
219            }
220            if (xaRes == null) {
221                throw new IllegalArgumentException("XAResource is null");
222            }
223            switch (status) {
224                case Status.STATUS_ACTIVE:
225                case Status.STATUS_MARKED_ROLLBACK:
226                    break;
227                default:
228                    throw new IllegalStateException("Status is " + getStateString(status));
229            }
230            TransactionBranch manager = (TransactionBranch) activeXaResources.remove(xaRes);
231            if (manager == null) {
232                if (flag == XAResource.TMSUSPEND) {
233                    throw new IllegalStateException("trying to suspend an inactive xaresource: " + xaRes);
234                }
235                //not active, and we are not trying to suspend.  We must be ending tx.
236                manager = (TransactionBranch) suspendedXaResources.remove(xaRes);
237                if (manager == null) {
238                    throw new IllegalStateException("Resource not known to transaction: " + xaRes);
239                }
240            }
241    
242            try {
243                xaRes.end(manager.getBranchId(), flag);
244                if (flag == XAResource.TMSUSPEND) {
245                    suspendedXaResources.put(xaRes, manager);
246                }
247                return true;
248            } catch (XAException e) {
249                log.warn("Unable to delist XAResource " + xaRes + ", error code: " + e.errorCode, e);
250                return false;
251            }
252        }
253    
254        //Transaction method, does 2pc
255        public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
256            beforePrepare();
257    
258            try {
259                boolean timedout = false;
260                if (TransactionTimer.getCurrentTime() > timeout) {
261                    status = Status.STATUS_MARKED_ROLLBACK;
262                    timedout = true;
263                }
264    
265                if (status == Status.STATUS_MARKED_ROLLBACK) {
266                    rollbackResources(resourceManagers);
267                    if (timedout) {
268                        throw new RollbackException("Transaction timout");
269                    } else {
270                        throw new RollbackException("Unable to commit: transaction marked for rollback");
271                    }
272                }
273                synchronized (this) {
274                    if (status == Status.STATUS_ACTIVE) {
275                        if (this.resourceManagers.size() == 0) {
276                            // nothing to commit
277                            status = Status.STATUS_COMMITTED;
278                        } else if (this.resourceManagers.size() == 1) {
279                            // one-phase commit decision
280                            status = Status.STATUS_COMMITTING;
281                        } else {
282                            // start prepare part of two-phase
283                            status = Status.STATUS_PREPARING;
284                        }
285                    }
286                    // resourceManagers is now immutable
287                }
288    
289                // no-phase
290                if (resourceManagers.size() == 0) {
291                    synchronized (this) {
292                        status = Status.STATUS_COMMITTED;
293                    }
294                    return;
295                }
296    
297                // one-phase
298                if (resourceManagers.size() == 1) {
299                    TransactionBranch manager = (TransactionBranch) resourceManagers.getFirst();
300                    try {
301                        manager.getCommitter().commit(manager.getBranchId(), true);
302                        synchronized (this) {
303                            status = Status.STATUS_COMMITTED;
304                        }
305                        return;
306                    } catch (XAException e) {
307                        synchronized (this) {
308                            status = Status.STATUS_ROLLEDBACK;
309                        }
310                        throw (RollbackException) new RollbackException("Error during one-phase commit").initCause(e);
311                    }
312                }
313    
314                // two-phase
315                boolean willCommit = internalPrepare();
316    
317                // notify the RMs
318                if (willCommit) {
319                    commitResources(resourceManagers);
320                } else {
321                    rollbackResources(resourceManagers);
322                    throw new RollbackException("Unable to commit");
323                }
324            } finally {
325                afterCompletion();
326                synchronized (this) {
327                    status = Status.STATUS_NO_TRANSACTION;
328                }
329            }
330        }
331    
332        //Used from XATerminator for first phase in a remotely controlled tx.
333        int prepare() throws SystemException, RollbackException {
334            beforePrepare();
335            int result = XAResource.XA_RDONLY;
336            try {
337                LinkedList rms;
338                synchronized (this) {
339                    if (status == Status.STATUS_ACTIVE) {
340                        if (resourceManagers.size() == 0) {
341                            // nothing to commit
342                            status = Status.STATUS_COMMITTED;
343                            return result;
344                        } else {
345                            // start prepare part of two-phase
346                            status = Status.STATUS_PREPARING;
347                        }
348                    }
349                    // resourceManagers is now immutable
350                    rms = resourceManagers;
351                }
352    
353                boolean willCommit = internalPrepare();
354    
355                // notify the RMs
356                if (willCommit) {
357                    if (!rms.isEmpty()) {
358                        result = XAResource.XA_OK;
359                    }
360                } else {
361                    rollbackResources(rms);
362                    throw new RollbackException("Unable to commit");
363                }
364            } finally {
365                if (result == XAResource.XA_RDONLY) {
366                    afterCompletion();
367                    synchronized (this) {
368                        status = Status.STATUS_NO_TRANSACTION;
369                    }
370                }
371            }
372            return result;
373        }
374    
375        //used from XATerminator for commit phase of non-readonly remotely controlled tx.
376        void preparedCommit() throws SystemException {
377            try {
378                commitResources(resourceManagers);
379            } finally {
380                afterCompletion();
381                synchronized (this) {
382                    status = Status.STATUS_NO_TRANSACTION;
383                }
384            }
385        }
386    
387        //helper method used by Transaction.commit and XATerminator prepare.
388        private void beforePrepare() {
389            synchronized (this) {
390                switch (status) {
391                    case Status.STATUS_ACTIVE:
392                    case Status.STATUS_MARKED_ROLLBACK:
393                        break;
394                    default:
395                        throw new IllegalStateException("Status is " + getStateString(status));
396                }
397            }
398    
399            beforeCompletion();
400            endResources();
401        }
402    
403    
404        //helper method used by Transaction.commit and XATerminator prepare.
405        private boolean internalPrepare() throws SystemException {
406    
407            for (Iterator rms = resourceManagers.iterator(); rms.hasNext();) {
408                synchronized (this) {
409                    if (status != Status.STATUS_PREPARING) {
410                        // we were marked for rollback
411                        break;
412                    }
413                }
414                TransactionBranch manager = (TransactionBranch) rms.next();
415                try {
416                    int vote = manager.getCommitter().prepare(manager.getBranchId());
417                    if (vote == XAResource.XA_RDONLY) {
418                        // we don't need to consider this RM any more
419                        rms.remove();
420                    }
421                } catch (XAException e) {
422                    synchronized (this) {
423                        status = Status.STATUS_MARKED_ROLLBACK;
424                        //TODO document why this is true from the spec.
425                        //XAException during prepare means we can assume resource is rolled back.
426                        rms.remove();
427                        break;
428                    }
429                }
430            }
431    
432            // decision time...
433            boolean willCommit;
434            synchronized (this) {
435                willCommit = (status != Status.STATUS_MARKED_ROLLBACK);
436                if (willCommit) {
437                    status = Status.STATUS_PREPARED;
438                }
439            }
440            // log our decision
441            if (willCommit && !resourceManagers.isEmpty()) {
442                try {
443                    logMark = txnLog.prepare(xid, resourceManagers);
444                } catch (LogException e) {
445                    try {
446                        rollbackResources(resourceManagers);
447                    } catch (Exception se) {
448                        log.error("Unable to rollback after failure to log prepare", se.getCause());
449                    }
450                    throw (SystemException) new SystemException("Error logging prepare; transaction was rolled back)").initCause(e);
451                }
452            }
453            return willCommit;
454        }
455    
456        public void rollback() throws IllegalStateException, SystemException {
457            List rms;
458            synchronized (this) {
459                switch (status) {
460                    case Status.STATUS_ACTIVE:
461                        status = Status.STATUS_MARKED_ROLLBACK;
462                        break;
463                    case Status.STATUS_MARKED_ROLLBACK:
464                        break;
465                    default:
466                        throw new IllegalStateException("Status is " + getStateString(status));
467                }
468                rms = resourceManagers;
469            }
470    
471            beforeCompletion();
472            endResources();
473            try {
474                rollbackResources(rms);
475                //only write rollback record if we have already written prepare record.
476                if (logMark != null) {
477                    try {
478                        txnLog.rollback(xid, logMark);
479                    } catch (LogException e) {
480                        try {
481                            rollbackResources(rms);
482                        } catch (Exception se) {
483                            log.error("Unable to rollback after failure to log decision", se.getCause());
484                        }
485                        throw (SystemException) new SystemException("Error logging rollback").initCause(e);
486                    }
487                }
488            } finally {
489                afterCompletion();
490                synchronized (this) {
491                    status = Status.STATUS_NO_TRANSACTION;
492                }
493            }
494        }
495    
496        private void beforeCompletion() {
497            int i = 0;
498            while (true) {
499                Synchronization synch;
500                synchronized (this) {
501                    if (i == syncList.size()) {
502                        if (interposedSynchronization != null) {
503                            synch = interposedSynchronization;
504                            i++;
505                        } else {
506                            return;
507                        }
508                    } else if (i == syncList.size() + 1) {
509                        return;
510                    } else {
511                        synch = (Synchronization) syncList.get(i++);
512                    }
513                }
514                try {
515                    synch.beforeCompletion();
516                } catch (Exception e) {
517                    log.warn("Unexpected exception from beforeCompletion; transaction will roll back", e);
518                    synchronized (this) {
519                        status = Status.STATUS_MARKED_ROLLBACK;
520                    }
521                }
522            }
523        }
524    
525        private void afterCompletion() {
526            // this does not synchronize because nothing can modify our state at this time
527            if (interposedSynchronization != null) {
528                try {
529                    interposedSynchronization.afterCompletion(status);
530                } catch (Exception e) {
531                    log.warn("Unexpected exception from afterCompletion; continuing", e);
532                }
533            }
534            for (Iterator i = syncList.iterator(); i.hasNext();) {
535                Synchronization synch = (Synchronization) i.next();
536                try {
537                    synch.afterCompletion(status);
538                } catch (Exception e) {
539                    log.warn("Unexpected exception from afterCompletion; continuing", e);
540                    continue;
541                }
542            }
543            for (Iterator i = entityManagers.values().iterator(); i.hasNext();) {
544                Closeable entityManager = (Closeable) i.next();
545                entityManager.close();
546            }
547        }
548    
549        private void endResources() {
550            endResources(activeXaResources);
551            endResources(suspendedXaResources);
552        }
553    
554        private void endResources(IdentityHashMap resourceMap) {
555            while (true) {
556                XAResource xaRes;
557                TransactionBranch manager;
558                int flags;
559                synchronized (this) {
560                    Set entrySet = resourceMap.entrySet();
561                    if (entrySet.isEmpty()) {
562                        return;
563                    }
564                    Map.Entry entry = (Map.Entry) entrySet.iterator().next();
565                    xaRes = (XAResource) entry.getKey();
566                    manager = (TransactionBranch) entry.getValue();
567                    flags = (status == Status.STATUS_MARKED_ROLLBACK) ? XAResource.TMFAIL : XAResource.TMSUCCESS;
568                    resourceMap.remove(xaRes);
569                }
570                try {
571                    xaRes.end(manager.getBranchId(), flags);
572                } catch (XAException e) {
573                    log.warn("Error ending association for XAResource " + xaRes + "; transaction will roll back. XA error code: " + e.errorCode, e);
574                    synchronized (this) {
575                        status = Status.STATUS_MARKED_ROLLBACK;
576                    }
577                }
578            }
579        }
580    
581        private void rollbackResources(List rms) throws SystemException {
582            SystemException cause = null;
583            synchronized (this) {
584                status = Status.STATUS_ROLLING_BACK;
585            }
586            for (Iterator i = rms.iterator(); i.hasNext();) {
587                TransactionBranch manager = (TransactionBranch) i.next();
588                try {
589                    manager.getCommitter().rollback(manager.getBranchId());
590                } catch (XAException e) {
591                    log.error("Unexpected exception rolling back " + manager.getCommitter() + "; continuing with rollback", e);
592                    if (cause == null) {
593                        cause = new SystemException(e.errorCode);
594                    }
595                    continue;
596                }
597            }
598            synchronized (this) {
599                status = Status.STATUS_ROLLEDBACK;
600            }
601            if (cause != null) {
602                throw cause;
603            }
604        }
605    
606        private void commitResources(List rms) throws SystemException {
607            SystemException cause = null;
608            synchronized (this) {
609                status = Status.STATUS_COMMITTING;
610            }
611            for (Iterator i = rms.iterator(); i.hasNext();) {
612                TransactionBranch manager = (TransactionBranch) i.next();
613                try {
614                    manager.getCommitter().commit(manager.getBranchId(), false);
615                } catch (XAException e) {
616                    log.error("Unexpected exception committing" + manager.getCommitter() + "; continuing to commit other RMs", e);
617                    if (cause == null) {
618                        cause = new SystemException(e.errorCode);
619                    }
620                    continue;
621                }
622            }
623            //if all resources were read only, we didn't write a prepare record.
624            if (!rms.isEmpty()) {
625                try {
626                    txnLog.commit(xid, logMark);
627                } catch (LogException e) {
628                    log.error("Unexpected exception logging commit completion for xid " + xid, e);
629                    throw (SystemException) new SystemException("Unexpected error logging commit completion for xid " + xid).initCause(e);
630                }
631            }
632            synchronized (this) {
633                status = Status.STATUS_COMMITTED;
634            }
635            if (cause != null) {
636                throw cause;
637            }
638        }
639    
640        private static String getStateString(int status) {
641            switch (status) {
642                case Status.STATUS_ACTIVE:
643                    return "STATUS_ACTIVE";
644                case Status.STATUS_PREPARING:
645                    return "STATUS_PREPARING";
646                case Status.STATUS_PREPARED:
647                    return "STATUS_PREPARED";
648                case Status.STATUS_MARKED_ROLLBACK:
649                    return "STATUS_MARKED_ROLLBACK";
650                case Status.STATUS_ROLLING_BACK:
651                    return "STATUS_ROLLING_BACK";
652                case Status.STATUS_COMMITTING:
653                    return "STATUS_COMMITTING";
654                case Status.STATUS_COMMITTED:
655                    return "STATUS_COMMITTED";
656                case Status.STATUS_ROLLEDBACK:
657                    return "STATUS_ROLLEDBACK";
658                case Status.STATUS_NO_TRANSACTION:
659                    return "STATUS_NO_TRANSACTION";
660                case Status.STATUS_UNKNOWN:
661                    return "STATUS_UNKNOWN";
662                default:
663                    throw new AssertionError();
664            }
665        }
666    
667        public boolean equals(Object obj) {
668            if (obj instanceof TransactionImpl) {
669                TransactionImpl other = (TransactionImpl) obj;
670                return xid.equals(other.xid);
671            } else {
672                return false;
673            }
674        }
675    
676        //when used from recovery, do not add manager to active or suspended resource maps.
677        // The xaresources have already been ended with TMSUCCESS.
678        public TransactionBranch addBranchXid(XAResource xaRes, Xid branchId) {
679            TransactionBranch manager = new TransactionBranch(xaRes, branchId);
680            resourceManagers.add(manager);
681            return manager;
682        }
683    
684        public Object getEntityManager(String persistenceUnit) {
685            return entityManagers.get(persistenceUnit);
686        }
687    
688        public void setEntityManager(String persistenceUnit, Object entityManager) {
689            Object oldEntityManager = entityManagers.put(persistenceUnit, entityManager);
690            if (oldEntityManager != null) {
691                throw new EJBException("EntityManager " + oldEntityManager + " for persistenceUnit " + persistenceUnit + " already associated with this transaction " + xid);
692            }
693        }
694    
695        private static class TransactionBranch implements TransactionBranchInfo {
696            private final XAResource committer;
697            private final Xid branchId;
698    
699            public TransactionBranch(XAResource xaRes, Xid branchId) {
700                committer = xaRes;
701                this.branchId = branchId;
702            }
703    
704            public XAResource getCommitter() {
705                return committer;
706            }
707    
708            public Xid getBranchId() {
709                return branchId;
710            }
711    
712            public String getResourceName() {
713                if (committer instanceof NamedXAResource) {
714                    return ((NamedXAResource) committer).getName();
715                } else {
716                    throw new IllegalStateException("Cannot log transactions unles XAResources are named! " + committer);
717                }
718            }
719    
720            public Xid getBranchXid() {
721                return branchId;
722            }
723        }
724    
725    
726    }