1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.udp;
19
20 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21 import org.apache.commons.logging.Log;
22 import org.apache.commons.logging.LogFactory;
23 import org.codehaus.activemq.message.Packet;
24 import org.codehaus.activemq.message.WireFormat;
25 import org.codehaus.activemq.transport.TransportChannelSupport;
26 import org.codehaus.activemq.util.IdGenerator;
27
28 import javax.jms.JMSException;
29 import java.io.IOException;
30 import java.net.DatagramPacket;
31 import java.net.DatagramSocket;
32 import java.net.InetAddress;
33 import java.net.SocketTimeoutException;
34 import java.net.URI;
35
36 /***
37 * A UDP implementation of a TransportChannel
38 *
39 * @version $Revision: 1.23 $
40 */
41 public class UdpTransportChannel extends TransportChannelSupport implements Runnable {
42
43 private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
44 private static final int SO_TIMEOUT = 5000;
45 private static final Log log = LogFactory.getLog(UdpTransportChannel.class);
46
47 protected DatagramSocket socket;
48 protected int port;
49 protected InetAddress inetAddress;
50
51 private WireFormat wireFormat;
52 private SynchronizedBoolean closed;
53 private SynchronizedBoolean started;
54 private Thread thread;
55 private IdGenerator idGenerator = new IdGenerator();
56 private Object lock;
57
58
59 /***
60 * Construct basic helpers
61 */
62 protected UdpTransportChannel(WireFormat wireFormat) {
63 this.wireFormat = wireFormat;
64 closed = new SynchronizedBoolean(false);
65 started = new SynchronizedBoolean(false);
66 lock = new Object();
67 }
68
69 public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
70 this(wireFormat, remoteLocation, remoteLocation.getPort());
71 }
72
73 public UdpTransportChannel(WireFormat wireFormat, URI remoteLocation, int port) throws JMSException {
74 this(wireFormat);
75 try {
76 this.port = port;
77 this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
78 this.socket = createSocket(remoteLocation.getPort());
79
80
81
82
83 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
84 socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
85
86 connect();
87
88
89 }
90 catch (Exception ioe) {
91 JMSException jmsEx = new JMSException("Initialization of TransportChannel failed: " + ioe);
92 jmsEx.setLinkedException(ioe);
93 throw jmsEx;
94 }
95 }
96
97 /***
98 * @param socket
99 * @throws JMSException
100 */
101 public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket) throws JMSException {
102 this(wireFormat);
103 this.socket = socket;
104 this.port = socket.getPort();
105 this.inetAddress = socket.getInetAddress();
106 try {
107 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
108 socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
109 }
110 catch (IOException ioe) {
111 JMSException jmsEx = new JMSException("Initialization of TransportChannel failed");
112 jmsEx.setLinkedException(ioe);
113 throw jmsEx;
114 }
115 }
116
117 public UdpTransportChannel(WireFormat wireFormat, DatagramSocket socket, int port) throws JMSException {
118 this(wireFormat, socket);
119 this.port = port;
120 }
121
122 /***
123 * close the channel
124 */
125 public void stop() {
126 if (closed.commit(false, true)) {
127 super.stop();
128 try {
129 socket.close();
130 }
131 catch (Exception e) {
132 log.trace(toString() + " now closed");
133 }
134 }
135 }
136
137 /***
138 * start listeneing for events
139 *
140 * @throws JMSException if an error occurs
141 */
142 public void start() throws JMSException {
143 if (started.commit(false, true)) {
144 thread = new Thread(this, toString());
145 if (isServerSide()) {
146 thread.setDaemon(true);
147 }
148 thread.start();
149 }
150 }
151
152
153 /***
154 * Asynchronously send a Packet
155 *
156 * @param packet
157 * @throws JMSException
158 */
159 public void asyncSend(Packet packet) throws JMSException {
160 try {
161 if (log.isDebugEnabled()) {
162 log.debug("Sending packet: " + packet);
163 }
164 DatagramPacket dpacket = createDatagramPacket(packet);
165
166
167
168
169 socket.send(dpacket);
170
171 }
172 catch (IOException e) {
173 JMSException jmsEx = new JMSException("asyncSend failed " + e);
174 jmsEx.setLinkedException(e);
175 throw jmsEx;
176 }
177 }
178
179 public boolean isMulticast() {
180 return false;
181 }
182
183 /***
184 * reads packets from a Socket
185 */
186 public void run() {
187 while (!closed.get()) {
188 try {
189 socket.setSoTimeout(SO_TIMEOUT);
190
191 DatagramPacket dpacket = createDatagramPacket();
192 while (!socket.isClosed()) {
193 socket.setSoTimeout(0);
194 socket.receive(dpacket);
195 Packet packet = wireFormat.readPacket(getClientID(), dpacket);
196 if (packet != null) {
197 doConsumePacket(packet);
198 }
199 }
200
201 log.trace("The socket peer is now closed");
202 doClose(new IOException("Socket peer is now closed"));
203 }
204 catch (SocketTimeoutException ste) {
205
206 }
207 catch (IOException e) {
208 doClose(e);
209 }
210 }
211 }
212
213 /***
214 * Can this wireformat process packets of this version
215 * @param version the version number to test
216 * @return true if can accept the version
217 */
218 public boolean canProcessWireFormatVersion(int version){
219 return wireFormat.canProcessWireFormatVersion(version);
220 }
221
222 /***
223 * @return the current version of this wire format
224 */
225 public int getCurrentWireFormatVersion(){
226 return wireFormat.getCurrentWireFormatVersion();
227 }
228
229 /***
230 * @return
231 */
232 protected DatagramPacket createDatagramPacket() {
233 DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
234 if (port >= 0) {
235 answer.setPort(port);
236 }
237 answer.setAddress(inetAddress);
238 return answer;
239 }
240
241 protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
242
243
244
245
246 DatagramPacket answer = wireFormat.writePacket(getClientID(), packet);
247 if (port >= 0) {
248 answer.setPort(port);
249 }
250 answer.setAddress(inetAddress);
251 return answer;
252 }
253
254 private void doClose(Exception ex) {
255 if (!closed.get()) {
256 JMSException jmsEx = new JMSException("Error reading socket: " + ex.getMessage());
257 jmsEx.setLinkedException(ex);
258 onAsyncException(jmsEx);
259 stop();
260 }
261 }
262
263 protected void connect() throws IOException {
264
265 }
266
267 protected DatagramSocket createSocket(int port) throws IOException {
268 return new DatagramSocket(port, inetAddress);
269 }
270
271 /***
272 * pretty print for object
273 *
274 * @return String representation of this object
275 */
276 public String toString() {
277 return "UdpTransportChannel: " + socket;
278 }
279 }