001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.state;
018
019import java.io.IOException;
020import java.util.Iterator;
021import java.util.LinkedHashMap;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.Vector;
025import java.util.concurrent.ConcurrentHashMap;
026
027import javax.jms.TransactionRolledBackException;
028import javax.transaction.xa.XAResource;
029
030import org.apache.activemq.command.Command;
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.ConnectionInfo;
033import org.apache.activemq.command.ConsumerControl;
034import org.apache.activemq.command.ConsumerId;
035import org.apache.activemq.command.ConsumerInfo;
036import org.apache.activemq.command.DestinationInfo;
037import org.apache.activemq.command.ExceptionResponse;
038import org.apache.activemq.command.IntegerResponse;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessagePull;
041import org.apache.activemq.command.ProducerId;
042import org.apache.activemq.command.ProducerInfo;
043import org.apache.activemq.command.Response;
044import org.apache.activemq.command.SessionId;
045import org.apache.activemq.command.SessionInfo;
046import org.apache.activemq.command.TransactionInfo;
047import org.apache.activemq.transport.Transport;
048import org.apache.activemq.util.IOExceptionSupport;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Tracks the state of a connection so a newly established transport can be
054 * re-initialized to the state that was tracked.
055 *
056 *
057 */
058public class ConnectionStateTracker extends CommandVisitorAdapter {
059    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060
061    private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062    private static final int MESSAGE_PULL_SIZE = 400;
063    protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
064
065    private boolean trackTransactions;
066    private boolean restoreSessions = true;
067    private boolean restoreConsumers = true;
068    private boolean restoreProducers = true;
069    private boolean restoreTransaction = true;
070    private boolean trackMessages = true;
071    private boolean trackTransactionProducers = true;
072    private int maxCacheSize = 128 * 1024;
073    private long currentCacheSize;  // use long to prevent overflow for folks who set high max.
074
075    private final Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
076        @Override
077        protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
078            boolean result = currentCacheSize > maxCacheSize;
079            if (result) {
080                if (eldest.getValue() instanceof Message) {
081                    currentCacheSize -= ((Message)eldest.getValue()).getSize();
082                } else if (eldest.getValue() instanceof MessagePull) {
083                    currentCacheSize -= MESSAGE_PULL_SIZE;
084                }
085                if (LOG.isTraceEnabled()) {
086                    LOG.trace("removing tracked message: " + eldest.getKey());
087                }
088            }
089            return result;
090        }
091    };
092
093    private class RemoveTransactionAction implements ResponseHandler {
094        private final TransactionInfo info;
095
096        public RemoveTransactionAction(TransactionInfo info) {
097            this.info = info;
098        }
099
100        @Override
101        public void onResponse(Command response) {
102            ConnectionId connectionId = info.getConnectionId();
103            ConnectionState cs = connectionStates.get(connectionId);
104            if (cs != null) {
105                cs.removeTransactionState(info.getTransactionId());
106            }
107        }
108    }
109
110    private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
111        public PrepareReadonlyTransactionAction(TransactionInfo info) {
112            super(info);
113        }
114
115        @Override
116        public void onResponse(Command command) {
117            if (command instanceof IntegerResponse) {
118                IntegerResponse response = (IntegerResponse) command;
119                if (XAResource.XA_RDONLY == response.getResult()) {
120                    // all done, no commit or rollback from TM
121                    super.onResponse(command);
122                }
123            }
124        }
125    }
126
127    /**
128     * Entry point for all tracked commands in the tracker.  Commands should be tracked before
129     * there is an attempt to send them on the wire.  Upon a successful send of a command it is
130     * necessary to call the trackBack method to complete the tracking of the given command.
131     *
132     * @param command
133     *      The command that is to be tracked by this tracker.
134     *
135     * @return null if the command is not state tracked.
136     *
137     * @throws IOException if an error occurs during setup of the tracking operation.
138     */
139    public Tracked track(Command command) throws IOException {
140        try {
141            return (Tracked)command.visit(this);
142        } catch (IOException e) {
143            throw e;
144        } catch (Throwable e) {
145            throw IOExceptionSupport.create(e);
146        }
147    }
148
149    /**
150     * Completes the two phase tracking operation for a command that is sent on the wire.  Once
151     * the command is sent successfully to complete the tracking operation or otherwise update
152     * the state of the tracker.
153     *
154     * @param command
155     *      The command that was previously provided to the track method.
156     */
157    public void trackBack(Command command) {
158        if (command != null) {
159            if (trackMessages && command.isMessage()) {
160                Message message = (Message) command;
161                if (message.getTransactionId()==null) {
162                    currentCacheSize = currentCacheSize +  message.getSize();
163                }
164            } else if (command instanceof MessagePull) {
165                // We only track one MessagePull per consumer so only add to cache size
166                // when the command has been marked as tracked.
167                if (((MessagePull)command).isTracked()) {
168                    // just needs to be a rough estimate of size, ~4 identifiers
169                    currentCacheSize += MESSAGE_PULL_SIZE;
170                }
171            }
172        }
173    }
174
175    public void restore(Transport transport) throws IOException {
176        // Restore the connections.
177        for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
178            ConnectionState connectionState = iter.next();
179            connectionState.getInfo().setFailoverReconnect(true);
180            if (LOG.isDebugEnabled()) {
181                LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
182            }
183            transport.oneway(connectionState.getInfo());
184            restoreTempDestinations(transport, connectionState);
185
186            if (restoreSessions) {
187                restoreSessions(transport, connectionState);
188            }
189
190            if (restoreTransaction) {
191                restoreTransactions(transport, connectionState);
192            }
193        }
194
195        // now flush messages and MessagePull commands.
196        for (Command msg : messageCache.values()) {
197            if (LOG.isDebugEnabled()) {
198                LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
199            }
200            transport.oneway(msg);
201        }
202    }
203
204    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
205        Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
206        for (TransactionState transactionState : connectionState.getTransactionStates()) {
207            if (LOG.isDebugEnabled()) {
208                LOG.debug("tx: " + transactionState.getId());
209            }
210
211            // rollback any completed transactions - no way to know if commit got there
212            // or if reply went missing
213            //
214            if (!transactionState.getCommands().isEmpty()) {
215                Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
216                if (lastCommand instanceof TransactionInfo) {
217                    TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
218                    if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
219                        if (LOG.isDebugEnabled()) {
220                            LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
221                        }
222                        toRollback.add(transactionInfo);
223                        continue;
224                    }
225                }
226            }
227
228            // replay short lived producers that may have been involved in the transaction
229            for (ProducerState producerState : transactionState.getProducerStates().values()) {
230                if (LOG.isDebugEnabled()) {
231                    LOG.debug("tx replay producer :" + producerState.getInfo());
232                }
233                transport.oneway(producerState.getInfo());
234            }
235
236            for (Command command : transactionState.getCommands()) {
237                if (LOG.isDebugEnabled()) {
238                    LOG.debug("tx replay: " + command);
239                }
240                transport.oneway(command);
241            }
242
243            for (ProducerState producerState : transactionState.getProducerStates().values()) {
244                if (LOG.isDebugEnabled()) {
245                    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
246                }
247                transport.oneway(producerState.getInfo().createRemoveCommand());
248            }
249        }
250
251        for (TransactionInfo command: toRollback) {
252            // respond to the outstanding commit
253            ExceptionResponse response = new ExceptionResponse();
254            response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
255            response.setCorrelationId(command.getCommandId());
256            transport.getTransportListener().onCommand(response);
257        }
258    }
259
260    /**
261     * @param transport
262     * @param connectionState
263     * @throws IOException
264     */
265    protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
266        // Restore the connection's sessions
267        for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
268            SessionState sessionState = (SessionState)iter2.next();
269            if (LOG.isDebugEnabled()) {
270                LOG.debug("session: " + sessionState.getInfo().getSessionId());
271            }
272            transport.oneway(sessionState.getInfo());
273
274            if (restoreProducers) {
275                restoreProducers(transport, sessionState);
276            }
277
278            if (restoreConsumers) {
279                restoreConsumers(transport, sessionState);
280            }
281        }
282    }
283
284    /**
285     * @param transport
286     * @param sessionState
287     * @throws IOException
288     */
289    protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
290        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
291        final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
292        final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
293        for (ConsumerState consumerState : sessionState.getConsumerStates()) {
294            ConsumerInfo infoToSend = consumerState.getInfo();
295            if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
296                infoToSend = consumerState.getInfo().copy();
297                connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
298                infoToSend.setPrefetchSize(0);
299                if (LOG.isDebugEnabled()) {
300                    LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
301                }
302            }
303            if (LOG.isDebugEnabled()) {
304                LOG.debug("consumer: " + infoToSend.getConsumerId());
305            }
306            transport.oneway(infoToSend);
307        }
308    }
309
310    /**
311     * @param transport
312     * @param sessionState
313     * @throws IOException
314     */
315    protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
316        // Restore the session's producers
317        for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
318            ProducerState producerState = (ProducerState)iter3.next();
319            if (LOG.isDebugEnabled()) {
320                LOG.debug("producer: " + producerState.getInfo().getProducerId());
321            }
322            transport.oneway(producerState.getInfo());
323        }
324    }
325
326    /**
327     * @param transport
328     * @param connectionState
329     * @throws IOException
330     */
331    protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
332        throws IOException {
333        // Restore the connection's temp destinations.
334        for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
335            DestinationInfo info = (DestinationInfo)iter2.next();
336            transport.oneway(info);
337            if (LOG.isDebugEnabled()) {
338                LOG.debug("tempDest: " + info.getDestination());
339            }
340        }
341    }
342
343    @Override
344    public Response processAddDestination(DestinationInfo info) {
345        if (info != null) {
346            ConnectionState cs = connectionStates.get(info.getConnectionId());
347            if (cs != null && info.getDestination().isTemporary()) {
348                cs.addTempDestination(info);
349            }
350        }
351        return TRACKED_RESPONSE_MARKER;
352    }
353
354    @Override
355    public Response processRemoveDestination(DestinationInfo info) {
356        if (info != null) {
357            ConnectionState cs = connectionStates.get(info.getConnectionId());
358            if (cs != null && info.getDestination().isTemporary()) {
359                cs.removeTempDestination(info.getDestination());
360            }
361        }
362        return TRACKED_RESPONSE_MARKER;
363    }
364
365    @Override
366    public Response processAddProducer(ProducerInfo info) {
367        if (info != null && info.getProducerId() != null) {
368            SessionId sessionId = info.getProducerId().getParentId();
369            if (sessionId != null) {
370                ConnectionId connectionId = sessionId.getParentId();
371                if (connectionId != null) {
372                    ConnectionState cs = connectionStates.get(connectionId);
373                    if (cs != null) {
374                        SessionState ss = cs.getSessionState(sessionId);
375                        if (ss != null) {
376                            ss.addProducer(info);
377                        }
378                    }
379                }
380            }
381        }
382        return TRACKED_RESPONSE_MARKER;
383    }
384
385    @Override
386    public Response processRemoveProducer(ProducerId id) {
387        if (id != null) {
388            SessionId sessionId = id.getParentId();
389            if (sessionId != null) {
390                ConnectionId connectionId = sessionId.getParentId();
391                if (connectionId != null) {
392                    ConnectionState cs = connectionStates.get(connectionId);
393                    if (cs != null) {
394                        SessionState ss = cs.getSessionState(sessionId);
395                        if (ss != null) {
396                            ss.removeProducer(id);
397                        }
398                    }
399                }
400            }
401        }
402        return TRACKED_RESPONSE_MARKER;
403    }
404
405    @Override
406    public Response processAddConsumer(ConsumerInfo info) {
407        if (info != null) {
408            SessionId sessionId = info.getConsumerId().getParentId();
409            if (sessionId != null) {
410                ConnectionId connectionId = sessionId.getParentId();
411                if (connectionId != null) {
412                    ConnectionState cs = connectionStates.get(connectionId);
413                    if (cs != null) {
414                        SessionState ss = cs.getSessionState(sessionId);
415                        if (ss != null) {
416                            ss.addConsumer(info);
417                        }
418                    }
419                }
420            }
421        }
422        return TRACKED_RESPONSE_MARKER;
423    }
424
425    @Override
426    public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
427        if (id != null) {
428            SessionId sessionId = id.getParentId();
429            if (sessionId != null) {
430                ConnectionId connectionId = sessionId.getParentId();
431                if (connectionId != null) {
432                    ConnectionState cs = connectionStates.get(connectionId);
433                    if (cs != null) {
434                        SessionState ss = cs.getSessionState(sessionId);
435                        if (ss != null) {
436                            ss.removeConsumer(id);
437                        }
438                        cs.getRecoveringPullConsumers().remove(id);
439                    }
440                }
441            }
442        }
443        return TRACKED_RESPONSE_MARKER;
444    }
445
446    @Override
447    public Response processAddSession(SessionInfo info) {
448        if (info != null) {
449            ConnectionId connectionId = info.getSessionId().getParentId();
450            if (connectionId != null) {
451                ConnectionState cs = connectionStates.get(connectionId);
452                if (cs != null) {
453                    cs.addSession(info);
454                }
455            }
456        }
457        return TRACKED_RESPONSE_MARKER;
458    }
459
460    @Override
461    public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
462        if (id != null) {
463            ConnectionId connectionId = id.getParentId();
464            if (connectionId != null) {
465                ConnectionState cs = connectionStates.get(connectionId);
466                if (cs != null) {
467                    cs.removeSession(id);
468                }
469            }
470        }
471        return TRACKED_RESPONSE_MARKER;
472    }
473
474    @Override
475    public Response processAddConnection(ConnectionInfo info) {
476        if (info != null) {
477            connectionStates.put(info.getConnectionId(), new ConnectionState(info));
478        }
479        return TRACKED_RESPONSE_MARKER;
480    }
481
482    @Override
483    public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
484        if (id != null) {
485            connectionStates.remove(id);
486        }
487        return TRACKED_RESPONSE_MARKER;
488    }
489
490    @Override
491    public Response processMessage(Message send) throws Exception {
492        if (send != null) {
493            if (trackTransactions && send.getTransactionId() != null) {
494                ProducerId producerId = send.getProducerId();
495                ConnectionId connectionId = producerId.getParentId().getParentId();
496                if (connectionId != null) {
497                    ConnectionState cs = connectionStates.get(connectionId);
498                    if (cs != null) {
499                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
500                        if (transactionState != null) {
501                            transactionState.addCommand(send);
502
503                            if (trackTransactionProducers) {
504                                // for jmstemplate, track the producer in case it is closed before commit
505                                // and needs to be replayed
506                                SessionState ss = cs.getSessionState(producerId.getParentId());
507                                ProducerState producerState = ss.getProducerState(producerId);
508                                producerState.setTransactionState(transactionState);
509                            }
510                        }
511                    }
512                }
513                return TRACKED_RESPONSE_MARKER;
514            }else if (trackMessages) {
515                messageCache.put(send.getMessageId(), send);
516            }
517        }
518        return null;
519    }
520
521    @Override
522    public Response processBeginTransaction(TransactionInfo info) {
523        if (trackTransactions && info != null && info.getTransactionId() != null) {
524            ConnectionId connectionId = info.getConnectionId();
525            if (connectionId != null) {
526                ConnectionState cs = connectionStates.get(connectionId);
527                if (cs != null) {
528                    cs.addTransactionState(info.getTransactionId());
529                    TransactionState state = cs.getTransactionState(info.getTransactionId());
530                    state.addCommand(info);
531                }
532            }
533            return TRACKED_RESPONSE_MARKER;
534        }
535        return null;
536    }
537
538    @Override
539    public Response processPrepareTransaction(TransactionInfo info) throws Exception {
540        if (trackTransactions && info != null && info.getTransactionId() != null) {
541            ConnectionId connectionId = info.getConnectionId();
542            if (connectionId != null) {
543                ConnectionState cs = connectionStates.get(connectionId);
544                if (cs != null) {
545                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
546                    if (transactionState != null) {
547                        transactionState.addCommand(info);
548                        return new Tracked(new PrepareReadonlyTransactionAction(info));
549                    }
550                }
551            }
552        }
553        return null;
554    }
555
556    @Override
557    public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
558        if (trackTransactions && info != null && info.getTransactionId() != null) {
559            ConnectionId connectionId = info.getConnectionId();
560            if (connectionId != null) {
561                ConnectionState cs = connectionStates.get(connectionId);
562                if (cs != null) {
563                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
564                    if (transactionState != null) {
565                        transactionState.addCommand(info);
566                        return new Tracked(new RemoveTransactionAction(info));
567                    }
568                }
569            }
570        }
571        return null;
572    }
573
574    @Override
575    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
576        if (trackTransactions && info != null && info.getTransactionId() != null) {
577            ConnectionId connectionId = info.getConnectionId();
578            if (connectionId != null) {
579                ConnectionState cs = connectionStates.get(connectionId);
580                if (cs != null) {
581                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
582                    if (transactionState != null) {
583                        transactionState.addCommand(info);
584                        return new Tracked(new RemoveTransactionAction(info));
585                    }
586                }
587            }
588        }
589        return null;
590    }
591
592    @Override
593    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
594        if (trackTransactions && info != null && info.getTransactionId() != null) {
595            ConnectionId connectionId = info.getConnectionId();
596            if (connectionId != null) {
597                ConnectionState cs = connectionStates.get(connectionId);
598                if (cs != null) {
599                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
600                    if (transactionState != null) {
601                        transactionState.addCommand(info);
602                        return new Tracked(new RemoveTransactionAction(info));
603                    }
604                }
605            }
606        }
607        return null;
608    }
609
610    @Override
611    public Response processEndTransaction(TransactionInfo info) throws Exception {
612        if (trackTransactions && info != null && info.getTransactionId() != null) {
613            ConnectionId connectionId = info.getConnectionId();
614            if (connectionId != null) {
615                ConnectionState cs = connectionStates.get(connectionId);
616                if (cs != null) {
617                    TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
618                    if (transactionState != null) {
619                        transactionState.addCommand(info);
620                    }
621                }
622            }
623            return TRACKED_RESPONSE_MARKER;
624        }
625        return null;
626    }
627
628    @Override
629    public Response processMessagePull(MessagePull pull) throws Exception {
630        if (pull != null) {
631            // leave a single instance in the cache
632            final String id = pull.getDestination() + "::" + pull.getConsumerId();
633            if (messageCache.put(id.intern(), pull) == null) {
634                // Only marked as tracked if this is the first request we've seen.
635                pull.setTracked(true);
636            }
637        }
638        return null;
639    }
640
641    public boolean isRestoreConsumers() {
642        return restoreConsumers;
643    }
644
645    public void setRestoreConsumers(boolean restoreConsumers) {
646        this.restoreConsumers = restoreConsumers;
647    }
648
649    public boolean isRestoreProducers() {
650        return restoreProducers;
651    }
652
653    public void setRestoreProducers(boolean restoreProducers) {
654        this.restoreProducers = restoreProducers;
655    }
656
657    public boolean isRestoreSessions() {
658        return restoreSessions;
659    }
660
661    public void setRestoreSessions(boolean restoreSessions) {
662        this.restoreSessions = restoreSessions;
663    }
664
665    public boolean isTrackTransactions() {
666        return trackTransactions;
667    }
668
669    public void setTrackTransactions(boolean trackTransactions) {
670        this.trackTransactions = trackTransactions;
671    }
672
673    public boolean isTrackTransactionProducers() {
674        return this.trackTransactionProducers;
675    }
676
677    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
678        this.trackTransactionProducers = trackTransactionProducers;
679    }
680
681    public boolean isRestoreTransaction() {
682        return restoreTransaction;
683    }
684
685    public void setRestoreTransaction(boolean restoreTransaction) {
686        this.restoreTransaction = restoreTransaction;
687    }
688
689    public boolean isTrackMessages() {
690        return trackMessages;
691    }
692
693    public void setTrackMessages(boolean trackMessages) {
694        this.trackMessages = trackMessages;
695    }
696
697    public int getMaxCacheSize() {
698        return maxCacheSize;
699    }
700
701    public void setMaxCacheSize(int maxCacheSize) {
702        this.maxCacheSize = maxCacheSize;
703    }
704
705    /**
706     * @return the current cache size for the Message and MessagePull Command cache.
707     */
708    public long getCurrentCacheSize() {
709        return this.currentCacheSize;
710    }
711
712    public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
713        ConnectionState connectionState = connectionStates.get(connectionId);
714        if (connectionState != null) {
715            connectionState.setConnectionInterruptProcessingComplete(true);
716            Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
717            for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
718                ConsumerControl control = new ConsumerControl();
719                control.setConsumerId(entry.getKey());
720                control.setPrefetch(entry.getValue().getPrefetchSize());
721                control.setDestination(entry.getValue().getDestination());
722                try {
723                    if (LOG.isDebugEnabled()) {
724                        LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
725                    }
726                    transport.oneway(control);
727                } catch (Exception ex) {
728                    if (LOG.isDebugEnabled()) {
729                        LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
730                                + " with: " + control.getPrefetch(), ex);
731                    }
732                }
733            }
734            stalledConsumers.clear();
735        }
736    }
737
738    public void transportInterrupted(ConnectionId connectionId) {
739        ConnectionState connectionState = connectionStates.get(connectionId);
740        if (connectionState != null) {
741            connectionState.setConnectionInterruptProcessingComplete(false);
742        }
743    }
744}