/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.cluster;

import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.message.CommsTimingDetails;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Heartbeater} that delivers heartbeats through a {@link NodeProtocolSender} to the
 * address of the currently elected leader for the "Cluster Coordinator" role and then
 * applies any node statuses returned in the response to the local {@link ClusterCoordinator}.
 */
public class ClusterProtocolHeartbeater
implements Heartbeater {
    private static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeater.class);

    /** Role name used to look up the heartbeat destination in the leader election manager. */
    private static final String CLUSTER_COORDINATOR_ROLE = "Cluster Coordinator";

    /** Update identifier passed to the coordinator when the sent heartbeat carried no entry for a node. */
    private static final long UNKNOWN_UPDATE_ID = -1L;

    private final NodeProtocolSender protocolSender;
    private final LeaderElectionManager electionManager;
    private final ClusterCoordinator clusterCoordinator;

    /**
     * Creates a heartbeater.
     *
     * @param protocolSender     used to transmit the heartbeat and obtain the response
     * @param clusterCoordinator local coordinator whose node statuses are reset from the response
     * @param electionManager    used to resolve the address of the current Cluster Coordinator
     */
    public ClusterProtocolHeartbeater(NodeProtocolSender protocolSender, ClusterCoordinator clusterCoordinator, LeaderElectionManager electionManager) {
        this.protocolSender = protocolSender;
        this.clusterCoordinator = clusterCoordinator;
        this.electionManager = electionManager;
    }

    /**
     * Resolves the address that heartbeats are sent to.
     *
     * @return the leader reported by the election manager for the "Cluster Coordinator" role
     * @throws ProtocolException if the election manager reports no leader for that role
     */
    @Override
    public String getHeartbeatAddress() {
        Optional<String> coordinatorAddress = this.electionManager.getLeader(CLUSTER_COORDINATOR_ROLE);
        return coordinatorAddress.orElseThrow(() -> new ProtocolException("Unable to send heartbeat: Cluster Coordinator not found"));
    }

    /**
     * Sends the given heartbeat to the current Cluster Coordinator, applies the node statuses
     * contained in the response, and logs a timing breakdown of the exchange.
     *
     * @param heartbeatMessage the heartbeat to send
     * @throws IOException declared by the {@link Heartbeater} contract for communication failures
     * @throws ProtocolException if no Cluster Coordinator address can be resolved
     */
    @Override
    public synchronized void send(HeartbeatMessage heartbeatMessage) throws IOException {
        long sendStart = System.nanoTime();

        // Time the coordinator lookup separately so it can be reported on its own.
        long findCoordinatorStart = System.nanoTime();
        String heartbeatAddress = this.getHeartbeatAddress();
        long findCoordinatorNanos = System.nanoTime() - findCoordinatorStart;

        HeartbeatResponseMessage responseMessage = this.protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
        this.applyUpdatedStatuses(heartbeatMessage, responseMessage);

        // Total time covers the lookup, the network exchange and the status processing above.
        long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sendStart);

        String flowElectionMessage = responseMessage.getFlowElectionMessage();
        String formattedElectionMessage = flowElectionMessage == null ? "" : "; " + flowElectionMessage;

        // NOTE(review): assumes the response always carries timing details; a null here would
        // fail the heartbeat during logging - confirm against the NodeProtocolSender in use.
        CommsTimingDetails timingDetails = responseMessage.getCommsTimingDetails();
        logger.info("Heartbeat created at {} and sent to {} at {}; determining Cluster Coordinator took {} millis; "
                + "DNS lookup for coordinator took {} millis; connecting to coordinator took {} millis; "
                + "sending heartbeat took {} millis; receiving first byte from response took {} millis; "
                + "receiving full response took {} millis; total time was {} millis{}",
            Instant.ofEpochMilli(heartbeatMessage.getHeartbeat().getCreatedTimestamp()),
            heartbeatAddress,
            Instant.now(),
            TimeUnit.NANOSECONDS.toMillis(findCoordinatorNanos),
            timingDetails.getDnsLookupMillis(),
            timingDetails.getConnectMillis(),
            timingDetails.getSendRequestMillis(),
            timingDetails.getReceiveFirstByteMillis(),
            timingDetails.getReceiveFullResponseMillis(),
            sendMillis,
            formattedElectionMessage);
    }

    /**
     * Resets the local status of every node listed in the heartbeat response, passing along the
     * update identifier that the outgoing heartbeat carried for that node (or -1 if it carried none).
     */
    private void applyUpdatedStatuses(HeartbeatMessage heartbeatMessage, HeartbeatResponseMessage responseMessage) {
        // Index the update identifiers that were included in the heartbeat we just sent.
        byte[] payloadBytes = heartbeatMessage.getHeartbeat().getPayload();
        HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
        List<NodeConnectionStatus> sentStatuses = payload.getClusterStatus();
        Map<NodeIdentifier, Long> updateIdMap = sentStatuses.stream()
            .collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, NodeConnectionStatus::getUpdateIdentifier));

        List<NodeConnectionStatus> updatedStatuses = responseMessage.getUpdatedNodeStatuses();
        if (updatedStatuses == null) {
            return;
        }

        for (NodeConnectionStatus updatedStatus : updatedStatuses) {
            NodeIdentifier nodeId = updatedStatus.getNodeIdentifier();
            Long updateId = updateIdMap.get(nodeId);
            boolean updated = this.clusterCoordinator.resetNodeStatus(updatedStatus, updateId == null ? UNKNOWN_UPDATE_ID : updateId);
            if (updated) {
                logger.info("After receiving heartbeat response, updated status of {} to {}", nodeId, updatedStatus);
            } else {
                logger.debug("After receiving heartbeat response, did not update status of {} to {} because the update is out-of-date", nodeId, updatedStatus);
            }
        }
    }

    /**
     * No-op: this class holds no resources of its own to release.
     */
    @Override
    public void close() throws IOException {
    }
}

