View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.service.impl;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.broker.Broker;
23  import org.codehaus.activemq.service.Transaction;
24  import org.codehaus.activemq.service.TransactionTask;
25  
26  import javax.transaction.xa.XAException;
27  import java.io.Externalizable;
28  import java.io.IOException;
29  import java.io.ObjectInput;
30  import java.io.ObjectOutput;
31  import java.util.ArrayList;
32  import java.util.Iterator;
33  
34  /***
35   * Keeps track of all the actions the need to be done when
36   * a transaction does a commit or rollback.
37   *
38   * @version $Revision: 1.4 $
39   */
40  public abstract class AbstractTransaction implements Transaction, Externalizable {
41  
42      static final private Log log = LogFactory.getLog(AbstractTransaction.class);
43  
44      static final public byte START_STATE = 0;      // can go to: 1,2,3
45      static final public byte IN_USE_STATE = 1;     // can go to: 2,3
46      static final public byte PREPARED_STATE = 2;   // can go to: 3
47      static final public byte FINISHED_STATE = 3;
48  
49  
50      private ArrayList prePrepareTasks = new ArrayList();
51      private ArrayList postCommitTasks = new ArrayList();
52      private ArrayList postRollbackTasks = new ArrayList();
53      private byte state = START_STATE;
54      private transient Broker broker;
55  
56      protected AbstractTransaction(Broker broker) {
57          this.broker = broker;
58      }
59  
60      public Broker getBroker() {
61          return broker;
62      }
63  
64      /***
65       * Called after deserialization to register the broker
66       *
67       * @param broker
68       */
69      public void setBroker(Broker broker) {
70          this.broker = broker;
71      }
72  
73      public byte getState() {
74          return state;
75      }
76  
77      public void setState(byte state) {
78          this.state = state;
79      }
80  
81      public void addPostCommitTask(TransactionTask r) {
82          postCommitTasks.add(r);
83          if (state == START_STATE) {
84              state = IN_USE_STATE;
85          }
86      }
87  
88      public void addPostRollbackTask(TransactionTask r) {
89          postRollbackTasks.add(r);
90          if (state == START_STATE) {
91              state = IN_USE_STATE;
92          }
93      }
94  
95      public void addPrePrepareTask(TransactionTask r) {
96          prePrepareTasks.add(r);
97          if (state == START_STATE) {
98              state = IN_USE_STATE;
99          }
100     }
101 
102     public void prePrepare() throws Throwable {
103 
104         // Is it ok to call prepare now given the state of the
105         // transaction?
106         switch (state) {
107             case START_STATE:
108             case IN_USE_STATE:
109                 break;
110             default:
111                 XAException xae = new XAException("Prepare cannot be called now.");
112                 xae.errorCode = XAException.XAER_PROTO;
113                 throw xae;
114         }
115 
116         // Run the prePrepareTasks
117         for (Iterator iter = prePrepareTasks.iterator(); iter.hasNext();) {
118             TransactionTask r = (TransactionTask) iter.next();
119             r.execute(broker);
120         }
121     }
122 
123     protected void postCommit() throws Throwable {
124         // Run the postCommitTasks
125         for (Iterator iter = postCommitTasks.iterator(); iter.hasNext();) {
126             TransactionTask r = (TransactionTask) iter.next();
127             r.execute(broker);
128         }
129     }
130 
131     public void postRollback() throws Throwable {
132         // Run the postRollbackTasks
133         for (Iterator iter = postRollbackTasks.iterator(); iter.hasNext();) {
134             TransactionTask r = (TransactionTask) iter.next();
135             r.execute(broker);
136         }
137     }
138 
139     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
140         state = in.readByte();
141         prePrepareTasks = readTaskList(in);
142         postCommitTasks = readTaskList(in);
143         postRollbackTasks = readTaskList(in);
144     }
145 
146     public void writeExternal(ObjectOutput out) throws IOException {
147         out.writeByte(state);
148         writeTaskList(prePrepareTasks, out);
149         writeTaskList(postCommitTasks, out);
150         writeTaskList(postRollbackTasks, out);
151     }
152 
153     public String toString() {
154         return super.toString() + "[prePrepares=" + prePrepareTasks + "; postCommits=" + postCommitTasks
155                 + "; postRollbacks=" + postRollbackTasks + "]";
156     }
157 
158     // Implementation methods
159     //-------------------------------------------------------------------------
160     protected ArrayList readTaskList(ObjectInput in) throws IOException {
161         int size = in.readInt();
162         ArrayList answer = new ArrayList(size);
163         for (int i = 0; i < size; i++) {
164             answer.add(readTask(in));
165         }
166         return answer;
167     }
168 
169     protected void writeTaskList(ArrayList tasks, ObjectOutput out) throws IOException {
170         int size = tasks.size();
171         out.writeInt(size);
172         for (int i = 0; i < size; i++) {
173             writeTask((TransactionTask) tasks.get(i), out);
174         }
175     }
176 
177     protected TransactionTask readTask(ObjectInput in) throws IOException {
178         return PacketTransactionTask.readTask(in);
179     }
180 
181     protected void writeTask(TransactionTask task, ObjectOutput out) throws IOException {
182         PacketTransactionTask.writeTask(task, out);
183     }
184 }