/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.server.cluster.impl;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.cluster.impl.ClusterConnectionBridge;
import org.hornetq.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;

public class ClusterConnectionImpl
implements ClusterConnection {
    private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
    private final ExecutorFactory executorFactory;
    private final HornetQServer server;
    private final PostOffice postOffice;
    private final ManagementService managementService;
    private final SimpleString name;
    private final SimpleString address;
    private final long retryInterval;
    private final boolean useDuplicateDetection;
    private final boolean routeWhenNoConsumers;
    private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
    private final ScheduledExecutorService scheduledExecutor;
    private final int maxHops;
    private final UUID nodeUUID;
    private boolean backup;
    private volatile boolean started;
    private final String clusterUser;
    private final String clusterPassword;
    private final ClusterConnector clusterConnector;
    private ServerLocatorInternal serverLocator;
    private final TransportConfiguration connector;
    private final boolean allowDirectConnectionsOnly;
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();

    public ClusterConnectionImpl(TransportConfiguration[] tcConfigs, TransportConfiguration connector, SimpleString name, SimpleString address, long retryInterval, boolean useDuplicateDetection, boolean routeWhenNoConsumers, int confirmationWindowSize, ExecutorFactory executorFactory, HornetQServer server, PostOffice postOffice, ManagementService managementService, ScheduledExecutorService scheduledExecutor, int maxHops, UUID nodeUUID, boolean backup, String clusterUser, String clusterPassword, boolean allowDirectConnectionsOnly) throws Exception {
        if (nodeUUID == null) {
            throw new IllegalArgumentException("node id is null");
        }
        this.nodeUUID = nodeUUID;
        this.connector = connector;
        this.name = name;
        this.address = address;
        this.retryInterval = retryInterval;
        this.useDuplicateDetection = useDuplicateDetection;
        this.routeWhenNoConsumers = routeWhenNoConsumers;
        this.executorFactory = executorFactory;
        this.server = server;
        this.postOffice = postOffice;
        this.managementService = managementService;
        this.scheduledExecutor = scheduledExecutor;
        this.maxHops = maxHops;
        this.backup = backup;
        this.clusterUser = clusterUser;
        this.clusterPassword = clusterPassword;
        this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
        this.clusterConnector = new StaticClusterConnector(tcConfigs);
        if (tcConfigs != null && tcConfigs.length > 0 && allowDirectConnectionsOnly) {
            this.allowableConnections.addAll(Arrays.asList(tcConfigs));
        }
    }

    public ClusterConnectionImpl(DiscoveryGroupConfiguration dg, TransportConfiguration connector, SimpleString name, SimpleString address, long retryInterval, boolean useDuplicateDetection, boolean routeWhenNoConsumers, int confirmationWindowSize, ExecutorFactory executorFactory, HornetQServer server, PostOffice postOffice, ManagementService managementService, ScheduledExecutorService scheduledExecutor, int maxHops, UUID nodeUUID, boolean backup, String clusterUser, String clusterPassword, boolean allowDirectConnectionsOnly) throws Exception {
        if (nodeUUID == null) {
            throw new IllegalArgumentException("node id is null");
        }
        this.nodeUUID = nodeUUID;
        this.connector = connector;
        this.name = name;
        this.address = address;
        this.retryInterval = retryInterval;
        this.useDuplicateDetection = useDuplicateDetection;
        this.routeWhenNoConsumers = routeWhenNoConsumers;
        this.executorFactory = executorFactory;
        this.server = server;
        this.postOffice = postOffice;
        this.managementService = managementService;
        this.scheduledExecutor = scheduledExecutor;
        this.maxHops = maxHops;
        this.backup = backup;
        this.clusterUser = clusterUser;
        this.clusterPassword = clusterPassword;
        this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
        this.clusterConnector = new DiscoveryClusterConnector(dg);
    }

    @Override
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        if (!this.backup) {
            this.activate();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        if (!this.started) {
            return;
        }
        if (this.serverLocator != null) {
            this.serverLocator.removeClusterTopologyListener(this);
        }
        ClusterConnectionImpl clusterConnectionImpl = this;
        synchronized (clusterConnectionImpl) {
            for (MessageFlowRecord record : this.records.values()) {
                try {
                    record.close();
                }
                catch (Exception ignore) {}
            }
            if (this.managementService != null) {
                TypedProperties props = new TypedProperties();
                props.putSimpleStringProperty(new SimpleString("name"), this.name);
                Notification notification = new Notification(this.nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
                this.managementService.sendNotification(notification);
            }
            if (this.serverLocator != null) {
                this.serverLocator.close();
                this.serverLocator = null;
            }
            this.started = false;
        }
    }

    @Override
    public boolean isStarted() {
        return this.started;
    }

    @Override
    public SimpleString getName() {
        return this.name;
    }

    @Override
    public String getNodeID() {
        return this.nodeUUID.toString();
    }

    @Override
    public synchronized Map<String, String> getNodes() {
        HashMap<String, String> nodes = new HashMap<String, String>();
        for (Map.Entry<String, MessageFlowRecord> record : this.records.entrySet()) {
            if (record.getValue().getBridge().getForwardingConnection() == null) continue;
            nodes.put(record.getKey(), record.getValue().getBridge().getForwardingConnection().getRemoteAddress());
        }
        return nodes;
    }

    @Override
    public synchronized void activate() throws Exception {
        if (!this.started) {
            return;
        }
        this.backup = false;
        this.serverLocator = this.clusterConnector.createServerLocator();
        if (this.serverLocator != null) {
            this.serverLocator.setNodeID(this.nodeUUID.toString());
            this.serverLocator.setReconnectAttempts(-1);
            this.serverLocator.setClusterConnection(true);
            this.serverLocator.setClusterTransportConfiguration(this.connector);
            this.serverLocator.setBackup(this.server.getConfiguration().isBackup());
            this.serverLocator.setInitialConnectAttempts(-1);
            this.serverLocator.setConfirmationWindowSize(0);
            if (this.retryInterval > 0L) {
                this.serverLocator.setRetryInterval(this.retryInterval);
            }
            this.serverLocator.addClusterTopologyListener(this);
            this.serverLocator.start(this.server.getExecutorFactory().getExecutor());
        }
        if (this.managementService != null) {
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(new SimpleString("name"), this.name);
            Notification notification = new Notification(this.nodeUUID.toString(), NotificationType.CLUSTER_CONNECTION_STARTED, props);
            this.managementService.sendNotification(notification);
        }
    }

    @Override
    public TransportConfiguration getConnector() {
        return this.connector;
    }

    @Override
    public synchronized void nodeDown(String nodeID) {
        if (nodeID.equals(this.nodeUUID.toString())) {
            return;
        }
        MessageFlowRecord record = this.records.get(nodeID);
        if (record != null) {
            try {
                record.reset();
            }
            catch (Exception e) {
                log.error("Failed to close flow record", e);
            }
        }
        this.server.getClusterManager().notifyNodeDown(nodeID);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last) {
        if (nodeID.equals(this.nodeUUID.toString())) {
            if (connectorPair.b != null) {
                this.server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
            }
            return;
        }
        this.server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
        if (this.allowDirectConnectionsOnly && !this.allowableConnections.contains(connectorPair.a)) {
            return;
        }
        if (this.serverLocator == null) {
            return;
        }
        if (connectorPair.a == null) {
            return;
        }
        Map<String, MessageFlowRecord> map = this.records;
        synchronized (map) {
            try {
                MessageFlowRecord record = this.records.get(nodeID);
                if (record == null) {
                    SimpleString queueName = new SimpleString("sf." + this.name + "." + nodeID);
                    Binding queueBinding = this.postOffice.getBinding(queueName);
                    Queue queue = queueBinding != null ? (Queue)queueBinding.getBindable() : this.server.createQueue(queueName, queueName, null, true, false);
                    this.createNewRecord(nodeID, (TransportConfiguration)connectorPair.a, queueName, queue, true);
                }
            }
            catch (Exception e) {
                log.error("Failed to update topology", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void nodeAnnounced(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair) {
        if (nodeID.equals(this.nodeUUID.toString())) {
            return;
        }
        if (this.allowDirectConnectionsOnly && !this.allowableConnections.contains(connectorPair.a)) {
            return;
        }
        if (this.serverLocator == null) {
            return;
        }
        if (connectorPair.a == null) {
            return;
        }
        Map<String, MessageFlowRecord> map = this.records;
        synchronized (map) {
            try {
                MessageFlowRecord record = this.records.get(nodeID);
                if (record == null) {
                    SimpleString queueName = new SimpleString("sf." + this.name + "." + nodeID);
                    Binding queueBinding = this.postOffice.getBinding(queueName);
                    Queue queue = queueBinding != null ? (Queue)queueBinding.getBindable() : this.server.createQueue(queueName, queueName, null, true, false);
                    this.createNewRecord(nodeID, (TransportConfiguration)connectorPair.a, queueName, queue, true);
                }
            }
            catch (Exception e) {
                log.error("Failed to update topology", e);
            }
        }
    }

    private void createNewRecord(String nodeID, TransportConfiguration connector, SimpleString queueName, Queue queue, boolean start) throws Exception {
        MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
        ClusterConnectionBridge bridge = new ClusterConnectionBridge(this.serverLocator, this.nodeUUID, nodeID, queueName, queue, this.executorFactory.getExecutor(), null, null, this.scheduledExecutor, null, this.useDuplicateDetection, this.clusterUser, this.clusterPassword, !this.backup, this.server.getStorageManager(), this.managementService.getManagementAddress(), this.managementService.getManagementNotificationAddress(), record, connector);
        record.setBridge(bridge);
        this.records.put(nodeID, record);
        if (start) {
            bridge.start();
        }
    }

    @Override
    public void handleReplicatedAddBinding(SimpleString address, SimpleString uniqueName, SimpleString routingName, long queueID, SimpleString filterString, SimpleString queueName, int distance) throws Exception {
        Binding queueBinding = this.postOffice.getBinding(queueName);
        if (queueBinding == null) {
            throw new IllegalStateException("Cannot find s & f queue " + queueName);
        }
        Queue queue = (Queue)queueBinding.getBindable();
        RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(this.server.getStorageManager().generateUniqueID(), address, uniqueName, routingName, queueID, filterString, queue, queueName, distance);
        if (this.postOffice.getBinding(uniqueName) != null) {
            log.warn("Remoting queue binding " + uniqueName + " has already been bound in the post office. Most likely cause for this is you have a loop " + "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
            return;
        }
        this.postOffice.addBinding(binding);
        Bindings theBindings = this.postOffice.getBindingsForAddress(address);
        theBindings.setRouteWhenNoConsumers(this.routeWhenNoConsumers);
    }

    public Map<String, MessageFlowRecord> getRecords() {
        return this.records;
    }

    @Override
    public String description() {
        String out = this.name + " connected to\n";
        for (Map.Entry<String, MessageFlowRecord> messageFlow : this.records.entrySet()) {
            String nodeID = messageFlow.getKey();
            Bridge bridge = messageFlow.getValue().getBridge();
            out = out + "\t" + nodeID + " -- " + bridge.isStarted() + "\n";
        }
        return out;
    }

    private class DiscoveryClusterConnector
    implements ClusterConnector {
        private final DiscoveryGroupConfiguration dg;

        public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg) {
            this.dg = dg;
        }

        @Override
        public ServerLocatorInternal createServerLocator() {
            return (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(this.dg);
        }
    }

    private class StaticClusterConnector
    implements ClusterConnector {
        private final TransportConfiguration[] tcConfigs;

        public StaticClusterConnector(TransportConfiguration[] tcConfigs) {
            this.tcConfigs = tcConfigs;
        }

        @Override
        public ServerLocatorInternal createServerLocator() {
            if (this.tcConfigs != null && this.tcConfigs.length > 0) {
                return (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(this.tcConfigs);
            }
            return null;
        }
    }

    static interface ClusterConnector {
        public ServerLocatorInternal createServerLocator();
    }

    private class MessageFlowRecordImpl
    implements MessageFlowRecord {
        private Bridge bridge;
        private final Queue queue;
        private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
        private volatile boolean firstReset = false;

        public MessageFlowRecordImpl(Queue queue) {
            this.queue = queue;
        }

        @Override
        public String getAddress() {
            return ClusterConnectionImpl.this.address.toString();
        }

        @Override
        public int getMaxHops() {
            return ClusterConnectionImpl.this.maxHops;
        }

        @Override
        public void close() throws Exception {
            this.bridge.stop();
            this.clearBindings();
        }

        @Override
        public void reset() throws Exception {
            this.clearBindings();
        }

        public void setBridge(Bridge bridge) {
            this.bridge = bridge;
        }

        @Override
        public Bridge getBridge() {
            return this.bridge;
        }

        @Override
        public synchronized void onMessage(ClientMessage message) {
            try {
                if (message.containsProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA)) {
                    this.clearBindings();
                    this.firstReset = true;
                    return;
                }
                if (!this.firstReset) {
                    return;
                }
                SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
                NotificationType ntype = NotificationType.valueOf(type.toString());
                switch (ntype) {
                    case BINDING_ADDED: {
                        this.doBindingAdded(message);
                        break;
                    }
                    case BINDING_REMOVED: {
                        this.doBindingRemoved(message);
                        break;
                    }
                    case CONSUMER_CREATED: {
                        this.doConsumerCreated(message);
                        break;
                    }
                    case CONSUMER_CLOSED: {
                        this.doConsumerClosed(message);
                        break;
                    }
                    case PROPOSAL: {
                        this.doProposalReceived(message);
                        break;
                    }
                    case PROPOSAL_RESPONSE: {
                        this.doProposalResponseReceived(message);
                        break;
                    }
                    default: {
                        throw new IllegalArgumentException("Invalid type " + (Object)((Object)ntype));
                    }
                }
            }
            catch (Exception e) {
                log.error("Failed to handle message", e);
            }
        }

        private synchronized void doProposalReceived(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID)) {
                throw new IllegalStateException("proposal type is null");
            }
            SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
            SimpleString val = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
            Integer hops = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            Response response = ClusterConnectionImpl.this.server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
            if (response != null) {
                ClusterConnectionImpl.this.server.getGroupingHandler().send(response, 0);
            }
        }

        private synchronized void doProposalResponseReceived(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID)) {
                throw new IllegalStateException("proposal type is null");
            }
            SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
            SimpleString val = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
            SimpleString alt = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
            Integer hops = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            Response response = new Response(type, val, alt);
            ClusterConnectionImpl.this.server.getGroupingHandler().proposed(response);
            ClusterConnectionImpl.this.server.getGroupingHandler().send(response, hops + 1);
        }

        private synchronized void clearBindings() throws Exception {
            for (RemoteQueueBinding binding : new HashSet<RemoteQueueBinding>(this.bindings.values())) {
                this.removeBinding(binding.getClusterName());
            }
        }

        private synchronized void doBindingAdded(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                throw new IllegalStateException("distance is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_ADDRESS)) {
                throw new IllegalStateException("queueAddress is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_ROUTING_NAME)) {
                throw new IllegalStateException("routingName is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_BINDING_ID)) {
                throw new IllegalStateException("queueID is null");
            }
            Integer distance = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            SimpleString queueAddress = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            SimpleString routingName = message.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
            SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
            Long queueID = message.getLongProperty(ManagementHelper.HDR_BINDING_ID);
            RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(ClusterConnectionImpl.this.server.getStorageManager().generateUniqueID(), queueAddress, clusterName, routingName, queueID, filterString, this.queue, this.bridge.getName(), distance + 1);
            if (ClusterConnectionImpl.this.postOffice.getBinding(clusterName) != null) {
                log.warn("Remote queue binding " + clusterName + " has already been bound in the post office. Most likely cause for this is you have a loop " + "in your cluster due to cluster max-hops being too large or you have multiple cluster connections to the same nodes using overlapping addresses");
                return;
            }
            this.bindings.put(clusterName, binding);
            try {
                ClusterConnectionImpl.this.postOffice.addBinding(binding);
            }
            catch (Exception ignore) {
                // empty catch block
            }
            Bindings theBindings = ClusterConnectionImpl.this.postOffice.getBindingsForAddress(queueAddress);
            theBindings.setRouteWhenNoConsumers(ClusterConnectionImpl.this.routeWhenNoConsumers);
        }

        private void doBindingRemoved(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            this.removeBinding(clusterName);
        }

        private synchronized void removeBinding(SimpleString clusterName) throws Exception {
            RemoteQueueBinding binding = this.bindings.remove(clusterName);
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for queue " + clusterName);
            }
            ClusterConnectionImpl.this.postOffice.removeBinding(binding.getUniqueName());
        }

        private synchronized void doConsumerCreated(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                throw new IllegalStateException("distance is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            Integer distance = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
            SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
            RemoteQueueBinding binding = this.bindings.get(clusterName);
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for " + clusterName);
            }
            binding.addConsumer(filterString);
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
            props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
            Queue theQueue = (Queue)binding.getBindable();
            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
            if (filterString != null) {
                props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
            }
            Notification notification = new Notification(null, NotificationType.CONSUMER_CREATED, props);
            ClusterConnectionImpl.this.managementService.sendNotification(notification);
        }

        private synchronized void doConsumerClosed(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                throw new IllegalStateException("distance is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            Integer distance = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
            SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
            RemoteQueueBinding binding = this.bindings.get(clusterName);
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for " + clusterName);
            }
            binding.removeConsumer(filterString);
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
            props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
            Queue theQueue = (Queue)binding.getBindable();
            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
            if (filterString != null) {
                props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
            }
            Notification notification = new Notification(null, NotificationType.CONSUMER_CLOSED, props);
            ClusterConnectionImpl.this.managementService.sendNotification(notification);
        }
    }
}

