1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message;
20
21 import org.codehaus.activemq.util.BitArray;
22
23 import java.io.DataInput;
24 import java.io.IOException;
25
26 /***
27 * Writes a ProducerInfo object to a Stream
28 */
29 public class ActiveMQMessageReader extends AbstractPacketReader {
30 /***
31 * Return the type of Packet
32 *
33 * @return integer representation of the type of Packet
34 */
35 public int getPacketType() {
36 return Packet.ACTIVEMQ_MESSAGE;
37 }
38
39 /***
40 * @return a new Packet instance
41 */
42 public Packet createPacket() {
43 return new ActiveMQMessage();
44 }
45
46 /***
47 * build a Packet instance from the data input stream
48 *
49 * @param packet A Packet object
50 * @param dataIn the data input stream to build the packet from
51 * @throws IOException
52 */
53 public void buildPacket(Packet packet, DataInput dataIn) throws IOException {
54 super.buildPacket(packet, dataIn);
55 ActiveMQMessage msg = (ActiveMQMessage) packet;
56 msg.setJMSClientID(super.readUTF(dataIn));
57 msg.setProducerID(super.readUTF(dataIn));
58 msg.setJMSDestination(ActiveMQDestination.readFromStream(dataIn));
59 msg.setJMSDeliveryMode(dataIn.readByte());
60 msg.setJMSPriority(dataIn.readByte());
61 BitArray ba = msg.getBitArray();
62
63 msg.setJMSRedelivered(ba.get(ActiveMQMessage.REDELIVERED_INDEX));
64 msg.setXaTransacted(ba.get(ActiveMQMessage.XA_TRANS_INDEX));
65
66 if (ba.get(ActiveMQMessage.CORRELATION_INDEX)) {
67 msg.setJMSCorrelationID(super.readUTF(dataIn));
68 }
69 if (ba.get(ActiveMQMessage.TYPE_INDEX)) {
70 msg.setJMSType(super.readUTF(dataIn));
71 }
72 if (ba.get(ActiveMQMessage.BROKER_NAME_INDEX)) {
73 msg.setEntryBrokerName(super.readUTF(dataIn));
74 }
75 if (ba.get(ActiveMQMessage.CLUSTER_NAME_INDEX)) {
76 msg.setEntryClusterName(super.readUTF(dataIn));
77 }
78 if (ba.get(ActiveMQMessage.TRANSACTION_ID_INDEX)) {
79 msg.setTransactionId(super.readUTF(dataIn));
80 }
81 if (ba.get(ActiveMQMessage.REPLY_TO_INDEX)) {
82 msg.setJMSReplyTo(ActiveMQDestination.readFromStream(dataIn));
83 }
84 if (ba.get(ActiveMQMessage.TIMESTAMP_INDEX)) {
85 msg.setJMSTimestamp(dataIn.readLong());
86 }
87 if (ba.get(ActiveMQMessage.EXPIRATION_INDEX)) {
88 msg.setJMSExpiration(dataIn.readLong());
89 }
90 if (ba.get(ActiveMQMessage.CID_INDEX)) {
91 int cidlength = dataIn.readShort();
92 if (cidlength > 0) {
93 int[] cids = new int[cidlength];
94 for (int i = 0; i < cids.length; i++) {
95 cids[i] = dataIn.readShort();
96 }
97 msg.setConsumerNos(cids);
98 }
99 }
100 if (ba.get(ActiveMQMessage.PROPERTIES_INDEX)) {
101 msg.setProperties(msg.readMapProperties(dataIn));
102 }
103 if (ba.get(ActiveMQMessage.PAYLOAD_INDEX)) {
104 int payloadLength = dataIn.readInt();
105 if (payloadLength >= 0) {
106 byte[] payload = new byte[payloadLength];
107 dataIn.readFully(payload);
108 msg.setBodyAsBytes(payload);
109 }
110 }
111 }
112 }