1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.message;
20
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23
24 import javax.jms.JMSException;
25 import java.io.ByteArrayInputStream;
26 import java.io.ByteArrayOutputStream;
27 import java.io.DataInput;
28 import java.io.DataInputStream;
29 import java.io.DataOutput;
30 import java.io.DataOutputStream;
31 import java.io.IOException;
32 import java.net.DatagramPacket;
33
34 /***
35 * Represents a strategy of encoding packets on the wire or on disk
36 * using some kind of serialization or wire format.
37 * <p/>
38 * We use a default efficient format
39 * for Java to Java communication but other formats to other systems
40 * can be used, such as using simple text
41 * strings when talking to JavaScript or coming up with other formats for
42 * talking to C / C# languages or proprietary messaging systems
43 * we wish to interface with at the wire level etc.
44 *
45 * @version $Revision: 1.11 $
46 */
47 public abstract class WireFormat {
48
49 private static final Log log = LogFactory.getLog(WireFormat.class);
50
51 /***
52 * Reads a packet from the given input stream
53 *
54 * @param in
55 * @return
56 * @throws IOException
57 */
58 public abstract Packet readPacket(DataInput in) throws IOException;
59
60 /***
61 * A helper method for working with sockets where the first byte is read
62 * first, then the rest of the message is read.
63 * <p/>
64 * Its common when dealing with sockets to have different timeout semantics
65 * until the first non-zero byte is read of a message, after which
66 * time a zero timeout is used.
67 *
68 * @param firstByte the first byte of the packet
69 * @param in the rest of the packet
70 * @return
71 * @throws IOException
72 */
73 public abstract Packet readPacket(int firstByte, DataInput in) throws IOException;
74
75
76 /***
77 * Read a packet from a Datagram packet from the given channelID. If the
78 * packet is from the same channel ID as it was sent then we have a
79 * loop-back so discard the packet
80 *
81 * @param channelID is the unique channel ID
82 * @param dpacket
83 * @return the packet read from the datagram or null if it should be
84 * discarded
85 * @throws IOException
86 */
87 public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException {
88 DataInput in = new DataInputStream(new ByteArrayInputStream(dpacket.getData(), dpacket.getOffset(), dpacket.getLength()));
89 String id = in.readUTF();
90
91 if (channelID == null) {
92 log.trace("We do not have a channelID which is probably caused by a synchronization issue, we're receiving messages before we're fully initialised");
93 }
94 else if (channelID.equals(id)) {
95 if (log.isTraceEnabled()) {
96 log.trace("Discarding packet from id: " + id);
97 }
98 return null;
99 }
100 int type = in.readByte();
101 Packet packet = readPacket(type, in);
102
103
104
105
106
107 return packet;
108 }
109
110 /***
111 * Writes the packet to the given output stream
112 *
113 * @param packet
114 * @param out
115 * @throws IOException
116 * @throws JMSException
117 */
118 public abstract void writePacket(Packet packet, DataOutput out) throws IOException, JMSException;
119
120 /***
121 * Writes the given package to a new datagram
122 *
123 * @param channelID is the unique channel ID
124 * @param packet is the packet to write
125 * @return
126 * @throws IOException
127 * @throws JMSException
128 */
129 public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException {
130 ByteArrayOutputStream out = new ByteArrayOutputStream();
131 DataOutputStream dataOut = new DataOutputStream(out);
132 channelID = channelID != null ? channelID : "";
133 dataOut.writeUTF(channelID);
134
135
136
137
138
139 writePacket(packet, dataOut);
140 dataOut.close();
141 byte[] data = out.toByteArray();
142 return new DatagramPacket(data, data.length);
143 }
144
145 /***
146 * Reads the packet from the given byte[]
147 * @param bytes
148 * @param offset
149 * @param length
150 * @return
151 * @throws IOException
152 */
153 public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException {
154 DataInput in = new DataInputStream(new ByteArrayInputStream(bytes, offset, length));
155 return readPacket(in);
156 }
157
158 /***
159 * Reads the packet from the given byte[]
160 * @param bytes
161 * @return
162 * @throws IOException
163 */
164 public Packet fromBytes(byte[] bytes) throws IOException {
165 DataInput in = new DataInputStream(new ByteArrayInputStream(bytes));
166 return readPacket(in);
167 }
168
169 /***
170 * A helper method which converts a packet into a byte array
171 *
172 * @param packet
173 * @return a byte array representing the packet using some wire protocol
174 * @throws IOException
175 * @throws JMSException
176 */
177 public byte[] toBytes(Packet packet) throws IOException, JMSException {
178 ByteArrayOutputStream out = new ByteArrayOutputStream();
179 DataOutputStream dataOut = new DataOutputStream(out);
180 writePacket(packet, dataOut);
181 dataOut.close();
182 return out.toByteArray();
183 }
184
185 /***
186 * Creates a new copy of this wire format so it can be used in another thread/context
187 *
188 * @return
189 */
190 public abstract WireFormat copy();
191
192 /***
193 * Can this wireformat process packets of this version
194 * @param version the version number to test
195 * @return true if can accept the version
196 */
197 public abstract boolean canProcessWireFormatVersion(int version);
198
199 /***
200 * @return the current version of this wire format
201 */
202 public abstract int getCurrentWireFormatVersion();
203 }