1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQXid;
23 import org.codehaus.activemq.message.IntResponseReceipt;
24 import org.codehaus.activemq.message.ResponseReceipt;
25 import org.codehaus.activemq.message.XATransactionInfo;
26
27 import javax.jms.*;
28 import javax.transaction.xa.XAException;
29 import javax.transaction.xa.XAResource;
30 import javax.transaction.xa.Xid;
31
32 /***
33 * The XASession interface extends the capability of Session by adding access
34 * to a JMS provider's support for the Java Transaction API (JTA) (optional).
35 * This support takes the form of a javax.transaction.xa.XAResource object.
36 * The functionality of this object closely resembles that defined by the
37 * standard X/Open XA Resource interface.
38 * <p/>
39 * An application server controls the transactional assignment of an XASession
40 * by obtaining its XAResource. It uses the XAResource to assign the session
41 * to a transaction, prepare and commit work on the transaction, and so on.
42 * <p/>
43 * An XAResource provides some fairly sophisticated facilities for
44 * interleaving work on multiple transactions, recovering a list of
45 * transactions in progress, and so on. A JTA aware JMS provider must fully
46 * implement this functionality. This could be done by using the services of a
47 * database that supports XA, or a JMS provider may choose to implement this
48 * functionality from scratch.
49 * <p/>
50 * A client of the application server is given what it thinks is a regular
51 * JMS Session. Behind the scenes, the application server controls the
52 * transaction management of the underlying XASession.
53 * <p/>
54 * The XASession interface is optional. JMS providers are not required to
55 * support this interface. This interface is for use by JMS providers to
56 * support transactional environments. Client programs are strongly encouraged
57 * to use the transactional support available in their environment, rather
58 * than use these XA interfaces directly.
59 *
60 * @version $Revision: 1.11 $
61 * @see javax.jms.Session
62 * @see javax.jms.QueueSession
63 * @see javax.jms.TopicSession
64 * @see javax.jms.XASession
65 */
66 public class ActiveMQXASession extends ActiveMQSession implements XASession, XAQueueSession, XATopicSession, XAResource {
67 private static final Log log = LogFactory.getLog(ActiveMQXASession.class);
68 private Xid associatedXid;
69 private ActiveMQXid activeXid;
70
71 public ActiveMQXASession(ActiveMQXAConnection theConnection) throws JMSException {
72 super(theConnection, Session.SESSION_TRANSACTED);
73 }
74
75 public boolean getTransacted() throws JMSException {
76 return true;
77 }
78
79 public void rollback() throws JMSException {
80 throw new TransactionInProgressException("Cannot rollback() inside an XASession");
81 }
82
83 public void commit() throws JMSException {
84 throw new TransactionInProgressException("Cannot commit() inside an XASession");
85 }
86
87 public Session getSession() throws JMSException {
88 return this;
89 }
90
91 public XAResource getXAResource() {
92 return this;
93 }
94
95 public QueueSession getQueueSession() throws JMSException {
96 return this;
97 }
98
99 public TopicSession getTopicSession() throws JMSException {
100 return this;
101 }
102
103 /***
104 * Associates a transaction with the resource.
105 */
106 public void start(Xid xid, int flags) throws XAException {
107 checkClosedXA();
108
109
110 if (associatedXid != null) {
111 throw new XAException(XAException.XAER_PROTO);
112 }
113
114 if ((flags & TMJOIN) == TMJOIN) {
115
116 }
117 if ((flags & TMJOIN) == TMRESUME) {
118
119 }
120
121
122 setXid(xid);
123
124 XATransactionInfo info = new XATransactionInfo();
125 info.setId(this.packetIdGenerator.generateId());
126 info.setXid(activeXid);
127 info.setType(XATransactionInfo.START);
128
129 try {
130
131
132 this.connection.syncSendPacket(info);
133 } catch (JMSException e) {
134 throw toXAException(e);
135 }
136 }
137
138 public void end(Xid xid, int flags) throws XAException {
139 checkClosedXA();
140
141 if ((flags & TMSUSPEND) == TMSUSPEND) {
142
143 if (associatedXid == null || !ActiveMQXid.equals(associatedXid,xid)) {
144 throw new XAException(XAException.XAER_PROTO);
145 }
146
147
148 setXid(null);
149 } else if ((flags & TMFAIL) == TMFAIL) {
150
151 setXid(null);
152 } else if ((flags & TMSUCCESS) == TMSUCCESS) {
153
154
155 if (ActiveMQXid.equals(associatedXid,xid)) {
156 setXid(null);
157 }
158 } else {
159 throw new XAException(XAException.XAER_INVAL);
160 }
161
162 }
163
164 public int prepare(Xid xid) throws XAException {
165
166
167
168 ActiveMQXid x;
169
170 if (ActiveMQXid.equals(associatedXid,xid)) {
171
172 throw new XAException(XAException.XAER_PROTO);
173 } else {
174
175 x = new ActiveMQXid(xid);
176 }
177
178 XATransactionInfo info = new XATransactionInfo();
179 info.setId(this.packetIdGenerator.generateId());
180 info.setXid(x);
181 info.setType(XATransactionInfo.PRE_COMMIT);
182
183 try {
184
185 IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
186 return receipt.getResult();
187 } catch (JMSException e) {
188 throw toXAException(e);
189 }
190 }
191
192 public void rollback(Xid xid) throws XAException {
193
194
195
196 ActiveMQXid x;
197 if (ActiveMQXid.equals(associatedXid,xid)) {
198
199 x = activeXid;
200 } else {
201 x = new ActiveMQXid(xid);
202 }
203
204 XATransactionInfo info = new XATransactionInfo();
205 info.setId(this.packetIdGenerator.generateId());
206 info.setXid(x);
207 info.setType(XATransactionInfo.ROLLBACK);
208
209 try {
210
211 this.connection.syncSendPacket(info);
212 } catch (JMSException e) {
213 throw toXAException(e);
214 }
215 }
216
217
218 public void commit(Xid xid, boolean onePhase) throws XAException {
219 checkClosedXA();
220
221
222
223 ActiveMQXid x;
224 if (ActiveMQXid.equals(associatedXid,xid)) {
225
226
227 throw new XAException(XAException.XAER_PROTO);
228 } else {
229 x = new ActiveMQXid(xid);
230 }
231
232 XATransactionInfo info = new XATransactionInfo();
233 info.setId(this.packetIdGenerator.generateId());
234 info.setXid(x);
235 info.setType(onePhase ? XATransactionInfo.COMMIT_ONE_PHASE : XATransactionInfo.COMMIT);
236
237 try {
238
239 this.connection.syncSendPacket(info);
240 } catch (JMSException e) {
241 throw toXAException(e);
242 }
243 }
244
245
246 public void forget(Xid xid) throws XAException {
247 checkClosedXA();
248
249
250
251 ActiveMQXid x;
252 if (ActiveMQXid.equals(associatedXid,xid)) {
253
254 x = activeXid;
255 } else {
256 x = new ActiveMQXid(xid);
257 }
258
259 XATransactionInfo info = new XATransactionInfo();
260 info.setId(this.packetIdGenerator.generateId());
261 info.setXid(x);
262 info.setType(XATransactionInfo.FORGET);
263
264 try {
265
266 this.connection.syncSendPacket(info);
267 } catch (JMSException e) {
268 throw toXAException(e);
269 }
270 }
271
272 private String getResourceManagerId() {
273 return ((ActiveMQXAConnection) this.connection).getResourceManagerId();
274 }
275
276 public boolean isSameRM(XAResource xaResource) throws XAException {
277 if (xaResource == null) {
278 return false;
279 }
280 if (!(xaResource instanceof ActiveMQXASession)) {
281 return false;
282 }
283 ActiveMQXASession xar = (ActiveMQXASession) xaResource;
284 return getResourceManagerId().equals(xar.getResourceManagerId());
285 }
286
287
288 public Xid[] recover(int flag) throws XAException {
289 checkClosedXA();
290
291 XATransactionInfo info = new XATransactionInfo();
292 info.setId(this.packetIdGenerator.generateId());
293 info.setType(XATransactionInfo.XA_RECOVER);
294
295 try {
296 ResponseReceipt receipt = (ResponseReceipt) this.connection.syncSendRequest(info);
297 return (ActiveMQXid[]) receipt.getResult();
298 } catch (JMSException e) {
299 throw toXAException(e);
300 }
301 }
302
303
304 public int getTransactionTimeout() throws XAException {
305 checkClosedXA();
306
307 XATransactionInfo info = new XATransactionInfo();
308 info.setId(this.packetIdGenerator.generateId());
309 info.setType(XATransactionInfo.GET_TX_TIMEOUT);
310
311 try {
312
313 IntResponseReceipt receipt = (IntResponseReceipt) this.connection.syncSendRequest(info);
314 return receipt.getResult();
315 } catch (JMSException e) {
316 throw toXAException(e);
317 }
318 }
319
320
321 public boolean setTransactionTimeout(int seconds) throws XAException {
322 checkClosedXA();
323
324 XATransactionInfo info = new XATransactionInfo();
325 info.setId(this.packetIdGenerator.generateId());
326 info.setType(XATransactionInfo.SET_TX_TIMEOUT);
327 info.setTransactionTimeout(seconds);
328
329 try {
330
331 this.connection.asyncSendPacket(info);
332 return true;
333 } catch (JMSException e) {
334 throw toXAException(e);
335 }
336 }
337
338 /***
339 * overide Session - which needs to rollback if transacted
340 */
341 public void close() throws JMSException {
342 if (!this.closed.get()) {
343 doClose();
344 closed.set(true);
345 }
346 }
347
348
349 /***
350 * @throws XAException if the Session is closed
351 */
352 protected void checkClosedXA() throws XAException {
353 if (this.closed.get()) {
354 throw new XAException(XAException.XAER_RMFAIL);
355 }
356 }
357
358 protected boolean isXaTransacted() {
359 return true;
360 }
361
362 protected String getNextTransactionId() {
363 return super.currentTransactionId;
364 }
365
366 /***
367 * This is called before transacted work is done by
368 * the session. XA Work can only be done when this
369 * XA resource is associated with an Xid.
370 *
371 * @throws JMSException not associated with an Xid
372 */
373 protected void doStartTransaction() throws JMSException {
374 if (associatedXid == null) {
375 throw new JMSException("Session's XAResource has not been enlisted in a distributed transaction.");
376 }
377 }
378
379
380 private void setXid(Xid xid) {
381 if (xid != null) {
382
383 associatedXid = xid;
384 activeXid = new ActiveMQXid(xid);
385 super.currentTransactionId = activeXid.toLocalTransactionId();
386 } else {
387
388 associatedXid = null;
389 activeXid = null;
390 super.currentTransactionId = null;
391 }
392 }
393
394 /***
395 * Converts a JMSException from the server to an XAException.
396 * if the JMSException contained a linked XAException that is
397 * returned instead.
398 *
399 * @param e
400 * @return
401 */
402 private XAException toXAException(JMSException e) {
403 if (e.getCause() != null && e.getCause() instanceof XAException) {
404 XAException original = (XAException) e.getCause();
405 XAException xae = new XAException(original.getMessage());
406 xae.errorCode = original.errorCode;
407 xae.initCause(original);
408 return xae;
409 }
410
411 XAException xae = new XAException(e.getMessage());
412 xae.errorCode = XAException.XAER_RMFAIL;
413 xae.initCause(e);
414 return xae;
415 }
416
417 }