View Javadoc

1   /***
2    *
3    * Copyright 2004 Protique Ltd
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQXid;
23  import org.codehaus.activemq.message.IntResponseReceipt;
24  import org.codehaus.activemq.message.ResponseReceipt;
25  import org.codehaus.activemq.message.XATransactionInfo;
26  
27  import javax.jms.*;
28  import javax.transaction.xa.XAException;
29  import javax.transaction.xa.XAResource;
30  import javax.transaction.xa.Xid;
31  
32  /***
33   * The XASession interface extends the capability of Session by adding access
34   * to a JMS provider's support for the  Java Transaction API (JTA) (optional).
35   * This support takes the form of a javax.transaction.xa.XAResource object.
36   * The functionality of this object closely resembles that defined by the
37   * standard X/Open XA Resource interface.
38   * <p/>
39   * An application server controls the transactional assignment of an XASession
40   * by obtaining its XAResource. It uses the XAResource to assign the session
41   * to a transaction, prepare and commit work on the transaction, and so on.
42   * <p/>
43   * An XAResource provides some fairly sophisticated facilities for
44   * interleaving work on multiple transactions, recovering a list of
45   * transactions in progress, and so on. A JTA aware JMS provider must fully
46   * implement this functionality. This could be done by using the services of a
47   * database that supports XA, or a JMS provider may choose to implement this
48   * functionality from scratch.
49   * <p/>
50   * A client of the application server is given what it thinks is a regular
51   * JMS Session. Behind the scenes, the application server controls the
52   * transaction management of the underlying XASession.
53   * <p/>
54   * The XASession interface is optional. JMS providers are not required to
55   * support this interface. This interface is for use by JMS providers to
56   * support transactional environments. Client programs are strongly encouraged
57   * to use the transactional support  available in their environment, rather
58   * than use these XA  interfaces directly.
59   *
60   * @version $Revision: 1.11 $
61   * @see javax.jms.Session
62   * @see javax.jms.QueueSession
63   * @see javax.jms.TopicSession
64   * @see javax.jms.XASession
65   */
66  public class ActiveMQXASession extends ActiveMQSession implements XASession, XAQueueSession, XATopicSession, XAResource {
67      private static final Log log = LogFactory.getLog(ActiveMQXASession.class);
68      private Xid associatedXid;
69      private ActiveMQXid activeXid;
70  
71      public ActiveMQXASession(ActiveMQXAConnection theConnection) throws JMSException {
72          super(theConnection, Session.SESSION_TRANSACTED);
73      }
74  
75      public boolean getTransacted() throws JMSException {
76          return true;
77      }
78  
79      public void rollback() throws JMSException {
80          throw new TransactionInProgressException("Cannot rollback() inside an XASession");
81      }
82  
83      public void commit() throws JMSException {
84          throw new TransactionInProgressException("Cannot commit() inside an XASession");
85      }
86  
87      public Session getSession() throws JMSException {
88          return this;
89      }
90  
91      public XAResource getXAResource() {
92          return this;
93      }
94  
95      public QueueSession getQueueSession() throws JMSException {
96          return this;
97      }
98  
99      public TopicSession getTopicSession() throws JMSException {
100         return this;
101     }
102 
103     /***
104      * Associates a transaction with the resource.
105      */
106     public void start(Xid xid, int flags) throws XAException {
107         checkClosedXA();
108 
109         // Are we allready associated?
110         if (associatedXid != null) {
111             throw new XAException(XAException.XAER_PROTO);
112         }
113 
114         if ((flags & TMJOIN) == TMJOIN) {
115             // TODO: verify that the server has seen the xid
116         }
117         if ((flags & TMJOIN) == TMRESUME) {
118             // TODO: verify that the xid was suspended.
119         }
120 
121         // associate
122         setXid(xid);
123 
124         XATransactionInfo info = new XATransactionInfo();
125         info.setId(this.packetIdGenerator.generateId());
126         info.setXid(activeXid);
127         info.setType(XATransactionInfo.START);
128 
129         try {
130             // TODO: we may want to wait for reply..
131             // server could fail this request
132             this.connection.syncSendPacket(info);
133         } catch (JMSException e) {
134             throw toXAException(e);
135         }
136     }
137 
138     public void end(Xid xid, int flags) throws XAException {
139         checkClosedXA();
140 
141         if ((flags & TMSUSPEND) == TMSUSPEND) {
142             // You can only suspend the associated xid.
143             if (associatedXid == null || !ActiveMQXid.equals(associatedXid,xid)) {
144                 throw new XAException(XAException.XAER_PROTO);
145             }
146 
147             //TODO: we may want to put the xid in a suspended list.
148             setXid(null);
149         } else if ((flags & TMFAIL) == TMFAIL) {
150             //TODO: We need to rollback the transaction??
151             setXid(null);
152         } else if ((flags & TMSUCCESS) == TMSUCCESS) {
153             //set to null if this is the current xid.
154             //otherwise this could be an asynchronous success call
155             if (ActiveMQXid.equals(associatedXid,xid)) {
156                 setXid(null);
157             }
158         } else {
159             throw new XAException(XAException.XAER_INVAL);
160         }
161 
162     }
163 
164     public int prepare(Xid xid) throws XAException {
165 
166         // We allow interleaving multiple transactions, so
167         // we don't limit prepare to the associated xid.
168         ActiveMQXid x;
169         //THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been called first
170         if (ActiveMQXid.equals(associatedXid,xid)) {
171 //            x = activeXid;
172             throw new XAException(XAException.XAER_PROTO);
173         } else {
174             //TODO cache the known xids so we don't keep recreating this one??
175             x = new ActiveMQXid(xid);
176         }
177 
178         XATransactionInfo info = new XATransactionInfo();
179         info.setId(this.packetIdGenerator.generateId());
180         info.setXid(x);
181         info.setType(XATransactionInfo.PRE_COMMIT);
182 
183         try {
184 // Find out if the server wants to commit or rollback.
185             IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
186             return receipt.getResult();
187         } catch (JMSException e) {
188             throw toXAException(e);
189         }
190     }
191 
192     public void rollback(Xid xid) throws XAException {
193 
194         // We allow interleaving multiple transactions, so
195         // we don't limit rollback to the associated xid.
196         ActiveMQXid x;
197         if (ActiveMQXid.equals(associatedXid,xid)) {
198             //I think this can happen even without an end(xid) call.  Need to check spec.
199             x = activeXid;
200         } else {
201             x = new ActiveMQXid(xid);
202         }
203 
204         XATransactionInfo info = new XATransactionInfo();
205         info.setId(this.packetIdGenerator.generateId());
206         info.setXid(x);
207         info.setType(XATransactionInfo.ROLLBACK);
208 
209         try {
210             // Let the server know that the tx is rollback.
211             this.connection.syncSendPacket(info);
212         } catch (JMSException e) {
213             throw toXAException(e);
214         }
215     }
216 
217     // XAResource interface
218     public void commit(Xid xid, boolean onePhase) throws XAException {
219         checkClosedXA();
220 
221         // We allow interleaving multiple transactions, so
222         // we don't limit commit to the associated xid.
223         ActiveMQXid x;
224         if (ActiveMQXid.equals(associatedXid,xid)) {
225             //should never happen, end(xid,TMSUCCESS) must have been previously called
226 //            x = activeXid;
227             throw new XAException(XAException.XAER_PROTO);
228         } else {
229             x = new ActiveMQXid(xid);
230         }
231 
232         XATransactionInfo info = new XATransactionInfo();
233         info.setId(this.packetIdGenerator.generateId());
234         info.setXid(x);
235         info.setType(onePhase ? XATransactionInfo.COMMIT_ONE_PHASE : XATransactionInfo.COMMIT);
236 
237         try {
238             // Notify the server that the tx was commited back
239             this.connection.syncSendPacket(info);
240         } catch (JMSException e) {
241             throw toXAException(e);
242         }
243     }
244 
245 
246     public void forget(Xid xid) throws XAException {
247         checkClosedXA();
248 
249         // We allow interleaving multiple transactions, so
250         // we don't limit forget to the associated xid.
251         ActiveMQXid x;
252         if (ActiveMQXid.equals(associatedXid,xid)) {
253             //TODO determine if this can happen... I think not.
254             x = activeXid;
255         } else {
256             x = new ActiveMQXid(xid);
257         }
258 
259         XATransactionInfo info = new XATransactionInfo();
260         info.setId(this.packetIdGenerator.generateId());
261         info.setXid(x);
262         info.setType(XATransactionInfo.FORGET);
263 
264         try {
265             // Tell the server to forget the transaction.
266             this.connection.syncSendPacket(info);
267         } catch (JMSException e) {
268             throw toXAException(e);
269         }
270     }
271 
272     private String getResourceManagerId() {
273         return ((ActiveMQXAConnection) this.connection).getResourceManagerId();
274     }
275 
276     public boolean isSameRM(XAResource xaResource) throws XAException {
277         if (xaResource == null) {
278             return false;
279         }
280         if (!(xaResource instanceof ActiveMQXASession)) {
281             return false;
282         }
283         ActiveMQXASession xar = (ActiveMQXASession) xaResource;
284         return getResourceManagerId().equals(xar.getResourceManagerId());
285     }
286 
287 
288     public Xid[] recover(int flag) throws XAException {
289         checkClosedXA();
290 
291         XATransactionInfo info = new XATransactionInfo();
292         info.setId(this.packetIdGenerator.generateId());
293         info.setType(XATransactionInfo.XA_RECOVER);
294 
295         try {
296             ResponseReceipt receipt = (ResponseReceipt) this.connection.syncSendRequest(info);
297             return (ActiveMQXid[]) receipt.getResult();
298         } catch (JMSException e) {
299             throw toXAException(e);
300         }
301     }
302 
303 
304     public int getTransactionTimeout() throws XAException {
305         checkClosedXA();
306 
307         XATransactionInfo info = new XATransactionInfo();
308         info.setId(this.packetIdGenerator.generateId());
309         info.setType(XATransactionInfo.GET_TX_TIMEOUT);
310 
311         try {
312             // get the tx timeout that was set.
313             IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
314             return receipt.getResult();
315         } catch (JMSException e) {
316             throw toXAException(e);
317         }
318     }
319 
320 
321     public boolean setTransactionTimeout(int seconds) throws XAException {
322         checkClosedXA();
323 
324         XATransactionInfo info = new XATransactionInfo();
325         info.setId(this.packetIdGenerator.generateId());
326         info.setType(XATransactionInfo.SET_TX_TIMEOUT);
327         info.setTransactionTimeout(seconds);
328 
329         try {
330             // Setup the new tx timeout
331             this.connection.asyncSendPacket(info);
332             return true;
333         } catch (JMSException e) {
334             throw toXAException(e);
335         }
336     }
337     
338     /***
339      * overide Session - which needs to rollback if transacted
340      */
341     public void close() throws JMSException {
342         if (!this.closed.get()) {
343             doClose();
344             closed.set(true);
345         }      
346     }
347 
348 
349     /***
350      * @throws XAException if the Session is closed
351      */
352     protected void checkClosedXA() throws XAException {
353         if (this.closed.get()) {
354             throw new XAException(XAException.XAER_RMFAIL);
355         }
356     }
357 
358     protected boolean isXaTransacted() {
359         return true;
360     }
361 
362     protected String getNextTransactionId() {
363         return super.currentTransactionId;
364     }
365 
366     /***
367      * This is called before transacted work is done by
368      * the session.  XA Work can only be done when this
369      * XA resource is associated with an Xid.
370      *
371      * @throws JMSException not associated with an Xid
372      */
373     protected void doStartTransaction() throws JMSException {
374         if (associatedXid == null) {
375             throw new JMSException("Session's XAResource has not been enlisted in a distributed transaction.");
376         }
377     }
378 
379 
380     private void setXid(Xid xid) {
381         if (xid != null) {
382 // associate
383             associatedXid = xid;
384             activeXid = new ActiveMQXid(xid);
385             super.currentTransactionId = activeXid.toLocalTransactionId();
386         } else {
387 // dis-associate
388             associatedXid = null;
389             activeXid = null;
390             super.currentTransactionId = null;
391         }
392     }
393 
394     /***
395      * Converts a JMSException from the server to an XAException.
396      * if the JMSException contained a linked XAException that is
397      * returned instead.
398      *
399      * @param e
400      * @return
401      */
402     private XAException toXAException(JMSException e) {
403         if (e.getCause() != null && e.getCause() instanceof XAException) {
404             XAException original = (XAException) e.getCause();
405             XAException xae = new XAException(original.getMessage());
406             xae.errorCode = original.errorCode;
407             xae.initCause(original);
408             return xae;
409         }
410 
411         XAException xae = new XAException(e.getMessage());
412         xae.errorCode = XAException.XAER_RMFAIL;
413         xae.initCause(e);
414         return xae;
415     }
416 
417 }