/*
 * Decompiled with CFR 0.152.
 */
package org.apache.dolphinscheduler.server.master.event;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandleError;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventHandler;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class WorkflowStartEventHandler
implements WorkflowEventHandler {
    private final Logger logger = LoggerFactory.getLogger(WorkflowStartEventHandler.class);
    @Autowired
    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
    @Autowired
    private StateWheelExecuteThread stateWheelExecuteThread;
    @Autowired
    private WorkflowExecuteThreadPool workflowExecuteThreadPool;
    @Autowired
    private WorkflowEventQueue workflowEventQueue;

    @Override
    public void handleWorkflowEvent(WorkflowEvent workflowEvent) throws WorkflowEventHandleError {
        this.logger.info("Handle workflow start event, begin to start a workflow, event: {}", (Object)workflowEvent);
        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(workflowEvent.getWorkflowInstanceId());
        if (workflowExecuteRunnable == null) {
            throw new WorkflowEventHandleError("The workflow start event is invalid, cannot find the workflow instance from cache");
        }
        ProcessInstanceMetrics.incProcessInstanceByState("submit");
        ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance();
        CompletableFuture.supplyAsync(workflowExecuteRunnable::call, (Executor)((Object)this.workflowExecuteThreadPool)).thenAccept(workflowSubmitStatue -> {
            if (WorkflowSubmitStatue.SUCCESS == workflowSubmitStatue) {
                this.logger.info("Success submit the workflow instance");
                if (processInstance.getTimeout() > 0) {
                    this.stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
                }
            } else {
                this.logger.error("Failed to submit the workflow instance, will resend the workflow start event: {}", (Object)workflowEvent);
                this.workflowEventQueue.addEvent(workflowEvent);
            }
        });
    }

    @Override
    public WorkflowEventType getHandleWorkflowEventType() {
        return WorkflowEventType.START_WORKFLOW;
    }
}

