1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.codehaus.activemq.transport.gnet;
20
21 import EDU.oswego.cs.dl.util.concurrent.Latch;
22 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.geronimo.network.SelectorManager;
26 import org.apache.geronimo.network.protocol.AcceptableProtocol;
27 import org.apache.geronimo.network.protocol.ProtocolFactory;
28 import org.apache.geronimo.network.protocol.ProtocolFactory.AcceptedCallBack;
29 import org.apache.geronimo.network.protocol.ServerSocketAcceptor;
30 import org.apache.geronimo.network.protocol.SocketProtocol;
31 import org.apache.geronimo.pool.ClockPool;
32 import org.apache.geronimo.pool.ThreadPool;
33 import org.codehaus.activemq.message.WireFormat;
34 import org.codehaus.activemq.transport.TransportServerChannel;
35 import org.codehaus.activemq.transport.TransportServerChannelSupport;
36 import org.codehaus.activemq.util.JMSExceptionHelper;
37
38 import javax.jms.JMSException;
39 import java.net.URI;
40
41 /***
42 * An implementation of TransportServerChannel which uses
43 * the Geronimo network layer for connectivity.
44 *
45 * @version $Revision: 1.12 $
46 */
47 public class GTransportServerChannel extends TransportServerChannelSupport implements TransportServerChannel {
48 protected static final int BACKLOG = 500;
49 private static final Log log = LogFactory.getLog(GTransportServerChannel.class);
50
51 private WireFormat wireFormat;
52 private SynchronizedBoolean closed;
53 private ThreadPool tp;
54 private ClockPool cp;
55 private SelectorManager sm;
56 private ServerSocketAcceptor ssa;
57 private ProtocolFactory pf;
58 private Latch startLatch;
59
60 /***
61 * Default Constructor
62 *
63 * @param bindAddr
64 * @throws JMSException
65 */
66 public GTransportServerChannel(WireFormat wireFormat, URI bindAddr, SelectorManager selectorManager, ThreadPool threadPool, ClockPool clockPool) throws Exception {
67 super(bindAddr);
68 this.wireFormat = wireFormat;
69 this.sm = selectorManager;
70 this.tp = threadPool;
71 this.cp = clockPool;
72
73 closed = new SynchronizedBoolean(false);
74 startLatch = new Latch();
75
76
77
78
79 SocketProtocol spt = new SocketProtocol();
80 spt.setTimeout(30 * 1000);
81 spt.setSelectorManager(sm);
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 pf = new ProtocolFactory();
103 pf.setClockPool(cp);
104 pf.setMaxAge(Long.MAX_VALUE);
105 pf.setMaxInactivity(Long.MAX_VALUE);
106
107 pf.setReclaimPeriod(10 * 1000);
108 pf.setTemplate(spt);
109
110 pf.setAcceptedCallBack(createAcceptedCallBack());
111
112 ssa = new ServerSocketAcceptor();
113 ssa.setSelectorManager(sm);
114 ssa.setTimeOut(5 * 1000);
115 ssa.setUri(bindAddr);
116 ssa.setAcceptorListener(pf);
117 }
118
119 /***
120 * @return
121 */
122 private AcceptedCallBack createAcceptedCallBack() {
123 return new AcceptedCallBack() {
124 public void accepted(AcceptableProtocol p) {
125 try {
126
127 startLatch.acquire();
128
129 if (p != null) {
130 GTransportChannel channel = new GTransportChannel(wireFormat, p, tp);
131 addClient(channel);
132 }
133
134 }
135 catch (Exception e) {
136 log.error("Caught while attempting to add new protocol: " + e, e);
137 }
138 }
139 };
140 }
141
142 /***
143 * start listeneing for events
144 *
145 * @throws JMSException if an error occurs
146 */
147 public void start() throws JMSException {
148 super.start();
149 try {
150 ssa.startup();
151 }
152 catch (Exception e) {
153 JMSException jmsEx = new JMSException("Could not start ServerSocketAcceptor: " + e);
154 jmsEx.setLinkedException(e);
155 throw jmsEx;
156 }
157 startLatch.release();
158 }
159
160 /***
161 * close the ServerChannel
162 */
163 public void stop() throws JMSException {
164 if (closed.commit(false, true)) {
165 super.stop();
166 try {
167 ssa.drain();
168 pf.drain();
169 }
170 catch (Throwable e) {
171 throw JMSExceptionHelper.newJMSException("Failed to stop: " + e, e);
172 }
173 }
174 }
175
176
177 /***
178 * @return pretty print of this
179 */
180 public String toString() {
181 return "GTransportServerChannel@" + getUrl();
182 }
183
184 }