1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.codehaus.activemq.transport.tcp;
20 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
21 import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
22 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
23 import EDU.oswego.cs.dl.util.concurrent.Executor;
24 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
25 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.message.Packet;
29 import org.codehaus.activemq.message.WireFormat;
30 import org.codehaus.activemq.message.WireFormatInfo;
31 import org.codehaus.activemq.transport.TransportChannelSupport;
32 import org.codehaus.activemq.util.JMSExceptionHelper;
33 import javax.jms.JMSException;
34 import java.io.BufferedInputStream;
35 import java.io.DataInputStream;
36 import java.io.DataOutputStream;
37 import java.io.IOException;
38 import java.io.InterruptedIOException;
39 import java.net.InetAddress;
40 import java.net.Socket;
41 import java.net.SocketTimeoutException;
42 import java.net.URI;
43 import java.net.UnknownHostException;
44
45 /***
46 * A tcp implementation of a TransportChannel
47 *
48 * @version $Revision: 1.53 $
49 */
50 public class TcpTransportChannel extends TransportChannelSupport implements Runnable {
51 private static final int SOCKET_BUFFER_SIZE = 64 * 1024;
52 private static final Log log = LogFactory.getLog(TcpTransportChannel.class);
53 protected Socket socket;
54 private WireFormat wireFormat;
55 private DataOutputStream dataOut;
56 private DataInputStream dataIn;
57 private SynchronizedBoolean closed;
58 private SynchronizedBoolean started;
59 private Object outboundLock;
60 private Executor executor;
61 private Thread thread;
62 private boolean useAsyncSend = false;
63 private boolean changeTimeout = false;
64 private int soTimeout = 5000;
65 private BoundedChannel exceptionsList;
66
67 /***
68 * Construct basic helpers
69 */
70 protected TcpTransportChannel(WireFormat wireFormat) {
71 this.wireFormat = wireFormat;
72 closed = new SynchronizedBoolean(false);
73 started = new SynchronizedBoolean(false);
74
75 exceptionsList = new BoundedLinkedQueue(10);
76 outboundLock = new Object();
77 if (useAsyncSend) {
78 executor = new PooledExecutor(new BoundedBuffer(1000), 1);
79 }
80 }
81
82 /***
83 * Connect to a remote Node - e.g. a Broker
84 *
85 * @param remoteLocation
86 * @throws JMSException
87 */
88 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
89 this(wireFormat);
90 try {
91 this.socket = createSocket(remoteLocation);
92 initialiseSocket();
93 }
94 catch (Exception ioe) {
95 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
96 }
97 }
98
99 /***
100 * Connect to a remote Node - e.g. a Broker
101 *
102 * @param remoteLocation
103 * @param localLocation - e.g. local InetAddress and local port
104 * @throws JMSException
105 */
106 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
107 this(wireFormat);
108 try {
109 this.socket = createSocket(remoteLocation, localLocation);
110 initialiseSocket();
111 }
112 catch (Exception ioe) {
113 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
114 }
115 }
116
117 /***
118 * @param socket
119 * @throws JMSException
120 */
121 public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
122 this(wireFormat);
123 this.socket = socket;
124 this.executor = executor;
125 setServerSide(true);
126 try {
127 initialiseSocket();
128 }
129 catch (IOException ioe) {
130 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
131 }
132 }
133
134 /***
135 * start listeneing for events
136 *
137 * @throws JMSException if an error occurs
138 */
139 public void start() throws JMSException {
140 if (started.commit(false, true)) {
141 thread = new Thread(this, toString());
142 if (isServerSide()) {
143 thread.setDaemon(true);
144 }
145 else {
146 thread.setPriority(Thread.NORM_PRIORITY + 2);
147 }
148 thread.start();
149
150 if (isServerSide()) {
151 WireFormatInfo info = new WireFormatInfo();
152 info.setVersion(getCurrentWireFormatVersion());
153 asyncSend(info);
154 }
155 }
156 }
157
158 /***
159 * close the channel
160 */
161 public void stop() {
162 if (closed.commit(false, true)) {
163 super.stop();
164 try {
165 stopExecutor(executor);
166 dataOut.close();
167 dataIn.close();
168 socket.close();
169
170
171 }
172 catch (Exception e) {
173 log.warn("Caught while closing: " + e + ". Now Closed", e);
174 }
175 }
176 }
177
178 /***
179 * Asynchronously send a Packet
180 *
181 * @param packet
182 * @throws JMSException
183 */
184 public void asyncSend(final Packet packet) throws JMSException {
185 if (executor != null) {
186 try {
187 executor.execute(new Runnable() {
188 public void run() {
189 try {
190 if (!closed.get()) {
191 doAsyncSend(packet);
192 }
193 }
194 catch (JMSException e) {
195 try {
196 exceptionsList.put(e);
197 }
198 catch (InterruptedException e1) {
199 log.warn("Failed to add element to exception list: " + e1);
200 }
201 }
202 }
203 });
204 }
205 catch (InterruptedException e) {
206 log.info("Caught: " + e, e);
207 }
208 try {
209 JMSException e = (JMSException) exceptionsList.poll(0);
210 if (e != null) {
211 throw e;
212 }
213 }
214 catch (InterruptedException e1) {
215 log.warn("Failed to remove element to exception list: " + e1);
216 }
217 }
218 else {
219 doAsyncSend(packet);
220 }
221 }
222
223 /***
224 * @return false
225 */
226 public boolean isMulticast() {
227 return false;
228 }
229
230 /***
231 * reads packets from a Socket
232 */
233 public void run() {
234 log.trace("TCP consumer thread starting");
235 int count = 0;
236 while (!closed.get()) {
237 if (isServerSide() && ++count > 500) {
238 count = 0;
239 Thread.yield();
240 }
241 int type = 0;
242 try {
243 if (changeTimeout) {
244 socket.setSoTimeout(soTimeout);
245 }
246 while ((type = dataIn.read()) == 0) {
247 }
248 if (type == -1) {
249 log.info("The socket peer is now closed");
250 onAsyncException(new JMSException("Socket peer is now closed"));
251 stop();
252 }
253 else {
254 if (changeTimeout) {
255 socket.setSoTimeout(0);
256 }
257 Packet packet = wireFormat.readPacket(type, dataIn);
258 if (packet != null) {
259 doConsumePacket(packet);
260 }
261 }
262 }
263 catch (SocketTimeoutException e) {
264
265 }
266 catch (InterruptedIOException e) {
267
268
269
270
271
272
273 }
274 catch (IOException e) {
275 doClose(e);
276 }
277 }
278 }
279
280 /***
281 * pretty print for object
282 *
283 * @return String representation of this object
284 */
285 public String toString() {
286 return "TcpTransportChannel: " + socket;
287 }
288
289 public Socket getSocket() {
290 return socket;
291 }
292
293 /***
294 * Can this wireformat process packets of this version
295 *
296 * @param version the version number to test
297 * @return true if can accept the version
298 */
299 public boolean canProcessWireFormatVersion(int version) {
300 return wireFormat.canProcessWireFormatVersion(version);
301 }
302
303 /***
304 * @return the current version of this wire format
305 */
306 public int getCurrentWireFormatVersion() {
307 return wireFormat.getCurrentWireFormatVersion();
308 }
309
310
311
312 public boolean isChangeTimeout() {
313 return changeTimeout;
314 }
315
316 public void setChangeTimeout(boolean changeTimeout) {
317 this.changeTimeout = changeTimeout;
318 }
319
320 public boolean isUseAsyncSend() {
321 return useAsyncSend;
322 }
323
324 public void setUseAsyncSend(boolean useAsyncSend) {
325 this.useAsyncSend = useAsyncSend;
326 }
327
328 public int getSoTimeout() {
329 return soTimeout;
330 }
331
332 public void setSoTimeout(int soTimeout) {
333 this.soTimeout = soTimeout;
334 this.changeTimeout = true;
335 }
336
337
338
339 /***
340 * Actually performs the async send of a packet
341 *
342 * @param packet
343 * @throws JMSException
344 */
345 protected void doAsyncSend(Packet packet) throws JMSException {
346 try {
347 synchronized (outboundLock) {
348 wireFormat.writePacket(packet, dataOut);
349 dataOut.flush();
350 }
351 }
352 catch (IOException e) {
353 if (closed.get()) {
354 log.trace("Caught exception while closed: " + e, e);
355 }
356 else {
357 throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
358 }
359 }
360 catch (JMSException e) {
361 if (closed.get()) {
362 log.trace("Caught exception while closed: " + e, e);
363 }
364 else {
365 throw e;
366 }
367 }
368 }
369
370 private void doClose(Exception ex) {
371 if (!closed.get()) {
372 setPendingStop(true);
373 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
374 stop();
375 }
376 }
377
378 /***
379 * Configures the socket for use
380 *
381 * @throws IOException
382 */
383 protected void initialiseSocket() throws IOException {
384 socket.setReceiveBufferSize(SOCKET_BUFFER_SIZE);
385 socket.setSendBufferSize(SOCKET_BUFFER_SIZE);
386 socket.setSoTimeout(soTimeout);
387 BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream());
388 this.dataIn = new DataInputStream(buffIn);
389 TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream());
390 this.dataOut = new DataOutputStream(buffOut);
391 }
392
393 /***
394 * Factory method to create a new socket
395 *
396 * @param remoteLocation the URI to connect to
397 * @return the newly created socket
398 * @throws UnknownHostException
399 * @throws IOException
400 */
401 protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException {
402 return new Socket(remoteLocation.getHost(), remoteLocation.getPort());
403 }
404
405 /***
406 * Factory method to create a new socket
407 *
408 * @param remoteLocation
409 * @param localLocation
410 * @return @throws IOException
411 * @throws IOException
412 * @throws UnknownHostException
413 */
414 protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
415 return new Socket(remoteLocation.getHost(), remoteLocation.getPort(), InetAddress.getByName(localLocation
416 .getHost()), localLocation.getPort());
417 }
418 }