001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.fanout;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import org.apache.activemq.command.Command;
028import org.apache.activemq.command.ConsumerInfo;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.RemoveInfo;
031import org.apache.activemq.command.Response;
032import org.apache.activemq.state.ConnectionStateTracker;
033import org.apache.activemq.thread.Task;
034import org.apache.activemq.thread.TaskRunner;
035import org.apache.activemq.thread.TaskRunnerFactory;
036import org.apache.activemq.transport.CompositeTransport;
037import org.apache.activemq.transport.DefaultTransportListener;
038import org.apache.activemq.transport.FutureResponse;
039import org.apache.activemq.transport.ResponseCallback;
040import org.apache.activemq.transport.Transport;
041import org.apache.activemq.transport.TransportFactory;
042import org.apache.activemq.transport.TransportListener;
043import org.apache.activemq.util.IOExceptionSupport;
044import org.apache.activemq.util.ServiceStopper;
045import org.apache.activemq.util.ServiceSupport;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * A Transport that fans out a connection to multiple brokers.
051 * 
052 * 
053 */
054public class FanoutTransport implements CompositeTransport {
055
056    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
057
058    private TransportListener transportListener;
059    private boolean disposed;
060    private boolean connected;
061
062    private final Object reconnectMutex = new Object();
063    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
064    private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
065
066    private final TaskRunnerFactory reconnectTaskFactory;
067    private final TaskRunner reconnectTask;
068    private boolean started;
069
070    private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
071    private int connectedCount;
072
073    private int minAckCount = 2;
074
075    private long initialReconnectDelay = 10;
076    private long maxReconnectDelay = 1000 * 30;
077    private long backOffMultiplier = 2;
078    private final boolean useExponentialBackOff = true;
079    private int maxReconnectAttempts;
080    private Exception connectionFailure;
081    private FanoutTransportHandler primary;
082    private boolean fanOutQueues = false;
083
084    static class RequestCounter {
085
086        final Command command;
087        final AtomicInteger ackCount;
088
089        RequestCounter(Command command, int count) {
090            this.command = command;
091            this.ackCount = new AtomicInteger(count);
092        }
093
094        @Override
095        public String toString() {
096            return command.getCommandId() + "=" + ackCount.get();
097        }
098    }
099
100    class FanoutTransportHandler extends DefaultTransportListener {
101
102        private final URI uri;
103        private Transport transport;
104
105        private int connectFailures;
106        private long reconnectDelay = initialReconnectDelay;
107        private long reconnectDate;
108
109        public FanoutTransportHandler(URI uri) {
110            this.uri = uri;
111        }
112
113        @Override
114        public void onCommand(Object o) {
115            Command command = (Command)o;
116            if (command.isResponse()) {
117                Integer id = new Integer(((Response)command).getCorrelationId());
118                RequestCounter rc = requestMap.get(id);
119                if (rc != null) {
120                    if (rc.ackCount.decrementAndGet() <= 0) {
121                        requestMap.remove(id);
122                        transportListenerOnCommand(command);
123                    }
124                } else {
125                    transportListenerOnCommand(command);
126                }
127            } else {
128                transportListenerOnCommand(command);
129            }
130        }
131
132        @Override
133        public void onException(IOException error) {
134            try {
135                synchronized (reconnectMutex) {
136                    if (transport == null || !transport.isConnected()) {
137                        return;
138                    }
139
140                    LOG.debug("Transport failed, starting up reconnect task", error);
141
142                    ServiceSupport.dispose(transport);
143                    transport = null;
144                    connectedCount--;
145                    if (primary == this) {
146                        primary = null;
147                    }
148                    reconnectTask.wakeup();
149                }
150            } catch (InterruptedException e) {
151                Thread.currentThread().interrupt();
152                if (transportListener != null) {
153                    transportListener.onException(new InterruptedIOException());
154                }
155            }
156        }
157    }
158
159    public FanoutTransport() throws InterruptedIOException {
160        // Setup a task that is used to reconnect the a connection async.
161        reconnectTaskFactory = new TaskRunnerFactory();
162        reconnectTaskFactory.init();
163        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
164            public boolean iterate() {
165                return doConnect();
166            }
167        }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
168    }
169
170    /**
171     * @return
172     */
173    private boolean doConnect() {
174        long closestReconnectDate = 0;
175        synchronized (reconnectMutex) {
176
177            if (disposed || connectionFailure != null) {
178                reconnectMutex.notifyAll();
179            }
180
181            if (transports.size() == connectedCount || disposed || connectionFailure != null) {
182                return false;
183            } else {
184
185                if (transports.isEmpty()) {
186                    // connectionFailure = new IOException("No uris available to
187                    // connect to.");
188                } else {
189
190                    // Try to connect them up.
191                    Iterator<FanoutTransportHandler> iter = transports.iterator();
192                    for (int i = 0; iter.hasNext() && !disposed; i++) {
193
194                        long now = System.currentTimeMillis();
195
196                        FanoutTransportHandler fanoutHandler = iter.next();
197                        if (fanoutHandler.transport != null) {
198                            continue;
199                        }
200
201                        // Are we waiting a little to try to reconnect this one?
202                        if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
203                            if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
204                                closestReconnectDate = fanoutHandler.reconnectDate;
205                            }
206                            continue;
207                        }
208
209                        URI uri = fanoutHandler.uri;
210                        try {
211                            LOG.debug("Stopped: " + this);
212                            LOG.debug("Attempting connect to: " + uri);
213                            Transport t = TransportFactory.compositeConnect(uri);
214                            fanoutHandler.transport = t;
215                            t.setTransportListener(fanoutHandler);
216                            if (started) {
217                                restoreTransport(fanoutHandler);
218                            }
219                            LOG.debug("Connection established");
220                            fanoutHandler.reconnectDelay = initialReconnectDelay;
221                            fanoutHandler.connectFailures = 0;
222                            if (primary == null) {
223                                primary = fanoutHandler;
224                            }
225                            connectedCount++;
226                        } catch (Exception e) {
227                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
228
229                            if( fanoutHandler.transport !=null ) {
230                                ServiceSupport.dispose(fanoutHandler.transport);
231                                fanoutHandler.transport=null;
232                            }
233                            
234                            if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
235                                LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
236                                connectionFailure = e;
237                                reconnectMutex.notifyAll();
238                                return false;
239                            } else {
240
241                                if (useExponentialBackOff) {
242                                    // Exponential increment of reconnect delay.
243                                    fanoutHandler.reconnectDelay *= backOffMultiplier;
244                                    if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
245                                        fanoutHandler.reconnectDelay = maxReconnectDelay;
246                                    }
247                                }
248
249                                fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
250
251                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
252                                    closestReconnectDate = fanoutHandler.reconnectDate;
253                                }
254                            }
255                        }
256                    }
257                    if (transports.size() == connectedCount || disposed) {
258                        reconnectMutex.notifyAll();
259                        return false;
260                    }
261
262                }
263            }
264
265        }
266
267        try {
268            long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
269            if (reconnectDelay > 0) {
270                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
271                Thread.sleep(reconnectDelay);
272            }
273        } catch (InterruptedException e1) {
274            Thread.currentThread().interrupt();
275        }
276        return true;
277    }
278
279    public void start() throws Exception {
280        synchronized (reconnectMutex) {
281            LOG.debug("Started.");
282            if (started) {
283                return;
284            }
285            started = true;
286            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
287                FanoutTransportHandler th = iter.next();
288                if (th.transport != null) {
289                    restoreTransport(th);
290                }
291            }
292            connected=true;
293        }
294    }
295
296    public void stop() throws Exception {
297        try {
298            synchronized (reconnectMutex) {
299                ServiceStopper ss = new ServiceStopper();
300
301                if (!started) {
302                    return;
303                }
304                started = false;
305                disposed = true;
306                connected=false;
307
308                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
309                    FanoutTransportHandler th = iter.next();
310                    if (th.transport != null) {
311                        ss.stop(th.transport);
312                    }
313                }
314
315                LOG.debug("Stopped: " + this);
316                ss.throwFirstException();
317            }
318        } finally {
319            reconnectTask.shutdown();
320            reconnectTaskFactory.shutdownNow();
321        }
322    }
323
324        public int getMinAckCount() {
325                return minAckCount;
326        }
327
328        public void setMinAckCount(int minAckCount) {
329                this.minAckCount = minAckCount;
330        }    
331    
332    public long getInitialReconnectDelay() {
333        return initialReconnectDelay;
334    }
335
336    public void setInitialReconnectDelay(long initialReconnectDelay) {
337        this.initialReconnectDelay = initialReconnectDelay;
338    }
339
340    public long getMaxReconnectDelay() {
341        return maxReconnectDelay;
342    }
343
344    public void setMaxReconnectDelay(long maxReconnectDelay) {
345        this.maxReconnectDelay = maxReconnectDelay;
346    }
347
348    public long getReconnectDelayExponent() {
349        return backOffMultiplier;
350    }
351
352    public void setReconnectDelayExponent(long reconnectDelayExponent) {
353        this.backOffMultiplier = reconnectDelayExponent;
354    }
355
356    public int getMaxReconnectAttempts() {
357        return maxReconnectAttempts;
358    }
359
360    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
361        this.maxReconnectAttempts = maxReconnectAttempts;
362    }
363
364    public void oneway(Object o) throws IOException {
365        final Command command = (Command)o;
366        try {
367            synchronized (reconnectMutex) {
368
369                // Wait for transport to be connected.
370                while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
371                    LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
372                    reconnectMutex.wait(1000);
373                }
374
375                // Still not fully connected.
376                if (connectedCount < minAckCount) {
377
378                    Exception error;
379
380                    // Throw the right kind of error..
381                    if (disposed) {
382                        error = new IOException("Transport disposed.");
383                    } else if (connectionFailure != null) {
384                        error = connectionFailure;
385                    } else {
386                        error = new IOException("Unexpected failure.");
387                    }
388
389                    if (error instanceof IOException) {
390                        throw (IOException)error;
391                    }
392                    throw IOExceptionSupport.create(error);
393                }
394
395                // If it was a request and it was not being tracked by
396                // the state tracker,
397                // then hold it in the requestMap so that we can replay
398                // it later.
399                boolean fanout = isFanoutCommand(command);
400                if (stateTracker.track(command) == null && command.isResponseRequired()) {
401                    int size = fanout ? minAckCount : 1;
402                    requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
403                }
404                
405                // Send the message.
406                if (fanout) {
407                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
408                        FanoutTransportHandler th = iter.next();
409                        if (th.transport != null) {
410                            try {
411                                th.transport.oneway(command);
412                            } catch (IOException e) {
413                                LOG.debug("Send attempt: failed.");
414                                th.onException(e);
415                            }
416                        }
417                    }
418                } else {
419                    try {
420                        primary.transport.oneway(command);
421                    } catch (IOException e) {
422                        LOG.debug("Send attempt: failed.");
423                        primary.onException(e);
424                    }
425                }
426
427            }
428        } catch (InterruptedException e) {
429            // Some one may be trying to stop our thread.
430            Thread.currentThread().interrupt();
431            throw new InterruptedIOException();
432        }
433    }
434
435    /**
436     * @param command
437     * @return
438     */
439    private boolean isFanoutCommand(Command command) {
440        if (command.isMessage()) {
441            if( fanOutQueues ) {
442                return true;
443            }
444            return ((Message)command).getDestination().isTopic();
445        }
446        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
447                command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
448            return false;
449        }
450        return true;
451    }
452
453    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
454        throw new AssertionError("Unsupported Method");
455    }
456
457    public Object request(Object command) throws IOException {
458        throw new AssertionError("Unsupported Method");
459    }
460
461    public Object request(Object command, int timeout) throws IOException {
462        throw new AssertionError("Unsupported Method");
463    }
464
465    public void reconnect() {
466        LOG.debug("Waking up reconnect task");
467        try {
468            reconnectTask.wakeup();
469        } catch (InterruptedException e) {
470            Thread.currentThread().interrupt();
471        }
472    }
473
474    public TransportListener getTransportListener() {
475        return transportListener;
476    }
477
478    public void setTransportListener(TransportListener commandListener) {
479        this.transportListener = commandListener;
480    }
481
482    public <T> T narrow(Class<T> target) {
483
484        if (target.isAssignableFrom(getClass())) {
485            return target.cast(this);
486        }
487
488        synchronized (reconnectMutex) {
489            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
490                FanoutTransportHandler th = iter.next();
491                if (th.transport != null) {
492                    T rc = th.transport.narrow(target);
493                    if (rc != null) {
494                        return rc;
495                    }
496                }
497            }
498        }
499
500        return null;
501
502    }
503
504    protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
505        th.transport.start();
506        stateTracker.setRestoreConsumers(th.transport == primary);
507        stateTracker.restore(th.transport);
508        for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
509            RequestCounter rc = iter2.next();
510            th.transport.oneway(rc.command);
511        }
512    }
513
514    public void add(boolean reblance,URI uris[]) {
515
516        synchronized (reconnectMutex) {
517            for (int i = 0; i < uris.length; i++) {
518                URI uri = uris[i];
519
520                boolean match = false;
521                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
522                    FanoutTransportHandler th = iter.next();
523                    if (th.uri.equals(uri)) {
524                        match = true;
525                        break;
526                    }
527                }
528                if (!match) {
529                    FanoutTransportHandler th = new FanoutTransportHandler(uri);
530                    transports.add(th);
531                    reconnect();
532                }
533            }
534        }
535
536    }
537
538    public void remove(boolean rebalance,URI uris[]) {
539
540        synchronized (reconnectMutex) {
541            for (int i = 0; i < uris.length; i++) {
542                URI uri = uris[i];
543
544                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
545                    FanoutTransportHandler th = iter.next();
546                    if (th.uri.equals(uri)) {
547                        if (th.transport != null) {
548                            ServiceSupport.dispose(th.transport);
549                            connectedCount--;
550                        }
551                        iter.remove();
552                        break;
553                    }
554                }
555            }
556        }
557
558    }
559    
560    public void reconnect(URI uri) throws IOException {
561                add(true,new URI[]{uri});
562                
563        }
564    
565    public boolean isReconnectSupported() {
566        return true;
567    }
568
569    public boolean isUpdateURIsSupported() {
570        return true;
571    }
572    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
573        add(reblance,uris);
574    }
575
576
577    public String getRemoteAddress() {
578        if (primary != null) {
579            if (primary.transport != null) {
580                return primary.transport.getRemoteAddress();
581            }
582        }
583        return null;
584    }
585
586    protected void transportListenerOnCommand(Command command) {
587        if (transportListener != null) {
588            transportListener.onCommand(command);
589        }
590    }
591
592    public boolean isFaultTolerant() {
593        return true;
594    }
595
596    public boolean isFanOutQueues() {
597        return fanOutQueues;
598    }
599
600    public void setFanOutQueues(boolean fanOutQueues) {
601        this.fanOutQueues = fanOutQueues;
602    }
603
604        public boolean isDisposed() {
605                return disposed;
606        }
607        
608
609    public boolean isConnected() {
610        return connected;
611    }
612
613    public int getReceiveCounter() {
614        int rc = 0;
615        synchronized (reconnectMutex) {
616            for (FanoutTransportHandler th : transports) {
617                if (th.transport != null) {
618                    rc += th.transport.getReceiveCounter();
619                }
620            }
621        }
622        return rc;
623    }
624}