View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.ra;
19  
20  import EDU.oswego.cs.dl.util.concurrent.Latch;
21  import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.codehaus.activemq.ActiveMQConnectionConsumer;
25  import org.codehaus.activemq.ActiveMQSession;
26  import org.codehaus.activemq.message.ActiveMQMessage;
27  import org.codehaus.activemq.message.ActiveMQQueue;
28  import org.codehaus.activemq.message.ActiveMQTopic;
29  
30  import javax.jms.*;
31  import javax.resource.ResourceException;
32  import javax.resource.spi.endpoint.MessageEndpoint;
33  import javax.resource.spi.work.*;
34  import javax.transaction.xa.XAResource;
35  
36  /***
37   * @version $Revision: 1.5 $ $Date: 2004/08/01 02:21:15 $
38   */
39  public class ActiveMQPollingEndpointWorker extends ActiveMQBaseEndpointWorker implements Work {
40  
41  	private static final Log log = LogFactory.getLog(ActiveMQPollingEndpointWorker.class);
42      private static final int MAX_WORKERS = 10; 
43  
44      private SynchronizedBoolean started = new SynchronizedBoolean(false);
45      private SynchronizedBoolean stopping = new SynchronizedBoolean(false);
46      private Latch stopLatch = new Latch();
47  
48      private ActiveMQConnectionConsumer consumer;
49  
50      private CircularQueue workers;
51  
52      static WorkListener debugingWorkListener = new WorkListener() {
53          //The work listener is useful only for debugging...
54          public void workAccepted(WorkEvent event) {
55          }
56          public void workRejected(WorkEvent event) {
57              log.warn("Work rejected: " + event, event.getException());
58          }
59          public void workStarted(WorkEvent event) {
60          }
61          public void workCompleted(WorkEvent event) {
62          }
63      };
64      
65      /***
66  	 * @param adapter
67  	 * @param key
68       * @throws ResourceException
69  	 */
70  	public ActiveMQPollingEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
71  		super(adapter, key);
72  	}
73  
74      public void start() throws WorkException, ResourceException {
75          ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();        
76          boolean ok = false;
77          try {   
78          	workers = new CircularQueue(MAX_WORKERS, stopping);
79          	for( int i=0; i < workers.size(); i++) {
80          		ActiveMQSession session = (ActiveMQSession) adapter.getPhysicalConnection().createSession(transacted,Session.AUTO_ACKNOWLEDGE);
81          		XAResource xaresource=null;
82          		if( session instanceof XASession  ) {
83          			if( !transacted )
84          				throw new ResourceException("You cannot use an XA Connection with a non transacted endpoint.");
85          			xaresource = ((XASession)session).getXAResource();
86          		} 
87          		
88          		MessageEndpoint endpoint = endpointFactory.createEndpoint(xaresource);
89          	    workers.returnObject(new InboundEndpointWork(session,endpoint, workers));
90          	}
91          	
92              Destination dest = null;
93              if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
94                  dest = new ActiveMQQueue(activationSpec.getDestinationName());
95              } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
96                  dest = new ActiveMQTopic(activationSpec.getDestinationName());
97              } else {
98                  throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
99              }
100 
101             if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null) {
102                 consumer = (ActiveMQConnectionConsumer) adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic)dest,activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), null, 0);
103             } else {
104                 consumer = (ActiveMQConnectionConsumer) adapter.getPhysicalConnection().createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), null, 0);
105             }
106 
107             ok = true;
108         	log.debug("Started");
109 
110             workManager.scheduleWork(this, WorkManager.INDEFINITE, null, debugingWorkListener);
111             ok = true;
112 
113         } catch (JMSException e) {
114             throw new ResourceException("Could not start the endpoint.", e);
115         } finally {
116         	
117             // We don't want to leak sessions on errors.  Keep them around only if
118             // there were no errors.
119             if (!ok) {
120                 safeClose(consumer);
121             }
122         }
123 
124     }
125 
126     private String emptyToNull(String value) {
127         if ("".equals(value)) {
128             return null;
129         }
130         return value;
131     }
132 
133     /***
134      *
135      */
136     public void stop() throws InterruptedException {
137         stopping.set(true);
138         workers.notifyWaiting();
139         if (started.compareTo(true) == 0) {
140             stopLatch.acquire();
141         }
142         safeClose(consumer);
143     }
144 
145     /***
146      * @see javax.resource.spi.work.Work#release()
147      */
148     public void release() {    	
149     }
150 
151     /***
152      * The WorkManager has started up and we now need to pull message off
153      * the destination and push them to an endpoint.
154      *
155      * @see java.lang.Runnable#run()
156      */
157     public void run() {
158         started.set(true);
159         try {
160         	
161             while (!stopping.get()) {
162                 ActiveMQMessage message = consumer.receive(500);
163                 if (message != null) {
164                 	InboundEndpointWork worker = (InboundEndpointWork) workers.get();
165                 	// Did we get stopped?
166                 	if( worker == null ) {
167                 		break;
168                 	}                		
169                 	worker.message = message;
170                     workManager.scheduleWork(worker, WorkManager.INDEFINITE, null, debugingWorkListener);
171                 }
172             }   
173             
174             // Try to collect the workers so that none are running.
175             workers.drain();
176             
177         } catch (Throwable e) {
178             log.info("dispatcher: ", e);
179         } finally {
180             stopLatch.release();
181         }
182     }
183     
184     public static class InboundEndpointWork implements Work{
185     	
186     	private final ActiveMQSession session;
187     	private final MessageEndpoint endpoint;
188         private final CircularQueue workers;
189     	ActiveMQMessage message;
190     	
191     	
192 		/***
193 		 * @param session
194 		 * @param endpoint
195 		 * @param workers
196          * @throws JMSException
197 		 */
198 		public InboundEndpointWork(ActiveMQSession session, MessageEndpoint endpoint, CircularQueue workers) throws JMSException {
199 			this.session = session;
200 			this.endpoint = endpoint;
201             this.workers = workers;
202 			session.setMessageListener((MessageListener) endpoint);
203  		}
204 		
205 		public void release() {
206 		}
207 
208 		/***
209 		 * @see java.lang.Runnable#run()
210 		 */
211 		public void run() {
212             try {
213                 
214             	endpoint.beforeDelivery(ON_MESSAGE_METHOD);
215                 try {
216                 	session.dispatch(message);
217                 	session.run();
218                 } finally {
219                     endpoint.afterDelivery();
220                 }
221                 
222             } catch (NoSuchMethodException e) {
223                 log.info("worker: ", e);
224             } catch (ResourceException e) {
225                 log.info("worker: ", e);
226             } finally {
227             	workers.returnObject(this);
228             }
229 		}
230     	
231     }
232  
233 }