View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.ra;
19  
20  import org.apache.commons.logging.Log;
21  import org.apache.commons.logging.LogFactory;
22  import org.codehaus.activemq.message.ActiveMQQueue;
23  import org.codehaus.activemq.message.ActiveMQTopic;
24  
25  import javax.jms.*;
26  import javax.resource.ResourceException;
27  import javax.resource.spi.endpoint.MessageEndpoint;
28  import javax.resource.spi.work.*;
29  import javax.transaction.xa.XAResource;
30  import java.util.ArrayList;
31  import java.util.LinkedList;
32  
33  /***
34   * @version $Revision: 1.3 $ $Date: 2004/07/31 21:11:00 $
35   */
36  public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {
37  
38  	private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
39      private static final int MAX_MSGS_PER_SESSION = 1; 
40      private static final int MAX_SESSION = 10; 
41  
42  
43      ConnectionConsumer consumer;
44  	private ServerSessionPoolImpl serverSessionPool;
45  
46      /***
47  	 * @param adapter
48  	 * @param key
49       * @throws ResourceException
50  	 */
51  	public ActiveMQAsfEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
52  		super(adapter, key);
53  	}
54  
55  	
56      public void start() throws WorkException, ResourceException {
57      	log.debug("Starting");
58          boolean ok = false;
59          try {
60      		serverSessionPool = new ServerSessionPoolImpl();
61              ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();        
62          	
63              Destination dest = null;
64  
65              if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
66                  dest = new ActiveMQQueue(activationSpec.getDestinationName());
67              } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
68                  dest = new ActiveMQTopic(activationSpec.getDestinationName());
69              } else {
70                  throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
71              }
72              
73              if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null) {
74                  consumer = adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic)dest,activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
75              } else {
76                  consumer = adapter.getPhysicalConnection().createConnectionConsumer(dest, emptyToNull(activationSpec.getMessageSelector()), serverSessionPool, MAX_MSGS_PER_SESSION);
77              }
78  
79              ok = true;
80          	log.debug("Started");
81  
82          } catch (JMSException e) {
83              throw new ResourceException("Could not start the endpoint.", e);
84          } finally {
85              // We don't want to leak sessions on errors.  Keep them around only if
86              // there were no errors.
87              if (!ok) {
88                  safeClose(consumer);
89              }
90          }
91      }
92      
93      /***
94  	 *
95  	 */
96      public void stop() throws InterruptedException {
97      	safeClose(consumer);
98      	serverSessionPool.close();
99      }
100     
101 	class ServerSessionPoolImpl implements ServerSessionPool {
102 
103 		ServerSessionImpl ss;
104 		ArrayList idleSessions = new ArrayList();
105 		LinkedList activeSessions = new LinkedList();
106 		int sessionIds=0;
107 		int nextUsedSession;
108 		boolean closing=false;
109 		
110 		public ServerSessionPoolImpl() {
111 		}
112 		
113 		public ServerSessionImpl createServerSessionImpl() throws JMSException {
114 			Session session = adapter.getPhysicalConnection().createSession(true, Session.SESSION_TRANSACTED);
115 			return new ServerSessionImpl(this, session);
116 		}
117 		
118 		/***
119 		 */
120 		synchronized public ServerSession getServerSession() throws JMSException {
121         	log.debug("ServerSession requested.");
122 			if( closing )
123 				throw new JMSException("Session Pool Shutting Down.");
124 			
125 			if( idleSessions.size()>0 ) {
126 				ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size()-1);
127 				activeSessions.addLast(ss);
128 	        	log.debug("Using idle session: "+ss);
129 				return ss;
130 			} else {
131 				// Are we at the upper limit?
132 				if( activeSessions.size() >= MAX_SESSION ) {
133 					// then reuse the allready created sessions..
134 					// This is going to queue up messages into a session for processing.
135 					ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();					
136 					activeSessions.addLast(ss);
137 		        	log.debug("Reusing an active session: "+ss);
138 					return ss;
139 				} else {
140 					ServerSessionImpl ss = createServerSessionImpl();					
141 					activeSessions.addLast(ss);
142 		        	log.debug("Created a new session: "+ss);
143 					return ss;
144 				}
145 			}
146 		}
147 		
148 		synchronized public void returnToPool(ServerSessionImpl ss) {
149         	log.debug("Session returned to pool: "+ss);
150 			idleSessions.add(ss);
151 		}
152 		
153 		public void close() {
154 			synchronized( this ) {
155 				closing = true;
156 			}			
157 		}
158 	} 
159 	
160 	class ServerSessionImpl implements ServerSession, Work, MessageListener {
161 		
162 		Session session;
163 		private final ServerSessionPoolImpl pool;
164 
165 		private Object runControlMutex = new Object();
166 	    boolean workPendingFlag=false;
167 	    boolean runningFlag=false;
168 		int runCounter=0;
169 		XAResource xaResource;
170 	    
171 
172 		public ServerSessionImpl(ServerSessionPoolImpl pool, Session session) throws JMSException {
173 			this.pool = pool;
174 			this.session=session;
175 			this.session.setMessageListener(this);
176 			if( session instanceof XASession ) {
177 				xaResource = ((XASession)session).getXAResource();
178 			}
179 		}
180 
181 		/***
182 		 * @see javax.jms.ServerSession#getSession()
183 		 */
184 		public Session getSession() throws JMSException {
185 			return session;
186 		}
187 
188 		
189 		/***
190 		 * @see javax.jms.ServerSession#start()
191 		 */
192 		public void start() throws JMSException {
193 
194         	log.debug("ServerSession started.");
195 			synchronized(runControlMutex) {
196 				runCounter++;
197 				// Is our thread allready running 
198 				if( runningFlag || workPendingFlag ) {
199 					// let it know that it should do more work..
200 					workPendingFlag=true;
201 		        	log.debug("ServerSession allready running.");
202 					return;
203 				} 
204 				workPendingFlag=true;
205 			}
206 			
207 			// We get here because we need to start a async worker.
208         	log.debug("ServerSession queuing request for a run.");
209 			try {
210 	            workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
211 	                new WorkListener() {
212 	                    //The work listener is useful only for debugging...
213 	                    public void workAccepted(WorkEvent event) {
214 	                        log.debug("Work accepted: " + event);
215 	                    }
216 	
217 	                    public void workRejected(WorkEvent event) {
218 	                        log.debug("Work rejected: " + event);
219 	                    }
220 	
221 	                    public void workStarted(WorkEvent event) {
222 	                        log.debug("Work started: " + event);
223 	                    }
224 	
225 	                    public void workCompleted(WorkEvent event) {
226 	                        log.debug("Work completed: " + event);
227 	                    }
228 	
229 	                });
230 			} catch ( WorkException e ) {
231 				throw (JMSException)new JMSException("Work could not be started: "+e).initCause(e);
232 			}
233 		}
234 
235 		/***
236 		 * @see java.lang.Runnable#run()
237 		 */
238 		public void run() {
239 			while(true) {
240 				synchronized(runControlMutex) {
241 					workPendingFlag=false;
242 					runningFlag=true;
243 				}
244 
245 				log.debug("Running: " + this);				
246 				session.run();
247 				
248 				synchronized(runControlMutex) {
249 					runCounter--;
250 					runningFlag=false;
251 					if( !workPendingFlag ) {
252 						if( runCounter==0 )
253 							pool.returnToPool(this);						
254 						break;
255 						
256 					}
257 				}
258 			}
259 		}
260 
261 		/***
262 		 * @see javax.resource.spi.work.Work#release()
263 		 */
264 		public void release() {
265 			log.debug("release called");				
266 		}
267 
268 		/***
269 		 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
270 		 */
271 		public void onMessage(Message message) {			
272             try {
273             	
274                 MessageEndpoint endpoint = endpointFactory.createEndpoint(xaResource);
275                 MessageListener listener = (MessageListener) endpoint;
276 
277                 endpoint.beforeDelivery(ON_MESSAGE_METHOD);
278                 try {
279                     listener.onMessage(message);
280                 } finally {
281                     endpoint.afterDelivery();
282                 }
283                 
284             } catch (NoSuchMethodException e) {
285                 log.info(e);
286             } catch (ResourceException e) {
287                 log.info(e);
288             }			
289 		}
290 		
291 		/***
292 		 * @see java.lang.Object#toString()
293 		 */
294 		public String toString() {
295 			return "ServerSessionImpl[session="+session+"]";
296 		}
297 	
298 	}
299 
300     private String emptyToNull(String value) {
301         if ("".equals(value)) {
302             return null;
303         }
304         return value;
305     }
306 
307 }