1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message;
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.util.WireByteArrayInputStream;
23 import org.codehaus.activemq.message.util.WireByteArrayOutputStream;
24 import java.io.DataInput;
25 import java.io.DataInputStream;
26 import java.io.DataOutput;
27 import java.io.DataOutputStream;
28 import java.io.IOException;
29 import java.io.ObjectStreamException;
30 import java.io.Serializable;
31
32 /***
33 * Default implementation used for Java-Java protocols. When talking to non-Java nodes we may use a different wire
34 * format.
35 *
36 * @version $Revision: 1.14 $
37 */
38 public class DefaultWireFormat extends WireFormat implements Serializable {
39 /***
40 * Current wire format version for this implementation
41 */
42 public static final int WIRE_FORMAT_VERSION = 1;
43 private static final Log log = LogFactory.getLog(DefaultWireFormat.class);
44
45 private transient final PacketReader messageReader = new ActiveMQMessageReader();
46 private transient final PacketReader textMessageReader = new ActiveMQTextMessageReader();
47 private transient final PacketReader objectMessageReader = new ActiveMQObjectMessageReader();
48 private transient final PacketReader bytesMessageReader = new ActiveMQBytesMessageReader();
49 private transient final PacketReader streamMessageReader = new ActiveMQStreamMessageReader();
50 private transient final PacketReader mapMessageReader = new ActiveMQMapMessageReader();
51 private transient final PacketReader messageAckReader = new MessageAckReader();
52 private transient final PacketReader receiptReader = new ReceiptReader();
53 private transient final PacketReader consumerInfoReader = new ConsumerInfoReader();
54 private transient final PacketReader producerInfoReader = new ProducerInfoReader();
55 private transient final PacketReader transactionInfoReader = new TransactionInfoReader();
56 private transient final PacketReader xaTransactionInfoReader = new XATransactionInfoReader();
57 private transient final PacketReader brokerInfoReader = new BrokerInfoReader();
58 private transient final PacketReader connectionInfoReader = new ConnectionInfoReader();
59 private transient final PacketReader sessionInfoReader = new SessionInfoReader();
60 private transient final PacketReader durableUnsubscribeReader = new DurableUnsubscribeReader();
61 private transient final PacketReader reponseReceiptReader = new ResponseReceiptReader();
62 private transient final PacketReader intReponseReceiptReader = new IntResponseReceiptReader();
63 private transient final PacketReader capacityInfoReader = new CapacityInfoReader();
64 private transient final PacketReader capacityInfoRequestReader = new CapacityInfoRequestReader();
65 private transient final PacketReader wireFormatInfoReader = new WireFormatInfoReader();
66 private transient final PacketWriter messageWriter = new ActiveMQMessageWriter();
67 private transient final PacketWriter textMessageWriter = new ActiveMQTextMessageWriter();
68 private transient final PacketWriter objectMessageWriter = new ActiveMQObjectMessageWriter();
69 private transient final PacketWriter bytesMessageWriter = new ActiveMQBytesMessageWriter();
70 private transient final PacketWriter streamMessageWriter = new ActiveMQStreamMessageWriter();
71 private transient final PacketWriter mapMessageWriter = new ActiveMQMapMessageWriter();
72 private transient final PacketWriter messageAckWriter = new MessageAckWriter();
73 private transient final PacketWriter receiptWriter = new ReceiptWriter();
74 private transient final PacketWriter consumerInfoWriter = new ConsumerInfoWriter();
75 private transient final PacketWriter producerInfoWriter = new ProducerInfoWriter();
76 private transient final PacketWriter transactionInfoWriter = new TransactionInfoWriter();
77 private transient final PacketWriter xaTransactionInfoWriter = new XATransactionInfoWriter();
78 private transient final PacketWriter brokerInfoWriter = new BrokerInfoWriter();
79 private transient final PacketWriter connectionInfoWriter = new ConnectionInfoWriter();
80 private transient final PacketWriter sessionInfoWriter = new SessionInfoWriter();
81 private transient final PacketWriter durableUnsubscribeWriter = new DurableUnsubscribeWriter();
82 private transient final PacketWriter reponseReceiptWriter = new ResponseReceiptWriter();
83 private transient final PacketWriter intReponseReceiptWriter = new IntResponseReceiptWriter();
84 private transient final PacketWriter capacityInfoWriter = new CapacityInfoWriter();
85 private transient final PacketWriter capacityInfoRequestWriter = new CapacityInfoRequestWriter();
86 private transient final PacketWriter wireFormatInfoWriter = new WireFormatInfoWriter();
87 private transient WireByteArrayOutputStream internalBytesOut;
88 private transient DataOutputStream internalDataOut;
89 private transient WireByteArrayInputStream internalBytesIn;
90 private transient DataInputStream internalDataIn;
91
92 /***
93 * Default Constructor
94 */
95 public DefaultWireFormat() {
96 internalBytesOut = new WireByteArrayOutputStream();
97 internalDataOut = new DataOutputStream(internalBytesOut);
98 internalBytesIn = new WireByteArrayInputStream();
99 internalDataIn = new DataInputStream(internalBytesIn);
100 }
101
102
103 /***
104 * @return new WireFormat
105 */
106 public WireFormat copy() {
107 return new DefaultWireFormat();
108 }
109
110
111 /***
112 * @param in
113 * @return
114 * @throws IOException
115 */
116 public Packet readPacket(DataInput in) throws IOException {
117 int type = in.readByte();
118 return readPacket(type, in);
119 }
120
121 /***
122 * @param firstByte
123 * @param dataIn
124 * @return
125 * @throws IOException
126 *
127 */
128 public Packet readPacket(int firstByte, DataInput dataIn) throws IOException {
129 switch (firstByte) {
130 case Packet.ACTIVEMQ_MESSAGE :
131 return readPacket(dataIn, messageReader);
132 case Packet.ACTIVEMQ_TEXT_MESSAGE :
133 return readPacket(dataIn, textMessageReader);
134 case Packet.ACTIVEMQ_OBJECT_MESSAGE :
135 return readPacket(dataIn, objectMessageReader);
136 case Packet.ACTIVEMQ_BYTES_MESSAGE :
137 return readPacket(dataIn, bytesMessageReader);
138 case Packet.ACTIVEMQ_STREAM_MESSAGE :
139 return readPacket(dataIn, streamMessageReader);
140 case Packet.ACTIVEMQ_MAP_MESSAGE :
141 return readPacket(dataIn, mapMessageReader);
142 case Packet.ACTIVEMQ_MSG_ACK :
143 return readPacket(dataIn, messageAckReader);
144 case Packet.RECEIPT_INFO :
145 return readPacket(dataIn, receiptReader);
146 case Packet.CONSUMER_INFO :
147 return readPacket(dataIn, consumerInfoReader);
148 case Packet.PRODUCER_INFO :
149 return readPacket(dataIn, producerInfoReader);
150 case Packet.TRANSACTION_INFO :
151 return readPacket(dataIn, transactionInfoReader);
152 case Packet.XA_TRANSACTION_INFO :
153 return readPacket(dataIn, xaTransactionInfoReader);
154 case Packet.ACTIVEMQ_BROKER_INFO :
155 return readPacket(dataIn, brokerInfoReader);
156 case Packet.ACTIVEMQ_CONNECTION_INFO :
157 return readPacket(dataIn, connectionInfoReader);
158 case Packet.SESSION_INFO :
159 return readPacket(dataIn, sessionInfoReader);
160 case Packet.DURABLE_UNSUBSCRIBE :
161 return readPacket(dataIn, durableUnsubscribeReader);
162 case Packet.RESPONSE_RECEIPT_INFO :
163 return readPacket(dataIn, reponseReceiptReader);
164 case Packet.INT_RESPONSE_RECEIPT_INFO :
165 return readPacket(dataIn, intReponseReceiptReader);
166 case Packet.CAPACITY_INFO :
167 return readPacket(dataIn, capacityInfoReader);
168 case Packet.CAPACITY_INFO_REQUEST :
169 return readPacket(dataIn, capacityInfoRequestReader);
170 case Packet.WIRE_FORMAT_INFO :
171 return readPacket(dataIn, wireFormatInfoReader);
172 default :
173 log.error("Could not find PacketReader for packet type: "
174 + AbstractPacket.getPacketTypeAsString(firstByte));
175 return null;
176 }
177 }
178
179 /***
180 * Write a Packet to a DataOutput
181 *
182 * @param packet
183 * @param dataOut
184 * @throws IOException
185 */
186 public void writePacket(Packet packet, DataOutput dataOut) throws IOException {
187 PacketWriter writer = getWriter(packet);
188 if (writer != null) {
189 writePacket(packet, dataOut, writer);
190 }
191 }
192
193 /***
194 * A helper method which converts a packet into a byte array Overrides the WireFormat to make use of the internal
195 * BytesOutputStream
196 *
197 * @param packet
198 * @return a byte array representing the packet using some wire protocol
199 * @throws IOException
200 */
201 public byte[] toBytes(Packet packet) throws IOException {
202 byte[] data = null;
203 PacketWriter writer = getWriter(packet);
204 if (writer != null) {
205 internalBytesOut.reset();
206 internalDataOut.writeByte(packet.getPacketType());
207 internalDataOut.writeInt(-1);
208 writer.writePacket(packet, internalDataOut);
209 internalDataOut.flush();
210 data = internalBytesOut.toByteArray();
211
212 int length = data.length - 5;
213 packet.setMemoryUsage(length);
214
215 data[1] = (byte) ((length >>> 24) & 0xFF);
216 data[2] = (byte) ((length >>> 16) & 0xFF);
217 data[3] = (byte) ((length >>> 8) & 0xFF);
218 data[4] = (byte) ((length >>> 0) & 0xFF);
219 }
220 return data;
221 }
222
223 /***
224 * Can this wireformat process packets of this version
225 * @param version the version number to test
226 * @return true if can accept the version
227 */
228 public boolean canProcessWireFormatVersion(int version){
229 return version == WIRE_FORMAT_VERSION;
230 }
231
232 /***
233 * @return the current version of this wire format
234 */
235 public int getCurrentWireFormatVersion(){
236 return WIRE_FORMAT_VERSION;
237 }
238
239 protected synchronized final void writePacket(Packet packet, DataOutput dataOut, PacketWriter writer)
240 throws IOException {
241 dataOut.writeByte(packet.getPacketType());
242 internalBytesOut.reset();
243 writer.writePacket(packet, internalDataOut);
244 internalDataOut.flush();
245
246 byte[] data = internalBytesOut.getData();
247 int count = internalBytesOut.size();
248 dataOut.writeInt(count);
249
250
251
252 packet.setMemoryUsage(count);
253 dataOut.write(data, 0, count);
254 }
255
256 protected synchronized final Packet readPacket(DataInput dataIn, PacketReader reader) throws IOException {
257 Packet packet = reader.createPacket();
258 int length = dataIn.readInt();
259 packet.setMemoryUsage(length);
260
261
262 byte[] data = new byte[length];
263 dataIn.readFully(data);
264
265 internalBytesIn.restart(data);
266 reader.buildPacket(packet, internalDataIn);
267 return packet;
268 }
269
270 private Object readResolve() throws ObjectStreamException {
271 return new DefaultWireFormat();
272 }
273
274 private PacketWriter getWriter(Packet packet) throws IOException {
275 PacketWriter answer = null;
276 switch (packet.getPacketType()) {
277 case Packet.ACTIVEMQ_MESSAGE :
278 answer = messageWriter;
279 break;
280 case Packet.ACTIVEMQ_TEXT_MESSAGE :
281 answer = textMessageWriter;
282 break;
283 case Packet.ACTIVEMQ_OBJECT_MESSAGE :
284 answer = objectMessageWriter;
285 break;
286 case Packet.ACTIVEMQ_BYTES_MESSAGE :
287 answer = bytesMessageWriter;
288 break;
289 case Packet.ACTIVEMQ_STREAM_MESSAGE :
290 answer = streamMessageWriter;
291 break;
292 case Packet.ACTIVEMQ_MAP_MESSAGE :
293 answer = mapMessageWriter;
294 break;
295 case Packet.ACTIVEMQ_MSG_ACK :
296 answer = messageAckWriter;
297 break;
298 case Packet.RECEIPT_INFO :
299 answer = receiptWriter;
300 break;
301 case Packet.CONSUMER_INFO :
302 answer = consumerInfoWriter;
303 break;
304 case Packet.PRODUCER_INFO :
305 answer = producerInfoWriter;
306 break;
307 case Packet.TRANSACTION_INFO :
308 answer = transactionInfoWriter;
309 break;
310 case Packet.XA_TRANSACTION_INFO :
311 answer = xaTransactionInfoWriter;
312 break;
313 case Packet.ACTIVEMQ_BROKER_INFO :
314 answer = brokerInfoWriter;
315 break;
316 case Packet.ACTIVEMQ_CONNECTION_INFO :
317 answer = connectionInfoWriter;
318 break;
319 case Packet.SESSION_INFO :
320 answer = sessionInfoWriter;
321 break;
322 case Packet.DURABLE_UNSUBSCRIBE :
323 answer = durableUnsubscribeWriter;
324 break;
325 case Packet.RESPONSE_RECEIPT_INFO :
326 answer = reponseReceiptWriter;
327 break;
328 case Packet.INT_RESPONSE_RECEIPT_INFO :
329 answer = intReponseReceiptWriter;
330 break;
331 case Packet.CAPACITY_INFO :
332 answer = capacityInfoWriter;
333 break;
334 case Packet.CAPACITY_INFO_REQUEST :
335 answer = capacityInfoRequestWriter;
336 break;
337 case Packet.WIRE_FORMAT_INFO :
338 answer = wireFormatInfoWriter;
339 break;
340 default :
341 log.error("no PacketWriter for packet: " + packet);
342 }
343 return answer;
344 }
345 }