1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.service.impl;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.broker.Broker;
23 import org.codehaus.activemq.service.Transaction;
24 import org.codehaus.activemq.service.TransactionTask;
25
26 import javax.transaction.xa.XAException;
27 import java.io.Externalizable;
28 import java.io.IOException;
29 import java.io.ObjectInput;
30 import java.io.ObjectOutput;
31 import java.util.ArrayList;
32 import java.util.Iterator;
33
34 /***
35 * Keeps track of all the actions the need to be done when
36 * a transaction does a commit or rollback.
37 *
38 * @version $Revision: 1.4 $
39 */
40 public abstract class AbstractTransaction implements Transaction, Externalizable {
41
42 static final private Log log = LogFactory.getLog(AbstractTransaction.class);
43
44 static final public byte START_STATE = 0;
45 static final public byte IN_USE_STATE = 1;
46 static final public byte PREPARED_STATE = 2;
47 static final public byte FINISHED_STATE = 3;
48
49
50 private ArrayList prePrepareTasks = new ArrayList();
51 private ArrayList postCommitTasks = new ArrayList();
52 private ArrayList postRollbackTasks = new ArrayList();
53 private byte state = START_STATE;
54 private transient Broker broker;
55
56 protected AbstractTransaction(Broker broker) {
57 this.broker = broker;
58 }
59
60 public Broker getBroker() {
61 return broker;
62 }
63
64 /***
65 * Called after deserialization to register the broker
66 *
67 * @param broker
68 */
69 public void setBroker(Broker broker) {
70 this.broker = broker;
71 }
72
73 public byte getState() {
74 return state;
75 }
76
77 public void setState(byte state) {
78 this.state = state;
79 }
80
81 public void addPostCommitTask(TransactionTask r) {
82 postCommitTasks.add(r);
83 if (state == START_STATE) {
84 state = IN_USE_STATE;
85 }
86 }
87
88 public void addPostRollbackTask(TransactionTask r) {
89 postRollbackTasks.add(r);
90 if (state == START_STATE) {
91 state = IN_USE_STATE;
92 }
93 }
94
95 public void addPrePrepareTask(TransactionTask r) {
96 prePrepareTasks.add(r);
97 if (state == START_STATE) {
98 state = IN_USE_STATE;
99 }
100 }
101
102 public void prePrepare() throws Throwable {
103
104
105
106 switch (state) {
107 case START_STATE:
108 case IN_USE_STATE:
109 break;
110 default:
111 XAException xae = new XAException("Prepare cannot be called now.");
112 xae.errorCode = XAException.XAER_PROTO;
113 throw xae;
114 }
115
116
117 for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
118 TransactionTask r = (TransactionTask) iter.next();
119 r.execute(broker);
120 }
121 }
122
123 protected void postCommit() throws Throwable {
124
125 for (Iterator iter = postCommitTasks.iterator(); iter.hasNext();) {
126 TransactionTask r = (TransactionTask) iter.next();
127 r.execute(broker);
128 }
129 }
130
131 public void postRollback() throws Throwable {
132
133 for (Iterator iter = postRollbackTasks.iterator(); iter.hasNext();) {
134 TransactionTask r = (TransactionTask) iter.next();
135 r.execute(broker);
136 }
137 }
138
139 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
140 state = in.readByte();
141 prePrepareTasks = readTaskList(in);
142 postCommitTasks = readTaskList(in);
143 postRollbackTasks = readTaskList(in);
144 }
145
146 public void writeExternal(ObjectOutput out) throws IOException {
147 out.writeByte(state);
148 writeTaskList(prePrepareTasks, out);
149 writeTaskList(postCommitTasks, out);
150 writeTaskList(postRollbackTasks, out);
151 }
152
153 public String toString() {
154 return super.toString() + "[prePrepares=" + prePrepareTasks + "; postCommits=" + postCommitTasks
155 + "; postRollbacks=" + postRollbackTasks + "]";
156 }
157
158
159
160 protected ArrayList readTaskList(ObjectInput in) throws IOException {
161 int size = in.readInt();
162 ArrayList answer = new ArrayList(size);
163 for (int i = 0; i < size; i++) {
164 answer.add(readTask(in));
165 }
166 return answer;
167 }
168
169 protected void writeTaskList(ArrayList tasks, ObjectOutput out) throws IOException {
170 int size = tasks.size();
171 out.writeInt(size);
172 for (int i = 0; i < size; i++) {
173 writeTask((TransactionTask) tasks.get(i), out);
174 }
175 }
176
177 protected TransactionTask readTask(ObjectInput in) throws IOException {
178 return PacketTransactionTask.readTask(in);
179 }
180
181 protected void writeTask(TransactionTask task, ObjectOutput out) throws IOException {
182 PacketTransactionTask.writeTask(task, out);
183 }
184 }