/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

/**
 * Agent-facing service implementation: records task results and snapshots reported by agents
 * and selects the stream-source tasks that should be handed to an agent on its next pull.
 *
 * <p>Source status codes are handled arithmetically in this class: {@code status / 100} is
 * compared with {@link #ISSUED_STATUS} to detect an "issued" status, and {@code status % 100}
 * is used as the operation code that is sent to the agent.
 */
@Service
public class AgentServiceImpl implements AgentService {

    private static final Logger LOGGER = LoggerFactory.getLogger(AgentServiceImpl.class);

    private static final int UNISSUED_STATUS = 2;
    private static final int ISSUED_STATUS = 3;
    private static final int MODULUS_100 = 100;
    private static final int TASK_FETCH_SIZE = 2;
    /**
     * Row limit used when querying file-source candidates.
     * NOTE(review): presumably larger than {@link #TASK_FETCH_SIZE} because candidates are
     * filtered again in Java after the query - confirm.
     */
    private static final int FILE_TASK_QUERY_LIMIT = TASK_FETCH_SIZE * 10;
    /** Command result value that {@link #updateTaskStatus(CommandEntity)} treats as a failure. */
    private static final int COMMAND_RESULT_FAILED = 1;

    @Autowired
    private StreamSourceEntityMapper sourceMapper;
    @Autowired
    private SourceSnapshotOperator snapshotOperator;
    @Autowired
    private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
    @Autowired
    private InlongStreamEntityMapper streamMapper;

    /**
     * Delegates the snapshot report to the {@link SourceSnapshotOperator}.
     *
     * @param request snapshot request sent by the agent
     * @return whatever the snapshot operator returns for this request
     */
    @Override
    public Boolean reportSnapshot(TaskSnapshotRequest request) {
        return snapshotOperator.snapshot(request);
    }

    /**
     * Applies every command result contained in the request to the matching stream source.
     *
     * @param request agent request carrying the command results
     * @throws BusinessException if the request is {@code null} or its agent IP is blank
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public void report(TaskRequest request) {
        // Parameterised logging already skips formatting when debug is disabled.
        LOGGER.debug("begin to get agent task: {}", request);
        if (request == null || StringUtils.isBlank(request.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }
        if (CollectionUtils.isEmpty(request.getCommandInfo())) {
            LOGGER.warn("task result was empty, just return");
            return;
        }
        for (CommandEntity command : request.getCommandInfo()) {
            updateTaskStatus(command);
        }
    }

    /**
     * Updates the status of the stream source referenced by one command result.
     *
     * <p>The update is skipped when the source cannot be found or when the reported version
     * differs from the stored one. A failed result moves the source to
     * {@code SOURCE_FAILED}; otherwise an issued status is mapped to normal, disabled or
     * frozen, and every other status falls back to {@code SOURCE_NORMAL}.
     *
     * @param command command result reported by the agent
     */
    private void updateTaskStatus(CommandEntity command) {
        Integer taskId = command.getTaskId();
        StreamSourceEntity current = sourceMapper.selectForAgentTask(taskId);
        if (current == null) {
            LOGGER.warn("stream source not found by id={}, just return", taskId);
            return;
        }
        if (!Objects.equals(command.getVersion(), current.getVersion())) {
            LOGGER.warn("task result version [{}] not equals to current [{}] for id [{}], skip update",
                    command.getVersion(), current.getVersion(), taskId);
            return;
        }

        int result = command.getCommandResult();
        int previousStatus = current.getStatus();
        int nextStatus = SourceStatus.SOURCE_NORMAL.getCode();
        if (COMMAND_RESULT_FAILED == result) {
            LOGGER.warn("task failed for id =[{}]", taskId);
            nextStatus = SourceStatus.SOURCE_FAILED.getCode();
        } else if (previousStatus / MODULUS_100 == ISSUED_STATUS) {
            // Only issued statuses get a dedicated target; anything unmatched stays SOURCE_NORMAL.
            if (SourceStatus.TEMP_TO_NORMAL.contains(previousStatus)) {
                nextStatus = SourceStatus.SOURCE_NORMAL.getCode();
            } else if (SourceStatus.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
                nextStatus = SourceStatus.SOURCE_DISABLE.getCode();
            } else if (SourceStatus.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
                nextStatus = SourceStatus.SOURCE_FROZEN.getCode();
            }
        }

        if (nextStatus != previousStatus) {
            sourceMapper.updateStatus(taskId, nextStatus, false);
            LOGGER.info("task result=[{}], update source status to [{}] for id [{}]", result, nextStatus, taskId);
        }
    }

    /**
     * Collects everything an agent should receive on this pull: non-file tasks, file tasks,
     * tasks with a pending operation for this agent, and the agent's command configs.
     *
     * @param request agent request identifying the pulling agent
     * @return task result holding the data configs and command configs
     * @throws BusinessException if the request is {@code null} or its agent IP is blank
     */
    @Override
    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.READ_COMMITTED,
            propagation = Propagation.REQUIRES_NEW)
    public TaskResult getTaskResult(TaskRequest request) {
        if (request == null || StringUtils.isBlank(request.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }

        List<DataConfig> tasks = new ArrayList<>();
        tasks.addAll(fetchNonFileTasks(request));
        tasks.addAll(fetchFileTasks(request));
        tasks.addAll(fetchIssuedTasks(request));

        List<CmdConfig> cmdConfigs = getAgentCmdConfigs(request);
        return TaskResult.builder().dataConfigs(tasks).cmdConfigs(cmdConfigs).build();
    }

    /**
     * Chooses which "to be issued" statuses may be picked up for new work.
     *
     * @param taskRequest agent request carrying the pull job type
     * @return only {@code TO_BE_ISSUED_ACTIVE} when the pull job type is {@code NEVER},
     *         otherwise {@code TO_BE_ISSUED_ADD} and {@code TO_BE_ISSUED_ACTIVE}
     */
    private List<Integer> getNeedAddStatusList(TaskRequest taskRequest) {
        if (PullJobTypeEnum.NEVER == PullJobTypeEnum.getPullJobType(taskRequest.getPullJobType())) {
            LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
            return Collections.singletonList(SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
        }
        return Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode());
    }

    /**
     * Moves an already persisted source to its issued status and, if exactly one row was
     * updated, adds the matching data config to {@code collected}.
     *
     * @param entity source to issue; its status and version fields are modified in place
     * @param collected list receiving the data config on success
     */
    private void issueExistingSource(StreamSourceEntity entity, List<DataConfig> collected) {
        // The op must be derived from the status before it is overwritten.
        int op = getOp(entity.getStatus());
        entity.setStatus(getNextStatus(entity.getStatus()));
        if (sourceMapper.updateByPrimaryKeySelective(entity) == 1) {
            // NOTE(review): presumably mirrors a version increment done by the update statement - confirm.
            entity.setVersion(entity.getVersion() + 1);
            collected.add(getDataConfig(entity, op));
        }
    }

    /**
     * Fetches pending MYSQL_SQL, KAFKA and MYSQL_BINLOG sources, binds them to the pulling
     * agent (IP and UUID) and issues them.
     *
     * @param taskRequest agent request identifying the pulling agent
     * @return data configs of the sources that were successfully updated
     */
    private List<DataConfig> fetchNonFileTasks(TaskRequest taskRequest) {
        List<Integer> needAddStatusList = getNeedAddStatusList(taskRequest);
        List<String> sourceTypes = Lists.newArrayList("MYSQL_SQL", "KAFKA", "MYSQL_BINLOG");
        List<StreamSourceEntity> sourceEntities =
                sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes, TASK_FETCH_SIZE);

        List<DataConfig> nonFileTasks = Lists.newArrayList();
        for (StreamSourceEntity sourceEntity : sourceEntities) {
            sourceEntity.setAgentIp(taskRequest.getAgentIp());
            sourceEntity.setUuid(taskRequest.getUuid());
            issueExistingSource(sourceEntity, nonFileTasks);
        }
        return nonFileTasks;
    }

    /**
     * Fetches pending FILE sources for the pulling agent.
     *
     * <p>A source that has an agent IP but no cluster name is issued only to the agent with
     * that IP. A source whose cluster name equals the agent's cluster and that has no template
     * id is used as a template: unless a sub-source for this agent IP already exists, a copy
     * bound to this agent is inserted and issued.
     *
     * @param taskRequest agent request identifying the pulling agent and its cluster
     * @return data configs of the file sources issued to this agent
     */
    private List<DataConfig> fetchFileTasks(TaskRequest taskRequest) {
        List<Integer> needAddStatusList = getNeedAddStatusList(taskRequest);
        String agentIp = taskRequest.getAgentIp();
        String agentClusterName = taskRequest.getClusterName();
        String uuid = taskRequest.getUuid();
        Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
                "both agent ip and cluster name are blank when fetching file task");

        List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
                Lists.newArrayList("FILE"), agentIp, agentClusterName, FILE_TASK_QUERY_LIMIT);
        List<DataConfig> fileTasks = Lists.newArrayList();
        for (StreamSourceEntity sourceEntity : sourceEntities) {
            FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
            String destIp = sourceEntity.getAgentIp();
            String destClusterName = sourceEntity.getInlongClusterName();

            // Source bound to a specific IP and to no cluster.
            // NOTE(review): this path never reaches the TASK_FETCH_SIZE check at the bottom of
            // the loop, so it is only bounded by the query limit - confirm this is intended.
            if (StringUtils.isNotBlank(destIp) && StringUtils.isBlank(destClusterName)) {
                if (destIp.equals(agentIp)) {
                    sourceEntity.setUuid(uuid);
                    issueExistingSource(sourceEntity, fileTasks);
                }
                continue;
            }

            // Cluster-level template source (no template id of its own).
            if (StringUtils.isNotBlank(destClusterName) && destClusterName.equals(agentClusterName)
                    && Objects.isNull(sourceEntity.getTemplateId())) {
                List<StreamSourceEntity> subSources = sourceMapper.selectByTemplateId(sourceEntity.getId());
                // Null-safe comparison: a sub-source without an agent IP must not break the pull.
                boolean existsForAgent = subSources.stream()
                        .anyMatch(subSource -> Objects.equals(subSource.getAgentIp(), agentIp));
                if (existsForAgent) {
                    continue;
                }

                StreamSourceEntity fileEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);
                FileSourceDTO childFileSourceDTO = CommonBeanUtils.copyProperties(fileSourceDTO, FileSourceDTO::new);
                fileEntity.setExtParams(JsonUtils.toJsonString(childFileSourceDTO));
                fileEntity.setAgentIp(agentIp);
                fileEntity.setUuid(uuid);
                // Random suffix on the copied source name for the per-agent sub-source.
                fileEntity.setSourceName(fileEntity.getSourceName() + "-" + RandomStringUtils.randomAlphanumeric(10));
                fileEntity.setTemplateId(sourceEntity.getId());

                int op = getOp(fileEntity.getStatus());
                fileEntity.setStatus(getNextStatus(fileEntity.getStatus()));
                if (sourceMapper.insert(fileEntity) > 0) {
                    fileTasks.add(getDataConfig(fileEntity, op));
                }
            }

            if (fileTasks.size() >= TASK_FETCH_SIZE) {
                break;
            }
        }
        return fileTasks;
    }

    /**
     * Fetches sources of this agent (matched by IP and UUID) that have a pending delete, retry,
     * backtrack, frozen, check, redo-metric or makeup operation, and issues them.
     *
     * @param taskRequest agent request identifying the pulling agent
     * @return data configs of the sources that were successfully updated
     */
    private List<DataConfig> fetchIssuedTasks(TaskRequest taskRequest) {
        List<Integer> statusList = Arrays.asList(
                SourceStatus.TO_BE_ISSUED_DELETE.getCode(),
                SourceStatus.TO_BE_ISSUED_RETRY.getCode(),
                SourceStatus.TO_BE_ISSUED_BACKTRACK.getCode(),
                SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
                SourceStatus.TO_BE_ISSUED_CHECK.getCode(),
                SourceStatus.TO_BE_ISSUED_REDO_METRIC.getCode(),
                SourceStatus.TO_BE_ISSUED_MAKEUP.getCode());
        List<StreamSourceEntity> sourceEntities =
                sourceMapper.selectByStatusAndIp(statusList, taskRequest.getAgentIp(), taskRequest.getUuid());

        List<DataConfig> issuedTasks = Lists.newArrayList();
        for (StreamSourceEntity issuedTask : sourceEntities) {
            issueExistingSource(issuedTask, issuedTasks);
        }
        return issuedTasks;
    }

    /**
     * Extracts the operation code from a status.
     *
     * @param status source status code
     * @return {@code status % 100}
     */
    private int getOp(int status) {
        return status % MODULUS_100;
    }

    /**
     * Computes the issued status that corresponds to a status.
     *
     * @param status source status code
     * @return {@code 300 + status % 100}
     */
    private int getNextStatus(int status) {
        return ISSUED_STATUS * MODULUS_100 + getOp(status);
    }

    /**
     * Builds the data config sent to the agent for one source.
     *
     * <p>The stream is looked up by group id and stream id. When it exists its sync-send flag is
     * copied and, for a stream whose data type is FILE with a non-null separator, the separator
     * is written into the ext params. When it does not exist, sync-send is set to 0.
     *
     * @param entity source to describe
     * @param op operation code for the agent
     * @return populated data config
     * @throws BusinessException if the source type has no mapped task type
     */
    private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
        DataConfig dataConfig = new DataConfig();
        dataConfig.setIp(entity.getAgentIp());
        dataConfig.setUuid(entity.getUuid());
        dataConfig.setOp(String.valueOf(op));
        dataConfig.setTaskId(entity.getId());
        dataConfig.setTaskType(getTaskType(entity));
        dataConfig.setTaskName(entity.getSourceName());
        dataConfig.setSnapshot(entity.getSnapshot());
        dataConfig.setVersion(entity.getVersion());

        String groupId = entity.getInlongGroupId();
        String streamId = entity.getInlongStreamId();
        dataConfig.setInlongGroupId(groupId);
        dataConfig.setInlongStreamId(streamId);

        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
        String extParams = entity.getExtParams();
        if (streamEntity != null) {
            dataConfig.setSyncSend(streamEntity.getSyncSend());
            if ("FILE".equalsIgnoreCase(streamEntity.getDataType())) {
                String dataSeparator = streamEntity.getDataSeparator();
                if (dataSeparator != null) {
                    extParams = getExtParams(extParams, dataSeparator);
                }
            }
        } else {
            dataConfig.setSyncSend(0);
            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);
        }
        dataConfig.setExtParams(extParams);
        return dataConfig;
    }

    /**
     * Writes the data separator into file-source ext params.
     *
     * @param extParams JSON ext params of the source, may be {@code null}
     * @param dataSeparator separator to store
     * @return {@code null} if {@code extParams} is {@code null}; the re-serialised params with
     *         the separator set if parsing yields an object; otherwise {@code extParams} unchanged
     */
    private String getExtParams(String extParams, String dataSeparator) {
        if (Objects.isNull(extParams)) {
            return null;
        }
        FileSourceDTO fileSourceDTO = JsonUtils.parseObject(extParams, FileSourceDTO.class);
        if (Objects.isNull(fileSourceDTO)) {
            return extParams;
        }
        fileSourceDTO.setDataSeparator(dataSeparator);
        return JsonUtils.toJsonString(fileSourceDTO);
    }

    /**
     * Resolves the agent task type for a source via {@code SourceType.SOURCE_TASK_MAP}.
     *
     * @param sourceEntity source whose type is looked up
     * @return numeric task type
     * @throws BusinessException if the source type is not present in the map
     */
    private int getTaskType(StreamSourceEntity sourceEntity) {
        TaskTypeEnum taskType = SourceType.SOURCE_TASK_MAP.get(sourceEntity.getSourceType());
        if (taskType == null) {
            throw new BusinessException("Unsupported task type for source type " + sourceEntity.getSourceType());
        }
        return taskType.getType();
    }

    /**
     * Loads the command configs stored for the agent's IP and converts them to {@link CmdConfig}.
     *
     * @param taskRequest agent request identifying the pulling agent
     * @return converted command configs, in the order returned by the mapper
     */
    private List<CmdConfig> getAgentCmdConfigs(TaskRequest taskRequest) {
        return sourceCmdConfigMapper.queryCmdByAgentIp(taskRequest.getAgentIp()).stream()
                .map(cmd -> {
                    CmdConfig cmdConfig = new CmdConfig();
                    cmdConfig.setDataTime(cmd.getSpecifiedDataTime());
                    cmdConfig.setOp(cmd.getCmdType());
                    cmdConfig.setId(cmd.getId());
                    cmdConfig.setTaskId(cmd.getTaskId());
                    return cmdConfig;
                })
                .collect(Collectors.toList());
    }
}

