View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    * Copyright 2004 Protique Ltd
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   *
18   **/
19  package org.codehaus.activemq.transport.gnet;
20  
21  import EDU.oswego.cs.dl.util.concurrent.Latch;
22  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.geronimo.network.SelectorManager;
26  import org.apache.geronimo.network.protocol.AcceptableProtocol;
27  import org.apache.geronimo.network.protocol.ProtocolFactory;
28  import org.apache.geronimo.network.protocol.ProtocolFactory.AcceptedCallBack;
29  import org.apache.geronimo.network.protocol.ServerSocketAcceptor;
30  import org.apache.geronimo.network.protocol.SocketProtocol;
31  import org.apache.geronimo.pool.ClockPool;
32  import org.apache.geronimo.pool.ThreadPool;
33  import org.codehaus.activemq.message.WireFormat;
34  import org.codehaus.activemq.transport.TransportServerChannel;
35  import org.codehaus.activemq.transport.TransportServerChannelSupport;
36  import org.codehaus.activemq.util.JMSExceptionHelper;
37  
38  import javax.jms.JMSException;
39  import java.net.URI;
40  
41  /***
42   * An implementation of TransportServerChannel which uses
43   * the Geronimo network layer for connectivity.
44   *
45   * @version $Revision: 1.12 $
46   */
47  public class GTransportServerChannel extends TransportServerChannelSupport implements TransportServerChannel {
48      protected static final int BACKLOG = 500;
49      private static final Log log = LogFactory.getLog(GTransportServerChannel.class);
50  
51      private WireFormat wireFormat;
52      private SynchronizedBoolean closed;
53      private ThreadPool tp;
54      private ClockPool cp;
55      private SelectorManager sm;
56      private ServerSocketAcceptor ssa;
57      private ProtocolFactory pf;
58      private Latch startLatch;
59  
60      /***
61       * Default Constructor
62       *
63       * @param bindAddr
64       * @throws JMSException
65       */
66      public GTransportServerChannel(WireFormat wireFormat, URI bindAddr, SelectorManager selectorManager, ThreadPool threadPool, ClockPool clockPool) throws Exception {
67          super(bindAddr);
68          this.wireFormat = wireFormat;
69          this.sm = selectorManager;
70          this.tp = threadPool;
71          this.cp = clockPool;
72  
73          closed = new SynchronizedBoolean(false);
74          startLatch = new Latch();
75  
76  /*
77          ControlServerProtocolStack templateStack = new ControlServerProtocolStack();
78  */
79          SocketProtocol spt = new SocketProtocol();
80          spt.setTimeout(30 * 1000);
81          spt.setSelectorManager(sm);
82  /*
83          templateStack.push(spt);
84  
85          ControlServerProtocol csp = new ControlServerProtocol();
86          csp.setTimeout(30 * 1000);
87          csp.setThreadPool(tp);
88          csp.setClockPool(cp);
89          csp.setSelectorManager(sm);
90          csp.setControlServerListener(new ControlServerListener() {
91              public void shutdown() {
92                  log.trace("SERVER SIDE SHUTDOWN");
93              }
94          });
95  
96          templateStack.push(csp);
97          ControlServerProtocolWaiter waiter = new ControlServerProtocolWaiter();
98          waiter.push(new CountingProtocol());
99          templateStack.push(waiter);
100 */
101 
102         pf = new ProtocolFactory();
103         pf.setClockPool(cp);
104         pf.setMaxAge(Long.MAX_VALUE);
105         pf.setMaxInactivity(Long.MAX_VALUE);
106         //pf.setReclaimPeriod(Long.MAX_VALUE);
107         pf.setReclaimPeriod(10 * 1000);
108         pf.setTemplate(spt);
109 //        pf.setTemplate(templateStack);
110         pf.setAcceptedCallBack(createAcceptedCallBack());
111 
112         ssa = new ServerSocketAcceptor();
113         ssa.setSelectorManager(sm);
114         ssa.setTimeOut(5 * 1000);
115         ssa.setUri(bindAddr);
116         ssa.setAcceptorListener(pf);
117     }
118 
119     /***
120      * @return
121      */
122     private AcceptedCallBack createAcceptedCallBack() {
123         return new AcceptedCallBack() {
124             public void accepted(AcceptableProtocol p) {
125                 try {
126                     // Wait for start to be called before accepting connections..
127                     startLatch.acquire();
128 
129                     if (p != null) {
130                         GTransportChannel channel = new GTransportChannel(wireFormat, p, tp);
131                         addClient(channel);
132                     }
133 
134                 }
135                 catch (Exception e) {
136                     log.error("Caught while attempting to add new protocol: " + e, e);
137                 }
138             }
139         };
140     }
141 
142     /***
143      * start listeneing for events
144      *
145      * @throws JMSException if an error occurs
146      */
147     public void start() throws JMSException {
148         super.start();
149         try {
150             ssa.startup();
151         }
152         catch (Exception e) {
153             JMSException jmsEx = new JMSException("Could not start ServerSocketAcceptor: " + e);
154             jmsEx.setLinkedException(e);
155             throw jmsEx;
156         }
157         startLatch.release();
158     }
159 
160     /***
161      * close the ServerChannel
162      */
163     public void stop() throws JMSException {
164         if (closed.commit(false, true)) {
165             super.stop();
166             try {
167                 ssa.drain();
168                 pf.drain();
169             }
170             catch (Throwable e) {
171                 throw JMSExceptionHelper.newJMSException("Failed to stop: " + e, e);
172             }
173         }
174     }
175 
176 
177     /***
178      * @return pretty print of this
179      */
180     public String toString() {
181         return "GTransportServerChannel@" + getUrl();
182     }
183 
184 }