001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.failover;
018
019import java.io.BufferedReader;
020import java.io.FileReader;
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.InterruptedIOException;
024import java.net.InetAddress;
025import java.net.MalformedURLException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedHashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.StringTokenizer;
037import java.util.concurrent.CopyOnWriteArrayList;
038import java.util.concurrent.atomic.AtomicReference;
039
040import org.apache.activemq.broker.SslContext;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionControl;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.MessageDispatch;
045import org.apache.activemq.command.MessagePull;
046import org.apache.activemq.command.RemoveInfo;
047import org.apache.activemq.command.Response;
048import org.apache.activemq.state.ConnectionStateTracker;
049import org.apache.activemq.state.Tracked;
050import org.apache.activemq.thread.Task;
051import org.apache.activemq.thread.TaskRunner;
052import org.apache.activemq.thread.TaskRunnerFactory;
053import org.apache.activemq.transport.CompositeTransport;
054import org.apache.activemq.transport.DefaultTransportListener;
055import org.apache.activemq.transport.FutureResponse;
056import org.apache.activemq.transport.ResponseCallback;
057import org.apache.activemq.transport.Transport;
058import org.apache.activemq.transport.TransportFactory;
059import org.apache.activemq.transport.TransportListener;
060import org.apache.activemq.util.IOExceptionSupport;
061import org.apache.activemq.util.ServiceSupport;
062import org.apache.activemq.util.URISupport;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A Transport that is made reliable by being able to fail over to another
068 * transport when a transport failure is detected.
069 */
070public class FailoverTransport implements CompositeTransport {
071
072    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
073    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
074    private static final int INFINITE = -1;
075    private TransportListener transportListener;
076    private boolean disposed;
077    private boolean connected;
078    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
079    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
080
081    private final Object reconnectMutex = new Object();
082    private final Object backupMutex = new Object();
083    private final Object sleepMutex = new Object();
084    private final Object listenerMutex = new Object();
085    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
086    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();
087
088    private URI connectedTransportURI;
089    private URI failedConnectTransportURI;
090    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
091    private final TaskRunnerFactory reconnectTaskFactory;
092    private final TaskRunner reconnectTask;
093    private boolean started;
094    private boolean initialized;
095    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
096    private long maxReconnectDelay = 1000 * 30;
097    private double backOffMultiplier = 2d;
098    private long timeout = INFINITE;
099    private boolean useExponentialBackOff = true;
100    private boolean randomize = true;
101    private int maxReconnectAttempts = INFINITE;
102    private int startupMaxReconnectAttempts = INFINITE;
103    private int connectFailures;
104    private int warnAfterReconnectAttempts = 10;
105    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
106    private Exception connectionFailure;
107    private boolean firstConnection = true;
108    // optionally always have a backup created
109    private boolean backup = false;
110    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
111    private int backupPoolSize = 1;
112    private boolean trackMessages = false;
113    private boolean trackTransactionProducers = true;
114    private int maxCacheSize = 128 * 1024;
115    private final TransportListener disposedListener = new DefaultTransportListener() {
116    };
117    private final TransportListener myTransportListener = createTransportListener();
118    private boolean updateURIsSupported = true;
119    private boolean reconnectSupported = true;
120    // remember for reconnect thread
121    private SslContext brokerSslContext;
122    private String updateURIsURL = null;
123    private boolean rebalanceUpdateURIs = true;
124    private boolean doRebalance = false;
125    private boolean connectedToPriority = false;
126
127    private boolean priorityBackup = false;
128    private final ArrayList<URI> priorityList = new ArrayList<URI>();
129    private boolean priorityBackupAvailable = false;
130    private String nestedExtraQueryOptions;
131    private boolean shuttingDown = false;
132
133    public FailoverTransport() throws InterruptedIOException {
134        brokerSslContext = SslContext.getCurrentSslContext();
135        stateTracker.setTrackTransactions(true);
136        // Setup a task that is used to reconnect the a connection async.
137        reconnectTaskFactory = new TaskRunnerFactory();
138        reconnectTaskFactory.init();
139        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
140            @Override
141            public boolean iterate() {
142                boolean result = false;
143                if (!started) {
144                    return result;
145                }
146                boolean buildBackup = true;
147                synchronized (backupMutex) {
148                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
149                        result = doReconnect();
150                        buildBackup = false;
151                    }
152                }
153                if (buildBackup) {
154                    buildBackups();
155                    if (priorityBackup && !connectedToPriority) {
156                        try {
157                            doDelay();
158                            if (reconnectTask == null) {
159                                return true;
160                            }
161                            reconnectTask.wakeup();
162                        } catch (InterruptedException e) {
163                            LOG.debug("Reconnect task has been interrupted.", e);
164                        }
165                    }
166                } else {
167                    // build backups on the next iteration
168                    buildBackup = true;
169                    try {
170                        if (reconnectTask == null) {
171                            return true;
172                        }
173                        reconnectTask.wakeup();
174                    } catch (InterruptedException e) {
175                        LOG.debug("Reconnect task has been interrupted.", e);
176                    }
177                }
178                return result;
179            }
180
181        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
182    }
183
184    TransportListener createTransportListener() {
185        return new TransportListener() {
186            @Override
187            public void onCommand(Object o) {
188                Command command = (Command) o;
189                if (command == null) {
190                    return;
191                }
192                if (command.isResponse()) {
193                    Object object = null;
194                    synchronized (requestMap) {
195                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
196                    }
197                    if (object != null && object.getClass() == Tracked.class) {
198                        ((Tracked) object).onResponses(command);
199                    }
200                }
201                if (!initialized) {
202                    initialized = true;
203                }
204
205                if (command.isConnectionControl()) {
206                    handleConnectionControl((ConnectionControl) command);
207                }
208                if (transportListener != null) {
209                    transportListener.onCommand(command);
210                }
211            }
212
213            @Override
214            public void onException(IOException error) {
215                try {
216                    handleTransportFailure(error);
217                } catch (InterruptedException e) {
218                    Thread.currentThread().interrupt();
219                    transportListener.onException(new InterruptedIOException());
220                }
221            }
222
223            @Override
224            public void transportInterupted() {
225                if (transportListener != null) {
226                    transportListener.transportInterupted();
227                }
228            }
229
230            @Override
231            public void transportResumed() {
232                if (transportListener != null) {
233                    transportListener.transportResumed();
234                }
235            }
236        };
237    }
238
239    public final void disposeTransport(Transport transport) {
240        transport.setTransportListener(disposedListener);
241        ServiceSupport.dispose(transport);
242    }
243
244    public final void handleTransportFailure(IOException e) throws InterruptedException {
245        synchronized (reconnectMutex) {
246            if (shuttingDown) {
247                // shutdown info sent and remote socket closed and we see that before a local close
248                // let the close do the work
249                return;
250            }
251
252            if (LOG.isTraceEnabled()) {
253                LOG.trace(this + " handleTransportFailure: " + e, e);
254            }
255
256            Transport transport = connectedTransport.getAndSet(null);
257
258            if (transport != null) {
259
260                disposeTransport(transport);
261
262                boolean reconnectOk = false;
263
264                if (canReconnect()) {
265                    reconnectOk = true;
266                }
267                LOG.warn("Transport (" + transport + ") failed"
268                        + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
269
270                initialized = false;
271                failedConnectTransportURI = connectedTransportURI;
272                connectedTransportURI = null;
273                connected = false;
274                connectedToPriority = false;
275
276                if (reconnectOk) {
277                    // notify before any reconnect attempt so ack state can be whacked
278                    if (transportListener != null) {
279                        transportListener.transportInterupted();
280                    }
281
282                    updated.remove(failedConnectTransportURI);
283                    reconnectTask.wakeup();
284                } else if (!isDisposed()) {
285                    propagateFailureToExceptionListener(e);
286                }
287            }
288        }
289    }
290
291    private boolean canReconnect() {
292        return started && 0 != calculateReconnectAttemptLimit();
293    }
294
295    public final void handleConnectionControl(ConnectionControl control) {
296        String reconnectStr = control.getReconnectTo();
297        if (LOG.isTraceEnabled()) {
298            LOG.trace("Received ConnectionControl: {}", control);
299        }
300
301        if (reconnectStr != null) {
302            reconnectStr = reconnectStr.trim();
303            if (reconnectStr.length() > 0) {
304                try {
305                    URI uri = new URI(reconnectStr);
306                    if (isReconnectSupported()) {
307                        reconnect(uri);
308                        LOG.info("Reconnected to: " + uri);
309                    }
310                } catch (Exception e) {
311                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
312                }
313            }
314        }
315        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
316    }
317
318    private final void processNewTransports(boolean rebalance, String newTransports) {
319        if (newTransports != null) {
320            newTransports = newTransports.trim();
321            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
322                List<URI> list = new ArrayList<URI>();
323                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
324                while (tokenizer.hasMoreTokens()) {
325                    String str = tokenizer.nextToken();
326                    try {
327                        URI uri = new URI(str);
328                        list.add(uri);
329                    } catch (Exception e) {
330                        LOG.error("Failed to parse broker address: " + str, e);
331                    }
332                }
333                if (list.isEmpty() == false) {
334                    try {
335                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
336                    } catch (IOException e) {
337                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
338                    }
339                }
340            }
341        }
342    }
343
344    @Override
345    public void start() throws Exception {
346        synchronized (reconnectMutex) {
347            if (LOG.isDebugEnabled()) {
348                LOG.debug("Started " + this);
349            }
350            if (started) {
351                return;
352            }
353            started = true;
354            stateTracker.setMaxCacheSize(getMaxCacheSize());
355            stateTracker.setTrackMessages(isTrackMessages());
356            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
357            if (connectedTransport.get() != null) {
358                stateTracker.restore(connectedTransport.get());
359            } else {
360                reconnect(false);
361            }
362        }
363    }
364
365    @Override
366    public void stop() throws Exception {
367        Transport transportToStop = null;
368        List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
369
370        try {
371            synchronized (reconnectMutex) {
372                if (LOG.isDebugEnabled()) {
373                    LOG.debug("Stopped " + this);
374                }
375                if (!started) {
376                    return;
377                }
378                started = false;
379                disposed = true;
380                connected = false;
381
382                if (connectedTransport.get() != null) {
383                    transportToStop = connectedTransport.getAndSet(null);
384                }
385                reconnectMutex.notifyAll();
386            }
387            synchronized (sleepMutex) {
388                sleepMutex.notifyAll();
389            }
390        } finally {
391            reconnectTask.shutdown();
392            reconnectTaskFactory.shutdownNow();
393        }
394
395        synchronized(backupMutex) {
396            for (BackupTransport backup : backups) {
397                backup.setDisposed(true);
398                Transport transport = backup.getTransport();
399                if (transport != null) {
400                    transport.setTransportListener(disposedListener);
401                    backupsToStop.add(transport);
402                }
403            }
404            backups.clear();
405        }
406        for (Transport transport : backupsToStop) {
407            try {
408                if (LOG.isTraceEnabled()) {
409                    LOG.trace("Stopped backup: " + transport);
410                }
411                disposeTransport(transport);
412            } catch (Exception e) {
413            }
414        }
415        if (transportToStop != null) {
416            transportToStop.stop();
417        }
418    }
419
420    public long getInitialReconnectDelay() {
421        return initialReconnectDelay;
422    }
423
424    public void setInitialReconnectDelay(long initialReconnectDelay) {
425        this.initialReconnectDelay = initialReconnectDelay;
426    }
427
428    public long getMaxReconnectDelay() {
429        return maxReconnectDelay;
430    }
431
432    public void setMaxReconnectDelay(long maxReconnectDelay) {
433        this.maxReconnectDelay = maxReconnectDelay;
434    }
435
436    public long getReconnectDelay() {
437        return reconnectDelay;
438    }
439
440    public void setReconnectDelay(long reconnectDelay) {
441        this.reconnectDelay = reconnectDelay;
442    }
443
444    public double getReconnectDelayExponent() {
445        return backOffMultiplier;
446    }
447
448    public void setReconnectDelayExponent(double reconnectDelayExponent) {
449        this.backOffMultiplier = reconnectDelayExponent;
450    }
451
452    public Transport getConnectedTransport() {
453        return connectedTransport.get();
454    }
455
456    public URI getConnectedTransportURI() {
457        return connectedTransportURI;
458    }
459
460    public int getMaxReconnectAttempts() {
461        return maxReconnectAttempts;
462    }
463
464    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
465        this.maxReconnectAttempts = maxReconnectAttempts;
466    }
467
468    public int getStartupMaxReconnectAttempts() {
469        return this.startupMaxReconnectAttempts;
470    }
471
472    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
473        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
474    }
475
476    public long getTimeout() {
477        return timeout;
478    }
479
480    public void setTimeout(long timeout) {
481        this.timeout = timeout;
482    }
483
484    /**
485     * @return Returns the randomize.
486     */
487    public boolean isRandomize() {
488        return randomize;
489    }
490
491    /**
492     * @param randomize The randomize to set.
493     */
494    public void setRandomize(boolean randomize) {
495        this.randomize = randomize;
496    }
497
498    public boolean isBackup() {
499        return backup;
500    }
501
502    public void setBackup(boolean backup) {
503        this.backup = backup;
504    }
505
506    public int getBackupPoolSize() {
507        return backupPoolSize;
508    }
509
510    public void setBackupPoolSize(int backupPoolSize) {
511        this.backupPoolSize = backupPoolSize;
512    }
513
514    public int getCurrentBackups() {
515        return this.backups.size();
516    }
517
518    public boolean isTrackMessages() {
519        return trackMessages;
520    }
521
522    public void setTrackMessages(boolean trackMessages) {
523        this.trackMessages = trackMessages;
524    }
525
526    public boolean isTrackTransactionProducers() {
527        return this.trackTransactionProducers;
528    }
529
530    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
531        this.trackTransactionProducers = trackTransactionProducers;
532    }
533
534    public int getMaxCacheSize() {
535        return maxCacheSize;
536    }
537
538    public void setMaxCacheSize(int maxCacheSize) {
539        this.maxCacheSize = maxCacheSize;
540    }
541
542    public boolean isPriorityBackup() {
543        return priorityBackup;
544    }
545
546    public void setPriorityBackup(boolean priorityBackup) {
547        this.priorityBackup = priorityBackup;
548    }
549
550    public void setPriorityURIs(String priorityURIs) {
551        StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
552        while (tokenizer.hasMoreTokens()) {
553            String str = tokenizer.nextToken();
554            try {
555                URI uri = new URI(str);
556                priorityList.add(uri);
557            } catch (Exception e) {
558                LOG.error("Failed to parse broker address: " + str, e);
559            }
560        }
561    }
562
563    @Override
564    public void oneway(Object o) throws IOException {
565
566        Command command = (Command) o;
567        Exception error = null;
568        try {
569
570            synchronized (reconnectMutex) {
571
572                if (command != null && connectedTransport.get() == null) {
573                    if (command.isShutdownInfo()) {
574                        // Skipping send of ShutdownInfo command when not connected.
575                        return;
576                    } else if (command instanceof RemoveInfo || command.isMessageAck()) {
577                        // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
578                        stateTracker.track(command);
579                        if (command.isResponseRequired()) {
580                            Response response = new Response();
581                            response.setCorrelationId(command.getCommandId());
582                            myTransportListener.onCommand(response);
583                        }
584                        return;
585                    } else if (command instanceof MessagePull) {
586                        // Simulate response to MessagePull if timed as we can't honor that now.
587                        MessagePull pullRequest = (MessagePull) command;
588                        if (pullRequest.getTimeout() != 0) {
589                            MessageDispatch dispatch = new MessageDispatch();
590                            dispatch.setConsumerId(pullRequest.getConsumerId());
591                            dispatch.setDestination(pullRequest.getDestination());
592                            myTransportListener.onCommand(dispatch);
593                        }
594                        return;
595                    }
596                }
597
598                // Keep trying until the message is sent.
599                for (int i = 0; !disposed; i++) {
600                    try {
601
602                        // Wait for transport to be connected.
603                        Transport transport = connectedTransport.get();
604                        long start = System.currentTimeMillis();
605                        boolean timedout = false;
606                        while (transport == null && !disposed && connectionFailure == null
607                                && !Thread.currentThread().isInterrupted()) {
608                            if (LOG.isTraceEnabled()) {
609                                LOG.trace("Waiting for transport to reconnect..: " + command);
610                            }
611                            long end = System.currentTimeMillis();
612                            if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
613                                timedout = true;
614                                if (LOG.isInfoEnabled()) {
615                                    LOG.info("Failover timed out after " + (end - start) + "ms");
616                                }
617                                break;
618                            }
619                            try {
620                                reconnectMutex.wait(100);
621                            } catch (InterruptedException e) {
622                                Thread.currentThread().interrupt();
623                                if (LOG.isDebugEnabled()) {
624                                    LOG.debug("Interupted: " + e, e);
625                                }
626                            }
627                            transport = connectedTransport.get();
628                        }
629
630                        if (transport == null) {
631                            // Previous loop may have exited due to use being
632                            // disposed.
633                            if (disposed) {
634                                error = new IOException("Transport disposed.");
635                            } else if (connectionFailure != null) {
636                                error = connectionFailure;
637                            } else if (timedout == true) {
638                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
639                            } else {
640                                error = new IOException("Unexpected failure.");
641                            }
642                            break;
643                        }
644
645                        Tracked tracked = null;
646                        try {
647                            tracked = stateTracker.track(command);
648                        } catch (IOException ioe) {
649                            LOG.debug("Cannot track the command " + command, ioe);
650                        }
651                        // If it was a request and it was not being tracked by
652                        // the state tracker,
653                        // then hold it in the requestMap so that we can replay
654                        // it later.
655                        synchronized (requestMap) {
656                            if (tracked != null && tracked.isWaitingForResponse()) {
657                                requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
658                            } else if (tracked == null && command.isResponseRequired()) {
659                                requestMap.put(Integer.valueOf(command.getCommandId()), command);
660                            }
661                        }
662
663                        // Send the message.
664                        try {
665                            transport.oneway(command);
666                            stateTracker.trackBack(command);
667                            if (command.isShutdownInfo()) {
668                                shuttingDown = true;
669                            }
670                        } catch (IOException e) {
671
672                            // If the command was not tracked.. we will retry in
673                            // this method
674                            if (tracked == null) {
675
676                                // since we will retry in this method.. take it
677                                // out of the request
678                                // map so that it is not sent 2 times on
679                                // recovery
680                                if (command.isResponseRequired()) {
681                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
682                                }
683
684                                // Rethrow the exception so it will handled by
685                                // the outer catch
686                                throw e;
687                            } else {
688                                // Handle the error but allow the method to return since the
689                                // tracked commands are replayed on reconnect.
690                                if (LOG.isDebugEnabled()) {
691                                    LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
692                                }
693                                handleTransportFailure(e);
694                            }
695                        }
696
697                        return;
698
699                    } catch (IOException e) {
700                        if (LOG.isDebugEnabled()) {
701                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
702                        }
703                        handleTransportFailure(e);
704                    }
705                }
706            }
707        } catch (InterruptedException e) {
708            // Some one may be trying to stop our thread.
709            Thread.currentThread().interrupt();
710            throw new InterruptedIOException();
711        }
712
713        if (!disposed) {
714            if (error != null) {
715                if (error instanceof IOException) {
716                    throw (IOException) error;
717                }
718                throw IOExceptionSupport.create(error);
719            }
720        }
721    }
722
723    @Override
724    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
725        throw new AssertionError("Unsupported Method");
726    }
727
728    @Override
729    public Object request(Object command) throws IOException {
730        throw new AssertionError("Unsupported Method");
731    }
732
733    @Override
734    public Object request(Object command, int timeout) throws IOException {
735        throw new AssertionError("Unsupported Method");
736    }
737
738    @Override
739    public void add(boolean rebalance, URI u[]) {
740        boolean newURI = false;
741        for (URI uri : u) {
742            if (!contains(uri)) {
743                uris.add(uri);
744                newURI = true;
745            }
746        }
747        if (newURI) {
748            reconnect(rebalance);
749        }
750    }
751
752    @Override
753    public void remove(boolean rebalance, URI u[]) {
754        for (URI uri : u) {
755            uris.remove(uri);
756        }
757        // rebalance is automatic if any connected to removed/stopped broker
758    }
759
760    public void add(boolean rebalance, String u) {
761        try {
762            URI newURI = new URI(u);
763            if (contains(newURI) == false) {
764                uris.add(newURI);
765                reconnect(rebalance);
766            }
767
768        } catch (Exception e) {
769            LOG.error("Failed to parse URI: " + u);
770        }
771    }
772
773    public void reconnect(boolean rebalance) {
774        synchronized (reconnectMutex) {
775            if (started) {
776                if (rebalance) {
777                    doRebalance = true;
778                }
779                LOG.debug("Waking up reconnect task");
780                try {
781                    reconnectTask.wakeup();
782                } catch (InterruptedException e) {
783                    Thread.currentThread().interrupt();
784                }
785            } else {
786                LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
787            }
788        }
789    }
790
791    private List<URI> getConnectList() {
792        if (!updated.isEmpty()) {
793            return updated;
794        }
795        ArrayList<URI> l = new ArrayList<URI>(uris);
796        boolean removed = false;
797        if (failedConnectTransportURI != null) {
798            removed = l.remove(failedConnectTransportURI);
799        }
800        if (randomize) {
801            // Randomly, reorder the list by random swapping
802            for (int i = 0; i < l.size(); i++) {
803                // meed parenthesis due other JDKs (see AMQ-4826)
804                int p = ((int) (Math.random() * 100)) % l.size();
805                URI t = l.get(p);
806                l.set(p, l.get(i));
807                l.set(i, t);
808            }
809        }
810        if (removed) {
811            l.add(failedConnectTransportURI);
812        }
813        if (LOG.isDebugEnabled()) {
814            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
815        }
816        return l;
817    }
818
819    @Override
820    public TransportListener getTransportListener() {
821        return transportListener;
822    }
823
824    @Override
825    public void setTransportListener(TransportListener commandListener) {
826        synchronized (listenerMutex) {
827            this.transportListener = commandListener;
828            listenerMutex.notifyAll();
829        }
830    }
831
832    @Override
833    public <T> T narrow(Class<T> target) {
834
835        if (target.isAssignableFrom(getClass())) {
836            return target.cast(this);
837        }
838        Transport transport = connectedTransport.get();
839        if (transport != null) {
840            return transport.narrow(target);
841        }
842        return null;
843
844    }
845
846    protected void restoreTransport(Transport t) throws Exception, IOException {
847        t.start();
848        // send information to the broker - informing it we are an ft client
849        ConnectionControl cc = new ConnectionControl();
850        cc.setFaultTolerant(true);
851        t.oneway(cc);
852        stateTracker.restore(t);
853        Map<Integer, Command> tmpMap = null;
854        synchronized (requestMap) {
855            tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
856        }
857        for (Command command : tmpMap.values()) {
858            if (LOG.isTraceEnabled()) {
859                LOG.trace("restore requestMap, replay: " + command);
860            }
861            t.oneway(command);
862        }
863    }
864
865    public boolean isUseExponentialBackOff() {
866        return useExponentialBackOff;
867    }
868
869    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
870        this.useExponentialBackOff = useExponentialBackOff;
871    }
872
873    @Override
874    public String toString() {
875        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
876    }
877
878    @Override
879    public String getRemoteAddress() {
880        Transport transport = connectedTransport.get();
881        if (transport != null) {
882            return transport.getRemoteAddress();
883        }
884        return null;
885    }
886
887    @Override
888    public boolean isFaultTolerant() {
889        return true;
890    }
891
892    private void doUpdateURIsFromDisk() {
893        // If updateURIsURL is specified, read the file and add any new
894        // transport URI's to this FailOverTransport.
895        // Note: Could track file timestamp to avoid unnecessary reading.
896        String fileURL = getUpdateURIsURL();
897        if (fileURL != null) {
898            BufferedReader in = null;
899            String newUris = null;
900            StringBuffer buffer = new StringBuffer();
901
902            try {
903                in = new BufferedReader(getURLStream(fileURL));
904                while (true) {
905                    String line = in.readLine();
906                    if (line == null) {
907                        break;
908                    }
909                    buffer.append(line);
910                }
911                newUris = buffer.toString();
912            } catch (IOException ioe) {
913                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
914            } finally {
915                if (in != null) {
916                    try {
917                        in.close();
918                    } catch (IOException ioe) {
919                        // ignore
920                    }
921                }
922            }
923
924            processNewTransports(isRebalanceUpdateURIs(), newUris);
925        }
926    }
927
928    final boolean doReconnect() {
929        Exception failure = null;
930        synchronized (reconnectMutex) {
931
932            // First ensure we are up to date.
933            doUpdateURIsFromDisk();
934
935            if (disposed || connectionFailure != null) {
936                reconnectMutex.notifyAll();
937            }
938            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
939                return false;
940            } else {
941                List<URI> connectList = getConnectList();
942                if (connectList.isEmpty()) {
943                    failure = new IOException("No uris available to connect to.");
944                } else {
945                    if (doRebalance) {
946                        if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
947                            // already connected to first in the list, no need to rebalance
948                            doRebalance = false;
949                            return false;
950                        } else {
951                            if (LOG.isDebugEnabled()) {
952                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
953                            }
954
955                            try {
956                                Transport transport = this.connectedTransport.getAndSet(null);
957                                if (transport != null) {
958                                    disposeTransport(transport);
959                                }
960                            } catch (Exception e) {
961                                if (LOG.isDebugEnabled()) {
962                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
963                                }
964                            }
965                        }
966                        doRebalance = false;
967                    }
968
969                    resetReconnectDelay();
970
971                    Transport transport = null;
972                    URI uri = null;
973
974                    // If we have a backup already waiting lets try it.
975                    synchronized (backupMutex) {
976                        if ((priorityBackup || backup) && !backups.isEmpty()) {
977                            ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
978                            if (randomize) {
979                                Collections.shuffle(l);
980                            }
981                            BackupTransport bt = l.remove(0);
982                            backups.remove(bt);
983                            transport = bt.getTransport();
984                            uri = bt.getUri();
985                            if (priorityBackup && priorityBackupAvailable) {
986                                Transport old = this.connectedTransport.getAndSet(null);
987                                if (old != null) {
988                                    disposeTransport(old);
989                                }
990                                priorityBackupAvailable = false;
991                            }
992                        }
993                    }
994
995                    // Sleep for the reconnectDelay if there's no backup and we aren't trying
996                    // for the first time, or we were disposed for some reason.
997                    if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
998                        synchronized (sleepMutex) {
999                            if (LOG.isDebugEnabled()) {
1000                                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
1001                            }
1002                            try {
1003                                sleepMutex.wait(reconnectDelay);
1004                            } catch (InterruptedException e) {
1005                                Thread.currentThread().interrupt();
1006                            }
1007                        }
1008                    }
1009
1010                    Iterator<URI> iter = connectList.iterator();
1011                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
1012
1013                        try {
1014                            SslContext.setCurrentSslContext(brokerSslContext);
1015
1016                            // We could be starting with a backup and if so we wait to grab a
1017                            // URI from the pool until next time around.
1018                            if (transport == null) {
1019                                uri = addExtraQueryOptions(iter.next());
1020                                transport = TransportFactory.compositeConnect(uri);
1021                            }
1022
1023                            if (LOG.isDebugEnabled()) {
1024                                LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
1025                            }
1026                            transport.setTransportListener(myTransportListener);
1027                            transport.start();
1028
1029                            if (started &&  !firstConnection) {
1030                                restoreTransport(transport);
1031                            }
1032
1033                            if (LOG.isDebugEnabled()) {
1034                                LOG.debug("Connection established");
1035                            }
1036                            reconnectDelay = initialReconnectDelay;
1037                            connectedTransportURI = uri;
1038                            connectedTransport.set(transport);
1039                            connectedToPriority = isPriority(connectedTransportURI);
1040                            reconnectMutex.notifyAll();
1041                            connectFailures = 0;
1042
1043                            // Make sure on initial startup, that the transportListener
1044                            // has been initialized for this instance.
1045                            synchronized (listenerMutex) {
1046                                if (transportListener == null) {
1047                                    try {
1048                                        // if it isn't set after 2secs - it probably never will be
1049                                        listenerMutex.wait(2000);
1050                                    } catch (InterruptedException ex) {
1051                                    }
1052                                }
1053                            }
1054
1055                            if (transportListener != null) {
1056                                transportListener.transportResumed();
1057                            } else {
1058                                if (LOG.isDebugEnabled()) {
1059                                    LOG.debug("transport resumed by transport listener not set");
1060                                }
1061                            }
1062
1063                            if (firstConnection) {
1064                                firstConnection = false;
1065                                LOG.info("Successfully connected to " + uri);
1066                            } else {
1067                                LOG.info("Successfully reconnected to " + uri);
1068                            }
1069
1070                            connected = true;
1071                            return false;
1072                        } catch (Exception e) {
1073                            failure = e;
1074                            if (LOG.isDebugEnabled()) {
1075                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1076                            }
1077                            if (transport != null) {
1078                                try {
1079                                    transport.stop();
1080                                    transport = null;
1081                                } catch (Exception ee) {
1082                                    if (LOG.isDebugEnabled()) {
1083                                        LOG.debug("Stop of failed transport: " + transport +
1084                                                  " failed with reason: " + ee);
1085                                    }
1086                                }
1087                            }
1088                        } finally {
1089                            SslContext.setCurrentSslContext(null);
1090                        }
1091                    }
1092                }
1093            }
1094
1095            int reconnectLimit = calculateReconnectAttemptLimit();
1096
1097            connectFailures++;
1098            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1099                LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1100                connectionFailure = failure;
1101
1102                // Make sure on initial startup, that the transportListener has been
1103                // initialized for this instance.
1104                synchronized (listenerMutex) {
1105                    if (transportListener == null) {
1106                        try {
1107                            listenerMutex.wait(2000);
1108                        } catch (InterruptedException ex) {
1109                        }
1110                    }
1111                }
1112
1113                propagateFailureToExceptionListener(connectionFailure);
1114                return false;
1115            }
1116
1117            int warnInterval = getWarnAfterReconnectAttempts();
1118            if (warnInterval > 0 && (connectFailures % warnInterval) == 0) {
1119                LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.",
1120                         uris, connectFailures);
1121            }
1122        }
1123
1124        if (!disposed) {
1125            doDelay();
1126        }
1127
1128        return !disposed;
1129    }
1130
1131    private void doDelay() {
1132        if (reconnectDelay > 0) {
1133            synchronized (sleepMutex) {
1134                if (LOG.isDebugEnabled()) {
1135                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1136                }
1137                try {
1138                    sleepMutex.wait(reconnectDelay);
1139                } catch (InterruptedException e) {
1140                    Thread.currentThread().interrupt();
1141                }
1142            }
1143        }
1144
1145        if (useExponentialBackOff) {
1146            // Exponential increment of reconnect delay.
1147            reconnectDelay *= backOffMultiplier;
1148            if (reconnectDelay > maxReconnectDelay) {
1149                reconnectDelay = maxReconnectDelay;
1150            }
1151        }
1152    }
1153
1154    private void resetReconnectDelay() {
1155        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1156            reconnectDelay = initialReconnectDelay;
1157        }
1158    }
1159
1160    /*
1161      * called with reconnectMutex held
1162     */
1163    private void propagateFailureToExceptionListener(Exception exception) {
1164        if (transportListener != null) {
1165            if (exception instanceof IOException) {
1166                transportListener.onException((IOException)exception);
1167            } else {
1168                transportListener.onException(IOExceptionSupport.create(exception));
1169            }
1170        }
1171        reconnectMutex.notifyAll();
1172    }
1173
1174    private int calculateReconnectAttemptLimit() {
1175        int maxReconnectValue = this.maxReconnectAttempts;
1176        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1177            maxReconnectValue = this.startupMaxReconnectAttempts;
1178        }
1179        return maxReconnectValue;
1180    }
1181
1182    private boolean shouldBuildBackups() {
1183       return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority));
1184    }
1185
1186    final boolean buildBackups() {
1187        synchronized (backupMutex) {
1188            if (!disposed && shouldBuildBackups()) {
1189                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1190                List<URI> connectList = getConnectList();
1191                for (URI uri: connectList) {
1192                    if (!backupList.contains(uri)) {
1193                        backupList.add(uri);
1194                    }
1195                }
1196                // removed disposed backups
1197                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1198                for (BackupTransport bt : backups) {
1199                    if (bt.isDisposed()) {
1200                        disposedList.add(bt);
1201                    }
1202                }
1203                backups.removeAll(disposedList);
1204                disposedList.clear();
1205                for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) {
1206                    URI uri = addExtraQueryOptions(iter.next());
1207                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1208                        try {
1209                            SslContext.setCurrentSslContext(brokerSslContext);
1210                            BackupTransport bt = new BackupTransport(this);
1211                            bt.setUri(uri);
1212                            if (!backups.contains(bt)) {
1213                                Transport t = TransportFactory.compositeConnect(uri);
1214                                t.setTransportListener(bt);
1215                                t.start();
1216                                bt.setTransport(t);
1217                                if (priorityBackup && isPriority(uri)) {
1218                                   priorityBackupAvailable = true;
1219                                   backups.add(0, bt);
1220                                   // if this priority backup overflows the pool
1221                                   // remove the backup with the lowest priority
1222                                   if (backups.size() > backupPoolSize) {
1223                                       BackupTransport disposeTransport = backups.remove(backups.size() - 1);
1224                                       disposeTransport.setDisposed(true);
1225                                       Transport transport = disposeTransport.getTransport();
1226                                       if (transport != null) {
1227                                           transport.setTransportListener(disposedListener);
1228                                           disposeTransport(transport);
1229                                       }
1230                                   }
1231                                } else {
1232                                    backups.add(bt);
1233                                }
1234                            }
1235                        } catch (Exception e) {
1236                            LOG.debug("Failed to build backup ", e);
1237                        } finally {
1238                            SslContext.setCurrentSslContext(null);
1239                        }
1240                    }
1241                }
1242            }
1243        }
1244        return false;
1245    }
1246
1247    protected boolean isPriority(URI uri) {
1248        if (!priorityBackup) {
1249            return false;
1250        }
1251
1252        if (!priorityList.isEmpty()) {
1253            return priorityList.contains(uri);
1254        }
1255        return uris.indexOf(uri) == 0;
1256    }
1257
1258    @Override
1259    public boolean isDisposed() {
1260        return disposed;
1261    }
1262
1263    @Override
1264    public boolean isConnected() {
1265        return connected;
1266    }
1267
1268    @Override
1269    public void reconnect(URI uri) throws IOException {
1270        add(true, new URI[]{uri});
1271    }
1272
1273    @Override
1274    public boolean isReconnectSupported() {
1275        return this.reconnectSupported;
1276    }
1277
1278    public void setReconnectSupported(boolean value) {
1279        this.reconnectSupported = value;
1280    }
1281
1282    @Override
1283    public boolean isUpdateURIsSupported() {
1284        return this.updateURIsSupported;
1285    }
1286
1287    public void setUpdateURIsSupported(boolean value) {
1288        this.updateURIsSupported = value;
1289    }
1290
1291    @Override
1292    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1293        if (isUpdateURIsSupported()) {
1294            HashSet<URI> copy = new HashSet<URI>(this.updated);
1295            updated.clear();
1296            if (updatedURIs != null && updatedURIs.length > 0) {
1297                for (URI uri : updatedURIs) {
1298                    if (uri != null && !updated.contains(uri)) {
1299                        updated.add(uri);
1300                    }
1301                }
1302                if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1303                    buildBackups();
1304                    synchronized (reconnectMutex) {
1305                        reconnect(rebalance);
1306                    }
1307                }
1308            }
1309        }
1310    }
1311
1312    /**
1313     * @return the updateURIsURL
1314     */
1315    public String getUpdateURIsURL() {
1316        return this.updateURIsURL;
1317    }
1318
1319    /**
1320     * @param updateURIsURL the updateURIsURL to set
1321     */
1322    public void setUpdateURIsURL(String updateURIsURL) {
1323        this.updateURIsURL = updateURIsURL;
1324    }
1325
1326    /**
1327     * @return the rebalanceUpdateURIs
1328     */
1329    public boolean isRebalanceUpdateURIs() {
1330        return this.rebalanceUpdateURIs;
1331    }
1332
1333    /**
1334     * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1335     */
1336    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1337        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1338    }
1339
1340    @Override
1341    public int getReceiveCounter() {
1342        Transport transport = connectedTransport.get();
1343        if (transport == null) {
1344            return 0;
1345        }
1346        return transport.getReceiveCounter();
1347    }
1348
1349    public int getConnectFailures() {
1350        return connectFailures;
1351    }
1352
1353    public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1354        synchronized (reconnectMutex) {
1355            stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1356        }
1357    }
1358
1359    public ConnectionStateTracker getStateTracker() {
1360        return stateTracker;
1361    }
1362
1363    private boolean contains(URI newURI) {
1364        boolean result = false;
1365        for (URI uri : uris) {
1366            if (compareURIs(newURI, uri)) {
1367                result = true;
1368                break;
1369            }
1370        }
1371
1372        return result;
1373    }
1374
1375    private boolean compareURIs(final URI first, final URI second) {
1376
1377        boolean result = false;
1378        if (first == null || second == null) {
1379            return result;
1380        }
1381
1382        if (first.getPort() == second.getPort()) {
1383            InetAddress firstAddr = null;
1384            InetAddress secondAddr = null;
1385            try {
1386                firstAddr = InetAddress.getByName(first.getHost());
1387                secondAddr = InetAddress.getByName(second.getHost());
1388
1389                if (firstAddr.equals(secondAddr)) {
1390                    result = true;
1391                }
1392
1393            } catch(IOException e) {
1394
1395                if (firstAddr == null) {
1396                    LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
1397                } else {
1398                    LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
1399                }
1400
1401                if (first.getHost().equalsIgnoreCase(second.getHost())) {
1402                    result = true;
1403                }
1404            }
1405        }
1406
1407        return result;
1408    }
1409
1410    private InputStreamReader getURLStream(String path) throws IOException {
1411        InputStreamReader result = null;
1412        URL url = null;
1413        try {
1414            url = new URL(path);
1415            result = new InputStreamReader(url.openStream());
1416        } catch (MalformedURLException e) {
1417            // ignore - it could be a path to a a local file
1418        }
1419        if (result == null) {
1420            result = new FileReader(path);
1421        }
1422        return result;
1423    }
1424
1425    private URI addExtraQueryOptions(URI uri) {
1426        try {
1427            if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) {
1428                if( uri.getQuery() == null ) {
1429                    uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions);
1430                } else {
1431                    uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions);
1432                }
1433            }
1434        } catch (URISyntaxException e) {
1435            throw new RuntimeException(e);
1436        }
1437        return uri;
1438    }
1439
1440    public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) {
1441        this.nestedExtraQueryOptions = nestedExtraQueryOptions;
1442    }
1443
1444    public int getWarnAfterReconnectAttempts() {
1445        return warnAfterReconnectAttempts;
1446    }
1447
1448    /**
1449     * Sets the number of Connect / Reconnect attempts that must occur before a warn message
1450     * is logged indicating that the transport is not connected.  This can be useful when the
1451     * client is running inside some container or service as it give an indication of some
1452     * problem with the client connection that might not otherwise be visible.  To disable the
1453     * log messages this value should be set to a value @{code attempts <= 0}
1454     *
1455     * @param warnAfterReconnectAttempts
1456     *      The number of failed connection attempts that must happen before a warning is logged.
1457     */
1458    public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
1459        this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
1460    }
1461
1462}