/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.coordination.heartbeat;

import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHeartbeatMonitor
implements HeartbeatMonitor {
    private final int heartbeatIntervalMillis;
    private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
    protected final ClusterCoordinator clusterCoordinator;
    protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
    private volatile ScheduledFuture<?> future;
    private volatile boolean stopped = true;

    public AbstractHeartbeatMonitor(ClusterCoordinator clusterCoordinator, NiFiProperties nifiProperties) {
        this.clusterCoordinator = clusterCoordinator;
        String heartbeatInterval = nifiProperties.getProperty("nifi.cluster.protocol.heartbeat.interval", "5 sec");
        this.heartbeatIntervalMillis = (int)FormatUtils.getTimeDuration((String)heartbeatInterval, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    public final synchronized void start() {
        if (!this.stopped) {
            logger.info("Attempted to start Heartbeat Monitor but it is already started. Stopping heartbeat monitor and re-starting it.");
            this.stop();
        }
        this.stopped = false;
        logger.info("Heartbeat Monitor started");
        try {
            this.onStart();
        }
        catch (Exception e) {
            logger.error("Failed to start Heartbeat Monitor", (Throwable)e);
        }
        this.future = this.flowEngine.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    AbstractHeartbeatMonitor.this.monitorHeartbeats();
                }
                catch (Exception e) {
                    AbstractHeartbeatMonitor.this.clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString());
                    logger.error("Failed to process heartbeats", (Throwable)e);
                }
            }
        }, (long)this.heartbeatIntervalMillis, (long)this.heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public final synchronized void stop() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        logger.info("Heartbeat Monitor stopped");
        try {
            if (this.future != null) {
                this.future.cancel(true);
            }
        }
        finally {
            this.onStop();
        }
    }

    protected boolean isStopped() {
        return this.stopped;
    }

    public NodeHeartbeat getLatestHeartbeat(NodeIdentifier nodeId) {
        return this.getLatestHeartbeats().get(nodeId);
    }

    protected ClusterCoordinator getClusterCoordinator() {
        return this.clusterCoordinator;
    }

    protected long getHeartbeatInterval(TimeUnit timeUnit) {
        return timeUnit.convert(this.heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    protected synchronized void monitorHeartbeats() {
        if (!this.clusterCoordinator.isActiveClusterCoordinator()) {
            logger.debug("It appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect.");
            return;
        }
        Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = this.getLatestHeartbeats();
        if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
            logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat");
            return;
        }
        StopWatch procStopWatch = new StopWatch(true);
        for (NodeHeartbeat heartbeat : latestHeartbeats.values()) {
            try {
                this.processHeartbeat(heartbeat);
            }
            catch (Exception e) {
                this.clusterCoordinator.reportEvent(null, Severity.ERROR, "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e);
                logger.error("Failed to process heartbeat from {} due to {}", (Object)heartbeat.getNodeIdentifier(), (Object)e.toString());
                logger.error("", (Throwable)e);
            }
        }
        procStopWatch.stop();
        logger.info("Finished processing {} heartbeats in {}", (Object)latestHeartbeats.size(), (Object)procStopWatch.getDuration());
        long maxMillis = this.heartbeatIntervalMillis * 8;
        long threshold = System.currentTimeMillis() - maxMillis;
        for (NodeHeartbeat heartbeat : latestHeartbeats.values()) {
            if (heartbeat.getTimestamp() >= threshold) continue;
            long secondsSinceLastHeartbeat = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - heartbeat.getTimestamp());
            this.clusterCoordinator.disconnectionRequestedByNode(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT, "Have not received a heartbeat from node in " + secondsSinceLastHeartbeat + " seconds");
            try {
                this.removeHeartbeat(heartbeat.getNodeIdentifier());
            }
            catch (Exception e) {
                logger.warn("Failed to remove heartbeat for {} due to {}", (Object)heartbeat.getNodeIdentifier(), (Object)e.toString());
                logger.warn("", (Throwable)e);
            }
        }
    }

    private void processHeartbeat(NodeHeartbeat heartbeat) {
        NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
        if (this.clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
            this.clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall");
            this.removeHeartbeat(nodeId);
            return;
        }
        NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(nodeId);
        if (connectionStatus == null) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from unknown node. Removing heartbeat and requesting that node connect to cluster.");
            this.removeHeartbeat(nodeId);
            this.clusterCoordinator.requestNodeConnect(nodeId, null);
            return;
        }
        NodeConnectionState connectionState = connectionStatus.getState();
        if (heartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) {
            this.clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster,though the Cluster Coordinator thought it was (node claimed state was " + heartbeat.getConnectionStatus().getState() + "). Marking as Disconnected and requesting that Node reconnect to cluster");
            this.clusterCoordinator.requestNodeConnect(nodeId, null);
            return;
        }
        if (NodeConnectionState.DISCONNECTED == connectionState) {
            DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
            switch (disconnectionCode) {
                case LACK_OF_HEARTBEAT: 
                case UNABLE_TO_COMMUNICATE: 
                case NOT_YET_CONNECTED: 
                case STARTUP_FAILURE: {
                    this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously disconnected due to " + disconnectionCode + ". Issuing reconnection request.");
                    this.clusterCoordinator.requestNodeConnect(nodeId, null);
                    break;
                }
                default: {
                    logger.info("Ignoring received heartbeat from disconnected node " + nodeId + ".  Issuing disconnection request.");
                    this.clusterCoordinator.requestNodeDisconnect(nodeId, disconnectionCode, connectionStatus.getDisconnectReason());
                    this.removeHeartbeat(nodeId);
                }
            }
            return;
        }
        if (NodeConnectionState.DISCONNECTING == connectionStatus.getState()) {
            this.removeHeartbeat(nodeId);
            return;
        }
        if (NodeConnectionState.CONNECTING == connectionState) {
            Long connectionRequestTime = connectionStatus.getConnectionRequestTime();
            if (connectionRequestTime != null && heartbeat.getTimestamp() < connectionRequestTime) {
                this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.");
                this.removeHeartbeat(nodeId);
                return;
            }
            this.clusterCoordinator.finishNodeConnection(nodeId);
            this.clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
        }
    }

    protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats();

    protected void onStart() {
    }

    protected void onStop() {
    }
}

