View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdb;
19  
20  import com.sleepycat.je.Cursor;
21  import com.sleepycat.je.CursorConfig;
22  import com.sleepycat.je.Database;
23  import com.sleepycat.je.DatabaseEntry;
24  import com.sleepycat.je.DatabaseException;
25  import com.sleepycat.je.LockMode;
26  import com.sleepycat.je.OperationStatus;
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.codehaus.activemq.message.ActiveMQXid;
30  import org.codehaus.activemq.service.Transaction;
31  import org.codehaus.activemq.service.TransactionManager;
32  import org.codehaus.activemq.service.impl.XATransactionCommand;
33  import org.codehaus.activemq.store.PreparedTransactionStore;
34  
35  import javax.jms.JMSException;
36  import javax.transaction.xa.XAException;
37  import java.io.IOException;
38  import java.util.ArrayList;
39  import java.util.List;
40  
41  /***
42   * @version $Revision: 1.2 $
43   */
44  public class BDbPreparedTransactionStore implements PreparedTransactionStore {
45      private static final Log log = LogFactory.getLog(BDbMessageStore.class);
46  
47      private Database database;
48      private CursorConfig cursorConfig;
49  
50      public BDbPreparedTransactionStore(Database database) {
51          this.database = database;
52      }
53  
54      public ActiveMQXid[] getXids() throws XAException {
55          checkClosed();
56          Cursor cursor = null;
57          try {
58              cursor = database.openCursor(BDbHelper.getTransaction(), cursorConfig);
59              List list = new ArrayList();
60              DatabaseEntry keyEntry = new DatabaseEntry();
61              DatabaseEntry valueEntry = new DatabaseEntry();
62              OperationStatus status = cursor.getFirst(keyEntry, valueEntry, LockMode.DEFAULT);
63              while (status == OperationStatus.SUCCESS) {
64                  list.add(extractXid(keyEntry));
65  
66              }
67              if (status != OperationStatus.NOTFOUND) {
68                  log.warn("Unexpected status code while recovering: " + status);
69              }
70              ActiveMQXid[] answer = new ActiveMQXid[list.size()];
71              list.toArray(answer);
72              return answer;
73          }
74          catch (DatabaseException e) {
75              log.error("Failed to recover prepared transaction log: " + e, e);
76              throw new XAException("Failed to recover prepared transaction log. Reason: " + e);
77          }
78          catch (IOException e) {
79              log.error("Failed to recover prepared transaction log: " + e, e);
80              throw new XAException("Failed to recover prepared transaction log. Reason: " + e);
81          }
82          finally {
83              if (cursor != null) {
84                  try {
85                      cursor.close();
86                  }
87                  catch (DatabaseException e) {
88                      log.warn("Caught exception closing cursor: " + e, e);
89                  }
90              }
91          }
92      }
93  
94      public void remove(ActiveMQXid xid) throws XAException {
95          checkClosed();
96          try {
97              DatabaseEntry key = new DatabaseEntry(asBytes(xid));
98              OperationStatus status = database.delete(BDbHelper.getTransaction(), key);
99              if (status != OperationStatus.SUCCESS) {
100                 log.error("Could not delete sequenece number for: " + xid + " status: " + status);
101             }
102         }
103         catch (DatabaseException e) {
104             throw new XAException("Failed to remove prepared transaction: " + xid + ". Reason: " + e);
105         }
106         catch (IOException e) {
107             throw new XAException("Failed to remove prepared transaction: " + xid + ". Reason: " + e);
108         }
109     }
110 
111     public void put(ActiveMQXid xid, Transaction transaction) throws XAException {
112         checkClosed();
113         try {
114             DatabaseEntry key = new DatabaseEntry(asBytes(xid));
115             DatabaseEntry value = new DatabaseEntry(asBytes(transaction));
116             database.put(BDbHelper.getTransaction(), key, value);
117         }
118         catch (Exception e) {
119             throw new XAException("Failed to store prepared transaction: " + xid + ". Reason: " + e);
120         }
121     }
122 
123     public void loadPreparedTransactions(TransactionManager transactionManager) throws XAException {
124         checkClosed();
125         Cursor cursor = null;
126         try {
127             cursor = database.openCursor(BDbHelper.getTransaction(), cursorConfig);
128             DatabaseEntry keyEntry = new DatabaseEntry();
129             DatabaseEntry valueEntry = new DatabaseEntry();
130             OperationStatus status = cursor.getFirst(keyEntry, valueEntry, LockMode.DEFAULT);
131             while (status == OperationStatus.SUCCESS) {
132                 ActiveMQXid xid = extractXid(keyEntry);
133                 Transaction transaction = extractTransaction(valueEntry);
134                 transactionManager.loadTransaction(xid, transaction);
135                 status = cursor.getNext(keyEntry, valueEntry, LockMode.DEFAULT);
136             }
137             if (status != OperationStatus.NOTFOUND) {
138                 log.warn("Unexpected status code while recovering: " + status);
139             }
140         }
141         catch (Exception e) {
142             log.error("Failed to recover prepared transaction log: " + e, e);
143             throw new XAException("Failed to recover prepared transaction log. Reason: " + e);
144         }
145         finally {
146             if (cursor != null) {
147                 try {
148                     cursor.close();
149                 }
150                 catch (DatabaseException e) {
151                     log.warn("Caught exception closing cursor: " + e, e);
152                 }
153             }
154         }
155     }
156 
157     public void start() throws JMSException {
158     }
159 
160     public synchronized void stop() throws JMSException {
161         if (database != null) {
162             JMSException exception = BDbPersistenceAdapter.closeDatabase(database, null);
163             database = null;
164             if (exception != null) {
165                 throw exception;
166             }
167         }
168     }
169 
170     // Implementation methods
171     //-------------------------------------------------------------------------
172     protected ActiveMQXid extractXid(DatabaseEntry entry) throws IOException {
173         return ActiveMQXid.fromBytes(entry.getData());
174     }
175 
176     protected Transaction extractTransaction(DatabaseEntry entry) throws IOException, ClassNotFoundException {
177         return XATransactionCommand.fromBytes(entry.getData());
178     }
179 
180 
181     private byte[] asBytes(ActiveMQXid xid) throws IOException {
182         return xid.toBytes();
183     }
184 
185     private byte[] asBytes(Transaction transaction) throws IOException, JMSException {
186         if (transaction instanceof XATransactionCommand) {
187             XATransactionCommand packetTask = (XATransactionCommand) transaction;
188             return packetTask.toBytes();
189         }
190         else {
191             throw new IOException("Unsupported transaction type: " + transaction);
192         }
193     }
194 
195     protected void checkClosed() throws XAException {
196         if (database == null) {
197             throw new XAException("Prepared Transaction Store is already closed");
198         }
199     }
200 }