/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.node;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.state.NodeIdentifierDescriptor;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NodeClusterCoordinator
implements ClusterCoordinator,
ProtocolHandler,
RequestCompletionCallback {
    private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
    // Category string handed to the EventReporter by reportEvent.
    private static final String EVENT_CATEGORY = "Clustering";
    // The counters API path followed by a 36-character id made of [a-f0-9-]; not used in this part of the file.
    private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}");

    // Random id generated once per coordinator instance; attached to reconnection requests.
    private final String instanceId = UUID.randomUUID().toString();
    // Identifier of the local node; stays null until setLocalNodeIdentifier is called with a value.
    private volatile NodeIdentifier nodeId;

    // Collaborators supplied through the constructor.
    private final ClusterCoordinationProtocolSenderListener senderListener;
    private final EventReporter eventReporter;
    private final ClusterNodeFirewall firewall;
    private final RevisionManager revisionManager;
    private final NiFiProperties nifiProperties;
    private final LeaderElectionManager leaderElectionManager;
    private final AtomicLong latestUpdateId = new AtomicLong(-1L);
    private final FlowElection flowElection;
    private final NodeProtocolSender nodeProtocolSender;
    // State manager obtained under the "Cluster Coordinator" key; holds the persisted topology.
    private final StateManager stateManager;

    private volatile FlowService flowService;
    private volatile boolean connected;
    // Set by shutdown(); also makes waitForNodeIdentifier stop polling.
    private volatile boolean closed = false;
    // While true, requestNodeConnect defers to an in-progress flow election.
    private volatile boolean requireElection = true;

    // Latest known connection status per node, keyed by NodeIdentifier#getId().
    private final ConcurrentMap<String, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
    // Per-node event queues, keyed by NodeIdentifier#getId().
    private final ConcurrentMap<String, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
    // Listeners notified of node added / removed / state-change events.
    private final List<ClusterTopologyEventListener> eventListeners = new CopyOnWriteArrayList<>();

    /**
     * Convenience constructor that builds the {@link StateManagerProvider} itself via
     * {@code StandardStateManagerProvider.create}, using the supplied properties and
     * extension manager together with the empty variable registry and empty parameter
     * lookup, and then delegates to the constructor that takes a provider.
     *
     * @throws IOException if the delegated constructor fails to recover persisted state
     */
    public NodeClusterCoordinator(ClusterCoordinationProtocolSenderListener senderListener, EventReporter eventReporter, LeaderElectionManager leaderElectionManager, FlowElection flowElection, ClusterNodeFirewall firewall, RevisionManager revisionManager, NiFiProperties nifiProperties, ExtensionManager extensionManager, NodeProtocolSender nodeProtocolSender) throws IOException {
        this(
            senderListener,
            eventReporter,
            leaderElectionManager,
            flowElection,
            firewall,
            revisionManager,
            nifiProperties,
            nodeProtocolSender,
            StandardStateManagerProvider.create(nifiProperties, VariableRegistry.EMPTY_REGISTRY, extensionManager, ParameterLookup.EMPTY));
    }

    /**
     * Wires up the coordinator's collaborators, obtains the "Cluster Coordinator"
     * state manager from the given provider, restores any previously stored topology
     * and finally registers this instance as a protocol handler on the sender/listener.
     *
     * @throws IOException if the stored state cannot be read or parsed
     */
    public NodeClusterCoordinator(ClusterCoordinationProtocolSenderListener senderListener, EventReporter eventReporter, LeaderElectionManager leaderElectionManager, FlowElection flowElection, ClusterNodeFirewall firewall, RevisionManager revisionManager, NiFiProperties nifiProperties, NodeProtocolSender nodeProtocolSender, StateManagerProvider stateManagerProvider) throws IOException {
        // Plain collaborator wiring.
        this.senderListener = senderListener;
        this.nodeProtocolSender = nodeProtocolSender;
        this.leaderElectionManager = leaderElectionManager;
        this.flowElection = flowElection;
        this.eventReporter = eventReporter;
        this.firewall = firewall;
        this.revisionManager = revisionManager;
        this.nifiProperties = nifiProperties;
        this.flowService = null;

        // The state manager must be in place before recoverState() reads from it.
        this.stateManager = stateManagerProvider.getStateManager("Cluster Coordinator");
        recoverState();

        // Registered last, after all fields are initialised and state has been recovered.
        senderListener.addHandler(this);
    }

    /**
     * Restores the cluster topology from the LOCAL scope of the state manager.
     * <p>
     * Each stored entry maps a node UUID to a JSON-serialized {@link NodeIdentifierDescriptor}.
     * Every recovered node is registered with a {@code NOT_YET_CONNECTED} status. If exactly
     * one descriptor is flagged as the local node, it becomes the local node identifier.
     *
     * @throws IOException if the state cannot be fetched or an entry cannot be parsed. The
     *         parser is managed with try-with-resources so a parse failure propagates to the
     *         caller instead of being discarded while closing the parser.
     */
    private void recoverState() throws IOException {
        final StateMap stateMap = this.stateManager.getState(Scope.LOCAL);
        if (stateMap == null) {
            logger.debug("No state to restore");
            return;
        }

        final ObjectMapper mapper = new ObjectMapper();
        final JsonFactory jsonFactory = new JsonFactory();
        jsonFactory.setCodec(mapper);

        final Map<NodeIdentifier, NodeConnectionStatus> connectionStatusMap = new HashMap<>();
        NodeIdentifier localNodeId = null;

        final Map<String, String> state = stateMap.toMap();
        for (final Map.Entry<String, String> entry : state.entrySet()) {
            final String nodeUuid = entry.getKey();
            final String nodeIdentifierJson = entry.getValue();
            logger.debug("Recovering state for {} = {}", nodeUuid, nodeIdentifierJson);

            try (JsonParser jsonParser = jsonFactory.createParser(nodeIdentifierJson)) {
                final NodeIdentifierDescriptor nodeIdDesc = jsonParser.readValueAs(NodeIdentifierDescriptor.class);
                final NodeIdentifier recoveredId = nodeIdDesc.toNodeIdentifier();
                connectionStatusMap.put(recoveredId, new NodeConnectionStatus(recoveredId, DisconnectionCode.NOT_YET_CONNECTED));

                if (nodeIdDesc.isLocalNodeIdentifier()) {
                    if (localNodeId == null) {
                        localNodeId = recoveredId;
                    } else {
                        logger.warn("When recovering state, determined that two Node Identifiers claim to be the local Node Identifier: {} and {}. Will ignore both of these and wait until connecting to cluster to determine which Node Identiifer is the local Node Identifier", localNodeId.getFullDescription(), recoveredId.getFullDescription());
                        // NOTE(review): resetting to null means a third claimant later in the map would be accepted - confirm this is intended.
                        localNodeId = null;
                    }
                }
            }
        }

        if (!connectionStatusMap.isEmpty()) {
            this.resetNodeStatuses(connectionStatusMap);
        }
        if (localNodeId != null) {
            logger.debug("Recovered state indicating that Local Node Identifier is {}", localNodeId);
            this.setLocalNodeIdentifier(localNodeId);
        }
    }

    /**
     * Persists the current cluster topology to the LOCAL scope of the state manager.
     * <p>
     * Every known node identifier is serialized to JSON as a {@link NodeIdentifierDescriptor}
     * (flagging the one equal to the local node identifier) and stored under the node's id.
     * <p>
     * This is best-effort: any failure is logged at WARN level and not rethrown. The writer
     * and generator are managed with try-with-resources so that a serialization failure
     * reaches that handler rather than being dropped, which would otherwise let an
     * incomplete topology be stored.
     */
    private void storeState() {
        final ObjectMapper mapper = new ObjectMapper();
        final JsonFactory jsonFactory = new JsonFactory();
        jsonFactory.setCodec(mapper);

        try {
            final Map<String, String> stateMap = new HashMap<>();
            final NodeIdentifier localNodeId = this.getLocalNodeIdentifier();

            for (final NodeIdentifier knownId : this.getNodeIdentifiers()) {
                final boolean isLocalId = knownId.equals(localNodeId);
                final NodeIdentifierDescriptor descriptor = NodeIdentifierDescriptor.fromNodeIdentifier(knownId, isLocalId);

                try (StringWriter writer = new StringWriter();
                     JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer)) {
                    jsonGenerator.writeObject(descriptor);
                    // Make sure everything buffered by the generator is in the writer before reading it.
                    jsonGenerator.flush();
                    stateMap.put(knownId.getId(), writer.toString());
                }
            }

            this.stateManager.setState(stateMap, Scope.LOCAL);
            logger.debug("Stored the following state as the Cluster Topology: {}", stateMap);
        } catch (Exception e) {
            logger.warn("Failed to store cluster topology to local State Manager. Upon restart of NiFi, the cluster topology may not be accurate until joining the cluster.", e);
        }
    }

    /**
     * Adds the given listener to the list notified of topology events.
     *
     * @param eventListener the listener to add
     */
    public void registerEventListener(ClusterTopologyEventListener eventListener) {
        eventListeners.add(eventListener);
    }

    /**
     * Removes the given listener from the list notified of topology events.
     *
     * @param eventListener the listener to remove
     */
    public void unregisterEventListener(ClusterTopologyEventListener eventListener) {
        eventListeners.remove(eventListener);
    }

    /**
     * Marks this coordinator as closed and, when the local node identifier is known,
     * publishes a {@code NODE_SHUTDOWN} status for it. Calls after the first are no-ops.
     */
    public void shutdown() {
        if (closed) {
            return;
        }
        closed = true;

        final NodeIdentifier self = getLocalNodeIdentifier();
        if (self == null) {
            // Local identity was never established, so there is no status to publish.
            return;
        }

        updateNodeStatus(new NodeConnectionStatus(self, DisconnectionCode.NODE_SHUTDOWN), false);
        logger.info("Successfully notified other nodes that I am shutting down");
    }

    /**
     * Records the identifier of the local node. A null argument, or one equal to the
     * current identifier, is ignored. Otherwise the identifier is stored, a
     * {@code NOT_YET_CONNECTED} status is registered for it if none exists, listeners
     * are notified and the topology is persisted.
     *
     * @param nodeId the identifier of the local node; may be null
     */
    public void setLocalNodeIdentifier(NodeIdentifier nodeId) {
        final boolean unchanged = nodeId == null || nodeId.equals(this.nodeId);
        if (unchanged) {
            return;
        }

        this.nodeId = nodeId;
        nodeStatuses.computeIfAbsent(nodeId.getId(), key -> new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));

        for (final ClusterTopologyEventListener listener : eventListeners) {
            listener.onLocalNodeIdentifierSet(nodeId);
        }
        storeState();
    }

    /**
     * @return the identifier of the local node, or null if it has not been set
     */
    public NodeIdentifier getLocalNodeIdentifier() {
        return nodeId;
    }

    /**
     * Polls until an elected active coordinator can be resolved, looking it up with
     * warnings disabled on each attempt.
     *
     * @return the coordinator's identifier, or null if waiting was abandoned
     */
    private NodeIdentifier waitForElectedClusterCoordinator() {
        final Supplier<NodeIdentifier> coordinatorLookup = () -> getElectedActiveCoordinatorNode(false);
        return waitForNodeIdentifier(coordinatorLookup);
    }

    /**
     * Repeatedly invokes the supplier, sleeping 100 ms between attempts, until it yields
     * a non-null identifier.
     *
     * @param fetchNodeId supplier that returns the identifier, or null when not yet available
     * @return the identifier, or null if this coordinator was closed or the thread was
     *         interrupted while waiting (the interrupt flag is restored in that case)
     */
    private NodeIdentifier waitForNodeIdentifier(Supplier<NodeIdentifier> fetchNodeId) {
        while (true) {
            final NodeIdentifier fetched = fetchNodeId.get();
            if (fetched != null) {
                return fetched;
            }
            if (closed) {
                return null;
            }

            try {
                Thread.sleep(100L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /**
     * @return whatever the leader election manager reports as the leader for the
     *         "Cluster Coordinator" role
     */
    private String getElectedActiveCoordinatorAddress() {
        final String coordinatorAddress = leaderElectionManager.getLeader("Cluster Coordinator");
        return coordinatorAddress;
    }

    /**
     * Applies every entry of the given map to the known node statuses: entries in the
     * {@code REMOVED} state remove the node, all others overwrite its status. The
     * topology is persisted once after all entries have been applied.
     *
     * @param statusMap proposed status per node identifier
     */
    public void resetNodeStatuses(Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
        logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, statusMap);

        for (final Map.Entry<NodeIdentifier, NodeConnectionStatus> proposal : statusMap.entrySet()) {
            final NodeIdentifier id = proposal.getKey();
            final NodeConnectionStatus proposed = proposal.getValue();

            if (proposed.getState() != NodeConnectionState.REMOVED) {
                // Persisting is deferred to the single storeState() call below.
                updateNodeStatus(id, proposed, false);
            } else {
                removeNode(id);
            }
        }

        storeState();
    }

    /**
     * Drops the node's status and event queue. Listeners are notified only when a
     * status was actually present.
     *
     * @param nodeId the node to remove
     * @return the status that was removed, or null if the node was not known
     */
    private NodeConnectionStatus removeNode(NodeIdentifier nodeId) {
        final NodeConnectionStatus removedStatus = nodeStatuses.remove(nodeId.getId());
        nodeEvents.remove(nodeId.getId());

        final boolean wasKnown = removedStatus != null;
        if (wasKnown) {
            onNodeRemoved(nodeId);
        }
        return removedStatus;
    }

    /**
     * Removes the node only if its current status equals the expected one. On success
     * the node's event queue is dropped as well and listeners are notified.
     *
     * @param nodeId the node to remove
     * @param expectedStatus the status the node must currently have
     * @return true if the node was removed
     */
    private boolean removeNodeConditionally(NodeIdentifier nodeId, NodeConnectionStatus expectedStatus) {
        if (!nodeStatuses.remove(nodeId.getId(), expectedStatus)) {
            return false;
        }

        nodeEvents.remove(nodeId.getId());
        onNodeRemoved(nodeId);
        return true;
    }

    /**
     * Overwrites the node's status, persisting the topology if the node is new.
     *
     * @return the status that was replaced, or null if the node was not previously known
     */
    private NodeConnectionStatus updateNodeStatus(NodeIdentifier nodeId, NodeConnectionStatus updatedStatus) {
        final boolean persistWhenAdded = true;
        return updateNodeStatus(nodeId, updatedStatus, persistWhenAdded);
    }

    /**
     * Unconditionally overwrites the node's status. A node with no previous status
     * triggers the node-added notification (persisting state when {@code storeState}
     * is true); otherwise the state-change notification is fired.
     *
     * @param nodeId the node whose status is being set
     * @param updatedStatus the new status
     * @param storeState whether to persist the topology when the node is newly added
     * @return the status that was replaced, or null if the node was not previously known
     */
    private NodeConnectionStatus updateNodeStatus(NodeIdentifier nodeId, NodeConnectionStatus updatedStatus, boolean storeState) {
        final NodeConnectionStatus previous = nodeStatuses.put(nodeId.getId(), updatedStatus);

        if (previous != null) {
            onNodeStateChange(nodeId, updatedStatus.getState());
        } else {
            onNodeAdded(nodeId, storeState);
        }
        return previous;
    }

    /**
     * Atomically installs {@code updatedStatus} for the node, provided the map still
     * holds what the caller expects: no entry at all when {@code expectedStatus} is
     * null, otherwise an entry equal to {@code expectedStatus}.
     * <p>
     * A successful insert fires the node-added notification (persisting state); every
     * successful update, insert included, fires the state-change notification.
     *
     * @return true if the map was modified
     */
    private boolean updateNodeStatusConditionally(NodeIdentifier nodeId, NodeConnectionStatus expectedStatus, NodeConnectionStatus updatedStatus) {
        final String id = nodeId.getId();
        final boolean updated;

        if (expectedStatus != null) {
            updated = nodeStatuses.replace(id, expectedStatus, updatedStatus);
        } else {
            updated = nodeStatuses.putIfAbsent(id, updatedStatus) == null;
            if (updated) {
                onNodeAdded(nodeId, true);
            }
        }

        if (updated) {
            onNodeStateChange(nodeId, updatedStatus.getState());
        }
        return updated;
    }

    /**
     * Replaces the status of the node named in {@code connectionStatus}. An unknown
     * node is always accepted; a known node is only replaced when its current status
     * carries the given update identifier.
     *
     * @param connectionStatus the status to install
     * @param qualifyingUpdateId the update identifier the current status must have
     * @return true if the status was replaced
     */
    public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId) {
        final NodeIdentifier target = connectionStatus.getNodeIdentifier();
        final NodeConnectionStatus existing = getConnectionStatus(target);

        if (existing != null && existing.getUpdateIdentifier() != qualifyingUpdateId) {
            // The status has moved on since the caller looked at it.
            return false;
        }
        // existing is either null (unknown node) or the qualifying status here.
        return replaceNodeStatus(target, existing, connectionStatus);
    }

    /**
     * Conditionally swaps the node's status from {@code currentStatus} to {@code newStatus}.
     * <ul>
     *   <li>A null {@code newStatus} is logged as an error (with a stack trace identifying
     *       the caller) and rejected by returning false, instead of failing later with a
     *       NullPointerException.</li>
     *   <li>A {@code REMOVED} new status removes the node if it still has
     *       {@code currentStatus}; the topology is persisted when an existing node is removed.</li>
     *   <li>Any other new status is installed only if the node still has {@code currentStatus}
     *       (or is still absent when {@code currentStatus} is null).</li>
     * </ul>
     *
     * @param nodeId the node being updated
     * @param currentStatus the status the node is expected to have, or null if it is expected to be unknown
     * @param newStatus the status to install
     * @return true if the node's status was changed or the node was removed
     */
    private boolean replaceNodeStatus(NodeIdentifier nodeId, NodeConnectionStatus currentStatus, NodeConnectionStatus newStatus) {
        if (newStatus == null) {
            logger.error("Cannot change node status for {} from {} to {} because new status is null", new Object[]{nodeId, currentStatus, newStatus});
            // The synthetic exception exists only to capture the calling stack in the log.
            logger.error("", (Throwable) new NullPointerException());
            return false;
        }

        final boolean removal = newStatus.getState() == NodeConnectionState.REMOVED;

        if (currentStatus == null) {
            if (removal) {
                return this.removeNodeConditionally(nodeId, currentStatus);
            }
            return this.updateNodeStatusConditionally(nodeId, null, newStatus);
        }

        if (!removal) {
            return this.updateNodeStatusConditionally(nodeId, currentStatus, newStatus);
        }

        if (this.removeNodeConditionally(nodeId, currentStatus)) {
            this.storeState();
            return true;
        }
        return false;
    }

    /**
     * Asks the given node to (re)connect to the cluster. Nothing happens while an
     * election is required, the flow election is unfinished and the node's vote has
     * already been counted. Otherwise an event is reported, the node is moved to
     * {@code CONNECTING} and a reconnection request is sent asynchronously.
     *
     * @param nodeId the node that should connect
     * @param userDn the user on whose behalf the request is made, or null
     */
    public void requestNodeConnect(NodeIdentifier nodeId, String userDn) {
        final boolean electionPending = requireElection && !flowElection.isElectionComplete() && flowElection.isVoteCounted(nodeId);
        if (electionPending) {
            logger.debug("Received heartbeat for {} and node is not connected. Will not request node connect to cluster, though, because the Flow Election is still in progress", nodeId);
            return;
        }

        final String eventMessage = userDn == null
            ? "Requesting that node connect to cluster"
            : "Requesting that node connect to cluster on behalf of " + userDn;
        reportEvent(nodeId, Severity.INFO, eventMessage);

        final NodeConnectionStatus connecting = new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, null, Long.valueOf(System.currentTimeMillis()));
        updateNodeStatus(connecting);

        final ReconnectionRequestMessage reconnection = new ReconnectionRequestMessage();
        reconnection.setNodeId(nodeId);
        reconnection.setInstanceId(instanceId);

        // The data flow is only included when no election is required.
        requestReconnectionAsynchronously(reconnection, 10, 5, !requireElection);
    }

    /**
     * Completes the connection of a node. An unknown, disconnected or disconnecting
     * node is asked to connect instead; an already connected node is left alone; any
     * other state is promoted to {@code CONNECTED}.
     *
     * @param nodeId the node finishing its connection
     */
    public void finishNodeConnection(NodeIdentifier nodeId) {
        final NodeConnectionState current = getConnectionState(nodeId);

        if (current == null) {
            logger.debug("Attempted to finish node connection for {} but node is not known. Requesting that node connect", nodeId);
            requestNodeConnect(nodeId, null);
        } else if (current == NodeConnectionState.CONNECTED) {
            // Already connected, nothing to do.
        } else if (current == NodeConnectionState.DISCONNECTED || current == NodeConnectionState.DISCONNECTING) {
            logger.debug("Attempted to finish node connection for {} but node state was {}. Requesting that node connect", nodeId, current);
            requestNodeConnect(nodeId, null);
        } else {
            logger.info("{} is now connected", nodeId);
            updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
        }
    }

    /**
     * Marks a node as {@code OFFLOADED}. The node must be known and currently in the
     * {@code OFFLOADING} state; otherwise a warning is logged and nothing changes.
     *
     * @param nodeId the node finishing its offload
     */
    public void finishNodeOffload(NodeIdentifier nodeId) {
        final NodeConnectionState current = getConnectionState(nodeId);

        if (current == NodeConnectionState.OFFLOADING) {
            logger.info("{} is now offloaded", nodeId);
            updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED));
        } else if (current == null) {
            logger.warn("Attempted to finish node offload for {} but node is not known.", nodeId);
        } else {
            logger.warn("Attempted to finish node offload for {} but node is not in the offloading state, it is currently {}.", nodeId, current);
        }
    }

    /**
     * Starts offloading a node. A node that is already offloading or offloaded is
     * ignored. The node is moved to {@code OFFLOADING}, a node event is recorded and
     * an offload message is sent asynchronously.
     *
     * @param nodeId the node to offload
     * @param offloadCode the reason code for the offload
     * @param explanation free-text explanation; may be null
     * @throws IllegalNodeOffloadException if the node is not currently disconnected
     */
    public void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
        final Set<NodeIdentifier> alreadyOffloading = getNodeIdentifiers(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED);
        if (alreadyOffloading.contains(nodeId)) {
            logger.debug("Attempted to offload node but the node is already offloading or offloaded");
            return;
        }

        final boolean disconnected = getNodeIdentifiers(NodeConnectionState.DISCONNECTED).contains(nodeId);
        if (!disconnected) {
            throw new IllegalNodeOffloadException("Cannot offload node " + nodeId + " because it is not currently disconnected");
        }

        final Object reason = explanation == null ? offloadCode : explanation;
        logger.info("Requesting that {} is offloaded due to {}", nodeId, reason);

        updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, offloadCode, explanation));

        final OffloadMessage offloadMessage = new OffloadMessage();
        offloadMessage.setNodeId(nodeId);
        offloadMessage.setExplanation(explanation);

        addNodeEvent(nodeId, "Offload requested due to " + explanation);
        offloadAsynchronously(offloadMessage, 10, 5);
    }

    /**
     * Disconnects a node from the cluster. The node's status is updated with the
     * given code and explanation. Unless the code is {@code NODE_SHUTDOWN}, a node
     * event is recorded and a disconnect message is sent asynchronously.
     *
     * @param nodeId the node to disconnect
     * @param disconnectionCode the reason code for the disconnection
     * @param explanation free-text explanation; may be null
     * @throws IllegalNodeDisconnectionException if the node is the only connected node
     */
    public void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
        final Set<NodeIdentifier> connected = getNodeIdentifiers(NodeConnectionState.CONNECTED);
        final boolean lastConnectedNode = connected.size() == 1 && connected.contains(nodeId);
        if (lastConnectedNode) {
            throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
        }

        final Object reason = explanation == null ? disconnectionCode : explanation;
        logger.info("Requesting that {} disconnect due to {}", nodeId, reason);

        updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));

        if (disconnectionCode != DisconnectionCode.NODE_SHUTDOWN) {
            final DisconnectMessage disconnectMessage = new DisconnectMessage();
            disconnectMessage.setNodeId(nodeId);
            disconnectMessage.setExplanation(explanation);

            addNodeEvent(nodeId, "Disconnection requested due to " + explanation);
            disconnectAsynchronously(disconnectMessage, 10, 5);
        }
    }

    /**
     * Handles a disconnection that the node itself asked for: updates the node's
     * status and reports an event whose severity depends on the disconnection code
     * (ERROR for startup failure, mismatched flows or unknown; WARNING for lack of
     * heartbeat; INFO for everything else).
     *
     * @param nodeId the node that requested disconnection
     * @param disconnectionCode the reason code supplied by the node
     * @param explanation free-text explanation; may be null
     */
    public void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
        final Object reason = explanation == null ? disconnectionCode : explanation;
        logger.info("{} requested disconnection from cluster due to {}", nodeId, reason);

        updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));

        final Severity eventSeverity;
        switch (disconnectionCode) {
            case LACK_OF_HEARTBEAT:
                eventSeverity = Severity.WARNING;
                break;
            case UNKNOWN:
            case MISMATCHED_FLOWS:
            case STARTUP_FAILURE:
                eventSeverity = Severity.ERROR;
                break;
            default:
                eventSeverity = Severity.INFO;
                break;
        }

        reportEvent(nodeId, eventSeverity, "Node disconnected from cluster due to " + explanation);
    }

    /**
     * Removes a node at a user's request: reports the event, notifies the other nodes
     * of the {@code REMOVED} status, drops the node locally and persists the topology.
     *
     * @param nodeId the node to remove
     * @param userDn the user who requested the removal
     */
    public void removeNode(NodeIdentifier nodeId, String userDn) {
        final String eventMessage = "User " + userDn + " requested that node be removed from cluster";
        reportEvent(nodeId, Severity.INFO, eventMessage);

        final NodeConnectionStatus removedStatus = new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED);
        notifyOthersOfNodeStatusChange(removedStatus);

        removeNode(nodeId);
        storeState();
    }

    /** Tells every registered listener that the given node was removed. */
    private void onNodeRemoved(NodeIdentifier nodeId) {
        for (final ClusterTopologyEventListener listener : eventListeners) {
            listener.onNodeRemoved(nodeId);
        }
    }

    /**
     * Optionally persists the topology, then tells every registered listener that
     * the given node was added.
     *
     * @param nodeId the node that was added
     * @param storeState whether to persist the topology before notifying listeners
     */
    private void onNodeAdded(NodeIdentifier nodeId, boolean storeState) {
        if (storeState) {
            storeState();
        }
        for (final ClusterTopologyEventListener listener : eventListeners) {
            listener.onNodeAdded(nodeId);
        }
    }

    /** Tells every registered listener that the given node moved to the given state. */
    private void onNodeStateChange(NodeIdentifier nodeId, NodeConnectionState nodeConnectionState) {
        for (final ClusterTopologyEventListener listener : eventListeners) {
            listener.onNodeStateChange(nodeId, nodeConnectionState);
        }
    }

    /**
     * @param nodeId the node to look up
     * @return the node's current status, or null if the node is not known
     */
    public NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId) {
        final String id = nodeId.getId();
        return nodeStatuses.get(id);
    }

    /**
     * @param nodeId the node to look up
     * @return the node's current connection state, or null if the node is not known
     */
    private NodeConnectionState getConnectionState(NodeIdentifier nodeId) {
        final NodeConnectionStatus current = getConnectionStatus(nodeId);
        if (current == null) {
            return null;
        }
        return current.getState();
    }

    /**
     * @return a new list holding a copy of all currently known node statuses
     */
    public List<NodeConnectionStatus> getConnectionStatuses() {
        final List<NodeConnectionStatus> snapshot = new ArrayList<>(nodeStatuses.values());
        return snapshot;
    }

    /**
     * Groups the identifiers of all known nodes by their connection state. States
     * that no node is currently in do not appear in the result.
     *
     * @return a new map from connection state to the nodes in that state
     */
    public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
        final Map<NodeConnectionState, List<NodeIdentifier>> idsByState = new HashMap<>();
        for (final NodeConnectionStatus status : nodeStatuses.values()) {
            idsByState
                .computeIfAbsent(status.getState(), key -> new ArrayList<>())
                .add(status.getNodeIdentifier());
        }
        return idsByState;
    }

    /**
     * Decides whether a node presenting the given identities is blocked. Without a
     * configured firewall nothing is blocked; with one, the node is blocked unless at
     * least one identity is permissible.
     *
     * @param nodeIdentities the identities presented by the node
     * @return true if a firewall is configured and none of the identities is permissible
     */
    public boolean isBlockedByFirewall(Set<String> nodeIdentities) {
        if (firewall == null) {
            return false;
        }
        return nodeIdentities.stream().noneMatch(identity -> firewall.isPermissible(identity));
    }

    /**
     * Reports an event to the event reporter, records it against the node when a node
     * is given, and writes it to the log at the level matching the severity.
     *
     * @param nodeId the node the event relates to, or null for a cluster-wide event
     * @param severity the severity of the event
     * @param event the event text
     */
    public void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
        // Events tied to a node get a prefix naming that node.
        final String message = nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event;

        eventReporter.reportEvent(severity, EVENT_CATEGORY, message);
        if (nodeId != null) {
            addNodeEvent(nodeId, severity, event);
        }

        switch (severity) {
            case INFO:
                logger.info(message);
                break;
            case WARNING:
                logger.warn(message);
                break;
            case ERROR:
                logger.error(message);
                break;
        }
    }

    /**
     * @param uuid the id under which the node's status is stored
     * @return the identifier of that node, or null if no status is stored for the id
     */
    public NodeIdentifier getNodeIdentifier(String uuid) {
        final NodeConnectionStatus known = nodeStatuses.get(uuid);
        if (known == null) {
            return null;
        }
        return known.getNodeIdentifier();
    }

    /**
     * Collects the identifiers of all nodes whose state is one of the given states.
     * Passing no states selects every state.
     *
     * @param states the states of interest; empty means all
     * @return a new set of matching node identifiers
     */
    public Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState ... states) {
        final NodeConnectionState[] wanted = states.length == 0 ? NodeConnectionState.values() : states;
        final Set<NodeConnectionState> statesOfInterest = new HashSet<>();
        Collections.addAll(statesOfInterest, wanted);

        return nodeStatuses.entrySet().stream()
            .map(Map.Entry::getValue)
            .filter(status -> statesOfInterest.contains(status.getState()))
            .map(NodeConnectionStatus::getNodeIdentifier)
            .collect(Collectors.toSet());
    }

    /**
     * Resolves the leader of the "Primary Node" role to a known node by comparing the
     * leader address against each node's {@code socketAddress:socketPort}.
     *
     * @return the matching node identifier, or null if there is no leader or no known
     *         node has that address
     */
    public NodeIdentifier getPrimaryNode() {
        final String primaryNodeAddress = leaderElectionManager.getLeader("Primary Node");
        if (primaryNodeAddress == null) {
            return null;
        }

        for (final NodeConnectionStatus status : nodeStatuses.values()) {
            final NodeIdentifier candidate = status.getNodeIdentifier();
            final String candidateAddress = candidate.getSocketAddress() + ":" + candidate.getSocketPort();
            if (primaryNodeAddress.equals(candidateAddress)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Looks up the elected active cluster coordinator with warnings enabled.
     *
     * @return the coordinator's node identifier, or null if it cannot be determined
     */
    public NodeIdentifier getElectedActiveCoordinatorNode() {
        final boolean warnOnError = true;
        return getElectedActiveCoordinatorNode(warnOnError);
    }

    /**
     * Resolves the elected coordinator's {@code host:port} address to a node identifier.
     * <p>
     * The address comes from the leader election manager and is matched against the
     * socket address and port of every known node. If no known node matches and
     * {@code warnOnError} is true, the node at that address is asked for its connection
     * status; a status received this way is registered if the node was not already known.
     *
     * @param warnOnError whether to log warnings for malformed addresses and whether to
     *        contact the elected node when it is not among the known nodes
     * @return the coordinator's node identifier, or null if there is no coordinator, the
     *         address is malformed, or the node could not be identified
     */
    private NodeIdentifier getElectedActiveCoordinatorNode(boolean warnOnError) {
        final String reportedAddress;
        try {
            reportedAddress = getElectedActiveCoordinatorAddress();
        } catch (NoClusterCoordinatorException ncce) {
            logger.debug("There is currently no elected active Cluster Coordinator");
            return null;
        }

        if (reportedAddress == null || reportedAddress.trim().isEmpty()) {
            logger.debug("There is currently no elected active Cluster Coordinator");
            return null;
        }

        // Expect "hostname:port" with a non-empty hostname.
        final String address = reportedAddress.trim();
        final int separatorIndex = address.indexOf(':');
        if (separatorIndex < 1) {
            if (warnOnError) {
                logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", address);
            }
            return null;
        }

        final String hostname = address.substring(0, separatorIndex);
        final String portText = address.substring(separatorIndex + 1);
        final int port;
        try {
            port = Integer.parseInt(portText);
        } catch (NumberFormatException nfe) {
            if (warnOnError) {
                logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", address);
            }
            return null;
        }

        // Look for a known node (in any state) with this socket address and port.
        NodeIdentifier knownMatch = null;
        for (final NodeIdentifier candidate : getNodeIdentifiers()) {
            if (candidate.getSocketAddress().equals(hostname) && candidate.getSocketPort() == port) {
                knownMatch = candidate;
                break;
            }
        }
        if (knownMatch != null || !warnOnError) {
            return knownMatch;
        }

        // Not known locally: ask the node at that address to identify itself.
        logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {},but there is no node with this address. Will attempt to communicate with node to determine its information", address);
        try {
            final NodeConnectionStatus reportedStatus = senderListener.requestNodeConnectionStatus(hostname, port);
            logger.debug("Received NodeConnectionStatus {}", reportedStatus);
            if (reportedStatus == null) {
                return null;
            }

            final NodeConnectionStatus alreadyPresent = nodeStatuses.putIfAbsent(reportedStatus.getNodeIdentifier().getId(), reportedStatus);
            if (alreadyPresent != null) {
                return alreadyPresent.getNodeIdentifier();
            }

            onNodeAdded(reportedStatus.getNodeIdentifier(), true);
            return reportedStatus.getNodeIdentifier();
        } catch (Exception e) {
            logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. Attempted to determine the node's information but failed to retrieve its information due to {}", address, e.toString());
            // The stack trace is only logged when debug logging is enabled.
            if (logger.isDebugEnabled()) {
                logger.warn("", e);
            }
            return null;
        }
    }

    /**
     * Reports whether this node currently holds the "Cluster Coordinator" leadership role.
     *
     * @return {@code false} if no leader election manager is set; otherwise the manager's answer for that role
     */
    public boolean isActiveClusterCoordinator() {
        if (this.leaderElectionManager == null) {
            return false;
        }
        return this.leaderElectionManager.isLeader("Cluster Coordinator");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a copy of the events recorded for the given node.
     *
     * @param nodeId the node whose events are requested
     * @return a new list holding the node's queued events, or an empty list if no queue exists for the node
     */
    public List<NodeEvent> getNodeEvents(NodeIdentifier nodeId) {
        final CircularFifoQueue queue = (CircularFifoQueue) this.nodeEvents.get(nodeId.getId());
        if (queue != null) {
            // Copy under the queue's monitor, the same lock that is held while events are appended.
            synchronized (queue) {
                return new ArrayList<NodeEvent>((Collection<NodeEvent>) queue);
            }
        }
        return Collections.emptyList();
    }

    /**
     * Assigns the Flow Service; this may only be done while no Flow Service is set.
     *
     * @param flowService the Flow Service to use
     * @throws IllegalStateException if a Flow Service has already been set
     */
    public void setFlowService(FlowService flowService) {
        if (this.flowService == null) {
            this.flowService = flowService;
            return;
        }
        throw new IllegalStateException("Flow Service has already been set");
    }

    /**
     * Records an event for the given node with {@code INFO} severity.
     *
     * @param nodeId the node the event belongs to
     * @param event  the event message
     */
    private void addNodeEvent(NodeIdentifier nodeId, String event) {
        addNodeEvent(nodeId, Severity.INFO, event);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Records an event for the given node, creating the node's event queue on first use.
     *
     * @param nodeId   the node the event belongs to
     * @param severity the severity of the event
     * @param message  the event message
     */
    private void addNodeEvent(NodeIdentifier nodeId, Severity severity, String message) {
        final Event nodeEvent = new Event(nodeId.toString(), message, severity);
        final CircularFifoQueue queue = this.nodeEvents.computeIfAbsent(nodeId.getId(), key -> new CircularFifoQueue());
        // Append under the queue's monitor, the same lock that readers take when copying it.
        synchronized (queue) {
            queue.add(nodeEvent);
        }
    }

    /**
     * Updates a node's status, allowing the notification step to wait for the elected coordinator.
     *
     * @param status the new connection status
     */
    void updateNodeStatus(NodeConnectionStatus status) {
        updateNodeStatus(status, true);
    }

    /**
     * Applies a new connection status for a node and, if the node's state differs from the one that was
     * recorded before, notifies other nodes of the change.
     *
     * @param status             the new connection status
     * @param waitForCoordinator passed through to the notification step; only relevant when this node is not
     *                           the active cluster coordinator
     */
    void updateNodeStatus(NodeConnectionStatus status, boolean waitForCoordinator) {
        final NodeIdentifier nodeId = status.getNodeIdentifier();
        final NodeConnectionStatus previousStatus = this.updateNodeStatus(nodeId, status);
        final NodeConnectionState previousState = previousStatus != null ? previousStatus.getState() : null;
        logger.info("Status of {} changed from {} to {}", nodeId, previousStatus, status);
        logger.debug("State of cluster nodes is now {}", this.nodeStatuses);

        // Keep the latest update ID monotonically non-decreasing.
        this.latestUpdateId.updateAndGet(current -> Math.max(current, status.getUpdateIdentifier()));

        final boolean stateUnchanged = previousState != null && previousState == status.getState();
        if (stateUnchanged) {
            logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", previousState, status.getState());
            return;
        }

        final boolean notifyAllNodes = this.isActiveClusterCoordinator();
        if (!notifyAllNodes) {
            logger.debug("Notifying cluster coordinator that node status changed from {} to {}", previousStatus, status);
        } else {
            logger.debug("Notifying all nodes that status changed from {} to {}", previousStatus, status);
        }
        this.notifyOthersOfNodeStatusChange(status, notifyAllNodes, waitForCoordinator);
    }

    /**
     * Notifies other nodes of a status change: all nodes if this node is the active cluster coordinator,
     * otherwise the elected coordinator (waiting for one to be elected).
     *
     * @param updatedStatus the status to announce
     */
    void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus) {
        final boolean notifyAllNodes = isActiveClusterCoordinator();
        notifyOthersOfNodeStatusChange(updatedStatus, notifyAllNodes, true);
    }

    /**
     * Sends a node status change message to the appropriate recipients.
     *
     * @param updatedStatus      the status to announce
     * @param notifyAllNodes     if {@code true}, every known node except the local one is notified
     * @param waitForCoordinator when not notifying all nodes: if {@code true}, wait for the elected coordinator
     *                           and notify it; if {@code false}, notify the currently elected coordinator or
     *                           do nothing when none can be determined
     */
    void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
        final Set<NodeIdentifier> recipients;
        if (!notifyAllNodes && !waitForCoordinator) {
            final NodeIdentifier coordinatorId = this.getElectedActiveCoordinatorNode();
            if (coordinatorId == null) {
                return;
            }
            recipients = Collections.singleton(coordinatorId);
        } else if (!notifyAllNodes) {
            recipients = Collections.singleton(this.waitForElectedClusterCoordinator());
        } else {
            recipients = this.getNodeIdentifiers(new NodeConnectionState[0]);
            recipients.remove(this.getLocalNodeIdentifier());
        }

        final NodeStatusChangeMessage changeMessage = new NodeStatusChangeMessage();
        changeMessage.setNodeId(updatedStatus.getNodeIdentifier());
        changeMessage.setNodeConnectionStatus(updatedStatus);
        this.senderListener.notifyNodeStatusChange(recipients, changeMessage);
    }

    /**
     * Starts a thread named {@code "Offload <nodeId>"} that sends the offload request, retrying on failure.
     *
     * @param request      the offload message to send
     * @param attempts     the maximum number of send attempts
     * @param retrySeconds the number of seconds to sleep after a failed attempt
     */
    private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
        final Runnable offloadTask = () -> {
            final NodeIdentifier nodeId = request.getNodeId();
            for (int attempt = 0; attempt < attempts; attempt++) {
                try {
                    this.senderListener.offload(request);
                    this.reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
                    return;
                } catch (Exception e) {
                    logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e);
                }

                // Pause before the next attempt; give up if interrupted while waiting.
                try {
                    Thread.sleep(retrySeconds * 1000L);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        new Thread(offloadTask, "Offload " + request.getNodeId()).start();
    }

    /**
     * Starts a thread named {@code "Disconnect <nodeId>"} that sends the disconnect request, retrying on failure.
     * <p>
     * Each failed attempt is logged together with the exception that caused it, in the same way as
     * {@code offloadAsynchronously} logs its failures.
     *
     * @param request      the disconnect message to send
     * @param attempts     the maximum number of send attempts
     * @param retrySeconds the number of seconds to sleep after a failed attempt
     */
    private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
        Thread disconnectThread = new Thread(new Runnable(){

            @Override
            public void run() {
                NodeIdentifier nodeId = request.getNodeId();
                for (int i = 0; i < attempts; ++i) {
                    try {
                        NodeClusterCoordinator.this.senderListener.disconnect(request);
                        NodeClusterCoordinator.this.reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
                        return;
                    }
                    catch (Exception e) {
                        // Pass the exception as the trailing argument so the cause and stack trace are logged.
                        logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", new Object[]{request.getNodeId(), request.getExplanation(), e});
                        try {
                            Thread.sleep((long)retrySeconds * 1000L);
                            continue;
                        }
                        catch (InterruptedException ie) {
                            // Restore the interrupt flag and stop retrying.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
        }, "Disconnect " + request.getNodeId());
        disconnectThread.start();
    }

    /**
     * Starts a thread named {@code "Reconnect <nodeId>"} that issues a reconnection request to a node.
     * <p>
     * The thread first waits until a Flow Service has been set. It then tries to send the request up to
     * {@code reconnectionAttempts} times, stopping as soon as the node is no longer in the CONNECTING state or
     * a request succeeds. If the attempts are exhausted (or the retry sleep is interrupted) and the node is
     * still CONNECTING, a disconnect of the node is requested.
     *
     * @param request              the reconnection request to populate and send
     * @param reconnectionAttempts the maximum number of send attempts
     * @param retrySeconds         the number of seconds to sleep after a failed attempt
     * @param includeDataFlow      whether to attach the current data flow to the request
     */
    private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds, final boolean includeDataFlow) {
        final Runnable reconnectionTask = () -> {
            // Poll until the Flow Service becomes available.
            while (this.flowService == null) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ie) {
                    logger.info("Could not send Reconnection request to {} because thread was interrupted before FlowService was made available", request.getNodeId());
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            for (int attempt = 0; attempt < reconnectionAttempts; attempt++) {
                try {
                    if (this.getConnectionState(request.getNodeId()) != NodeConnectionState.CONNECTING) {
                        return;
                    }
                    if (includeDataFlow) {
                        request.setDataFlow(new StandardDataFlow(this.flowService.createDataFlowFromController()));
                    }
                    request.setNodeConnectionStatuses(this.getConnectionStatuses());
                    request.setComponentRevisions(this.revisionManager.getAllRevisions().stream()
                            .map(rev -> ComponentRevision.fromRevision((Revision) rev))
                            .collect(Collectors.toList()));
                    this.senderListener.requestReconnection(request);
                    logger.info("Successfully requested that {} join the cluster", request.getNodeId());
                    return;
                } catch (Exception e) {
                    logger.warn("Problem encountered issuing reconnection request to node " + request.getNodeId(), e);
                    this.eventReporter.reportEvent(Severity.WARNING, NodeClusterCoordinator.EVENT_CATEGORY, "Problem encountered issuing reconnection request to node " + request.getNodeId() + " due to: " + e);
                }

                // Pause before retrying; an interrupt ends the retries but still runs the final check below.
                try {
                    Thread.sleep(1000L * retrySeconds);
                } catch (InterruptedException ie) {
                    break;
                }
            }

            if (this.getConnectionState(request.getNodeId()) == NodeConnectionState.CONNECTING) {
                this.requestNodeDisconnect(request.getNodeId(), DisconnectionCode.UNABLE_TO_COMMUNICATE, "Attempted to request that node reconnect to cluster but could not communicate with node");
            }
        };
        new Thread(reconnectionTask, "Reconnect " + request.getNodeId()).start();
    }

    /**
     * Dispatches a protocol message to the handler that matches its type.
     *
     * @param protocolMessage the message to handle
     * @param nodeIdentities  the identities of the sending node, passed on to connection request handling
     * @return the response message, or {@code null} for a node status change
     * @throws ProtocolException if the message type is not one of the handled types
     */
    public ProtocolMessage handle(ProtocolMessage protocolMessage, Set<String> nodeIdentities) throws ProtocolException {
        final ProtocolMessage response;
        switch (protocolMessage.getType()) {
            case CONNECTION_REQUEST:
                response = this.handleConnectionRequest((ConnectionRequestMessage) protocolMessage, nodeIdentities);
                break;
            case NODE_STATUS_CHANGE:
                this.handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage);
                response = null;
                break;
            case NODE_CONNECTION_STATUS_REQUEST:
                response = this.handleNodeConnectionStatusRequest();
                break;
            default:
                throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type");
        }
        return response;
    }

    /**
     * Builds a response carrying this node's own connection status.
     *
     * @return the response message; its status is left unset if the local node identifier is not known
     */
    private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
        final NodeConnectionStatusResponseMessage response = new NodeConnectionStatusResponseMessage();
        final NodeIdentifier localNodeId = this.getLocalNodeIdentifier();
        if (localNodeId == null) {
            return response;
        }
        response.setNodeConnectionStatus(this.nodeStatuses.get(localNodeId.getId()));
        return response;
    }

    /**
     * Produces a human-readable summary of a node state transition.
     *
     * @param oldStatus the previous status, or {@code null} if the node was unknown
     * @param status    the new status
     * @return the summary, or an empty string if the state did not change
     */
    private String summarizeStatusChange(NodeConnectionStatus oldStatus, NodeConnectionStatus status) {
        final boolean stateChanged = oldStatus == null || status.getState() != oldStatus.getState();
        if (!stateChanged) {
            return "";
        }

        final String previousState = oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString();
        final StringBuilder summary = new StringBuilder("Node Status changed from ")
                .append(previousState)
                .append(" to ")
                .append(status.getState().toString());

        // Prefer the explicit reason; fall back to the disconnect code when no reason is given.
        if (status.getReason() != null) {
            summary.append(" due to ").append(status.getReason());
        } else if (status.getDisconnectCode() != null) {
            summary.append(" due to ").append(status.getDisconnectCode().toString());
        }
        return summary.toString();
    }

    /**
     * Applies a status change announced by another node.
     * <p>
     * A REMOVED state triggers a conditional removal of the node (persisting state if the node was removed);
     * any other state updates the stored status. The change is then logged, recorded as a node event when
     * the state actually changed, and re-broadcast if this node is the active cluster coordinator.
     *
     * @param statusChangeMessage the received status change message
     */
    private void handleNodeStatusChange(NodeStatusChangeMessage statusChangeMessage) {
        final NodeConnectionStatus newStatus = statusChangeMessage.getNodeConnectionStatus();
        final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
        logger.debug("Handling request {}", statusChangeMessage);

        final NodeConnectionStatus previousStatus = this.nodeStatuses.get(nodeId.getId());
        if (newStatus.getState() != NodeConnectionState.REMOVED) {
            this.updateNodeStatus(nodeId, newStatus);
        } else if (this.removeNodeConditionally(nodeId, previousStatus)) {
            this.storeState();
        }

        logger.info("Status of {} changed from {} to {}", nodeId, previousStatus, newStatus);
        logger.debug("State of cluster nodes is now {}", this.nodeStatuses);

        final String changeSummary = this.summarizeStatusChange(previousStatus, newStatus);
        if (!StringUtils.isEmpty(changeSummary)) {
            this.addNodeEvent(nodeId, changeSummary);
        }

        NodeConnectionStatus.updateIdGenerator((long) newStatus.getUpdateIdentifier());
        if (this.isActiveClusterCoordinator()) {
            this.notifyOthersOfNodeStatusChange(newStatus);
        }
    }

    /**
     * Describes the state of the flow election.
     *
     * @return the election's status description, or {@code null} if no election is required
     */
    public String getFlowElectionStatus() {
        return this.requireElection ? this.flowElection.getStatusDescription() : null;
    }

    /**
     * Reports whether the flow election no longer blocks progress.
     *
     * @return {@code true} if no election is required or the election has completed
     */
    public boolean isFlowElectionComplete() {
        if (!this.requireElection) {
            return true;
        }
        return this.flowElection.isElectionComplete();
    }

    /**
     * Registers a node identifier with a NOT_YET_CONNECTED status unless an entry for its ID already exists,
     * and removes any other registered identifiers that conflict with it.
     *
     * @param nodeIdentifier the identifier to register
     */
    private void registerNodeId(NodeIdentifier nodeIdentifier) {
        final NodeConnectionStatus initialStatus = new NodeConnectionStatus(nodeIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
        final NodeConnectionStatus priorStatus = this.nodeStatuses.putIfAbsent(nodeIdentifier.getId(), initialStatus);
        this.removeConflictingNodeIds(nodeIdentifier);

        if (priorStatus != null) {
            logger.debug("A node already exists with ID {}; existing Node Identifier is: {}", nodeIdentifier.getId(), priorStatus.getNodeIdentifier().getFullDescription());
            return;
        }
        logger.info("No existing node with ID {}; will add Node as {}", nodeIdentifier.getId(), nodeIdentifier.getFullDescription());
        logger.debug("After adding node {}, node statuses are {}", nodeIdentifier, this.nodeStatuses);
        this.onNodeAdded(nodeIdentifier, true);
    }

    /**
     * Removes every registered node identifier that conflicts with the given one, logging a warning that
     * lists the identifiers being removed.
     *
     * @param nodeIdentifier the newly registered identifier
     */
    private void removeConflictingNodeIds(NodeIdentifier nodeIdentifier) {
        final Set<NodeIdentifier> conflicts = this.findConflictingNodeIds(nodeIdentifier);
        if (conflicts.isEmpty()) {
            return;
        }

        final Set<String> conflictDescriptions = conflicts.stream()
                .map(NodeIdentifier::getFullDescription)
                .collect(Collectors.toSet());
        logger.warn("New Node {} was registered for this cluster, but this Node Identifier conflicts with {} others: {}; each of these conflicting Node Identifiers will be removed from the cluster", nodeIdentifier.getFullDescription(), conflictDescriptions.size(), conflictDescriptions);
        for (final NodeIdentifier conflicting : conflicts) {
            this.removeNode(conflicting);
        }
    }

    /**
     * Finds registered node identifiers that are not equal to the given identifier but are logically equal
     * to it.
     *
     * @param nodeId the identifier to compare against
     * @return a new set of conflicting identifiers; empty if there are none
     */
    private Set<NodeIdentifier> findConflictingNodeIds(NodeIdentifier nodeId) {
        final Set<NodeIdentifier> conflicts = new HashSet<>();
        for (final NodeConnectionStatus registeredStatus : this.nodeStatuses.values()) {
            final NodeIdentifier registeredId = registeredStatus.getNodeIdentifier();
            if (!registeredId.equals(nodeId) && nodeId.logicallyEquals(registeredId)) {
                conflicts.add(registeredId);
            }
        }
        return conflicts;
    }

    /**
     * Handles a node's request to connect to the cluster.
     * <p>
     * The proposed node identifier is registered first. The request is then rejected if the firewall blocks
     * the node identities; otherwise the response carries either the elected flow (or an election-in-progress
     * notice) when an election is required, or this node's own flow.
     *
     * @param requestMessage the connection request message
     * @param nodeIdentities the identities of the requesting node
     * @return the connection response message
     */
    private ConnectionResponseMessage handleConnectionRequest(ConnectionRequestMessage requestMessage, Set<String> nodeIdentities) {
        final NodeIdentifier proposedId = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
        final NodeIdentifier idWithIdentities = this.addNodeIdentities(proposedId, nodeIdentities);
        final DataFlow proposedFlow = requestMessage.getConnectionRequest().getDataFlow();
        final ConnectionRequest enrichedRequest = new ConnectionRequest(idWithIdentities, proposedFlow);
        this.registerNodeId(proposedId);

        if (this.isBlockedByFirewall(nodeIdentities)) {
            logger.info("Firewall blocked connection request from node " + proposedId + " with Node Identities " + nodeIdentities);
            final ConnectionResponseMessage blockedMessage = new ConnectionResponseMessage();
            blockedMessage.setConnectionResponse(ConnectionResponse.createBlockedByFirewallResponse());
            return blockedMessage;
        }

        if (!this.requireElection) {
            logger.info("Received Connection Request from {}; responding with my DataFlow", idWithIdentities);
            return this.createConnectionResponse(enrichedRequest, proposedId);
        }

        // An election is required: cast this node's vote; a null result means voting is still in progress.
        final DataFlow electedFlow = this.flowElection.castVote(proposedFlow, idWithIdentities);
        if (electedFlow != null) {
            logger.info("Received Connection Request from {}; responding with DataFlow that was elected", idWithIdentities);
            return this.createConnectionResponse(enrichedRequest, proposedId, electedFlow);
        }
        logger.info("Received Connection Request from {}; responding with Flow Election In Progress message", idWithIdentities);
        return this.createFlowElectionInProgressResponse();
    }

    /**
     * Builds a response telling the requesting node that the flow election is still in progress.
     *
     * @return a response message whose explanation includes the election's current status description
     */
    private ConnectionResponseMessage createFlowElectionInProgressResponse() {
        final ConnectionResponseMessage electionMessage = new ConnectionResponseMessage();
        final String explanation = "Cluster is still voting on which Flow is the correct flow for the cluster. " + this.flowElection.getStatusDescription();
        electionMessage.setConnectionResponse(new ConnectionResponse(5, explanation));
        return electionMessage;
    }

    /**
     * Builds a connection response using the flow currently held by the Flow Service.
     * <p>
     * If no Flow Service is set, or the flow cannot be obtained, a {@code null} flow is passed on to the
     * three-argument overload.
     *
     * @param request                the connection request
     * @param resolvedNodeIdentifier the identifier of the node that is connecting
     * @return the connection response message
     */
    private ConnectionResponseMessage createConnectionResponse(ConnectionRequest request, NodeIdentifier resolvedNodeIdentifier) {
        DataFlow currentFlow = null;
        try {
            if (this.flowService != null) {
                currentFlow = this.flowService.createDataFlowFromController();
            }
        } catch (IOException ioe) {
            logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to " + resolvedNodeIdentifier + ". Will tell node to try again later", ioe);
        }
        return this.createConnectionResponse(request, resolvedNodeIdentifier, currentFlow);
    }

    /**
     * Builds a connection response for a node and marks that node as CONNECTING.
     *
     * @param request                the connection request
     * @param resolvedNodeIdentifier the identifier of the node that is connecting
     * @param clusterDataFlow        the flow to hand to the node, or {@code null} if none is available yet
     * @return a response carrying the flow, instance ID, node statuses and component revisions, or a
     *         "not yet available" response if {@code clusterDataFlow} is {@code null}
     */
    private ConnectionResponseMessage createConnectionResponse(ConnectionRequest request, NodeIdentifier resolvedNodeIdentifier, DataFlow clusterDataFlow) {
        final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
        if (clusterDataFlow == null) {
            responseMessage.setConnectionResponse(new ConnectionResponse(5, "The cluster dataflow is not yet available"));
            return responseMessage;
        }

        final NodeConnectionStatus knownStatus = this.getConnectionStatus(resolvedNodeIdentifier);
        final String eventMessage = knownStatus == null
                ? "Connection requested from new node. Setting status to connecting."
                : "Connection requested from existing node. Setting status to connecting.";
        this.addNodeEvent(resolvedNodeIdentifier, eventMessage);

        final NodeConnectionStatus connectingStatus = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, null, Long.valueOf(System.currentTimeMillis()));
        this.updateNodeStatus(connectingStatus);

        responseMessage.setConnectionResponse(new ConnectionResponse(
                resolvedNodeIdentifier,
                clusterDataFlow,
                this.instanceId,
                this.getConnectionStatuses(),
                this.revisionManager.getAllRevisions().stream()
                        .map(rev -> ComponentRevision.fromRevision((Revision) rev))
                        .collect(Collectors.toList())));
        return responseMessage;
    }

    /**
     * Creates a copy of a node identifier that carries the given node identities.
     *
     * @param nodeId         the identifier whose ID, addresses and ports are copied
     * @param nodeIdentities the identities to attach to the copy
     * @return the new node identifier
     */
    private NodeIdentifier addNodeIdentities(NodeIdentifier nodeId, Set<String> nodeIdentities) {
        return new NodeIdentifier(
                nodeId.getId(),
                nodeId.getApiAddress(),
                nodeId.getApiPort(),
                nodeId.getSocketAddress(),
                nodeId.getSocketPort(),
                nodeId.getLoadBalanceAddress(),
                nodeId.getLoadBalancePort(),
                nodeId.getSiteToSiteAddress(),
                nodeId.getSiteToSitePort(),
                nodeId.getSiteToSiteHttpApiPort(),
                nodeId.isSiteToSiteSecure(),
                nodeIdentities);
    }

    /**
     * Reports whether this coordinator handles the given message.
     *
     * @param msg the message to check
     * @return {@code true} for connection requests, node status changes and node connection status requests
     */
    public boolean canHandle(ProtocolMessage msg) {
        if (ProtocolMessage.MessageType.CONNECTION_REQUEST == msg.getType()) {
            return true;
        }
        if (ProtocolMessage.MessageType.NODE_STATUS_CHANGE == msg.getType()) {
            return true;
        }
        return ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType();
    }

    /**
     * Reports whether an HTTP method is one of DELETE, POST or PUT (compared case-insensitively).
     *
     * @param method the HTTP method name; may be {@code null}
     * @return {@code true} if the method is DELETE, POST or PUT
     */
    private boolean isMutableRequest(String method) {
        if ("DELETE".equalsIgnoreCase(method)) {
            return true;
        }
        if ("POST".equalsIgnoreCase(method)) {
            return true;
        }
        return "PUT".equalsIgnoreCase(method);
    }

    /**
     * Reacts to the outcome of a replicated request when this node is the active cluster coordinator.
     * <p>
     * Only DELETE, POST and PUT requests are considered. If some, but not all, nodes returned a problematic
     * response, each failing node is set to CONNECTING and asked to reconnect. Nothing is done when every
     * node failed, or when the failures are all 404 responses for a counter URI.
     *
     * @param uriPath       the path of the request
     * @param method        the HTTP method of the request
     * @param nodeResponses the responses received from the nodes
     */
    @Override
    public void afterRequest(String uriPath, String method, Set<NodeResponse> nodeResponses) {
        if (!this.isActiveClusterCoordinator()) {
            return;
        }
        if (!this.isMutableRequest(method)) {
            return;
        }

        final StandardHttpResponseMapper responseMapper = new StandardHttpResponseMapper(this.nifiProperties);
        final Set<NodeResponse> failedResponses = responseMapper.getProblematicNodeResponses(nodeResponses);
        final boolean everyNodeFailed = failedResponses.size() == nodeResponses.size();
        final boolean onlySomeNodesFailed = !failedResponses.isEmpty() && failedResponses.size() < nodeResponses.size();
        final boolean failedOnMissingCounter = onlySomeNodesFailed && this.isMissingCounter(failedResponses, uriPath);

        if (everyNodeFailed) {
            logger.warn("All nodes failed to process URI {} {}. As a result, no node will be disconnected from cluster", method, uriPath);
            return;
        }
        if (failedOnMissingCounter || !onlySomeNodesFailed) {
            return;
        }

        final Set<NodeIdentifier> failedNodeIds = failedResponses.stream()
                .map(NodeResponse::getNodeId)
                .collect(Collectors.toSet());
        logger.warn(String.format("The following nodes failed to process URI %s '%s'.  Requesting each node reconnect to cluster.", uriPath, failedNodeIds));
        for (final NodeIdentifier failedNodeId : failedNodeIds) {
            this.updateNodeStatus(new NodeConnectionStatus(failedNodeId, NodeConnectionState.CONNECTING));
            this.requestNodeConnect(failedNodeId, null);
        }
    }

    /**
     * Reports whether the failures are all "not found" responses for a counter URI.
     *
     * @param problematicNodeResponses the responses that were classified as problematic
     * @param uriPath                  the path of the request
     * @return {@code true} if the path matches the counter URI pattern and every given response has status
     *         404; {@code false} otherwise
     */
    private boolean isMissingCounter(Set<NodeResponse> problematicNodeResponses, String uriPath) {
        if (!COUNTER_URI_PATTERN.matcher(uriPath).matches()) {
            return false;
        }
        return problematicNodeResponses.stream().allMatch(response -> response.getStatus() == 404);
    }

    /**
     * Records whether this node is connected to the cluster. Once connected, a flow election is no longer
     * required.
     *
     * @param connected the new connected flag
     */
    public void setConnected(boolean connected) {
        this.connected = connected;
        if (!connected) {
            return;
        }
        logger.info("This node is now connected to the cluster. Will no longer require election of DataFlow.");
        this.requireElection = false;
    }

    /**
     * Returns the connected flag last stored by {@code setConnected}.
     *
     * @return {@code true} if this node is marked as connected
     */
    public boolean isConnected() {
        return connected;
    }

    /**
     * Requests the cluster workload through the node protocol sender.
     *
     * @return the per-node workloads contained in the response
     * @throws IOException if the request fails with an I/O error
     */
    public Map<NodeIdentifier, NodeWorkload> getClusterWorkload() throws IOException {
        final ClusterWorkloadRequestMessage workloadRequest = new ClusterWorkloadRequestMessage();
        return this.nodeProtocolSender.clusterWorkload(workloadRequest).getNodeWorkloads();
    }
}

