1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq;
19  
20  import org.codehaus.activemq.message.XidStub;
21  import org.codehaus.activemq.util.IndentPrinter;
22  
23  import javax.jms.Connection;
24  import javax.jms.ConnectionFactory;
25  import javax.jms.Destination;
26  import javax.jms.JMSException;
27  import javax.jms.Message;
28  import javax.jms.MessageConsumer;
29  import javax.jms.MessageProducer;
30  import javax.jms.Session;
31  import javax.jms.XASession;
32  import javax.transaction.xa.XAResource;
33  import java.util.ArrayList;
34  
35  /***
36   * @version $Revision: 1.7 $
37   */
38  public abstract class JmsXATransactionTestSupport extends TestSupport {
39  
40      protected ConnectionFactory connectionFactory;
41      protected Connection connection;
42      protected Session session;
43      protected MessageConsumer consumer;
44      protected MessageProducer producer;
45  
46      public JmsXATransactionTestSupport() {
47          super();
48      }
49  
50      public JmsXATransactionTestSupport(String name) {
51          super(name);
52      }
53  
54      public void testSendOutsideXATransaction() throws Exception {
55          try {
56              producer.send(session.createTextMessage("First Message"));
57              fail("Using an XA session outside of a transaction should throw an exception.");
58          }
59          catch (JMSException e) {
60              // normal
61          }
62      }
63  
64  
65      public void testSendRollback() throws Exception {
66  
67          XAResource resource = ((XASession) session).getXAResource();
68          Message[] outbound = new Message[]{
69              session.createTextMessage("First Message"),
70              session.createTextMessage("Second Message")
71          };
72  
73  
74          XidStub xid1 = new XidStub(new byte[]{1, 2, 3, 4, 5});
75          resource.start(xid1, XAResource.TMNOFLAGS);
76          producer.send(outbound[0]);
77          resource.end(xid1, XAResource.TMSUCCESS);
78          resource.commit(xid1, true);
79  
80          XidStub xid2 = new XidStub(new byte[]{2, 2, 3, 4, 5});
81          resource.start(xid2, XAResource.TMNOFLAGS);
82          producer.send(session.createTextMessage("I'm going to get rolled back."));
83          resource.end(xid2, XAResource.TMSUCCESS);
84          resource.rollback(xid2);
85  
86          XidStub xid3 = new XidStub(new byte[]{3, 2, 3, 4, 5});
87          resource.start(xid3, XAResource.TMNOFLAGS);
88          producer.send(outbound[1]);
89          resource.end(xid3, XAResource.TMSUCCESS);
90          if (resource.prepare(xid3) == XAResource.XA_OK) {
91              resource.commit(xid3, false);
92          }
93  
94          ArrayList messages = new ArrayList();
95          XidStub xid4 = new XidStub(new byte[]{4, 2, 3, 4, 5});
96          resource.start(xid4, XAResource.TMNOFLAGS);
97          messages.add(consumer.receive(1000));
98          messages.add(consumer.receive(1000));
99          resource.end(xid4, XAResource.TMSUCCESS);
100         resource.commit(xid4, true);
101 
102         Message inbound[] = new Message[messages.size()];
103         messages.toArray(inbound);
104 
105         assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
106     }
107 
108     public void testSendPrepareRecoverRollback() throws Exception {
109 
110         XAResource resource = ((XASession) session).getXAResource();
111         Message[] outbound = new Message[]{
112             session.createTextMessage("First Message"),
113             session.createTextMessage("Second Message")
114         };
115 
116         XidStub xid2 = new XidStub(new byte[]{1, 2, 3, 4, 6});
117         resource.start(xid2, XAResource.TMNOFLAGS);
118         producer.send(session.createTextMessage("I'm going to get rolled back."));
119         resource.end(xid2, XAResource.TMSUCCESS);
120         assertEquals(XAResource.XA_OK, resource.prepare(xid2));
121 
122 
123         XidStub xid3 = new XidStub(new byte[]{2, 2, 3, 4, 6});
124         resource.start(xid3, XAResource.TMNOFLAGS);
125         producer.send(outbound[1]);
126         resource.end(xid3, XAResource.TMSUCCESS);
127 
128         assertEquals(XAResource.XA_OK, resource.prepare(xid3));
129 
130         restart();
131 
132         resource = ((XASession) session).getXAResource();
133 
134         // lets commit a transaction
135         resource.commit(xid3, false);
136 
137         // lets try rollback a transaction
138         resource.rollback(xid2);
139 
140     }
141 
142 
143     abstract protected JmsResourceProvider getJmsResourceProvider();
144 
145     protected void setUp() throws Exception {
146         super.setUp();
147         openConnection();
148     }
149 
150     protected void restart() throws JMSException {
151         closeConnection();
152         openConnection();
153     }
154 
155     protected void openConnection() throws JMSException {
156         JmsResourceProvider p = getJmsResourceProvider();
157 
158         connectionFactory = p.createConnectionFactory();
159         connection = p.createConnection(connectionFactory);
160         System.out.println("Created connection: " + connection);
161         session = p.createSession(connection);
162         System.out.println("Created session: " + session);
163         Destination destination = p.createDestination(session, "FOO");
164         System.out.println("Created destination: " + destination + " of type: " + destination.getClass());
165         producer = p.createProducer(session, destination);
166         System.out.println("Created producer: " + producer);
167         consumer = p.createConsumer(session, destination);
168         System.out.println("Created consumer: " + consumer);
169         connection.start();
170     }
171 
172     protected void tearDown() throws Exception {
173         System.out.println("Test Done.  Stats");
174         ((ActiveMQConnectionFactory) connectionFactory).getFactoryStats().dump(new IndentPrinter());
175         closeConnection();
176     }
177 
178     private void closeConnection() throws JMSException {
179         System.out.println("Closing down connection");
180 
181         connection.stop();
182         connection.close();
183         System.out.println("Connection closed.");
184     }
185 
186 }