001    /**
002     *  Licensed to the Apache Software Foundation (ASF) under one or more
003     *  contributor license agreements.  See the NOTICE file distributed with
004     *  this work for additional information regarding copyright ownership.
005     *  The ASF licenses this file to You under the Apache License, Version 2.0
006     *  (the "License"); you may not use this file except in compliance with
007     *  the License.  You may obtain a copy of the License at
008     *
009     *     http://www.apache.org/licenses/LICENSE-2.0
010     *
011     *  Unless required by applicable law or agreed to in writing, software
012     *  distributed under the License is distributed on an "AS IS" BASIS,
013     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     *  See the License for the specific language governing permissions and
015     *  limitations under the License.
016     */
017    
018    package org.apache.geronimo.transaction.manager;
019    
020    import java.util.ArrayList;
021    import java.util.Collection;
022    import java.util.HashMap;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    
027    import javax.transaction.HeuristicMixedException;
028    import javax.transaction.HeuristicRollbackException;
029    import javax.transaction.InvalidTransactionException;
030    import javax.transaction.NotSupportedException;
031    import javax.transaction.RollbackException;
032    import javax.transaction.Status;
033    import javax.transaction.Synchronization;
034    import javax.transaction.SystemException;
035    import javax.transaction.Transaction;
036    import javax.transaction.TransactionManager;
037    import javax.transaction.UserTransaction;
038    import javax.transaction.xa.XAException;
039    import javax.transaction.xa.Xid;
040    
041    import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
042    import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    import org.apache.geronimo.transaction.log.UnrecoverableLog;
046    
047    /**
048     * Simple implementation of a transaction manager.
049     *
050     * @version $Rev: 487175 $ $Date: 2006-12-14 03:10:31 -0800 (Thu, 14 Dec 2006) $
051     */
052    public class TransactionManagerImpl implements TransactionManager, UserTransaction, XidImporter, MonitorableTransactionManager {
053        private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
054        protected static final int DEFAULT_TIMEOUT = 600;
055        protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
056    
057        final TransactionLog transactionLog;
058        final XidFactory xidFactory;
059        private final int defaultTransactionTimeoutMilliseconds;
060        private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
061        private final ThreadLocal threadTx = new ThreadLocal();
062        private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
063        private static final Log recoveryLog = LogFactory.getLog("RecoveryController");
064        final Recovery recovery;
065        final Collection resourceManagers;
066        private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
067        private List recoveryErrors = new ArrayList();
068    
069        public TransactionManagerImpl() throws XAException {
070            this(DEFAULT_TIMEOUT,
071                    null,
072                    null,
073                    null);
074        }
075    
076        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
077            this(defaultTransactionTimeoutSeconds,
078                    null,
079                    null,
080                    null);
081        }
082    
083        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
084            this(defaultTransactionTimeoutSeconds,
085                    null,
086                    transactionLog,
087                    null);
088        }
089    
090        public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog, Collection resourceManagers) throws XAException {
091            if (defaultTransactionTimeoutSeconds <= 0) {
092                throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
093            }
094    
095            this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
096    
097            if (transactionLog == null) {
098                this.transactionLog = new UnrecoverableLog();
099            } else {
100                this.transactionLog = transactionLog;
101            }
102    
103            if (xidFactory != null) {
104                this.xidFactory = xidFactory;
105            } else {
106                this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
107            }
108    
109            this.resourceManagers = resourceManagers;
110            recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
111    
112            if (resourceManagers != null) {
113                recovery.recoverLog();
114                List copy = watchResourceManagers(resourceManagers);
115                for (Iterator iterator = copy.iterator(); iterator.hasNext();) {
116                    ResourceManager resourceManager = (ResourceManager) iterator.next();
117                    recoverResourceManager(resourceManager);
118                }
119            }
120        }
121    
122        protected List watchResourceManagers(Collection resourceManagers) {
123            return new ArrayList(resourceManagers);
124        }
125    
126        public Transaction getTransaction() {
127            return (Transaction) threadTx.get();
128        }
129    
130        private void associate(TransactionImpl tx) throws InvalidTransactionException {
131            if (tx == null) throw new NullPointerException("tx is null");
132    
133            Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
134            if (existingAssociation != null) {
135                throw new InvalidTransactionException("Specified transaction is already associated with another thread");
136            }
137            threadTx.set(tx);
138            fireThreadAssociated(tx);
139        }
140    
141        private void unassociate() {
142            Transaction tx = getTransaction();
143            if (tx != null) {
144                associatedTransactions.remove(tx);
145                threadTx.set(null);
146                fireThreadUnassociated(tx);
147            }
148        }
149    
150        public void setTransactionTimeout(int seconds) throws SystemException {
151            if (seconds < 0) {
152                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
153            }
154            if (seconds == 0) {
155                transactionTimeoutMilliseconds.set(null);
156            } else {
157                transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
158            }
159        }
160    
161        public int getStatus() throws SystemException {
162            Transaction tx = getTransaction();
163            return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
164        }
165    
166        public void begin() throws NotSupportedException, SystemException {
167            begin(getTransactionTimeoutMilliseconds(0L));
168        }
169    
170        public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
171            if (getStatus() != Status.STATUS_NO_TRANSACTION) {
172                throw new NotSupportedException("Nested Transactions are not supported");
173            }
174            TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
175    //        timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
176            try {
177                associate(tx);
178            } catch (InvalidTransactionException e) {
179                // should not be possible since we just created that transaction and no one has a reference yet
180                throw new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction");
181            }
182            // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
183            this.transactionTimeoutMilliseconds.set(null);
184            return tx;
185        }
186    
187        public Transaction suspend() throws SystemException {
188            Transaction tx = getTransaction();
189            if (tx != null) {
190                unassociate();
191            }
192            return tx;
193        }
194    
195        public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
196            if (getTransaction() != null) {
197                throw new IllegalStateException("Thread already associated with another transaction");
198            }
199            if (!(tx instanceof TransactionImpl)) {
200                throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
201            }
202            associate((TransactionImpl) tx);
203        }
204    
205        public Object getResource(Object key) {
206            TransactionImpl tx = getActiveTransactionImpl();
207            return tx.getResource(key);
208        }
209    
210        private TransactionImpl getActiveTransactionImpl() {
211            TransactionImpl tx = (TransactionImpl)threadTx.get();
212            if (tx == null) {
213                throw new IllegalStateException("No tx on thread");
214            }
215            if (tx.getStatus() != Status.STATUS_ACTIVE) {
216                throw new IllegalStateException("Transaction " + tx + " is not active");
217            }
218            return tx;
219        }
220    
221        public boolean getRollbackOnly() {
222            TransactionImpl tx = getActiveTransactionImpl();
223            return tx.getRollbackOnly();
224        }
225    
226        public Object getTransactionKey() {
227            TransactionImpl tx = getActiveTransactionImpl();
228            return tx.getTransactionKey();
229        }
230    
231        public int getTransactionStatus() {
232            TransactionImpl tx = getActiveTransactionImpl();
233            return tx.getTransactionStatus();
234        }
235    
236        public void putResource(Object key, Object value) {
237            TransactionImpl tx = getActiveTransactionImpl();
238            tx.putResource(key, value);
239        }
240    
241        /**
242         * jta 1.1 method so the jpa implementations can be told to flush their caches.
243         * @param synchronization
244         */
245        public void registerInterposedSynchronization(Synchronization synchronization) {
246            TransactionImpl tx = getActiveTransactionImpl();
247            tx.registerInterposedSynchronization(synchronization);
248        }
249    
250        public void setRollbackOnly() throws IllegalStateException {
251            TransactionImpl tx = (TransactionImpl) threadTx.get();
252            if (tx == null) {
253                throw new IllegalStateException("No transaction associated with current thread");
254            }
255            tx.setRollbackOnly();
256        }
257    
258        public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
259            Transaction tx = getTransaction();
260            if (tx == null) {
261                throw new IllegalStateException("No transaction associated with current thread");
262            }
263            try {
264                tx.commit();
265            } finally {
266                unassociate();
267            }
268        }
269    
270        public void rollback() throws IllegalStateException, SecurityException, SystemException {
271            Transaction tx = getTransaction();
272            if (tx == null) {
273                throw new IllegalStateException("No transaction associated with current thread");
274            }
275            try {
276                tx.rollback();
277            } finally {
278                unassociate();
279            }
280        }
281    
282        //XidImporter implementation
283        public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
284            if (transactionTimeoutMilliseconds < 0) {
285                throw new SystemException("transaction timeout must be positive or 0 to reset to default");
286            }
287            TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
288            return tx;
289        }
290    
291        public void commit(Transaction tx, boolean onePhase) throws XAException {
292            if (onePhase) {
293                try {
294                    tx.commit();
295                } catch (HeuristicMixedException e) {
296                    throw (XAException) new XAException().initCause(e);
297                } catch (HeuristicRollbackException e) {
298                    throw (XAException) new XAException().initCause(e);
299                } catch (RollbackException e) {
300                    throw (XAException) new XAException().initCause(e);
301                } catch (SecurityException e) {
302                    throw (XAException) new XAException().initCause(e);
303                } catch (SystemException e) {
304                    throw (XAException) new XAException().initCause(e);
305                }
306            } else {
307                try {
308                    ((TransactionImpl) tx).preparedCommit();
309                } catch (SystemException e) {
310                    throw (XAException) new XAException().initCause(e);
311                }
312            }
313        }
314    
315        public void forget(Transaction tx) throws XAException {
316            //TODO implement this!
317        }
318    
319        public int prepare(Transaction tx) throws XAException {
320            try {
321                return ((TransactionImpl) tx).prepare();
322            } catch (SystemException e) {
323                throw (XAException) new XAException().initCause(e);
324            } catch (RollbackException e) {
325                throw (XAException) new XAException().initCause(e);
326            }
327        }
328    
329        public void rollback(Transaction tx) throws XAException {
330            try {
331                tx.rollback();
332            } catch (IllegalStateException e) {
333                throw (XAException) new XAException().initCause(e);
334            } catch (SystemException e) {
335                throw (XAException) new XAException().initCause(e);
336            }
337        }
338    
339        long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
340            if (transactionTimeoutMilliseconds != 0) {
341                return transactionTimeoutMilliseconds;
342            }
343            Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
344            if (timeout != null) {
345                return timeout.longValue();
346            }
347            return defaultTransactionTimeoutMilliseconds;
348        }
349    
350        protected void recoverResourceManager(ResourceManager resourceManager) {
351            NamedXAResource namedXAResource;
352            try {
353                namedXAResource = resourceManager.getRecoveryXAResources();
354            } catch (SystemException e) {
355                recoveryLog.error(e);
356                recoveryErrors.add(e);
357                return;
358            }
359            if (namedXAResource != null) {
360                try {
361                    recovery.recoverResourceManager(namedXAResource);
362                } catch (XAException e) {
363                    recoveryLog.error(e);
364                    recoveryErrors.add(e);
365                } finally {
366                    resourceManager.returnResource(namedXAResource);
367                }
368            }
369        }
370    
371    
372        public Map getExternalXids() {
373            return new HashMap(recovery.getExternalXids());
374        }
375    
376        public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
377            transactionAssociationListeners.addIfAbsent(listener);
378        }
379    
380        public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
381            transactionAssociationListeners.remove(listener);
382        }
383    
384        protected void fireThreadAssociated(Transaction tx) {
385            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
386                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
387                try {
388                    listener.threadAssociated(tx);
389                } catch (Exception e) {
390                    log.warn("Error calling transaction association listener", e);
391                }
392            }
393        }
394    
395        protected void fireThreadUnassociated(Transaction tx) {
396            for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
397                TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
398                try {
399                    listener.threadUnassociated(tx);
400                } catch (Exception e) {
401                    log.warn("Error calling transaction association listener", e);
402                }
403            }
404        }
405    }