/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.runner;

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.remote.command.task.TaskExecuteRunningMessageAck;
import org.apache.dolphinscheduler.remote.command.task.TaskExecuteStartMessage;
import org.apache.dolphinscheduler.server.master.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.StreamTaskInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleError;
import org.apache.dolphinscheduler.server.master.event.StateEventHandleException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamTaskExecuteRunnable
implements Runnable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamTaskExecuteRunnable.class);
    protected MasterConfig masterConfig;
    protected ProcessService processService;
    protected TaskInstanceDao taskInstanceDao;
    protected DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory;
    protected WorkerTaskDispatcher workerTaskDispatcher;
    protected ProcessTaskRelationMapper processTaskRelationMapper;
    protected TaskPluginManager taskPluginManager;
    private StreamTaskInstanceExecCacheManager streamTaskInstanceExecCacheManager;
    protected TaskDefinition taskDefinition;
    protected TaskInstance taskInstance;
    protected ProcessDefinition processDefinition;
    protected TaskExecuteStartMessage taskExecuteStartMessage;
    protected TaskExecutionContextFactory taskExecutionContextFactory;
    private final ConcurrentLinkedQueue<TaskEvent> taskEvents = new ConcurrentLinkedQueue();
    private TaskRunnableStatus taskRunnableStatus = TaskRunnableStatus.CREATED;

    public StreamTaskExecuteRunnable(TaskDefinition taskDefinition, TaskExecuteStartMessage taskExecuteStartMessage) {
        this.processService = (ProcessService)SpringApplicationContext.getBean(ProcessService.class);
        this.masterConfig = (MasterConfig)SpringApplicationContext.getBean(MasterConfig.class);
        this.workerTaskDispatcher = (WorkerTaskDispatcher)SpringApplicationContext.getBean(WorkerTaskDispatcher.class);
        this.taskPluginManager = (TaskPluginManager)SpringApplicationContext.getBean(TaskPluginManager.class);
        this.processTaskRelationMapper = (ProcessTaskRelationMapper)SpringApplicationContext.getBean(ProcessTaskRelationMapper.class);
        this.taskInstanceDao = (TaskInstanceDao)SpringApplicationContext.getBean(TaskInstanceDao.class);
        this.streamTaskInstanceExecCacheManager = (StreamTaskInstanceExecCacheManager)SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
        this.taskDefinition = taskDefinition;
        this.taskExecuteStartMessage = taskExecuteStartMessage;
        this.taskExecutionContextFactory = (TaskExecutionContextFactory)SpringApplicationContext.getBean(TaskExecutionContextFactory.class);
    }

    public TaskInstance getTaskInstance() {
        return this.taskInstance;
    }

    @Override
    public void run() {
        this.processService.updateTaskDefinitionResources(this.taskDefinition);
        this.taskInstance = this.newTaskInstance(this.taskDefinition);
        this.taskInstanceDao.upsertTaskInstance(this.taskInstance);
        this.streamTaskInstanceExecCacheManager.cache(this.taskInstance.getId(), this);
        List processTaskRelationList = this.processTaskRelationMapper.queryByTaskCode(this.taskDefinition.getCode());
        long processDefinitionCode = ((ProcessTaskRelation)processTaskRelationList.get(0)).getProcessDefinitionCode();
        int processDefinitionVersion = ((ProcessTaskRelation)processTaskRelationList.get(0)).getProcessDefinitionVersion();
        this.processDefinition = this.processService.findProcessDefinition(Long.valueOf(processDefinitionCode), processDefinitionVersion);
        try {
            DefaultTaskExecuteRunnable taskExecuteRunnable = this.defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(this.taskInstance);
            this.workerTaskDispatcher.dispatchTask(taskExecuteRunnable);
        }
        catch (Exception e) {
            log.error("Master dispatch task to worker error, taskInstanceName: {}", (Object)this.taskInstance.getName(), (Object)e);
            this.taskInstance.setState(TaskExecutionStatus.FAILURE);
            this.taskInstanceDao.upsertTaskInstance(this.taskInstance);
            return;
        }
        this.taskRunnableStatus = TaskRunnableStatus.STARTED;
        log.info("Master success dispatch task to worker, taskInstanceName: {}, worker: {}", (Object)this.taskInstance.getId(), (Object)this.taskInstance.getHost());
    }

    public boolean isStart() {
        return TaskRunnableStatus.STARTED == this.taskRunnableStatus;
    }

    public boolean addTaskEvent(TaskEvent taskEvent) {
        if (this.taskInstance.getId().intValue() != taskEvent.getTaskInstanceId()) {
            log.info("state event would be abounded, taskInstanceId:{}, eventType:{}, state:{}", new Object[]{taskEvent.getTaskInstanceId(), taskEvent.getEvent(), taskEvent.getState()});
            return false;
        }
        this.taskEvents.add(taskEvent);
        return true;
    }

    public int eventSize() {
        return this.taskEvents.size();
    }

    public void handleEvents() {
        if (!this.isStart()) {
            log.info("The stream task instance is not started, will not handle its state event, current state event size: {}", (Object)this.taskEvents.size());
            return;
        }
        TaskEvent taskEvent = null;
        while (!this.taskEvents.isEmpty()) {
            try {
                taskEvent = this.taskEvents.peek();
                LogUtils.setTaskInstanceIdMDC((Integer)taskEvent.getTaskInstanceId());
                log.info("Begin to handle state event, {}", (Object)taskEvent);
                if (!this.handleTaskEvent(taskEvent)) continue;
                this.taskEvents.remove(taskEvent);
            }
            catch (StateEventHandleError stateEventHandleError) {
                log.error("State event handle error, will remove this event: {}", (Object)taskEvent, (Object)stateEventHandleError);
                this.taskEvents.remove(taskEvent);
                ThreadUtils.sleep((long)1000L);
            }
            catch (StateEventHandleException stateEventHandleException) {
                log.error("State event handle error, will retry this event: {}", (Object)taskEvent, (Object)stateEventHandleException);
                ThreadUtils.sleep((long)1000L);
            }
            catch (Exception e) {
                log.error("State event handle error, get a unknown exception, will retry this event: {}", (Object)taskEvent, (Object)e);
                ThreadUtils.sleep((long)1000L);
            }
            finally {
                LogUtils.removeWorkflowAndTaskInstanceIdMDC();
            }
        }
    }

    public TaskInstance newTaskInstance(TaskDefinition taskDefinition) {
        Environment environment;
        TaskInstance taskInstance = new TaskInstance();
        taskInstance.setTaskCode(taskDefinition.getCode());
        taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
        taskInstance.setName(taskDefinition.getName());
        taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS);
        taskInstance.setProcessInstanceId(0);
        taskInstance.setProjectCode(Long.valueOf(taskDefinition.getProjectCode()));
        taskInstance.setTaskType(taskDefinition.getTaskType().toUpperCase());
        taskInstance.setAlertFlag(Flag.NO);
        taskInstance.setStartTime(null);
        taskInstance.setFlag(Flag.YES);
        taskInstance.setRetryTimes(0);
        taskInstance.setMaxRetryTimes(taskDefinition.getFailRetryTimes());
        taskInstance.setRetryInterval(taskDefinition.getFailRetryInterval());
        taskInstance.setTaskParams(taskDefinition.getTaskParams());
        taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId());
        taskInstance.setTaskGroupPriority(taskDefinition.getTaskGroupPriority());
        taskInstance.setCpuQuota(taskDefinition.getCpuQuota());
        taskInstance.setMemoryMax(taskDefinition.getMemoryMax());
        taskInstance.setTaskInstancePriority(Priority.MEDIUM);
        if (taskDefinition.getTaskPriority() != null) {
            taskInstance.setTaskInstancePriority(taskDefinition.getTaskPriority());
        }
        taskInstance.setDelayTime(taskDefinition.getDelayTime());
        taskInstance.setDryRun(this.taskExecuteStartMessage.getDryRun());
        taskInstance.setWorkerGroup(StringUtils.isBlank((CharSequence)taskDefinition.getWorkerGroup()) ? "default" : taskDefinition.getWorkerGroup());
        taskInstance.setEnvironmentCode(Long.valueOf(taskDefinition.getEnvironmentCode() == 0L ? -1L : taskDefinition.getEnvironmentCode()));
        if (!taskInstance.getEnvironmentCode().equals(-1L) && Objects.nonNull(environment = this.processService.findEnvironmentByCode(taskInstance.getEnvironmentCode())) && StringUtils.isNotEmpty((CharSequence)environment.getConfig())) {
            taskInstance.setEnvironmentConfig(environment.getConfig());
        }
        if (taskInstance.getSubmitTime() == null) {
            taskInstance.setSubmitTime(new Date());
        }
        if (taskInstance.getFirstSubmitTime() == null) {
            taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
        }
        taskInstance.setTaskExecuteType(taskDefinition.getTaskExecuteType());
        taskInstance.setExecutorId(this.taskExecuteStartMessage.getExecutorId());
        taskInstance.setExecutorName(this.taskExecuteStartMessage.getExecutorName());
        return taskInstance;
    }

    protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
        int userId = this.taskDefinition == null ? 0 : this.taskDefinition.getUserId();
        String tenantCode = this.processService.getTenantForProcess(this.taskExecuteStartMessage.getTenantCode(), userId);
        if (StringUtils.isBlank((CharSequence)tenantCode)) {
            log.error("tenant not exists,task instance id : {}", (Object)taskInstance.getId());
            return null;
        }
        taskInstance.setResources(this.getResourceFullNames(taskInstance));
        TaskChannel taskChannel = this.taskPluginManager.getTaskChannel(taskInstance.getTaskType());
        ResourceParametersHelper resources = taskChannel.getResources(taskInstance.getTaskParams());
        AbstractParameters baseParam = this.taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
        Map<String, Property> propertyMap = this.paramParsingPreparation(taskInstance, baseParam);
        TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get().buildWorkflowInstanceHost(this.masterConfig.getMasterAddress()).buildTaskInstanceRelatedInfo(taskInstance).buildTaskDefinitionRelatedInfo(this.taskDefinition).buildResourceParametersInfo(resources).buildBusinessParamsMap(new HashMap<String, Property>()).buildParamInfo(propertyMap).create();
        taskExecutionContext.setTenantCode(tenantCode);
        taskExecutionContext.setProjectCode(this.processDefinition.getProjectCode());
        taskExecutionContext.setProcessDefineCode(Long.valueOf(this.processDefinition.getCode()));
        taskExecutionContext.setProcessDefineVersion(this.processDefinition.getVersion());
        taskExecutionContext.setProcessInstanceId(0);
        this.taskExecutionContextFactory.setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, tenantCode);
        this.taskExecutionContextFactory.setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
        return taskExecutionContext;
    }

    protected Map<String, String> getResourceFullNames(TaskInstance taskInstance) {
        List projectResourceFiles;
        HashMap<String, String> resourcesMap = new HashMap<String, String>();
        AbstractParameters baseParam = this.taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
        if (baseParam != null && CollectionUtils.isNotEmpty((Collection)(projectResourceFiles = baseParam.getResourceFilesList()))) {
            Stream<Integer> resourceIdStream;
            Set<Integer> resourceIdsSet;
            Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == null).collect(Collectors.toSet());
            if (CollectionUtils.isNotEmpty(oldVersionResources)) {
                oldVersionResources.forEach(t -> resourcesMap.put(t.getRes(), this.processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE)));
            }
            if (CollectionUtils.isNotEmpty(resourceIdsSet = (resourceIdStream = projectResourceFiles.stream().map(ResourceInfo::getId)).collect(Collectors.toSet()))) {
                Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
                List resources = this.processService.listResourceByIds(resourceIds);
                resources.forEach(t -> resourcesMap.put(t.getFullName(), this.processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE)));
            }
        }
        return resourcesMap;
    }

    protected boolean handleTaskEvent(TaskEvent taskEvent) throws StateEventHandleException, StateEventHandleError {
        this.measureTaskState(taskEvent);
        if (this.taskInstance.getState() == null) {
            throw new StateEventHandleError("Task state event handle error due to task state is null");
        }
        this.taskInstance.setStartTime(taskEvent.getStartTime());
        this.taskInstance.setHost(taskEvent.getWorkerAddress());
        this.taskInstance.setLogPath(taskEvent.getLogPath());
        this.taskInstance.setExecutePath(taskEvent.getExecutePath());
        this.taskInstance.setPid(taskEvent.getProcessId());
        this.taskInstance.setAppLink(taskEvent.getAppIds());
        this.taskInstance.setState(taskEvent.getState());
        this.taskInstance.setEndTime(taskEvent.getEndTime());
        this.taskInstance.setVarPool(taskEvent.getVarPool());
        this.processService.changeOutParam(this.taskInstance);
        this.taskInstanceDao.updateById((Object)this.taskInstance);
        this.sendAckToWorker(taskEvent);
        if (this.taskInstance.getState().isFinished()) {
            this.streamTaskInstanceExecCacheManager.removeByTaskInstanceId(this.taskInstance.getId());
            log.info("The stream task instance is finish, taskInstanceId:{}, state:{}", (Object)this.taskInstance.getId(), (Object)taskEvent.getState());
        }
        return true;
    }

    private void measureTaskState(TaskEvent taskEvent) {
        if (taskEvent == null || taskEvent.getState() == null) {
            log.warn("The task event is broken..., taskEvent: {}", (Object)taskEvent);
            return;
        }
        if (taskEvent.getState().isFinished()) {
            TaskMetrics.incTaskInstanceByState("finish");
        }
        switch (taskEvent.getState()) {
            case KILL: {
                TaskMetrics.incTaskInstanceByState("stop");
                break;
            }
            case SUCCESS: {
                TaskMetrics.incTaskInstanceByState("success");
                break;
            }
            case FAILURE: {
                TaskMetrics.incTaskInstanceByState("fail");
                break;
            }
        }
    }

    public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskInstance, @NonNull AbstractParameters parameters) {
        if (taskInstance == null) {
            throw new NullPointerException("taskInstance is marked non-null but is null");
        }
        if (parameters == null) {
            throw new NullPointerException("parameters is marked non-null but is null");
        }
        Map globalParamsMap = this.taskExecuteStartMessage.getStartParams();
        Map globalParams = ParameterUtils.getUserDefParamsMap((Map)globalParamsMap);
        Map localParams = parameters.getInputLocalParametersMap();
        parameters.setVarPool(taskInstance.getVarPool());
        Map varParams = parameters.getVarPoolMap();
        if (globalParams.isEmpty() && localParams.isEmpty() && varParams.isEmpty()) {
            return null;
        }
        if (varParams.size() != 0) {
            globalParams.putAll(varParams);
        }
        if (localParams.size() != 0) {
            globalParams.putAll(localParams);
        }
        return globalParams;
    }

    private void sendAckToWorker(TaskEvent taskEvent) {
        TaskExecuteRunningMessageAck taskExecuteRunningMessageAck = new TaskExecuteRunningMessageAck(true, taskEvent.getTaskInstanceId(), this.masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), System.currentTimeMillis());
        taskEvent.getChannel().writeAndFlush((Object)taskExecuteRunningMessageAck.convert2Command());
    }

    private static enum TaskRunnableStatus {
        CREATED,
        STARTED;

    }
}

