View Javadoc

1   /*** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.codehaus.activemq.store.jdbc;
19  
20  import java.io.IOException;
21  import java.sql.Connection;
22  import java.sql.SQLException;
23  
24  import javax.jms.JMSException;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.codehaus.activemq.message.ActiveMQMessage;
29  import org.codehaus.activemq.message.MessageAck;
30  import org.codehaus.activemq.message.WireFormat;
31  import org.codehaus.activemq.service.MessageIdentity;
32  import org.codehaus.activemq.service.QueueMessageContainer;
33  import org.codehaus.activemq.store.MessageStore;
34  import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
35  import org.codehaus.activemq.util.JMSExceptionHelper;
36  import org.codehaus.activemq.util.LongSequenceGenerator;
37  
38  /***
39   * @version $Revision: 1.5 $
40   */
41  public class JDBCMessageStore implements MessageStore {
42      private static final Log log = LogFactory.getLog(JDBCMessageStore.class);
43      
44      protected final WireFormat wireFormat;
45      protected final String destinationName;
46      protected final LongSequenceGenerator sequenceGenerator;
47      protected final JDBCAdapter adapter;
48  	protected final JDBCPersistenceAdapter persistenceAdapter;
49  
50      public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
51          this.persistenceAdapter = persistenceAdapter;
52  		this.adapter = adapter;
53          this.sequenceGenerator = adapter.getSequenceGenerator();
54          this.wireFormat = wireFormat;
55          this.destinationName = destinationName;
56      }
57  
58      public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
59          
60          // Serialize the Message..
61          String messageID = message.getJMSMessageID();
62          byte data[];
63          try {
64              data = wireFormat.toBytes(message);
65          } catch (IOException e) {
66              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
67          }
68          
69          long seq=sequenceGenerator.getNextSequenceId();
70  
71          // Get a connection and insert the message into the DB.
72          Connection c = null;
73          try {
74              c = persistenceAdapter.getConnection();            
75              adapter.doAddMessage(c, seq, messageID, destinationName, data);
76          } catch (SQLException e) {
77              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
78          } finally {
79          	persistenceAdapter.returnConnection(c);
80          }
81  
82          MessageIdentity answer = message.getJMSMessageIdentity();
83          answer.setSequenceNumber(new Long(seq));
84          return answer;
85      }
86  
87  
88      public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
89  
90      	long id = getMessageSequenceId(identity);
91          
92          // Get a connection and pull the message out of the DB
93          Connection c = null;
94          try {
95              c = persistenceAdapter.getConnection();            
96              byte data[] = adapter.doGetMessage(c, id);
97              ActiveMQMessage answer = (ActiveMQMessage) wireFormat.fromBytes(data);;
98              answer.setJMSMessageIdentity(identity);
99              return answer;            
100         } catch (IOException e) {
101             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
102         } catch (SQLException e) {
103             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
104         } finally {
105         	persistenceAdapter.returnConnection(c);
106         }
107     }
108 
109     /***
110 	 * @param identity
111 	 * @return
112      * @throws JMSException
113 	 */
114 	private long getMessageSequenceId(MessageIdentity identity) throws JMSException {
115 		Object sequenceNumber = identity.getSequenceNumber();
116     	if( sequenceNumber !=null && sequenceNumber.getClass()==Long.class ) {
117             return ((Long)sequenceNumber).longValue();    		
118     	} else {
119     		// Get a connection and pull the message out of the DB
120             Connection c = null;
121             try {
122                 c = persistenceAdapter.getConnection();            
123                 Long rc = adapter.getMessageSequenceId(c, identity.getMessageID());
124                 if( rc == null )
125                 	throw new JMSException("Could not locate message in database with message id: "+identity.getMessageID());        
126                 return rc.longValue();
127             } catch (SQLException e) {
128                 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
129             } finally {
130             	persistenceAdapter.returnConnection(c);
131             }
132     	}
133 	}
134 
135 	public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
136         long seq = getMessageSequenceId(identity);
137 
138         // Get a connection and remove the message from the DB
139         Connection c = null;
140         try {
141             c = persistenceAdapter.getConnection();            
142             adapter.doRemoveMessage(c, seq);
143         } catch (SQLException e) {
144             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
145         } finally {
146         	persistenceAdapter.returnConnection(c);
147         }
148     }
149 
150 
151     public void recover(final QueueMessageContainer container) throws JMSException {
152         
153         // Get all the Message ids out of the database.
154         Connection c = null;
155         try {
156             c = persistenceAdapter.getConnection();            
157             adapter.doRecover(c, destinationName, new MessageListResultHandler() {
158                 public void onMessage(long seq, String messageID) throws JMSException {
159                     container.recoverMessageToBeDelivered(new MessageIdentity(messageID, new Long(seq)));                
160                 }
161             });     
162             
163         } catch (SQLException e) {
164             throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
165         } finally {
166         	persistenceAdapter.returnConnection(c);
167         } 
168     }
169 
170     public void start() throws JMSException {
171     }
172 
173     public void stop() throws JMSException {
174     }
175 }