001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.failover; 018 019import java.io.BufferedReader; 020import java.io.FileReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.InterruptedIOException; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedHashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.StringTokenizer; 037import java.util.concurrent.CopyOnWriteArrayList; 038import java.util.concurrent.atomic.AtomicReference; 039 040import org.apache.activemq.broker.SslContext; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionControl; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.MessageDispatch; 045import org.apache.activemq.command.MessagePull; 046import org.apache.activemq.command.RemoveInfo; 047import org.apache.activemq.command.Response; 048import org.apache.activemq.state.ConnectionStateTracker; 049import org.apache.activemq.state.Tracked; 050import org.apache.activemq.thread.Task; 051import org.apache.activemq.thread.TaskRunner; 052import org.apache.activemq.thread.TaskRunnerFactory; 053import org.apache.activemq.transport.CompositeTransport; 054import org.apache.activemq.transport.DefaultTransportListener; 055import org.apache.activemq.transport.FutureResponse; 056import org.apache.activemq.transport.ResponseCallback; 057import org.apache.activemq.transport.Transport; 058import org.apache.activemq.transport.TransportFactory; 059import org.apache.activemq.transport.TransportListener; 060import org.apache.activemq.util.IOExceptionSupport; 061import org.apache.activemq.util.ServiceSupport; 062import org.apache.activemq.util.URISupport; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * A Transport that is made reliable by being able to fail over to another 068 * transport when a transport failure is detected. 069 */ 070public class FailoverTransport implements CompositeTransport { 071 072 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 073 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 074 private static final int INFINITE = -1; 075 private TransportListener transportListener; 076 private boolean disposed; 077 private boolean connected; 078 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 079 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 080 081 private final Object reconnectMutex = new Object(); 082 private final Object backupMutex = new Object(); 083 private final Object sleepMutex = new Object(); 084 private final Object listenerMutex = new Object(); 085 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 086 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 087 088 private URI connectedTransportURI; 089 private URI failedConnectTransportURI; 090 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 091 private final TaskRunnerFactory reconnectTaskFactory; 092 private final TaskRunner reconnectTask; 093 private boolean started; 094 private boolean initialized; 095 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 096 private long maxReconnectDelay = 1000 * 30; 097 private double backOffMultiplier = 2d; 098 private long timeout = INFINITE; 099 private boolean useExponentialBackOff = true; 100 private boolean randomize = true; 101 private int maxReconnectAttempts = INFINITE; 102 private int startupMaxReconnectAttempts = INFINITE; 103 private int connectFailures; 104 private int warnAfterReconnectAttempts = 10; 105 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 106 private Exception connectionFailure; 107 private boolean firstConnection = true; 108 // optionally always have a backup created 109 private boolean backup = false; 110 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 111 private int backupPoolSize = 1; 112 private boolean trackMessages = false; 113 private boolean trackTransactionProducers = true; 114 private int maxCacheSize = 128 * 1024; 115 private final TransportListener disposedListener = new DefaultTransportListener() { 116 }; 117 private final TransportListener myTransportListener = createTransportListener(); 118 private boolean updateURIsSupported = true; 119 private boolean reconnectSupported = true; 120 // remember for reconnect thread 121 private SslContext brokerSslContext; 122 private String updateURIsURL = null; 123 private boolean rebalanceUpdateURIs = true; 124 private boolean doRebalance = false; 125 private boolean connectedToPriority = false; 126 127 private boolean priorityBackup = false; 128 private final ArrayList<URI> priorityList = new ArrayList<URI>(); 129 private boolean priorityBackupAvailable = false; 130 private String nestedExtraQueryOptions; 131 private boolean shuttingDown = false; 132 133 public FailoverTransport() throws InterruptedIOException { 134 brokerSslContext = SslContext.getCurrentSslContext(); 135 stateTracker.setTrackTransactions(true); 136 // Setup a task that is used to reconnect the a connection async. 137 reconnectTaskFactory = new TaskRunnerFactory(); 138 reconnectTaskFactory.init(); 139 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 140 @Override 141 public boolean iterate() { 142 boolean result = false; 143 if (!started) { 144 return result; 145 } 146 boolean buildBackup = true; 147 synchronized (backupMutex) { 148 if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { 149 result = doReconnect(); 150 buildBackup = false; 151 } 152 } 153 if (buildBackup) { 154 buildBackups(); 155 if (priorityBackup && !connectedToPriority) { 156 try { 157 doDelay(); 158 if (reconnectTask == null) { 159 return true; 160 } 161 reconnectTask.wakeup(); 162 } catch (InterruptedException e) { 163 LOG.debug("Reconnect task has been interrupted.", e); 164 } 165 } 166 } else { 167 // build backups on the next iteration 168 buildBackup = true; 169 try { 170 if (reconnectTask == null) { 171 return true; 172 } 173 reconnectTask.wakeup(); 174 } catch (InterruptedException e) { 175 LOG.debug("Reconnect task has been interrupted.", e); 176 } 177 } 178 return result; 179 } 180 181 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 182 } 183 184 TransportListener createTransportListener() { 185 return new TransportListener() { 186 @Override 187 public void onCommand(Object o) { 188 Command command = (Command) o; 189 if (command == null) { 190 return; 191 } 192 if (command.isResponse()) { 193 Object object = null; 194 synchronized (requestMap) { 195 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 196 } 197 if (object != null && object.getClass() == Tracked.class) { 198 ((Tracked) object).onResponses(command); 199 } 200 } 201 if (!initialized) { 202 initialized = true; 203 } 204 205 if (command.isConnectionControl()) { 206 handleConnectionControl((ConnectionControl) command); 207 } 208 if (transportListener != null) { 209 transportListener.onCommand(command); 210 } 211 } 212 213 @Override 214 public void onException(IOException error) { 215 try { 216 handleTransportFailure(error); 217 } catch (InterruptedException e) { 218 Thread.currentThread().interrupt(); 219 transportListener.onException(new InterruptedIOException()); 220 } 221 } 222 223 @Override 224 public void transportInterupted() { 225 if (transportListener != null) { 226 transportListener.transportInterupted(); 227 } 228 } 229 230 @Override 231 public void transportResumed() { 232 if (transportListener != null) { 233 transportListener.transportResumed(); 234 } 235 } 236 }; 237 } 238 239 public final void disposeTransport(Transport transport) { 240 transport.setTransportListener(disposedListener); 241 ServiceSupport.dispose(transport); 242 } 243 244 public final void handleTransportFailure(IOException e) throws InterruptedException { 245 synchronized (reconnectMutex) { 246 if (shuttingDown) { 247 // shutdown info sent and remote socket closed and we see that before a local close 248 // let the close do the work 249 return; 250 } 251 252 if (LOG.isTraceEnabled()) { 253 LOG.trace(this + " handleTransportFailure: " + e, e); 254 } 255 256 Transport transport = connectedTransport.getAndSet(null); 257 258 if (transport != null) { 259 260 disposeTransport(transport); 261 262 boolean reconnectOk = false; 263 264 if (canReconnect()) { 265 reconnectOk = true; 266 } 267 LOG.warn("Transport (" + transport + ") failed" 268 + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e); 269 270 initialized = false; 271 failedConnectTransportURI = connectedTransportURI; 272 connectedTransportURI = null; 273 connected = false; 274 connectedToPriority = false; 275 276 if (reconnectOk) { 277 // notify before any reconnect attempt so ack state can be whacked 278 if (transportListener != null) { 279 transportListener.transportInterupted(); 280 } 281 282 updated.remove(failedConnectTransportURI); 283 reconnectTask.wakeup(); 284 } else if (!isDisposed()) { 285 propagateFailureToExceptionListener(e); 286 } 287 } 288 } 289 } 290 291 private boolean canReconnect() { 292 return started && 0 != calculateReconnectAttemptLimit(); 293 } 294 295 public final void handleConnectionControl(ConnectionControl control) { 296 String reconnectStr = control.getReconnectTo(); 297 if (LOG.isTraceEnabled()) { 298 LOG.trace("Received ConnectionControl: {}", control); 299 } 300 301 if (reconnectStr != null) { 302 reconnectStr = reconnectStr.trim(); 303 if (reconnectStr.length() > 0) { 304 try { 305 URI uri = new URI(reconnectStr); 306 if (isReconnectSupported()) { 307 reconnect(uri); 308 LOG.info("Reconnected to: " + uri); 309 } 310 } catch (Exception e) { 311 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 312 } 313 } 314 } 315 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 316 } 317 318 private final void processNewTransports(boolean rebalance, String newTransports) { 319 if (newTransports != null) { 320 newTransports = newTransports.trim(); 321 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 322 List<URI> list = new ArrayList<URI>(); 323 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 324 while (tokenizer.hasMoreTokens()) { 325 String str = tokenizer.nextToken(); 326 try { 327 URI uri = new URI(str); 328 list.add(uri); 329 } catch (Exception e) { 330 LOG.error("Failed to parse broker address: " + str, e); 331 } 332 } 333 if (list.isEmpty() == false) { 334 try { 335 updateURIs(rebalance, list.toArray(new URI[list.size()])); 336 } catch (IOException e) { 337 LOG.error("Failed to update transport URI's from: " + newTransports, e); 338 } 339 } 340 } 341 } 342 } 343 344 @Override 345 public void start() throws Exception { 346 synchronized (reconnectMutex) { 347 if (LOG.isDebugEnabled()) { 348 LOG.debug("Started " + this); 349 } 350 if (started) { 351 return; 352 } 353 started = true; 354 stateTracker.setMaxCacheSize(getMaxCacheSize()); 355 stateTracker.setTrackMessages(isTrackMessages()); 356 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 357 if (connectedTransport.get() != null) { 358 stateTracker.restore(connectedTransport.get()); 359 } else { 360 reconnect(false); 361 } 362 } 363 } 364 365 @Override 366 public void stop() throws Exception { 367 Transport transportToStop = null; 368 List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); 369 370 try { 371 synchronized (reconnectMutex) { 372 if (LOG.isDebugEnabled()) { 373 LOG.debug("Stopped " + this); 374 } 375 if (!started) { 376 return; 377 } 378 started = false; 379 disposed = true; 380 connected = false; 381 382 if (connectedTransport.get() != null) { 383 transportToStop = connectedTransport.getAndSet(null); 384 } 385 reconnectMutex.notifyAll(); 386 } 387 synchronized (sleepMutex) { 388 sleepMutex.notifyAll(); 389 } 390 } finally { 391 reconnectTask.shutdown(); 392 reconnectTaskFactory.shutdownNow(); 393 } 394 395 synchronized(backupMutex) { 396 for (BackupTransport backup : backups) { 397 backup.setDisposed(true); 398 Transport transport = backup.getTransport(); 399 if (transport != null) { 400 transport.setTransportListener(disposedListener); 401 backupsToStop.add(transport); 402 } 403 } 404 backups.clear(); 405 } 406 for (Transport transport : backupsToStop) { 407 try { 408 if (LOG.isTraceEnabled()) { 409 LOG.trace("Stopped backup: " + transport); 410 } 411 disposeTransport(transport); 412 } catch (Exception e) { 413 } 414 } 415 if (transportToStop != null) { 416 transportToStop.stop(); 417 } 418 } 419 420 public long getInitialReconnectDelay() { 421 return initialReconnectDelay; 422 } 423 424 public void setInitialReconnectDelay(long initialReconnectDelay) { 425 this.initialReconnectDelay = initialReconnectDelay; 426 } 427 428 public long getMaxReconnectDelay() { 429 return maxReconnectDelay; 430 } 431 432 public void setMaxReconnectDelay(long maxReconnectDelay) { 433 this.maxReconnectDelay = maxReconnectDelay; 434 } 435 436 public long getReconnectDelay() { 437 return reconnectDelay; 438 } 439 440 public void setReconnectDelay(long reconnectDelay) { 441 this.reconnectDelay = reconnectDelay; 442 } 443 444 public double getReconnectDelayExponent() { 445 return backOffMultiplier; 446 } 447 448 public void setReconnectDelayExponent(double reconnectDelayExponent) { 449 this.backOffMultiplier = reconnectDelayExponent; 450 } 451 452 public Transport getConnectedTransport() { 453 return connectedTransport.get(); 454 } 455 456 public URI getConnectedTransportURI() { 457 return connectedTransportURI; 458 } 459 460 public int getMaxReconnectAttempts() { 461 return maxReconnectAttempts; 462 } 463 464 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 465 this.maxReconnectAttempts = maxReconnectAttempts; 466 } 467 468 public int getStartupMaxReconnectAttempts() { 469 return this.startupMaxReconnectAttempts; 470 } 471 472 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 473 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 474 } 475 476 public long getTimeout() { 477 return timeout; 478 } 479 480 public void setTimeout(long timeout) { 481 this.timeout = timeout; 482 } 483 484 /** 485 * @return Returns the randomize. 486 */ 487 public boolean isRandomize() { 488 return randomize; 489 } 490 491 /** 492 * @param randomize The randomize to set. 493 */ 494 public void setRandomize(boolean randomize) { 495 this.randomize = randomize; 496 } 497 498 public boolean isBackup() { 499 return backup; 500 } 501 502 public void setBackup(boolean backup) { 503 this.backup = backup; 504 } 505 506 public int getBackupPoolSize() { 507 return backupPoolSize; 508 } 509 510 public void setBackupPoolSize(int backupPoolSize) { 511 this.backupPoolSize = backupPoolSize; 512 } 513 514 public int getCurrentBackups() { 515 return this.backups.size(); 516 } 517 518 public boolean isTrackMessages() { 519 return trackMessages; 520 } 521 522 public void setTrackMessages(boolean trackMessages) { 523 this.trackMessages = trackMessages; 524 } 525 526 public boolean isTrackTransactionProducers() { 527 return this.trackTransactionProducers; 528 } 529 530 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 531 this.trackTransactionProducers = trackTransactionProducers; 532 } 533 534 public int getMaxCacheSize() { 535 return maxCacheSize; 536 } 537 538 public void setMaxCacheSize(int maxCacheSize) { 539 this.maxCacheSize = maxCacheSize; 540 } 541 542 public boolean isPriorityBackup() { 543 return priorityBackup; 544 } 545 546 public void setPriorityBackup(boolean priorityBackup) { 547 this.priorityBackup = priorityBackup; 548 } 549 550 public void setPriorityURIs(String priorityURIs) { 551 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ","); 552 while (tokenizer.hasMoreTokens()) { 553 String str = tokenizer.nextToken(); 554 try { 555 URI uri = new URI(str); 556 priorityList.add(uri); 557 } catch (Exception e) { 558 LOG.error("Failed to parse broker address: " + str, e); 559 } 560 } 561 } 562 563 @Override 564 public void oneway(Object o) throws IOException { 565 566 Command command = (Command) o; 567 Exception error = null; 568 try { 569 570 synchronized (reconnectMutex) { 571 572 if (command != null && connectedTransport.get() == null) { 573 if (command.isShutdownInfo()) { 574 // Skipping send of ShutdownInfo command when not connected. 575 return; 576 } else if (command instanceof RemoveInfo || command.isMessageAck()) { 577 // Simulate response to RemoveInfo command or MessageAck (as it will be stale) 578 stateTracker.track(command); 579 if (command.isResponseRequired()) { 580 Response response = new Response(); 581 response.setCorrelationId(command.getCommandId()); 582 myTransportListener.onCommand(response); 583 } 584 return; 585 } else if (command instanceof MessagePull) { 586 // Simulate response to MessagePull if timed as we can't honor that now. 587 MessagePull pullRequest = (MessagePull) command; 588 if (pullRequest.getTimeout() != 0) { 589 MessageDispatch dispatch = new MessageDispatch(); 590 dispatch.setConsumerId(pullRequest.getConsumerId()); 591 dispatch.setDestination(pullRequest.getDestination()); 592 myTransportListener.onCommand(dispatch); 593 } 594 return; 595 } 596 } 597 598 // Keep trying until the message is sent. 599 for (int i = 0; !disposed; i++) { 600 try { 601 602 // Wait for transport to be connected. 603 Transport transport = connectedTransport.get(); 604 long start = System.currentTimeMillis(); 605 boolean timedout = false; 606 while (transport == null && !disposed && connectionFailure == null 607 && !Thread.currentThread().isInterrupted()) { 608 if (LOG.isTraceEnabled()) { 609 LOG.trace("Waiting for transport to reconnect..: " + command); 610 } 611 long end = System.currentTimeMillis(); 612 if (command.isMessage() && timeout > 0 && (end - start > timeout)) { 613 timedout = true; 614 if (LOG.isInfoEnabled()) { 615 LOG.info("Failover timed out after " + (end - start) + "ms"); 616 } 617 break; 618 } 619 try { 620 reconnectMutex.wait(100); 621 } catch (InterruptedException e) { 622 Thread.currentThread().interrupt(); 623 if (LOG.isDebugEnabled()) { 624 LOG.debug("Interupted: " + e, e); 625 } 626 } 627 transport = connectedTransport.get(); 628 } 629 630 if (transport == null) { 631 // Previous loop may have exited due to use being 632 // disposed. 633 if (disposed) { 634 error = new IOException("Transport disposed."); 635 } else if (connectionFailure != null) { 636 error = connectionFailure; 637 } else if (timedout == true) { 638 error = new IOException("Failover timeout of " + timeout + " ms reached."); 639 } else { 640 error = new IOException("Unexpected failure."); 641 } 642 break; 643 } 644 645 Tracked tracked = null; 646 try { 647 tracked = stateTracker.track(command); 648 } catch (IOException ioe) { 649 LOG.debug("Cannot track the command " + command, ioe); 650 } 651 // If it was a request and it was not being tracked by 652 // the state tracker, 653 // then hold it in the requestMap so that we can replay 654 // it later. 655 synchronized (requestMap) { 656 if (tracked != null && tracked.isWaitingForResponse()) { 657 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 658 } else if (tracked == null && command.isResponseRequired()) { 659 requestMap.put(Integer.valueOf(command.getCommandId()), command); 660 } 661 } 662 663 // Send the message. 664 try { 665 transport.oneway(command); 666 stateTracker.trackBack(command); 667 if (command.isShutdownInfo()) { 668 shuttingDown = true; 669 } 670 } catch (IOException e) { 671 672 // If the command was not tracked.. we will retry in 673 // this method 674 if (tracked == null) { 675 676 // since we will retry in this method.. take it 677 // out of the request 678 // map so that it is not sent 2 times on 679 // recovery 680 if (command.isResponseRequired()) { 681 requestMap.remove(Integer.valueOf(command.getCommandId())); 682 } 683 684 // Rethrow the exception so it will handled by 685 // the outer catch 686 throw e; 687 } else { 688 // Handle the error but allow the method to return since the 689 // tracked commands are replayed on reconnect. 690 if (LOG.isDebugEnabled()) { 691 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 692 } 693 handleTransportFailure(e); 694 } 695 } 696 697 return; 698 699 } catch (IOException e) { 700 if (LOG.isDebugEnabled()) { 701 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 702 } 703 handleTransportFailure(e); 704 } 705 } 706 } 707 } catch (InterruptedException e) { 708 // Some one may be trying to stop our thread. 709 Thread.currentThread().interrupt(); 710 throw new InterruptedIOException(); 711 } 712 713 if (!disposed) { 714 if (error != null) { 715 if (error instanceof IOException) { 716 throw (IOException) error; 717 } 718 throw IOExceptionSupport.create(error); 719 } 720 } 721 } 722 723 @Override 724 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 725 throw new AssertionError("Unsupported Method"); 726 } 727 728 @Override 729 public Object request(Object command) throws IOException { 730 throw new AssertionError("Unsupported Method"); 731 } 732 733 @Override 734 public Object request(Object command, int timeout) throws IOException { 735 throw new AssertionError("Unsupported Method"); 736 } 737 738 @Override 739 public void add(boolean rebalance, URI u[]) { 740 boolean newURI = false; 741 for (URI uri : u) { 742 if (!contains(uri)) { 743 uris.add(uri); 744 newURI = true; 745 } 746 } 747 if (newURI) { 748 reconnect(rebalance); 749 } 750 } 751 752 @Override 753 public void remove(boolean rebalance, URI u[]) { 754 for (URI uri : u) { 755 uris.remove(uri); 756 } 757 // rebalance is automatic if any connected to removed/stopped broker 758 } 759 760 public void add(boolean rebalance, String u) { 761 try { 762 URI newURI = new URI(u); 763 if (contains(newURI) == false) { 764 uris.add(newURI); 765 reconnect(rebalance); 766 } 767 768 } catch (Exception e) { 769 LOG.error("Failed to parse URI: " + u); 770 } 771 } 772 773 public void reconnect(boolean rebalance) { 774 synchronized (reconnectMutex) { 775 if (started) { 776 if (rebalance) { 777 doRebalance = true; 778 } 779 LOG.debug("Waking up reconnect task"); 780 try { 781 reconnectTask.wakeup(); 782 } catch (InterruptedException e) { 783 Thread.currentThread().interrupt(); 784 } 785 } else { 786 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 787 } 788 } 789 } 790 791 private List<URI> getConnectList() { 792 if (!updated.isEmpty()) { 793 return updated; 794 } 795 ArrayList<URI> l = new ArrayList<URI>(uris); 796 boolean removed = false; 797 if (failedConnectTransportURI != null) { 798 removed = l.remove(failedConnectTransportURI); 799 } 800 if (randomize) { 801 // Randomly, reorder the list by random swapping 802 for (int i = 0; i < l.size(); i++) { 803 // meed parenthesis due other JDKs (see AMQ-4826) 804 int p = ((int) (Math.random() * 100)) % l.size(); 805 URI t = l.get(p); 806 l.set(p, l.get(i)); 807 l.set(i, t); 808 } 809 } 810 if (removed) { 811 l.add(failedConnectTransportURI); 812 } 813 if (LOG.isDebugEnabled()) { 814 LOG.debug("urlList connectionList:" + l + ", from: " + uris); 815 } 816 return l; 817 } 818 819 @Override 820 public TransportListener getTransportListener() { 821 return transportListener; 822 } 823 824 @Override 825 public void setTransportListener(TransportListener commandListener) { 826 synchronized (listenerMutex) { 827 this.transportListener = commandListener; 828 listenerMutex.notifyAll(); 829 } 830 } 831 832 @Override 833 public <T> T narrow(Class<T> target) { 834 835 if (target.isAssignableFrom(getClass())) { 836 return target.cast(this); 837 } 838 Transport transport = connectedTransport.get(); 839 if (transport != null) { 840 return transport.narrow(target); 841 } 842 return null; 843 844 } 845 846 protected void restoreTransport(Transport t) throws Exception, IOException { 847 t.start(); 848 // send information to the broker - informing it we are an ft client 849 ConnectionControl cc = new ConnectionControl(); 850 cc.setFaultTolerant(true); 851 t.oneway(cc); 852 stateTracker.restore(t); 853 Map<Integer, Command> tmpMap = null; 854 synchronized (requestMap) { 855 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 856 } 857 for (Command command : tmpMap.values()) { 858 if (LOG.isTraceEnabled()) { 859 LOG.trace("restore requestMap, replay: " + command); 860 } 861 t.oneway(command); 862 } 863 } 864 865 public boolean isUseExponentialBackOff() { 866 return useExponentialBackOff; 867 } 868 869 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 870 this.useExponentialBackOff = useExponentialBackOff; 871 } 872 873 @Override 874 public String toString() { 875 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 876 } 877 878 @Override 879 public String getRemoteAddress() { 880 Transport transport = connectedTransport.get(); 881 if (transport != null) { 882 return transport.getRemoteAddress(); 883 } 884 return null; 885 } 886 887 @Override 888 public boolean isFaultTolerant() { 889 return true; 890 } 891 892 private void doUpdateURIsFromDisk() { 893 // If updateURIsURL is specified, read the file and add any new 894 // transport URI's to this FailOverTransport. 895 // Note: Could track file timestamp to avoid unnecessary reading. 896 String fileURL = getUpdateURIsURL(); 897 if (fileURL != null) { 898 BufferedReader in = null; 899 String newUris = null; 900 StringBuffer buffer = new StringBuffer(); 901 902 try { 903 in = new BufferedReader(getURLStream(fileURL)); 904 while (true) { 905 String line = in.readLine(); 906 if (line == null) { 907 break; 908 } 909 buffer.append(line); 910 } 911 newUris = buffer.toString(); 912 } catch (IOException ioe) { 913 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); 914 } finally { 915 if (in != null) { 916 try { 917 in.close(); 918 } catch (IOException ioe) { 919 // ignore 920 } 921 } 922 } 923 924 processNewTransports(isRebalanceUpdateURIs(), newUris); 925 } 926 } 927 928 final boolean doReconnect() { 929 Exception failure = null; 930 synchronized (reconnectMutex) { 931 932 // First ensure we are up to date. 933 doUpdateURIsFromDisk(); 934 935 if (disposed || connectionFailure != null) { 936 reconnectMutex.notifyAll(); 937 } 938 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { 939 return false; 940 } else { 941 List<URI> connectList = getConnectList(); 942 if (connectList.isEmpty()) { 943 failure = new IOException("No uris available to connect to."); 944 } else { 945 if (doRebalance) { 946 if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { 947 // already connected to first in the list, no need to rebalance 948 doRebalance = false; 949 return false; 950 } else { 951 if (LOG.isDebugEnabled()) { 952 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); 953 } 954 955 try { 956 Transport transport = this.connectedTransport.getAndSet(null); 957 if (transport != null) { 958 disposeTransport(transport); 959 } 960 } catch (Exception e) { 961 if (LOG.isDebugEnabled()) { 962 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 963 } 964 } 965 } 966 doRebalance = false; 967 } 968 969 resetReconnectDelay(); 970 971 Transport transport = null; 972 URI uri = null; 973 974 // If we have a backup already waiting lets try it. 975 synchronized (backupMutex) { 976 if ((priorityBackup || backup) && !backups.isEmpty()) { 977 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); 978 if (randomize) { 979 Collections.shuffle(l); 980 } 981 BackupTransport bt = l.remove(0); 982 backups.remove(bt); 983 transport = bt.getTransport(); 984 uri = bt.getUri(); 985 if (priorityBackup && priorityBackupAvailable) { 986 Transport old = this.connectedTransport.getAndSet(null); 987 if (old != null) { 988 disposeTransport(old); 989 } 990 priorityBackupAvailable = false; 991 } 992 } 993 } 994 995 // Sleep for the reconnectDelay if there's no backup and we aren't trying 996 // for the first time, or we were disposed for some reason. 997 if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { 998 synchronized (sleepMutex) { 999 if (LOG.isDebugEnabled()) { 1000 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 1001 } 1002 try { 1003 sleepMutex.wait(reconnectDelay); 1004 } catch (InterruptedException e) { 1005 Thread.currentThread().interrupt(); 1006 } 1007 } 1008 } 1009 1010 Iterator<URI> iter = connectList.iterator(); 1011 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { 1012 1013 try { 1014 SslContext.setCurrentSslContext(brokerSslContext); 1015 1016 // We could be starting with a backup and if so we wait to grab a 1017 // URI from the pool until next time around. 1018 if (transport == null) { 1019 uri = addExtraQueryOptions(iter.next()); 1020 transport = TransportFactory.compositeConnect(uri); 1021 } 1022 1023 if (LOG.isDebugEnabled()) { 1024 LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); 1025 } 1026 transport.setTransportListener(myTransportListener); 1027 transport.start(); 1028 1029 if (started && !firstConnection) { 1030 restoreTransport(transport); 1031 } 1032 1033 if (LOG.isDebugEnabled()) { 1034 LOG.debug("Connection established"); 1035 } 1036 reconnectDelay = initialReconnectDelay; 1037 connectedTransportURI = uri; 1038 connectedTransport.set(transport); 1039 connectedToPriority = isPriority(connectedTransportURI); 1040 reconnectMutex.notifyAll(); 1041 connectFailures = 0; 1042 1043 // Make sure on initial startup, that the transportListener 1044 // has been initialized for this instance. 1045 synchronized (listenerMutex) { 1046 if (transportListener == null) { 1047 try { 1048 // if it isn't set after 2secs - it probably never will be 1049 listenerMutex.wait(2000); 1050 } catch (InterruptedException ex) { 1051 } 1052 } 1053 } 1054 1055 if (transportListener != null) { 1056 transportListener.transportResumed(); 1057 } else { 1058 if (LOG.isDebugEnabled()) { 1059 LOG.debug("transport resumed by transport listener not set"); 1060 } 1061 } 1062 1063 if (firstConnection) { 1064 firstConnection = false; 1065 LOG.info("Successfully connected to " + uri); 1066 } else { 1067 LOG.info("Successfully reconnected to " + uri); 1068 } 1069 1070 connected = true; 1071 return false; 1072 } catch (Exception e) { 1073 failure = e; 1074 if (LOG.isDebugEnabled()) { 1075 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 1076 } 1077 if (transport != null) { 1078 try { 1079 transport.stop(); 1080 transport = null; 1081 } catch (Exception ee) { 1082 if (LOG.isDebugEnabled()) { 1083 LOG.debug("Stop of failed transport: " + transport + 1084 " failed with reason: " + ee); 1085 } 1086 } 1087 } 1088 } finally { 1089 SslContext.setCurrentSslContext(null); 1090 } 1091 } 1092 } 1093 } 1094 1095 int reconnectLimit = calculateReconnectAttemptLimit(); 1096 1097 connectFailures++; 1098 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { 1099 LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); 1100 connectionFailure = failure; 1101 1102 // Make sure on initial startup, that the transportListener has been 1103 // initialized for this instance. 1104 synchronized (listenerMutex) { 1105 if (transportListener == null) { 1106 try { 1107 listenerMutex.wait(2000); 1108 } catch (InterruptedException ex) { 1109 } 1110 } 1111 } 1112 1113 propagateFailureToExceptionListener(connectionFailure); 1114 return false; 1115 } 1116 1117 int warnInterval = getWarnAfterReconnectAttempts(); 1118 if (warnInterval > 0 && (connectFailures % warnInterval) == 0) { 1119 LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.", 1120 uris, connectFailures); 1121 } 1122 } 1123 1124 if (!disposed) { 1125 doDelay(); 1126 } 1127 1128 return !disposed; 1129 } 1130 1131 private void doDelay() { 1132 if (reconnectDelay > 0) { 1133 synchronized (sleepMutex) { 1134 if (LOG.isDebugEnabled()) { 1135 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection"); 1136 } 1137 try { 1138 sleepMutex.wait(reconnectDelay); 1139 } catch (InterruptedException e) { 1140 Thread.currentThread().interrupt(); 1141 } 1142 } 1143 } 1144 1145 if (useExponentialBackOff) { 1146 // Exponential increment of reconnect delay. 1147 reconnectDelay *= backOffMultiplier; 1148 if (reconnectDelay > maxReconnectDelay) { 1149 reconnectDelay = maxReconnectDelay; 1150 } 1151 } 1152 } 1153 1154 private void resetReconnectDelay() { 1155 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 1156 reconnectDelay = initialReconnectDelay; 1157 } 1158 } 1159 1160 /* 1161 * called with reconnectMutex held 1162 */ 1163 private void propagateFailureToExceptionListener(Exception exception) { 1164 if (transportListener != null) { 1165 if (exception instanceof IOException) { 1166 transportListener.onException((IOException)exception); 1167 } else { 1168 transportListener.onException(IOExceptionSupport.create(exception)); 1169 } 1170 } 1171 reconnectMutex.notifyAll(); 1172 } 1173 1174 private int calculateReconnectAttemptLimit() { 1175 int maxReconnectValue = this.maxReconnectAttempts; 1176 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { 1177 maxReconnectValue = this.startupMaxReconnectAttempts; 1178 } 1179 return maxReconnectValue; 1180 } 1181 1182 private boolean shouldBuildBackups() { 1183 return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority)); 1184 } 1185 1186 final boolean buildBackups() { 1187 synchronized (backupMutex) { 1188 if (!disposed && shouldBuildBackups()) { 1189 ArrayList<URI> backupList = new ArrayList<URI>(priorityList); 1190 List<URI> connectList = getConnectList(); 1191 for (URI uri: connectList) { 1192 if (!backupList.contains(uri)) { 1193 backupList.add(uri); 1194 } 1195 } 1196 // removed disposed backups 1197 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 1198 for (BackupTransport bt : backups) { 1199 if (bt.isDisposed()) { 1200 disposedList.add(bt); 1201 } 1202 } 1203 backups.removeAll(disposedList); 1204 disposedList.clear(); 1205 for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) { 1206 URI uri = addExtraQueryOptions(iter.next()); 1207 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 1208 try { 1209 SslContext.setCurrentSslContext(brokerSslContext); 1210 BackupTransport bt = new BackupTransport(this); 1211 bt.setUri(uri); 1212 if (!backups.contains(bt)) { 1213 Transport t = TransportFactory.compositeConnect(uri); 1214 t.setTransportListener(bt); 1215 t.start(); 1216 bt.setTransport(t); 1217 if (priorityBackup && isPriority(uri)) { 1218 priorityBackupAvailable = true; 1219 backups.add(0, bt); 1220 // if this priority backup overflows the pool 1221 // remove the backup with the lowest priority 1222 if (backups.size() > backupPoolSize) { 1223 BackupTransport disposeTransport = backups.remove(backups.size() - 1); 1224 disposeTransport.setDisposed(true); 1225 Transport transport = disposeTransport.getTransport(); 1226 if (transport != null) { 1227 transport.setTransportListener(disposedListener); 1228 disposeTransport(transport); 1229 } 1230 } 1231 } else { 1232 backups.add(bt); 1233 } 1234 } 1235 } catch (Exception e) { 1236 LOG.debug("Failed to build backup ", e); 1237 } finally { 1238 SslContext.setCurrentSslContext(null); 1239 } 1240 } 1241 } 1242 } 1243 } 1244 return false; 1245 } 1246 1247 protected boolean isPriority(URI uri) { 1248 if (!priorityBackup) { 1249 return false; 1250 } 1251 1252 if (!priorityList.isEmpty()) { 1253 return priorityList.contains(uri); 1254 } 1255 return uris.indexOf(uri) == 0; 1256 } 1257 1258 @Override 1259 public boolean isDisposed() { 1260 return disposed; 1261 } 1262 1263 @Override 1264 public boolean isConnected() { 1265 return connected; 1266 } 1267 1268 @Override 1269 public void reconnect(URI uri) throws IOException { 1270 add(true, new URI[]{uri}); 1271 } 1272 1273 @Override 1274 public boolean isReconnectSupported() { 1275 return this.reconnectSupported; 1276 } 1277 1278 public void setReconnectSupported(boolean value) { 1279 this.reconnectSupported = value; 1280 } 1281 1282 @Override 1283 public boolean isUpdateURIsSupported() { 1284 return this.updateURIsSupported; 1285 } 1286 1287 public void setUpdateURIsSupported(boolean value) { 1288 this.updateURIsSupported = value; 1289 } 1290 1291 @Override 1292 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1293 if (isUpdateURIsSupported()) { 1294 HashSet<URI> copy = new HashSet<URI>(this.updated); 1295 updated.clear(); 1296 if (updatedURIs != null && updatedURIs.length > 0) { 1297 for (URI uri : updatedURIs) { 1298 if (uri != null && !updated.contains(uri)) { 1299 updated.add(uri); 1300 } 1301 } 1302 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) { 1303 buildBackups(); 1304 synchronized (reconnectMutex) { 1305 reconnect(rebalance); 1306 } 1307 } 1308 } 1309 } 1310 } 1311 1312 /** 1313 * @return the updateURIsURL 1314 */ 1315 public String getUpdateURIsURL() { 1316 return this.updateURIsURL; 1317 } 1318 1319 /** 1320 * @param updateURIsURL the updateURIsURL to set 1321 */ 1322 public void setUpdateURIsURL(String updateURIsURL) { 1323 this.updateURIsURL = updateURIsURL; 1324 } 1325 1326 /** 1327 * @return the rebalanceUpdateURIs 1328 */ 1329 public boolean isRebalanceUpdateURIs() { 1330 return this.rebalanceUpdateURIs; 1331 } 1332 1333 /** 1334 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1335 */ 1336 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1337 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1338 } 1339 1340 @Override 1341 public int getReceiveCounter() { 1342 Transport transport = connectedTransport.get(); 1343 if (transport == null) { 1344 return 0; 1345 } 1346 return transport.getReceiveCounter(); 1347 } 1348 1349 public int getConnectFailures() { 1350 return connectFailures; 1351 } 1352 1353 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1354 synchronized (reconnectMutex) { 1355 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1356 } 1357 } 1358 1359 public ConnectionStateTracker getStateTracker() { 1360 return stateTracker; 1361 } 1362 1363 private boolean contains(URI newURI) { 1364 boolean result = false; 1365 for (URI uri : uris) { 1366 if (compareURIs(newURI, uri)) { 1367 result = true; 1368 break; 1369 } 1370 } 1371 1372 return result; 1373 } 1374 1375 private boolean compareURIs(final URI first, final URI second) { 1376 1377 boolean result = false; 1378 if (first == null || second == null) { 1379 return result; 1380 } 1381 1382 if (first.getPort() == second.getPort()) { 1383 InetAddress firstAddr = null; 1384 InetAddress secondAddr = null; 1385 try { 1386 firstAddr = InetAddress.getByName(first.getHost()); 1387 secondAddr = InetAddress.getByName(second.getHost()); 1388 1389 if (firstAddr.equals(secondAddr)) { 1390 result = true; 1391 } 1392 1393 } catch(IOException e) { 1394 1395 if (firstAddr == null) { 1396 LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); 1397 } else { 1398 LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e); 1399 } 1400 1401 if (first.getHost().equalsIgnoreCase(second.getHost())) { 1402 result = true; 1403 } 1404 } 1405 } 1406 1407 return result; 1408 } 1409 1410 private InputStreamReader getURLStream(String path) throws IOException { 1411 InputStreamReader result = null; 1412 URL url = null; 1413 try { 1414 url = new URL(path); 1415 result = new InputStreamReader(url.openStream()); 1416 } catch (MalformedURLException e) { 1417 // ignore - it could be a path to a a local file 1418 } 1419 if (result == null) { 1420 result = new FileReader(path); 1421 } 1422 return result; 1423 } 1424 1425 private URI addExtraQueryOptions(URI uri) { 1426 try { 1427 if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) { 1428 if( uri.getQuery() == null ) { 1429 uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions); 1430 } else { 1431 uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions); 1432 } 1433 } 1434 } catch (URISyntaxException e) { 1435 throw new RuntimeException(e); 1436 } 1437 return uri; 1438 } 1439 1440 public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) { 1441 this.nestedExtraQueryOptions = nestedExtraQueryOptions; 1442 } 1443 1444 public int getWarnAfterReconnectAttempts() { 1445 return warnAfterReconnectAttempts; 1446 } 1447 1448 /** 1449 * Sets the number of Connect / Reconnect attempts that must occur before a warn message 1450 * is logged indicating that the transport is not connected. This can be useful when the 1451 * client is running inside some container or service as it give an indication of some 1452 * problem with the client connection that might not otherwise be visible. To disable the 1453 * log messages this value should be set to a value @{code attempts <= 0} 1454 * 1455 * @param warnAfterReconnectAttempts 1456 * The number of failed connection attempts that must happen before a warning is logged. 1457 */ 1458 public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { 1459 this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; 1460 } 1461 1462}