1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.ra;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22 import org.codehaus.activemq.message.ActiveMQQueue;
23 import org.codehaus.activemq.message.ActiveMQTopic;
24
25 import javax.jms.*;
26 import javax.resource.ResourceException;
27 import javax.resource.spi.endpoint.MessageEndpoint;
28 import javax.resource.spi.work.*;
29 import javax.transaction.xa.XAResource;
30 import java.util.ArrayList;
31 import java.util.LinkedList;
32
33 /***
34 * @version $Revision: 1.3 $ $Date: 2004/07/31 21:11:00 $
35 */
36 public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {
37
38 private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
39 private static final int MAX_MSGS_PER_SESSION = 1;
40 private static final int MAX_SESSION = 10;
41
42
43 ConnectionConsumer consumer;
44 private ServerSessionPoolImpl serverSessionPool;
45
46 /***
47 * @param adapter
48 * @param key
49 * @throws ResourceException
50 */
51 public ActiveMQAsfEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
52 super(adapter, key);
53 }
54
55
56 public void start() throws WorkException, ResourceException {
57 log.debug("Starting");
58 boolean ok = false;
59 try {
60 serverSessionPool = new ServerSessionPoolImpl();
61 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
62
63 Destination dest = null;
64
65 if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
66 dest = new ActiveMQQueue(activationSpec.getDestinationName());
67 } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
68 dest = new ActiveMQTopic(activationSpec.getDestinationName());
69 } else {
70 throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
71 }
72
73 if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null) {
74 consumer = adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic)dest,activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
75 } else {
76 consumer = adapter.getPhysicalConnection().createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
77 }
78
79 ok = true;
80 log.debug("Started");
81
82 } catch (JMSException e) {
83 throw new ResourceException("Could not start the endpoint.", e);
84 } finally {
85
86
87 if (!ok) {
88 safeClose(consumer);
89 }
90 }
91 }
92
93 /***
94 *
95 */
96 public void stop() throws InterruptedException {
97 safeClose(consumer);
98 serverSessionPool.close();
99 }
100
101 class ServerSessionPoolImpl implements ServerSessionPool {
102
103 ServerSessionImpl ss;
104 ArrayList idleSessions = new ArrayList();
105 LinkedList activeSessions = new LinkedList();
106 int sessionIds=0;
107 int nextUsedSession;
108 boolean closing=false;
109
110 public ServerSessionPoolImpl() {
111 }
112
113 public ServerSessionImpl createServerSessionImpl() throws JMSException {
114 Session session = adapter.getPhysicalConnection().createSession(true, Session.SESSION_TRANSACTED);
115 return new ServerSessionImpl(this, session);
116 }
117
118 /***
119 */
120 synchronized public ServerSession getServerSession() throws JMSException {
121 log.debug("ServerSession requested.");
122 if( closing )
123 throw new JMSException("Session Pool Shutting Down.");
124
125 if( idleSessions.size()>0 ) {
126 ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size()-1);
127 activeSessions.addLast(ss);
128 log.debug("Using idle session: "+ss);
129 return ss;
130 } else {
131
132 if( activeSessions.size() >= MAX_SESSION ) {
133
134
135 ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
136 activeSessions.addLast(ss);
137 log.debug("Reusing an active session: "+ss);
138 return ss;
139 } else {
140 ServerSessionImpl ss = createServerSessionImpl();
141 activeSessions.addLast(ss);
142 log.debug("Created a new session: "+ss);
143 return ss;
144 }
145 }
146 }
147
148 synchronized public void returnToPool(ServerSessionImpl ss) {
149 log.debug("Session returned to pool: "+ss);
150 idleSessions.add(ss);
151 }
152
153 public void close() {
154 synchronized( this ) {
155 closing = true;
156 }
157 }
158 }
159
160 class ServerSessionImpl implements ServerSession, Work, MessageListener {
161
162 Session session;
163 private final ServerSessionPoolImpl pool;
164
165 private Object runControlMutex = new Object();
166 boolean workPendingFlag=false;
167 boolean runningFlag=false;
168 int runCounter=0;
169 XAResource xaResource;
170
171
172 public ServerSessionImpl(ServerSessionPoolImpl pool, Session session) throws JMSException {
173 this.pool = pool;
174 this.session=session;
175 this.session.setMessageListener(this);
176 if( session instanceof XASession ) {
177 xaResource = ((XASession)session).getXAResource();
178 }
179 }
180
181 /***
182 * @see javax.jms.ServerSession#getSession()
183 */
184 public Session getSession() throws JMSException {
185 return session;
186 }
187
188
189 /***
190 * @see javax.jms.ServerSession#start()
191 */
192 public void start() throws JMSException {
193
194 log.debug("ServerSession started.");
195 synchronized(runControlMutex) {
196 runCounter++;
197
198 if( runningFlag || workPendingFlag ) {
199
200 workPendingFlag=true;
201 log.debug("ServerSession allready running.");
202 return;
203 }
204 workPendingFlag=true;
205 }
206
207
208 log.debug("ServerSession queuing request for a run.");
209 try {
210 workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
211 new WorkListener() {
212
213 public void workAccepted(WorkEvent event) {
214 log.debug("Work accepted: " + event);
215 }
216
217 public void workRejected(WorkEvent event) {
218 log.debug("Work rejected: " + event);
219 }
220
221 public void workStarted(WorkEvent event) {
222 log.debug("Work started: " + event);
223 }
224
225 public void workCompleted(WorkEvent event) {
226 log.debug("Work completed: " + event);
227 }
228
229 });
230 } catch ( WorkException e ) {
231 throw (JMSException)new JMSException("Work could not be started: "+e).initCause(e);
232 }
233 }
234
235 /***
236 * @see java.lang.Runnable#run()
237 */
238 public void run() {
239 while(true) {
240 synchronized(runControlMutex) {
241 workPendingFlag=false;
242 runningFlag=true;
243 }
244
245 log.debug("Running: " + this);
246 session.run();
247
248 synchronized(runControlMutex) {
249 runCounter--;
250 runningFlag=false;
251 if( !workPendingFlag ) {
252 if( runCounter==0 )
253 pool.returnToPool(this);
254 break;
255
256 }
257 }
258 }
259 }
260
261 /***
262 * @see javax.resource.spi.work.Work#release()
263 */
264 public void release() {
265 log.debug("release called");
266 }
267
268 /***
269 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
270 */
271 public void onMessage(Message message) {
272 try {
273
274 MessageEndpoint endpoint = endpointFactory.createEndpoint(xaResource);
275 MessageListener listener = (MessageListener) endpoint;
276
277 endpoint.beforeDelivery(ON_MESSAGE_METHOD);
278 try {
279 listener.onMessage(message);
280 } finally {
281 endpoint.afterDelivery();
282 }
283
284 } catch (NoSuchMethodException e) {
285 log.info(e);
286 } catch (ResourceException e) {
287 log.info(e);
288 }
289 }
290
291 /***
292 * @see java.lang.Object#toString()
293 */
294 public String toString() {
295 return "ServerSessionImpl[session="+session+"]";
296 }
297
298 }
299
300 private String emptyToNull(String value) {
301 if ("".equals(value)) {
302 return null;
303 }
304 return value;
305 }
306
307 }