001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import org.apache.activemq.util.FactoryFinder;
020import org.apache.activemq.util.IOExceptionSupport;
021import org.apache.activemq.util.IntrospectionSupport;
022import org.apache.activemq.util.URISupport;
023import org.apache.activemq.wireformat.WireFormat;
024import org.apache.activemq.wireformat.WireFormatFactory;
025
026import java.io.IOException;
027import java.net.MalformedURLException;
028import java.net.URI;
029import java.net.URISyntaxException;
030import java.net.UnknownHostException;
031import java.util.HashMap;
032import java.util.Map;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.Executor;
035
036public abstract class TransportFactory {
037
038    private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
039    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
040    private static final ConcurrentHashMap<String, TransportFactory> TRANSPORT_FACTORYS = new ConcurrentHashMap<String, TransportFactory>();
041
042    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
043    private static final String THREAD_NAME_FILTER = "threadName";
044
045    public abstract TransportServer doBind(URI location) throws IOException;
046
047    public Transport doConnect(URI location, Executor ex) throws Exception {
048        return doConnect(location);
049    }
050
051    public Transport doCompositeConnect(URI location, Executor ex) throws Exception {
052        return doCompositeConnect(location);
053    }
054
055    /**
056     * Creates a normal transport.
057     *
058     * @param location
059     * @return the transport
060     * @throws Exception
061     */
062    public static Transport connect(URI location) throws Exception {
063        TransportFactory tf = findTransportFactory(location);
064        return tf.doConnect(location);
065    }
066
067    /**
068     * Creates a normal transport.
069     *
070     * @param location
071     * @param ex
072     * @return the transport
073     * @throws Exception
074     */
075    public static Transport connect(URI location, Executor ex) throws Exception {
076        TransportFactory tf = findTransportFactory(location);
077        return tf.doConnect(location, ex);
078    }
079
080    /**
081     * Creates a slimmed down transport that is more efficient so that it can be
082     * used by composite transports like reliable and HA.
083     *
084     * @param location
085     * @return the Transport
086     * @throws Exception
087     */
088    public static Transport compositeConnect(URI location) throws Exception {
089        TransportFactory tf = findTransportFactory(location);
090        return tf.doCompositeConnect(location);
091    }
092
093    /**
094     * Creates a slimmed down transport that is more efficient so that it can be
095     * used by composite transports like reliable and HA.
096     *
097     * @param location
098     * @param ex
099     * @return the Transport
100     * @throws Exception
101     */
102    public static Transport compositeConnect(URI location, Executor ex) throws Exception {
103        TransportFactory tf = findTransportFactory(location);
104        return tf.doCompositeConnect(location, ex);
105    }
106
107    public static TransportServer bind(URI location) throws IOException {
108        TransportFactory tf = findTransportFactory(location);
109        return tf.doBind(location);
110    }
111
112    public Transport doConnect(URI location) throws Exception {
113        try {
114            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
115            if( !options.containsKey("wireFormat.host") ) {
116                options.put("wireFormat.host", location.getHost());
117            }
118            WireFormat wf = createWireFormat(options);
119            Transport transport = createTransport(location, wf);
120            Transport rc = configure(transport, wf, options);
121            if (!options.isEmpty()) {
122                throw new IllegalArgumentException("Invalid connect parameters: " + options);
123            }
124            return rc;
125        } catch (URISyntaxException e) {
126            throw IOExceptionSupport.create(e);
127        }
128    }
129
130    public Transport doCompositeConnect(URI location) throws Exception {
131        try {
132            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
133            WireFormat wf = createWireFormat(options);
134            Transport transport = createTransport(location, wf);
135            Transport rc = compositeConfigure(transport, wf, options);
136            if (!options.isEmpty()) {
137                throw new IllegalArgumentException("Invalid connect parameters: " + options);
138            }
139            return rc;
140        } catch (URISyntaxException e) {
141            throw IOExceptionSupport.create(e);
142        }
143    }
144
145     /**
146      * Allow registration of a transport factory without wiring via META-INF classes
147     * @param scheme
148     * @param tf
149     */
150    public static void registerTransportFactory(String scheme, TransportFactory tf) {
151        TRANSPORT_FACTORYS.put(scheme, tf);
152      }
153
154    /**
155     * Factory method to create a new transport
156     *
157     * @throws IOException
158     * @throws UnknownHostException
159     */
160    protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
161        throw new IOException("createTransport() method not implemented!");
162    }
163
164    /**
165     * @param location
166     * @return
167     * @throws IOException
168     */
169    public static TransportFactory findTransportFactory(URI location) throws IOException {
170        String scheme = location.getScheme();
171        if (scheme == null) {
172            throw new IOException("Transport not scheme specified: [" + location + "]");
173        }
174        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
175        if (tf == null) {
176            // Try to load if from a META-INF property.
177            try {
178                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
179                TRANSPORT_FACTORYS.put(scheme, tf);
180            } catch (Throwable e) {
181                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
182            }
183        }
184        return tf;
185    }
186
187    protected WireFormat createWireFormat(Map<String, String> options) throws IOException {
188        WireFormatFactory factory = createWireFormatFactory(options);
189        WireFormat format = factory.createWireFormat();
190        return format;
191    }
192
193    protected WireFormatFactory createWireFormatFactory(Map<String, String> options) throws IOException {
194        String wireFormat = (String)options.remove("wireFormat");
195        if (wireFormat == null) {
196            wireFormat = getDefaultWireFormatType();
197        }
198
199        try {
200            WireFormatFactory wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
201            IntrospectionSupport.setProperties(wff, options, "wireFormat.");
202            return wff;
203        } catch (Throwable e) {
204            throw IOExceptionSupport.create("Could not create wire format factory for: " + wireFormat + ", reason: " + e, e);
205        }
206    }
207
208    protected String getDefaultWireFormatType() {
209        return "default";
210    }
211
212    /**
213     * Fully configures and adds all need transport filters so that the
214     * transport can be used by the JMS client.
215     *
216     * @param transport
217     * @param wf
218     * @param options
219     * @return
220     * @throws Exception
221     */
222    @SuppressWarnings("rawtypes")
223    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
224        transport = compositeConfigure(transport, wf, options);
225
226        transport = new MutexTransport(transport);
227        transport = new ResponseCorrelator(transport);
228
229        return transport;
230    }
231
232    /**
233     * Fully configures and adds all need transport filters so that the
234     * transport can be used by the ActiveMQ message broker. The main difference
235     * between this and the configure() method is that the broker does not issue
236     * requests to the client so the ResponseCorrelator is not needed.
237     *
238     * @param transport
239     * @param format
240     * @param options
241     * @return
242     * @throws Exception
243     */
244    @SuppressWarnings("rawtypes")
245    public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
246        if (options.containsKey(THREAD_NAME_FILTER)) {
247            transport = new ThreadNameFilter(transport);
248        }
249        transport = compositeConfigure(transport, format, options);
250        transport = new MutexTransport(transport);
251        return transport;
252    }
253
254    /**
255     * Similar to configure(...) but this avoid adding in the MutexTransport and
256     * ResponseCorrelator transport layers so that the resulting transport can
257     * more efficiently be used as part of a composite transport.
258     *
259     * @param transport
260     * @param format
261     * @param options
262     * @return
263     */
264    @SuppressWarnings("rawtypes")
265    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
266        if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
267            transport = new WriteTimeoutFilter(transport);
268            String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
269            if (soWriteTimeout!=null) {
270                ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
271            }
272        }
273        IntrospectionSupport.setProperties(transport, options);
274        return transport;
275    }
276
277    @SuppressWarnings("rawtypes")
278    protected String getOption(Map options, String key, String def) {
279        String rc = (String) options.remove(key);
280        if( rc == null ) {
281            rc = def;
282        }
283        return rc;
284    }
285}