1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.store.jdbc;
19
20 import java.io.IOException;
21 import java.sql.Connection;
22 import java.sql.SQLException;
23
24 import javax.jms.JMSException;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.message.ActiveMQMessage;
29 import org.codehaus.activemq.message.MessageAck;
30 import org.codehaus.activemq.message.WireFormat;
31 import org.codehaus.activemq.service.MessageIdentity;
32 import org.codehaus.activemq.service.QueueMessageContainer;
33 import org.codehaus.activemq.store.MessageStore;
34 import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
35 import org.codehaus.activemq.util.JMSExceptionHelper;
36 import org.codehaus.activemq.util.LongSequenceGenerator;
37
38 /***
39 * @version $Revision: 1.5 $
40 */
41 public class JDBCMessageStore implements MessageStore {
42 private static final Log log = LogFactory.getLog(JDBCMessageStore.class);
43
44 protected final WireFormat wireFormat;
45 protected final String destinationName;
46 protected final LongSequenceGenerator sequenceGenerator;
47 protected final JDBCAdapter adapter;
48 protected final JDBCPersistenceAdapter persistenceAdapter;
49
50 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
51 this.persistenceAdapter = persistenceAdapter;
52 this.adapter = adapter;
53 this.sequenceGenerator = adapter.getSequenceGenerator();
54 this.wireFormat = wireFormat;
55 this.destinationName = destinationName;
56 }
57
58 public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
59
60
61 String messageID = message.getJMSMessageID();
62 byte data[];
63 try {
64 data = wireFormat.toBytes(message);
65 } catch (IOException e) {
66 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
67 }
68
69 long seq=sequenceGenerator.getNextSequenceId();
70
71
72 Connection c = null;
73 try {
74 c = persistenceAdapter.getConnection();
75 adapter.doAddMessage(c, seq, messageID, destinationName, data);
76 } catch (SQLException e) {
77 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
78 } finally {
79 persistenceAdapter.returnConnection(c);
80 }
81
82 MessageIdentity answer = message.getJMSMessageIdentity();
83 answer.setSequenceNumber(new Long(seq));
84 return answer;
85 }
86
87
88 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
89
90 long id = getMessageSequenceId(identity);
91
92
93 Connection c = null;
94 try {
95 c = persistenceAdapter.getConnection();
96 byte data[] = adapter.doGetMessage(c, id);
97 ActiveMQMessage answer = (ActiveMQMessage) wireFormat.fromBytes(data);;
98 answer.setJMSMessageIdentity(identity);
99 return answer;
100 } catch (IOException e) {
101 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
102 } catch (SQLException e) {
103 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
104 } finally {
105 persistenceAdapter.returnConnection(c);
106 }
107 }
108
109 /***
110 * @param identity
111 * @return
112 * @throws JMSException
113 */
114 private long getMessageSequenceId(MessageIdentity identity) throws JMSException {
115 Object sequenceNumber = identity.getSequenceNumber();
116 if( sequenceNumber !=null && sequenceNumber.getClass()==Long.class ) {
117 return ((Long)sequenceNumber).longValue();
118 } else {
119
120 Connection c = null;
121 try {
122 c = persistenceAdapter.getConnection();
123 Long rc = adapter.getMessageSequenceId(c, identity.getMessageID());
124 if( rc == null )
125 throw new JMSException("Could not locate message in database with message id: "+identity.getMessageID());
126 return rc.longValue();
127 } catch (SQLException e) {
128 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
129 } finally {
130 persistenceAdapter.returnConnection(c);
131 }
132 }
133 }
134
135 public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
136 long seq = getMessageSequenceId(identity);
137
138
139 Connection c = null;
140 try {
141 c = persistenceAdapter.getConnection();
142 adapter.doRemoveMessage(c, seq);
143 } catch (SQLException e) {
144 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
145 } finally {
146 persistenceAdapter.returnConnection(c);
147 }
148 }
149
150
151 public void recover(final QueueMessageContainer container) throws JMSException {
152
153
154 Connection c = null;
155 try {
156 c = persistenceAdapter.getConnection();
157 adapter.doRecover(c, destinationName, new MessageListResultHandler() {
158 public void onMessage(long seq, String messageID) throws JMSException {
159 container.recoverMessageToBeDelivered(new MessageIdentity(messageID, new Long(seq)));
160 }
161 });
162
163 } catch (SQLException e) {
164 throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
165 } finally {
166 persistenceAdapter.returnConnection(c);
167 }
168 }
169
170 public void start() throws JMSException {
171 }
172
173 public void stop() throws JMSException {
174 }
175 }