/*
 * Decompiled with CFR 0.152.
 */
package org.hornetq.core.server.cluster.impl;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.TopologyMember;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.AfterConnectInternalListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMemberImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQMessageBundle;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.cluster.impl.BridgeImpl;
import org.hornetq.core.server.cluster.impl.ClusterConnectionBridge;
import org.hornetq.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.FutureLatch;
import org.hornetq.utils.TypedProperties;

public final class ClusterConnectionImpl
implements ClusterConnection,
AfterConnectInternalListener {
    // Trace-level flag, sampled once when the class is initialised.
    private static final boolean isTrace = HornetQServerLogger.LOGGER.isTraceEnabled();
    // Executor plumbing: the factory is reused to hand each bridge its own executor,
    // while "executor" runs this connection's own asynchronous tasks.
    private final ExecutorFactory executorFactory;
    private final Executor executor;
    // Owning server and the services this connection talks to.
    private final HornetQServer server;
    private final PostOffice postOffice;
    private final ManagementService managementService;
    // Name of this cluster connection; it is also embedded in store-and-forward queue names.
    private final SimpleString name;
    private final SimpleString address;
    // Tuning values copied onto the server locators and bridges created by this class.
    private final long clientFailureCheckPeriod;
    private final long connectionTTL;
    private final long retryInterval;
    private final long callTimeout;
    private final long callFailoverTimeout;
    private final double retryIntervalMultiplier;
    private final long maxRetryInterval;
    private final int reconnectAttempts;
    private final boolean useDuplicateDetection;
    private final boolean routeWhenNoConsumers;
    private final int confirmationWindowSize;
    // Monitor held by getNodes() and nodeUP() while they read or create flow records.
    private final Object recordsGuard = new Object();
    // Message flow records keyed by the remote node ID.
    private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
    private final ScheduledExecutorService scheduledExecutor;
    private final int maxHops;
    private final NodeManager nodeManager;
    // True while this node acts as a backup; cleared by activate().
    private boolean backup;
    // Lifecycle flags toggled by start() and stop().
    private volatile boolean started;
    private final String clusterUser;
    private final String clusterPassword;
    // Strategy that builds server locators (static connector list or discovery group).
    private final ClusterConnector clusterConnector;
    // Main locator: created in activate(), closed and nulled by the task submitted from stop().
    private ServerLocatorInternal serverLocator;
    private final TransportConfiguration connector;
    // When set, nodeUP() only accepts members whose live connector is in allowableConnections.
    private final boolean allowDirectConnectionsOnly;
    private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
    private final ClusterManager manager;
    private final int minLargeMessageSize;
    // Topology instance constructed with this connection as its constructor argument.
    private final Topology topology = new Topology((Object)this);
    // Locator used by announceBackup(); closed and nulled by activate() and stop().
    private volatile ServerLocatorInternal backupServerLocator;
    // True only while the announceBackup() task is between its connect attempt and its finally block.
    private volatile boolean announcingBackup;
    private volatile boolean stopping = false;

    public ClusterConnectionImpl(ClusterManager manager, TransportConfiguration[] staticTranspConfigs, TransportConfiguration connector, SimpleString name, SimpleString address, int minLargeMessageSize, long clientFailureCheckPeriod, long connectionTTL, long retryInterval, double retryIntervalMultiplier, long maxRetryInterval, int reconnectAttempts, long callTimeout, long callFailoverTimeout, boolean useDuplicateDetection, boolean routeWhenNoConsumers, int confirmationWindowSize, ExecutorFactory executorFactory, HornetQServer server, PostOffice postOffice, ManagementService managementService, ScheduledExecutorService scheduledExecutor, int maxHops, NodeManager nodeManager, boolean backup, String clusterUser, String clusterPassword, boolean allowDirectConnectionsOnly) throws Exception {
        this.nodeManager = nodeManager;
        this.connector = connector;
        this.name = name;
        this.address = address;
        this.clientFailureCheckPeriod = clientFailureCheckPeriod;
        this.connectionTTL = connectionTTL;
        this.retryInterval = retryInterval;
        this.retryIntervalMultiplier = retryIntervalMultiplier;
        this.maxRetryInterval = maxRetryInterval;
        this.reconnectAttempts = reconnectAttempts;
        this.useDuplicateDetection = useDuplicateDetection;
        this.routeWhenNoConsumers = routeWhenNoConsumers;
        this.confirmationWindowSize = confirmationWindowSize;
        this.executorFactory = executorFactory;
        this.executor = executorFactory.getExecutor();
        this.topology.setExecutor(this.executor);
        this.server = server;
        this.postOffice = postOffice;
        this.managementService = managementService;
        this.scheduledExecutor = scheduledExecutor;
        this.maxHops = maxHops;
        this.backup = backup;
        this.clusterUser = clusterUser;
        this.clusterPassword = clusterPassword;
        this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
        this.manager = manager;
        this.callTimeout = callTimeout;
        this.callFailoverTimeout = callFailoverTimeout;
        this.minLargeMessageSize = minLargeMessageSize;
        this.clusterConnector = new StaticClusterConnector(staticTranspConfigs);
        this.setUpBackupLocator();
        if (staticTranspConfigs != null && staticTranspConfigs.length > 0 && allowDirectConnectionsOnly) {
            this.allowableConnections.addAll(Arrays.asList(staticTranspConfigs));
        }
    }

    /**
     * Creates the locator used for backup announcements and, if one was produced,
     * labels it and configures it to retry connecting without limit (-1 attempts).
     */
    private void setUpBackupLocator() {
        final ServerLocatorInternal locator = this.clusterConnector.createServerLocator();
        this.backupServerLocator = locator;
        if (locator == null) {
            // The connector strategy had nothing to build a locator from.
            return;
        }
        locator.setIdentity("backupLocatorFor='" + this.server + "'");
        locator.setReconnectAttempts(-1);
        locator.setInitialConnectAttempts(-1);
    }

    /**
     * Builds a cluster connection whose server locators are created from a
     * discovery group configuration.
     *
     * <p>All tuning values are stored for later use by {@code activate()} and
     * {@code createNewRecord(...)}; the backup locator is created eagerly during
     * construction. No allowable-connection filter is populated by this constructor.
     *
     * @param dg discovery group configuration handed to every locator this connection creates
     * @throws Exception declared by the original signature
     */
    public ClusterConnectionImpl(ClusterManager manager, DiscoveryGroupConfiguration dg, TransportConfiguration connector, SimpleString name, SimpleString address, int minLargeMessageSize, long clientFailureCheckPeriod, long connectionTTL, long retryInterval, double retryIntervalMultiplier, long maxRetryInterval, int reconnectAttempts, long callTimeout, long callFailoverTimeout, boolean useDuplicateDetection, boolean routeWhenNoConsumers, int confirmationWindowSize, ExecutorFactory executorFactory, HornetQServer server, PostOffice postOffice, ManagementService managementService, ScheduledExecutorService scheduledExecutor, int maxHops, NodeManager nodeManager, boolean backup, String clusterUser, String clusterPassword, boolean allowDirectConnectionsOnly) throws Exception {
        // Identity and collaborators.
        this.server = server;
        this.nodeManager = nodeManager;
        this.postOffice = postOffice;
        this.managementService = managementService;
        this.name = name;
        this.address = address;
        this.connector = connector;
        this.backup = backup;
        this.clusterUser = clusterUser;
        this.clusterPassword = clusterPassword;
        this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
        this.maxHops = maxHops;

        // Executors; the topology shares this connection's executor.
        this.executorFactory = executorFactory;
        this.executor = executorFactory.getExecutor();
        this.topology.setExecutor(this.executor);
        this.scheduledExecutor = scheduledExecutor;

        // Connection and retry tuning.
        this.clientFailureCheckPeriod = clientFailureCheckPeriod;
        this.connectionTTL = connectionTTL;
        this.callTimeout = callTimeout;
        this.callFailoverTimeout = callFailoverTimeout;
        this.retryInterval = retryInterval;
        this.retryIntervalMultiplier = retryIntervalMultiplier;
        this.maxRetryInterval = maxRetryInterval;
        this.reconnectAttempts = reconnectAttempts;
        this.minLargeMessageSize = minLargeMessageSize;
        this.confirmationWindowSize = confirmationWindowSize;
        this.useDuplicateDetection = useDuplicateDetection;
        this.routeWhenNoConsumers = routeWhenNoConsumers;

        // The locator strategy must exist before the backup locator is derived from it.
        this.clusterConnector = new DiscoveryClusterConnector(dg);
        this.setUpBackupLocator();
        this.manager = manager;
    }

    /**
     * Marks this cluster connection as started and, unless this node is a backup,
     * activates it straight away. Calling it on an already started connection is a no-op.
     *
     * @throws Exception if {@code activate()} fails
     */
    public void start() throws Exception {
        synchronized (this) {
            if (!this.started) {
                this.stopping = false;
                this.started = true;
                if (!this.backup) {
                    this.activate();
                }
            }
        }
    }

    /**
     * Queues a latch on this connection's executor and waits for it to run, which
     * means every task submitted earlier has been processed. If the wait of
     * {@code 10000} (presumably milliseconds) expires, a thread dump is requested
     * from the server.
     */
    @Override
    public void flushExecutor() {
        final FutureLatch latch = new FutureLatch();
        this.executor.execute((Runnable)latch);
        final boolean drained = latch.await(10000L);
        if (drained) {
            return;
        }
        this.server.threadDump("Couldn't finish executor on " + this);
    }

    /**
     * Stops this cluster connection.
     *
     * <p>Steps, in order: flags the connection as stopping, detaches this object as
     * a topology listener from the main locator, closes every message flow record
     * (best effort), sends a {@code CLUSTER_CONNECTION_STOPPED} notification when a
     * management service is present, closes the backup locator immediately if an
     * announcement is in flight, and finally submits a task to the executor that
     * closes and clears both locators. Does nothing if the connection is not started.
     *
     * @throws Exception if sending the management notification fails
     */
    public void stop() throws Exception {
        if (!this.started) {
            return;
        }
        this.stopping = true;
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug(this + "::stopping ClusterConnection");
        }
        if (this.serverLocator != null) {
            this.serverLocator.removeClusterTopologyListener((ClusterTopologyListener)this);
        }
        // Guarded so the message is only assembled when it will actually be logged.
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Cluster connection being stopped for node" + this.nodeManager.getNodeId() + ", server = " + this.server + " serverLocator = " + this.serverLocator);
        }
        synchronized (this) {
            for (MessageFlowRecord record : this.records.values()) {
                try {
                    record.close();
                }
                catch (Exception e) {
                    // Closing is best effort during shutdown: keep going with the other
                    // records, but leave a trace instead of discarding the failure.
                    HornetQServerLogger.LOGGER.debug("Ignoring failure while closing " + record + " during stop", e);
                }
            }
        }
        if (this.managementService != null) {
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(new SimpleString("name"), this.name);
            Notification notification = new Notification(this.nodeManager.getNodeId().toString(), NotificationType.CLUSTER_CONNECTION_STOPPED, props);
            this.managementService.sendNotification(notification);
        }
        if (this.announcingBackup) {
            // An announcement task is running on the executor right now; close its locator
            // here rather than waiting for the cleanup task queued behind it.
            this.closeLocator(this.backupServerLocator);
        }
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                synchronized (ClusterConnectionImpl.this) {
                    ClusterConnectionImpl.this.closeLocator(ClusterConnectionImpl.this.backupServerLocator);
                    ClusterConnectionImpl.this.backupServerLocator = null;
                    ClusterConnectionImpl.this.closeLocator(ClusterConnectionImpl.this.serverLocator);
                    ClusterConnectionImpl.this.serverLocator = null;
                }
            }
        });
        this.started = false;
    }

    /**
     * Closes the given locator, tolerating {@code null}.
     *
     * @param locator locator to close, or {@code null} for a no-op
     */
    private void closeLocator(ServerLocatorInternal locator) {
        if (locator == null) {
            return;
        }
        locator.close();
    }

    /**
     * Asynchronously announces this node as a backup through the backup locator.
     *
     * <p>The work runs on this connection's executor. It connects with the backup
     * locator and sends a {@link NodeAnnounceMessage} flagged as backup. If the
     * attempt fails with an exception other than {@link RejectedExecutionException},
     * and neither the scheduled executor is shut down nor the connection is stopping,
     * the announcement is rescheduled after {@code retryInterval} milliseconds.
     */
    @Override
    public void announceBackup() {
        this.executor.execute(new Runnable(){

            @Override
            public void run() {
                if (ClusterConnectionImpl.this.stopping) {
                    return;
                }
                try {
                    // Snapshot the volatile field once; stop() and activate() may null it concurrently.
                    ServerLocatorInternal localBackupLocator = ClusterConnectionImpl.this.backupServerLocator;
                    if (localBackupLocator == null) {
                        if (!ClusterConnectionImpl.this.stopping) {
                            // Identify the cluster connection, not this anonymous Runnable.
                            HornetQServerLogger.LOGGER.error("Error announcing backup: backupServerLocator is null. " + ClusterConnectionImpl.this);
                        }
                        return;
                    }
                    if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                        // Log the locator actually used below, not a fresh read of the field.
                        HornetQServerLogger.LOGGER.debug(ClusterConnectionImpl.this + ":: announcing " + ClusterConnectionImpl.this.connector + " to " + localBackupLocator);
                    }
                    ClusterConnectionImpl.this.announcingBackup = true;
                    ClientSessionFactoryInternal backupSessionFactory = localBackupLocator.connect();
                    if (backupSessionFactory != null) {
                        backupSessionFactory.getConnection().getChannel(0L, -1).send((Packet)new NodeAnnounceMessage(System.currentTimeMillis(), ClusterConnectionImpl.this.nodeManager.getNodeId().toString(), ClusterConnectionImpl.this.manager.getNodeGroupName(), true, ClusterConnectionImpl.this.connector, null));
                        HornetQServerLogger.LOGGER.backupAnnounced();
                    }
                }
                catch (RejectedExecutionException e) {
                    // Deliberately not retried: a rejected task is treated as final.
                }
                catch (Exception e) {
                    if (ClusterConnectionImpl.this.scheduledExecutor.isShutdown()) {
                        return;
                    }
                    if (ClusterConnectionImpl.this.stopping) {
                        return;
                    }
                    HornetQServerLogger.LOGGER.errorAnnouncingBackup();
                    if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                        // The message above carries no cause; keep the exception available for diagnosis.
                        HornetQServerLogger.LOGGER.debug("Backup announcement failed on " + ClusterConnectionImpl.this + ", retrying", e);
                    }
                    ClusterConnectionImpl.this.scheduledExecutor.schedule(new Runnable(){

                        @Override
                        public void run() {
                            ClusterConnectionImpl.this.announceBackup();
                        }
                    }, ClusterConnectionImpl.this.retryInterval, TimeUnit.MILLISECONDS);
                }
                finally {
                    ClusterConnectionImpl.this.announcingBackup = false;
                }
            }
        });
    }

    /**
     * Looks up the topology entry registered under the cluster manager's node ID.
     *
     * @return whatever the topology holds for that node ID
     */
    private TopologyMember getLocalMember() {
        final Topology clusterTopology = this.topology;
        return clusterTopology.getMember(this.manager.getNodeId());
    }

    /**
     * Registers a listener on this connection's topology and immediately sends it
     * the current topology.
     *
     * @param listener listener to register
     */
    @Override
    public void addClusterTopologyListener(ClusterTopologyListener listener) {
        final Topology clusterTopology = this.topology;
        clusterTopology.addClusterTopologyListener(listener);
        clusterTopology.sendTopology(listener);
    }

    /**
     * Unregisters a listener from this connection's topology.
     *
     * @param listener listener to remove
     */
    @Override
    public void removeClusterTopologyListener(ClusterTopologyListener listener) {
        topology.removeClusterTopologyListener(listener);
    }

    /**
     * @return the topology instance owned by this cluster connection
     */
    @Override
    public Topology getTopology() {
        return topology;
    }

    /**
     * Applies a node announcement to the local topology.
     *
     * <p>A live announcement updates the member using the supplied event ID. A backup
     * announcement is passed to {@code updateBackup} as a freshly built member that
     * does not carry the event ID.
     *
     * @param uniqueEventID event ID attached to the announcement
     * @param nodeID        ID of the announcing node
     * @param nodeName      name passed through to the topology member
     * @param connectorPair live (A) and backup (B) connectors of the node
     * @param backup        {@code true} if the announcement comes from a backup
     */
    @Override
    public void nodeAnnounced(long uniqueEventID, String nodeID, String nodeName, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup) {
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug(this + "::NodeAnnounced, backup=" + backup + nodeID + connectorPair);
        }
        final TransportConfiguration liveConnector = (TransportConfiguration)connectorPair.getA();
        final TransportConfiguration backupConnector = (TransportConfiguration)connectorPair.getB();
        final TopologyMemberImpl announced = new TopologyMemberImpl(nodeID, nodeName, liveConnector, backupConnector);
        announced.setUniqueEventID(uniqueEventID);
        if (!backup) {
            this.topology.updateMember(uniqueEventID, nodeID, announced);
            return;
        }
        this.topology.updateBackup(new TopologyMemberImpl(nodeID, nodeName, liveConnector, backupConnector));
    }

    /**
     * Invoked after a session factory connects: announces the local member over the
     * new connection, or logs a warning when the topology has no entry for this node.
     *
     * @param sf the freshly connected session factory
     */
    public void onConnection(ClientSessionFactoryInternal sf) {
        final TopologyMember self = this.getLocalMember();
        if (self == null) {
            HornetQServerLogger.LOGGER.noLocalMemborOnClusterConnection(this);
            return;
        }
        sf.sendNodeAnnounce(self.getUniqueEventID(), this.manager.getNodeId(), this.manager.getNodeGroupName(), false, self.getLive(), self.getBackup());
    }

    /**
     * @return {@code true} between a successful {@code start()} and the end of {@code stop()}
     */
    public boolean isStarted() {
        return started;
    }

    /**
     * @return the name this cluster connection was constructed with
     */
    @Override
    public SimpleString getName() {
        return name;
    }

    /**
     * @return the node manager's node ID rendered as a string
     */
    @Override
    public String getNodeID() {
        return nodeManager.getNodeId().toString();
    }

    /**
     * @return the server this cluster connection belongs to
     */
    @Override
    public HornetQServer getServer() {
        return server;
    }

    /**
     * Reports whether a flow record exists for the node and its bridge is connected.
     *
     * @param nodeId ID of the remote node
     * @return {@code false} when no record is known for {@code nodeId}; otherwise the
     *         bridge's connected state
     */
    @Override
    public boolean isNodeActive(String nodeId) {
        final MessageFlowRecord flow = this.records.get(nodeId);
        return flow != null && flow.getBridge().isConnected();
    }

    /**
     * Builds a snapshot of the remote nodes that currently have a forwarding
     * connection, mapping node ID to that connection's remote address. Records whose
     * bridge has no forwarding connection are left out.
     *
     * @return a new map owned by the caller
     */
    @Override
    public Map<String, String> getNodes() {
        synchronized (this.recordsGuard) {
            final Map<String, String> addressesByNode = new HashMap<String, String>();
            for (Map.Entry<String, MessageFlowRecord> flow : this.records.entrySet()) {
                final RemotingConnection forwarding = flow.getValue().getBridge().getForwardingConnection();
                if (forwarding != null) {
                    addressesByNode.put(flow.getKey(), forwarding.getRemoteAddress());
                }
            }
            return addressesByNode;
        }
    }

    /**
     * Turns this connection into a live cluster connection.
     *
     * <p>Does nothing unless the connection is started. Otherwise it clears the
     * backup flag, registers this node as live in the topology, closes and drops the
     * backup locator, creates and configures the main server locator (if the
     * connector strategy yields one), starts it, and finally sends a
     * {@code CLUSTER_CONNECTION_STARTED} notification when a management service exists.
     *
     * @throws IllegalStateException if the topology has no entry for this node after
     *                               it was registered as live
     * @throws Exception             if starting the locator or sending the notification fails
     */
    @Override
    public synchronized void activate() throws Exception {
        if (!this.started) {
            return;
        }
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("Activating cluster connection nodeID=" + this.nodeManager.getNodeId() + " for server=" + this.server);
        }
        this.backup = false;
        this.topology.updateAsLive(this.manager.getNodeId(), new TopologyMemberImpl(this.manager.getNodeId(), this.manager.getNodeGroupName(), this.connector, null));

        // A live node no longer announces itself as a backup.
        final ServerLocatorInternal staleBackupLocator = this.backupServerLocator;
        if (staleBackupLocator != null) {
            try {
                staleBackupLocator.close();
            }
            catch (Exception closeFailure) {
                HornetQServerLogger.LOGGER.errorClosingBackupFactoryOnClusterConnection(closeFailure);
            }
            this.backupServerLocator = null;
        }

        final ServerLocatorInternal locator = this.clusterConnector.createServerLocator();
        this.serverLocator = locator;
        if (locator != null) {
            if (!this.useDuplicateDetection) {
                HornetQServerLogger.LOGGER.debug("DuplicateDetection is disabled, sending clustered messages blocked");
            }
            if (this.topology.getMember(this.manager.getNodeId()) == null) {
                throw new IllegalStateException("InternalError! The ClusterConnection doesn't know about its own node = " + this);
            }
            final boolean blockOnSend = !this.useDuplicateDetection;
            locator.setNodeID(this.nodeManager.getNodeId().toString());
            locator.setIdentity("(main-ClusterConnection::" + this.server.toString() + ")");
            locator.setReconnectAttempts(0);
            locator.setClusterConnection(true);
            locator.setClusterTransportConfiguration(this.connector);
            locator.setInitialConnectAttempts(-1);
            locator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod);
            locator.setConnectionTTL(this.connectionTTL);
            locator.setConfirmationWindowSize(this.confirmationWindowSize);
            locator.setBlockOnDurableSend(blockOnSend);
            locator.setBlockOnNonDurableSend(blockOnSend);
            locator.setCallTimeout(this.callTimeout);
            locator.setProducerWindowSize(-1);
            if (this.retryInterval > 0L) {
                locator.setRetryInterval(this.retryInterval);
            }
            this.addClusterTopologyListener(this);
            locator.setAfterConnectionInternalListener((AfterConnectInternalListener)this);
            locator.start(this.server.getExecutorFactory().getExecutor());
        }

        if (this.managementService == null) {
            return;
        }
        final TypedProperties notificationProps = new TypedProperties();
        notificationProps.putSimpleStringProperty(new SimpleString("name"), this.name);
        final Notification startedNotification = new Notification(this.nodeManager.getNodeId().toString(), NotificationType.CLUSTER_CONNECTION_STARTED, notificationProps);
        HornetQServerLogger.LOGGER.debug("sending notification: " + startedNotification);
        this.managementService.sendNotification(startedNotification);
    }

    /**
     * @return the transport configuration this node advertises to the cluster
     */
    @Override
    public TransportConfiguration getConnector() {
        return connector;
    }

    /**
     * Topology callback for a node leaving the cluster: removes and closes the flow
     * record of that node. Ignored while stopping and for this node's own ID.
     *
     * @param eventUID event ID of the notification (not used here)
     * @param nodeID   ID of the node that went down
     */
    public void nodeDown(long eventUID, String nodeID) {
        if (this.stopping) {
            return;
        }
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug(this + " receiving nodeDown for nodeID=" + nodeID, new Exception("trace"));
        }
        final boolean aboutThisNode = nodeID.equals(this.nodeManager.getNodeId().toString());
        if (aboutThisNode) {
            return;
        }
        final MessageFlowRecord removed = this.records.remove(nodeID);
        if (removed == null) {
            return;
        }
        try {
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("Closing clustering record " + removed);
            }
            removed.close();
        }
        catch (Exception closeFailure) {
            HornetQServerLogger.LOGGER.errorClosingFlowRecord(closeFailure);
        }
    }

    /**
     * Topology callback for a node joining the cluster: creates a message flow
     * record (and its store-and-forward queue, if missing) for the node.
     *
     * <p>The call is ignored while stopping, for this node's own ID, when direct
     * connections only are allowed and the member's live connector is not in the
     * allowed set, when the main locator does not exist, when the member has no live
     * connector, or when a record for the node already exists. Failures while creating
     * the record are logged, not propagated.
     *
     * @param topologyMember the member that came up
     * @param last           only used in trace output here
     */
    public void nodeUP(TopologyMember topologyMember, boolean last) {
        if (this.stopping) {
            return;
        }
        final String nodeID = topologyMember.getNodeId();
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug(this + "receiving nodeUP for nodeID=" + nodeID + " connectionPair=" + topologyMember);
        }
        if (nodeID.equals(this.nodeManager.getNodeId().toString())) {
            if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
                HornetQServerLogger.LOGGER.trace(this + "::informing about backup to itself, nodeUUID=" + this.nodeManager.getNodeId() + ", connectorPair=" + topologyMember + ", this = " + this);
            }
            return;
        }
        if (this.allowDirectConnectionsOnly && !this.allowableConnections.contains(topologyMember.getLive())) {
            return;
        }
        if (this.serverLocator == null) {
            return;
        }
        if (topologyMember.getLive() == null) {
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace(this + " ignoring call with nodeID=" + nodeID + ", topologyMember=" + topologyMember + ", last=" + last);
            }
            return;
        }
        synchronized (this.recordsGuard) {
            try {
                if (this.records.get(nodeID) != null) {
                    if (isTrace) {
                        HornetQServerLogger.LOGGER.trace(this + " ignored nodeUp record for " + topologyMember + " on nodeID=" + nodeID + " as the record already existed");
                    }
                    return;
                }
                if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                    HornetQServerLogger.LOGGER.debug(this + "::Creating record for nodeID=" + nodeID + ", topologyMember=" + topologyMember);
                }
                // Reuse the store-and-forward queue if it is already bound, otherwise create it.
                final SimpleString storeAndForwardName = new SimpleString("sf." + this.name + "." + nodeID);
                final Binding existingBinding = this.postOffice.getBinding(storeAndForwardName);
                final Queue storeAndForwardQueue;
                if (existingBinding != null) {
                    storeAndForwardQueue = (Queue)existingBinding.getBindable();
                } else {
                    storeAndForwardQueue = this.server.createQueue(storeAndForwardName, storeAndForwardName, null, true, false);
                }
                this.createNewRecord(topologyMember.getUniqueEventID(), nodeID, topologyMember.getLive(), storeAndForwardName, storeAndForwardQueue, true);
            }
            catch (Exception failure) {
                HornetQServerLogger.LOGGER.errorUpdatingTopology(failure);
            }
        }
    }

    /**
     * Publishes this node in the topology under the server's node ID. A backup node
     * registers its connector in the backup slot, a live node in the live slot; the
     * other slot and the node name are left {@code null}.
     */
    @Override
    public synchronized void informTopology() {
        final String nodeID = this.server.getNodeID().toString();
        final TopologyMemberImpl self;
        if (this.backup) {
            self = new TopologyMemberImpl(nodeID, null, null, this.connector);
        } else {
            self = new TopologyMemberImpl(nodeID, null, this.connector, null);
        }
        this.topology.updateAsLive(nodeID, self);
    }

    /**
     * Creates the message flow record and bridge towards a remote node, registers
     * the record under {@code targetNodeID} and optionally starts the bridge.
     *
     * <p>Returns without doing anything when the connection is not started or the
     * main server locator does not exist. Both conditions are checked before any
     * resource is allocated, so an early return leaves no unclosed locator behind.
     * The main locator is captured while holding this object's monitor and that
     * snapshot is used throughout, so a concurrent {@code stop()} clearing the field
     * cannot cause a {@code NullPointerException} here.
     *
     * @param eventUID     event ID of the topology update that triggered the record
     * @param targetNodeID ID of the remote node; key of the record in {@code records}
     * @param connector    live connector of the remote node
     * @param queueName    name of the store-and-forward queue
     * @param queue        store-and-forward queue feeding the bridge
     * @param start        whether to start the bridge immediately
     * @throws Exception if creating or starting the bridge fails
     */
    private void createNewRecord(long eventUID, String targetNodeID, TransportConfiguration connector, SimpleString queueName, Queue queue, boolean start) throws Exception {
        String nodeId;
        ServerLocatorInternal mainLocator;
        synchronized (this) {
            if (!this.started) {
                return;
            }
            mainLocator = this.serverLocator;
            if (mainLocator == null) {
                return;
            }
            nodeId = mainLocator.getNodeID();
        }
        // Only build the target locator once we know the record will really be created.
        ServerLocatorImpl targetLocator = new ServerLocatorImpl(this.topology, true, new TransportConfiguration[]{connector});
        targetLocator.setReconnectAttempts(0);
        targetLocator.setInitialConnectAttempts(0);
        targetLocator.setClientFailureCheckPeriod(this.clientFailureCheckPeriod);
        targetLocator.setConnectionTTL(this.connectionTTL);
        targetLocator.setConfirmationWindowSize(this.confirmationWindowSize);
        targetLocator.setBlockOnDurableSend(!this.useDuplicateDetection);
        targetLocator.setBlockOnNonDurableSend(!this.useDuplicateDetection);
        targetLocator.setRetryInterval(this.retryInterval);
        targetLocator.setMaxRetryInterval(this.maxRetryInterval);
        targetLocator.setRetryIntervalMultiplier(this.retryIntervalMultiplier);
        targetLocator.setMinLargeMessageSize(this.minLargeMessageSize);
        targetLocator.setProducerWindowSize(-1);
        targetLocator.setAfterConnectionInternalListener((AfterConnectInternalListener)this);
        targetLocator.setNodeID(nodeId);
        targetLocator.setClusterTransportConfiguration(mainLocator.getClusterTransportConfiguration());
        targetLocator.disableFinalizeCheck();
        MessageFlowRecordImpl record = new MessageFlowRecordImpl((ServerLocatorInternal)targetLocator, eventUID, targetNodeID, connector, queueName, queue);
        ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, this.manager, (ServerLocatorInternal)targetLocator, mainLocator, this.reconnectAttempts, this.retryInterval, this.retryIntervalMultiplier, this.maxRetryInterval, this.nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), this.executorFactory.getExecutor(), null, null, this.scheduledExecutor, null, this.useDuplicateDetection, this.clusterUser, this.clusterPassword, !this.backup, this.server.getStorageManager(), this.managementService.getManagementAddress(), this.managementService.getManagementNotificationAddress(), record, record.getConnector());
        targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
        if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
            HornetQServerLogger.LOGGER.debug("creating record between " + this.connector + " and " + connector + bridge);
        }
        // The record must know its bridge before it becomes visible through the map.
        record.setBridge(bridge);
        this.records.put(targetNodeID, record);
        if (start) {
            bridge.start();
        }
    }

    /**
     * @return the live (not copied) map of flow records keyed by remote node ID
     */
    public Map<String, MessageFlowRecord> getRecords() {
        return records;
    }

    /**
     * @return a diagnostic string with the identity hash, node ID, connector,
     *         address and server of this connection
     */
    public String toString() {
        final StringBuilder text = new StringBuilder("ClusterConnectionImpl@");
        text.append(System.identityHashCode(this));
        text.append("[nodeUUID=").append(this.nodeManager.getNodeId());
        text.append(", connector=").append(this.connector);
        text.append(", address=").append(this.address);
        text.append(", server=").append(this.server);
        text.append("]");
        return text.toString();
    }

    /**
     * Renders a multi-line, human-readable report of this connection and of every
     * bridge and flow record it currently holds.
     *
     * @return the report, using the platform line separator
     */
    @Override
    public String describe() {
        final String ruler = "***************************************";
        final StringWriter buffer = new StringWriter();
        final PrintWriter report = new PrintWriter(buffer);
        report.println(this);
        report.println(ruler);
        report.println(this.name + " connected to");
        for (MessageFlowRecord flow : this.records.values()) {
            report.println("\t Bridge = " + flow.getBridge());
            report.println("\t Flow Record = " + flow);
        }
        report.println(ruler);
        return buffer.toString();
    }

    /**
     * Checks the supplied credentials against the configured cluster credentials.
     *
     * @param clusterUser0     user name to check
     * @param clusterPassword0 password to check
     * @return {@code true} only if both values equal the configured ones
     */
    @Override
    public boolean verify(String clusterUser0, String clusterPassword0) {
        if (!this.clusterUser.equals(clusterUser0)) {
            return false;
        }
        return this.clusterPassword.equals(clusterPassword0);
    }

    /**
     * Connector strategy that builds server locators from a discovery group
     * configuration, sharing the enclosing connection's topology.
     */
    private final class DiscoveryClusterConnector
    implements ClusterConnector {
        private final DiscoveryGroupConfiguration discoveryGroup;

        public DiscoveryClusterConnector(DiscoveryGroupConfiguration dg) {
            this.discoveryGroup = dg;
        }

        /**
         * @return a new locator for the configured discovery group
         */
        @Override
        public ServerLocatorInternal createServerLocator() {
            final ServerLocatorImpl locator = new ServerLocatorImpl(ClusterConnectionImpl.this.topology, true, this.discoveryGroup);
            return locator;
        }
    }

    /**
     * Connector strategy that builds server locators from a fixed array of
     * transport configurations, sharing the enclosing connection's topology.
     */
    private final class StaticClusterConnector
    implements ClusterConnector {
        private final TransportConfiguration[] staticConnectors;

        public StaticClusterConnector(TransportConfiguration[] tcConfigs) {
            this.staticConnectors = tcConfigs;
        }

        /**
         * @return a new locator flagged as a cluster connection, or {@code null}
         *         when no connectors were configured
         */
        @Override
        public ServerLocatorInternal createServerLocator() {
            if (this.staticConnectors == null || this.staticConnectors.length == 0) {
                return null;
            }
            if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                HornetQServerLogger.LOGGER.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(this.staticConnectors));
            }
            final ServerLocatorImpl created = new ServerLocatorImpl(ClusterConnectionImpl.this.topology, true, this.staticConnectors);
            created.setClusterConnection(true);
            return created;
        }

        public String toString() {
            final String renderedConnectors = Arrays.toString(this.staticConnectors);
            return "StaticClusterConnector [tcConfigs=" + renderedConnectors + "]";
        }
    }

    private static interface ClusterConnector {
        public ServerLocatorInternal createServerLocator();
    }

    /**
     * Flow record for one target node of the enclosing cluster connection.
     * <p>
     * It holds the bridge, target locator and queue associated with that node and keeps a
     * map of the remote queue bindings it has registered with the post office, keyed by
     * cluster name. Incoming notification messages are dispatched by {@link #onMessage} to
     * handlers that add or remove those bindings, track consumers on them, or pass grouping
     * proposals to the server's grouping handler.
     */
    private class MessageFlowRecordImpl
    implements MessageFlowRecord {
        private BridgeImpl bridge;
        private final long eventUID;
        private final String targetNodeID;
        private final TransportConfiguration connector;
        private final ServerLocatorInternal targetLocator;
        private final SimpleString queueName;
        // Set by serverDisconnected() and read inside the task that close() submits to the
        // bridge executor, so it is volatile like the other status flags below.
        private volatile boolean disconnected = false;
        private final Queue queue;
        // Remote bindings registered by this record, keyed by cluster name.
        private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
        private volatile boolean isClosed = false;
        // Becomes true once a message carrying HDR_RESET_QUEUE_DATA has been received.
        private volatile boolean firstReset = false;

        /**
         * @param targetLocator locator closed (or cleaned up) when this record is closed
         * @param eventUID value returned by {@link #getEventUID()}
         * @param targetNodeID value returned by {@link #getTargetNodeID()}
         * @param connector value returned by {@link #getConnector()}
         * @param queueName value returned by {@link #getQueueName()}
         * @param queue queue passed to every remote binding created by this record
         */
        public MessageFlowRecordImpl(ServerLocatorInternal targetLocator, long eventUID, String targetNodeID, TransportConfiguration connector, SimpleString queueName, Queue queue) {
            this.targetLocator = targetLocator;
            this.queue = queue;
            this.targetNodeID = targetNodeID;
            this.connector = connector;
            this.queueName = queueName;
            this.eventUID = eventUID;
        }

        @Override
        public String toString() {
            return "MessageFlowRecordImpl [nodeID=" + this.targetNodeID + ", connector=" + this.connector + ", queueName=" + this.queueName + ", queue=" + this.queue + ", isClosed=" + this.isClosed + ", firstReset=" + this.firstReset + "]";
        }

        /**
         * Records that the server was disconnected; {@link #close()} then disconnects the
         * bridge and cleans up the locator instead of closing it.
         */
        @Override
        public void serverDisconnected() {
            this.disconnected = true;
        }

        /** @return the string form of the enclosing connection's address */
        @Override
        public String getAddress() {
            return ClusterConnectionImpl.this.address.toString();
        }

        public long getEventUID() {
            return this.eventUID;
        }

        public String getTargetNodeID() {
            return this.targetNodeID;
        }

        public TransportConfiguration getConnector() {
            return this.connector;
        }

        public SimpleString getQueueName() {
            return this.queueName;
        }

        public Queue getQueue() {
            return this.queue;
        }

        /** @return the enclosing connection's {@code maxHops} */
        @Override
        public int getMaxHops() {
            return ClusterConnectionImpl.this.maxHops;
        }

        /**
         * Closes this record: marks it closed, removes all bindings it registered,
         * disconnects the bridge first if the server was reported disconnected, stops the
         * bridge, and finally schedules the locator shutdown on the bridge's executor.
         *
         * @throws Exception if clearing the bindings or stopping the bridge fails
         */
        @Override
        public void close() throws Exception {
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("Stopping bridge " + this.bridge);
            }
            this.isClosed = true;
            this.clearBindings();
            if (this.disconnected) {
                this.bridge.disconnect();
            }
            this.bridge.stop();
            // The locator is released on the bridge executor, after the stop() call above.
            this.bridge.getExecutor().execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        if (MessageFlowRecordImpl.this.disconnected) {
                            MessageFlowRecordImpl.this.targetLocator.cleanup();
                        } else {
                            MessageFlowRecordImpl.this.targetLocator.close();
                        }
                    }
                    catch (Exception ignored) {
                        // Best effort: a failure while releasing the locator is only logged.
                        HornetQServerLogger.LOGGER.debug(ignored.getMessage(), ignored);
                    }
                }
            });
        }

        @Override
        public boolean isClosed() {
            return this.isClosed;
        }

        /**
         * Removes every binding this record has registered.
         *
         * @throws Exception if removing a binding fails
         */
        @Override
        public void reset() throws Exception {
            this.clearBindings();
        }

        public void setBridge(BridgeImpl bridge) {
            this.bridge = bridge;
        }

        @Override
        public Bridge getBridge() {
            return this.bridge;
        }

        /**
         * Handles one notification message.
         * <p>
         * A message carrying {@code HDR_RESET_QUEUE_DATA} clears all bindings and enables
         * further processing; until such a message has been seen every other message is
         * ignored. Afterwards the message is dispatched on its notification type. Any
         * exception raised while handling is logged and not rethrown.
         *
         * @param message the notification message to process
         */
        public synchronized void onMessage(ClientMessage message) {
            if (HornetQServerLogger.LOGGER.isDebugEnabled()) {
                HornetQServerLogger.LOGGER.debug("ClusterCommunication::Flow record on " + ClusterConnectionImpl.this.clusterConnector + " Receiving message " + message);
            }
            try {
                if (message.containsProperty(PostOfficeImpl.HDR_RESET_QUEUE_DATA)) {
                    this.clearBindings();
                    this.firstReset = true;
                    return;
                }
                if (!this.firstReset) {
                    return;
                }
                SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE);
                NotificationType ntype = NotificationType.valueOf((String)type.toString());
                switch (ntype) {
                    case BINDING_ADDED: {
                        this.doBindingAdded(message);
                        break;
                    }
                    case BINDING_REMOVED: {
                        this.doBindingRemoved(message);
                        break;
                    }
                    case CONSUMER_CREATED: {
                        this.doConsumerCreated(message);
                        break;
                    }
                    case CONSUMER_CLOSED: {
                        this.doConsumerClosed(message);
                        break;
                    }
                    case PROPOSAL: {
                        this.doProposalReceived(message);
                        break;
                    }
                    case PROPOSAL_RESPONSE: {
                        this.doProposalResponseReceived(message);
                        break;
                    }
                    default: {
                        throw HornetQMessageBundle.BUNDLE.invalidType(ntype);
                    }
                }
            }
            catch (Exception e) {
                HornetQServerLogger.LOGGER.errorHandlingMessage(e);
            }
        }

        /**
         * Passes a received proposal to the grouping handler with the distance increased by
         * one and, if the handler returns a response, sends that response with distance 0.
         *
         * @throws IllegalStateException if the message has no {@code HDR_PROPOSAL_GROUP_ID}
         */
        private synchronized void doProposalReceived(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID)) {
                throw new IllegalStateException("proposal type is null");
            }
            SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
            SimpleString val = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
            Integer hops = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            Response response = ClusterConnectionImpl.this.server.getGroupingHandler().receive(new Proposal(type, val), hops + 1);
            if (response != null) {
                ClusterConnectionImpl.this.server.getGroupingHandler().send(response, 0);
            }
        }

        /**
         * Builds a response from the message, reports it to the grouping handler as
         * proposed and sends it on with the distance increased by one.
         *
         * @throws IllegalStateException if the message has no {@code HDR_PROPOSAL_GROUP_ID}
         */
        private synchronized void doProposalResponseReceived(ClientMessage message) throws Exception {
            if (!message.containsProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID)) {
                throw new IllegalStateException("proposal type is null");
            }
            SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_GROUP_ID);
            SimpleString val = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
            SimpleString alt = message.getSimpleStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
            Integer hops = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            Response response = new Response(type, val, alt);
            ClusterConnectionImpl.this.server.getGroupingHandler().proposed(response);
            ClusterConnectionImpl.this.server.getGroupingHandler().send(response, hops + 1);
        }

        /**
         * Removes every binding currently held in {@code bindings}.
         */
        private synchronized void clearBindings() throws Exception {
            HornetQServerLogger.LOGGER.debug(ClusterConnectionImpl.this + " clearing bindings");
            // Iterate over a copy: removeBinding() mutates the underlying map.
            for (RemoteQueueBinding binding : new HashSet<RemoteQueueBinding>(this.bindings.values())) {
                this.removeBinding(binding.getClusterName());
            }
        }

        /**
         * Registers a remote queue binding described by the message.
         * <p>
         * If the post office already has a binding under the same cluster name, that is
         * logged and nothing is registered. Otherwise the binding is stored in
         * {@code bindings}, added to the post office, and the address's bindings get the
         * enclosing connection's {@code routeWhenNoConsumers} setting.
         *
         * @throws IllegalStateException if a required header is missing
         */
        private synchronized void doBindingAdded(ClientMessage message) throws Exception {
            if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
                HornetQServerLogger.LOGGER.trace(ClusterConnectionImpl.this + " Adding binding " + message);
            }
            if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                throw new IllegalStateException("distance is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_ADDRESS)) {
                throw new IllegalStateException("queueAddress is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_ROUTING_NAME)) {
                throw new IllegalStateException("routingName is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_BINDING_ID)) {
                throw new IllegalStateException("queueID is null");
            }
            Integer distance = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            SimpleString queueAddress = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            SimpleString routingName = message.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME);
            SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
            Long queueID = message.getLongProperty(ManagementHelper.HDR_BINDING_ID);
            RemoteQueueBindingImpl binding = new RemoteQueueBindingImpl(ClusterConnectionImpl.this.server.getStorageManager().generateUniqueID(), queueAddress, clusterName, routingName, queueID, filterString, this.queue, this.bridge.getName(), distance + 1);
            if (ClusterConnectionImpl.this.postOffice.getBinding(clusterName) != null) {
                HornetQServerLogger.LOGGER.remoteQueueAlreadyBoundOnClusterConnection(this, clusterName);
                return;
            }
            if (isTrace) {
                HornetQServerLogger.LOGGER.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
            }
            this.bindings.put(clusterName, binding);
            try {
                ClusterConnectionImpl.this.postOffice.addBinding(binding);
            }
            catch (Exception ignored) {
                // Still best effort, but leave a trace of the failure instead of hiding it.
                HornetQServerLogger.LOGGER.debug(ignored.getMessage(), ignored);
            }
            Bindings theBindings = ClusterConnectionImpl.this.postOffice.getBindingsForAddress(queueAddress);
            theBindings.setRouteWhenNoConsumers(ClusterConnectionImpl.this.routeWhenNoConsumers);
        }

        /**
         * Removes the binding whose cluster name is given in the message.
         *
         * @throws IllegalStateException if the header is missing or no such binding is held
         */
        private synchronized void doBindingRemoved(ClientMessage message) throws Exception {
            if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
                HornetQServerLogger.LOGGER.trace(ClusterConnectionImpl.this + " Removing binding " + message);
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            this.removeBinding(clusterName);
        }

        /**
         * Drops the binding from {@code bindings} and removes it from the post office by
         * its unique name.
         *
         * @throws IllegalStateException if no binding is held under {@code clusterName}
         */
        private synchronized void removeBinding(SimpleString clusterName) throws Exception {
            RemoteQueueBinding binding = this.bindings.remove(clusterName);
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for queue " + clusterName);
            }
            ClusterConnectionImpl.this.postOffice.removeBinding(binding.getUniqueName());
        }

        /**
         * Adds a consumer (with the message's filter, if any) to the named remote binding,
         * increases the message's distance header by one and sends a
         * {@code CONSUMER_CREATED} notification.
         *
         * @throws IllegalStateException if a required header is missing or the binding is unknown
         */
        private synchronized void doConsumerCreated(ClientMessage message) throws Exception {
            if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
                HornetQServerLogger.LOGGER.trace(ClusterConnectionImpl.this + " Consumer created " + message);
            }
            if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                throw new IllegalStateException("distance is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            Integer distance = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
            SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
            RemoteQueueBinding binding = this.bindings.get(clusterName);
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for " + clusterName + " on " + ClusterConnectionImpl.this);
            }
            binding.addConsumer(filterString);
            this.sendConsumerNotification(NotificationType.CONSUMER_CREATED, binding, clusterName, filterString, distance + 1);
        }

        /**
         * Removes a consumer (with the message's filter, if any) from the named remote
         * binding, increases the message's distance header by one and sends a
         * {@code CONSUMER_CLOSED} notification.
         *
         * @throws IllegalStateException if a required header is missing or the binding is unknown
         */
        private synchronized void doConsumerClosed(ClientMessage message) throws Exception {
            if (HornetQServerLogger.LOGGER.isTraceEnabled()) {
                HornetQServerLogger.LOGGER.trace(ClusterConnectionImpl.this + " Consumer closed " + message);
            }
            if (!message.containsProperty(ManagementHelper.HDR_DISTANCE)) {
                throw new IllegalStateException("distance is null");
            }
            if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME)) {
                throw new IllegalStateException("clusterName is null");
            }
            Integer distance = message.getIntProperty(ManagementHelper.HDR_DISTANCE);
            SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
            message.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
            SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
            RemoteQueueBinding binding = this.bindings.get(clusterName);
            if (binding == null) {
                throw new IllegalStateException("Cannot find binding for " + clusterName);
            }
            binding.removeConsumer(filterString);
            this.sendConsumerNotification(NotificationType.CONSUMER_CLOSED, binding, clusterName, filterString, distance + 1);
        }

        /**
         * Builds and sends the management notification shared by the consumer created and
         * consumer closed handlers.
         *
         * @param type notification type to send
         * @param binding binding the consumer change applies to; supplies address, routing name and consumer count
         * @param clusterName cluster name put into the notification
         * @param filterString consumer filter, omitted from the notification when {@code null}
         * @param distance distance value put into the notification
         */
        private void sendConsumerNotification(NotificationType type, RemoteQueueBinding binding, SimpleString clusterName, SimpleString filterString, int distance) throws Exception {
            TypedProperties props = new TypedProperties();
            props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
            props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
            props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
            props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
            Queue theQueue = (Queue)binding.getBindable();
            props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
            if (filterString != null) {
                props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
            }
            Notification notification = new Notification(null, type, props);
            ClusterConnectionImpl.this.managementService.sendNotification(notification);
        }
    }
}

