1 /***
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.codehaus.activemq.ra;
19
20 import EDU.oswego.cs.dl.util.concurrent.Latch;
21 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.codehaus.activemq.ActiveMQConnectionConsumer;
25 import org.codehaus.activemq.ActiveMQSession;
26 import org.codehaus.activemq.message.ActiveMQMessage;
27 import org.codehaus.activemq.message.ActiveMQQueue;
28 import org.codehaus.activemq.message.ActiveMQTopic;
29
30 import javax.jms.*;
31 import javax.resource.ResourceException;
32 import javax.resource.spi.endpoint.MessageEndpoint;
33 import javax.resource.spi.work.*;
34 import javax.transaction.xa.XAResource;
35
36 /***
37 * @version $Revision: 1.5 $ $Date: 2004/08/01 02:21:15 $
38 */
39 public class ActiveMQPollingEndpointWorker extends ActiveMQBaseEndpointWorker implements Work {
40
41 private static final Log log = LogFactory.getLog(ActiveMQPollingEndpointWorker.class);
42 private static final int MAX_WORKERS = 10;
43
44 private SynchronizedBoolean started = new SynchronizedBoolean(false);
45 private SynchronizedBoolean stopping = new SynchronizedBoolean(false);
46 private Latch stopLatch = new Latch();
47
48 private ActiveMQConnectionConsumer consumer;
49
50 private CircularQueue workers;
51
52 static WorkListener debugingWorkListener = new WorkListener() {
53
54 public void workAccepted(WorkEvent event) {
55 }
56 public void workRejected(WorkEvent event) {
57 log.warn("Work rejected: " + event, event.getException());
58 }
59 public void workStarted(WorkEvent event) {
60 }
61 public void workCompleted(WorkEvent event) {
62 }
63 };
64
65 /***
66 * @param adapter
67 * @param key
68 * @throws ResourceException
69 */
70 public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
71 super(adapter, key);
72 }
73
74 public void start() throws WorkException, ResourceException {
75 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
76 boolean ok = false;
77 try {
78 workers = new CircularQueue(MAX_WORKERS, stopping);
79 for( int i=0; i < workers.size(); i++) {
80 ActiveMQSession session = (ActiveMQSession) adapter.getPhysicalConnection().createSession(transacted,Session.AUTO_ACKNOWLEDGE);
81 XAResource xaresource=null;
82 if( session instanceof XASession ) {
83 if( !transacted )
84 throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint.");
85 xaresource = ((XASession)session).getXAResource();
86 }
87
88 MessageEndpoint endpoint = endpointFactory.createEndpoint(xaresource);
89 workers.returnObject(new InboundEndpointWork(session,endpoint, workers));
90 }
91
92 Destination dest = null;
93 if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
94 dest = new ActiveMQQueue(activationSpec.getDestinationName());
95 } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
96 dest = new ActiveMQTopic(activationSpec.getDestinationName());
97 } else {
98 throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
99 }
100
101 if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null) {
102 consumer = (ActiveMQConnectionConsumer) adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic)dest,activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), null, 0);
103 } else {
104 consumer = (ActiveMQConnectionConsumer) adapter.getPhysicalConnection().createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), null, 0);
105 }
106
107 ok = true;
108 log.debug("Started");
109
110 workManager.scheduleWork(this, WorkManager.INDEFINITE, null, debugingWorkListener);
111 ok = true;
112
113 } catch (JMSException e) {
114 throw new ResourceException("Could not start the endpoint.", e);
115 } finally {
116
117
118
119 if (!ok) {
120 safeClose(consumer);
121 }
122 }
123
124 }
125
126 private String emptyToNull(String value) {
127 if ("".equals(value)) {
128 return null;
129 }
130 return value;
131 }
132
133 /***
134 *
135 */
136 public void stop() throws InterruptedException {
137 stopping.set(true);
138 workers.notifyWaiting();
139 if (started.compareTo(true) == 0) {
140 stopLatch.acquire();
141 }
142 safeClose(consumer);
143 }
144
145 /***
146 * @see javax.resource.spi.work.Work#release()
147 */
148 public void release() {
149 }
150
151 /***
152 * The WorkManager has started up and we now need to pull message off
153 * the destination and push them to an endpoint.
154 *
155 * @see java.lang.Runnable#run()
156 */
157 public void run() {
158 started.set(true);
159 try {
160
161 while (!stopping.get()) {
162 ActiveMQMessage message = consumer.receive(500);
163 if (message != null) {
164 InboundEndpointWork worker = (InboundEndpointWork) workers.get();
165
166 if( worker == null ) {
167 break;
168 }
169 worker.message = message;
170 workManager.scheduleWork(worker, WorkManager.INDEFINITE, null, debugingWorkListener);
171 }
172 }
173
174
175 workers.drain();
176
177 } catch (Throwable e) {
178 log.info("dispatcher: ", e);
179 } finally {
180 stopLatch.release();
181 }
182 }
183
184 public static class InboundEndpointWork implements Work{
185
186 private final ActiveMQSession session;
187 private final MessageEndpoint endpoint;
188 private final CircularQueue workers;
189 ActiveMQMessage message;
190
191
192 /***
193 * @param session
194 * @param endpoint
195 * @param workers
196 * @throws JMSException
197 */
198 public InboundEndpointWork(ActiveMQSession session, MessageEndpoint endpoint, CircularQueue workers) throws JMSException {
199 this.session = session;
200 this.endpoint = endpoint;
201 this.workers = workers;
202 session.setMessageListener((MessageListener) endpoint);
203 }
204
205 public void release() {
206 }
207
208 /***
209 * @see java.lang.Runnable#run()
210 */
211 public void run() {
212 try {
213
214 endpoint.beforeDelivery(ON_MESSAGE_METHOD);
215 try {
216 session.dispatch(message);
217 session.run();
218 } finally {
219 endpoint.afterDelivery();
220 }
221
222 } catch (NoSuchMethodException e) {
223 log.info("worker: ", e);
224 } catch (ResourceException e) {
225 log.info("worker: ", e);
226 } finally {
227 workers.returnObject(this);
228 }
229 }
230
231 }
232
233 }