View Javadoc

1   /***
2    *
3    * Copyright 2004 Hiram Chirino
4    *
5    * Licensed under the Apache License, Version 2.0 (the "License");
6    * you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   *
17   **/
18  package org.codehaus.activemq.ra;
19  
20  import java.util.HashMap;
21  
22  import javax.jms.Connection;
23  import javax.jms.ConnectionFactory;
24  import javax.jms.JMSException;
25  import javax.jms.XAConnection;
26  import javax.jms.XASession;
27  import javax.resource.NotSupportedException;
28  import javax.resource.ResourceException;
29  import javax.resource.spi.ActivationSpec;
30  import javax.resource.spi.BootstrapContext;
31  import javax.resource.spi.ResourceAdapter;
32  import javax.resource.spi.ResourceAdapterInternalException;
33  import javax.resource.spi.endpoint.MessageEndpointFactory;
34  import javax.transaction.xa.XAResource;
35  
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.codehaus.activemq.ActiveMQXAConnectionFactory;
39  
40  
41  /***
42   * Knows how to connect to one ActiveMQ server.  It can then activate endpoints and deliver
43   * messages to those enpoints using the connection configure in the resource adapter.
44   *
45   * @version $Revision: 1.11 $
46   */
47  public class ActiveMQResourceAdapter implements ResourceAdapter {
48      private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
49      
50      private static final String ASF_ENDPOINT_WORKER_TYPE="asf";
51      private static final String POLLING_ENDPOINT_WORKER_TYPE="polling";
52      
53      private BootstrapContext bootstrapContext;
54      private HashMap endpointWorkers = new HashMap();
55      final private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
56      private String endpointWorkerType=POLLING_ENDPOINT_WORKER_TYPE;
57      private Connection physicalConnection;
58      private ConnectionFactory connectionFactory;
59  
60      public ActiveMQResourceAdapter() {
61          this(null);
62      }
63  
64      public ActiveMQResourceAdapter(ConnectionFactory connectionFactory) {
65          this.connectionFactory = connectionFactory;
66      }
67  
68      /***
69       * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
70       */
71      public void start(BootstrapContext bootstrapContext)
72              throws ResourceAdapterInternalException {
73          this.bootstrapContext = bootstrapContext;
74  
75          try {
76              physicalConnection = makeConnection(connectionFactory, info);
77              physicalConnection.start();
78          }
79          catch (JMSException e) {
80              throw new ResourceAdapterInternalException("Could not establish a connection to the server.", e);
81          }
82  
83      }
84  
85      /***
86       * A helper method to create a new JMS connection from the connection request info.
87       * If no specific connection factory instance is passed in then the default ActiveMQ
88       * implementation is used
89       *
90       * @param connectionFactory an optional connection factory to use or null to use the default
91       * @param info              the connection request info
92       * @return a newly created connection
93       * @throws JMSException
94       */
95      public static Connection makeConnection(ConnectionFactory connectionFactory, ActiveMQConnectionRequestInfo info) throws JMSException {
96          if (connectionFactory == null) {
97              if (info.isXa()) {
98                  connectionFactory = new ActiveMQXAConnectionFactory(info.getServerUrl());
99              }
100             else {
101                 connectionFactory = new org.codehaus.activemq.ActiveMQConnectionFactory(info.getServerUrl());
102             }
103         }
104         Connection physicalConnection = connectionFactory.createConnection(info.getUserName(), info.getPassword());
105         if (info.getClientid() != null) {
106             physicalConnection.setClientID(info.getClientid());
107         }
108         return physicalConnection;
109     }
110 
111     /***
112      * @return Returns the physicalConnection.
113      */
114     public Connection getPhysicalConnection() {
115         return physicalConnection;
116     }
117 
118     /***
119      * @see javax.resource.spi.ResourceAdapter#stop()
120      */
121     public void stop() {
122         this.bootstrapContext = null;
123         if (physicalConnection != null) {
124             try {
125                 physicalConnection.close();
126                 physicalConnection = null;
127             }
128             catch (JMSException e) {
129                 log.debug("Error occured during ResourceAdapter stop: ", e);
130             }
131         }
132     }
133 
134     /***
135      * @return
136      */
137     public BootstrapContext getBootstrapContext() {
138         return bootstrapContext;
139     }
140 
141     /***
142      * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
143             *      javax.resource.spi.ActivationSpec)
144      */
145     public void endpointActivation(MessageEndpointFactory endpointFactory,
146                                    ActivationSpec activationSpec) throws ResourceException {
147 
148         //spec section 5.3.3
149         if (activationSpec.getResourceAdapter() != this) {
150             throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
151         }
152 
153         if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
154 
155             ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec) activationSpec);
156             // This is weird.. the same endpoint activated twice.. must be a container error.
157             if (endpointWorkers.containsKey(key)) {
158                 throw new IllegalStateException("Endpoint previously activated");
159             }
160 
161             ActiveMQBaseEndpointWorker worker;
162             if( POLLING_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
163             	worker = new ActiveMQPollingEndpointWorker(this, key);
164             } else if( ASF_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
165             	worker = new ActiveMQAsfEndpointWorker(this, key);
166             } else {
167                 throw new NotSupportedException("That type of EndpointWorkerType is not supported: " + getEndpointWorkerType());
168             }
169             
170             endpointWorkers.put(key, worker);
171             worker.start();
172 
173         }
174         else {
175             throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
176         }
177 
178     }
179 
180     /***
181      * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
182             *      javax.resource.spi.ActivationSpec)
183      */
184     public void endpointDeactivation(MessageEndpointFactory endpointFactory,
185                                      ActivationSpec activationSpec) {
186 
187         if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
188             ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory, (ActiveMQActivationSpec) activationSpec);
189             ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker) endpointWorkers.get(key);
190             if (worker == null) {
191                 // This is weird.. that endpoint was not activated..  oh well.. this method
192                 // does not throw exceptions so just return.
193                 return;
194             }
195             try {
196                 worker.stop();
197             }
198             catch (InterruptedException e) {
199                 // We interrupted.. we won't throw an exception but will stop waiting for the worker
200                 // to stop..  we tried our best.  Keep trying to interrupt the thread.
201                 Thread.currentThread().interrupt();
202             }
203 
204         }
205 
206     }
207 
208     /***
209      * We only connect to one resource manager per ResourceAdapter instance, so any ActivationSpec
210      * will return the same XAResource.
211      * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
212      */
213     public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
214         try {
215             Connection connection = getPhysicalConnection();
216             if (connection instanceof XAConnection) {
217                 XASession session = ((XAConnection)connection).createXASession();
218                 XAResource xaResource = session.getXAResource();
219                 return new XAResource[] {xaResource};
220             } else {
221                 return new XAResource[] {};
222             }
223         } catch (JMSException e) {
224             throw new ResourceException(e);
225         }
226     }
227 
228     /////////////////////////////////////////////////////////////////////////
229     //
230     // Java Bean getters and setters for this ResourceAdapter class.
231     //
232     /////////////////////////////////////////////////////////////////////////
233 
234     /***
235      * @return
236      */
237     public String getClientid() {
238         return info.getClientid();
239     }
240 
241     /***
242      * @return
243      */
244     public String getPassword() {
245         return info.getPassword();
246     }
247 
248     /***
249      * @return
250      */
251     public String getServerUrl() {
252         return info.getServerUrl();
253     }
254 
255     /***
256      * @return
257      */
258     public String getUserName() {
259         return info.getUserName();
260     }
261 
262     /***
263      * @param clientid
264      */
265     public void setClientid(String clientid) {
266         info.setClientid(clientid);
267     }
268 
269     /***
270      * @param password
271      */
272     public void setPassword(String password) {
273         info.setPassword(password);
274     }
275 
276     /***
277      * @param url
278      */
279     public void setServerUrl(String url) {
280         info.setServerUrl(url);
281     }
282 
283     /***
284      * @param userid
285      */
286     public void setUserName(String userid) {
287         info.setUserName(userid);
288     }
289 
290     public Boolean isXA() {
291         return Boolean.valueOf(info.isXa());
292     }
293 
294     public void setXA(Boolean xa) {
295         info.setXa(xa.booleanValue());
296     }
297 
298 	/***
299 	 * @return Returns the endpointWorkerType.
300 	 */
301 	public String getEndpointWorkerType() {
302 		return endpointWorkerType;
303 	}
304 	/***
305 	 * @param endpointWorkerType The endpointWorkerType to set.
306 	 */
307 	public void setEndpointWorkerType(String endpointWorkerType) {
308 		this.endpointWorkerType = endpointWorkerType;
309 	}
310 	
311 	/***
312 	 * @return Returns the info.
313 	 */
314 	public ActiveMQConnectionRequestInfo getInfo() {
315 		return info;
316 	}
317 }