001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.state; 018 019import java.io.IOException; 020import java.util.Iterator; 021import java.util.LinkedHashMap; 022import java.util.Map; 023import java.util.Map.Entry; 024import java.util.Vector; 025import java.util.concurrent.ConcurrentHashMap; 026 027import javax.jms.TransactionRolledBackException; 028import javax.transaction.xa.XAResource; 029 030import org.apache.activemq.command.Command; 031import org.apache.activemq.command.ConnectionId; 032import org.apache.activemq.command.ConnectionInfo; 033import org.apache.activemq.command.ConsumerControl; 034import org.apache.activemq.command.ConsumerId; 035import org.apache.activemq.command.ConsumerInfo; 036import org.apache.activemq.command.DestinationInfo; 037import org.apache.activemq.command.ExceptionResponse; 038import org.apache.activemq.command.IntegerResponse; 039import org.apache.activemq.command.Message; 040import org.apache.activemq.command.MessagePull; 041import org.apache.activemq.command.ProducerId; 042import org.apache.activemq.command.ProducerInfo; 043import org.apache.activemq.command.Response; 044import org.apache.activemq.command.SessionId; 045import org.apache.activemq.command.SessionInfo; 046import org.apache.activemq.command.TransactionInfo; 047import org.apache.activemq.transport.Transport; 048import org.apache.activemq.util.IOExceptionSupport; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * Tracks the state of a connection so a newly established transport can be 054 * re-initialized to the state that was tracked. 055 * 056 * 057 */ 058public class ConnectionStateTracker extends CommandVisitorAdapter { 059 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class); 060 061 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); 062 private static final int MESSAGE_PULL_SIZE = 400; 063 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 064 065 private boolean trackTransactions; 066 private boolean restoreSessions = true; 067 private boolean restoreConsumers = true; 068 private boolean restoreProducers = true; 069 private boolean restoreTransaction = true; 070 private boolean trackMessages = true; 071 private boolean trackTransactionProducers = true; 072 private int maxCacheSize = 128 * 1024; 073 private long currentCacheSize; // use long to prevent overflow for folks who set high max. 074 075 private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){ 076 @Override 077 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) { 078 boolean result = currentCacheSize > maxCacheSize; 079 if (result) { 080 if (eldest.getValue() instanceof Message) { 081 currentCacheSize -= ((Message)eldest.getValue()).getSize(); 082 } else if (eldest.getValue() instanceof MessagePull) { 083 currentCacheSize -= MESSAGE_PULL_SIZE; 084 } 085 if (LOG.isTraceEnabled()) { 086 LOG.trace("removing tracked message: " + eldest.getKey()); 087 } 088 } 089 return result; 090 } 091 }; 092 093 private class RemoveTransactionAction implements ResponseHandler { 094 private final TransactionInfo info; 095 096 public RemoveTransactionAction(TransactionInfo info) { 097 this.info = info; 098 } 099 100 @Override 101 public void onResponse(Command response) { 102 ConnectionId connectionId = info.getConnectionId(); 103 ConnectionState cs = connectionStates.get(connectionId); 104 if (cs != null) { 105 cs.removeTransactionState(info.getTransactionId()); 106 } 107 } 108 } 109 110 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction { 111 public PrepareReadonlyTransactionAction(TransactionInfo info) { 112 super(info); 113 } 114 115 @Override 116 public void onResponse(Command command) { 117 if (command instanceof IntegerResponse) { 118 IntegerResponse response = (IntegerResponse) command; 119 if (XAResource.XA_RDONLY == response.getResult()) { 120 // all done, no commit or rollback from TM 121 super.onResponse(command); 122 } 123 } 124 } 125 } 126 127 /** 128 * Entry point for all tracked commands in the tracker. Commands should be tracked before 129 * there is an attempt to send them on the wire. Upon a successful send of a command it is 130 * necessary to call the trackBack method to complete the tracking of the given command. 131 * 132 * @param command 133 * The command that is to be tracked by this tracker. 134 * 135 * @return null if the command is not state tracked. 136 * 137 * @throws IOException if an error occurs during setup of the tracking operation. 138 */ 139 public Tracked track(Command command) throws IOException { 140 try { 141 return (Tracked)command.visit(this); 142 } catch (IOException e) { 143 throw e; 144 } catch (Throwable e) { 145 throw IOExceptionSupport.create(e); 146 } 147 } 148 149 /** 150 * Completes the two phase tracking operation for a command that is sent on the wire. Once 151 * the command is sent successfully to complete the tracking operation or otherwise update 152 * the state of the tracker. 153 * 154 * @param command 155 * The command that was previously provided to the track method. 156 */ 157 public void trackBack(Command command) { 158 if (command != null) { 159 if (trackMessages && command.isMessage()) { 160 Message message = (Message) command; 161 if (message.getTransactionId()==null) { 162 currentCacheSize = currentCacheSize + message.getSize(); 163 } 164 } else if (command instanceof MessagePull) { 165 // We only track one MessagePull per consumer so only add to cache size 166 // when the command has been marked as tracked. 167 if (((MessagePull)command).isTracked()) { 168 // just needs to be a rough estimate of size, ~4 identifiers 169 currentCacheSize += MESSAGE_PULL_SIZE; 170 } 171 } 172 } 173 } 174 175 public void restore(Transport transport) throws IOException { 176 // Restore the connections. 177 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) { 178 ConnectionState connectionState = iter.next(); 179 connectionState.getInfo().setFailoverReconnect(true); 180 if (LOG.isDebugEnabled()) { 181 LOG.debug("conn: " + connectionState.getInfo().getConnectionId()); 182 } 183 transport.oneway(connectionState.getInfo()); 184 restoreTempDestinations(transport, connectionState); 185 186 if (restoreSessions) { 187 restoreSessions(transport, connectionState); 188 } 189 190 if (restoreTransaction) { 191 restoreTransactions(transport, connectionState); 192 } 193 } 194 195 // now flush messages and MessagePull commands. 196 for (Command msg : messageCache.values()) { 197 if (LOG.isDebugEnabled()) { 198 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg)); 199 } 200 transport.oneway(msg); 201 } 202 } 203 204 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException { 205 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>(); 206 for (TransactionState transactionState : connectionState.getTransactionStates()) { 207 if (LOG.isDebugEnabled()) { 208 LOG.debug("tx: " + transactionState.getId()); 209 } 210 211 // rollback any completed transactions - no way to know if commit got there 212 // or if reply went missing 213 // 214 if (!transactionState.getCommands().isEmpty()) { 215 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1); 216 if (lastCommand instanceof TransactionInfo) { 217 TransactionInfo transactionInfo = (TransactionInfo) lastCommand; 218 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) { 219 if (LOG.isDebugEnabled()) { 220 LOG.debug("rolling back potentially completed tx: " + transactionState.getId()); 221 } 222 toRollback.add(transactionInfo); 223 continue; 224 } 225 } 226 } 227 228 // replay short lived producers that may have been involved in the transaction 229 for (ProducerState producerState : transactionState.getProducerStates().values()) { 230 if (LOG.isDebugEnabled()) { 231 LOG.debug("tx replay producer :" + producerState.getInfo()); 232 } 233 transport.oneway(producerState.getInfo()); 234 } 235 236 for (Command command : transactionState.getCommands()) { 237 if (LOG.isDebugEnabled()) { 238 LOG.debug("tx replay: " + command); 239 } 240 transport.oneway(command); 241 } 242 243 for (ProducerState producerState : transactionState.getProducerStates().values()) { 244 if (LOG.isDebugEnabled()) { 245 LOG.debug("tx remove replayed producer :" + producerState.getInfo()); 246 } 247 transport.oneway(producerState.getInfo().createRemoveCommand()); 248 } 249 } 250 251 for (TransactionInfo command: toRollback) { 252 // respond to the outstanding commit 253 ExceptionResponse response = new ExceptionResponse(); 254 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId())); 255 response.setCorrelationId(command.getCommandId()); 256 transport.getTransportListener().onCommand(response); 257 } 258 } 259 260 /** 261 * @param transport 262 * @param connectionState 263 * @throws IOException 264 */ 265 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException { 266 // Restore the connection's sessions 267 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) { 268 SessionState sessionState = (SessionState)iter2.next(); 269 if (LOG.isDebugEnabled()) { 270 LOG.debug("session: " + sessionState.getInfo().getSessionId()); 271 } 272 transport.oneway(sessionState.getInfo()); 273 274 if (restoreProducers) { 275 restoreProducers(transport, sessionState); 276 } 277 278 if (restoreConsumers) { 279 restoreConsumers(transport, sessionState); 280 } 281 } 282 } 283 284 /** 285 * @param transport 286 * @param sessionState 287 * @throws IOException 288 */ 289 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException { 290 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete 291 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId()); 292 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete(); 293 for (ConsumerState consumerState : sessionState.getConsumerStates()) { 294 ConsumerInfo infoToSend = consumerState.getInfo(); 295 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) { 296 infoToSend = consumerState.getInfo().copy(); 297 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo()); 298 infoToSend.setPrefetchSize(0); 299 if (LOG.isDebugEnabled()) { 300 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize()); 301 } 302 } 303 if (LOG.isDebugEnabled()) { 304 LOG.debug("consumer: " + infoToSend.getConsumerId()); 305 } 306 transport.oneway(infoToSend); 307 } 308 } 309 310 /** 311 * @param transport 312 * @param sessionState 313 * @throws IOException 314 */ 315 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException { 316 // Restore the session's producers 317 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) { 318 ProducerState producerState = (ProducerState)iter3.next(); 319 if (LOG.isDebugEnabled()) { 320 LOG.debug("producer: " + producerState.getInfo().getProducerId()); 321 } 322 transport.oneway(producerState.getInfo()); 323 } 324 } 325 326 /** 327 * @param transport 328 * @param connectionState 329 * @throws IOException 330 */ 331 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState) 332 throws IOException { 333 // Restore the connection's temp destinations. 334 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) { 335 DestinationInfo info = (DestinationInfo)iter2.next(); 336 transport.oneway(info); 337 if (LOG.isDebugEnabled()) { 338 LOG.debug("tempDest: " + info.getDestination()); 339 } 340 } 341 } 342 343 @Override 344 public Response processAddDestination(DestinationInfo info) { 345 if (info != null) { 346 ConnectionState cs = connectionStates.get(info.getConnectionId()); 347 if (cs != null && info.getDestination().isTemporary()) { 348 cs.addTempDestination(info); 349 } 350 } 351 return TRACKED_RESPONSE_MARKER; 352 } 353 354 @Override 355 public Response processRemoveDestination(DestinationInfo info) { 356 if (info != null) { 357 ConnectionState cs = connectionStates.get(info.getConnectionId()); 358 if (cs != null && info.getDestination().isTemporary()) { 359 cs.removeTempDestination(info.getDestination()); 360 } 361 } 362 return TRACKED_RESPONSE_MARKER; 363 } 364 365 @Override 366 public Response processAddProducer(ProducerInfo info) { 367 if (info != null && info.getProducerId() != null) { 368 SessionId sessionId = info.getProducerId().getParentId(); 369 if (sessionId != null) { 370 ConnectionId connectionId = sessionId.getParentId(); 371 if (connectionId != null) { 372 ConnectionState cs = connectionStates.get(connectionId); 373 if (cs != null) { 374 SessionState ss = cs.getSessionState(sessionId); 375 if (ss != null) { 376 ss.addProducer(info); 377 } 378 } 379 } 380 } 381 } 382 return TRACKED_RESPONSE_MARKER; 383 } 384 385 @Override 386 public Response processRemoveProducer(ProducerId id) { 387 if (id != null) { 388 SessionId sessionId = id.getParentId(); 389 if (sessionId != null) { 390 ConnectionId connectionId = sessionId.getParentId(); 391 if (connectionId != null) { 392 ConnectionState cs = connectionStates.get(connectionId); 393 if (cs != null) { 394 SessionState ss = cs.getSessionState(sessionId); 395 if (ss != null) { 396 ss.removeProducer(id); 397 } 398 } 399 } 400 } 401 } 402 return TRACKED_RESPONSE_MARKER; 403 } 404 405 @Override 406 public Response processAddConsumer(ConsumerInfo info) { 407 if (info != null) { 408 SessionId sessionId = info.getConsumerId().getParentId(); 409 if (sessionId != null) { 410 ConnectionId connectionId = sessionId.getParentId(); 411 if (connectionId != null) { 412 ConnectionState cs = connectionStates.get(connectionId); 413 if (cs != null) { 414 SessionState ss = cs.getSessionState(sessionId); 415 if (ss != null) { 416 ss.addConsumer(info); 417 } 418 } 419 } 420 } 421 } 422 return TRACKED_RESPONSE_MARKER; 423 } 424 425 @Override 426 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) { 427 if (id != null) { 428 SessionId sessionId = id.getParentId(); 429 if (sessionId != null) { 430 ConnectionId connectionId = sessionId.getParentId(); 431 if (connectionId != null) { 432 ConnectionState cs = connectionStates.get(connectionId); 433 if (cs != null) { 434 SessionState ss = cs.getSessionState(sessionId); 435 if (ss != null) { 436 ss.removeConsumer(id); 437 } 438 cs.getRecoveringPullConsumers().remove(id); 439 } 440 } 441 } 442 } 443 return TRACKED_RESPONSE_MARKER; 444 } 445 446 @Override 447 public Response processAddSession(SessionInfo info) { 448 if (info != null) { 449 ConnectionId connectionId = info.getSessionId().getParentId(); 450 if (connectionId != null) { 451 ConnectionState cs = connectionStates.get(connectionId); 452 if (cs != null) { 453 cs.addSession(info); 454 } 455 } 456 } 457 return TRACKED_RESPONSE_MARKER; 458 } 459 460 @Override 461 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) { 462 if (id != null) { 463 ConnectionId connectionId = id.getParentId(); 464 if (connectionId != null) { 465 ConnectionState cs = connectionStates.get(connectionId); 466 if (cs != null) { 467 cs.removeSession(id); 468 } 469 } 470 } 471 return TRACKED_RESPONSE_MARKER; 472 } 473 474 @Override 475 public Response processAddConnection(ConnectionInfo info) { 476 if (info != null) { 477 connectionStates.put(info.getConnectionId(), new ConnectionState(info)); 478 } 479 return TRACKED_RESPONSE_MARKER; 480 } 481 482 @Override 483 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception { 484 if (id != null) { 485 connectionStates.remove(id); 486 } 487 return TRACKED_RESPONSE_MARKER; 488 } 489 490 @Override 491 public Response processMessage(Message send) throws Exception { 492 if (send != null) { 493 if (trackTransactions && send.getTransactionId() != null) { 494 ProducerId producerId = send.getProducerId(); 495 ConnectionId connectionId = producerId.getParentId().getParentId(); 496 if (connectionId != null) { 497 ConnectionState cs = connectionStates.get(connectionId); 498 if (cs != null) { 499 TransactionState transactionState = cs.getTransactionState(send.getTransactionId()); 500 if (transactionState != null) { 501 transactionState.addCommand(send); 502 503 if (trackTransactionProducers) { 504 // for jmstemplate, track the producer in case it is closed before commit 505 // and needs to be replayed 506 SessionState ss = cs.getSessionState(producerId.getParentId()); 507 ProducerState producerState = ss.getProducerState(producerId); 508 producerState.setTransactionState(transactionState); 509 } 510 } 511 } 512 } 513 return TRACKED_RESPONSE_MARKER; 514 }else if (trackMessages) { 515 messageCache.put(send.getMessageId(), send); 516 } 517 } 518 return null; 519 } 520 521 @Override 522 public Response processBeginTransaction(TransactionInfo info) { 523 if (trackTransactions && info != null && info.getTransactionId() != null) { 524 ConnectionId connectionId = info.getConnectionId(); 525 if (connectionId != null) { 526 ConnectionState cs = connectionStates.get(connectionId); 527 if (cs != null) { 528 cs.addTransactionState(info.getTransactionId()); 529 TransactionState state = cs.getTransactionState(info.getTransactionId()); 530 state.addCommand(info); 531 } 532 } 533 return TRACKED_RESPONSE_MARKER; 534 } 535 return null; 536 } 537 538 @Override 539 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 540 if (trackTransactions && info != null && info.getTransactionId() != null) { 541 ConnectionId connectionId = info.getConnectionId(); 542 if (connectionId != null) { 543 ConnectionState cs = connectionStates.get(connectionId); 544 if (cs != null) { 545 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 546 if (transactionState != null) { 547 transactionState.addCommand(info); 548 return new Tracked(new PrepareReadonlyTransactionAction(info)); 549 } 550 } 551 } 552 } 553 return null; 554 } 555 556 @Override 557 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 558 if (trackTransactions && info != null && info.getTransactionId() != null) { 559 ConnectionId connectionId = info.getConnectionId(); 560 if (connectionId != null) { 561 ConnectionState cs = connectionStates.get(connectionId); 562 if (cs != null) { 563 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 564 if (transactionState != null) { 565 transactionState.addCommand(info); 566 return new Tracked(new RemoveTransactionAction(info)); 567 } 568 } 569 } 570 } 571 return null; 572 } 573 574 @Override 575 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 576 if (trackTransactions && info != null && info.getTransactionId() != null) { 577 ConnectionId connectionId = info.getConnectionId(); 578 if (connectionId != null) { 579 ConnectionState cs = connectionStates.get(connectionId); 580 if (cs != null) { 581 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 582 if (transactionState != null) { 583 transactionState.addCommand(info); 584 return new Tracked(new RemoveTransactionAction(info)); 585 } 586 } 587 } 588 } 589 return null; 590 } 591 592 @Override 593 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 594 if (trackTransactions && info != null && info.getTransactionId() != null) { 595 ConnectionId connectionId = info.getConnectionId(); 596 if (connectionId != null) { 597 ConnectionState cs = connectionStates.get(connectionId); 598 if (cs != null) { 599 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 600 if (transactionState != null) { 601 transactionState.addCommand(info); 602 return new Tracked(new RemoveTransactionAction(info)); 603 } 604 } 605 } 606 } 607 return null; 608 } 609 610 @Override 611 public Response processEndTransaction(TransactionInfo info) throws Exception { 612 if (trackTransactions && info != null && info.getTransactionId() != null) { 613 ConnectionId connectionId = info.getConnectionId(); 614 if (connectionId != null) { 615 ConnectionState cs = connectionStates.get(connectionId); 616 if (cs != null) { 617 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 618 if (transactionState != null) { 619 transactionState.addCommand(info); 620 } 621 } 622 } 623 return TRACKED_RESPONSE_MARKER; 624 } 625 return null; 626 } 627 628 @Override 629 public Response processMessagePull(MessagePull pull) throws Exception { 630 if (pull != null) { 631 // leave a single instance in the cache 632 final String id = pull.getDestination() + "::" + pull.getConsumerId(); 633 if (messageCache.put(id.intern(), pull) == null) { 634 // Only marked as tracked if this is the first request we've seen. 635 pull.setTracked(true); 636 } 637 } 638 return null; 639 } 640 641 public boolean isRestoreConsumers() { 642 return restoreConsumers; 643 } 644 645 public void setRestoreConsumers(boolean restoreConsumers) { 646 this.restoreConsumers = restoreConsumers; 647 } 648 649 public boolean isRestoreProducers() { 650 return restoreProducers; 651 } 652 653 public void setRestoreProducers(boolean restoreProducers) { 654 this.restoreProducers = restoreProducers; 655 } 656 657 public boolean isRestoreSessions() { 658 return restoreSessions; 659 } 660 661 public void setRestoreSessions(boolean restoreSessions) { 662 this.restoreSessions = restoreSessions; 663 } 664 665 public boolean isTrackTransactions() { 666 return trackTransactions; 667 } 668 669 public void setTrackTransactions(boolean trackTransactions) { 670 this.trackTransactions = trackTransactions; 671 } 672 673 public boolean isTrackTransactionProducers() { 674 return this.trackTransactionProducers; 675 } 676 677 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 678 this.trackTransactionProducers = trackTransactionProducers; 679 } 680 681 public boolean isRestoreTransaction() { 682 return restoreTransaction; 683 } 684 685 public void setRestoreTransaction(boolean restoreTransaction) { 686 this.restoreTransaction = restoreTransaction; 687 } 688 689 public boolean isTrackMessages() { 690 return trackMessages; 691 } 692 693 public void setTrackMessages(boolean trackMessages) { 694 this.trackMessages = trackMessages; 695 } 696 697 public int getMaxCacheSize() { 698 return maxCacheSize; 699 } 700 701 public void setMaxCacheSize(int maxCacheSize) { 702 this.maxCacheSize = maxCacheSize; 703 } 704 705 /** 706 * @return the current cache size for the Message and MessagePull Command cache. 707 */ 708 public long getCurrentCacheSize() { 709 return this.currentCacheSize; 710 } 711 712 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) { 713 ConnectionState connectionState = connectionStates.get(connectionId); 714 if (connectionState != null) { 715 connectionState.setConnectionInterruptProcessingComplete(true); 716 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers(); 717 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) { 718 ConsumerControl control = new ConsumerControl(); 719 control.setConsumerId(entry.getKey()); 720 control.setPrefetch(entry.getValue().getPrefetchSize()); 721 control.setDestination(entry.getValue().getDestination()); 722 try { 723 if (LOG.isDebugEnabled()) { 724 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch()); 725 } 726 transport.oneway(control); 727 } catch (Exception ex) { 728 if (LOG.isDebugEnabled()) { 729 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId() 730 + " with: " + control.getPrefetch(), ex); 731 } 732 } 733 } 734 stalledConsumers.clear(); 735 } 736 } 737 738 public void transportInterrupted(ConnectionId connectionId) { 739 ConnectionState connectionState = connectionStates.get(connectionId); 740 if (connectionState != null) { 741 connectionState.setConnectionInterruptProcessingComplete(false); 742 } 743 } 744}