/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.node;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NodeClusterCoordinator
implements ClusterCoordinator,
ProtocolHandler,
RequestCompletionCallback {
    private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
    private static final String EVENT_CATEGORY = "Clustering";
    private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}");
    private final String instanceId = UUID.randomUUID().toString();
    private volatile NodeIdentifier nodeId;
    private final ClusterCoordinationProtocolSenderListener senderListener;
    private final EventReporter eventReporter;
    private final ClusterNodeFirewall firewall;
    private final RevisionManager revisionManager;
    private final NiFiProperties nifiProperties;
    private final LeaderElectionManager leaderElectionManager;
    private final AtomicLong latestUpdateId = new AtomicLong(-1L);
    private volatile FlowService flowService;
    private volatile boolean connected;
    private volatile boolean closed = false;
    private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<NodeIdentifier, NodeConnectionStatus>();
    private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<NodeIdentifier, CircularFifoQueue<NodeEvent>>();

    public NodeClusterCoordinator(ClusterCoordinationProtocolSenderListener senderListener, EventReporter eventReporter, LeaderElectionManager leaderElectionManager, ClusterNodeFirewall firewall, RevisionManager revisionManager, NiFiProperties nifiProperties) {
        this.senderListener = senderListener;
        this.flowService = null;
        this.eventReporter = eventReporter;
        this.firewall = firewall;
        this.revisionManager = revisionManager;
        this.nifiProperties = nifiProperties;
        this.leaderElectionManager = leaderElectionManager;
        senderListener.addHandler((ProtocolHandler)this);
    }

    public void shutdown() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(this.getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
        this.updateNodeStatus(shutdownStatus, false);
        logger.info("Successfully notified other nodes that I am shutting down");
    }

    public void setLocalNodeIdentifier(NodeIdentifier nodeId) {
        this.nodeId = nodeId;
        this.nodeStatuses.computeIfAbsent(nodeId, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
    }

    public NodeIdentifier getLocalNodeIdentifier() {
        return this.nodeId;
    }

    private NodeIdentifier waitForElectedClusterCoordinator() {
        return this.waitForNodeIdentifier(() -> this.getElectedActiveCoordinatorNode(false));
    }

    private NodeIdentifier waitForNodeIdentifier(Supplier<NodeIdentifier> fetchNodeId) {
        NodeIdentifier localNodeId = null;
        while (localNodeId == null) {
            localNodeId = fetchNodeId.get();
            if (localNodeId != null) continue;
            if (this.closed) {
                return null;
            }
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return localNodeId;
    }

    private String getElectedActiveCoordinatorAddress() throws IOException {
        return this.leaderElectionManager.getLeader("Cluster Coordinator");
    }

    public void resetNodeStatuses(Map<NodeIdentifier, NodeConnectionStatus> statusMap) {
        logger.info("Resetting cluster node statuses from {} to {}", this.nodeStatuses, statusMap);
        for (Map.Entry<NodeIdentifier, NodeConnectionStatus> entry : statusMap.entrySet()) {
            NodeIdentifier nodeId = entry.getKey();
            NodeConnectionStatus proposedStatus = entry.getValue();
            if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
                this.nodeStatuses.remove(nodeId);
                continue;
            }
            this.nodeStatuses.put(nodeId, proposedStatus);
        }
    }

    public boolean resetNodeStatus(NodeConnectionStatus connectionStatus, long qualifyingUpdateId) {
        NodeIdentifier nodeId = connectionStatus.getNodeIdentifier();
        NodeConnectionStatus currentStatus = this.getConnectionStatus(nodeId);
        if (currentStatus == null) {
            return this.replaceNodeStatus(nodeId, null, connectionStatus);
        }
        if (currentStatus.getUpdateIdentifier() == qualifyingUpdateId) {
            return this.replaceNodeStatus(nodeId, currentStatus, connectionStatus);
        }
        return false;
    }

    private boolean replaceNodeStatus(NodeIdentifier nodeId, NodeConnectionStatus currentStatus, NodeConnectionStatus newStatus) {
        if (newStatus == null) {
            logger.error("Cannot change node status for {} from {} to {} because new status is null", new Object[]{nodeId, currentStatus, newStatus});
            logger.error("", (Throwable)new NullPointerException());
        }
        if (currentStatus == null) {
            if (newStatus.getState() == NodeConnectionState.REMOVED) {
                return this.nodeStatuses.remove(nodeId, currentStatus);
            }
            NodeConnectionStatus existingValue = this.nodeStatuses.putIfAbsent(nodeId, newStatus);
            return existingValue == null;
        }
        if (newStatus.getState() == NodeConnectionState.REMOVED) {
            return this.nodeStatuses.remove(nodeId, currentStatus);
        }
        return this.nodeStatuses.replace(nodeId, currentStatus, newStatus);
    }

    public void requestNodeConnect(NodeIdentifier nodeId, String userDn) {
        if (userDn == null) {
            this.reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster");
        } else {
            this.reportEvent(nodeId, Severity.INFO, "Requesting that node connect to cluster on behalf of " + userDn);
        }
        this.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTING, null, null, Long.valueOf(System.currentTimeMillis())));
        ReconnectionRequestMessage request = new ReconnectionRequestMessage();
        request.setNodeId(nodeId);
        request.setInstanceId(this.instanceId);
        this.requestReconnectionAsynchronously(request, 10, 5);
    }

    public void finishNodeConnection(NodeIdentifier nodeId) {
        NodeConnectionState state = this.getConnectionState(nodeId);
        if (state == null) {
            logger.debug("Attempted to finish node connection for {} but node is not known. Requesting that node connect", (Object)nodeId);
            this.requestNodeConnect(nodeId, null);
            return;
        }
        if (state == NodeConnectionState.CONNECTED) {
            return;
        }
        if (state == NodeConnectionState.DISCONNECTED || state == NodeConnectionState.DISCONNECTING) {
            logger.debug("Attempted to finish node connection for {} but node state was {}. Requesting that node connect", (Object)nodeId, (Object)state);
            this.requestNodeConnect(nodeId, null);
            return;
        }
        logger.info("{} is now connected", (Object)nodeId);
        this.updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
    }

    public void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
        Set<NodeIdentifier> connectedNodeIds = this.getNodeIdentifiers(NodeConnectionState.CONNECTED);
        if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) {
            throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
        }
        logger.info("Requesting that {} disconnect due to {}", (Object)nodeId, explanation == null ? disconnectionCode : explanation);
        this.updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));
        if (disconnectionCode == DisconnectionCode.NODE_SHUTDOWN) {
            return;
        }
        DisconnectMessage request = new DisconnectMessage();
        request.setNodeId(nodeId);
        request.setExplanation(explanation);
        this.addNodeEvent(nodeId, "Disconnection requested due to " + explanation);
        this.disconnectAsynchronously(request, 10, 5);
    }

    public void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
        Severity severity;
        logger.info("{} requested disconnection from cluster due to {}", (Object)nodeId, explanation == null ? disconnectionCode : explanation);
        this.updateNodeStatus(new NodeConnectionStatus(nodeId, disconnectionCode, explanation));
        switch (disconnectionCode) {
            case STARTUP_FAILURE: 
            case MISMATCHED_FLOWS: 
            case UNKNOWN: {
                severity = Severity.ERROR;
                break;
            }
            case LACK_OF_HEARTBEAT: {
                severity = Severity.WARNING;
                break;
            }
            default: {
                severity = Severity.INFO;
            }
        }
        this.reportEvent(nodeId, severity, "Node disconnected from cluster due to " + explanation);
    }

    public void removeNode(NodeIdentifier nodeId, String userDn) {
        this.reportEvent(nodeId, Severity.INFO, "User " + userDn + " requested that node be removed from cluster");
        this.nodeStatuses.remove(nodeId);
        this.nodeEvents.remove(nodeId);
        this.notifyOthersOfNodeStatusChange(new NodeConnectionStatus(nodeId, NodeConnectionState.REMOVED));
    }

    public NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId) {
        return (NodeConnectionStatus)this.nodeStatuses.get(nodeId);
    }

    private NodeConnectionState getConnectionState(NodeIdentifier nodeId) {
        NodeConnectionStatus status = this.getConnectionStatus(nodeId);
        return status == null ? null : status.getState();
    }

    public List<NodeConnectionStatus> getConnectionStatuses() {
        return new ArrayList<NodeConnectionStatus>(this.nodeStatuses.values());
    }

    public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
        HashMap<NodeConnectionState, List<NodeIdentifier>> connectionStates = new HashMap<NodeConnectionState, List<NodeIdentifier>>();
        for (Map.Entry entry : this.nodeStatuses.entrySet()) {
            NodeConnectionState state = ((NodeConnectionStatus)entry.getValue()).getState();
            List nodeIds = connectionStates.computeIfAbsent(state, s -> new ArrayList());
            nodeIds.add(entry.getKey());
        }
        return connectionStates;
    }

    public boolean isBlockedByFirewall(String hostname) {
        return this.firewall != null && !this.firewall.isPermissible(hostname);
    }

    public void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
        this.eventReporter.reportEvent(severity, EVENT_CATEGORY, nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event);
        if (nodeId != null) {
            this.addNodeEvent(nodeId, severity, event);
        }
        String message = nodeId == null ? event : "Event Reported for " + nodeId + " -- " + event;
        switch (severity) {
            case ERROR: {
                logger.error(message);
                break;
            }
            case WARNING: {
                logger.warn(message);
                break;
            }
            case INFO: {
                logger.info(message);
            }
        }
    }

    public NodeIdentifier getNodeIdentifier(String uuid) {
        for (NodeIdentifier nodeId : this.nodeStatuses.keySet()) {
            if (!nodeId.getId().equals(uuid)) continue;
            return nodeId;
        }
        return null;
    }

    public Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState ... states) {
        HashSet<NodeConnectionState> statesOfInterest = new HashSet<NodeConnectionState>();
        if (states.length == 0) {
            for (NodeConnectionState state : NodeConnectionState.values()) {
                statesOfInterest.add(state);
            }
        } else {
            for (NodeConnectionState state : states) {
                statesOfInterest.add(state);
            }
        }
        return this.nodeStatuses.entrySet().stream().filter(entry -> statesOfInterest.contains(((NodeConnectionStatus)entry.getValue()).getState())).map(entry -> (NodeIdentifier)entry.getKey()).collect(Collectors.toSet());
    }

    public NodeIdentifier getPrimaryNode() {
        String primaryNodeAddress = this.leaderElectionManager.getLeader("Primary Node");
        if (primaryNodeAddress == null) {
            return null;
        }
        return this.nodeStatuses.keySet().stream().filter(nodeId -> primaryNodeAddress.equals(nodeId.getSocketAddress() + ":" + nodeId.getSocketPort())).findFirst().orElse(null);
    }

    public NodeIdentifier getElectedActiveCoordinatorNode() {
        return this.getElectedActiveCoordinatorNode(true);
    }

    private NodeIdentifier getElectedActiveCoordinatorNode(boolean warnOnError) {
        NodeIdentifier electedNodeId;
        block16: {
            int electedNodePort;
            String electedNodeAddress;
            try {
                electedNodeAddress = this.getElectedActiveCoordinatorAddress();
            }
            catch (NoClusterCoordinatorException ncce) {
                logger.debug("There is currently no elected active Cluster Coordinator");
                return null;
            }
            catch (IOException ioe) {
                if (warnOnError) {
                    logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + ioe);
                    if (logger.isDebugEnabled()) {
                        logger.warn("", (Throwable)ioe);
                    }
                }
                return null;
            }
            if (electedNodeAddress == null) {
                logger.debug("There is currently no elected active Cluster Coordinator");
                return null;
            }
            int colonLoc = electedNodeAddress.indexOf(58);
            if (colonLoc < 1) {
                if (warnOnError) {
                    logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", (Object)electedNodeAddress);
                }
                return null;
            }
            String electedNodeHostname = electedNodeAddress.substring(0, colonLoc);
            String portString = electedNodeAddress.substring(colonLoc + 1);
            try {
                electedNodePort = Integer.parseInt(portString);
            }
            catch (NumberFormatException nfe) {
                if (warnOnError) {
                    logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", (Object)electedNodeAddress);
                }
                return null;
            }
            Set<NodeIdentifier> connectedNodeIds = this.getNodeIdentifiers(new NodeConnectionState[0]);
            electedNodeId = connectedNodeIds.stream().filter(nodeId -> nodeId.getSocketAddress().equals(electedNodeHostname) && nodeId.getSocketPort() == electedNodePort).findFirst().orElse(null);
            if (electedNodeId == null && warnOnError) {
                logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {},but there is no node with this address. Will attempt to communicate with node to determine its information", (Object)electedNodeAddress);
                try {
                    NodeConnectionStatus connectionStatus = this.senderListener.requestNodeConnectionStatus(electedNodeHostname, electedNodePort);
                    logger.debug("Received NodeConnectionStatus {}", (Object)connectionStatus);
                    if (connectionStatus == null) {
                        return null;
                    }
                    NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(connectionStatus.getNodeIdentifier(), connectionStatus);
                    if (existingStatus == null) {
                        return connectionStatus.getNodeIdentifier();
                    }
                    return existingStatus.getNodeIdentifier();
                }
                catch (Exception e) {
                    logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. Attempted to determine the node's information but failed to retrieve its information due to {}", (Object)electedNodeAddress, (Object)e.toString());
                    if (!logger.isDebugEnabled()) break block16;
                    logger.warn("", (Throwable)e);
                }
            }
        }
        return electedNodeId;
    }

    public boolean isActiveClusterCoordinator() {
        NodeIdentifier self = this.getLocalNodeIdentifier();
        return self != null && self.equals((Object)this.getElectedActiveCoordinatorNode());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<NodeEvent> getNodeEvents(NodeIdentifier nodeId) {
        CircularFifoQueue eventQueue = (CircularFifoQueue)this.nodeEvents.get(nodeId);
        if (eventQueue == null) {
            return Collections.emptyList();
        }
        CircularFifoQueue circularFifoQueue = eventQueue;
        synchronized (circularFifoQueue) {
            return new ArrayList<NodeEvent>((Collection<NodeEvent>)eventQueue);
        }
    }

    public void setFlowService(FlowService flowService) {
        if (this.flowService != null) {
            throw new IllegalStateException("Flow Service has already been set");
        }
        this.flowService = flowService;
    }

    private void addNodeEvent(NodeIdentifier nodeId, String event) {
        this.addNodeEvent(nodeId, Severity.INFO, event);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addNodeEvent(NodeIdentifier nodeId, Severity severity, String message) {
        CircularFifoQueue eventQueue;
        Event event = new Event(nodeId.toString(), message, severity);
        CircularFifoQueue circularFifoQueue = eventQueue = this.nodeEvents.computeIfAbsent(nodeId, id -> new CircularFifoQueue());
        synchronized (circularFifoQueue) {
            eventQueue.add((Object)event);
        }
    }

    void updateNodeStatus(NodeConnectionStatus status) {
        this.updateNodeStatus(status, true);
    }

    void updateNodeStatus(NodeConnectionStatus status, boolean waitForCoordinator) {
        NodeIdentifier nodeId = status.getNodeIdentifier();
        NodeConnectionStatus currentStatus = this.nodeStatuses.put(nodeId, status);
        NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
        logger.info("Status of {} changed from {} to {}", new Object[]{nodeId, currentStatus, status});
        logger.debug("State of cluster nodes is now {}", this.nodeStatuses);
        this.latestUpdateId.updateAndGet(curVal -> Math.max(curVal, status.getUpdateIdentifier()));
        if (currentState == null || currentState != status.getState()) {
            boolean notifyAllNodes = this.isActiveClusterCoordinator();
            if (notifyAllNodes) {
                logger.debug("Notifying all nodes that status changed from {} to {}", (Object)currentStatus, (Object)status);
            } else {
                logger.debug("Notifying cluster coordinator that node status changed from {} to {}", (Object)currentStatus, (Object)status);
            }
            this.notifyOthersOfNodeStatusChange(status, notifyAllNodes, waitForCoordinator);
        } else {
            logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", (Object)currentState, (Object)status.getState());
        }
    }

    void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus) {
        this.notifyOthersOfNodeStatusChange(updatedStatus, this.isActiveClusterCoordinator(), true);
    }

    void notifyOthersOfNodeStatusChange(NodeConnectionStatus updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
        Set<NodeIdentifier> nodesToNotify;
        if (notifyAllNodes) {
            nodesToNotify = this.getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING);
            nodesToNotify.remove(this.getLocalNodeIdentifier());
        } else if (waitForCoordinator) {
            nodesToNotify = Collections.singleton(this.waitForElectedClusterCoordinator());
        } else {
            NodeIdentifier nodeId = this.getElectedActiveCoordinatorNode();
            if (nodeId == null) {
                return;
            }
            nodesToNotify = Collections.singleton(nodeId);
        }
        NodeStatusChangeMessage message = new NodeStatusChangeMessage();
        message.setNodeId(updatedStatus.getNodeIdentifier());
        message.setNodeConnectionStatus(updatedStatus);
        this.senderListener.notifyNodeStatusChange(nodesToNotify, message);
    }

    private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
        Thread disconnectThread = new Thread(new Runnable(){

            @Override
            public void run() {
                NodeIdentifier nodeId = request.getNodeId();
                for (int i = 0; i < attempts; ++i) {
                    try {
                        NodeClusterCoordinator.this.senderListener.disconnect(request);
                        NodeClusterCoordinator.this.reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
                        return;
                    }
                    catch (Exception e) {
                        logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", (Object)request.getNodeId(), (Object)request.getExplanation());
                        try {
                            Thread.sleep((long)retrySeconds * 1000L);
                            continue;
                        }
                        catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
        }, "Disconnect " + request.getNodeId());
        disconnectThread.start();
    }

    private void requestReconnectionAsynchronously(final ReconnectionRequestMessage request, final int reconnectionAttempts, final int retrySeconds) {
        Thread reconnectionThread = new Thread(new Runnable(){

            @Override
            public void run() {
                while (NodeClusterCoordinator.this.flowService == null) {
                    try {
                        Thread.sleep(100L);
                    }
                    catch (InterruptedException ie) {
                        logger.info("Could not send Reconnection request to {} because thread was interrupted before FlowService was made available", (Object)request.getNodeId());
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                for (int i = 0; i < reconnectionAttempts; ++i) {
                    try {
                        if (NodeConnectionState.CONNECTING != NodeClusterCoordinator.this.getConnectionState(request.getNodeId())) {
                            return;
                        }
                        request.setDataFlow(new StandardDataFlow(NodeClusterCoordinator.this.flowService.createDataFlow()));
                        request.setNodeConnectionStatuses(NodeClusterCoordinator.this.getConnectionStatuses());
                        request.setComponentRevisions(NodeClusterCoordinator.this.revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision((Revision)rev)).collect(Collectors.toList()));
                        NodeClusterCoordinator.this.senderListener.requestReconnection(request);
                        logger.info("Successfully requested that {} join the cluster", (Object)request.getNodeId());
                        return;
                    }
                    catch (Exception e) {
                        logger.warn("Problem encountered issuing reconnection request to node " + request.getNodeId(), (Throwable)e);
                        NodeClusterCoordinator.this.eventReporter.reportEvent(Severity.WARNING, NodeClusterCoordinator.EVENT_CATEGORY, "Problem encountered issuing reconnection request to node " + request.getNodeId() + " due to: " + e);
                        try {
                            Thread.sleep(1000L * (long)retrySeconds);
                            continue;
                        }
                        catch (InterruptedException ie) {
                            break;
                        }
                    }
                }
                if (NodeConnectionState.CONNECTING == NodeClusterCoordinator.this.getConnectionState(request.getNodeId())) {
                    NodeClusterCoordinator.this.requestNodeDisconnect(request.getNodeId(), DisconnectionCode.UNABLE_TO_COMMUNICATE, "Attempted to request that node reconnect to cluster but could not communicate with node");
                }
            }
        }, "Reconnect " + request.getNodeId());
        reconnectionThread.start();
    }

    public ProtocolMessage handle(ProtocolMessage protocolMessage) throws ProtocolException {
        switch (protocolMessage.getType()) {
            case CONNECTION_REQUEST: {
                return this.handleConnectionRequest((ConnectionRequestMessage)protocolMessage);
            }
            case NODE_STATUS_CHANGE: {
                this.handleNodeStatusChange((NodeStatusChangeMessage)protocolMessage);
                return null;
            }
            case NODE_CONNECTION_STATUS_REQUEST: {
                return this.handleNodeConnectionStatusRequest();
            }
        }
        throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type");
    }

    private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
        NodeConnectionStatus connectionStatus = (NodeConnectionStatus)this.nodeStatuses.get(this.getLocalNodeIdentifier());
        NodeConnectionStatusResponseMessage msg = new NodeConnectionStatusResponseMessage();
        msg.setNodeConnectionStatus(connectionStatus);
        return msg;
    }

    private String summarizeStatusChange(NodeConnectionStatus oldStatus, NodeConnectionStatus status) {
        StringBuilder sb = new StringBuilder();
        if (oldStatus == null || status.getState() != oldStatus.getState()) {
            sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString());
            if (status.getDisconnectReason() != null) {
                sb.append(" due to ").append(status.getDisconnectReason());
            } else if (status.getDisconnectCode() != null) {
                sb.append(" due to ").append(status.getDisconnectCode().toString());
            }
        }
        return sb.toString();
    }

    private void handleNodeStatusChange(NodeStatusChangeMessage statusChangeMessage) {
        NodeConnectionStatus updatedStatus = statusChangeMessage.getNodeConnectionStatus();
        NodeIdentifier nodeId = statusChangeMessage.getNodeId();
        logger.debug("Handling request {}", (Object)statusChangeMessage);
        NodeConnectionStatus oldStatus = (NodeConnectionStatus)this.nodeStatuses.get(statusChangeMessage.getNodeId());
        if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
            this.nodeStatuses.remove(nodeId, oldStatus);
        } else {
            this.nodeStatuses.put(nodeId, updatedStatus);
        }
        logger.info("Status of {} changed from {} to {}", new Object[]{statusChangeMessage.getNodeId(), oldStatus, updatedStatus});
        logger.debug("State of cluster nodes is now {}", this.nodeStatuses);
        NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
        String summary = this.summarizeStatusChange(oldStatus, status);
        if (!StringUtils.isEmpty((CharSequence)summary)) {
            this.addNodeEvent(nodeId, summary);
        }
        NodeConnectionStatus.updateIdGenerator((long)updatedStatus.getUpdateIdentifier());
        if (this.isActiveClusterCoordinator()) {
            this.notifyOthersOfNodeStatusChange(statusChangeMessage.getNodeConnectionStatus());
        }
    }

    private NodeIdentifier resolveNodeId(NodeIdentifier proposedIdentifier) {
        NodeConnectionStatus proposedConnectionStatus = new NodeConnectionStatus(proposedIdentifier, DisconnectionCode.NOT_YET_CONNECTED);
        NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(proposedIdentifier, proposedConnectionStatus);
        NodeIdentifier resolvedNodeId = proposedIdentifier;
        if (existingStatus == null) {
            resolvedNodeId = proposedIdentifier;
            logger.debug("No existing node with ID {}; resolved node ID is as-proposed", (Object)proposedIdentifier.getId());
        } else if (existingStatus.getNodeIdentifier().logicallyEquals(proposedIdentifier)) {
            resolvedNodeId = proposedIdentifier;
            logger.debug("No existing node with ID {}; resolved node ID is as-proposed", (Object)proposedIdentifier.getId());
        } else {
            resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(), proposedIdentifier.getApiAddress(), proposedIdentifier.getApiPort(), proposedIdentifier.getSocketAddress(), proposedIdentifier.getSocketPort(), proposedIdentifier.getSiteToSiteAddress(), proposedIdentifier.getSiteToSitePort(), proposedIdentifier.getSiteToSiteHttpApiPort(), proposedIdentifier.isSiteToSiteSecure());
            logger.debug("A node already exists with ID {}. Proposed Node Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is {}", new Object[]{proposedIdentifier.getId(), proposedIdentifier, this.getNodeIdentifier(proposedIdentifier.getId()), resolvedNodeId});
        }
        return resolvedNodeId;
    }

    private ConnectionResponseMessage handleConnectionRequest(ConnectionRequestMessage requestMessage) {
        NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
        ConnectionRequest requestWithDn = new ConnectionRequest(this.addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()));
        NodeIdentifier resolvedNodeId = this.resolveNodeId(proposedIdentifier);
        ConnectionResponse response = this.createConnectionResponse(requestWithDn, resolvedNodeId);
        ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
        responseMessage.setConnectionResponse(response);
        return responseMessage;
    }

    private ConnectionResponse createConnectionResponse(ConnectionRequest request, NodeIdentifier resolvedNodeIdentifier) {
        if (this.isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
            logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
            return ConnectionResponse.createBlockedByFirewallResponse();
        }
        NodeConnectionStatus status = this.getConnectionStatus(resolvedNodeIdentifier);
        if (status == null) {
            this.addNodeEvent(resolvedNodeIdentifier, "Connection requested from new node.  Setting status to connecting.");
        } else {
            this.addNodeEvent(resolvedNodeIdentifier, "Connection requested from existing node.  Setting status to connecting");
        }
        status = new NodeConnectionStatus(resolvedNodeIdentifier, NodeConnectionState.CONNECTING, null, null, Long.valueOf(System.currentTimeMillis()));
        this.updateNodeStatus(status);
        DataFlow dataFlow = null;
        if (this.flowService != null) {
            try {
                dataFlow = this.flowService.createDataFlow();
            }
            catch (IOException ioe) {
                logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to " + resolvedNodeIdentifier + ". Will tell node to try again later", (Throwable)ioe);
            }
        }
        if (dataFlow == null) {
            int tryAgainSeconds = 5;
            this.addNodeEvent(resolvedNodeIdentifier, Severity.WARNING, "Connection requested from node, but manager was unable to obtain current flow. Instructing node to try again in 5 seconds.");
            return new ConnectionResponse(5);
        }
        return new ConnectionResponse(resolvedNodeIdentifier, dataFlow, this.instanceId, this.getConnectionStatuses(), this.revisionManager.getAllRevisions().stream().map(rev -> ComponentRevision.fromRevision((Revision)rev)).collect(Collectors.toList()));
    }

    private NodeIdentifier addRequestorDn(NodeIdentifier nodeId, String dn) {
        return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), nodeId.getSiteToSiteAddress(), nodeId.getSiteToSitePort(), nodeId.getSiteToSiteHttpApiPort(), nodeId.isSiteToSiteSecure(), dn);
    }

    public boolean canHandle(ProtocolMessage msg) {
        return ProtocolMessage.MessageType.CONNECTION_REQUEST == msg.getType() || ProtocolMessage.MessageType.NODE_STATUS_CHANGE == msg.getType() || ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_REQUEST == msg.getType();
    }

    private boolean isMutableRequest(String method) {
        return "DELETE".equalsIgnoreCase(method) || "POST".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method);
    }

    @Override
    public void afterRequest(String uriPath, String method, Set<NodeResponse> nodeResponses) {
        if (!this.isActiveClusterCoordinator()) {
            return;
        }
        boolean mutableRequest = this.isMutableRequest(method);
        if (mutableRequest) {
            boolean someNodesFailedMissingCounter;
            StandardHttpResponseMerger responseMerger = new StandardHttpResponseMerger(this.nifiProperties);
            Set<NodeResponse> problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses);
            boolean allNodesFailed = problematicNodeResponses.size() == nodeResponses.size();
            boolean bl = someNodesFailedMissingCounter = !problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size() && this.isMissingCounter(problematicNodeResponses, uriPath);
            if (allNodesFailed) {
                logger.warn("All nodes failed to process URI {} {}. As a result, no node will be disconnected from cluster", (Object)method, (Object)uriPath);
                return;
            }
            if (someNodesFailedMissingCounter) {
                return;
            }
            if (!problematicNodeResponses.isEmpty() && problematicNodeResponses.size() < nodeResponses.size()) {
                Set failedNodeIds = problematicNodeResponses.stream().map(response -> response.getNodeId()).collect(Collectors.toSet());
                logger.warn(String.format("The following nodes failed to process URI %s '%s'.  Requesting each node disconnect from cluster.", uriPath, failedNodeIds));
                for (NodeIdentifier nodeId : failedNodeIds) {
                    this.requestNodeDisconnect(nodeId, DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process request " + method + " " + uriPath);
                }
            }
        }
    }

    private boolean isMissingCounter(Set<NodeResponse> problematicNodeResponses, String uriPath) {
        if (COUNTER_URI_PATTERN.matcher(uriPath).matches()) {
            boolean notFound = true;
            for (NodeResponse problematicResponse : problematicNodeResponses) {
                if (problematicResponse.getStatus() == 404) continue;
                notFound = false;
                break;
            }
            return notFound;
        }
        return false;
    }

    public void setConnected(boolean connected) {
        this.connected = connected;
    }

    public boolean isConnected() {
        return this.connected;
    }
}

