001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.fanout; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.atomic.AtomicInteger; 026 027import org.apache.activemq.command.Command; 028import org.apache.activemq.command.ConsumerInfo; 029import org.apache.activemq.command.Message; 030import org.apache.activemq.command.RemoveInfo; 031import org.apache.activemq.command.Response; 032import org.apache.activemq.state.ConnectionStateTracker; 033import org.apache.activemq.thread.Task; 034import org.apache.activemq.thread.TaskRunner; 035import org.apache.activemq.thread.TaskRunnerFactory; 036import org.apache.activemq.transport.CompositeTransport; 037import org.apache.activemq.transport.DefaultTransportListener; 038import org.apache.activemq.transport.FutureResponse; 039import org.apache.activemq.transport.ResponseCallback; 040import org.apache.activemq.transport.Transport; 041import org.apache.activemq.transport.TransportFactory; 042import org.apache.activemq.transport.TransportListener; 043import org.apache.activemq.util.IOExceptionSupport; 044import org.apache.activemq.util.ServiceStopper; 045import org.apache.activemq.util.ServiceSupport; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A Transport that fans out a connection to multiple brokers. 051 * 052 * 053 */ 054public class FanoutTransport implements CompositeTransport { 055 056 private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class); 057 058 private TransportListener transportListener; 059 private boolean disposed; 060 private boolean connected; 061 062 private final Object reconnectMutex = new Object(); 063 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 064 private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>(); 065 066 private final TaskRunnerFactory reconnectTaskFactory; 067 private final TaskRunner reconnectTask; 068 private boolean started; 069 070 private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>(); 071 private int connectedCount; 072 073 private int minAckCount = 2; 074 075 private long initialReconnectDelay = 10; 076 private long maxReconnectDelay = 1000 * 30; 077 private long backOffMultiplier = 2; 078 private final boolean useExponentialBackOff = true; 079 private int maxReconnectAttempts; 080 private Exception connectionFailure; 081 private FanoutTransportHandler primary; 082 private boolean fanOutQueues = false; 083 084 static class RequestCounter { 085 086 final Command command; 087 final AtomicInteger ackCount; 088 089 RequestCounter(Command command, int count) { 090 this.command = command; 091 this.ackCount = new AtomicInteger(count); 092 } 093 094 @Override 095 public String toString() { 096 return command.getCommandId() + "=" + ackCount.get(); 097 } 098 } 099 100 class FanoutTransportHandler extends DefaultTransportListener { 101 102 private final URI uri; 103 private Transport transport; 104 105 private int connectFailures; 106 private long reconnectDelay = initialReconnectDelay; 107 private long reconnectDate; 108 109 public FanoutTransportHandler(URI uri) { 110 this.uri = uri; 111 } 112 113 @Override 114 public void onCommand(Object o) { 115 Command command = (Command)o; 116 if (command.isResponse()) { 117 Integer id = new Integer(((Response)command).getCorrelationId()); 118 RequestCounter rc = requestMap.get(id); 119 if (rc != null) { 120 if (rc.ackCount.decrementAndGet() <= 0) { 121 requestMap.remove(id); 122 transportListenerOnCommand(command); 123 } 124 } else { 125 transportListenerOnCommand(command); 126 } 127 } else { 128 transportListenerOnCommand(command); 129 } 130 } 131 132 @Override 133 public void onException(IOException error) { 134 try { 135 synchronized (reconnectMutex) { 136 if (transport == null || !transport.isConnected()) { 137 return; 138 } 139 140 LOG.debug("Transport failed, starting up reconnect task", error); 141 142 ServiceSupport.dispose(transport); 143 transport = null; 144 connectedCount--; 145 if (primary == this) { 146 primary = null; 147 } 148 reconnectTask.wakeup(); 149 } 150 } catch (InterruptedException e) { 151 Thread.currentThread().interrupt(); 152 if (transportListener != null) { 153 transportListener.onException(new InterruptedIOException()); 154 } 155 } 156 } 157 } 158 159 public FanoutTransport() throws InterruptedIOException { 160 // Setup a task that is used to reconnect the a connection async. 161 reconnectTaskFactory = new TaskRunnerFactory(); 162 reconnectTaskFactory.init(); 163 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 164 public boolean iterate() { 165 return doConnect(); 166 } 167 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this)); 168 } 169 170 /** 171 * @return 172 */ 173 private boolean doConnect() { 174 long closestReconnectDate = 0; 175 synchronized (reconnectMutex) { 176 177 if (disposed || connectionFailure != null) { 178 reconnectMutex.notifyAll(); 179 } 180 181 if (transports.size() == connectedCount || disposed || connectionFailure != null) { 182 return false; 183 } else { 184 185 if (transports.isEmpty()) { 186 // connectionFailure = new IOException("No uris available to 187 // connect to."); 188 } else { 189 190 // Try to connect them up. 191 Iterator<FanoutTransportHandler> iter = transports.iterator(); 192 for (int i = 0; iter.hasNext() && !disposed; i++) { 193 194 long now = System.currentTimeMillis(); 195 196 FanoutTransportHandler fanoutHandler = iter.next(); 197 if (fanoutHandler.transport != null) { 198 continue; 199 } 200 201 // Are we waiting a little to try to reconnect this one? 202 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) { 203 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 204 closestReconnectDate = fanoutHandler.reconnectDate; 205 } 206 continue; 207 } 208 209 URI uri = fanoutHandler.uri; 210 try { 211 LOG.debug("Stopped: " + this); 212 LOG.debug("Attempting connect to: " + uri); 213 Transport t = TransportFactory.compositeConnect(uri); 214 fanoutHandler.transport = t; 215 t.setTransportListener(fanoutHandler); 216 if (started) { 217 restoreTransport(fanoutHandler); 218 } 219 LOG.debug("Connection established"); 220 fanoutHandler.reconnectDelay = initialReconnectDelay; 221 fanoutHandler.connectFailures = 0; 222 if (primary == null) { 223 primary = fanoutHandler; 224 } 225 connectedCount++; 226 } catch (Exception e) { 227 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 228 229 if( fanoutHandler.transport !=null ) { 230 ServiceSupport.dispose(fanoutHandler.transport); 231 fanoutHandler.transport=null; 232 } 233 234 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) { 235 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)"); 236 connectionFailure = e; 237 reconnectMutex.notifyAll(); 238 return false; 239 } else { 240 241 if (useExponentialBackOff) { 242 // Exponential increment of reconnect delay. 243 fanoutHandler.reconnectDelay *= backOffMultiplier; 244 if (fanoutHandler.reconnectDelay > maxReconnectDelay) { 245 fanoutHandler.reconnectDelay = maxReconnectDelay; 246 } 247 } 248 249 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay; 250 251 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) { 252 closestReconnectDate = fanoutHandler.reconnectDate; 253 } 254 } 255 } 256 } 257 if (transports.size() == connectedCount || disposed) { 258 reconnectMutex.notifyAll(); 259 return false; 260 } 261 262 } 263 } 264 265 } 266 267 try { 268 long reconnectDelay = closestReconnectDate - System.currentTimeMillis(); 269 if (reconnectDelay > 0) { 270 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 271 Thread.sleep(reconnectDelay); 272 } 273 } catch (InterruptedException e1) { 274 Thread.currentThread().interrupt(); 275 } 276 return true; 277 } 278 279 public void start() throws Exception { 280 synchronized (reconnectMutex) { 281 LOG.debug("Started."); 282 if (started) { 283 return; 284 } 285 started = true; 286 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 287 FanoutTransportHandler th = iter.next(); 288 if (th.transport != null) { 289 restoreTransport(th); 290 } 291 } 292 connected=true; 293 } 294 } 295 296 public void stop() throws Exception { 297 try { 298 synchronized (reconnectMutex) { 299 ServiceStopper ss = new ServiceStopper(); 300 301 if (!started) { 302 return; 303 } 304 started = false; 305 disposed = true; 306 connected=false; 307 308 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 309 FanoutTransportHandler th = iter.next(); 310 if (th.transport != null) { 311 ss.stop(th.transport); 312 } 313 } 314 315 LOG.debug("Stopped: " + this); 316 ss.throwFirstException(); 317 } 318 } finally { 319 reconnectTask.shutdown(); 320 reconnectTaskFactory.shutdownNow(); 321 } 322 } 323 324 public int getMinAckCount() { 325 return minAckCount; 326 } 327 328 public void setMinAckCount(int minAckCount) { 329 this.minAckCount = minAckCount; 330 } 331 332 public long getInitialReconnectDelay() { 333 return initialReconnectDelay; 334 } 335 336 public void setInitialReconnectDelay(long initialReconnectDelay) { 337 this.initialReconnectDelay = initialReconnectDelay; 338 } 339 340 public long getMaxReconnectDelay() { 341 return maxReconnectDelay; 342 } 343 344 public void setMaxReconnectDelay(long maxReconnectDelay) { 345 this.maxReconnectDelay = maxReconnectDelay; 346 } 347 348 public long getReconnectDelayExponent() { 349 return backOffMultiplier; 350 } 351 352 public void setReconnectDelayExponent(long reconnectDelayExponent) { 353 this.backOffMultiplier = reconnectDelayExponent; 354 } 355 356 public int getMaxReconnectAttempts() { 357 return maxReconnectAttempts; 358 } 359 360 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 361 this.maxReconnectAttempts = maxReconnectAttempts; 362 } 363 364 public void oneway(Object o) throws IOException { 365 final Command command = (Command)o; 366 try { 367 synchronized (reconnectMutex) { 368 369 // Wait for transport to be connected. 370 while (connectedCount < minAckCount && !disposed && connectionFailure == null) { 371 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected."); 372 reconnectMutex.wait(1000); 373 } 374 375 // Still not fully connected. 376 if (connectedCount < minAckCount) { 377 378 Exception error; 379 380 // Throw the right kind of error.. 381 if (disposed) { 382 error = new IOException("Transport disposed."); 383 } else if (connectionFailure != null) { 384 error = connectionFailure; 385 } else { 386 error = new IOException("Unexpected failure."); 387 } 388 389 if (error instanceof IOException) { 390 throw (IOException)error; 391 } 392 throw IOExceptionSupport.create(error); 393 } 394 395 // If it was a request and it was not being tracked by 396 // the state tracker, 397 // then hold it in the requestMap so that we can replay 398 // it later. 399 boolean fanout = isFanoutCommand(command); 400 if (stateTracker.track(command) == null && command.isResponseRequired()) { 401 int size = fanout ? minAckCount : 1; 402 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size)); 403 } 404 405 // Send the message. 406 if (fanout) { 407 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 408 FanoutTransportHandler th = iter.next(); 409 if (th.transport != null) { 410 try { 411 th.transport.oneway(command); 412 } catch (IOException e) { 413 LOG.debug("Send attempt: failed."); 414 th.onException(e); 415 } 416 } 417 } 418 } else { 419 try { 420 primary.transport.oneway(command); 421 } catch (IOException e) { 422 LOG.debug("Send attempt: failed."); 423 primary.onException(e); 424 } 425 } 426 427 } 428 } catch (InterruptedException e) { 429 // Some one may be trying to stop our thread. 430 Thread.currentThread().interrupt(); 431 throw new InterruptedIOException(); 432 } 433 } 434 435 /** 436 * @param command 437 * @return 438 */ 439 private boolean isFanoutCommand(Command command) { 440 if (command.isMessage()) { 441 if( fanOutQueues ) { 442 return true; 443 } 444 return ((Message)command).getDestination().isTopic(); 445 } 446 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || 447 command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) { 448 return false; 449 } 450 return true; 451 } 452 453 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 454 throw new AssertionError("Unsupported Method"); 455 } 456 457 public Object request(Object command) throws IOException { 458 throw new AssertionError("Unsupported Method"); 459 } 460 461 public Object request(Object command, int timeout) throws IOException { 462 throw new AssertionError("Unsupported Method"); 463 } 464 465 public void reconnect() { 466 LOG.debug("Waking up reconnect task"); 467 try { 468 reconnectTask.wakeup(); 469 } catch (InterruptedException e) { 470 Thread.currentThread().interrupt(); 471 } 472 } 473 474 public TransportListener getTransportListener() { 475 return transportListener; 476 } 477 478 public void setTransportListener(TransportListener commandListener) { 479 this.transportListener = commandListener; 480 } 481 482 public <T> T narrow(Class<T> target) { 483 484 if (target.isAssignableFrom(getClass())) { 485 return target.cast(this); 486 } 487 488 synchronized (reconnectMutex) { 489 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 490 FanoutTransportHandler th = iter.next(); 491 if (th.transport != null) { 492 T rc = th.transport.narrow(target); 493 if (rc != null) { 494 return rc; 495 } 496 } 497 } 498 } 499 500 return null; 501 502 } 503 504 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException { 505 th.transport.start(); 506 stateTracker.setRestoreConsumers(th.transport == primary); 507 stateTracker.restore(th.transport); 508 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) { 509 RequestCounter rc = iter2.next(); 510 th.transport.oneway(rc.command); 511 } 512 } 513 514 public void add(boolean reblance,URI uris[]) { 515 516 synchronized (reconnectMutex) { 517 for (int i = 0; i < uris.length; i++) { 518 URI uri = uris[i]; 519 520 boolean match = false; 521 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 522 FanoutTransportHandler th = iter.next(); 523 if (th.uri.equals(uri)) { 524 match = true; 525 break; 526 } 527 } 528 if (!match) { 529 FanoutTransportHandler th = new FanoutTransportHandler(uri); 530 transports.add(th); 531 reconnect(); 532 } 533 } 534 } 535 536 } 537 538 public void remove(boolean rebalance,URI uris[]) { 539 540 synchronized (reconnectMutex) { 541 for (int i = 0; i < uris.length; i++) { 542 URI uri = uris[i]; 543 544 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) { 545 FanoutTransportHandler th = iter.next(); 546 if (th.uri.equals(uri)) { 547 if (th.transport != null) { 548 ServiceSupport.dispose(th.transport); 549 connectedCount--; 550 } 551 iter.remove(); 552 break; 553 } 554 } 555 } 556 } 557 558 } 559 560 public void reconnect(URI uri) throws IOException { 561 add(true,new URI[]{uri}); 562 563 } 564 565 public boolean isReconnectSupported() { 566 return true; 567 } 568 569 public boolean isUpdateURIsSupported() { 570 return true; 571 } 572 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 573 add(reblance,uris); 574 } 575 576 577 public String getRemoteAddress() { 578 if (primary != null) { 579 if (primary.transport != null) { 580 return primary.transport.getRemoteAddress(); 581 } 582 } 583 return null; 584 } 585 586 protected void transportListenerOnCommand(Command command) { 587 if (transportListener != null) { 588 transportListener.onCommand(command); 589 } 590 } 591 592 public boolean isFaultTolerant() { 593 return true; 594 } 595 596 public boolean isFanOutQueues() { 597 return fanOutQueues; 598 } 599 600 public void setFanOutQueues(boolean fanOutQueues) { 601 this.fanOutQueues = fanOutQueues; 602 } 603 604 public boolean isDisposed() { 605 return disposed; 606 } 607 608 609 public boolean isConnected() { 610 return connected; 611 } 612 613 public int getReceiveCounter() { 614 int rc = 0; 615 synchronized (reconnectMutex) { 616 for (FanoutTransportHandler th : transports) { 617 if (th.transport != null) { 618 rc += th.transport.getReceiveCounter(); 619 } 620 } 621 } 622 return rc; 623 } 624}