001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.geronimo.transaction.manager;
019
020 import java.util.ArrayList;
021 import java.util.Collection;
022 import java.util.HashMap;
023 import java.util.Iterator;
024 import java.util.List;
025 import java.util.Map;
026
027 import javax.transaction.HeuristicMixedException;
028 import javax.transaction.HeuristicRollbackException;
029 import javax.transaction.InvalidTransactionException;
030 import javax.transaction.NotSupportedException;
031 import javax.transaction.RollbackException;
032 import javax.transaction.Status;
033 import javax.transaction.Synchronization;
034 import javax.transaction.SystemException;
035 import javax.transaction.Transaction;
036 import javax.transaction.TransactionManager;
037 import javax.transaction.UserTransaction;
038 import javax.transaction.xa.XAException;
039 import javax.transaction.xa.Xid;
040
041 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
042 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
043 import org.apache.commons.logging.Log;
044 import org.apache.commons.logging.LogFactory;
045 import org.apache.geronimo.transaction.log.UnrecoverableLog;
046
047 /**
048 * Simple implementation of a transaction manager.
049 *
050 * @version $Rev: 487175 $ $Date: 2006-12-14 03:10:31 -0800 (Thu, 14 Dec 2006) $
051 */
052 public class TransactionManagerImpl implements TransactionManager, UserTransaction, XidImporter, MonitorableTransactionManager {
053 private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
054 protected static final int DEFAULT_TIMEOUT = 600;
055 protected static final byte[] DEFAULT_TM_ID = new byte[] {71,84,77,73,68};
056
057 final TransactionLog transactionLog;
058 final XidFactory xidFactory;
059 private final int defaultTransactionTimeoutMilliseconds;
060 private final ThreadLocal transactionTimeoutMilliseconds = new ThreadLocal();
061 private final ThreadLocal threadTx = new ThreadLocal();
062 private final ConcurrentHashMap associatedTransactions = new ConcurrentHashMap();
063 private static final Log recoveryLog = LogFactory.getLog("RecoveryController");
064 final Recovery recovery;
065 final Collection resourceManagers;
066 private final CopyOnWriteArrayList transactionAssociationListeners = new CopyOnWriteArrayList();
067 private List recoveryErrors = new ArrayList();
068
069 public TransactionManagerImpl() throws XAException {
070 this(DEFAULT_TIMEOUT,
071 null,
072 null,
073 null);
074 }
075
076 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds) throws XAException {
077 this(defaultTransactionTimeoutSeconds,
078 null,
079 null,
080 null);
081 }
082
083 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, TransactionLog transactionLog) throws XAException {
084 this(defaultTransactionTimeoutSeconds,
085 null,
086 transactionLog,
087 null);
088 }
089
090 public TransactionManagerImpl(int defaultTransactionTimeoutSeconds, XidFactory xidFactory, TransactionLog transactionLog, Collection resourceManagers) throws XAException {
091 if (defaultTransactionTimeoutSeconds <= 0) {
092 throw new IllegalArgumentException("defaultTransactionTimeoutSeconds must be positive: attempted value: " + defaultTransactionTimeoutSeconds);
093 }
094
095 this.defaultTransactionTimeoutMilliseconds = defaultTransactionTimeoutSeconds * 1000;
096
097 if (transactionLog == null) {
098 this.transactionLog = new UnrecoverableLog();
099 } else {
100 this.transactionLog = transactionLog;
101 }
102
103 if (xidFactory != null) {
104 this.xidFactory = xidFactory;
105 } else {
106 this.xidFactory = new XidFactoryImpl(DEFAULT_TM_ID);
107 }
108
109 this.resourceManagers = resourceManagers;
110 recovery = new RecoveryImpl(this.transactionLog, this.xidFactory);
111
112 if (resourceManagers != null) {
113 recovery.recoverLog();
114 List copy = watchResourceManagers(resourceManagers);
115 for (Iterator iterator = copy.iterator(); iterator.hasNext();) {
116 ResourceManager resourceManager = (ResourceManager) iterator.next();
117 recoverResourceManager(resourceManager);
118 }
119 }
120 }
121
122 protected List watchResourceManagers(Collection resourceManagers) {
123 return new ArrayList(resourceManagers);
124 }
125
126 public Transaction getTransaction() {
127 return (Transaction) threadTx.get();
128 }
129
130 private void associate(TransactionImpl tx) throws InvalidTransactionException {
131 if (tx == null) throw new NullPointerException("tx is null");
132
133 Object existingAssociation = associatedTransactions.putIfAbsent(tx, Thread.currentThread());
134 if (existingAssociation != null) {
135 throw new InvalidTransactionException("Specified transaction is already associated with another thread");
136 }
137 threadTx.set(tx);
138 fireThreadAssociated(tx);
139 }
140
141 private void unassociate() {
142 Transaction tx = getTransaction();
143 if (tx != null) {
144 associatedTransactions.remove(tx);
145 threadTx.set(null);
146 fireThreadUnassociated(tx);
147 }
148 }
149
150 public void setTransactionTimeout(int seconds) throws SystemException {
151 if (seconds < 0) {
152 throw new SystemException("transaction timeout must be positive or 0 to reset to default");
153 }
154 if (seconds == 0) {
155 transactionTimeoutMilliseconds.set(null);
156 } else {
157 transactionTimeoutMilliseconds.set(new Long(seconds * 1000));
158 }
159 }
160
161 public int getStatus() throws SystemException {
162 Transaction tx = getTransaction();
163 return (tx != null) ? tx.getStatus() : Status.STATUS_NO_TRANSACTION;
164 }
165
166 public void begin() throws NotSupportedException, SystemException {
167 begin(getTransactionTimeoutMilliseconds(0L));
168 }
169
170 public Transaction begin(long transactionTimeoutMilliseconds) throws NotSupportedException, SystemException {
171 if (getStatus() != Status.STATUS_NO_TRANSACTION) {
172 throw new NotSupportedException("Nested Transactions are not supported");
173 }
174 TransactionImpl tx = new TransactionImpl(xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
175 // timeoutTimer.schedule(tx, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
176 try {
177 associate(tx);
178 } catch (InvalidTransactionException e) {
179 // should not be possible since we just created that transaction and no one has a reference yet
180 throw new SystemException("Internal error: associate threw an InvalidTransactionException for a newly created transaction");
181 }
182 // Todo: Verify if this is correct thing to do. Use default timeout for next transaction.
183 this.transactionTimeoutMilliseconds.set(null);
184 return tx;
185 }
186
187 public Transaction suspend() throws SystemException {
188 Transaction tx = getTransaction();
189 if (tx != null) {
190 unassociate();
191 }
192 return tx;
193 }
194
195 public void resume(Transaction tx) throws IllegalStateException, InvalidTransactionException, SystemException {
196 if (getTransaction() != null) {
197 throw new IllegalStateException("Thread already associated with another transaction");
198 }
199 if (!(tx instanceof TransactionImpl)) {
200 throw new InvalidTransactionException("Cannot resume foreign transaction: " + tx);
201 }
202 associate((TransactionImpl) tx);
203 }
204
205 public Object getResource(Object key) {
206 TransactionImpl tx = getActiveTransactionImpl();
207 return tx.getResource(key);
208 }
209
210 private TransactionImpl getActiveTransactionImpl() {
211 TransactionImpl tx = (TransactionImpl)threadTx.get();
212 if (tx == null) {
213 throw new IllegalStateException("No tx on thread");
214 }
215 if (tx.getStatus() != Status.STATUS_ACTIVE) {
216 throw new IllegalStateException("Transaction " + tx + " is not active");
217 }
218 return tx;
219 }
220
221 public boolean getRollbackOnly() {
222 TransactionImpl tx = getActiveTransactionImpl();
223 return tx.getRollbackOnly();
224 }
225
226 public Object getTransactionKey() {
227 TransactionImpl tx = getActiveTransactionImpl();
228 return tx.getTransactionKey();
229 }
230
231 public int getTransactionStatus() {
232 TransactionImpl tx = getActiveTransactionImpl();
233 return tx.getTransactionStatus();
234 }
235
236 public void putResource(Object key, Object value) {
237 TransactionImpl tx = getActiveTransactionImpl();
238 tx.putResource(key, value);
239 }
240
241 /**
242 * jta 1.1 method so the jpa implementations can be told to flush their caches.
243 * @param synchronization
244 */
245 public void registerInterposedSynchronization(Synchronization synchronization) {
246 TransactionImpl tx = getActiveTransactionImpl();
247 tx.registerInterposedSynchronization(synchronization);
248 }
249
250 public void setRollbackOnly() throws IllegalStateException {
251 TransactionImpl tx = (TransactionImpl) threadTx.get();
252 if (tx == null) {
253 throw new IllegalStateException("No transaction associated with current thread");
254 }
255 tx.setRollbackOnly();
256 }
257
258 public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
259 Transaction tx = getTransaction();
260 if (tx == null) {
261 throw new IllegalStateException("No transaction associated with current thread");
262 }
263 try {
264 tx.commit();
265 } finally {
266 unassociate();
267 }
268 }
269
270 public void rollback() throws IllegalStateException, SecurityException, SystemException {
271 Transaction tx = getTransaction();
272 if (tx == null) {
273 throw new IllegalStateException("No transaction associated with current thread");
274 }
275 try {
276 tx.rollback();
277 } finally {
278 unassociate();
279 }
280 }
281
282 //XidImporter implementation
283 public Transaction importXid(Xid xid, long transactionTimeoutMilliseconds) throws XAException, SystemException {
284 if (transactionTimeoutMilliseconds < 0) {
285 throw new SystemException("transaction timeout must be positive or 0 to reset to default");
286 }
287 TransactionImpl tx = new TransactionImpl(xid, xidFactory, transactionLog, getTransactionTimeoutMilliseconds(transactionTimeoutMilliseconds));
288 return tx;
289 }
290
291 public void commit(Transaction tx, boolean onePhase) throws XAException {
292 if (onePhase) {
293 try {
294 tx.commit();
295 } catch (HeuristicMixedException e) {
296 throw (XAException) new XAException().initCause(e);
297 } catch (HeuristicRollbackException e) {
298 throw (XAException) new XAException().initCause(e);
299 } catch (RollbackException e) {
300 throw (XAException) new XAException().initCause(e);
301 } catch (SecurityException e) {
302 throw (XAException) new XAException().initCause(e);
303 } catch (SystemException e) {
304 throw (XAException) new XAException().initCause(e);
305 }
306 } else {
307 try {
308 ((TransactionImpl) tx).preparedCommit();
309 } catch (SystemException e) {
310 throw (XAException) new XAException().initCause(e);
311 }
312 }
313 }
314
315 public void forget(Transaction tx) throws XAException {
316 //TODO implement this!
317 }
318
319 public int prepare(Transaction tx) throws XAException {
320 try {
321 return ((TransactionImpl) tx).prepare();
322 } catch (SystemException e) {
323 throw (XAException) new XAException().initCause(e);
324 } catch (RollbackException e) {
325 throw (XAException) new XAException().initCause(e);
326 }
327 }
328
329 public void rollback(Transaction tx) throws XAException {
330 try {
331 tx.rollback();
332 } catch (IllegalStateException e) {
333 throw (XAException) new XAException().initCause(e);
334 } catch (SystemException e) {
335 throw (XAException) new XAException().initCause(e);
336 }
337 }
338
339 long getTransactionTimeoutMilliseconds(long transactionTimeoutMilliseconds) {
340 if (transactionTimeoutMilliseconds != 0) {
341 return transactionTimeoutMilliseconds;
342 }
343 Long timeout = (Long) this.transactionTimeoutMilliseconds.get();
344 if (timeout != null) {
345 return timeout.longValue();
346 }
347 return defaultTransactionTimeoutMilliseconds;
348 }
349
350 protected void recoverResourceManager(ResourceManager resourceManager) {
351 NamedXAResource namedXAResource;
352 try {
353 namedXAResource = resourceManager.getRecoveryXAResources();
354 } catch (SystemException e) {
355 recoveryLog.error(e);
356 recoveryErrors.add(e);
357 return;
358 }
359 if (namedXAResource != null) {
360 try {
361 recovery.recoverResourceManager(namedXAResource);
362 } catch (XAException e) {
363 recoveryLog.error(e);
364 recoveryErrors.add(e);
365 } finally {
366 resourceManager.returnResource(namedXAResource);
367 }
368 }
369 }
370
371
372 public Map getExternalXids() {
373 return new HashMap(recovery.getExternalXids());
374 }
375
376 public void addTransactionAssociationListener(TransactionManagerMonitor listener) {
377 transactionAssociationListeners.addIfAbsent(listener);
378 }
379
380 public void removeTransactionAssociationListener(TransactionManagerMonitor listener) {
381 transactionAssociationListeners.remove(listener);
382 }
383
384 protected void fireThreadAssociated(Transaction tx) {
385 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
386 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
387 try {
388 listener.threadAssociated(tx);
389 } catch (Exception e) {
390 log.warn("Error calling transaction association listener", e);
391 }
392 }
393 }
394
395 protected void fireThreadUnassociated(Transaction tx) {
396 for (Iterator iterator = transactionAssociationListeners.iterator(); iterator.hasNext();) {
397 TransactionManagerMonitor listener = (TransactionManagerMonitor) iterator.next();
398 try {
399 listener.threadUnassociated(tx);
400 } catch (Exception e) {
401 log.warn("Error calling transaction association listener", e);
402 }
403 }
404 }
405 }