View Javadoc

1   /***
2    * 
3    * Copyright 2004 Protique Ltd
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.bdb;
19  
20  import com.sleepycat.je.Database;
21  import com.sleepycat.je.DatabaseEntry;
22  import com.sleepycat.je.DatabaseException;
23  import com.sleepycat.je.LockMode;
24  import com.sleepycat.je.OperationStatus;
25  import com.sleepycat.je.SecondaryConfig;
26  import com.sleepycat.je.SecondaryCursor;
27  import com.sleepycat.je.SecondaryDatabase;
28  import com.sleepycat.je.Transaction;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.codehaus.activemq.message.ActiveMQMessage;
32  import org.codehaus.activemq.message.ConsumerInfo;
33  import org.codehaus.activemq.message.MessageAck;
34  import org.codehaus.activemq.message.WireFormat;
35  import org.codehaus.activemq.service.MessageIdentity;
36  import org.codehaus.activemq.service.SubscriberEntry;
37  import org.codehaus.activemq.service.Subscription;
38  import org.codehaus.activemq.store.TopicMessageStore;
39  import org.codehaus.activemq.util.JMSExceptionHelper;
40  
41  import javax.jms.JMSException;
42  import java.io.IOException;
43  
44  /***
45   * @version $Revision: 1.2 $
46   */
47  public class BDbTopicMessageStore extends BDbMessageStore implements TopicMessageStore {
48      private static final Log log = LogFactory.getLog(BDbTopicMessageStore.class);
49  
50      private Database subscriptionDatabase;
51  
52      public BDbTopicMessageStore(Database database, SecondaryDatabase secondaryDatabase, SecondaryConfig secondaryConfig, SequenceNumberCreator sequenceNumberCreator, WireFormat wireFormat, Database subscriptionDatabase) {
53          super(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat);
54          this.subscriptionDatabase = subscriptionDatabase;
55      }
56  
57      public void incrementMessageCount(MessageIdentity messageId) {
58          /*** TODO */
59      }
60  
61      public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) {
62          /*** TODO */
63      }
64  
65      public void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
66          checkClosed();
67          try {
68              doSetLastAcknowledgedMessageIdentity(subscription, messageIdentity);
69          }
70          catch (DatabaseException e) {
71              throw JMSExceptionHelper.newJMSException("Failed to update last acknowledge messageID for : "
72                      + messageIdentity + ". Reason: " + e, e);
73          }
74      }
75  
76  
77      public void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
78          checkClosed();
79          SecondaryCursor cursor = null;
80          try {
81              DatabaseEntry lastAckKey = getLastAcknowledgedMessageID(subscription, lastDispatchedMessage);
82              if (lastAckKey != null) {
83                  cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
84                  DatabaseEntry valueEntry = new DatabaseEntry();
85                  OperationStatus status = cursor.getSearchKey(lastAckKey, valueEntry, LockMode.DEFAULT);
86                  if (status != OperationStatus.SUCCESS) {
87                      log.error("Could not find the last acknowledged record for: " + subscription + ". Status: " + status);
88                  }
89                  else {
90                      while (true) {
91                          // lets pass straight over the first entry, which we've already ack'd
92                          status = cursor.getNext(lastAckKey, valueEntry, LockMode.DEFAULT);
93                          if (status != OperationStatus.SUCCESS) {
94                              if (status != OperationStatus.NOTFOUND) {
95                                  log.warn("Strange result when iterating to end of collection: " + status);
96                              }
97                              break;
98                          }
99  
100                         ActiveMQMessage message = extractMessage(valueEntry);
101                         subscription.addMessage(getContainer(), message);
102                     }
103                 }
104             }
105         }
106         catch (DatabaseException e) {
107             throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: "
108                     + subscription + ". Reason: " + e, e);
109         }
110         catch (IOException e) {
111             throw JMSExceptionHelper.newJMSException("Unable to recover topic subscription for: "
112                     + subscription + ". Reason: " + e, e);
113         }
114         finally {
115             if (cursor != null) {
116                 try {
117                     cursor.close();
118                 }
119                 catch (DatabaseException e) {
120                     log.warn("Caught exception closing cursor: " + e, e);
121                 }
122             }
123         }
124     }
125 
126     public MessageIdentity getLastestMessageIdentity() throws JMSException {
127         checkClosed();
128         SecondaryCursor cursor = null;
129         try {
130             cursor = getSecondaryDatabase().openSecondaryCursor(BDbHelper.getTransaction(), getCursorConfig());
131             DatabaseEntry keyEntry = new DatabaseEntry();
132             DatabaseEntry valueEntry = new DatabaseEntry();
133             OperationStatus status = cursor.getLast(keyEntry, valueEntry, LockMode.DEFAULT);
134             if (status == OperationStatus.SUCCESS) {
135                 if (log.isDebugEnabled()) {
136                     log.debug("Loaded last sequence number of: " + BDbHelper.longFromBytes(keyEntry.getData()));
137                 }
138                 return new MessageIdentity(null, keyEntry);
139             }
140             else if (status != OperationStatus.NOTFOUND) {
141                 log.error("Could not find the last sequence number. Status: " + status);
142             }
143             return null;
144         }
145         catch (DatabaseException e) {
146             throw JMSExceptionHelper.newJMSException("Unable to load the last sequence number. Reason: " + e, e);
147         }
148         finally {
149             if (cursor != null) {
150                 try {
151                     cursor.close();
152                 }
153                 catch (DatabaseException e) {
154                     log.warn("Caught exception closing cursor: " + e, e);
155                 }
156             }
157         }
158     }
159 
160     public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
161         return null;  /*** TODO */
162     }
163 
164     public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
165         /*** TODO */
166     }
167 
168     public synchronized void stop() throws JMSException {
169         JMSException firstException = BDbPersistenceAdapter.closeDatabase(subscriptionDatabase, null);
170         subscriptionDatabase = null;
171         super.stop();
172         if (firstException != null) {
173             throw JMSExceptionHelper.newJMSException("Unable to close the subscription database: " + firstException, firstException);
174         }
175     }
176 
177     // Implementation methods
178     //-------------------------------------------------------------------------
179 
180     protected DatabaseEntry getLastAcknowledgedMessageID(Subscription subscription, MessageIdentity lastDispatchedMessage) throws DatabaseException {
181         DatabaseEntry key = createKey(subscription.getPersistentKey());
182         DatabaseEntry value = new DatabaseEntry();
183         OperationStatus status = subscriptionDatabase.get(null, key, value, null);
184         if (status == OperationStatus.SUCCESS) {
185             return value;
186         }
187         else if (status == OperationStatus.NOTFOUND) {
188             // we need to insert the new entry
189             if (lastDispatchedMessage != null) {
190                 return doSetLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
191             }
192         }
193         else {
194             log.warn("Unexpected status return from querying lastAcknowledgeSequenceNumber for: " + subscription + " status: " + status);
195         }
196         return null;
197     }
198 
199     protected DatabaseEntry doSetLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws DatabaseException {
200         Transaction transaction = BDbHelper.getTransaction();
201         DatabaseEntry key = createKey(subscription.getPersistentKey());
202         DatabaseEntry value = getSequenceNumberKey(messageIdentity);
203         subscriptionDatabase.put(transaction, key, value);
204         return value;
205     }
206 }