package org.apache.inlong.manager.service.core.impl;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Agent-facing service: accepts snapshot and command-result reports from agents and hands out
 * the stream-source tasks (plus command configs) an agent should pick up next.
 *
 * <p>Source status codes are treated as {@code stage * 100 + operation}: the operation is the
 * status modulo {@link #MODULUS_100}, and handing a task to an agent moves its status into the
 * {@link #ISSUED_STATUS} stage while keeping the operation part.
 */
@Service
public class AgentServiceImpl implements AgentService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AgentServiceImpl.class);

    private static final int UNISSUED_STATUS = 2;
    private static final int ISSUED_STATUS = 3;
    private static final int MODULUS_100 = 100;
    /** Max number of non-file rows queried per pull, and the stop threshold of the file-task loop. */
    private static final int TASK_FETCH_SIZE = 2;
    /** Max number of candidate file sources queried per pull. */
    private static final int FILE_TASK_CANDIDATE_SIZE = TASK_FETCH_SIZE * 10;

    @Autowired
    private StreamSourceEntityMapper sourceMapper;

    @Autowired
    private SourceSnapshotOperator snapshotOperator;

    @Autowired
    private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;

    @Autowired
    private InlongStreamEntityMapper streamMapper;

    /**
     * Forwards an agent snapshot report to the snapshot operator.
     *
     * @param taskSnapshotRequest the snapshot reported by the agent
     * @return whatever the snapshot operator returns for this request
     */
    @Override
    public Boolean reportSnapshot(TaskSnapshotRequest taskSnapshotRequest) {
        return snapshotOperator.snapshot(taskSnapshotRequest);
    }

    /**
     * Applies the command results reported by an agent to the matching stream sources.
     *
     * @param taskRequest the agent report; must be non-null and carry a non-blank agent ip
     * @throws BusinessException if the request is null or its agent ip is blank
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public void report(TaskRequest taskRequest) {
        // Parameterized logging already skips formatting when debug is off; no explicit guard needed.
        LOGGER.debug("begin to get agent task: {}", taskRequest);
        if (taskRequest == null || StringUtils.isBlank(taskRequest.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }
        if (CollectionUtils.isEmpty(taskRequest.getCommandInfo())) {
            LOGGER.warn("task result was empty, just return");
            return;
        }
        for (CommandEntity command : taskRequest.getCommandInfo()) {
            updateTaskStatus(command);
        }
    }

    /**
     * Updates one stream source according to a single command result.
     *
     * <p>The result is ignored when the source cannot be found or when the reported version does
     * not equal the stored version.
     *
     * @param commandEntity the command result reported by the agent
     */
    private void updateTaskStatus(CommandEntity commandEntity) {
        Integer taskId = commandEntity.getTaskId();
        StreamSourceEntity current = sourceMapper.selectForAgentTask(taskId);
        if (current == null) {
            LOGGER.warn("stream source not found by id={}, just return", taskId);
            return;
        }
        if (!Objects.equals(commandEntity.getVersion(), current.getVersion())) {
            LOGGER.warn("task result version [{}] not equals to current [{}] for id [{}], skip update",
                    commandEntity.getVersion(), current.getVersion(), taskId);
            return;
        }

        int commandResult = commandEntity.getCommandResult();
        int previousStatus = current.getStatus();
        // NOTE(review): a successful result for a source that is NOT in an issued (3xx) status also
        // falls through to SOURCE_NORMAL here - confirm this fallback is intended.
        int nextStatus = SourceStatus.SOURCE_NORMAL.getCode();
        if (1 == commandResult) {
            // A command result of 1 is handled as a failure.
            LOGGER.warn("task failed for id =[{}]", taskId);
            nextStatus = SourceStatus.SOURCE_FAILED.getCode();
        } else if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
            if (SourceStatus.TEMP_TO_NORMAL.contains(previousStatus)) {
                nextStatus = SourceStatus.SOURCE_NORMAL.getCode();
            } else if (SourceStatus.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
                nextStatus = SourceStatus.SOURCE_DISABLE.getCode();
            } else if (SourceStatus.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
                nextStatus = SourceStatus.SOURCE_FROZEN.getCode();
            }
        }

        if (nextStatus != previousStatus) {
            sourceMapper.updateStatus(taskId, nextStatus, false);
            LOGGER.info("task result=[{}], update source status to [{}] for id [{}]",
                    commandResult, nextStatus, taskId);
        }
    }

    /**
     * Collects the tasks and command configs the requesting agent should process next.
     *
     * <p>Tasks are gathered in this order: non-file sources, file sources, then sources already
     * bound to the agent that are waiting for a follow-up operation.
     *
     * @param taskRequest the agent pull request; must be non-null and carry a non-blank agent ip
     * @return the data configs and command configs for the agent
     * @throws BusinessException if the request is null or its agent ip is blank
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class}, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public TaskResult getTaskResult(TaskRequest taskRequest) {
        if (taskRequest == null || StringUtils.isBlank(taskRequest.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }
        List<DataConfig> tasks = Lists.newArrayList();
        tasks.addAll(fetchNonFileTasks(taskRequest));
        tasks.addAll(fetchFileTasks(taskRequest));
        tasks.addAll(fetchIssuedTasks(taskRequest));
        List<CmdConfig> cmdConfigs = getAgentCmdConfigs(taskRequest);
        return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
    }

    /**
     * Returns the source statuses that make a source eligible to be handed out as a new task.
     *
     * <p>When the agent's pull job type is {@code NEVER}, only sources waiting to be re-activated
     * are eligible; otherwise sources waiting to be added are eligible as well.
     */
    private List<Integer> getNeedAddStatusList(TaskRequest taskRequest) {
        if (PullJobTypeEnum.NEVER == PullJobTypeEnum.getPullJobType(taskRequest.getPullJobType())) {
            LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
            return Collections.singletonList(SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
        }
        return Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
                SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
    }

    /**
     * Moves an existing source into the issued stage, persists it and, if exactly one row was
     * updated, appends the resulting task config to {@code tasks}.
     */
    private void issueExistingSource(StreamSourceEntity sourceEntity, List<DataConfig> tasks) {
        int op = getOp(sourceEntity.getStatus());
        sourceEntity.setStatus(getNextStatus(sourceEntity.getStatus()));
        if (sourceMapper.updateByPrimaryKeySelective(sourceEntity) == 1) {
            // Presumably the update statement increments the stored version; keep the in-memory
            // copy in step so the agent receives the new value - TODO confirm against the mapper.
            sourceEntity.setVersion(sourceEntity.getVersion() + 1);
            tasks.add(getDataConfig(sourceEntity, op));
        }
    }

    /**
     * Picks up to {@link #TASK_FETCH_SIZE} eligible MYSQL_SQL / KAFKA / MYSQL_BINLOG sources and
     * assigns them to the requesting agent.
     */
    private List<DataConfig> fetchNonFileTasks(TaskRequest taskRequest) {
        List<Integer> needAddStatusList = getNeedAddStatusList(taskRequest);
        List<String> sourceTypes = Lists.newArrayList("MYSQL_SQL", "KAFKA", "MYSQL_BINLOG");
        List<StreamSourceEntity> sourceEntities =
                sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes, TASK_FETCH_SIZE);

        List<DataConfig> tasks = Lists.newArrayList();
        for (StreamSourceEntity sourceEntity : sourceEntities) {
            // Bind the source to the agent that is pulling it.
            sourceEntity.setAgentIp(taskRequest.getAgentIp());
            sourceEntity.setUuid(taskRequest.getUuid());
            issueExistingSource(sourceEntity, tasks);
        }
        return tasks;
    }

    /**
     * Picks eligible FILE sources for the requesting agent.
     *
     * <p>A source bound to an ip (and no cluster) is issued directly when the ip is the agent's.
     * A source bound to the agent's cluster that has no template id is treated as a template: a
     * copy is inserted for this agent unless a copy for this agent ip already exists.
     */
    private List<DataConfig> fetchFileTasks(TaskRequest taskRequest) {
        List<Integer> needAddStatusList = getNeedAddStatusList(taskRequest);
        final String agentIp = taskRequest.getAgentIp();
        final String agentClusterName = taskRequest.getClusterName();
        final String uuid = taskRequest.getUuid();
        Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
                "both agent ip and cluster name are blank when fetching file task");

        List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
                Lists.newArrayList("FILE"), agentIp, agentClusterName, FILE_TASK_CANDIDATE_SIZE);

        List<DataConfig> fileTasks = Lists.newArrayList();
        for (StreamSourceEntity sourceEntity : sourceEntities) {
            // Parsed up-front for every candidate, exactly as before, so a malformed extParams
            // surfaces regardless of which branch the source takes.
            FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
            final String destIp = sourceEntity.getAgentIp();
            final String destClusterName = sourceEntity.getInlongClusterName();

            // Source bound to an ip only: issue it when the ip is this agent's.
            if (StringUtils.isNotBlank(destIp) && StringUtils.isBlank(destClusterName)) {
                if (destIp.equals(agentIp)) {
                    sourceEntity.setUuid(uuid);
                    issueExistingSource(sourceEntity, fileTasks);
                }
                continue;
            }

            // Source bound to this agent's cluster and not itself a copy of a template.
            if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)
                    && Objects.isNull(sourceEntity.getTemplateId())) {
                List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
                // agentIp is the receiver so a sub source with a null ip cannot raise an NPE.
                boolean alreadyCloned = subSources.stream()
                        .anyMatch(subSource -> agentIp.equals(subSource.getAgentIp()));
                if (alreadyCloned) {
                    continue;
                }

                StreamSourceEntity fileEntity =
                        CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
                FileSourceDTO childFileDTO = CommonBeanUtils.copyProperties(fileSourceDTO, FileSourceDTO::new);
                fileEntity.setExtParams(JsonUtils.toJsonString(childFileDTO));
                fileEntity.setAgentIp(agentIp);
                fileEntity.setUuid(uuid);
                // Random suffix keeps the copy's name distinct from the template's name.
                fileEntity.setSourceName(fileEntity.getSourceName() + "-"
                        + RandomStringUtils.randomAlphanumeric(10));
                fileEntity.setTemplateId(sourceEntity.getId());
                int op = getOp(fileEntity.getStatus());
                fileEntity.setStatus(getNextStatus(fileEntity.getStatus()));
                if (sourceMapper.insert(fileEntity) > 0) {
                    fileTasks.add(getDataConfig(fileEntity, op));
                }
            }

            if (fileTasks.size() >= TASK_FETCH_SIZE) {
                break;
            }
        }
        return fileTasks;
    }

    /**
     * Issues follow-up operations (delete, retry, backtrack, frozen, check, redo-metric, makeup)
     * for sources selected by the requesting agent's ip and uuid.
     */
    private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) {
        List<Integer> statusList = Arrays.asList(
                SourceStatus.TO_BE_ISSUED_DELETE.getCode(),
                SourceStatus.TO_BE_ISSUED_RETRY.getCode(),
                SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(),
                SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
                SourceStatus.TO_BE_ISSUED_CHECK.getCode(),
                SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(),
                SourceStatus.TO_BE_ISSUED_MAKEUP.getCode());
        List<StreamSourceEntity> sourceEntities =
                sourceMapper.selectByStatusAndIp(statusList, taskRequest.getAgentIp(), taskRequest.getUuid());

        List<DataConfig> issuedTasks = Lists.newArrayList();
        for (StreamSourceEntity sourceEntity : sourceEntities) {
            issueExistingSource(sourceEntity, issuedTasks);
        }
        return issuedTasks;
    }

    /** Returns the operation part of a status code, i.e. {@code status % 100}. */
    private int getOp(int i) {
        return i % MODULUS_100;
    }

    /** Returns the issued-stage status ({@code 3xx}) that carries the same operation part. */
    private int getNextStatus(int i) {
        return ISSUED_STATUS * MODULUS_100 + getOp(i);
    }

    /**
     * Builds the task config sent to the agent for the given source.
     *
     * @param streamSourceEntity the source to describe
     * @param i the operation code to send, written to the config as a string
     * @return the populated task config
     * @throws BusinessException if the source type has no mapped task type
     */
    private DataConfig getDataConfig(StreamSourceEntity streamSourceEntity, int i) {
        DataConfig dataConfig = new DataConfig();
        dataConfig.setIp(streamSourceEntity.getAgentIp());
        dataConfig.setUuid(streamSourceEntity.getUuid());
        dataConfig.setOp(String.valueOf(i));
        dataConfig.setTaskId(streamSourceEntity.getId());
        dataConfig.setTaskType(getTaskType(streamSourceEntity));
        dataConfig.setTaskName(streamSourceEntity.getSourceName());
        dataConfig.setSnapshot(streamSourceEntity.getSnapshot());
        dataConfig.setVersion(streamSourceEntity.getVersion());

        String groupId = streamSourceEntity.getInlongGroupId();
        String streamId = streamSourceEntity.getInlongStreamId();
        dataConfig.setInlongGroupId(groupId);
        dataConfig.setInlongStreamId(streamId);

        String extParams = streamSourceEntity.getExtParams();
        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity != null) {
            dataConfig.setSyncSend(streamEntity.getSyncSend());
            if ("FILE".equalsIgnoreCase(streamEntity.getDataType())) {
                // The stream-level separator, when present, is written into the file source params.
                String dataSeparator = streamEntity.getDataSeparator();
                if (dataSeparator != null) {
                    extParams = getExtParams(extParams, dataSeparator);
                }
            }
        } else {
            dataConfig.setSyncSend(0);
            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}",
                    groupId, streamId);
        }
        dataConfig.setExtParams(extParams);
        return dataConfig;
    }

    /**
     * Returns {@code str} re-serialized as file source params with its data separator set to
     * {@code str2}.
     *
     * @param str the JSON ext params of a file source, may be null
     * @param str2 the data separator to apply
     * @return null if {@code str} is null, {@code str} unchanged if it parses to null, otherwise
     *         the updated JSON
     */
    private String getExtParams(String str, String str2) {
        if (str == null) {
            return null;
        }
        FileSourceDTO fileSourceDTO = JsonUtils.parseObject(str, FileSourceDTO.class);
        if (fileSourceDTO == null) {
            return str;
        }
        fileSourceDTO.setDataSeparator(str2);
        return JsonUtils.toJsonString(fileSourceDTO);
    }

    /**
     * Resolves the agent task type for a source from {@code SourceType.SOURCE_TASK_MAP}.
     *
     * @throws BusinessException if the source type has no entry in the map
     */
    private int getTaskType(StreamSourceEntity streamSourceEntity) {
        TaskTypeEnum taskType = SourceType.SOURCE_TASK_MAP.get(streamSourceEntity.getSourceType());
        if (taskType == null) {
            throw new BusinessException("Unsupported task type for source type "
                    + streamSourceEntity.getSourceType());
        }
        return taskType.getType();
    }

    /** Loads the command configs stored for the requesting agent's ip and converts them for the agent. */
    private List<CmdConfig> getAgentCmdConfigs(TaskRequest taskRequest) {
        return sourceCmdConfigMapper.queryCmdByAgentIp(taskRequest.getAgentIp()).stream()
                .map(cmd -> {
                    CmdConfig cmdConfig = new CmdConfig();
                    cmdConfig.setDataTime(cmd.getSpecifiedDataTime());
                    cmdConfig.setOp(cmd.getCmdType());
                    cmdConfig.setId(cmd.getId());
                    cmdConfig.setTaskId(cmd.getTaskId());
                    return cmdConfig;
                })
                .collect(Collectors.toList());
    }
}
