/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.consumer;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskPriority;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TaskPriorityQueueConsumer
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(TaskPriorityQueueConsumer.class);
    @Autowired
    private TaskPriorityQueue taskPriorityQueue;
    @Autowired
    private ProcessService processService;
    @Autowired
    private ExecutorDispatcher dispatcher;
    @Autowired
    private MasterConfig masterConfig;

    @PostConstruct
    public void init() {
        super.setName("TaskUpdateQueueConsumerThread");
        super.start();
    }

    @Override
    public void run() {
        ArrayList<String> failedDispatchTasks = new ArrayList<String>();
        while (Stopper.isRunning()) {
            try {
                int fetchTaskNum = this.masterConfig.getMasterDispatchTaskNumber();
                failedDispatchTasks.clear();
                for (int i = 0; i < fetchTaskNum; ++i) {
                    if (this.taskPriorityQueue.size() <= 0) {
                        Thread.sleep(1000L);
                        continue;
                    }
                    String taskPriorityInfo = this.taskPriorityQueue.take();
                    TaskPriority taskPriority = TaskPriority.of(taskPriorityInfo);
                    boolean dispatchResult = this.dispatch(taskPriority.getTaskId());
                    if (dispatchResult) continue;
                    failedDispatchTasks.add(taskPriorityInfo);
                }
                for (String dispatchFailedTask : failedDispatchTasks) {
                    this.taskPriorityQueue.put(dispatchFailedTask);
                }
            }
            catch (Exception e) {
                logger.error("dispatcher task error", (Throwable)e);
            }
        }
    }

    private boolean dispatch(int taskInstanceId) {
        boolean result = false;
        try {
            TaskExecutionContext context = this.getTaskExecutionContext(taskInstanceId);
            ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
            if (this.taskInstanceIsFinalState(taskInstanceId).booleanValue()) {
                return true;
            }
            result = this.dispatcher.dispatch(executionContext);
        }
        catch (ExecuteException e) {
            logger.error("dispatch error", (Throwable)e);
        }
        return result;
    }

    public Boolean taskInstanceIsFinalState(int taskInstanceId) {
        TaskInstance taskInstance = this.processService.findTaskInstanceById(Integer.valueOf(taskInstanceId));
        return taskInstance.getState().typeIsFinished();
    }

    protected TaskExecutionContext getTaskExecutionContext(int taskInstanceId) {
        TaskInstance taskInstance = this.processService.getTaskInstanceDetailByTaskId(taskInstanceId);
        TaskType taskType = TaskType.valueOf((String)taskInstance.getTaskType());
        TaskNode taskNode = (TaskNode)JSONObject.parseObject((String)taskInstance.getTaskJson(), TaskNode.class);
        Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
        Tenant tenant = this.processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId.intValue());
        if (this.verifyTenantIsNull(tenant, taskInstance)) {
            this.processService.changeTaskState(ExecutionStatus.FAILURE, taskInstance.getStartTime(), taskInstance.getHost(), null, null, taskInstance.getId());
            return null;
        }
        String userQueue = this.processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
        taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty((CharSequence)userQueue) ? tenant.getQueue() : userQueue);
        taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
        taskInstance.setExecutePath(this.getExecLocalPath(taskInstance));
        taskInstance.setResources(this.getResourceFullNames(taskNode));
        SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
        DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
        ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext();
        SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
        if (taskType == TaskType.SQL) {
            this.setSQLTaskRelation(sqlTaskExecutionContext, taskNode);
        }
        if (taskType == TaskType.DATAX) {
            this.setDataxTaskRelation(dataxTaskExecutionContext, taskNode);
        }
        if (taskType == TaskType.PROCEDURE) {
            this.setProcedureTaskRelation(procedureTaskExecutionContext, taskNode);
        }
        if (taskType == TaskType.SQOOP) {
            this.setSqoopTaskRelation(sqoopTaskExecutionContext, taskNode);
        }
        return TaskExecutionContextBuilder.get().buildTaskInstanceRelatedInfo(taskInstance).buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()).buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()).buildSQLTaskRelatedInfo(sqlTaskExecutionContext).buildDataxTaskRelatedInfo(dataxTaskExecutionContext).buildProcedureTaskRelatedInfo(procedureTaskExecutionContext).buildSqoopTaskRelatedInfo(sqoopTaskExecutionContext).create();
    }

    private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskNode taskNode) {
        ProcedureParameters procedureParameters = (ProcedureParameters)JSONObject.parseObject((String)taskNode.getParams(), ProcedureParameters.class);
        int datasourceId = procedureParameters.getDatasource();
        DataSource datasource = this.processService.findDataSourceById(datasourceId);
        procedureTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
    }

    private void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskNode taskNode) {
        DataxParameters dataxParameters = (DataxParameters)JSONObject.parseObject((String)taskNode.getParams(), DataxParameters.class);
        DataSource dataSource = this.processService.findDataSourceById(dataxParameters.getDataSource());
        DataSource dataTarget = this.processService.findDataSourceById(dataxParameters.getDataTarget());
        if (dataSource != null) {
            dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
            dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
            dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
        }
        if (dataTarget != null) {
            dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
            dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
            dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
        }
    }

    private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
        SqoopParameters sqoopParameters = (SqoopParameters)JSONObject.parseObject((String)taskNode.getParams(), SqoopParameters.class);
        SourceMysqlParameter sourceMysqlParameter = (SourceMysqlParameter)JSONUtils.parseObject((String)sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
        TargetMysqlParameter targetMysqlParameter = (TargetMysqlParameter)JSONUtils.parseObject((String)sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
        DataSource dataSource = this.processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
        DataSource dataTarget = this.processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
        if (dataSource != null) {
            sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
            sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
            sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
        }
        if (dataTarget != null) {
            sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
            sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
            sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
        }
    }

    private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskNode taskNode) {
        boolean udfTypeFlag;
        SqlParameters sqlParameters = (SqlParameters)JSONObject.parseObject((String)taskNode.getParams(), SqlParameters.class);
        int datasourceId = sqlParameters.getDatasource();
        DataSource datasource = this.processService.findDataSourceById(datasourceId);
        sqlTaskExecutionContext.setConnectionParams(datasource.getConnectionParams());
        boolean bl = udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, (String)sqlParameters.getType()) && StringUtils.isNotEmpty((CharSequence)sqlParameters.getUdfs());
        if (udfTypeFlag) {
            String[] udfFunIds = sqlParameters.getUdfs().split(",");
            int[] udfFunIdsArray = new int[udfFunIds.length];
            for (int i = 0; i < udfFunIds.length; ++i) {
                udfFunIdsArray[i] = Integer.parseInt(udfFunIds[i]);
            }
            List udfFuncList = this.processService.queryUdfFunListByids(udfFunIdsArray);
            HashMap<UdfFunc, String> udfFuncMap = new HashMap<UdfFunc, String>();
            for (UdfFunc udfFunc : udfFuncList) {
                String tenantCode = this.processService.queryTenantCodeByResName(udfFunc.getResourceName(), ResourceType.UDF);
                udfFuncMap.put(udfFunc, tenantCode);
            }
            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
        }
    }

    private String getExecLocalPath(TaskInstance taskInstance) {
        return FileUtils.getProcessExecDir((int)taskInstance.getProcessDefine().getProjectId(), (int)taskInstance.getProcessDefine().getId(), (int)taskInstance.getProcessInstance().getId(), (int)taskInstance.getId());
    }

    private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
        if (tenant == null) {
            logger.error("tenant not exists,process instance id : {},task instance id : {}", (Object)taskInstance.getProcessInstance().getId(), (Object)taskInstance.getId());
            return true;
        }
        return false;
    }

    private Map<String, String> getResourceFullNames(TaskNode taskNode) {
        List projectResourceFiles;
        HashMap<String, String> resourceMap = new HashMap<String, String>();
        AbstractParameters baseParam = TaskParametersUtils.getParameters((String)taskNode.getType(), (String)taskNode.getParams());
        if (baseParam != null && CollectionUtils.isNotEmpty((Collection)(projectResourceFiles = baseParam.getResourceFilesList()))) {
            Stream<Integer> resourceIdStream;
            Set<Integer> resourceIdsSet;
            Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
            if (CollectionUtils.isNotEmpty(oldVersionResources)) {
                oldVersionResources.forEach(t -> resourceMap.put(t.getRes(), this.processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
            }
            if (CollectionUtils.isNotEmpty(resourceIdsSet = (resourceIdStream = projectResourceFiles.stream().map(resourceInfo -> resourceInfo.getId())).filter(resId -> resId != 0).collect(Collectors.toSet()))) {
                Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
                List resources = this.processService.listResourceByIds(resourceIds);
                resources.forEach(t -> resourceMap.put(t.getFullName(), this.processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
            }
        }
        return resourceMap;
    }
}

