package org.apache.nifi.cluster.coordination.node;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.protocol.ComponentRevision;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.class */
public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback {
    /** Category passed to the {@link EventReporter} for every event this coordinator reports. */
    private static final String EVENT_CATEGORY = "Clustering";
    /** Identifier of the local node; assigned through {@link #setLocalNodeIdentifier(NodeIdentifier)}. */
    private volatile NodeIdentifier nodeId;
    /** Channel used to send cluster protocol messages; this coordinator registers itself on it as a handler. */
    private final ClusterCoordinationProtocolSenderListener senderListener;
    /** Sink for user-visible cluster events. */
    private final EventReporter eventReporter;
    /** Optional firewall consulted by {@link #isBlockedByFirewall(String)}; may be {@code null}. */
    private final ClusterNodeFirewall firewall;
    /** Source of the component revisions that are attached to reconnection requests. */
    private final RevisionManager revisionManager;
    /** Application properties handed to the constructor. */
    private final NiFiProperties nifiProperties;
    /** Used to look up the elected "Cluster Coordinator" and "Primary Node" leaders. */
    private final LeaderElectionManager leaderElectionManager;
    /** Connection flag; not read or written in the portion of the class visible here. */
    private volatile boolean connected;
    private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
    /** Matches the URI of a single counter resource (a 36-character id made of hex digits and dashes). */
    private static final Pattern COUNTER_URI_PATTERN = Pattern.compile("/nifi-api/counters/[a-f0-9\\-]{36}");
    /** Random id generated once per coordinator instance and stamped onto reconnection requests. */
    private final String instanceId = UUID.randomUUID().toString();
    /** Highest update identifier seen so far across all node status updates; starts at -1. */
    private final AtomicLong latestUpdateId = new AtomicLong(-1);
    /** Set once {@link #shutdown()} has run; also stops the coordinator-wait loop. */
    private volatile boolean closed = false;
    /** Latest known connection status for each node. */
    private final ConcurrentMap<NodeIdentifier, NodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
    /** Per-node event queues; each queue is guarded by synchronizing on the queue itself. */
    private final ConcurrentMap<NodeIdentifier, CircularFifoQueue<NodeEvent>> nodeEvents = new ConcurrentHashMap<>();
    /** Flow service used to build the data flow for reconnection requests; set at most once. */
    private volatile FlowService flowService = null;

    /**
     * Decompiler-generated holder for the lookup tables that back the enum {@code switch}
     * statements of the enclosing class (originally the synthetic class
     * {@code NodeClusterCoordinator$3}; access was widened from package-private by the decompiler).
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and yields the case
     * number used by the corresponding {@code switch}. Entries that are never assigned stay
     * {@code 0} and therefore fall through to the {@code default} branch.</p>
     *
     * <ul>
     *   <li>{@code MessageType}: CONNECTION_REQUEST=1, NODE_STATUS_CHANGE=2,
     *       NODE_CONNECTION_STATUS_REQUEST=3</li>
     *   <li>{@code Severity}: ERROR=1, WARNING=2, INFO=3</li>
     *   <li>{@code DisconnectionCode}: STARTUP_FAILURE=1, MISMATCHED_FLOWS=2, UNKNOWN=3,
     *       LACK_OF_HEARTBEAT=4</li>
     * </ul>
     */
    /* loaded from: input_file:org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode;
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$reporting$Severity;
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType = new int[ProtocolMessage.MessageType.values().length];

        // Every assignment is wrapped in its own try/catch so that a constant missing at
        // runtime (NoSuchFieldError) only leaves that one entry unmapped.
        static {
            // Case numbers for the switch on ProtocolMessage.MessageType.
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.CONNECTION_REQUEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.NODE_STATUS_CHANGE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_REQUEST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            // Case numbers for the switch on Severity.
            $SwitchMap$org$apache$nifi$reporting$Severity = new int[Severity.values().length];
            try {
                $SwitchMap$org$apache$nifi$reporting$Severity[Severity.ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nifi$reporting$Severity[Severity.WARNING.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$nifi$reporting$Severity[Severity.INFO.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            // Case numbers for the switch on DisconnectionCode.
            $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode = new int[DisconnectionCode.values().length];
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.STARTUP_FAILURE.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.MISMATCHED_FLOWS.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.UNKNOWN.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.LACK_OF_HEARTBEAT.ordinal()] = 4;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /**
     * Creates a coordinator, stores its collaborators and registers the new instance as a
     * protocol handler on the supplied sender/listener.
     *
     * @param clusterCoordinationProtocolSenderListener channel for cluster protocol messages
     * @param eventReporter sink for cluster events
     * @param leaderElectionManager used to look up elected leaders
     * @param clusterNodeFirewall optional firewall; {@code null} disables firewall checks
     * @param revisionManager source of component revisions
     * @param niFiProperties application properties
     */
    public NodeClusterCoordinator(
            ClusterCoordinationProtocolSenderListener clusterCoordinationProtocolSenderListener,
            EventReporter eventReporter,
            LeaderElectionManager leaderElectionManager,
            ClusterNodeFirewall clusterNodeFirewall,
            RevisionManager revisionManager,
            NiFiProperties niFiProperties) {
        this.leaderElectionManager = leaderElectionManager;
        this.nifiProperties = niFiProperties;
        this.revisionManager = revisionManager;
        this.firewall = clusterNodeFirewall;
        this.eventReporter = eventReporter;
        this.senderListener = clusterCoordinationProtocolSenderListener;

        // Registration happens last, once every field has been assigned.
        this.senderListener.addHandler(this);
    }

    /**
     * Marks this coordinator as closed and publishes a {@code NODE_SHUTDOWN} status for the
     * local node. Calls after the first one are no-ops.
     */
    public void shutdown() {
        if (!closed) {
            closed = true;

            // 'false': do not block waiting for a coordinator to be elected while shutting down.
            final NodeConnectionStatus shutdownStatus = new NodeConnectionStatus(getLocalNodeIdentifier(), DisconnectionCode.NODE_SHUTDOWN);
            updateNodeStatus(shutdownStatus, false);
            logger.info("Successfully notified other nodes that I am shutting down");
        }
    }

    /**
     * Records the identifier of the local node and, if no status is known for it yet,
     * seeds one with the {@code NOT_YET_CONNECTED} disconnection code.
     *
     * @param nodeIdentifier identifier of the local node
     */
    public void setLocalNodeIdentifier(NodeIdentifier nodeIdentifier) {
        nodeId = nodeIdentifier;
        nodeStatuses.computeIfAbsent(nodeIdentifier, id -> new NodeConnectionStatus(id, DisconnectionCode.NOT_YET_CONNECTED));
    }

    /**
     * @return the identifier of the local node, or {@code null} if it has not been set
     */
    public NodeIdentifier getLocalNodeIdentifier() {
        return nodeId;
    }

    /**
     * Polls until an elected cluster coordinator can be resolved to a known node.
     * The lookup is done with warnings disabled.
     *
     * @return the coordinator's identifier, or {@code null} if the wait was abandoned
     */
    private NodeIdentifier waitForElectedClusterCoordinator() {
        final Supplier<NodeIdentifier> coordinatorLookup = () -> getElectedActiveCoordinatorNode(false);
        return waitForNodeIdentifier(coordinatorLookup);
    }

    /**
     * Repeatedly invokes the supplier, pausing 100 ms between attempts, until it yields a
     * non-null identifier.
     *
     * @param supplier lookup to poll
     * @return the first non-null identifier supplied, or {@code null} if this coordinator was
     *         closed or the calling thread was interrupted while waiting
     */
    private NodeIdentifier waitForNodeIdentifier(Supplier<NodeIdentifier> supplier) {
        while (true) {
            final NodeIdentifier candidate = supplier.get();
            if (candidate != null) {
                return candidate;
            }

            // Nothing yet: give up if we are shutting down, otherwise back off briefly.
            if (closed) {
                return null;
            }

            try {
                Thread.sleep(100L);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /**
     * Asks the leader election manager who currently leads the "Cluster Coordinator" role.
     *
     * @return whatever the leader election manager reports for that role
     * @throws IOException declared for callers that handle lookup failures
     */
    private String getElectedActiveCoordinatorAddress() throws IOException {
        final String coordinatorAddress = leaderElectionManager.getLeader("Cluster Coordinator");
        return coordinatorAddress;
    }

    /**
     * Applies the given statuses on top of the current ones: nodes whose new state is
     * {@code REMOVED} are dropped, all others are overwritten. Nodes that are not mentioned
     * in the map keep their current status.
     *
     * @param map new status per node
     */
    public void resetNodeStatuses(Map<NodeIdentifier, NodeConnectionStatus> map) {
        logger.info("Resetting cluster node statuses from {} to {}", nodeStatuses, map);

        map.forEach((id, status) -> {
            final boolean removed = status.getState() == NodeConnectionState.REMOVED;
            if (removed) {
                nodeStatuses.remove(id);
            } else {
                nodeStatuses.put(id, status);
            }
        });
    }

    /**
     * Conditionally replaces a node's status. The replacement is attempted when no status is
     * currently known for the node, or when the current status carries the expected update
     * identifier.
     *
     * @param nodeConnectionStatus the status to install
     * @param j the update identifier the current status is expected to have
     * @return {@code true} if the status map was changed, {@code false} otherwise
     */
    public boolean resetNodeStatus(NodeConnectionStatus nodeConnectionStatus, long j) {
        final NodeIdentifier id = nodeConnectionStatus.getNodeIdentifier();
        final NodeConnectionStatus current = getConnectionStatus(id);

        // A known status with a different update id means someone else got there first.
        if (current != null && current.getUpdateIdentifier() != j) {
            return false;
        }
        return replaceNodeStatus(id, current, nodeConnectionStatus);
    }

    /**
     * Atomically swaps a node's status in the status map, but only if the map still holds the
     * expected current status.
     *
     * @param nodeIdentifier the node whose status is being replaced
     * @param currentStatus the status expected to be in the map, or {@code null} if none is expected
     * @param newStatus the status to install; a {@code REMOVED} state removes the entry instead
     * @return {@code true} if the map was modified
     */
    private boolean replaceNodeStatus(NodeIdentifier nodeIdentifier, NodeConnectionStatus currentStatus, NodeConnectionStatus newStatus) {
        if (newStatus == null) {
            // Logged with a stack trace for diagnosis; the dereference below still fails.
            logger.error("Cannot change node status for {} from {} to {} because new status is null", nodeIdentifier, currentStatus, newStatus);
            logger.error("", new NullPointerException());
        }

        if (newStatus.getState() == NodeConnectionState.REMOVED) {
            return nodeStatuses.remove(nodeIdentifier, currentStatus);
        }
        if (currentStatus == null) {
            return nodeStatuses.putIfAbsent(nodeIdentifier, newStatus) == null;
        }
        return nodeStatuses.replace(nodeIdentifier, currentStatus, newStatus);
    }

    /**
     * Moves a node into the {@code CONNECTING} state and asynchronously sends it a
     * reconnection request (up to 10 attempts, 5 seconds apart).
     *
     * @param nodeIdentifier the node that should connect
     * @param str the user on whose behalf the request is made, or {@code null}
     */
    public void requestNodeConnect(NodeIdentifier nodeIdentifier, String str) {
        final String eventMessage = str == null
                ? "Requesting that node connect to cluster"
                : "Requesting that node connect to cluster on behalf of " + str;
        reportEvent(nodeIdentifier, Severity.INFO, eventMessage);

        final NodeConnectionStatus connectingStatus = new NodeConnectionStatus(nodeIdentifier, NodeConnectionState.CONNECTING, (DisconnectionCode) null, (String) null, Long.valueOf(System.currentTimeMillis()));
        updateNodeStatus(connectingStatus);

        final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
        request.setNodeId(nodeIdentifier);
        request.setInstanceId(instanceId);
        requestReconnectionAsynchronously(request, 10, 5);
    }

    /**
     * Completes a node's connection. An already connected node is left alone; an unknown,
     * disconnected or disconnecting node is asked to connect again; any other state is
     * promoted to {@code CONNECTED}.
     *
     * @param nodeIdentifier the node finishing its connection
     */
    public void finishNodeConnection(NodeIdentifier nodeIdentifier) {
        final NodeConnectionState currentState = getConnectionState(nodeIdentifier);

        if (currentState == NodeConnectionState.CONNECTED) {
            return;
        }

        if (currentState == null) {
            logger.debug("Attempted to finish node connection for {} but node is not known. Requesting that node connect", nodeIdentifier);
            requestNodeConnect(nodeIdentifier, null);
            return;
        }

        final boolean leavingCluster = currentState == NodeConnectionState.DISCONNECTED
                || currentState == NodeConnectionState.DISCONNECTING;
        if (leavingCluster) {
            logger.debug("Attempted to finish node connection for {} but node state was {}. Requesting that node connect", nodeIdentifier, currentState);
            requestNodeConnect(nodeIdentifier, null);
            return;
        }

        logger.info("{} is now connected", nodeIdentifier);
        updateNodeStatus(new NodeConnectionStatus(nodeIdentifier, NodeConnectionState.CONNECTED));
    }

    /**
     * Marks a node as disconnected and, unless the node is shutting down on its own,
     * asynchronously tells it to disconnect (up to 10 attempts, 5 seconds apart).
     *
     * @param nodeIdentifier the node to disconnect
     * @param disconnectionCode the reason code for the disconnection
     * @param str a human-readable explanation; when {@code null} the code is logged instead
     * @throws IllegalNodeDisconnectionException if the node is the only one currently connected
     */
    public void requestNodeDisconnect(NodeIdentifier nodeIdentifier, DisconnectionCode disconnectionCode, String str) {
        final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
        if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeIdentifier)) {
            throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeIdentifier + " because it is the only node currently connected");
        }

        // Fall back to the disconnection code when no explanation text was supplied.
        logger.info("Requesting that {} disconnect due to {}", nodeIdentifier, str == null ? disconnectionCode : str);
        updateNodeStatus(new NodeConnectionStatus(nodeIdentifier, disconnectionCode, str));

        // A node that is shutting down is not sent a disconnect message.
        if (disconnectionCode == DisconnectionCode.NODE_SHUTDOWN) {
            return;
        }

        final DisconnectMessage disconnectMessage = new DisconnectMessage();
        disconnectMessage.setNodeId(nodeIdentifier);
        disconnectMessage.setExplanation(str);
        addNodeEvent(nodeIdentifier, "Disconnection requested due to " + str);
        disconnectAsynchronously(disconnectMessage, 10, 5);
    }

    /**
     * Records that a node asked to be disconnected and reports a matching event. The event
     * severity depends on the disconnection code: startup failure, mismatched flows and
     * unknown causes are errors; a lack of heartbeat is a warning; everything else is
     * informational.
     *
     * @param nodeIdentifier the node that requested disconnection
     * @param disconnectionCode the reason code for the disconnection
     * @param str a human-readable explanation; when {@code null} the code is logged instead
     */
    public void disconnectionRequestedByNode(NodeIdentifier nodeIdentifier, DisconnectionCode disconnectionCode, String str) {
        // Fall back to the disconnection code when no explanation text was supplied.
        logger.info("{} requested disconnection from cluster due to {}", nodeIdentifier, str == null ? disconnectionCode : str);
        updateNodeStatus(new NodeConnectionStatus(nodeIdentifier, disconnectionCode, str));

        final Severity severity;
        switch (disconnectionCode) {
            case STARTUP_FAILURE:
            case MISMATCHED_FLOWS:
            case UNKNOWN:
                severity = Severity.ERROR;
                break;
            case LACK_OF_HEARTBEAT:
                severity = Severity.WARNING;
                break;
            default:
                severity = Severity.INFO;
                break;
        }

        reportEvent(nodeIdentifier, severity, "Node disconnected from cluster due to " + str);
    }

    /**
     * Removes a node from the cluster view: reports the removal, forgets the node's status
     * and events, and notifies the other nodes with a {@code REMOVED} status.
     *
     * @param nodeIdentifier the node to remove
     * @param str the user who requested the removal
     */
    public void removeNode(NodeIdentifier nodeIdentifier, String str) {
        final String eventMessage = "User " + str + " requested that node be removed from cluster";
        reportEvent(nodeIdentifier, Severity.INFO, eventMessage);

        nodeStatuses.remove(nodeIdentifier);
        nodeEvents.remove(nodeIdentifier);

        final NodeConnectionStatus removedStatus = new NodeConnectionStatus(nodeIdentifier, NodeConnectionState.REMOVED);
        notifyOthersOfNodeStatusChange(removedStatus);
    }

    /**
     * @param nodeIdentifier the node to look up
     * @return the node's last known connection status, or {@code null} if the node is unknown
     */
    public NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeIdentifier) {
        final NodeConnectionStatus knownStatus = nodeStatuses.get(nodeIdentifier);
        return knownStatus;
    }

    /**
     * @param nodeIdentifier the node to look up
     * @return the state of the node's last known status, or {@code null} if the node is unknown
     */
    public NodeConnectionState getConnectionState(NodeIdentifier nodeIdentifier) {
        final NodeConnectionStatus status = getConnectionStatus(nodeIdentifier);
        return status == null ? null : status.getState();
    }

    /**
     * @return a new list holding a copy of the status of every known node
     */
    public List<NodeConnectionStatus> getConnectionStatuses() {
        final List<NodeConnectionStatus> snapshot = new ArrayList<>(nodeStatuses.values());
        return snapshot;
    }

    /**
     * Groups the known nodes by their current connection state.
     *
     * @return a new map from state to the identifiers of the nodes in that state; states
     *         with no nodes are absent
     */
    public Map<NodeConnectionState, List<NodeIdentifier>> getConnectionStates() {
        final Map<NodeConnectionState, List<NodeIdentifier>> idsByState = new HashMap<>();
        nodeStatuses.forEach((id, status) ->
                idsByState.computeIfAbsent(status.getState(), state -> new ArrayList<>()).add(id));
        return idsByState;
    }

    /**
     * @param str the host to check
     * @return {@code true} only if a firewall is configured and it does not permit the host
     */
    public boolean isBlockedByFirewall(String str) {
        return firewall != null && !firewall.isPermissible(str);
    }

    /**
     * Reports an event to the event reporter, records it against the node (when one is
     * given) and writes it to the log at the level matching its severity.
     *
     * @param nodeIdentifier the node the event concerns, or {@code null} for a cluster-wide event
     * @param severity the severity of the event
     * @param str the event message
     */
    public void reportEvent(NodeIdentifier nodeIdentifier, Severity severity, String str) {
        final boolean nodeSpecific = nodeIdentifier != null;
        final String message = nodeSpecific ? "Event Reported for " + nodeIdentifier + " -- " + str : str;

        eventReporter.reportEvent(severity, EVENT_CATEGORY, message);
        if (nodeSpecific) {
            // The per-node history stores the bare message, without the node prefix.
            addNodeEvent(nodeIdentifier, severity, str);
        }

        switch (severity) {
            case ERROR:
                logger.error(message);
                break;
            case WARNING:
                logger.warn(message);
                break;
            case INFO:
                logger.info(message);
                break;
            default:
                break;
        }
    }

    /**
     * Finds a known node by its id string.
     *
     * @param str the node id to search for
     * @return the matching identifier, or {@code null} if no known node has that id
     */
    public NodeIdentifier getNodeIdentifier(String str) {
        return nodeStatuses.keySet().stream()
                .filter(candidate -> candidate.getId().equals(str))
                .findFirst()
                .orElse(null);
    }

    /**
     * Collects the identifiers of all nodes whose state is one of the given states.
     *
     * @param nodeConnectionStateArr the states of interest; when empty, every state matches
     * @return a new set with the identifiers of the matching nodes
     */
    public Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState... nodeConnectionStateArr) {
        final NodeConnectionState[] requested = nodeConnectionStateArr.length == 0
                ? NodeConnectionState.values()
                : nodeConnectionStateArr;

        final Set<NodeConnectionState> acceptedStates = new HashSet<>();
        Collections.addAll(acceptedStates, requested);

        return nodeStatuses.entrySet().stream()
                .filter(entry -> acceptedStates.contains(entry.getValue().getState()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    /**
     * Resolves the elected "Primary Node" leader to a known node by comparing the leader
     * string against each node's {@code socketAddress:socketPort}.
     *
     * @return the matching node, or {@code null} if no leader is elected or no known node matches
     */
    public NodeIdentifier getPrimaryNode() {
        final String primaryAddress = leaderElectionManager.getLeader("Primary Node");
        if (primaryAddress == null) {
            return null;
        }

        for (final NodeIdentifier candidate : nodeStatuses.keySet()) {
            final String candidateAddress = candidate.getSocketAddress() + ":" + candidate.getSocketPort();
            if (primaryAddress.equals(candidateAddress)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Resolves the elected cluster coordinator with warnings enabled.
     *
     * @return the coordinator's identifier, or {@code null} if it cannot be determined
     */
    public NodeIdentifier getElectedActiveCoordinatorNode() {
        final boolean warnOnFailure = true;
        return getElectedActiveCoordinatorNode(warnOnFailure);
    }

    /**
     * Resolves the address of the elected cluster coordinator to a {@link NodeIdentifier}.
     *
     * <p>The elected address is expected in {@code host:port} form. It is matched against
     * the socket address and port of every known node. When no known node matches and
     * {@code z} is {@code true}, the node at that address is asked directly for its
     * connection status, and the answer is added to the status map if absent.</p>
     *
     * @param z when {@code true}, failures are logged at WARN level and an unknown
     *          coordinator address triggers the direct status request; when {@code false},
     *          those failures yield {@code null} without a warning and no request is sent
     * @return the coordinator's identifier, or {@code null} if there is no elected
     *         coordinator, the address is malformed, or the node could not be identified
     */
    private NodeIdentifier getElectedActiveCoordinatorNode(boolean z) {
        try {
            String electedActiveCoordinatorAddress = getElectedActiveCoordinatorAddress();
            if (electedActiveCoordinatorAddress == null) {
                logger.debug("There is currently no elected active Cluster Coordinator");
                return null;
            }
            // 58 is the character code of ':' -- the separator between host and port.
            int indexOf = electedActiveCoordinatorAddress.indexOf(58);
            if (indexOf < 1) {
                // No separator, or an empty host part: not a usable address.
                if (!z) {
                    return null;
                }
                logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedActiveCoordinatorAddress);
                return null;
            }
            String substring = electedActiveCoordinatorAddress.substring(0, indexOf);
            try {
                int parseInt = Integer.parseInt(electedActiveCoordinatorAddress.substring(indexOf + 1));
                // An empty state array selects nodes in every connection state.
                NodeIdentifier orElse = getNodeIdentifiers(new NodeConnectionState[0]).stream().filter(nodeIdentifier -> {
                    return nodeIdentifier.getSocketAddress().equals(substring) && nodeIdentifier.getSocketPort() == parseInt;
                }).findFirst().orElse(null);
                if (orElse == null && z) {
                    // Unknown node: ask it directly who it is.
                    logger.debug("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {},but there is no node with this address. Will attempt to communicate with node to determine its information", electedActiveCoordinatorAddress);
                    try {
                        NodeConnectionStatus requestNodeConnectionStatus = this.senderListener.requestNodeConnectionStatus(substring, parseInt);
                        logger.debug("Received NodeConnectionStatus {}", requestNodeConnectionStatus);
                        if (requestNodeConnectionStatus == null) {
                            return null;
                        }
                        // Keep an existing entry if another thread registered the node first.
                        NodeConnectionStatus putIfAbsent = this.nodeStatuses.putIfAbsent(requestNodeConnectionStatus.getNodeIdentifier(), requestNodeConnectionStatus);
                        return putIfAbsent == null ? requestNodeConnectionStatus.getNodeIdentifier() : putIfAbsent.getNodeIdentifier();
                    } catch (Exception e) {
                        logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but there is no node with this address. Attempted to determine the node's information but failed to retrieve its information due to {}", electedActiveCoordinatorAddress, e.toString());
                        // The stack trace is only emitted when debug logging is enabled.
                        if (logger.isDebugEnabled()) {
                            logger.warn("", e);
                        }
                        // Falls through to 'return orElse', which is null on this path.
                    }
                }
                return orElse;
            } catch (NumberFormatException e2) {
                // The port part of the address is not an integer.
                if (!z) {
                    return null;
                }
                logger.warn("Failed to determine which node is elected active Cluster Coordinator: ZooKeeper reports the address as {}, but this is not a valid address", electedActiveCoordinatorAddress);
                return null;
            }
        } catch (IOException e3) {
            if (!z) {
                return null;
            }
            logger.warn("Failed to determine which node is elected active Cluster Coordinator. There may be no coordinator currently: " + e3);
            // The stack trace is only emitted when debug logging is enabled.
            if (!logger.isDebugEnabled()) {
                return null;
            }
            logger.warn("", e3);
            return null;
        } catch (NoClusterCoordinatorException e4) {
            logger.debug("There is currently no elected active Cluster Coordinator");
            return null;
        }
    }

    /**
     * @return {@code true} if the local node identifier is set and equals the elected
     *         cluster coordinator
     */
    public boolean isActiveClusterCoordinator() {
        final NodeIdentifier self = getLocalNodeIdentifier();
        if (self == null) {
            return false;
        }
        return self.equals(getElectedActiveCoordinatorNode());
    }

    /**
     * @param nodeIdentifier the node whose events are wanted
     * @return a copy of the events recorded for the node, or an empty list if none exist
     */
    public List<NodeEvent> getNodeEvents(NodeIdentifier nodeIdentifier) {
        final CircularFifoQueue<NodeEvent> events = nodeEvents.get(nodeIdentifier);
        if (events == null) {
            return Collections.emptyList();
        }

        // The queue is guarded by its own monitor; copy it while holding the lock.
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }

    /**
     * Installs the flow service. It may only be set once.
     *
     * @param flowService the flow service to use
     * @throws IllegalStateException if a flow service has already been set
     */
    public void setFlowService(FlowService flowService) {
        if (this.flowService == null) {
            this.flowService = flowService;
            return;
        }
        throw new IllegalStateException("Flow Service has already been set");
    }

    /**
     * Records an informational event against a node.
     *
     * @param nodeIdentifier the node the event concerns
     * @param str the event message
     */
    private void addNodeEvent(NodeIdentifier nodeIdentifier, String str) {
        final Severity defaultSeverity = Severity.INFO;
        addNodeEvent(nodeIdentifier, defaultSeverity, str);
    }

    /**
     * Appends an event to the node's event queue, creating the queue on first use.
     *
     * @param nodeIdentifier the node the event concerns
     * @param severity the severity of the event
     * @param str the event message
     */
    private void addNodeEvent(NodeIdentifier nodeIdentifier, Severity severity, String str) {
        final Event event = new Event(nodeIdentifier.toString(), str, severity);
        final CircularFifoQueue<NodeEvent> queue = nodeEvents.computeIfAbsent(nodeIdentifier, id -> new CircularFifoQueue<>());

        // The queue is guarded by its own monitor.
        synchronized (queue) {
            queue.add(event);
        }
    }

    /**
     * Stores a new node status and notifies the cluster, waiting for a coordinator to be
     * elected if necessary.
     *
     * @param nodeConnectionStatus the new status
     */
    void updateNodeStatus(NodeConnectionStatus nodeConnectionStatus) {
        final boolean waitForCoordinator = true;
        updateNodeStatus(nodeConnectionStatus, waitForCoordinator);
    }

    /**
     * Stores a new node status, advances the latest update id, and notifies the cluster
     * unless the node's state is unchanged from its previous status.
     *
     * @param nodeConnectionStatus the new status
     * @param z whether the notification may block until a cluster coordinator is elected
     */
    void updateNodeStatus(NodeConnectionStatus nodeConnectionStatus, boolean z) {
        final NodeIdentifier changedNodeId = nodeConnectionStatus.getNodeIdentifier();
        final NodeConnectionStatus previousStatus = nodeStatuses.put(changedNodeId, nodeConnectionStatus);

        logger.info("Status of {} changed from {} to {}", changedNodeId, previousStatus, nodeConnectionStatus);
        logger.debug("State of cluster nodes is now {}", nodeStatuses);

        // Remember the largest update identifier observed so far.
        latestUpdateId.updateAndGet(current -> Math.max(current, nodeConnectionStatus.getUpdateIdentifier()));

        final NodeConnectionState previousState = previousStatus == null ? null : previousStatus.getState();
        if (previousState != null && previousState == nodeConnectionStatus.getState()) {
            logger.debug("Not notifying other nodes that status changed because previous state of {} is same as new state of {}", previousState, nodeConnectionStatus.getState());
            return;
        }

        final boolean actingAsCoordinator = isActiveClusterCoordinator();
        final String notice = actingAsCoordinator
                ? "Notifying all nodes that status changed from {} to {}"
                : "Notifying cluster coordinator that node status changed from {} to {}";
        logger.debug(notice, previousStatus, nodeConnectionStatus);

        notifyOthersOfNodeStatusChange(nodeConnectionStatus, actingAsCoordinator, z);
    }

    /**
     * Notifies the cluster of a status change, waiting for a coordinator to be elected if
     * this node is not the coordinator itself.
     *
     * @param nodeConnectionStatus the status to announce
     */
    void notifyOthersOfNodeStatusChange(NodeConnectionStatus nodeConnectionStatus) {
        final boolean actingAsCoordinator = isActiveClusterCoordinator();
        notifyOthersOfNodeStatusChange(nodeConnectionStatus, actingAsCoordinator, true);
    }

    /**
     * Sends a node status change to the appropriate recipients.
     *
     * <p>When acting as coordinator, every connected or connecting node except the local
     * one is notified. Otherwise only the elected cluster coordinator is notified; if no
     * coordinator can be determined the notification is dropped.</p>
     *
     * @param nodeConnectionStatus the status to announce
     * @param z whether the local node is the active cluster coordinator
     * @param z2 whether to block until a coordinator is elected (only relevant when
     *           {@code z} is {@code false})
     */
    void notifyOthersOfNodeStatusChange(NodeConnectionStatus nodeConnectionStatus, boolean z, boolean z2) {
        final Set<NodeIdentifier> recipients;
        if (z) {
            recipients = getNodeIdentifiers(NodeConnectionState.CONNECTED, NodeConnectionState.CONNECTING);
            recipients.remove(getLocalNodeIdentifier());
        } else {
            // The wait can end with null when this coordinator is closed or the thread is
            // interrupted, so both lookups need the same null guard.
            final NodeIdentifier coordinator = z2 ? waitForElectedClusterCoordinator() : getElectedActiveCoordinatorNode();
            if (coordinator == null) {
                logger.debug("Not notifying cluster coordinator of status change to {} because no coordinator could be determined", nodeConnectionStatus);
                return;
            }
            recipients = Collections.singleton(coordinator);
        }

        final NodeStatusChangeMessage statusChange = new NodeStatusChangeMessage();
        statusChange.setNodeId(nodeConnectionStatus.getNodeIdentifier());
        statusChange.setNodeConnectionStatus(nodeConnectionStatus);
        senderListener.notifyNodeStatusChange(recipients, statusChange);
    }

    /**
     * Sends a disconnect message to a node on a dedicated thread, retrying on failure.
     * The thread stops after the first successful send, after {@code i} failed attempts,
     * or when it is interrupted while backing off.
     *
     * @param disconnectMessage the message to send
     * @param i the maximum number of attempts
     * @param i2 the number of seconds to wait between attempts
     */
    private void disconnectAsynchronously(final DisconnectMessage disconnectMessage, final int i, final int i2) {
        final Runnable disconnectTask = () -> {
            final NodeIdentifier targetNodeId = disconnectMessage.getNodeId();

            for (int attempt = 0; attempt < i; attempt++) {
                try {
                    senderListener.disconnect(disconnectMessage);
                    reportEvent(targetNodeId, Severity.INFO, "Node disconnected due to " + disconnectMessage.getExplanation());
                    return;
                } catch (Exception e) {
                    // Trailing exception argument makes SLF4J log the cause and stack trace.
                    logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", disconnectMessage.getNodeId(), disconnectMessage.getExplanation(), e);
                    try {
                        // Long arithmetic so a large retry interval cannot overflow.
                        Thread.sleep(1000L * i2);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        };

        new Thread(disconnectTask, "Disconnect " + disconnectMessage.getNodeId()).start();
    }

    /**
     * Sends a reconnection request to a node on a dedicated thread, retrying on failure.
     *
     * <p>The thread first waits for the flow service to become available. Each attempt is
     * abandoned if the node is no longer in the {@code CONNECTING} state. If every attempt
     * fails and the node is still connecting, the node is disconnected with the
     * {@code UNABLE_TO_COMMUNICATE} code. An interrupt at any wait point ends the thread
     * with the interrupt flag restored.</p>
     *
     * @param reconnectionRequestMessage the request to send; its data flow, node statuses
     *                                   and component revisions are refreshed on each attempt
     * @param i the maximum number of attempts
     * @param i2 the number of seconds to wait between attempts
     */
    private void requestReconnectionAsynchronously(final ReconnectionRequestMessage reconnectionRequestMessage, final int i, final int i2) {
        final Runnable reconnectTask = () -> {
            // The flow service may not have been injected yet; poll until it is.
            while (flowService == null) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException interrupted) {
                    logger.info("Could not send Reconnection request to {} because thread was interrupted before FlowService was made available", reconnectionRequestMessage.getNodeId());
                    Thread.currentThread().interrupt();
                    return;
                }
            }

            for (int attempt = 0; attempt < i; attempt++) {
                try {
                    // Stop as soon as the node has left the CONNECTING state.
                    if (NodeConnectionState.CONNECTING != getConnectionState(reconnectionRequestMessage.getNodeId())) {
                        return;
                    }

                    reconnectionRequestMessage.setDataFlow(new StandardDataFlow(flowService.createDataFlow()));
                    reconnectionRequestMessage.setNodeConnectionStatuses(getConnectionStatuses());
                    reconnectionRequestMessage.setComponentRevisions(revisionManager.getAllRevisions().stream()
                            .map(revision -> ComponentRevision.fromRevision(revision))
                            .collect(Collectors.toList()));
                    senderListener.requestReconnection(reconnectionRequestMessage);

                    logger.info("Successfully requested that {} join the cluster", reconnectionRequestMessage.getNodeId());
                    return;
                } catch (Exception e) {
                    logger.warn("Problem encountered issuing reconnection request to node " + reconnectionRequestMessage.getNodeId(), e);
                    eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Problem encountered issuing reconnection request to node " + reconnectionRequestMessage.getNodeId() + " due to: " + e);
                    try {
                        // Long arithmetic so a large retry interval cannot overflow.
                        Thread.sleep(1000L * i2);
                    } catch (InterruptedException interrupted) {
                        // Do not swallow the interrupt: restore the flag and stop retrying.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }

            // All attempts failed; give up on the node if it is still waiting to connect.
            if (NodeConnectionState.CONNECTING == getConnectionState(reconnectionRequestMessage.getNodeId())) {
                requestNodeDisconnect(reconnectionRequestMessage.getNodeId(), DisconnectionCode.UNABLE_TO_COMMUNICATE, "Attempted to request that node reconnect to cluster but could not communicate with node");
            }
        };

        new Thread(reconnectTask, "Reconnect " + reconnectionRequestMessage.getNodeId()).start();
    }

    /**
     * Dispatches an incoming protocol message to the handler that matches its message type.
     *
     * @param protocolMessage the message to process
     * @return the response built by the matching handler, or {@code null} for a node status
     *         change, which produces no response
     * @throws ProtocolException if the message type is not one of the types handled here
     */
    public ProtocolMessage handle(ProtocolMessage protocolMessage) throws ProtocolException {
        // Switch on the enum constant directly instead of indexing a synthesized ordinal lookup table.
        switch (protocolMessage.getType()) {
            case CONNECTION_REQUEST:
                return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
            case NODE_STATUS_CHANGE:
                handleNodeStatusChange((NodeStatusChangeMessage) protocolMessage);
                return null;
            case NODE_CONNECTION_STATUS_REQUEST:
                return handleNodeConnectionStatusRequest();
            default:
                throw new ProtocolException("Cannot handle Protocol Message " + protocolMessage + " because it is not of the correct type");
        }
    }

    /**
     * Builds a response message carrying the connection status currently recorded for the
     * local node identifier.
     *
     * @return a new response message; its status is whatever {@code nodeStatuses} holds for
     *         the local node identifier (possibly {@code null} if nothing is recorded)
     */
    private NodeConnectionStatusResponseMessage handleNodeConnectionStatusRequest() {
        final NodeConnectionStatusResponseMessage response = new NodeConnectionStatusResponseMessage();
        response.setNodeConnectionStatus(this.nodeStatuses.get(getLocalNodeIdentifier()));
        return response;
    }

    /**
     * Produces a human-readable description of a node's state transition.
     *
     * @param nodeConnectionStatus  the previously recorded status, or {@code null} if the node was unknown
     * @param nodeConnectionStatus2 the newly reported status; must not be {@code null}
     * @return a description of the transition, or an empty string when the state did not change
     */
    private String summarizeStatusChange(NodeConnectionStatus nodeConnectionStatus, NodeConnectionStatus nodeConnectionStatus2) {
        // Nothing to report when a previous status exists and its state matches the new one.
        final boolean stateUnchanged = nodeConnectionStatus != null
                && nodeConnectionStatus2.getState() == nodeConnectionStatus.getState();
        if (stateUnchanged) {
            return "";
        }

        final String previousState = nodeConnectionStatus == null
                ? "[Unknown Node]"
                : nodeConnectionStatus.getState().toString();

        final StringBuilder summary = new StringBuilder();
        summary.append("Node Status changed from ");
        summary.append(previousState);
        summary.append(" to ");
        summary.append(nodeConnectionStatus2.getState().toString());

        // Prefer the free-form reason; fall back to the disconnect code when only that is present.
        if (nodeConnectionStatus2.getDisconnectReason() != null) {
            summary.append(" due to ").append(nodeConnectionStatus2.getDisconnectReason());
        } else if (nodeConnectionStatus2.getDisconnectCode() != null) {
            summary.append(" due to ").append(nodeConnectionStatus2.getDisconnectCode().toString());
        }
        return summary.toString();
    }

    /**
     * Applies a node status change carried by the given message to the local status map,
     * records a node event describing the transition, and, when this instance is the active
     * cluster coordinator, notifies the other nodes of the change.
     *
     * @param nodeStatusChangeMessage the message carrying the node identifier and its new status
     */
    private void handleNodeStatusChange(NodeStatusChangeMessage nodeStatusChangeMessage) {
        final NodeConnectionStatus updatedStatus = nodeStatusChangeMessage.getNodeConnectionStatus();
        final NodeIdentifier nodeId = nodeStatusChangeMessage.getNodeId();
        logger.debug("Handling request {}", nodeStatusChangeMessage);

        final NodeConnectionStatus previousStatus = this.nodeStatuses.get(nodeId);
        if (updatedStatus.getState() != NodeConnectionState.REMOVED) {
            this.nodeStatuses.put(nodeId, updatedStatus);
        } else {
            // Conditional removal: only drops the entry if it still maps to the status read above.
            this.nodeStatuses.remove(nodeId, previousStatus);
        }

        logger.info("Status of {} changed from {} to {}", new Object[]{nodeId, previousStatus, updatedStatus});
        logger.debug("State of cluster nodes is now {}", this.nodeStatuses);

        final String changeSummary = summarizeStatusChange(previousStatus, updatedStatus);
        if (!StringUtils.isEmpty(changeSummary)) {
            addNodeEvent(nodeId, changeSummary);
        }

        NodeConnectionStatus.updateIdGenerator(updatedStatus.getUpdateIdentifier());

        if (isActiveClusterCoordinator()) {
            notifyOthersOfNodeStatusChange(updatedStatus);
        }
    }

    /**
     * Decides which node identifier a connecting node should use.
     * <p>
     * The proposed identifier is registered with a {@code NOT_YET_CONNECTED} status if no status
     * is mapped to it yet. If a status already exists and its identifier is logically equal to
     * the proposed one, the proposed identifier is kept. Otherwise a new identifier with a random
     * UUID and the proposed identifier's addresses and ports is returned.
     *
     * @param nodeIdentifier the identifier proposed by the connecting node
     * @return the proposed identifier, or a freshly generated one when the proposed ID is already
     *         held by a logically different node
     */
    private NodeIdentifier resolveNodeId(NodeIdentifier nodeIdentifier) {
        final NodeConnectionStatus existingStatus = this.nodeStatuses.putIfAbsent(nodeIdentifier, new NodeConnectionStatus(nodeIdentifier, DisconnectionCode.NOT_YET_CONNECTED));

        if (existingStatus == null) {
            // First time this ID is seen; it has just been registered as not yet connected.
            logger.debug("No existing node with ID {}; resolved node ID is as-proposed", nodeIdentifier.getId());
            return nodeIdentifier;
        }

        if (existingStatus.getNodeIdentifier().logicallyEquals(nodeIdentifier)) {
            // Same logical node reconnecting under its known ID. The previous message wrongly
            // claimed that no node with this ID existed.
            logger.debug("A node already exists with ID {} and is logically equal to the proposed node; resolved node ID is as-proposed", nodeIdentifier.getId());
            return nodeIdentifier;
        }

        // The ID is taken by a different node: keep the proposed endpoints but assign a new random ID.
        final NodeIdentifier resolvedNodeId = new NodeIdentifier(UUID.randomUUID().toString(), nodeIdentifier.getApiAddress(), nodeIdentifier.getApiPort(), nodeIdentifier.getSocketAddress(), nodeIdentifier.getSocketPort(), nodeIdentifier.getSiteToSiteAddress(), nodeIdentifier.getSiteToSitePort(), nodeIdentifier.getSiteToSiteHttpApiPort(), nodeIdentifier.isSiteToSiteSecure());
        logger.debug("A node already exists with ID {}. Proposed Node Identifier was {}; existing Node Identifier is {}; Resolved Node Identifier is {}", new Object[]{nodeIdentifier.getId(), nodeIdentifier, getNodeIdentifier(nodeIdentifier.getId()), resolvedNodeId});
        return resolvedNodeId;
    }

    /**
     * Handles a connection request from a node: attaches the requestor DN to the proposed
     * identifier, resolves the identifier the node should use, and wraps the resulting
     * connection response in a response message.
     *
     * @param connectionRequestMessage the incoming connection request message
     * @return a message containing the connection response for the requesting node
     */
    private ConnectionResponseMessage handleConnectionRequest(ConnectionRequestMessage connectionRequestMessage) {
        final NodeIdentifier proposedId = connectionRequestMessage.getConnectionRequest().getProposedNodeIdentifier();

        final NodeIdentifier proposedIdWithDn = addRequestorDn(proposedId, connectionRequestMessage.getRequestorDN());
        final ConnectionRequest requestWithDn = new ConnectionRequest(proposedIdWithDn);
        // Resolution is performed on the identifier as proposed, not on the DN-augmented copy.
        final NodeIdentifier resolvedId = resolveNodeId(proposedId);
        final ConnectionResponse response = createConnectionResponse(requestWithDn, resolvedId);

        final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
        responseMessage.setConnectionResponse(response);
        return responseMessage;
    }

    /**
     * Builds the response to a node's connection request.
     * <p>
     * A blocked-by-firewall response is returned when the node's socket address is blocked.
     * Otherwise the node is marked {@code CONNECTING}, and the response carries the current
     * dataflow, connection statuses and component revisions. When no dataflow can be obtained,
     * the response asks the node to try again in 5 seconds.
     *
     * @param connectionRequest the connection request; not read by this method
     * @param nodeIdentifier    the resolved identifier of the node requesting the connection
     * @return the connection response to send back to the node
     */
    private ConnectionResponse createConnectionResponse(ConnectionRequest connectionRequest, NodeIdentifier nodeIdentifier) {
        if (isBlockedByFirewall(nodeIdentifier.getSocketAddress())) {
            logger.info("Firewall blocked connection request from node " + nodeIdentifier);
            return ConnectionResponse.createBlockedByFirewallResponse();
        }

        final boolean knownNode = getConnectionStatus(nodeIdentifier) != null;
        final String connectionEvent = knownNode
                ? "Connection requested from existing node.  Setting status to connecting"
                : "Connection requested from new node.  Setting status to connecting.";
        addNodeEvent(nodeIdentifier, connectionEvent);

        final Long requestTime = Long.valueOf(System.currentTimeMillis());
        final NodeConnectionStatus connectingStatus = new NodeConnectionStatus(nodeIdentifier, NodeConnectionState.CONNECTING, (DisconnectionCode) null, (String) null, requestTime);
        updateNodeStatus(connectingStatus);

        // The flow stays null when no FlowService is set or when creating the flow fails.
        DataFlow currentFlow = null;
        if (this.flowService != null) {
            try {
                currentFlow = this.flowService.createDataFlow();
            } catch (IOException ioe) {
                logger.error("Unable to obtain current dataflow from FlowService in order to provide the flow to " + nodeIdentifier + ". Will tell node to try again later", ioe);
            }
        }

        if (currentFlow == null) {
            addNodeEvent(nodeIdentifier, Severity.WARNING, "Connection requested from node, but manager was unable to obtain current flow. Instructing node to try again in 5 seconds.");
            return new ConnectionResponse(5);
        }

        return new ConnectionResponse(
                nodeIdentifier,
                currentFlow,
                this.instanceId,
                getConnectionStatuses(),
                (List) this.revisionManager.getAllRevisions().stream()
                        .map(revision -> ComponentRevision.fromRevision(revision))
                        .collect(Collectors.toList()));
    }

    /**
     * Creates a copy of the given node identifier that additionally carries the supplied DN.
     *
     * @param nodeIdentifier the identifier whose ID, addresses, ports and secure flag are copied
     * @param str            the requestor DN passed as the final constructor argument
     * @return a new {@link NodeIdentifier} built from the given identifier's values plus the DN
     */
    private NodeIdentifier addRequestorDn(NodeIdentifier nodeIdentifier, String str) {
        return new NodeIdentifier(
                nodeIdentifier.getId(),
                nodeIdentifier.getApiAddress(),
                nodeIdentifier.getApiPort(),
                nodeIdentifier.getSocketAddress(),
                nodeIdentifier.getSocketPort(),
                nodeIdentifier.getSiteToSiteAddress(),
                nodeIdentifier.getSiteToSitePort(),
                nodeIdentifier.getSiteToSiteHttpApiPort(),
                nodeIdentifier.isSiteToSiteSecure(),
                str);
    }

    /**
     * Reports whether this handler accepts the given protocol message.
     *
     * @param protocolMessage the message to inspect
     * @return {@code true} if the message type is a connection request, a node status change,
     *         or a node connection status request; {@code false} otherwise
     */
    public boolean canHandle(ProtocolMessage protocolMessage) {
        final ProtocolMessage.MessageType messageType = protocolMessage.getType();
        return messageType == ProtocolMessage.MessageType.CONNECTION_REQUEST
                || messageType == ProtocolMessage.MessageType.NODE_STATUS_CHANGE
                || messageType == ProtocolMessage.MessageType.NODE_CONNECTION_STATUS_REQUEST;
    }

    /**
     * Tells whether the given HTTP method name is one of DELETE, POST or PUT, ignoring case.
     *
     * @param str the HTTP method name; may be {@code null}
     * @return {@code true} for DELETE, POST or PUT (case-insensitive); {@code false} otherwise,
     *         including for {@code null}
     */
    private boolean isMutableRequest(String str) {
        for (final String mutatingVerb : new String[]{"DELETE", "POST", "PUT"}) {
            if (mutatingVerb.equalsIgnoreCase(str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Inspects the per-node responses of a completed request and, when only some of the nodes
     * failed a DELETE/POST/PUT request, asks each failing node to disconnect from the cluster.
     * <p>
     * Nothing happens unless this instance is the active cluster coordinator and the method is
     * DELETE, POST or PUT. No node is disconnected when every node failed (a warning is logged
     * instead), when no node failed, or when the failures are all 404 responses to a URI that
     * matches the counter URI pattern.
     *
     * @param str  the URI path of the request
     * @param str2 the HTTP method of the request
     * @param set  the responses received from the nodes
     */
    @Override // org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback
    public void afterRequest(String str, String str2, Set<NodeResponse> set) {
        if (!isActiveClusterCoordinator() || !isMutableRequest(str2)) {
            return;
        }

        final Set<NodeResponse> failedResponses = new StandardHttpResponseMerger(this.nifiProperties).getProblematicNodeResponses(set);
        final int failedCount = failedResponses.size();
        final int totalCount = set.size();

        if (failedCount == totalCount) {
            logger.warn("All nodes failed to process URI {} {}. As a result, no node will be disconnected from cluster", str2, str);
            return;
        }

        // Only a partial failure can lead to disconnections.
        final boolean partialFailure = !failedResponses.isEmpty() && failedCount < totalCount;
        if (!partialFailure || isMissingCounter(failedResponses, str)) {
            return;
        }

        final Set<NodeIdentifier> failedNodeIds = failedResponses.stream()
                .map(NodeResponse::getNodeId)
                .collect(Collectors.toSet());
        logger.warn(String.format("The following nodes failed to process URI %s '%s'.  Requesting each node disconnect from cluster.", str, failedNodeIds));

        for (final NodeIdentifier failedNodeId : failedNodeIds) {
            requestNodeDisconnect(failedNodeId, DisconnectionCode.FAILED_TO_SERVICE_REQUEST, "Failed to process request " + str2 + " " + str);
        }
    }

    /**
     * Checks whether a set of failed responses is explained entirely by 404 statuses on a URI
     * that matches the counter URI pattern.
     *
     * @param set the problematic node responses to examine
     * @param str the URI path of the request
     * @return {@code true} if the URI matches {@code COUNTER_URI_PATTERN} and every response in
     *         the set has status 404 (vacuously true for an empty set); {@code false} otherwise
     */
    private boolean isMissingCounter(Set<NodeResponse> set, String str) {
        final boolean counterUri = COUNTER_URI_PATTERN.matcher(str).matches();
        return counterUri && set.stream().allMatch(response -> response.getStatus() == 404);
    }

    /**
     * Stores the given value in this coordinator's {@code connected} flag.
     *
     * @param z the new value of the flag
     */
    public void setConnected(boolean z) {
        connected = z;
    }

    /**
     * Returns the current value of this coordinator's {@code connected} flag.
     *
     * @return the value most recently stored in the flag
     */
    public boolean isConnected() {
        return connected;
    }
}
