/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.pojo.agent.CmdConfig;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.core.StreamConfigLogService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class AgentServiceImpl
implements AgentService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AgentServiceImpl.class);
    private static final int UNISSUED_STATUS = 2;
    private static final int ISSUED_STATUS = 3;
    private static final int MODULUS_100 = 100;
    private static final int TASK_FETCH_SIZE = 2;
    @Autowired
    private StreamSourceEntityMapper sourceMapper;
    @Autowired
    private SourceSnapshotOperation snapshotOperation;
    @Autowired
    private DataSourceCmdConfigEntityMapper sourceCmdConfigMapper;
    @Autowired
    private InlongStreamEntityMapper streamMapper;
    @Autowired
    private StreamConfigLogService streamConfigLogService;

    @Override
    public Boolean reportSnapshot(TaskSnapshotRequest request) {
        return this.snapshotOperation.snapshot(request);
    }

    @Override
    @Transactional(rollbackFor={Throwable.class}, isolation=Isolation.READ_COMMITTED, propagation=Propagation.REQUIRES_NEW)
    public void report(TaskRequest request) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("begin to get agent task: {}", (Object)request);
        }
        if (request == null || StringUtils.isBlank((CharSequence)request.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }
        if (CollectionUtils.isEmpty((Collection)request.getCommandInfo())) {
            LOGGER.warn("task result was empty, just return");
            return;
        }
        for (CommandEntity command : request.getCommandInfo()) {
            this.updateTaskStatus(command);
        }
    }

    private void updateTaskStatus(CommandEntity command) {
        Integer taskId = command.getTaskId();
        StreamSourceEntity current = this.sourceMapper.selectByIdForUpdate(taskId);
        if (current == null) {
            LOGGER.warn("stream source not found by id={}, just return", (Object)taskId);
            return;
        }
        if (!Objects.equals(command.getVersion(), current.getVersion())) {
            LOGGER.warn("task result version [{}] not equals to current [{}] for id [{}], skip update", new Object[]{command.getVersion(), current.getVersion(), taskId});
            return;
        }
        int result = command.getCommandResult();
        int previousStatus = current.getStatus();
        int nextStatus = SourceState.SOURCE_NORMAL.getCode();
        if (1 == result) {
            this.logFailedStreamSource(current);
            nextStatus = SourceState.SOURCE_FAILED.getCode();
        } else if (previousStatus / 100 == 3) {
            if (SourceState.TEMP_TO_NORMAL.contains(previousStatus)) {
                nextStatus = SourceState.SOURCE_NORMAL.getCode();
            } else if (SourceState.BEEN_ISSUED_DELETE.getCode() == previousStatus) {
                nextStatus = SourceState.SOURCE_DISABLE.getCode();
            } else if (SourceState.BEEN_ISSUED_FROZEN.getCode() == previousStatus) {
                nextStatus = SourceState.SOURCE_FROZEN.getCode();
            }
        }
        if (nextStatus != previousStatus) {
            this.sourceMapper.updateStatus(taskId, Integer.valueOf(nextStatus), Boolean.valueOf(false));
            LOGGER.info("task result=[{}], update source status to [{}] for id [{}]", new Object[]{result, nextStatus, taskId});
        }
    }

    @Override
    @Transactional(rollbackFor={Throwable.class}, isolation=Isolation.READ_COMMITTED, propagation=Propagation.REQUIRES_NEW)
    public TaskResult getTaskResult(TaskRequest request) {
        List<Integer> needAddStatusList;
        if (request == null || StringUtils.isBlank((CharSequence)request.getAgentIp())) {
            throw new BusinessException("agent request or agent ip was empty, just return");
        }
        String agentIp = request.getAgentIp();
        String uuid = request.getUuid();
        if (PullJobTypeEnum.NEVER != PullJobTypeEnum.getPullJobType((int)request.getPullJobType())) {
            needAddStatusList = Arrays.asList(SourceState.TO_BE_ISSUED_ADD.getCode(), SourceState.TO_BE_ISSUED_ACTIVE.getCode());
        } else {
            LOGGER.warn("agent pull job type is [NEVER], just pull to be active tasks");
            needAddStatusList = Collections.singletonList(SourceState.TO_BE_ISSUED_ACTIVE.getCode());
        }
        ArrayList sourceTypes = Lists.newArrayList((Object[])new String[]{SourceType.BINLOG.getType(), SourceType.KAFKA.getType(), SourceType.SQL.getType()});
        List entityList = this.sourceMapper.selectByStatusAndType(needAddStatusList, (List)sourceTypes, 2);
        List<Integer> statusList = Arrays.asList(SourceState.TO_BE_ISSUED_DELETE.getCode(), SourceState.TO_BE_ISSUED_RETRY.getCode(), SourceState.TO_BE_ISSUED_BACKTRACK.getCode(), SourceState.TO_BE_ISSUED_FROZEN.getCode(), SourceState.TO_BE_ISSUED_CHECK.getCode(), SourceState.TO_BE_ISSUED_REDO_METRIC.getCode(), SourceState.TO_BE_ISSUED_MAKEUP.getCode());
        List needIssuedList = this.sourceMapper.selectByStatusAndIp(statusList, agentIp, uuid);
        entityList.addAll(needIssuedList);
        List fileEntityList = this.sourceMapper.selectByStatusAndType(needAddStatusList, (List)Lists.newArrayList((Object[])new String[]{SourceType.FILE.getType()}), 4);
        for (Object fileEntity : fileEntityList) {
            FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson((String)fileEntity.getExtParams());
            if (!agentIp.equals(fileSourceDTO.getIp())) continue;
            entityList.add(fileEntity);
        }
        ArrayList dataConfigs = Lists.newArrayList();
        for (StreamSourceEntity entity : entityList) {
            int id = entity.getId();
            entity = this.sourceMapper.selectByIdForUpdate(Integer.valueOf(id));
            int status = entity.getStatus();
            int op = status % 100;
            if (status / 100 != 2) {
                LOGGER.info("skip task status not in 20x, id={}", (Object)id);
                continue;
            }
            int nextStatus = 300 + op;
            this.sourceMapper.updateStatus(Integer.valueOf(id), Integer.valueOf(nextStatus), Boolean.valueOf(false));
            LOGGER.info("update stream source status to [{}] for id [{}] ", (Object)nextStatus, (Object)id);
            DataConfig dataConfig = this.getDataConfig(entity, op);
            dataConfigs.add(dataConfig);
        }
        List<CmdConfig> cmdConfigs = this.getAgentCmdConfigs(request);
        for (StreamSourceEntity entity : entityList) {
            if (!StringUtils.isEmpty((CharSequence)entity.getAgentIp())) continue;
            this.sourceMapper.updateIpAndUuid(entity.getId(), agentIp, uuid, Boolean.valueOf(false));
            LOGGER.info("update stream source ip to [{}], uuid to [{}] for id [{}]", new Object[]{agentIp, uuid, entity.getId()});
        }
        return TaskResult.builder().dataConfigs((List)dataConfigs).cmdConfigs(cmdConfigs).build();
    }

    private void logFailedStreamSource(StreamSourceEntity entity) {
        InlongStreamConfigLogRequest request = new InlongStreamConfigLogRequest();
        request.setInlongGroupId(entity.getInlongGroupId());
        request.setInlongStreamId(entity.getInlongStreamId());
        request.setComponentName(ComponentTypeEnum.Agent.getName());
        request.setIp(entity.getAgentIp());
        request.setConfigName("DataSource:" + entity.getSourceName());
        request.setLogType(Integer.valueOf(1));
        request.setLogInfo(String.format("StreamSource=%s init failed, please check!", entity));
        request.setReportTime(new Date().getTime());
        this.streamConfigLogService.reportConfigLog(request);
    }

    private DataConfig getDataConfig(StreamSourceEntity entity, int op) {
        DataConfig dataConfig = new DataConfig();
        dataConfig.setIp(entity.getAgentIp());
        dataConfig.setUuid(entity.getUuid());
        dataConfig.setOp(String.valueOf(op));
        dataConfig.setTaskId(entity.getId());
        dataConfig.setTaskType(Integer.valueOf(this.getTaskType(entity)));
        dataConfig.setTaskName(entity.getSourceName());
        dataConfig.setSnapshot(entity.getSnapshot());
        dataConfig.setExtParams(entity.getExtParams());
        dataConfig.setVersion(entity.getVersion());
        String groupId = entity.getInlongGroupId();
        String streamId = entity.getInlongStreamId();
        dataConfig.setInlongGroupId(groupId);
        dataConfig.setInlongStreamId(streamId);
        InlongStreamEntity streamEntity = this.streamMapper.selectByIdentifier(groupId, streamId);
        if (streamEntity != null) {
            dataConfig.setSyncSend(streamEntity.getSyncSend());
        } else {
            dataConfig.setSyncSend(Integer.valueOf(0));
            LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", (Object)groupId, (Object)streamId);
        }
        return dataConfig;
    }

    private int getTaskType(StreamSourceEntity sourceEntity) {
        SourceType sourceType = SourceType.forType((String)sourceEntity.getSourceType());
        return sourceType.getTaskType().getType();
    }

    private List<CmdConfig> getAgentCmdConfigs(TaskRequest taskRequest) {
        return this.sourceCmdConfigMapper.queryCmdByAgentIp(taskRequest.getAgentIp()).stream().map(cmd -> {
            CmdConfig cmdConfig = new CmdConfig();
            cmdConfig.setDataTime(cmd.getSpecifiedDataTime());
            cmdConfig.setOp(cmd.getCmdType());
            cmdConfig.setId(cmd.getId());
            cmdConfig.setTaskId(cmd.getTaskId());
            return cmdConfig;
        }).collect(Collectors.toList());
    }
}

