1 /***
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.transport.jrms;
19
20 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
21 import com.sun.multicast.reliable.RMException;
22 import com.sun.multicast.reliable.transport.RMPacketSocket;
23 import com.sun.multicast.reliable.transport.SessionDoneException;
24 import com.sun.multicast.reliable.transport.TransportProfile;
25 import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.codehaus.activemq.message.Packet;
29 import org.codehaus.activemq.message.WireFormat;
30 import org.codehaus.activemq.transport.TransportChannelSupport;
31 import org.codehaus.activemq.util.IdGenerator;
32
33 import javax.jms.JMSException;
34 import java.io.IOException;
35 import java.net.DatagramPacket;
36 import java.net.InetAddress;
37 import java.net.URI;
38
39 /***
40 * A JRMS implementation of a TransportChannel
41 *
42 * @version $Revision: 1.20 $
43 */
44 public class JRMSTransportChannel extends TransportChannelSupport implements Runnable {
45
46 private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
47 private static final Log log = LogFactory.getLog(JRMSTransportChannel.class);
48
49 private WireFormat wireFormat;
50 private SynchronizedBoolean closed;
51 private SynchronizedBoolean started;
52 private Thread thread;
53
54 private RMPacketSocket socket;
55 private IdGenerator idGenerator;
56 private String channelId;
57 private int port;
58 private InetAddress inetAddress;
59 private Object lock;
60
61 /***
62 * Construct basic helpers
63 */
64 protected JRMSTransportChannel(WireFormat wireFormat) {
65 this.wireFormat = wireFormat;
66 idGenerator = new IdGenerator();
67 channelId = idGenerator.generateId();
68 closed = new SynchronizedBoolean(false);
69 started = new SynchronizedBoolean(false);
70 lock = new Object();
71 }
72
73 /***
74 * Connect to a remote Node - e.g. a Broker
75 *
76 * @param remoteLocation
77 * @throws JMSException
78 */
79 public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
80 this(wireFormat);
81 try {
82 this.port = remoteLocation.getPort();
83 this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
84 LRMPTransportProfile profile = new LRMPTransportProfile(inetAddress, port);
85 profile.setTTL((byte) 1);
86 profile.setOrdered(true);
87 this.socket = profile.createRMPacketSocket(TransportProfile.SEND_RECEIVE);
88 }
89 catch (Exception ioe) {
90 ioe.printStackTrace();
91 JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe);
92 jmsEx.setLinkedException(ioe);
93 throw jmsEx;
94 }
95 }
96
97 /***
98 * close the channel
99 */
100 public void stop() {
101 if (closed.commit(false, true)) {
102 super.stop();
103 try {
104 socket.close();
105 }
106 catch (Exception e) {
107 log.trace(toString() + " now closed");
108 }
109 }
110 }
111
112 /***
113 * start listeneing for events
114 *
115 * @throws JMSException if an error occurs
116 */
117 public void start() throws JMSException {
118 if (started.commit(false, true)) {
119 thread = new Thread(this, toString());
120 if (isServerSide()) {
121 thread.setDaemon(true);
122 }
123 thread.start();
124 }
125 }
126
127 /***
128 * Asynchronously send a Packet
129 *
130 * @param packet
131 * @throws JMSException
132 */
133 public void asyncSend(Packet packet) throws JMSException {
134 try {
135 DatagramPacket dpacket = createDatagramPacket(packet);
136
137
138
139 socket.send(dpacket);
140
141 }
142 catch (RMException rme) {
143 JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage());
144 jmsEx.setLinkedException(rme);
145 throw jmsEx;
146 }
147 catch (IOException e) {
148 JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage());
149 jmsEx.setLinkedException(e);
150 throw jmsEx;
151 }
152 }
153
154
155 public boolean isMulticast() {
156 return true;
157 }
158
159 /***
160 * reads packets from a Socket
161 */
162 public void run() {
163 try {
164 while (!closed.get()) {
165 DatagramPacket dpacket = socket.receive();
166 Packet packet = wireFormat.readPacket(channelId, dpacket);
167 if (packet != null) {
168 doConsumePacket(packet);
169 }
170 }
171 log.trace("The socket peer is now closed");
172
173 stop();
174 }
175 catch (SessionDoneException e) {
176
177
178 log.trace("Session completed", e);
179 stop();
180 }
181 catch (RMException ste) {
182 doClose(ste);
183 }
184 catch (IOException e) {
185 doClose(e);
186 }
187 }
188
189 /***
190 * Can this wireformat process packets of this version
191 * @param version the version number to test
192 * @return true if can accept the version
193 */
194 public boolean canProcessWireFormatVersion(int version){
195 return wireFormat.canProcessWireFormatVersion(version);
196 }
197
198 /***
199 * @return the current version of this wire format
200 */
201 public int getCurrentWireFormatVersion(){
202 return wireFormat.getCurrentWireFormatVersion();
203 }
204
205 protected DatagramPacket createDatagramPacket() {
206 DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
207 answer.setPort(port);
208 answer.setAddress(inetAddress);
209 return answer;
210 }
211
212 protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
213 DatagramPacket answer = wireFormat.writePacket(channelId, packet);
214 answer.setPort(port);
215 answer.setAddress(inetAddress);
216 return answer;
217 }
218
219 private void doClose(Exception ex) {
220 if (!closed.get()) {
221 JMSException jmsEx = new JMSException("Error reading socket: " + ex);
222 jmsEx.setLinkedException(ex);
223 onAsyncException(jmsEx);
224 stop();
225 }
226 }
227
228 /***
229 * pretty print for object
230 *
231 * @return String representation of this object
232 */
233 public String toString() {
234 return "JRMSTransportChannel: " + socket;
235 }
236 }