package org.apache.nifi.cluster.coordination.heartbeat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.class */
public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler {
    protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
    private final String heartbeatAddress;
    private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages;
    private volatile long purgeTimestamp;

    /* renamed from: org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType = new int[ProtocolMessage.MessageType.values().length];

        static {
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.HEARTBEAT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.CLUSTER_WORKLOAD_REQUEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public ClusterProtocolHeartbeatMonitor(ClusterCoordinator clusterCoordinator, ProtocolListener protocolListener, NiFiProperties niFiProperties) {
        super(clusterCoordinator, niFiProperties);
        this.heartbeatMessages = new ConcurrentHashMap();
        this.purgeTimestamp = System.currentTimeMillis();
        protocolListener.addHandler(this);
        String property = niFiProperties.getProperty("nifi.cluster.node.address");
        property = (property == null || property.trim().isEmpty()) ? "localhost" : property;
        String property2 = niFiProperties.getProperty("nifi.cluster.node.protocol.port");
        if (property2 == null || property2.trim().isEmpty()) {
            throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the 'nifi.cluster.node.protocol.port' property is not set");
        }
        try {
            Integer.parseInt(property2);
            this.heartbeatAddress = property + ":" + property2;
        } catch (NumberFormatException e) {
            throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the 'nifi.cluster.node.protocol.port' property is set to '" + property2 + "', which is not a valid port number.");
        }
    }

    public String getHeartbeatAddress() {
        return this.heartbeatAddress;
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    public void onStart() {
        this.heartbeatMessages.clear();
        for (NodeIdentifier nodeIdentifier : this.clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[0])) {
            this.heartbeatMessages.put(nodeIdentifier, new StandardNodeHeartbeat(nodeIdentifier, System.currentTimeMillis(), this.clusterCoordinator.getConnectionStatus(nodeIdentifier), 0, 0L, 0, System.currentTimeMillis(), 0L));
        }
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    public void onStop() {
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
        return Collections.unmodifiableMap(this.heartbeatMessages);
    }

    public synchronized void removeHeartbeat(NodeIdentifier nodeIdentifier) {
        logger.debug("Deleting heartbeat for node {}", nodeIdentifier);
        this.heartbeatMessages.remove(nodeIdentifier);
    }

    public synchronized void purgeHeartbeats() {
        logger.debug("Purging old heartbeats");
        this.heartbeatMessages.clear();
        this.purgeTimestamp = System.currentTimeMillis();
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    public synchronized long getPurgeTimestamp() {
        return this.purgeTimestamp;
    }

    public ProtocolMessage handle(ProtocolMessage protocolMessage, Set<String> set) throws ProtocolException {
        switch (AnonymousClass1.$SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[protocolMessage.getType().ordinal()]) {
            case 1:
                return handleHeartbeat((HeartbeatMessage) protocolMessage);
            case 2:
                return handleClusterWorkload((ClusterWorkloadRequestMessage) protocolMessage);
            default:
                throw new ProtocolException("Cannot handle message of type " + protocolMessage.getType());
        }
    }

    private ProtocolMessage handleHeartbeat(HeartbeatMessage heartbeatMessage) {
        Heartbeat heartbeat = heartbeatMessage.getHeartbeat();
        NodeIdentifier nodeIdentifier = heartbeat.getNodeIdentifier();
        NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
        HeartbeatPayload unmarshal = HeartbeatPayload.unmarshal(heartbeat.getPayload());
        int activeThreadCount = unmarshal.getActiveThreadCount();
        this.heartbeatMessages.put(heartbeat.getNodeIdentifier(), new StandardNodeHeartbeat(nodeIdentifier, System.currentTimeMillis(), connectionStatus, (int) unmarshal.getTotalFlowFileCount(), unmarshal.getTotalFlowFileBytes(), activeThreadCount, unmarshal.getSystemStartTime(), unmarshal.getRevisionUpdateCount()));
        logger.debug("Received new heartbeat from {}", nodeIdentifier);
        List<NodeConnectionStatus> clusterStatus = unmarshal.getClusterStatus();
        if (clusterStatus == null) {
            clusterStatus = Collections.emptyList();
        }
        List<NodeConnectionStatus> updatedStatuses = getUpdatedStatuses(clusterStatus);
        HeartbeatResponseMessage heartbeatResponseMessage = new HeartbeatResponseMessage();
        heartbeatResponseMessage.setUpdatedNodeStatuses(updatedStatuses);
        if (!getClusterCoordinator().isFlowElectionComplete()) {
            heartbeatResponseMessage.setFlowElectionMessage(getClusterCoordinator().getFlowElectionStatus());
        }
        return heartbeatResponseMessage;
    }

    private ProtocolMessage handleClusterWorkload(ClusterWorkloadRequestMessage clusterWorkloadRequestMessage) {
        ClusterWorkloadResponseMessage clusterWorkloadResponseMessage = new ClusterWorkloadResponseMessage();
        HashMap hashMap = new HashMap();
        getLatestHeartbeats().values().stream().filter(nodeHeartbeat -> {
            return NodeConnectionState.CONNECTED.equals(nodeHeartbeat.getConnectionStatus().getState());
        }).forEach(nodeHeartbeat2 -> {
            NodeWorkload nodeWorkload = new NodeWorkload();
            nodeWorkload.setReportedTimestamp(nodeHeartbeat2.getTimestamp());
            nodeWorkload.setSystemStartTime(nodeHeartbeat2.getSystemStartTime());
            nodeWorkload.setActiveThreadCount(nodeHeartbeat2.getActiveThreadCount());
            nodeWorkload.setFlowFileCount(nodeHeartbeat2.getFlowFileCount());
            nodeWorkload.setFlowFileBytes(nodeHeartbeat2.getFlowFileBytes());
            hashMap.put(nodeHeartbeat2.getNodeIdentifier(), nodeWorkload);
        });
        clusterWorkloadResponseMessage.setNodeWorkloads(hashMap);
        return clusterWorkloadResponseMessage;
    }

    private List<NodeConnectionStatus> getUpdatedStatuses(List<NodeConnectionStatus> list) {
        Map map = (Map) list.stream().collect(Collectors.toMap(nodeConnectionStatus -> {
            return nodeConnectionStatus.getNodeIdentifier();
        }, Function.identity()));
        List<NodeConnectionStatus> connectionStatuses = this.clusterCoordinator.getConnectionStatuses();
        ArrayList arrayList = new ArrayList();
        for (NodeConnectionStatus nodeConnectionStatus2 : connectionStatuses) {
            if (!nodeConnectionStatus2.equals((NodeConnectionStatus) map.get(nodeConnectionStatus2.getNodeIdentifier()))) {
                arrayList.add(nodeConnectionStatus2);
            }
        }
        Set set = (Set) connectionStatuses.stream().map(nodeConnectionStatus3 -> {
            return nodeConnectionStatus3.getNodeIdentifier();
        }).collect(Collectors.toSet());
        for (NodeConnectionStatus nodeConnectionStatus4 : list) {
            if (!set.contains(nodeConnectionStatus4.getNodeIdentifier())) {
                arrayList.add(new NodeConnectionStatus(nodeConnectionStatus4.getNodeIdentifier(), NodeConnectionState.REMOVED, (DisconnectionCode) null));
            }
        }
        logger.debug("\n\nCalculated diff between current cluster status and node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n", new Object[]{list, connectionStatuses, arrayList});
        return arrayList;
    }

    public boolean canHandle(ProtocolMessage protocolMessage) {
        return protocolMessage.getType() == ProtocolMessage.MessageType.HEARTBEAT || protocolMessage.getType() == ProtocolMessage.MessageType.CLUSTER_WORKLOAD_REQUEST;
    }
}
