/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.heartbeat;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.heartbeat.StandardNodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterProtocolHeartbeatMonitor
extends AbstractHeartbeatMonitor
implements HeartbeatMonitor,
ProtocolHandler {
    protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
    private final String heartbeatAddress;
    private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<NodeIdentifier, NodeHeartbeat>();
    protected static final Unmarshaller nodeIdentifierUnmarshaller;

    public ClusterProtocolHeartbeatMonitor(ClusterCoordinator clusterCoordinator, ProtocolListener protocolListener, NiFiProperties nifiProperties) {
        super(clusterCoordinator, nifiProperties);
        String port;
        protocolListener.addHandler((ProtocolHandler)this);
        String hostname = nifiProperties.getProperty("nifi.cluster.node.address");
        if (hostname == null || hostname.trim().isEmpty()) {
            hostname = "localhost";
        }
        if ((port = nifiProperties.getProperty("nifi.cluster.node.protocol.port")) == null || port.trim().isEmpty()) {
            throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the 'nifi.cluster.node.protocol.port' property is not set");
        }
        try {
            Integer.parseInt(port);
        }
        catch (NumberFormatException nfe) {
            throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the 'nifi.cluster.node.protocol.port' property is set to '" + port + "', which is not a valid port number.");
        }
        this.heartbeatAddress = hostname + ":" + port;
    }

    public String getHeartbeatAddress() {
        return this.heartbeatAddress;
    }

    @Override
    public void onStart() {
        this.heartbeatMessages.clear();
        for (NodeIdentifier nodeId : this.clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[0])) {
            StandardNodeHeartbeat heartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), this.clusterCoordinator.getConnectionStatus(nodeId), 0, 0L, 0, System.currentTimeMillis());
            this.heartbeatMessages.put(nodeId, heartbeat);
        }
    }

    @Override
    public void onStop() {
    }

    @Override
    protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
        return Collections.unmodifiableMap(this.heartbeatMessages);
    }

    public synchronized void removeHeartbeat(NodeIdentifier nodeId) {
        logger.debug("Deleting heartbeat for node {}", (Object)nodeId);
        this.heartbeatMessages.remove(nodeId);
    }

    public synchronized void purgeHeartbeats() {
        logger.debug("Purging old heartbeats");
        this.heartbeatMessages.clear();
    }

    public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
        switch (msg.getType()) {
            case HEARTBEAT: {
                return this.handleHeartbeat((HeartbeatMessage)msg);
            }
            case CLUSTER_WORKLOAD_REQUEST: {
                return this.handleClusterWorkload((ClusterWorkloadRequestMessage)msg);
            }
        }
        throw new ProtocolException("Cannot handle message of type " + msg.getType());
    }

    private ProtocolMessage handleHeartbeat(HeartbeatMessage msg) {
        HeartbeatMessage heartbeatMsg = msg;
        Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
        NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
        NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
        byte[] payloadBytes = heartbeat.getPayload();
        HeartbeatPayload payload = HeartbeatPayload.unmarshal((byte[])payloadBytes);
        int activeThreadCount = payload.getActiveThreadCount();
        int flowFileCount = (int)payload.getTotalFlowFileCount();
        long flowFileBytes = payload.getTotalFlowFileBytes();
        long systemStartTime = payload.getSystemStartTime();
        StandardNodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), connectionStatus, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
        this.heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
        logger.debug("Received new heartbeat from {}", (Object)nodeId);
        List<NodeConnectionStatus> nodeStatusList = payload.getClusterStatus();
        if (nodeStatusList == null) {
            nodeStatusList = Collections.emptyList();
        }
        List<NodeConnectionStatus> updatedStatuses = this.getUpdatedStatuses(nodeStatusList);
        HeartbeatResponseMessage responseMessage = new HeartbeatResponseMessage();
        responseMessage.setUpdatedNodeStatuses(updatedStatuses);
        if (!this.getClusterCoordinator().isFlowElectionComplete()) {
            responseMessage.setFlowElectionMessage(this.getClusterCoordinator().getFlowElectionStatus());
        }
        return responseMessage;
    }

    private ProtocolMessage handleClusterWorkload(ClusterWorkloadRequestMessage msg) {
        ClusterWorkloadResponseMessage response = new ClusterWorkloadResponseMessage();
        HashMap workloads = new HashMap();
        this.getLatestHeartbeats().values().stream().filter(hb -> NodeConnectionState.CONNECTED.equals((Object)hb.getConnectionStatus().getState())).forEach(hb -> {
            NodeWorkload wl = new NodeWorkload();
            wl.setReportedTimestamp(hb.getTimestamp());
            wl.setSystemStartTime(hb.getSystemStartTime());
            wl.setActiveThreadCount(hb.getActiveThreadCount());
            wl.setFlowFileCount(hb.getFlowFileCount());
            wl.setFlowFileBytes(hb.getFlowFileBytes());
            workloads.put(hb.getNodeIdentifier(), wl);
        });
        response.setNodeWorkloads(workloads);
        return response;
    }

    private List<NodeConnectionStatus> getUpdatedStatuses(List<NodeConnectionStatus> nodeStatusList) {
        Map nodeStatusMap = nodeStatusList.stream().collect(Collectors.toMap(status -> status.getNodeIdentifier(), Function.identity()));
        List currentStatuses = this.clusterCoordinator.getConnectionStatuses();
        ArrayList<NodeConnectionStatus> updatedStatuses = new ArrayList<NodeConnectionStatus>();
        for (NodeConnectionStatus currentStatus : currentStatuses) {
            NodeConnectionStatus nodeStatus;
            if (currentStatus.equals((Object)(nodeStatus = (NodeConnectionStatus)nodeStatusMap.get(currentStatus.getNodeIdentifier())))) continue;
            updatedStatuses.add(currentStatus);
        }
        Set nodeIds = currentStatuses.stream().map(status -> status.getNodeIdentifier()).collect(Collectors.toSet());
        for (NodeConnectionStatus nodeStatus : nodeStatusList) {
            if (nodeIds.contains(nodeStatus.getNodeIdentifier())) continue;
            updatedStatuses.add(new NodeConnectionStatus(nodeStatus.getNodeIdentifier(), NodeConnectionState.REMOVED, null));
        }
        logger.debug("\n\nCalculated diff between current cluster status and node cluster status as follows:\nNode: {}\nSelf: {}\nDifference: {}\n\n", new Object[]{nodeStatusList, currentStatuses, updatedStatuses});
        return updatedStatuses;
    }

    public boolean canHandle(ProtocolMessage msg) {
        return msg.getType() == ProtocolMessage.MessageType.HEARTBEAT || msg.getType() == ProtocolMessage.MessageType.CLUSTER_WORKLOAD_REQUEST;
    }

    static {
        try {
            JAXBContext jaxbContext = JAXBContext.newInstance((Class[])new Class[]{NodeIdentifier.class});
            nodeIdentifierUnmarshaller = jaxbContext.createUnmarshaller();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Node Identifier", e);
        }
    }
}

