View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.codehaus.activemq.broker.Broker;
24  import org.codehaus.activemq.broker.BrokerClient;
25  import org.codehaus.activemq.message.ActiveMQXid;
26  import org.codehaus.activemq.service.Transaction;
27  import org.codehaus.activemq.service.TransactionManager;
28  import org.codehaus.activemq.store.PreparedTransactionStore;
29  import org.codehaus.activemq.util.JMSExceptionHelper;
30  
31  import javax.jms.JMSException;
32  import javax.transaction.xa.XAException;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Map;
36  
37  /***
38   * @version $Revision: 1.5 $
39   */
40  public class TransactionManagerImpl implements TransactionManager {
41      private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
42  
43      // the broker on which transactions operate
44      private Broker broker;
45      // The prepared XA transactions.
46      private PreparedTransactionStore preparedTransactions;
47      // Maps clients to the txids that they created.
48      private Map activeClients = new ConcurrentHashMap();
49      // Maps txids to ActiveMQTransactions
50      private Map localTxs = new ConcurrentHashMap();
51      // Maps txids to ActiveMQTransactions
52      private Map xaTxs = new ConcurrentHashMap();
53      // Keeps track of the current transaction
54      private final ThreadLocal contextTx = new ThreadLocal();
55  
56      public TransactionManagerImpl(Broker broker, PreparedTransactionStore preparedTransactions) {
57          this.preparedTransactions = preparedTransactions;
58          this.broker = broker;
59      }
60  
61      /***
62       * @see org.codehaus.activemq.service.TransactionManager#createLocalTransaction(org.codehaus.activemq.broker.BrokerClient, String)
63       */
64      public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
65          AbstractTransaction t = new LocalTransactionCommand(broker, localTxs, txid);
66          localTxs.put(txid, t);
67          return t;
68      }
69  
70      /***
71       * @see org.codehaus.activemq.service.TransactionManager#createXATransaction(org.codehaus.activemq.broker.BrokerClient, org.codehaus.activemq.message.ActiveMQXid)
72       */
73      public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
74          AbstractTransaction t = new XATransactionCommand(broker, xid, xaTxs, preparedTransactions);
75          xaTxs.put(xid, t);
76          return t;
77      }
78  
79      /***
80       * @see org.codehaus.activemq.service.TransactionManager#getLocalTransaction(String)
81       */
82      public Transaction getLocalTransaction(String txid) throws JMSException {
83          Transaction tx = (Transaction) localTxs.get(txid);
84          if (tx == null) {
85              throw new JMSException("Transaction '" + txid
86                      + "' has not been started.");
87          }
88          return tx;
89      }
90  
91      /***
92       * @see org.codehaus.activemq.service.TransactionManager#getXATransaction(org.codehaus.activemq.message.ActiveMQXid)
93       */
94      public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
95          Transaction tx = (Transaction) xaTxs.get(xid);
96          if (tx == null) {
97              XAException e = new XAException("Transaction '" + xid + "' has not been started.");
98              e.errorCode = XAException.XAER_NOTA;
99              throw e;
100         }
101         return tx;
102     }
103 
104     /***
105      * @see org.codehaus.activemq.service.TransactionManager#getPreparedXATransactions()
106      */
107     public ActiveMQXid[] getPreparedXATransactions() throws XAException {
108         return preparedTransactions.getXids();
109     }
110 
111     /***
112      * @see org.codehaus.activemq.service.TransactionManager#setContexTransaction(org.codehaus.activemq.service.Transaction)
113      */
114     public void setContexTransaction(Transaction tx) {
115         contextTx.set(tx);
116     }
117 
118     /***
119      * @see org.codehaus.activemq.service.TransactionManager#getContexTransaction()
120      */
121     public Transaction getContexTransaction() {
122         return (Transaction) contextTx.get();
123     }
124 
125     /***
126      * @see org.codehaus.activemq.service.TransactionManager#cleanUpClient(org.codehaus.activemq.broker.BrokerClient)
127      */
128     public void cleanUpClient(BrokerClient client) throws JMSException {
129         // HRC: I don't think we need to keep track of the client's open transactions here...
130         // It seems like BrokerClientImpl.close() allready cleans up open transactions.
131         //
132         List list = (List) activeClients.remove(client);
133         if (list != null) {
134             for (int i = 0; i < list.size(); i++) {
135                 try {
136                     Object o = list.get(i);
137                     if (o instanceof String) {
138                         Transaction t = this.getLocalTransaction((String) o);
139                         t.rollback();
140                     }
141                     else {
142                         Transaction t = this.getXATransaction((ActiveMQXid) o);
143                         t.rollback();
144                     }
145                 }
146                 catch (Exception e) {
147                     log.warn("ERROR Rolling back disconnected client's transactions: ", e);
148                 }
149             }
150             list.clear();
151         }
152     }
153 
154     public void loadTransaction(ActiveMQXid xid, Transaction transaction) throws XAException {
155         // first lets associate any transient data structurs with the
156         // transaction which has recently been loaded from disk
157         if (transaction instanceof XATransactionCommand) {
158             XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
159             xaTransaction.initialise(xaTxs, preparedTransactions);
160         }
161         transaction.setBroker(broker);
162 
163         xaTxs.put(xid, transaction);
164     }
165 
166     public void start() throws JMSException {
167         preparedTransactions.start();
168         try {
169             preparedTransactions.loadPreparedTransactions(this);
170         }
171         catch (XAException e) {
172             throw JMSExceptionHelper.newJMSException("Failed to recover: " + e, e);
173         }
174     }
175 
176     public void stop() throws JMSException {
177         preparedTransactions.stop();
178     }
179 
180 
181 
182     // Implementation methods
183     //-------------------------------------------------------------------------
184 
185     private void addActiveTransaction(BrokerClient client, Object transactionId) {
186         List list = (List) activeClients.get(client);
187         if (list == null) {
188             list = new ArrayList();
189             activeClients.put(client, list);
190         }
191         list.add(transactionId);
192     }
193 
194     private void removeActiveTransaction(BrokerClient client, Object transactionId) {
195         List list = (List) activeClients.get(client);
196         if (list != null) {
197             list.remove(transactionId);
198         }
199     }
200 
201 
202 }