package org.apache.inlong.manager.service.heartbeat;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Joiner;
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.ComponentHeartbeat;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.manager.common.enums.ClusterStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.mapper.ComponentHeartbeatEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeDTO;
import org.apache.inlong.manager.service.cluster.InlongClusterOperatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Lazy
@Component
/* loaded from: input_file:org/apache/inlong/manager/service/heartbeat/HeartbeatManager.class */
public class HeartbeatManager implements AbstractHeartbeatManager {
    private static final Logger log;
    private static final String AUTO_REGISTERED = "auto registered";
    private static final Gson GSON;
    private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
    private LoadingCache<ComponentHeartbeat, ClusterInfo> clusterInfoCache;

    @Autowired
    private InlongClusterOperatorFactory clusterOperatorFactory;

    @Autowired
    private InlongClusterEntityMapper clusterMapper;

    @Autowired
    private InlongClusterNodeEntityMapper clusterNodeMapper;

    @Autowired
    private StreamSourceEntityMapper sourceMapper;

    @Autowired
    private ComponentHeartbeatEntityMapper componentHeartbeatMapper;

    @Value("${cluster.heartbeat.interval:30}")
    private Long heartbeatIntervalFactor;

    @Value("${reset.nodeStatus.enabled:false}")
    private Boolean resetNodeStatusEnabled;
    static final /* synthetic */ boolean $assertionsDisabled;

    private static boolean heartbeatConfigModified(HeartbeatMsg heartbeatMsg, HeartbeatMsg heartbeatMsg2) {
        return (heartbeatMsg != null && Objects.equals(heartbeatMsg.getNodeGroup(), heartbeatMsg2.getNodeGroup()) && Objects.equals(heartbeatMsg.getLoad(), heartbeatMsg2.getLoad())) ? false : true;
    }

    @PostConstruct
    public void init() {
        if (this.resetNodeStatusEnabled.booleanValue()) {
            this.clusterNodeMapper.updateStatus((Integer) null, Integer.valueOf(NodeStatus.HEARTBEAT_TIMEOUT.getStatus()), Integer.valueOf(NodeStatus.NORMAL.getStatus()));
        }
        long heartbeatInterval = heartbeatInterval() * this.heartbeatIntervalFactor.longValue();
        this.heartbeatCache = Caffeine.newBuilder().scheduler(Scheduler.forScheduledExecutorService(Executors.newSingleThreadScheduledExecutor())).expireAfterAccess(heartbeatInterval, TimeUnit.SECONDS).removalListener((componentHeartbeat, heartbeatMsg, removalCause) -> {
            if ((removalCause.wasEvicted() || removalCause == RemovalCause.EXPLICIT) && heartbeatMsg != null) {
                evictClusterNode(heartbeatMsg);
            }
        }).build();
        this.clusterInfoCache = Caffeine.newBuilder().expireAfterAccess(heartbeatInterval * 2, TimeUnit.SECONDS).build(this::fetchCluster);
    }

    public void reportHeartbeat(HeartbeatMsg heartbeatMsg) {
        ComponentHeartbeat componentHeartbeat = heartbeatMsg.componentHeartbeat();
        ClusterInfo clusterInfo = (ClusterInfo) this.clusterInfoCache.get(componentHeartbeat);
        if (clusterInfo == null) {
            log.error("not found any cluster by name={} and type={}", componentHeartbeat.getClusterName(), componentHeartbeat.getComponentType());
            return;
        }
        HeartbeatMsg heartbeatMsg2 = (HeartbeatMsg) this.heartbeatCache.getIfPresent(componentHeartbeat);
        String[] split = heartbeatMsg.getPort().split(",");
        String[] split2 = heartbeatMsg.getIp().split(",");
        String protocolType = heartbeatMsg.getProtocolType();
        String[] strArr = null;
        if (StringUtils.isNotBlank(protocolType) && split.length > 1) {
            strArr = protocolType.split(",");
            if (strArr.length < split.length) {
                strArr = null;
            }
        }
        int i = 0;
        for (int i2 = 0; i2 < split.length; i2++) {
            HeartbeatMsg heartbeatMsg3 = (HeartbeatMsg) JsonUtils.parseObject(JsonUtils.toJsonByte(heartbeatMsg), HeartbeatMsg.class);
            if (!$assertionsDisabled && heartbeatMsg3 == null) {
                throw new AssertionError();
            }
            heartbeatMsg3.setPort(split[i2].trim());
            heartbeatMsg3.setIp(split2[i2].trim());
            if (strArr != null) {
                heartbeatMsg3.setProtocolType(strArr[i2]);
            } else {
                heartbeatMsg3.setProtocolType(protocolType);
            }
            if (NodeSrvStatus.SERVICE_UNINSTALL.equals(heartbeatMsg.getNodeSrvStatus())) {
                deleteClusterNode(getClusterNode(clusterInfo, heartbeatMsg3));
            } else if (heartbeatConfigModified(heartbeatMsg2, heartbeatMsg)) {
                InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeatMsg3);
                if (clusterNode == null) {
                    i += insertClusterNode(clusterInfo, heartbeatMsg3, clusterInfo.getCreator());
                } else {
                    i += updateClusterNode(clusterNode, heartbeatMsg3);
                    if (Objects.equals(clusterNode.getType(), "AGENT")) {
                        List selectHeartbeatTimeoutIds = this.sourceMapper.selectHeartbeatTimeoutIds((List) null, heartbeatMsg.getIp(), heartbeatMsg.getClusterName());
                        if (CollectionUtils.isNotEmpty(selectHeartbeatTimeoutIds)) {
                            this.sourceMapper.rollbackTimeoutStatusByIds(selectHeartbeatTimeoutIds, (String) null);
                        }
                    }
                }
            }
        }
        if (heartbeatMsg2 == null || i == split.length) {
            this.heartbeatCache.put(componentHeartbeat, heartbeatMsg);
        }
    }

    private void evictClusterNode(HeartbeatMsg heartbeatMsg) {
        log.debug("evict cluster node");
        ComponentHeartbeat componentHeartbeat = heartbeatMsg.componentHeartbeat();
        ClusterInfo clusterInfo = (ClusterInfo) this.clusterInfoCache.getIfPresent(componentHeartbeat);
        if (clusterInfo == null) {
            log.error("not found any cluster by name={} and type={}", componentHeartbeat.getClusterName(), componentHeartbeat.getComponentType());
            return;
        }
        String[] split = heartbeatMsg.getPort().split(",");
        String[] split2 = heartbeatMsg.getIp().split(",");
        String protocolType = heartbeatMsg.getProtocolType();
        String[] strArr = null;
        if (StringUtils.isNotBlank(protocolType) && split.length > 1) {
            strArr = protocolType.split(",");
            if (strArr.length < split.length) {
                strArr = null;
            }
        }
        if (this.componentHeartbeatMapper.selectTimeOutHeartBeat(componentHeartbeat.getComponentType(), componentHeartbeat.getIp(), Long.valueOf(heartbeatInterval() * 2)) != null) {
            this.heartbeatCache.put(componentHeartbeat, heartbeatMsg);
            return;
        }
        for (int i = 0; i < split.length; i++) {
            HeartbeatMsg heartbeatMsg2 = (HeartbeatMsg) JsonUtils.parseObject(JsonUtils.toJsonByte(heartbeatMsg), HeartbeatMsg.class);
            if (!$assertionsDisabled && heartbeatMsg2 == null) {
                throw new AssertionError();
            }
            heartbeatMsg2.setPort(split[i].trim());
            heartbeatMsg2.setIp(split2[i].trim());
            if (strArr != null) {
                heartbeatMsg2.setProtocolType(strArr[i]);
            } else {
                heartbeatMsg2.setProtocolType(protocolType);
            }
            InlongClusterNodeEntity clusterNode = getClusterNode(clusterInfo, heartbeatMsg2);
            if (clusterNode == null) {
                log.error("not found any cluster node by type={}, ip={}, port={}", new Object[]{heartbeatMsg.getComponentType(), heartbeatMsg.getIp(), heartbeatMsg.getPort()});
                return;
            } else {
                clusterNode.setStatus(Integer.valueOf(NodeStatus.HEARTBEAT_TIMEOUT.getStatus()));
                this.clusterNodeMapper.updateById(clusterNode);
            }
        }
    }

    private InlongClusterNodeEntity getClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeatMsg) {
        ClusterNodeRequest clusterNodeRequest = new ClusterNodeRequest();
        clusterNodeRequest.setParentId(clusterInfo.getId());
        clusterNodeRequest.setType(heartbeatMsg.getComponentType());
        clusterNodeRequest.setIp(heartbeatMsg.getIp());
        clusterNodeRequest.setPort(Integer.valueOf(heartbeatMsg.getPort()));
        clusterNodeRequest.setProtocolType(heartbeatMsg.getProtocolType());
        return this.clusterNodeMapper.selectByUniqueKey(clusterNodeRequest);
    }

    private int insertClusterNode(ClusterInfo clusterInfo, HeartbeatMsg heartbeatMsg, String str) {
        InlongClusterNodeEntity inlongClusterNodeEntity = new InlongClusterNodeEntity();
        inlongClusterNodeEntity.setParentId(clusterInfo.getId());
        inlongClusterNodeEntity.setType(heartbeatMsg.getComponentType());
        inlongClusterNodeEntity.setIp(heartbeatMsg.getIp());
        inlongClusterNodeEntity.setPort(Integer.valueOf(heartbeatMsg.getPort()));
        inlongClusterNodeEntity.setProtocolType(heartbeatMsg.getProtocolType());
        inlongClusterNodeEntity.setNodeLoad(heartbeatMsg.getLoad());
        inlongClusterNodeEntity.setStatus(Integer.valueOf(ClusterStatus.NORMAL.getStatus()));
        inlongClusterNodeEntity.setCreator(str);
        inlongClusterNodeEntity.setModifier(str);
        inlongClusterNodeEntity.setDescription(AUTO_REGISTERED);
        insertOrUpdateNodeGroup(inlongClusterNodeEntity, heartbeatMsg);
        return this.clusterNodeMapper.insertOnDuplicateKeyUpdate(inlongClusterNodeEntity);
    }

    private int updateClusterNode(InlongClusterNodeEntity inlongClusterNodeEntity, HeartbeatMsg heartbeatMsg) {
        inlongClusterNodeEntity.setStatus(Integer.valueOf(ClusterStatus.NORMAL.getStatus()));
        inlongClusterNodeEntity.setNodeLoad(heartbeatMsg.getLoad());
        insertOrUpdateNodeGroup(inlongClusterNodeEntity, heartbeatMsg);
        return this.clusterNodeMapper.updateById(inlongClusterNodeEntity);
    }

    private void insertOrUpdateNodeGroup(InlongClusterNodeEntity inlongClusterNodeEntity, HeartbeatMsg heartbeatMsg) {
        Iterable hashSet = StringUtils.isBlank(heartbeatMsg.getNodeGroup()) ? new HashSet() : (Set) Arrays.stream(heartbeatMsg.getNodeGroup().split(",")).collect(Collectors.toSet());
        AgentClusterNodeDTO agentClusterNodeDTO = new AgentClusterNodeDTO();
        if (StringUtils.isNotBlank(inlongClusterNodeEntity.getExtParams())) {
            agentClusterNodeDTO = AgentClusterNodeDTO.getFromJson(inlongClusterNodeEntity.getExtParams());
            agentClusterNodeDTO.setAgentGroup(Joiner.on(",").join(hashSet));
        }
        inlongClusterNodeEntity.setExtParams(GSON.toJson(agentClusterNodeDTO));
    }

    private int deleteClusterNode(InlongClusterNodeEntity inlongClusterNodeEntity) {
        return this.clusterNodeMapper.deleteById(inlongClusterNodeEntity.getId());
    }

    private ClusterInfo fetchCluster(ComponentHeartbeat componentHeartbeat) {
        String clusterName = componentHeartbeat.getClusterName();
        String componentType = componentHeartbeat.getComponentType();
        String clusterTag = componentHeartbeat.getClusterTag();
        String extTag = componentHeartbeat.getExtTag();
        Preconditions.expectNotBlank(clusterTag, ErrorCodeEnum.INVALID_PARAMETER, "cluster tag cannot be null");
        Preconditions.expectNotBlank(componentType, ErrorCodeEnum.INVALID_PARAMETER, "cluster type cannot be null");
        Preconditions.expectNotBlank(clusterName, ErrorCodeEnum.INVALID_PARAMETER, "cluster name cannot be null");
        InlongClusterEntity selectByNameAndType = this.clusterMapper.selectByNameAndType(clusterName, componentType);
        if (null != selectByNameAndType) {
            return this.clusterOperatorFactory.getInstance(selectByNameAndType.getType()).getFromEntity(selectByNameAndType);
        }
        InlongClusterEntity inlongClusterEntity = new InlongClusterEntity();
        inlongClusterEntity.setName(clusterName);
        inlongClusterEntity.setDisplayName(clusterName);
        inlongClusterEntity.setType(componentType);
        inlongClusterEntity.setClusterTags(clusterTag);
        inlongClusterEntity.setExtTag(extTag);
        String inCharges = componentHeartbeat.getInCharges();
        if (StringUtils.isBlank(inCharges)) {
            inCharges = "admin";
        }
        String str = inCharges.split(",")[0];
        inlongClusterEntity.setInCharges(inCharges);
        inlongClusterEntity.setCreator(str);
        inlongClusterEntity.setModifier(str);
        inlongClusterEntity.setStatus(Integer.valueOf(ClusterStatus.NORMAL.getStatus()));
        inlongClusterEntity.setDescription(AUTO_REGISTERED);
        this.clusterMapper.insertOnDuplicateKeyUpdate(inlongClusterEntity);
        ClusterInfo fromEntity = this.clusterOperatorFactory.getInstance(inlongClusterEntity.getType()).getFromEntity(inlongClusterEntity);
        log.debug("success to fetch cluster for heartbeat: {}", componentHeartbeat);
        return fromEntity;
    }

    public Cache<ComponentHeartbeat, HeartbeatMsg> getHeartbeatCache() {
        return this.heartbeatCache;
    }

    public LoadingCache<ComponentHeartbeat, ClusterInfo> getClusterInfoCache() {
        return this.clusterInfoCache;
    }

    static {
        $assertionsDisabled = !HeartbeatManager.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(HeartbeatManager.class);
        GSON = new Gson();
    }
}
