/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.heartbeat;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.manager.common.enums.ClusterStatus;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.service.cluster.InlongClusterOperator;
import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HeartbeatManager
implements AbstractHeartbeatManager {
    private static final Logger log = LoggerFactory.getLogger(HeartbeatManager.class);
    private static final String AUTO_REGISTERED = "auto registered";
    private static final Gson GSON = new Gson();
    private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
    private LoadingCache<ComponentHeartbeat, ClusterInfo> clusterInfoCache;
    @Autowired
    private InlongClusterOperatorFactory clusterOperatorFactory;
    @Autowired
    private InlongClusterEntityMapper clusterMapper;
    @Autowired
    private InlongClusterNodeEntityMapper clusterNodeMapper;

    @PostConstruct
    public void init() {
        long expireTime = (long)this.heartbeatInterval() * 2L;
        Scheduler evictScheduler = Scheduler.forScheduledExecutorService((ScheduledExecutorService)Executors.newSingleThreadScheduledExecutor());
        this.heartbeatCache = Caffeine.newBuilder().scheduler(evictScheduler).expireAfterAccess(expireTime, TimeUnit.SECONDS).removalListener((k, msg, c) -> {
            if ((c.wasEvicted() || c == RemovalCause.EXPLICIT) && msg != null) {
                this.evictClusterNode((HeartbeatMsg)msg);
            }
        }).build();
        this.clusterInfoCache = Caffeine.newBuilder().expireAfterAccess(expireTime * 2L, TimeUnit.SECONDS).build(this::fetchCluster);
    }

    public void reportHeartbeat(HeartbeatMsg heartbeat) {
        ComponentHeartbeat componentHeartbeat = heartbeat.componentHeartbeat();
        ClusterInfo clusterInfo = (ClusterInfo)this.clusterInfoCache.get((Object)componentHeartbeat);
        if (clusterInfo == null) {
            log.error("not found any cluster by name={} and type={}", (Object)componentHeartbeat.getClusterName(), (Object)componentHeartbeat.getComponentType());
            return;
        }
        HeartbeatMsg lastHeartbeat = (HeartbeatMsg)this.heartbeatCache.getIfPresent((Object)componentHeartbeat);
        String[] ports = heartbeat.getPort().split(",");
        String[] ips = heartbeat.getIp().split(",");
        String protocolType = heartbeat.getProtocolType();
        String[] protocolTypes = null;
        if (StringUtils.isNotBlank((CharSequence)protocolType) && ports.length > 1 && (protocolTypes = protocolType.split(",")).length < ports.length) {
            protocolTypes = null;
        }
        int handlerNum = 0;
        for (int i = 0; i < ports.length; ++i) {
            InlongClusterNodeEntity clusterNode;
            HeartbeatMsg heartbeatMsg = (HeartbeatMsg)JsonUtils.parseObject((byte[])JsonUtils.toJsonByte((Object)heartbeat), HeartbeatMsg.class);
            assert (heartbeatMsg != null);
            heartbeatMsg.setPort(ports[i].trim());
            heartbeatMsg.setIp(ips[i].trim());
            if (protocolTypes != null) {
                heartbeatMsg.setProtocolType(protocolTypes[i]);
            } else {
                heartbeatMsg.setProtocolType(protocolType);
            }
            if (NodeSrvStatus.SERVICE_UNINSTALL.equals((Object)heartbeat.getNodeSrvStatus())) {
                clusterNode = this.getClusterNode(clusterInfo, heartbeatMsg);
                this.deleteClusterNode(clusterNode);
                continue;
            }
            if (!HeartbeatManager.heartbeatConfigModified(lastHeartbeat, heartbeat)) continue;
            clusterNode = this.getClusterNode(clusterInfo, heartbeatMsg);
            if (clusterNode == null) {
                handlerNum += this.insertClusterNode(clusterInfo, heartbeatMsg, clusterInfo.getCreator());
                continue;
            }
            handlerNum += this.updateClusterNode(clusterNode, heartbeatMsg);
        }
        if (lastHeartbeat == null || handlerNum == ports.length) {
            this.heartbeatCache.put((Object)componentHeartbeat, (Object)heartbeat);
        }
    }

    private void evictClusterNode(HeartbeatMsg heartbeat) {
        log.debug("evict cluster node");
        ComponentHeartbeat componentHeartbeat = heartbeat.componentHeartbeat();
        ClusterInfo clusterInfo = (ClusterInfo)this.clusterInfoCache.getIfPresent((Object)componentHeartbeat);
        if (clusterInfo == null) {
            log.error("not found any cluster by name={} and type={}", (Object)componentHeartbeat.getClusterName(), (Object)componentHeartbeat.getComponentType());
            return;
        }
        String[] ports = heartbeat.getPort().split(",");
        String[] ips = heartbeat.getIp().split(",");
        String protocolType = heartbeat.getProtocolType();
        String[] protocolTypes = null;
        if (StringUtils.isNotBlank((CharSequence)protocolType) && ports.length > 1 && (protocolTypes = protocolType.split(",")).length < ports.length) {
            protocolTypes = null;
        }
        for (int i = 0; i < ports.length; ++i) {
            HeartbeatMsg heartbeatMsg = (HeartbeatMsg)JsonUtils.parseObject((byte[])JsonUtils.toJsonByte((Object)heartbeat), HeartbeatMsg.class);
            assert (heartbeatMsg != null);
            heartbeatMsg.setPort(ports[i].trim());
            heartbeatMsg.setIp(ips[i].trim());
            if (protocolTypes != null) {
                heartbeatMsg.setProtocolType(protocolTypes[i]);
            } else {
                heartbeatMsg.setProtocolType(protocolType);
            }
            InlongClusterNodeEntity clusterNode = this.getClusterNode(clusterInfo, heartbeatMsg);
            if (clusterNode == null) {
                log.error("not found any cluster node by type={}, ip={}, port={}", new Object[]{heartbeat.getComponentType(), heartbeat.getIp(), heartbeat.getPort()});
                return;
            }
            clusterNode.setStatus(Integer.valueOf(NodeStatus.HEARTBEAT_TIMEOUT.getStatus()));
            this.clusterNodeMapper.updateById(clusterNode);
        }
    }

    private InlongClusterNodeEntity getClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat) {
        ClusterNodeRequest nodeRequest = new ClusterNodeRequest();
        nodeRequest.setParentId(clusterInfo.getId());
        nodeRequest.setType(heartbeat.getComponentType());
        nodeRequest.setIp(heartbeat.getIp());
        nodeRequest.setPort(Integer.valueOf(heartbeat.getPort()));
        nodeRequest.setProtocolType(heartbeat.getProtocolType());
        return this.clusterNodeMapper.selectByUniqueKey(nodeRequest);
    }

    private int insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeat, String creator) {
        InlongClusterNodeEntity clusterNode = new InlongClusterNodeEntity();
        clusterNode.setParentId(clusterInfo.getId());
        clusterNode.setType(heartbeat.getComponentType());
        clusterNode.setIp(heartbeat.getIp());
        clusterNode.setPort(Integer.valueOf(heartbeat.getPort()));
        clusterNode.setProtocolType(heartbeat.getProtocolType());
        clusterNode.setNodeLoad(heartbeat.getLoad());
        clusterNode.setStatus(Integer.valueOf(ClusterStatus.NORMAL.getStatus()));
        clusterNode.setCreator(creator);
        clusterNode.setModifier(creator);
        clusterNode.setDescription(AUTO_REGISTERED);
        this.insertOrUpdateLabel(clusterNode, heartbeat);
        return this.clusterNodeMapper.insertOnDuplicateKeyUpdate(clusterNode);
    }

    private int updateClusterNode(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
        clusterNode.setStatus(Integer.valueOf(ClusterStatus.NORMAL.getStatus()));
        clusterNode.setNodeLoad(heartbeat.getLoad());
        this.insertOrUpdateLabel(clusterNode, heartbeat);
        return this.clusterNodeMapper.updateById(clusterNode);
    }

    private void insertOrUpdateLabel(InlongClusterNodeEntity clusterNode, HeartbeatMsg heartbeat) {
        HashSet groupSet = heartbeat.getNodeGroup() == null ? new HashSet() : Arrays.stream(heartbeat.getNodeGroup().split(",")).collect(Collectors.toSet());
        HashMap<String, String> extParams = clusterNode.getExtParams() == null ? new HashMap<String, String>() : (Map)GSON.fromJson(clusterNode.getExtParams(), Map.class);
        extParams.put("agentGroup", String.join((CharSequence)",", groupSet));
        clusterNode.setExtParams(GSON.toJson(extParams));
    }

    private int deleteClusterNode(InlongClusterNodeEntity clusterNode) {
        return this.clusterNodeMapper.deleteById(clusterNode.getId());
    }

    private ClusterInfo fetchCluster(ComponentHeartbeat componentHeartbeat) {
        String clusterName = componentHeartbeat.getClusterName();
        String type = componentHeartbeat.getComponentType();
        String clusterTag = componentHeartbeat.getClusterTag();
        String extTag = componentHeartbeat.getExtTag();
        Preconditions.checkNotNull((Object)clusterTag, (String)"cluster tag cannot be null");
        Preconditions.checkNotNull((Object)type, (String)"cluster type cannot be null");
        Preconditions.checkNotNull((Object)clusterName, (String)"cluster name cannot be null");
        InlongClusterEntity entity = this.clusterMapper.selectByNameAndType(clusterName, type);
        if (null != entity) {
            InlongClusterOperator operator = this.clusterOperatorFactory.getInstance(entity.getType());
            return operator.getFromEntity(entity);
        }
        InlongClusterEntity cluster = new InlongClusterEntity();
        cluster.setName(clusterName);
        cluster.setType(type);
        cluster.setClusterTags(clusterTag);
        cluster.setExtTag(extTag);
        String inCharges = componentHeartbeat.getInCharges();
        if (StringUtils.isBlank((CharSequence)inCharges)) {
            inCharges = "admin";
        }
        String creator = inCharges.split(",")[0];
        cluster.setInCharges(inCharges);
        cluster.setCreator(creator);
        cluster.setModifier(creator);
        cluster.setStatus(Integer.valueOf(ClusterStatus.NORMAL.getStatus()));
        cluster.setDescription(AUTO_REGISTERED);
        this.clusterMapper.insertOnDuplicateKeyUpdate(cluster);
        InlongClusterOperator operator = this.clusterOperatorFactory.getInstance(cluster.getType());
        ClusterInfo clusterInfo = operator.getFromEntity(cluster);
        log.debug("success to fetch cluster for heartbeat: {}", (Object)componentHeartbeat);
        return clusterInfo;
    }

    private static boolean heartbeatConfigModified(HeartbeatMsg oldHB, HeartbeatMsg newHB) {
        if (oldHB == null) {
            return true;
        }
        return oldHB.getNodeGroup() != newHB.getNodeGroup() || oldHB.getLoad() != newHB.getLoad();
    }

    public Cache<ComponentHeartbeat, HeartbeatMsg> getHeartbeatCache() {
        return this.heartbeatCache;
    }

    public LoadingCache<ComponentHeartbeat, ClusterInfo> getClusterInfoCache() {
        return this.clusterInfoCache;
    }
}

