package org.apache.dolphinscheduler.server.master.runner;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.remote.command.task.TaskExecuteRunningMessageAck;
import org.apache.dolphinscheduler.remote.command.task.TaskExecuteStartMessage;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.class */
public class StreamTaskExecuteRunnable implements Runnable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamTaskExecuteRunnable.class);
    protected DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory;
    protected TaskDefinition taskDefinition;
    protected TaskInstance taskInstance;
    protected ProcessDefinition processDefinition;
    protected TaskExecuteStartMessage taskExecuteStartMessage;
    private final ConcurrentLinkedQueue<TaskEvent> taskEvents = new ConcurrentLinkedQueue<>();
    private TaskRunnableStatus taskRunnableStatus = TaskRunnableStatus.CREATED;
    protected ProcessService processService = (ProcessService) SpringApplicationContext.getBean(ProcessService.class);
    protected MasterConfig masterConfig = (MasterConfig) SpringApplicationContext.getBean(MasterConfig.class);
    protected WorkerTaskDispatcher workerTaskDispatcher = (WorkerTaskDispatcher) SpringApplicationContext.getBean(WorkerTaskDispatcher.class);
    protected TaskPluginManager taskPluginManager = (TaskPluginManager) SpringApplicationContext.getBean(TaskPluginManager.class);
    protected ProcessTaskRelationMapper processTaskRelationMapper = (ProcessTaskRelationMapper) SpringApplicationContext.getBean(ProcessTaskRelationMapper.class);
    protected TaskInstanceDao taskInstanceDao = (TaskInstanceDao) SpringApplicationContext.getBean(TaskInstanceDao.class);
    private StreamTaskInstanceExecCacheManager streamTaskInstanceExecCacheManager = (StreamTaskInstanceExecCacheManager) SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
    protected TaskExecutionContextFactory taskExecutionContextFactory = (TaskExecutionContextFactory) SpringApplicationContext.getBean(TaskExecutionContextFactory.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.dolphinscheduler.server.master.runner.StreamTaskExecuteRunnable$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$dolphinscheduler$plugin$task$api$enums$TaskExecutionStatus = new int[TaskExecutionStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$dolphinscheduler$plugin$task$api$enums$TaskExecutionStatus[TaskExecutionStatus.KILL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$plugin$task$api$enums$TaskExecutionStatus[TaskExecutionStatus.SUCCESS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$dolphinscheduler$plugin$task$api$enums$TaskExecutionStatus[TaskExecutionStatus.FAILURE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable$TaskRunnableStatus.class */
    public enum TaskRunnableStatus {
        CREATED,
        STARTED
    }

    public StreamTaskExecuteRunnable(TaskDefinition taskDefinition, TaskExecuteStartMessage taskExecuteStartMessage) {
        this.taskDefinition = taskDefinition;
        this.taskExecuteStartMessage = taskExecuteStartMessage;
    }

    public TaskInstance getTaskInstance() {
        return this.taskInstance;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.processService.updateTaskDefinitionResources(this.taskDefinition);
        this.taskInstance = newTaskInstance(this.taskDefinition);
        this.taskInstanceDao.upsertTaskInstance(this.taskInstance);
        this.streamTaskInstanceExecCacheManager.cache(this.taskInstance.getId().intValue(), this);
        List queryByTaskCode = this.processTaskRelationMapper.queryByTaskCode(this.taskDefinition.getCode());
        long processDefinitionCode = ((ProcessTaskRelation) queryByTaskCode.get(0)).getProcessDefinitionCode();
        this.processDefinition = this.processService.findProcessDefinition(Long.valueOf(processDefinitionCode), ((ProcessTaskRelation) queryByTaskCode.get(0)).getProcessDefinitionVersion());
        try {
            this.workerTaskDispatcher.dispatchTask(this.defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(this.taskInstance));
            this.taskRunnableStatus = TaskRunnableStatus.STARTED;
            log.info("Master success dispatch task to worker, taskInstanceName: {}, worker: {}", this.taskInstance.getId(), this.taskInstance.getHost());
        } catch (Exception e) {
            log.error("Master dispatch task to worker error, taskInstanceName: {}", this.taskInstance.getName(), e);
            this.taskInstance.setState(TaskExecutionStatus.FAILURE);
            this.taskInstanceDao.upsertTaskInstance(this.taskInstance);
        }
    }

    public boolean isStart() {
        return TaskRunnableStatus.STARTED == this.taskRunnableStatus;
    }

    public boolean addTaskEvent(TaskEvent taskEvent) {
        if (this.taskInstance.getId().intValue() != taskEvent.getTaskInstanceId()) {
            log.info("state event would be abounded, taskInstanceId:{}, eventType:{}, state:{}", new Object[]{Integer.valueOf(taskEvent.getTaskInstanceId()), taskEvent.getEvent(), taskEvent.getState()});
            return false;
        }
        this.taskEvents.add(taskEvent);
        return true;
    }

    public int eventSize() {
        return this.taskEvents.size();
    }

    public void handleEvents() {
        if (!isStart()) {
            log.info("The stream task instance is not started, will not handle its state event, current state event size: {}", Integer.valueOf(this.taskEvents.size()));
            return;
        }
        TaskEvent taskEvent = null;
        while (!this.taskEvents.isEmpty()) {
            try {
                try {
                    try {
                        try {
                            taskEvent = this.taskEvents.peek();
                            LogUtils.setTaskInstanceIdMDC(Integer.valueOf(taskEvent.getTaskInstanceId()));
                            log.info("Begin to handle state event, {}", taskEvent);
                            if (handleTaskEvent(taskEvent)) {
                                this.taskEvents.remove(taskEvent);
                            }
                            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                        } catch (Exception e) {
                            log.error("State event handle error, get a unknown exception, will retry this event: {}", taskEvent, e);
                            ThreadUtils.sleep(1000L);
                            LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                        }
                    } catch (StateEventHandleException e2) {
                        log.error("State event handle error, will retry this event: {}", taskEvent, e2);
                        ThreadUtils.sleep(1000L);
                        LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                    }
                } catch (StateEventHandleError e3) {
                    log.error("State event handle error, will remove this event: {}", taskEvent, e3);
                    this.taskEvents.remove(taskEvent);
                    ThreadUtils.sleep(1000L);
                    LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                }
            } catch (Throwable th) {
                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
                throw th;
            }
        }
    }

    public TaskInstance newTaskInstance(TaskDefinition taskDefinition) {
        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setTaskCode(taskDefinition.getCode());
        taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
        taskInstance.setName(taskDefinition.getName());
        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
        taskInstance.setProcessInstanceId(0);
        taskInstance.setProjectCode(Long.valueOf(taskDefinition.getProjectCode()));
        taskInstance.setTaskType(taskDefinition.getTaskType().toUpperCase());
        taskInstance.setAlertFlag(Flag.NO);
        taskInstance.setStartTime((Date) null);
        taskInstance.setFlag(Flag.YES);
        taskInstance.setRetryTimes(0);
        taskInstance.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
        taskInstance.setRetryInterval(taskDefinition.getFailRetryInterval());
        taskInstance.setTaskParams(taskDefinition.getTaskParams());
        taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId());
        taskInstance.setTaskGroupPriority(taskDefinition.getTaskGroupPriority());
        taskInstance.setCpuQuota(taskDefinition.getCpuQuota());
        taskInstance.setMemoryMax(taskDefinition.getMemoryMax());
        taskInstance.setTaskInstancePriority(Priority.MEDIUM);
        if (taskDefinition.getTaskPriority() != null) {
            taskInstance.setTaskInstancePriority(taskDefinition.getTaskPriority());
        }
        taskInstance.setDelayTime(taskDefinition.getDelayTime());
        taskInstance.setDryRun(this.taskExecuteStartMessage.getDryRun());
        taskInstance.setWorkerGroup(StringUtils.isBlank(taskDefinition.getWorkerGroup()) ? "default" : taskDefinition.getWorkerGroup());
        taskInstance.setEnvironmentCode(Long.valueOf(taskDefinition.getEnvironmentCode() == 0 ? -1L : taskDefinition.getEnvironmentCode()));
        if (!taskInstance.getEnvironmentCode().equals(-1L)) {
            Environment findEnvironmentByCode = this.processService.findEnvironmentByCode(taskInstance.getEnvironmentCode());
            if (Objects.nonNull(findEnvironmentByCode) && StringUtils.isNotEmpty(findEnvironmentByCode.getConfig())) {
                taskInstance.setEnvironmentConfig(findEnvironmentByCode.getConfig());
            }
        }
        if (taskInstance.getSubmitTime() == null) {
            taskInstance.setSubmitTime(new Date());
        }
        if (taskInstance.getFirstSubmitTime() == null) {
            taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
        }
        taskInstance.setTaskExecuteType(taskDefinition.getTaskExecuteType());
        taskInstance.setExecutorId(this.taskExecuteStartMessage.getExecutorId());
        taskInstance.setExecutorName(this.taskExecuteStartMessage.getExecutorName());
        return taskInstance;
    }

    protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
        String tenantForProcess = this.processService.getTenantForProcess(this.taskExecuteStartMessage.getTenantCode(), this.taskDefinition == null ? 0 : this.taskDefinition.getUserId());
        if (StringUtils.isBlank(tenantForProcess)) {
            log.error("tenant not exists,task instance id : {}", taskInstance.getId());
            return null;
        }
        taskInstance.setResources(getResourceFullNames(taskInstance));
        TaskExecutionContext create = TaskExecutionContextBuilder.get().buildWorkflowInstanceHost(this.masterConfig.getMasterAddress()).buildTaskInstanceRelatedInfo(taskInstance).buildTaskDefinitionRelatedInfo(this.taskDefinition).buildResourceParametersInfo(this.taskPluginManager.getTaskChannel(taskInstance.getTaskType()).getResources(taskInstance.getTaskParams())).buildBusinessParamsMap(new HashMap()).buildParamInfo(paramParsingPreparation(taskInstance, this.taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build()))).create();
        create.setTenantCode(tenantForProcess);
        create.setProjectCode(this.processDefinition.getProjectCode());
        create.setProcessDefineCode(Long.valueOf(this.processDefinition.getCode()));
        create.setProcessDefineVersion(this.processDefinition.getVersion());
        create.setProcessInstanceId(0);
        this.taskExecutionContextFactory.setDataQualityTaskExecutionContext(create, taskInstance, tenantForProcess);
        this.taskExecutionContextFactory.setK8sTaskRelatedInfo(create, taskInstance);
        return create;
    }

    protected Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
        HashMap hashMap = new HashMap();
        AbstractParameters parameters = this.taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
        if (parameters != null) {
            List resourceFilesList = parameters.getResourceFilesList();
            if (CollectionUtils.isNotEmpty(resourceFilesList)) {
                Set set = (Set) resourceFilesList.stream().filter(resourceInfo -> {
                    return resourceInfo.getId() == null;
                }).collect(Collectors.toSet());
                if (CollectionUtils.isNotEmpty(set)) {
                    set.forEach(resourceInfo2 -> {
                    });
                }
                Set set2 = (Set) resourceFilesList.stream().map((v0) -> {
                    return v0.getId();
                }).collect(Collectors.toSet());
                if (CollectionUtils.isNotEmpty(set2)) {
                    this.processService.listResourceByIds((Integer[]) set2.toArray(new Integer[set2.size()])).forEach(resource -> {
                    });
                }
            }
        }
        return hashMap;
    }

    protected boolean handleTaskEvent(TaskEvent taskEvent) throws StateEventHandleException, StateEventHandleError {
        measureTaskState(taskEvent);
        if (this.taskInstance.getState() == null) {
            throw new StateEventHandleError("Task state event handle error due to task state is null");
        }
        this.taskInstance.setStartTime(taskEvent.getStartTime());
        this.taskInstance.setHost(taskEvent.getWorkerAddress());
        this.taskInstance.setLogPath(taskEvent.getLogPath());
        this.taskInstance.setExecutePath(taskEvent.getExecutePath());
        this.taskInstance.setPid(taskEvent.getProcessId());
        this.taskInstance.setAppLink(taskEvent.getAppIds());
        this.taskInstance.setState(taskEvent.getState());
        this.taskInstance.setEndTime(taskEvent.getEndTime());
        this.taskInstance.setVarPool(taskEvent.getVarPool());
        this.processService.changeOutParam(this.taskInstance);
        this.taskInstanceDao.updateById(this.taskInstance);
        sendAckToWorker(taskEvent);
        if (!this.taskInstance.getState().isFinished()) {
            return true;
        }
        this.streamTaskInstanceExecCacheManager.removeByTaskInstanceId(this.taskInstance.getId().intValue());
        log.info("The stream task instance is finish, taskInstanceId:{}, state:{}", this.taskInstance.getId(), taskEvent.getState());
        return true;
    }

    private void measureTaskState(TaskEvent taskEvent) {
        if (taskEvent == null || taskEvent.getState() == null) {
            log.warn("The task event is broken..., taskEvent: {}", taskEvent);
            return;
        }
        if (taskEvent.getState().isFinished()) {
            TaskMetrics.incTaskInstanceByState("finish");
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$dolphinscheduler$plugin$task$api$enums$TaskExecutionStatus[taskEvent.getState().ordinal()]) {
            case 1:
                TaskMetrics.incTaskInstanceByState("stop");
                return;
            case 2:
                TaskMetrics.incTaskInstanceByState("success");
                return;
            case 3:
                TaskMetrics.incTaskInstanceByState("fail");
                return;
            default:
                return;
        }
    }

    public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance, @NonNull AbstractParameters abstractParameters) {
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        if (abstractParameters == null) {
            throw new NullPointerException("parameters is marked non-null but is null");
        }
        Map<String, Property> userDefParamsMap = ParameterUtils.getUserDefParamsMap(this.taskExecuteStartMessage.getStartParams());
        Map<? extends String, ? extends Property> inputLocalParametersMap = abstractParameters.getInputLocalParametersMap();
        abstractParameters.setVarPool(taskInstance.getVarPool());
        Map<? extends String, ? extends Property> varPoolMap = abstractParameters.getVarPoolMap();
        if (userDefParamsMap.isEmpty() && inputLocalParametersMap.isEmpty() && varPoolMap.isEmpty()) {
            return null;
        }
        if (varPoolMap.size() != 0) {
            userDefParamsMap.putAll(varPoolMap);
        }
        if (inputLocalParametersMap.size() != 0) {
            userDefParamsMap.putAll(inputLocalParametersMap);
        }
        return userDefParamsMap;
    }

    private void sendAckToWorker(TaskEvent taskEvent) {
        taskEvent.getChannel().writeAndFlush(new TaskExecuteRunningMessageAck(true, taskEvent.getTaskInstanceId(), this.masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), System.currentTimeMillis()).convert2Command());
    }
}
