1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.transport.gnet;
20
21 import EDU.oswego.cs.dl.util.concurrent.Latch;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.geronimo.network.SelectorManager;
26 import org.apache.geronimo.network.protocol.AbstractProtocol;
27 import org.apache.geronimo.network.protocol.DownPacket;
28 import org.apache.geronimo.network.protocol.PlainDownPacket;
29 import org.apache.geronimo.network.protocol.Protocol;
30 import org.apache.geronimo.network.protocol.ProtocolException;
31 import org.apache.geronimo.network.protocol.SocketProtocol;
32 import org.apache.geronimo.network.protocol.UpPacket;
33 import org.apache.geronimo.pool.ClockPool;
34 import org.apache.geronimo.pool.ThreadPool;
35 import org.codehaus.activemq.message.Packet;
36 import org.codehaus.activemq.message.WireFormat;
37 import org.codehaus.activemq.transport.TransportChannelSupport;
38
39 import javax.jms.JMSException;
40 import java.io.ByteArrayOutputStream;
41 import java.io.DataInputStream;
42 import java.io.DataOutputStream;
43 import java.io.IOException;
44 import java.io.InputStream;
45 import java.net.InetAddress;
46 import java.net.InetSocketAddress;
47 import java.net.URI;
48 import java.net.UnknownHostException;
49 import java.nio.ByteBuffer;
50 import java.util.ArrayList;
51
52 /***
53 * An implementation of a TransportChannel which uses the Geronimo network layer
54 * for connectivity.
55 *
56 * @version $Revision: 1.16 $
57 */
58 public class GTransportChannel extends TransportChannelSupport {
59 private static final Log log = LogFactory.getLog(GTransportChannel.class);
60
61 private SynchronizedBoolean closed;
62 private SynchronizedBoolean started;
63 private Protocol protocol;
64 private Latch dispatchLatch;
65 private ThreadPool threadPool;
66 private WireFormat wireFormat;
67
68 /***
69 * Construct basic helpers
70 */
71 protected GTransportChannel(WireFormat wireFormat, ThreadPool tp) {
72 this.wireFormat = wireFormat;
73 closed = new SynchronizedBoolean(false);
74 started = new SynchronizedBoolean(false);
75 dispatchLatch = new Latch();
76 threadPool = tp;
77 }
78
79 /***
80 * @param protocol
81 */
82 public GTransportChannel(WireFormat wireFormat, Protocol protocol, ThreadPool tp) {
83 this(wireFormat, tp);
84 init(protocol);
85 }
86
87 /***
88 * @param remoteLocation
89 * @param localLocation
90 */
91 public GTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation,
92 SelectorManager sm, ThreadPool tp, ClockPool cp)
93 throws UnknownHostException, ProtocolException {
94 this(wireFormat, tp);
95
96
97
98
99
100
101
102
103
104 SocketProtocol sp = new SocketProtocol();
105 sp.setTimeout(1000 * 30);
106 if (localLocation != null) {
107 sp.setInterface(new InetSocketAddress(InetAddress
108 .getByName(localLocation.getHost()), localLocation
109 .getPort()));
110 }
111 sp.setAddress(new InetSocketAddress(InetAddress
112 .getByName(remoteLocation.getHost()), remoteLocation
113 .getPort()));
114 sp.setSelectorManager(sm);
115
116
117
118
119
120
121
122
123 init(sp);
124 sp.setup();
125 }
126
127 /***
128 * @param protocol
129 */
130 private void init(Protocol protocol) {
131 this.protocol = protocol;
132
133 protocol.setUpProtocol(new AbstractProtocol() {
134 public void setup() {
135 }
136
137 public void drain() {
138 }
139
140 public void teardown() {
141 }
142
143 public void sendUp(final UpPacket p) {
144 try {
145 log.trace("AQUIRING: " + dispatchLatch);
146 dispatchLatch.acquire();
147 log.trace("AQUIRED: " + dispatchLatch);
148
149 dispatch(p);
150 }
151 catch (InterruptedException e) {
152 log.warn("Caught exception dispatching packet: " + p + ". Reason: "
153 + e, e);
154
155 }
156 }
157
158 public void sendDown(DownPacket p) throws ProtocolException {
159 getDownProtocol().sendDown(p);
160 }
161
162 public void flush() throws ProtocolException {
163 getDownProtocol().flush();
164 }
165 });
166 }
167
168 private void dispatch(UpPacket p) {
169 try {
170
171 Packet packet = toPacket(p);
172 log.trace("<<<< SENDING UP <<<< " + packet);
173 if (packet != null) {
174 doConsumePacket(packet);
175 }
176 }
177 catch (IOException e) {
178 log.warn("Caught exception dispatching packet: " + p + ". Reason: "
179 + e, e);
180
181 }
182 }
183
184 /***
185 * close the channel
186 */
187 public void stop() {
188 super.stop();
189 if (closed.commit(false, true)) {
190 try {
191 protocol.drain();
192 }
193 catch (Exception e) {
194 log.trace(toString() + " now closed");
195 }
196 }
197 }
198
199 /***
200 * start listeneing for events
201 *
202 * @throws JMSException if an error occurs
203 */
204 public void start() throws JMSException {
205 if (started.commit(false, true)) {
206
207 dispatchLatch.release();
208 }
209 }
210
211
212 /***
213 * Asynchronously send a Packet
214 *
215 * @param packet
216 * @throws JMSException
217 */
218 public void asyncSend(Packet packet) throws JMSException {
219 try {
220 if (log.isTraceEnabled()) {
221 log.trace(">>>> ASYNC SENDING DOWN >>>> " + packet);
222 }
223
224
225 synchronized (protocol) {
226 protocol.sendDown(toPlainDownPacket(packet));
227 }
228 }
229 catch (IOException e) {
230 System.out.println("Caught: " + e);
231 e.printStackTrace();
232 JMSException jmsEx = new JMSException("asyncSend failed "
233 + e.getMessage());
234 jmsEx.setLinkedException(e);
235 throw jmsEx;
236 }
237 catch (ProtocolException e) {
238 System.out.println("Caught: " + e);
239 e.printStackTrace();
240 JMSException jmsEx = new JMSException("asyncSend failed "
241 + e.getMessage());
242 jmsEx.setLinkedException(e);
243 throw jmsEx;
244 }
245 }
246
247 public boolean isMulticast() {
248 return false;
249 }
250
251 protected PlainDownPacket toPlainDownPacket(Packet mqpacket)
252 throws IOException, JMSException {
253
254 ByteArrayOutputStream baos = new ByteArrayOutputStream();
255 DataOutputStream dos = new DataOutputStream(baos);
256 wireFormat.writePacket(mqpacket, dos);
257 dos.close();
258 ArrayList list = new ArrayList(1);
259 ByteBuffer buffer = ByteBuffer.wrap(baos.toByteArray());
260 buffer.limit(buffer.capacity());
261 list.add(buffer);
262 PlainDownPacket packet = new PlainDownPacket();
263 packet.setBuffers(list);
264 return packet;
265 }
266
267 protected Packet toPacket(UpPacket packet) throws IOException {
268 final ByteBuffer buffer = packet.getBuffer();
269 InputStream is = new InputStream() {
270 public int read() {
271 if (!buffer.hasRemaining()) {
272 return -1;
273 }
274 int rc = 0xFF & buffer.get();
275 return rc;
276 }
277
278 public synchronized int read(byte[] bytes, int off, int len) {
279 len = Math.min(len, buffer.remaining());
280 buffer.get(bytes, off, len);
281 return len;
282 }
283 };
284 DataInputStream dis = new DataInputStream(is);
285 return wireFormat.readPacket(dis);
286 }
287
288 /***
289 * pretty print for object
290 *
291 * @return String representation of this object
292 */
293 public String toString() {
294 return "GTransportChannel: " + protocol;
295 }
296
297 /***
298 * Can this wireformat process packets of this version
299 * @param version the version number to test
300 * @return true if can accept the version
301 */
302 public boolean canProcessWireFormatVersion(int version){
303 return wireFormat.canProcessWireFormatVersion(version);
304 }
305
306 /***
307 * @return the current version of this wire format
308 */
309 public int getCurrentWireFormatVersion(){
310 return wireFormat.getCurrentWireFormatVersion();
311 }
312 }