package com.netflix.conductor.core.execution;

import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.utils.TaskUtils;
import com.netflix.conductor.core.WorkflowContext;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.dal.ExecutionDAOFacade;
import com.netflix.conductor.core.event.WorkflowCreationEvent;
import com.netflix.conductor.core.event.WorkflowEvaluationEvent;
import com.netflix.conductor.core.exception.ConflictException;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.execution.DeciderService;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.Utils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
import com.netflix.conductor.service.ExecutionLockService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Trace
@Component
/* loaded from: input_file:com/netflix/conductor/core/execution/WorkflowExecutor.class */
public class WorkflowExecutor {
    // Not referenced in the visible part of this class; presumably the queue priority used when a
    // workflow evaluation is expedited — TODO confirm against expediteLazyWorkflowEvaluation.
    private static final int EXPEDITED_PRIORITY = 10;
    // Collaborators; every final field below is assigned exactly once in the constructor.
    private final MetadataDAO metadataDAO;
    private final QueueDAO queueDAO;
    private final DeciderService deciderService;
    private final ConductorProperties properties;
    private final MetadataMapperService metadataMapperService;
    private final ExecutionDAOFacade executionDAOFacade;
    private final ParametersUtils parametersUtils;
    private final IDGenerator idGenerator;
    private final WorkflowStatusListener workflowStatusListener;
    private final SystemTaskRegistry systemTaskRegistry;
    private final ApplicationEventPublisher eventPublisher;
    // Set in the constructor from ConductorProperties#getActiveWorkerLastPollTimeout(), in milliseconds.
    private long activeWorkerLastPollMs;
    private final ExecutionLockService executionLockService;
    // True when the poll data's last poll time is later than (now - activeWorkerLastPollMs).
    private final Predicate<PollData> validateLastPolledTime = pollData -> {
        return pollData.getLastPollTime() > System.currentTimeMillis() - this.activeWorkerLastPollMs;
    };
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowExecutor.class);
    // Simple class name, passed to Monitors.error(...) as the source of an error.
    private static final String CLASS_NAME = WorkflowExecutor.class.getSimpleName();
    // Matches tasks whose status is terminal but not successful.
    private static final Predicate<TaskModel> UNSUCCESSFUL_TERMINAL_TASK = taskModel -> {
        return !taskModel.getStatus().isSuccessful() && taskModel.getStatus().isTerminal();
    };
    // Same as UNSUCCESSFUL_TERMINAL_TASK, further restricted to tasks whose type is "JOIN".
    private static final Predicate<TaskModel> UNSUCCESSFUL_JOIN_TASK = UNSUCCESSFUL_TERMINAL_TASK.and(taskModel -> {
        return "JOIN".equals(taskModel.getTaskType());
    });
    // Matches tasks whose status is not terminal.
    private static final Predicate<TaskModel> NON_TERMINAL_TASK = taskModel -> {
        return !taskModel.getStatus().isTerminal();
    };

    /**
     * Creates the executor and wires in its collaborators.
     *
     * <p>Each argument is stored in the matching field. In addition,
     * {@code conductorProperties.getActiveWorkerLastPollTimeout()} is read once and kept, in
     * milliseconds, in {@code activeWorkerLastPollMs}.
     */
    public WorkflowExecutor(
            DeciderService deciderService,
            MetadataDAO metadataDAO,
            QueueDAO queueDAO,
            MetadataMapperService metadataMapperService,
            WorkflowStatusListener workflowStatusListener,
            ExecutionDAOFacade executionDAOFacade,
            ConductorProperties conductorProperties,
            ExecutionLockService executionLockService,
            SystemTaskRegistry systemTaskRegistry,
            ParametersUtils parametersUtils,
            IDGenerator iDGenerator,
            ApplicationEventPublisher applicationEventPublisher) {
        // Plain field assignments, listed in parameter order.
        this.deciderService = deciderService;
        this.metadataDAO = metadataDAO;
        this.queueDAO = queueDAO;
        this.metadataMapperService = metadataMapperService;
        this.workflowStatusListener = workflowStatusListener;
        this.executionDAOFacade = executionDAOFacade;
        this.properties = conductorProperties;
        this.executionLockService = executionLockService;
        this.systemTaskRegistry = systemTaskRegistry;
        this.parametersUtils = parametersUtils;
        this.idGenerator = iDGenerator;
        this.eventPublisher = applicationEventPublisher;

        // Derived value: the configured timeout converted to milliseconds.
        this.activeWorkerLastPollMs = conductorProperties.getActiveWorkerLastPollTimeout().toMillis();
    }

    /**
     * Clears the pending callback delay of the workflow's scheduled, non-system tasks.
     *
     * <p>For every task that is not a system task, is in {@code SCHEDULED} status and has a
     * positive {@code callbackAfterSeconds}, the queue offset time is reset; when the queue
     * reports success the task's callback delay is set to zero and the task is persisted.
     *
     * @param str id of the workflow whose task callbacks are reset
     * @throws ConflictException if the workflow is already in a terminal state
     */
    public void resetCallbacksForWorkflow(String str) {
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(str, true);
        if (workflow.getStatus().isTerminal()) {
            throw new ConflictException("Workflow is in terminal state. Status = %s", workflow.getStatus());
        }

        for (TaskModel task : workflow.getTasks()) {
            // Skip anything that is a system task, not SCHEDULED, or has no callback delay set.
            if (this.systemTaskRegistry.isSystemTask(task.getTaskType())
                    || TaskModel.Status.SCHEDULED != task.getStatus()
                    || task.getCallbackAfterSeconds() <= 0) {
                continue;
            }
            boolean offsetReset = this.queueDAO.resetOffsetTime(QueueUtils.getQueueName(task), task.getTaskId());
            if (offsetReset) {
                task.setCallbackAfterSeconds(0L);
                this.executionDAOFacade.updateTask(task);
            }
        }
    }

    /**
     * Re-runs a workflow, optionally from a given task, by delegating to {@code rerunWF}.
     *
     * @param rerunWorkflowRequest request carrying the source workflow id, the optional task id
     *     to re-run from, and the task/workflow input and correlation id passed on to {@code rerunWF}
     * @return the id of the workflow that was re-run (the request's {@code reRunFromWorkflowId})
     * @throws IllegalArgumentException if {@code rerunWF} reports failure; the message names the
     *     requested task id as not found
     */
    public String rerun(RerunWorkflowRequest rerunWorkflowRequest) {
        String sourceWorkflowId = rerunWorkflowRequest.getReRunFromWorkflowId();
        Utils.checkNotNull(sourceWorkflowId, "reRunFromWorkflowId is missing");

        boolean rerunStarted = rerunWF(
                sourceWorkflowId,
                rerunWorkflowRequest.getReRunFromTaskId(),
                rerunWorkflowRequest.getTaskInput(),
                rerunWorkflowRequest.getWorkflowInput(),
                rerunWorkflowRequest.getCorrelationId());
        if (!rerunStarted) {
            throw new IllegalArgumentException("Task " + rerunWorkflowRequest.getReRunFromTaskId() + " not found");
        }
        return sourceWorkflowId;
    }

    /**
     * Restarts a workflow that has reached a terminal state, running it again from the beginning.
     *
     * <p>The stored execution is reset, the in-memory model is cleared back to a fresh
     * {@code RUNNING} state, re-created through the DAO facade and evaluated with {@code decide}.
     * Parent workflows, if any, are then updated. Any failure in that last phase is recorded,
     * the workflow is terminated, and the original exception is rethrown.
     *
     * @param str id of the workflow to restart
     * @param z when {@code true} the latest registered definition for the workflow name is used
     *     (and stored on the workflow); otherwise the definition embedded in the workflow is used,
     *     falling back to a lookup by name and version
     * @throws ConflictException if the workflow is not in a terminal state
     * @throws NotFoundException if the required definition cannot be found, or if the workflow is
     *     {@code COMPLETED} and its definition is not restartable
     */
    public void restart(String str, boolean z) {
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(str, true);
        if (!workflow.getStatus().isTerminal()) {
            String message = String.format("Workflow: %s is not in terminal state, unable to restart.", workflow);
            LOGGER.error(message);
            throw new ConflictException(message);
        }

        WorkflowDef definition;
        if (!z) {
            // Prefer the definition carried by the execution; look it up only when absent.
            definition = Optional.ofNullable(workflow.getWorkflowDefinition())
                    .orElseGet(() -> this.metadataDAO
                            .getWorkflowDef(workflow.getWorkflowName(), workflow.getWorkflowVersion())
                            .orElseThrow(() -> new NotFoundException("Unable to find definition for %s", str)));
        } else {
            WorkflowDef latest = this.metadataDAO
                    .getLatestWorkflowDef(workflow.getWorkflowName())
                    .orElseThrow(() -> new NotFoundException("Unable to find latest definition for %s", str));
            workflow.setWorkflowDefinition(latest);
            definition = this.metadataMapperService.populateTaskDefinitions(latest);
        }

        // A non-restartable definition only blocks restarts of COMPLETED executions.
        if (!definition.isRestartable() && workflow.getStatus().equals(WorkflowModel.Status.COMPLETED)) {
            throw new NotFoundException("Workflow: %s is non-restartable", workflow);
        }

        // Wipe the previous run, both in the store and in the in-memory model.
        this.executionDAOFacade.resetWorkflow(str);
        workflow.getTasks().clear();
        workflow.setReasonForIncompletion(null);
        workflow.setFailedTaskId(null);
        workflow.setCreateTime(Long.valueOf(System.currentTimeMillis()));
        workflow.setEndTime(0L);
        workflow.setLastRetriedTime(0L);
        workflow.setStatus(WorkflowModel.Status.RUNNING);
        workflow.setOutput(null);
        workflow.setExternalOutputPayloadStoragePath(null);

        try {
            this.executionDAOFacade.createWorkflow(workflow);
            this.metadataMapperService.populateWorkflowWithDefinitions(workflow);
            decide(str);
            updateAndPushParents(workflow, "restarted");
        } catch (Exception restartFailure) {
            Monitors.recordWorkflowStartError(definition.getName(), WorkflowContext.get().getClientApp());
            LOGGER.error("Unable to restart workflow: {}", definition.getName(), restartFailure);
            terminateWorkflow(str, "Error when restarting the workflow");
            throw restartFailure;
        }
    }

    /**
     * Retries the failed tasks of a workflow that has reached a terminal state.
     *
     * @param str id of the workflow to retry
     * @param z when {@code true}, the retry is applied to the deepest failed sub-workflow found by
     *     following the first unsuccessful terminal task; if the workflow has no such task nothing
     *     is done. When {@code false}, the workflow itself is retried.
     * @throws NotFoundException if the workflow is not in a terminal state
     * @throws ConflictException if the workflow has no tasks
     */
    public void retry(String str, boolean z) {
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(str, true);
        if (!workflow.getStatus().isTerminal()) {
            throw new NotFoundException("Workflow is still running.  status=%s", workflow.getStatus());
        }
        if (workflow.getTasks().isEmpty()) {
            throw new ConflictException("Workflow has not started yet");
        }

        WorkflowModel target = workflow;
        if (z) {
            Optional<TaskModel> firstFailed = workflow.getTasks().stream().filter(UNSUCCESSFUL_TERMINAL_TASK).findFirst();
            if (!firstFailed.isPresent()) {
                // Nothing failed at this level, so there is no sub-workflow chain to resume.
                return;
            }
            target = findLastFailedSubWorkflowIfAny(firstFailed.get(), workflow);
        }
        retry(target);
        updateAndPushParents(target, "retried");
    }

    /**
     * Walks up the chain of parent workflows, marking each one as running again.
     *
     * <p>At every level the parent's sub-workflow task is flagged as changed, set to
     * {@code IN_PROGRESS} and persisted, an execution log entry describing the chain of
     * sub-workflows is added, and the parent workflow is set to {@code RUNNING}, persisted and
     * handed to {@code expediteLazyWorkflowEvaluation}. The walk stops at the first sub-workflow
     * task that is optional, or when a workflow without a parent is reached.
     *
     * @param workflowModel the workflow whose ancestors are updated
     * @param str verb describing the operation, used in the execution log message
     */
    private void updateAndPushParents(WorkflowModel workflowModel, String str) {
        WorkflowModel current = workflowModel;
        String lineage = "";
        while (current.hasParent()) {
            TaskModel parentTask = this.executionDAOFacade.getTaskModel(current.getParentWorkflowTaskId());
            if (parentTask.getWorkflowTask().isOptional()) {
                LOGGER.info("Sub workflow task {} is optional, skip updating parents", parentTask);
                return;
            }
            parentTask.setSubworkflowChanged(true);
            parentTask.setStatus(TaskModel.Status.IN_PROGRESS);
            this.executionDAOFacade.updateTask(parentTask);

            // Prepend the current workflow so the log reads "closest ancestor -> ... -> origin".
            String currentLabel = current.toShortString();
            lineage = lineage.isEmpty() ? currentLabel : String.format("%s -> %s", currentLabel, lineage);
            TaskExecLog logEntry = new TaskExecLog(String.format("Sub workflow %s %s.", lineage, str));
            logEntry.setTaskId(parentTask.getTaskId());
            this.executionDAOFacade.addTaskExecLog(Collections.singletonList(logEntry));
            LOGGER.info("Task {} updated. {}", logEntry.getTaskId(), logEntry.getLog());

            String parentId = current.getParentWorkflowId();
            WorkflowModel parent = this.executionDAOFacade.getWorkflowModel(parentId, true);
            parent.setStatus(WorkflowModel.Status.RUNNING);
            parent.setLastRetriedTime(System.currentTimeMillis());
            this.executionDAOFacade.updateWorkflow(parent);
            expediteLazyWorkflowEvaluation(parentId);

            current = parent;
        }
    }

    /**
     * Re-schedules the retryable tasks of a workflow and puts the workflow back to {@code RUNNING}.
     *
     * <p>Tasks are scanned in list order and tracked per reference name: a task in
     * {@code FAILED}, {@code FAILED_WITH_TERMINAL_ERROR} or {@code TIMED_OUT} status is recorded
     * as retryable, and a later task with the same reference name in any other status removes the
     * entry again. {@code CANCELED} tasks are recorded as retryable too, except for {@code JOIN}
     * and {@code DO_WHILE} tasks, which are set back to {@code IN_PROGRESS} and re-queued instead.
     *
     * <p>The workflow is then marked {@code RUNNING}, pushed to the decider queue and persisted.
     * A fresh copy of each retryable task is created in {@code seq} order, added to the workflow,
     * and scheduled after all tasks have been persisted.
     *
     * @param workflowModel the workflow to retry
     * @throws ConflictException if there is nothing to retry and the workflow did not time out
     */
    private void retry(WorkflowModel workflowModel) {
        // Typed map (instead of a raw HashMap) so the stream below is a Stream<TaskModel>.
        Map<String, TaskModel> retryableByRef = new HashMap<>();
        for (TaskModel task : workflowModel.getTasks()) {
            switch (task.getStatus()) {
                case FAILED:
                case FAILED_WITH_TERMINAL_ERROR:
                case TIMED_OUT:
                    retryableByRef.put(task.getReferenceTaskName(), task);
                    break;
                case CANCELED:
                    if (task.getTaskType().equalsIgnoreCase(TaskType.JOIN.toString())
                            || task.getTaskType().equalsIgnoreCase(TaskType.DO_WHILE.toString())) {
                        // JOIN / DO_WHILE are resumed in place rather than copied.
                        task.setStatus(TaskModel.Status.IN_PROGRESS);
                        addTaskToQueue(task);
                    } else {
                        retryableByRef.put(task.getReferenceTaskName(), task);
                    }
                    break;
                default:
                    // A later execution in any other status supersedes an earlier failure.
                    retryableByRef.remove(task.getReferenceTaskName());
                    break;
            }
        }

        // A TIMED_OUT workflow may be retried even when no individual task is retryable.
        if (retryableByRef.isEmpty() && workflowModel.getStatus() != WorkflowModel.Status.TIMED_OUT) {
            throw new ConflictException("There are no retryable tasks! Use restart if you want to attempt entire workflow execution again.");
        }

        workflowModel.setStatus(WorkflowModel.Status.RUNNING);
        workflowModel.setLastRetriedTime(System.currentTimeMillis());
        String previousReason = workflowModel.getReasonForIncompletion();
        workflowModel.setReasonForIncompletion(null);
        this.queueDAO.push(
                Utils.DECIDER_QUEUE,
                workflowModel.getWorkflowId(),
                workflowModel.getPriority(),
                this.properties.getWorkflowOffsetTimeout().getSeconds());
        this.executionDAOFacade.updateWorkflow(workflowModel);
        LOGGER.info("Workflow {} that failed due to '{}' was retried", workflowModel.toShortString(), previousReason);

        // taskToBeRescheduled also flags the original task as retried, so the copies must be
        // created before the workflow's tasks are persisted below.
        List<TaskModel> rescheduled = retryableByRef.values().stream()
                .sorted(Comparator.comparingInt(TaskModel::getSeq))
                .map(task -> taskToBeRescheduled(workflowModel, task))
                .collect(Collectors.toList());
        dedupAndAddTasks(workflowModel, rescheduled);
        this.executionDAOFacade.updateTasks(workflowModel.getTasks());
        scheduleTask(workflowModel, rescheduled);
    }

    /**
     * Follows a chain of failed {@code SUB_WORKFLOW} tasks down to the deepest workflow that
     * contains an unsuccessful terminal task.
     *
     * <p>Starting from {@code taskModel}: while the task is an unsuccessful terminal
     * {@code SUB_WORKFLOW} task, its sub-workflow is loaded; if that sub-workflow has an
     * unsuccessful terminal task of its own, the search continues from there, otherwise it stops.
     *
     * @param taskModel the failed task to start from
     * @param workflowModel the workflow that owns {@code taskModel}
     * @return the workflow owning the last task reached by the search; {@code workflowModel}
     *     itself when the starting task does not lead into a failed sub-workflow
     */
    private WorkflowModel findLastFailedSubWorkflowIfAny(TaskModel taskModel, WorkflowModel workflowModel) {
        TaskModel failedTask = taskModel;
        WorkflowModel owner = workflowModel;
        while ("SUB_WORKFLOW".equals(failedTask.getTaskType()) && UNSUCCESSFUL_TERMINAL_TASK.test(failedTask)) {
            WorkflowModel subWorkflow = this.executionDAOFacade.getWorkflowModel(failedTask.getSubWorkflowId(), true);
            Optional<TaskModel> nextFailed = subWorkflow.getTasks().stream().filter(UNSUCCESSFUL_TERMINAL_TASK).findFirst();
            if (!nextFailed.isPresent()) {
                // The sub-workflow has no failed task, so the current owner is the answer.
                break;
            }
            failedTask = nextFailed.get();
            owner = subWorkflow;
        }
        return owner;
    }

    /**
     * Builds the task that replaces {@code taskModel} on retry.
     *
     * <p>The replacement is a copy with a newly generated id, {@code SCHEDULED} status, a retry
     * count one higher than the original, and its execution bookkeeping (poll count, callback
     * delay, timestamps, worker id, sub-workflow id, failure reason, sequence number) cleared.
     * Its input is refreshed by merging in the task input computed by {@code parametersUtils}.
     * As a side effect the original task is flagged as retried and executed.
     *
     * @param workflowModel the workflow the task belongs to, used to compute the task input
     * @param taskModel the task being retried
     * @return the new task to schedule
     */
    private TaskModel taskToBeRescheduled(WorkflowModel workflowModel, TaskModel taskModel) {
        TaskModel retryTask = taskModel.copy();

        // Identity and retry lineage.
        retryTask.setTaskId(this.idGenerator.generate());
        retryTask.setRetriedTaskId(taskModel.getTaskId());
        retryTask.setRetryCount(taskModel.getRetryCount() + 1);
        retryTask.setRetried(false);
        retryTask.setStatus(TaskModel.Status.SCHEDULED);

        // Clear everything that described the previous execution.
        retryTask.setPollCount(0);
        retryTask.setCallbackAfterSeconds(0L);
        retryTask.setSubWorkflowId(null);
        retryTask.setScheduledTime(0L);
        retryTask.setStartTime(0L);
        retryTask.setEndTime(0L);
        retryTask.setWorkerId(null);
        retryTask.setReasonForIncompletion(null);
        retryTask.setSeq(0);

        // Merge the freshly computed input into the input inherited from the copy.
        Map<String, Object> refreshedInput = this.parametersUtils.getTaskInput(
                retryTask.getWorkflowTask().getInputParameters(),
                workflowModel,
                retryTask.getWorkflowTask().getTaskDefinition(),
                retryTask.getTaskId());
        retryTask.getInputData().putAll(refreshedInput);

        taskModel.setRetried(true);
        taskModel.setExecuted(true);
        return retryTask;
    }

    /**
     * Ends a workflow execution, optionally on behalf of a TERMINATE task, and then cancels the
     * tasks that are still non-terminal.
     *
     * <p>Without a task the workflow is simply completed. With a task, the termination status and
     * reason are read from the task's workflow-task input parameters (a default reason is built
     * when none is given); a requested status of {@code FAILED} fails the workflow through
     * {@code terminate}, anything else completes it with the reason recorded.
     *
     * @param workflowModel the workflow to end
     * @param taskModel the task requesting termination, or {@code null}
     */
    private void endExecution(WorkflowModel workflowModel, TaskModel taskModel) {
        if (taskModel == null) {
            cancelNonTerminalTasks(completeWorkflow(workflowModel));
            return;
        }

        String requestedStatus = (String) taskModel.getWorkflowTask().getInputParameters().get(Terminate.getTerminationStatusParameter());
        String reason = (String) taskModel.getWorkflowTask().getInputParameters().get(Terminate.getTerminationReasonParameter());
        if (StringUtils.isBlank(reason)) {
            reason = String.format("Workflow is %s by TERMINATE task: %s", requestedStatus, taskModel.getTaskId());
        }

        WorkflowModel ended;
        if (!WorkflowModel.Status.FAILED.name().equals(requestedStatus)) {
            workflowModel.setReasonForIncompletion(reason);
            ended = completeWorkflow(workflowModel);
        } else {
            workflowModel.setStatus(WorkflowModel.Status.FAILED);
            ended = terminate(workflowModel, new TerminateWorkflowException(reason, workflowModel.getStatus(), taskModel));
        }
        cancelNonTerminalTasks(ended);
    }

    /**
     * Marks a workflow as {@code COMPLETED} and performs the associated bookkeeping.
     *
     * <p>If the workflow is already {@code COMPLETED} it is only removed from the decider queue
     * and from the pending-workflow index. Otherwise its output is refreshed, the status is set,
     * the reference names and definition names of {@code FAILED} and
     * {@code FAILED_WITH_TERMINAL_ERROR} tasks are added to the workflow's failed-task sets, the
     * workflow is persisted, the status listener and metrics are notified, a parent workflow (if
     * any) is updated and expedited, and the execution lock is released and deleted.
     *
     * @param workflowModel the workflow to complete
     * @return the same workflow instance
     * @throws ConflictException if the workflow is in a terminal state other than {@code COMPLETED}
     */
    @VisibleForTesting
    WorkflowModel completeWorkflow(WorkflowModel workflowModel) {
        LOGGER.debug("Completing workflow execution for {}", workflowModel.getWorkflowId());

        if (workflowModel.getStatus().equals(WorkflowModel.Status.COMPLETED)) {
            this.queueDAO.remove(Utils.DECIDER_QUEUE, workflowModel.getWorkflowId());
            this.executionDAOFacade.removeFromPendingWorkflow(workflowModel.getWorkflowName(), workflowModel.getWorkflowId());
            LOGGER.debug("Workflow: {} has already been completed.", workflowModel.getWorkflowId());
            return workflowModel;
        }
        if (workflowModel.getStatus().isTerminal()) {
            throw new ConflictException("Workflow is already in terminal state. Current status: " + workflowModel.getStatus());
        }

        this.deciderService.updateWorkflowOutput(workflowModel, null);
        workflowModel.setStatus(WorkflowModel.Status.COMPLETED);

        // Typed list (instead of a raw List) so the mapping steps below resolve against TaskModel.
        List<TaskModel> failedTasks = workflowModel.getTasks().stream()
                .filter(task -> TaskModel.Status.FAILED.equals(task.getStatus())
                        || TaskModel.Status.FAILED_WITH_TERMINAL_ERROR.equals(task.getStatus()))
                .collect(Collectors.toList());
        workflowModel.getFailedReferenceTaskNames().addAll(
                failedTasks.stream().map(TaskModel::getReferenceTaskName).collect(Collectors.toSet()));
        workflowModel.getFailedTaskNames().addAll(
                failedTasks.stream().map(TaskModel::getTaskDefName).collect(Collectors.toSet()));

        this.executionDAOFacade.updateWorkflow(workflowModel);
        LOGGER.debug("Completed workflow execution for {}", workflowModel.getWorkflowId());
        this.workflowStatusListener.onWorkflowCompletedIfEnabled(workflowModel);
        Monitors.recordWorkflowCompletion(
                workflowModel.getWorkflowName(),
                workflowModel.getEndTime() - workflowModel.getCreateTime().longValue(),
                workflowModel.getOwnerApp());

        if (workflowModel.hasParent()) {
            updateParentWorkflowTask(workflowModel);
            LOGGER.info(
                    "{} updated parent {} task {}",
                    workflowModel.toShortString(),
                    workflowModel.getParentWorkflowId(),
                    workflowModel.getParentWorkflowTaskId());
            expediteLazyWorkflowEvaluation(workflowModel.getParentWorkflowId());
        }

        this.executionLockService.releaseLock(workflowModel.getWorkflowId());
        this.executionLockService.deleteLock(workflowModel.getWorkflowId());
        return workflowModel;
    }

    /**
     * Terminates the workflow with the given id.
     *
     * <p>The workflow is loaded, its status is set to {@code TERMINATED}, and the work is handed
     * to {@link #terminateWorkflow(WorkflowModel, String, String)} without a failure workflow.
     *
     * @param str id of the workflow to terminate
     * @param str2 reason recorded as the workflow's reason for incompletion
     * @throws ConflictException if the workflow is already {@code COMPLETED}
     */
    public void terminateWorkflow(String str, String str2) {
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(str, true);
        boolean alreadyCompleted = workflow.getStatus() == WorkflowModel.Status.COMPLETED;
        if (alreadyCompleted) {
            throw new ConflictException("Cannot terminate a COMPLETED workflow.");
        }
        workflow.setStatus(WorkflowModel.Status.TERMINATED);
        terminateWorkflow(workflow, str2, null);
    }

    /**
     * Terminates a workflow and performs all associated clean-up.
     *
     * <p>Steps, in order: acquire the execution lock; set the status to {@code TERMINATED} unless
     * it is already terminal; refresh the workflow output (failures here are logged and ignored);
     * record the names of {@code FAILED} / {@code FAILED_WITH_TERMINAL_ERROR} tasks; store the
     * reason and persist; notify the status listener and metrics; remove the workflow's tasks from
     * their queues (failures are logged and ignored); update and expedite a parent workflow if
     * there is one; if a failure workflow name is given, publish a creation event for it and
     * record its id (or the start error) in the workflow output; remove the workflow from the
     * pending index; and cancel the remaining non-terminal tasks. The execution lock is released
     * and deleted in all cases.
     *
     * @param workflowModel the workflow to terminate
     * @param str reason for termination
     * @param str2 name of the failure workflow to start, or blank/{@code null} for none
     * @return the same workflow instance
     * @throws NonTransientException if cancelling non-terminal tasks reported errors
     */
    public WorkflowModel terminateWorkflow(WorkflowModel workflowModel, String str, String str2) {
        try {
            this.executionLockService.acquireLock(workflowModel.getWorkflowId(), 60000L);
            if (!workflowModel.getStatus().isTerminal()) {
                workflowModel.setStatus(WorkflowModel.Status.TERMINATED);
            }

            try {
                this.deciderService.updateWorkflowOutput(workflowModel, null);
            } catch (Exception e) {
                // Best effort: a failure to compute the output must not stop the termination.
                LOGGER.error("Failed to update output data for workflow: {}", workflowModel.getWorkflowId(), e);
                Monitors.error(CLASS_NAME, "terminateWorkflow");
            }

            // Typed list (instead of a raw List) so the mapping steps resolve against TaskModel.
            List<TaskModel> failedTasks = workflowModel.getTasks().stream()
                    .filter(task -> TaskModel.Status.FAILED.equals(task.getStatus())
                            || TaskModel.Status.FAILED_WITH_TERMINAL_ERROR.equals(task.getStatus()))
                    .collect(Collectors.toList());
            workflowModel.getFailedReferenceTaskNames().addAll(
                    failedTasks.stream().map(TaskModel::getReferenceTaskName).collect(Collectors.toSet()));
            workflowModel.getFailedTaskNames().addAll(
                    failedTasks.stream().map(TaskModel::getTaskDefName).collect(Collectors.toSet()));

            String workflowId = workflowModel.getWorkflowId();
            workflowModel.setReasonForIncompletion(str);
            this.executionDAOFacade.updateWorkflow(workflowModel);
            this.workflowStatusListener.onWorkflowTerminatedIfEnabled(workflowModel);
            Monitors.recordWorkflowTermination(workflowModel.getWorkflowName(), workflowModel.getStatus(), workflowModel.getOwnerApp());
            LOGGER.info("Workflow {} is terminated because of {}", workflowId, str);

            try {
                workflowModel.getTasks().forEach(
                        task -> this.queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()));
            } catch (Exception e) {
                // Best effort: the first failure stops the removal loop and is only logged.
                LOGGER.warn("Error removing task(s) from queue during workflow termination : {}", workflowId, e);
            }

            if (workflowModel.hasParent()) {
                updateParentWorkflowTask(workflowModel);
                LOGGER.info(
                        "{} updated parent {} task {}",
                        workflowModel.toShortString(),
                        workflowModel.getParentWorkflowId(),
                        workflowModel.getParentWorkflowTaskId());
                expediteLazyWorkflowEvaluation(workflowModel.getParentWorkflowId());
            }

            if (!StringUtils.isBlank(str2)) {
                // The failure workflow receives the original input plus details of the failure.
                Map<String, Object> failureInput = new HashMap<>(workflowModel.getInput());
                failureInput.put("workflowId", workflowId);
                failureInput.put("reason", str);
                failureInput.put("failureStatus", workflowModel.getStatus().toString());
                if (workflowModel.getFailedTaskId() != null) {
                    failureInput.put("failureTaskId", workflowModel.getFailedTaskId());
                }
                try {
                    String failureWorkflowId = this.idGenerator.generate();
                    StartWorkflowInput startWorkflowInput = new StartWorkflowInput();
                    startWorkflowInput.setName(str2);
                    startWorkflowInput.setWorkflowInput(failureInput);
                    startWorkflowInput.setCorrelationId(workflowModel.getCorrelationId());
                    startWorkflowInput.setTaskToDomain(workflowModel.getTaskToDomain());
                    startWorkflowInput.setWorkflowId(failureWorkflowId);
                    startWorkflowInput.setTriggeringWorkflowId(workflowId);
                    this.eventPublisher.publishEvent(new WorkflowCreationEvent(startWorkflowInput));
                    workflowModel.addOutput("conductor.failure_workflow", failureWorkflowId);
                } catch (Exception e) {
                    LOGGER.error("Failed to start error workflow", e);
                    workflowModel.getOutput().put("conductor.failure_workflow", "Error workflow " + str2 + " failed to start.  reason: " + e.getMessage());
                    Monitors.recordWorkflowStartError(str2, WorkflowContext.get().getClientApp());
                }
                this.executionDAOFacade.updateWorkflow(workflowModel);
            }

            this.executionDAOFacade.removeFromPendingWorkflow(workflowModel.getWorkflowName(), workflowModel.getWorkflowId());

            List<String> erroredTasks = cancelNonTerminalTasks(workflowModel);
            if (!erroredTasks.isEmpty()) {
                throw new NonTransientException(String.format("Error canceling system tasks: %s", String.join(",", erroredTasks)));
            }
            return workflowModel;
        } finally {
            this.executionLockService.releaseLock(workflowModel.getWorkflowId());
            this.executionLockService.deleteLock(workflowModel.getWorkflowId());
        }
    }

    /**
     * Applies a worker-reported task result to the stored task and re-evaluates the workflow.
     *
     * <p>Lease-extension requests are delegated to {@code extendLease}. Otherwise, if either the
     * task or its workflow is already terminal, the task is removed from its queue, an update
     * conflict is recorded and nothing else happens. For a live task the result is copied onto
     * the task model (an {@code IN_PROGRESS} result for a non-system task is stored as
     * {@code SCHEDULED}), the queue entry is removed or postponed depending on the new status,
     * the task and its logs are persisted, execution-time metrics are recorded for terminal
     * tasks, and {@code decide} is invoked unless {@code isLazyEvaluateWorkflow} says otherwise.
     *
     * @param taskResult the result reported for a task
     * @throws IllegalArgumentException if {@code taskResult} is {@code null}
     * @throws NotFoundException if no task exists with the reported id
     * @throws TransientException if postponing the queue entry fails, or if persisting the task,
     *     recording its logs/metrics or evaluating the workflow fails
     */
    public void updateTask(TaskResult taskResult) {
        if (taskResult == null) {
            throw new IllegalArgumentException("Task object is null");
        }
        if (taskResult.isExtendLease()) {
            extendLease(taskResult);
            return;
        }

        String workflowId = taskResult.getWorkflowInstanceId();
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(workflowId, false);
        TaskModel task = Optional.ofNullable(this.executionDAOFacade.getTaskModel(taskResult.getTaskId()))
                .orElseThrow(() -> new NotFoundException("No such task found by id: %s", taskResult.getTaskId()));
        LOGGER.debug("Task: {} belonging to Workflow {} being updated", task, workflow);

        String taskQueue = QueueUtils.getQueueName(task);

        // Late updates for an already finished task or workflow are dropped.
        if (task.getStatus().isTerminal()) {
            this.queueDAO.remove(taskQueue, taskResult.getTaskId());
            LOGGER.info(
                    "Task: {} has already finished execution with status: {} within workflow: {}. Removed task from queue: {}",
                    task.getTaskId(),
                    task.getStatus(),
                    task.getWorkflowInstanceId(),
                    taskQueue);
            Monitors.recordUpdateConflict(task.getTaskType(), workflow.getWorkflowName(), task.getStatus());
            return;
        }
        if (workflow.getStatus().isTerminal()) {
            this.queueDAO.remove(taskQueue, taskResult.getTaskId());
            LOGGER.info(
                    "Workflow: {} has already finished execution. Task update for: {} ignored and removed from Queue: {}.",
                    workflow,
                    taskResult.getTaskId(),
                    taskQueue);
            Monitors.recordUpdateConflict(task.getTaskType(), workflow.getWorkflowName(), workflow.getStatus());
            return;
        }

        // Only a non-system task reporting IN_PROGRESS goes back to SCHEDULED; every other
        // combination takes the reported status as is.
        if (!this.systemTaskRegistry.isSystemTask(task.getTaskType())
                && taskResult.getStatus() == TaskResult.Status.IN_PROGRESS) {
            task.setStatus(TaskModel.Status.SCHEDULED);
        } else {
            task.setStatus(TaskModel.Status.valueOf(taskResult.getStatus().name()));
        }
        task.setOutputMessage(taskResult.getOutputMessage());
        task.setReasonForIncompletion(taskResult.getReasonForIncompletion());
        task.setWorkerId(taskResult.getWorkerId());
        task.setCallbackAfterSeconds(taskResult.getCallbackAfterSeconds());
        task.setOutputData(taskResult.getOutputData());
        task.setSubWorkflowId(taskResult.getSubWorkflowId());
        if (StringUtils.isNotBlank(taskResult.getExternalOutputPayloadStoragePath())) {
            task.setExternalOutputPayloadStoragePath(taskResult.getExternalOutputPayloadStoragePath());
        }
        if (task.getStatus().isTerminal()) {
            task.setEndTime(System.currentTimeMillis());
        }

        // Bring the queue entry in line with the new status.
        switch (task.getStatus()) {
            case COMPLETED:
            case CANCELED:
            case FAILED:
            case FAILED_WITH_TERMINAL_ERROR:
            case TIMED_OUT:
                try {
                    this.queueDAO.remove(taskQueue, taskResult.getTaskId());
                    LOGGER.debug(
                            "Task: {} removed from taskQueue: {} since the task status is {}",
                            task,
                            taskQueue,
                            task.getStatus().name());
                } catch (Exception removeFailure) {
                    // A failed removal is only logged and counted; the update carries on.
                    LOGGER.warn(
                            String.format("Error removing the message in queue for task: %s for workflow: %s", task.getTaskId(), workflowId),
                            removeFailure);
                    Monitors.recordTaskQueueOpError(task.getTaskType(), workflow.getWorkflowName());
                }
                break;
            case IN_PROGRESS:
            case SCHEDULED:
                try {
                    long callbackDelay = taskResult.getCallbackAfterSeconds();
                    this.queueDAO.postpone(taskQueue, task.getTaskId(), task.getWorkflowPriority(), callbackDelay);
                    LOGGER.debug(
                            "Task: {} postponed in taskQueue: {} since the task status is {} with callbackAfterSeconds: {}",
                            task,
                            taskQueue,
                            task.getStatus().name(),
                            callbackDelay);
                } catch (Exception postponeFailure) {
                    String message = String.format("Error postponing the message in queue for task: %s for workflow: %s", task.getTaskId(), workflowId);
                    LOGGER.error(message, postponeFailure);
                    Monitors.recordTaskQueueOpError(task.getTaskType(), workflow.getWorkflowName());
                    throw new TransientException(message, postponeFailure);
                }
                break;
        }

        try {
            this.executionDAOFacade.updateTask(task);
            taskResult.getLogs().forEach(logEntry -> logEntry.setTaskId(task.getTaskId()));
            this.executionDAOFacade.addTaskExecLog(taskResult.getLogs());

            if (task.getStatus().isTerminal()) {
                long totalDuration = getTaskDuration(0L, task);
                long lastAttemptDuration = task.getEndTime() - task.getStartTime();
                Monitors.recordTaskExecutionTime(task.getTaskDefName(), totalDuration, true, task.getStatus());
                Monitors.recordTaskExecutionTime(task.getTaskDefName(), lastAttemptDuration, false, task.getStatus());
            }

            if (!isLazyEvaluateWorkflow(workflow.getWorkflowDefinition(), task)) {
                decide(workflowId);
            }
        } catch (Exception updateFailure) {
            String message = String.format("Error updating task: %s for workflow: %s", task.getTaskId(), workflowId);
            LOGGER.error(message, updateFailure);
            Monitors.recordTaskUpdateError(task.getTaskType(), workflow.getWorkflowName());
            throw new TransientException(message, updateFailure);
        }
    }

    /**
     * Extends the lease of the task identified by the given result, unless that task is already
     * in a terminal status.
     *
     * @param taskResult result carrying the id of the task whose lease is extended
     * @throws NotFoundException if no task model exists for the id
     * @throws TransientException if the lease extension itself fails
     */
    private void extendLease(TaskResult taskResult) {
        TaskModel task = this.executionDAOFacade.getTaskModel(taskResult.getTaskId());
        if (task == null) {
            throw new NotFoundException("No such task found by id: %s", taskResult.getTaskId());
        }
        LOGGER.debug("Extend lease for Task: {} belonging to Workflow: {}", task, task.getWorkflowInstanceId());
        if (!task.getStatus().isTerminal()) {
            try {
                this.executionDAOFacade.extendLease(task);
            } catch (Exception cause) {
                String message = String.format("Error extend lease for Task: %s belonging to Workflow: %s", task.getTaskId(), task.getWorkflowInstanceId());
                LOGGER.error(message, cause);
                Monitors.recordTaskExtendLeaseError(task.getTaskType(), task.getWorkflowType());
                throw new TransientException(message, cause);
            }
        }
    }

    /**
     * Tells whether the workflow evaluation that normally follows a task update can be skipped
     * for the given task.
     *
     * <ul>
     *   <li>Loop-over tasks are never lazy: returns {@code false}.
     *   <li>If some FORK_JOIN task reports {@code has(referenceTaskName)}, the result is
     *       {@code true} only when a JOIN task lists the reference name in its {@code joinOn}.
     *   <li>Otherwise the result is {@code true} only when no task collected from the definition
     *       carries that reference name.
     * </ul>
     *
     * @param workflowDef definition whose tasks are inspected
     * @param taskModel task that was just updated
     * @return {@code true} when the caller may skip the immediate decide
     */
    @VisibleForTesting
    boolean isLazyEvaluateWorkflow(WorkflowDef workflowDef, TaskModel taskModel) {
        if (taskModel.isLoopOverTask()) {
            return false;
        }
        String referenceTaskName = taskModel.getReferenceTaskName();
        // Typed list: a raw List would leave the lambdas below operating on Object.
        List<WorkflowTask> workflowTasks = workflowDef.collectTasks();

        boolean belongsToFork = workflowTasks.stream()
                .filter(candidate -> candidate.getType().equals(TaskType.FORK_JOIN.name()))
                .anyMatch(fork -> fork.has(referenceTaskName));
        if (belongsToFork) {
            return workflowTasks.stream()
                    .filter(candidate -> candidate.getType().equals(TaskType.JOIN.name()))
                    .anyMatch(join -> join.getJoinOn().contains(referenceTaskName));
        }
        return workflowTasks.stream()
                .noneMatch(candidate -> candidate.getTaskReferenceName().equals(referenceTaskName));
    }

    /**
     * Looks up a task by id and, when it carries a workflow task, passes it through the metadata
     * mapper before returning it.
     *
     * @param str id of the task
     * @return the task model, or {@code null} when the lookup finds nothing
     */
    public TaskModel getTask(String str) {
        TaskModel found = this.executionDAOFacade.getTaskModel(str);
        if (found == null) {
            return null;
        }
        if (found.getWorkflowTask() == null) {
            return found;
        }
        return this.metadataMapperService.populateTaskWithDefinition(found);
    }

    /**
     * Returns the pending workflows registered under the given name and version, as reported by
     * the execution DAO facade.
     *
     * @param str workflow name
     * @param i workflow version
     * @return the facade's result, unmodified
     */
    public List<Workflow> getRunningWorkflows(String str, int i) {
        List<Workflow> pending = this.executionDAOFacade.getPendingWorkflowsByName(str, i);
        return pending;
    }

    /**
     * Collects the ids of workflows with the given name whose version equals {@code num}.
     *
     * @param str workflow name
     * @param num version to keep; unboxed for every candidate, so it must not be {@code null}
     *     when any workflow is returned by the lookup
     * @param l first bound handed to the by-name lookup
     * @param l2 second bound handed to the by-name lookup
     * @return ids of the matching workflows, in lookup order
     */
    public List<String> getWorkflows(String str, Integer num, Long l, Long l2) {
        List<String> matchingIds = new ArrayList<>();
        for (Workflow candidate : this.executionDAOFacade.getWorkflowsByName(str, l, l2)) {
            if (candidate.getWorkflowVersion() == num.intValue()) {
                matchingIds.add(candidate.getWorkflowId());
            }
        }
        return matchingIds;
    }

    /**
     * Returns the ids of running workflows for the given name and version, as reported by the
     * execution DAO facade.
     *
     * @param str workflow name
     * @param i workflow version
     * @return the facade's result, unmodified
     */
    public List<String> getRunningWorkflowIds(String str, int i) {
        List<String> runningIds = this.executionDAOFacade.getRunningWorkflowIds(str, i);
        return runningIds;
    }

    /**
     * Listener for {@link WorkflowEvaluationEvent}: runs a decide on the workflow model carried
     * by the event.
     *
     * @param workflowEvaluationEvent event holding the workflow to evaluate
     */
    @EventListener({WorkflowEvaluationEvent.class})
    public void handleWorkflowEvaluationEvent(WorkflowEvaluationEvent workflowEvaluationEvent) {
        WorkflowModel target = workflowEvaluationEvent.getWorkflowModel();
        decide(target);
    }

    /**
     * Acquires the execution lock for the workflow, loads it and evaluates it.
     *
     * <p>Lock release, stopwatch stop and the decision-time metric happen exactly once, in the
     * {@code finally} block, for every path that acquired the lock.
     *
     * @param str id of the workflow to evaluate
     * @return the evaluated workflow, or {@code null} when the lock could not be acquired or no
     *     workflow model was found for the id
     */
    public WorkflowModel decide(String str) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        if (!this.executionLockService.acquireLock(str)) {
            return null;
        }
        try {
            WorkflowModel workflowModel = this.executionDAOFacade.getWorkflowModel(str, true);
            if (workflowModel == null) {
                return null;
            }
            return decide(workflowModel);
        } finally {
            // Single cleanup point: releasing or stopping a second time on the success path
            // would double-release the lock and re-stop an already stopped stopwatch.
            this.executionLockService.releaseLock(str);
            stopWatch.stop();
            Monitors.recordWorkflowDecisionTime(stopWatch.getTime());
        }
    }

    /**
     * Evaluates the workflow: asks the decider service for an outcome, schedules new tasks,
     * executes synchronous system tasks inline and persists the resulting changes.
     *
     * <p>A workflow already in a terminal status is returned as-is; if that status is not
     * successful, its non-terminal tasks are cancelled first. When scheduling or an inline
     * system task changed state, the method calls itself again on the same workflow.
     *
     * @param workflowModel workflow to evaluate
     * @return the same workflow instance
     * @throws RuntimeException any runtime failure other than {@link TerminateWorkflowException}
     *     is logged and rethrown; a {@code TerminateWorkflowException} terminates the workflow
     */
    public WorkflowModel decide(WorkflowModel workflowModel) {
        if (workflowModel.getStatus().isTerminal()) {
            if (!workflowModel.getStatus().isSuccessful()) {
                cancelNonTerminalTasks(workflowModel);
            }
            return workflowModel;
        }
        adjustStateIfSubWorkflowChanged(workflowModel);
        try {
            DeciderService.DeciderOutcome outcome = this.deciderService.decide(workflowModel);
            if (outcome.isComplete) {
                endExecution(workflowModel, outcome.terminateTask);
                return workflowModel;
            }

            List<TaskModel> proposedTasks = outcome.tasksToBeScheduled;
            setTaskDomains(proposedTasks, workflowModel);
            List<TaskModel> pendingUpdates = outcome.tasksToBeUpdated;
            List<TaskModel> newlyAdded = dedupAndAddTasks(workflowModel, proposedTasks);
            boolean stateChanged = scheduleTask(workflowModel, newlyAdded);

            for (TaskModel candidate : outcome.tasksToBeScheduled) {
                this.executionDAOFacade.populateTaskData(candidate);
                boolean liveSystemTask = this.systemTaskRegistry.isSystemTask(candidate.getTaskType()) && NON_TERMINAL_TASK.test(candidate);
                if (!liveSystemTask) {
                    continue;
                }
                WorkflowSystemTask systemTask = this.systemTaskRegistry.get(candidate.getTaskType());
                // Only synchronous system tasks run inline; execute() reports whether state moved.
                if (!systemTask.isAsync() && systemTask.execute(workflowModel, candidate, this)) {
                    pendingUpdates.add(candidate);
                    stateChanged = true;
                }
            }

            if (!(outcome.tasksToBeUpdated.isEmpty() && newlyAdded.isEmpty())) {
                this.executionDAOFacade.updateTasks(pendingUpdates);
            }
            if (stateChanged) {
                return decide(workflowModel);
            }
            if (!(outcome.tasksToBeUpdated.isEmpty() && newlyAdded.isEmpty())) {
                this.executionDAOFacade.updateWorkflow(workflowModel);
            }
            return workflowModel;
        } catch (TerminateWorkflowException terminated) {
            LOGGER.info("Execution terminated of workflow: {}", workflowModel, terminated);
            terminate(workflowModel, terminated);
            return workflowModel;
        } catch (RuntimeException failure) {
            LOGGER.error("Error deciding workflow: {}", workflowModel.getWorkflowId(), failure);
            throw failure;
        }
    }

    /**
     * If a sub-workflow task is flagged as changed, clears the flag, persists the task and, when
     * the definition contains JOIN or FORK_JOIN_DYNAMIC tasks, moves every task matching
     * {@code UNSUCCESSFUL_JOIN_TASK} back to IN_PROGRESS, re-queues it and persists it.
     *
     * @param workflowModel workflow whose tasks are inspected and possibly updated
     */
    private void adjustStateIfSubWorkflowChanged(WorkflowModel workflowModel) {
        Optional<TaskModel> changed = findChangedSubWorkflowTask(workflowModel);
        if (!changed.isPresent()) {
            return;
        }
        TaskModel subWorkflowTask = changed.get();
        subWorkflowTask.setSubworkflowChanged(false);
        this.executionDAOFacade.updateTask(subWorkflowTask);
        LOGGER.info("{} reset subworkflowChanged flag for {}", workflowModel.toShortString(), subWorkflowTask.getTaskId());

        boolean hasJoins = workflowModel.getWorkflowDefinition().containsType("JOIN") || workflowModel.getWorkflowDefinition().containsType("FORK_JOIN_DYNAMIC");
        if (!hasJoins) {
            return;
        }
        for (TaskModel joinTask : workflowModel.getTasks()) {
            if (UNSUCCESSFUL_JOIN_TASK.test(joinTask)) {
                joinTask.setStatus(TaskModel.Status.IN_PROGRESS);
                addTaskToQueue(joinTask);
                this.executionDAOFacade.updateTask(joinTask);
            }
        }
    }

    /**
     * Finds the first SUB_WORKFLOW task that is flagged as changed and has not been retried.
     *
     * <p>The workflow definition is taken from the model or, when the model has none, loaded
     * from the metadata DAO by name and version. The search only runs when that definition
     * contains SUB_WORKFLOW or FORK_JOIN_DYNAMIC tasks.
     *
     * @param workflowModel workflow whose tasks are searched
     * @return the first matching task, or an empty Optional
     * @throws TransientException if the definition is neither embedded nor found in metadata
     */
    private Optional<TaskModel> findChangedSubWorkflowTask(WorkflowModel workflowModel) {
        WorkflowDef workflowDef = Optional.ofNullable(workflowModel.getWorkflowDefinition())
                .orElseGet(() -> this.metadataDAO
                        .getWorkflowDef(workflowModel.getWorkflowName(), workflowModel.getWorkflowVersion())
                        .orElseThrow(() -> new TransientException("Workflow Definition is not found")));

        // Both checks use the resolved definition; the model's own definition may be null,
        // which is exactly the case the metadata fallback above exists for.
        if (!workflowDef.containsType("SUB_WORKFLOW") && !workflowDef.containsType("FORK_JOIN_DYNAMIC")) {
            return Optional.empty();
        }
        return workflowModel.getTasks().stream()
                .filter(task -> task.getTaskType().equals("SUB_WORKFLOW") && task.isSubworkflowChanged() && !task.isRetried())
                .findFirst();
    }

    /**
     * Marks every non-terminal task of the workflow as CANCELED, invoking {@code cancel} on
     * system tasks, and persists each updated task.
     *
     * <p>When no system-task cancellation failed, the workflow status listener is notified and
     * the workflow is removed from the decider queue; failures of that cleanup are only logged.
     *
     * @param workflowModel workflow whose tasks are cancelled
     * @return reference names of the system tasks whose cancellation threw
     */
    @VisibleForTesting
    List<String> cancelNonTerminalTasks(WorkflowModel workflowModel) {
        List<String> failedCancellations = new ArrayList<>();
        for (TaskModel task : workflowModel.getTasks()) {
            if (task.getStatus().isTerminal()) {
                continue;
            }
            task.setStatus(TaskModel.Status.CANCELED);
            if (this.systemTaskRegistry.isSystemTask(task.getTaskType())) {
                WorkflowSystemTask systemTask = this.systemTaskRegistry.get(task.getTaskType());
                try {
                    systemTask.cancel(workflowModel, task, this);
                } catch (Exception cancelError) {
                    failedCancellations.add(task.getReferenceTaskName());
                    LOGGER.error("Error canceling system task:{}/{} in workflow: {}", systemTask.getTaskType(), task.getTaskId(), workflowModel.getWorkflowId(), cancelError);
                }
            }
            // Persisted even when cancel() failed: the status was already set to CANCELED.
            this.executionDAOFacade.updateTask(task);
        }
        if (!failedCancellations.isEmpty()) {
            return failedCancellations;
        }
        try {
            this.workflowStatusListener.onWorkflowFinalizedIfEnabled(workflowModel);
            this.queueDAO.remove(Utils.DECIDER_QUEUE, workflowModel.getWorkflowId());
        } catch (Exception cleanupError) {
            LOGGER.error("Error removing workflow: {} from decider queue", workflowModel.getWorkflowId(), cleanupError);
        }
        return failedCancellations;
    }

    /**
     * Filters out candidate tasks whose "referenceName_retryCount" key already exists among the
     * workflow's tasks, appends the remaining ones to the workflow and returns them.
     *
     * <p>Candidates are only compared against tasks already in the workflow, not against each
     * other.
     *
     * @param workflowModel workflow receiving the new tasks
     * @param list candidate tasks
     * @return the candidates that were actually added
     */
    @VisibleForTesting
    List<TaskModel> dedupAndAddTasks(WorkflowModel workflowModel, List<TaskModel> list) {
        Set<String> existingKeys = new HashSet<>();
        for (TaskModel existing : workflowModel.getTasks()) {
            existingKeys.add(existing.getReferenceTaskName() + "_" + existing.getRetryCount());
        }
        List<TaskModel> added = new ArrayList<>();
        for (TaskModel candidate : list) {
            String key = candidate.getReferenceTaskName() + "_" + candidate.getRetryCount();
            if (!existingKeys.contains(key)) {
                added.add(candidate);
            }
        }
        workflowModel.getTasks().addAll(added);
        return added;
    }

    /**
     * Moves a workflow to PAUSED and removes it from the decider queue.
     *
     * <p>The status change is made between acquiring and releasing the execution lock; the lock
     * is released exactly once, in {@code finally}. Queue removal happens after the release and
     * is best-effort: failures are logged and otherwise ignored. A workflow that is already
     * paused is left untouched and is not removed from the queue again.
     *
     * <p>NOTE(review): the result of {@code acquireLock} is not checked here — confirm whether a
     * failed acquisition should abort the pause.
     *
     * @param str id of the workflow to pause
     * @throws ConflictException if the workflow is already in a terminal status
     */
    public void pauseWorkflow(String str) {
        try {
            this.executionLockService.acquireLock(str, 60000L);
            WorkflowModel.Status status = WorkflowModel.Status.PAUSED;
            WorkflowModel workflowModel = this.executionDAOFacade.getWorkflowModel(str, false);
            if (workflowModel.getStatus().isTerminal()) {
                throw new ConflictException("Workflow %s has ended, status cannot be updated.", workflowModel.toShortString());
            }
            if (workflowModel.getStatus().equals(status)) {
                return;
            }
            workflowModel.setStatus(status);
            this.executionDAOFacade.updateWorkflow(workflowModel);
        } finally {
            this.executionLockService.releaseLock(str);
        }

        // Not critical to the pause itself, so any failure is only logged.
        try {
            this.queueDAO.remove(Utils.DECIDER_QUEUE, str);
        } catch (Exception e) {
            LOGGER.info("[pauseWorkflow] Error removing workflow: {} from decider queue", str, e);
        }
    }

    /**
     * Resumes a paused workflow: sets it to RUNNING, stamps the last-retried time, pushes it to
     * the decider queue, persists it and triggers a decide.
     *
     * @param str id of the workflow to resume
     * @throws IllegalStateException if the workflow is not currently PAUSED
     */
    public void resumeWorkflow(String str) {
        WorkflowModel paused = this.executionDAOFacade.getWorkflowModel(str, false);
        WorkflowModel.Status current = paused.getStatus();
        if (!current.equals(WorkflowModel.Status.PAUSED)) {
            throw new IllegalStateException("The workflow " + str + " is not PAUSED so cannot resume. Current status is " + current.name());
        }
        paused.setStatus(WorkflowModel.Status.RUNNING);
        paused.setLastRetriedTime(System.currentTimeMillis());
        long offsetSeconds = this.properties.getWorkflowOffsetTimeout().getSeconds();
        this.queueDAO.push(Utils.DECIDER_QUEUE, paused.getWorkflowId(), paused.getPriority(), offsetSeconds);
        this.executionDAOFacade.updateWorkflow(paused);
        decide(str);
    }

    /**
     * Records a SKIPPED task for the given reference name in a running workflow and re-evaluates
     * the workflow.
     *
     * @param str id of the workflow
     * @param str2 reference name of the task to skip
     * @param skipTaskRequest optional input/output data and messages copied onto the skipped
     *     task; may be {@code null}
     * @throws IllegalStateException if the workflow is not RUNNING, the reference name is not in
     *     the workflow definition, or a task with that reference name already exists
     */
    public void skipTaskFromWorkflow(String str, String str2, SkipTaskRequest skipTaskRequest) {
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(str, true);
        if (!workflow.getStatus().equals(WorkflowModel.Status.RUNNING)) {
            throw new IllegalStateException(String.format("The workflow %s is not running so the task referenced by %s cannot be skipped", str, str2));
        }
        WorkflowTask definedTask = workflow.getWorkflowDefinition().getTaskByRefName(str2);
        if (definedTask == null) {
            throw new IllegalStateException(String.format("The task referenced by %s does not exist in the WorkflowDefinition %s", str2, workflow.getWorkflowName()));
        }
        for (TaskModel existing : workflow.getTasks()) {
            if (existing.getReferenceTaskName().equals(str2)) {
                throw new IllegalStateException(String.format("The task referenced %s has already been processed, cannot be skipped", str2));
            }
        }

        TaskModel skipped = new TaskModel();
        skipped.setTaskId(this.idGenerator.generate());
        skipped.setReferenceTaskName(str2);
        skipped.setWorkflowInstanceId(str);
        skipped.setWorkflowPriority(workflow.getPriority());
        skipped.setStatus(TaskModel.Status.SKIPPED);
        skipped.setEndTime(System.currentTimeMillis());
        skipped.setTaskType(definedTask.getName());
        skipped.setCorrelationId(workflow.getCorrelationId());
        if (skipTaskRequest != null) {
            skipped.setInputData(skipTaskRequest.getTaskInput());
            skipped.setOutputData(skipTaskRequest.getTaskOutput());
            skipped.setInputMessage(skipTaskRequest.getTaskInputMessage());
            skipped.setOutputMessage(skipTaskRequest.getTaskOutputMessage());
        }
        this.executionDAOFacade.createTasks(Collections.singletonList(skipped));
        decide(workflow.getWorkflowId());
    }

    /**
     * Fetches a workflow model from the execution DAO facade.
     *
     * @param str id of the workflow
     * @param z flag forwarded unchanged to the facade lookup
     * @return the facade's result, unmodified
     */
    public WorkflowModel getWorkflow(String str, boolean z) {
        WorkflowModel loaded = this.executionDAOFacade.getWorkflowModel(str, z);
        return loaded;
    }

    /**
     * Pushes the task onto its queue, using the task's callback delay when it is positive and a
     * delay of zero otherwise.
     *
     * @param taskModel task to enqueue
     */
    public void addTaskToQueue(TaskModel taskModel) {
        String queueName = QueueUtils.getQueueName(taskModel);
        long delaySeconds = taskModel.getCallbackAfterSeconds() > 0 ? taskModel.getCallbackAfterSeconds() : 0L;
        this.queueDAO.push(queueName, taskModel.getTaskId(), taskModel.getWorkflowPriority(), delaySeconds);
        LOGGER.debug("Added task {} with priority {} to queue {} with call back seconds {}", taskModel, taskModel.getWorkflowPriority(), queueName, taskModel.getCallbackAfterSeconds());
    }

    /**
     * Assigns a domain to every non-system task using the workflow's task-to-domain mapping.
     *
     * <p>A first pass applies the "*" entry (if not blank) to all non-system tasks; a second
     * pass overrides it for tasks whose task type has its own entry. Each mapping value is a
     * comma-separated list that is resolved through {@link #getActiveDomain}.
     *
     * @param list tasks to update in place
     * @param workflowModel workflow providing the mapping; nothing happens when it has none
     */
    @VisibleForTesting
    void setTaskDomains(List<TaskModel> list, WorkflowModel workflowModel) {
        Map<String, String> taskToDomain = workflowModel.getTaskToDomain();
        if (taskToDomain == null) {
            return;
        }

        String wildcardDomains = taskToDomain.get("*");
        if (StringUtils.isNotBlank(wildcardDomains)) {
            String[] wildcardCandidates = wildcardDomains.split(",");
            for (TaskModel task : list) {
                if (!this.systemTaskRegistry.isSystemTask(task.getTaskType())) {
                    task.setDomain(getActiveDomain(task.getTaskType(), wildcardCandidates));
                }
            }
        }

        for (TaskModel task : list) {
            if (this.systemTaskRegistry.isSystemTask(task.getTaskType())) {
                continue;
            }
            String typeDomains = taskToDomain.get(task.getTaskType());
            if (typeDomains != null) {
                task.setDomain(getActiveDomain(task.getTaskType(), typeDomains.split(",")));
            }
        }
    }

    /**
     * Picks the domain to use for a task type from an ordered list of candidates.
     *
     * <p>Candidates equal (ignoring case) to {@code Monitors.NO_DOMAIN} are skipped. The first
     * remaining candidate that has poll data accepted by {@code validateLastPolledTime} wins and
     * its poll-data domain is returned. If there is no such candidate, or that domain is
     * {@code null}, the trimmed last candidate is returned, or {@code null} when the last
     * candidate is {@code NO_DOMAIN}.
     *
     * @param str task type
     * @param strArr candidate domains in priority order
     * @return the chosen domain, or {@code null} for no domain or an empty/absent candidate list
     */
    @VisibleForTesting
    String getActiveDomain(String str, String[] strArr) {
        if (strArr == null || strArr.length == 0) {
            return null;
        }
        String chosen = null;
        for (String candidate : strArr) {
            if (candidate.equalsIgnoreCase(Monitors.NO_DOMAIN)) {
                continue;
            }
            PollData pollData = this.executionDAOFacade.getTaskPollDataByDomain(str, candidate.trim());
            if (pollData != null && this.validateLastPolledTime.test(pollData)) {
                // Only the first accepted poll data is considered, even if its domain is null.
                chosen = pollData.getDomain();
                break;
            }
        }
        String lastCandidate = strArr[strArr.length - 1].trim();
        String fallback = lastCandidate.equalsIgnoreCase(Monitors.NO_DOMAIN) ? null : lastCandidate;
        return chosen != null ? chosen : fallback;
    }

    /**
     * Sums the execution time (end time minus start time) of a task and of every earlier attempt
     * reachable through its retried-task ids, on top of an initial offset.
     *
     * <p>Each attempt is counted exactly once. The walk stops at the first attempt without a
     * retried-task id, or when a referenced attempt can no longer be loaded.
     *
     * @param j initial duration to add the attempts to
     * @param taskModel most recent attempt
     * @return {@code j} plus the durations of all attempts in the chain
     */
    private long getTaskDuration(long j, TaskModel taskModel) {
        long total = j;
        TaskModel attempt = taskModel;
        while (attempt != null) {
            total += attempt.getEndTime() - attempt.getStartTime();
            String previousAttemptId = attempt.getRetriedTaskId();
            attempt = previousAttemptId == null ? null : this.executionDAOFacade.getTaskModel(previousAttemptId);
        }
        return total;
    }

    /**
     * Persists the given tasks, starts synchronous system tasks inline and pushes all other
     * tasks (worker tasks and async system tasks) to their queues.
     *
     * <p>Tasks with sequence 0 receive the next sequence numbers after the workflow's current
     * maximum. A failure while pushing to the queues is logged and counted but not propagated.
     *
     * @param workflowModel workflow owning the tasks
     * @param list tasks to schedule; {@code null} or empty schedules nothing
     * @return {@code true} if at least one synchronous system task was started
     * @throws TerminateWorkflowException if anything else in the scheduling sequence fails
     */
    @VisibleForTesting
    boolean scheduleTask(WorkflowModel workflowModel, List<TaskModel> list) {
        if (list == null) {
            return false;
        }
        boolean startedSystemTask = false;
        try {
            if (list.isEmpty()) {
                return false;
            }

            int lastSeq = workflowModel.getTasks().stream().mapToInt(TaskModel::getSeq).max().orElse(0);
            for (TaskModel unsequenced : list) {
                if (unsequenced.getSeq() == 0) {
                    lastSeq++;
                    unsequenced.setSeq(lastSeq);
                }
            }

            Monitors.recordNumTasksInWorkflow(workflowModel.getTasks().size() + list.size(), workflowModel.getWorkflowName(), String.valueOf(workflowModel.getWorkflowVersion()));
            this.executionDAOFacade.createTasks(list);

            List<TaskModel> systemTasks = list.stream()
                    .filter(task -> this.systemTaskRegistry.isSystemTask(task.getTaskType()))
                    .collect(Collectors.toList());
            List<TaskModel> tasksToQueue = list.stream()
                    .filter(task -> !this.systemTaskRegistry.isSystemTask(task.getTaskType()))
                    .collect(Collectors.toList());

            for (TaskModel systemTaskModel : systemTasks) {
                WorkflowSystemTask systemTask = this.systemTaskRegistry.get(systemTaskModel.getTaskType());
                if (systemTask == null) {
                    throw new NotFoundException("No system task found by name %s", systemTaskModel.getTaskType());
                }
                boolean needsStartTime = systemTaskModel.getStatus() != null && !systemTaskModel.getStatus().isTerminal() && systemTaskModel.getStartTime() == 0;
                if (needsStartTime) {
                    systemTaskModel.setStartTime(System.currentTimeMillis());
                }
                if (systemTask.isAsync()) {
                    // Async system tasks are queued alongside the worker tasks.
                    tasksToQueue.add(systemTaskModel);
                    continue;
                }
                try {
                    systemTask.start(workflowModel, systemTaskModel, this);
                    startedSystemTask = true;
                    this.executionDAOFacade.updateTask(systemTaskModel);
                } catch (Exception startError) {
                    throw new NonTransientException(String.format("Unable to start system task: %s, {id: %s, name: %s}", systemTaskModel.getTaskType(), systemTaskModel.getTaskId(), systemTaskModel.getTaskDefName()), startError);
                }
            }

            try {
                addTaskToQueue(tasksToQueue);
            } catch (Exception queueError) {
                List<String> queuedIds = tasksToQueue.stream().map(TaskModel::getTaskId).collect(Collectors.toList());
                LOGGER.warn(String.format("Error pushing tasks to the queue: %s, for workflow: %s", queuedIds, workflowModel.getWorkflowId()), queueError);
                Monitors.error(CLASS_NAME, "scheduleTask");
            }
            return startedSystemTask;
        } catch (Exception schedulingError) {
            List<String> taskIds = list.stream().map(TaskModel::getTaskId).collect(Collectors.toList());
            String message = String.format("Error scheduling tasks: %s, for workflow: %s", taskIds, workflowModel.getWorkflowId());
            LOGGER.error(message, schedulingError);
            Monitors.error(CLASS_NAME, "scheduleTask");
            throw new TerminateWorkflowException(message);
        }
    }

    /**
     * Enqueues every task of the list, in list order.
     *
     * @param list tasks to enqueue
     */
    private void addTaskToQueue(List<TaskModel> list) {
        for (TaskModel queued : list) {
            addTaskToQueue(queued);
        }
    }

    /**
     * Terminates the workflow in response to a {@link TerminateWorkflowException}.
     *
     * <p>Applies the exception's status unless the workflow is already terminal, records the
     * failed task id if none is set, persists the exception's task, and resolves the failure
     * workflow name. A name starting with "$" is split on "." and its third segment is used as
     * a key into the workflow input.
     *
     * @param workflowModel workflow to terminate
     * @param terminateWorkflowException cause carrying status, message and optional task
     * @return the result of {@code terminateWorkflow}
     */
    private WorkflowModel terminate(WorkflowModel workflowModel, TerminateWorkflowException terminateWorkflowException) {
        if (!workflowModel.getStatus().isTerminal()) {
            workflowModel.setStatus(terminateWorkflowException.getWorkflowStatus());
        }
        boolean hasFailedTask = terminateWorkflowException.getTask() != null;
        if (hasFailedTask && workflowModel.getFailedTaskId() == null) {
            workflowModel.setFailedTaskId(terminateWorkflowException.getTask().getTaskId());
        }

        String failureWorkflow = workflowModel.getWorkflowDefinition().getFailureWorkflow();
        if (failureWorkflow != null && failureWorkflow.startsWith("$")) {
            String inputKey = failureWorkflow.split("\\.")[2];
            failureWorkflow = (String) workflowModel.getInput().get(inputKey);
        }

        if (hasFailedTask) {
            this.executionDAOFacade.updateTask(terminateWorkflowException.getTask());
        }
        return terminateWorkflow(workflowModel, terminateWorkflowException.getMessage(), failureWorkflow);
    }

    /**
     * Re-runs a terminal workflow, either from the beginning or from a specific task.
     *
     * <p>With no task id, all tasks are removed and the workflow is reset to RUNNING. With a
     * task id, the task is looked up in this workflow; if it is not found, each SUB_WORKFLOW
     * task's sub-workflow is tried recursively and the first one that succeeds becomes the
     * re-run point. Tasks with a higher sequence than the re-run point are removed, and the
     * re-run point itself is reset and restarted.
     *
     * @param str id of the workflow to re-run
     * @param str2 id of the task to re-run from, or {@code null} to restart the whole workflow
     * @param map replacement input for the re-run task, applied only to non-sub-workflow tasks;
     *     may be {@code null}
     * @param map2 replacement workflow input; may be {@code null}
     * @param str3 replacement correlation id; may be {@code null}
     * @return {@code true} if the re-run was started, {@code false} if no re-run point was found
     * @throws ConflictException if the workflow is not in a terminal status
     */
    private boolean rerunWF(String str, String str2, Map<String, Object> map, Map<String, Object> map2, String str3) {
        WorkflowModel workflow = this.executionDAOFacade.getWorkflowModel(str, true);
        if (!workflow.getStatus().isTerminal()) {
            String message = String.format("Workflow: %s is not in terminal state, unable to rerun.", workflow);
            LOGGER.error(message);
            throw new ConflictException(message);
        }
        updateAndPushParents(workflow, "reran");

        if (str2 == null) {
            // Full restart: drop every task and reset the workflow state.
            for (TaskModel stale : workflow.getTasks()) {
                this.executionDAOFacade.removeTask(stale.getTaskId());
            }
            workflow.setTasks(new ArrayList<>());
            workflow.setStatus(WorkflowModel.Status.RUNNING);
            workflow.setReasonForIncompletion(null);
            workflow.setFailedTaskId(null);
            workflow.setFailedReferenceTaskNames(new HashSet<>());
            workflow.setFailedTaskNames(new HashSet<>());
            if (str3 != null) {
                workflow.setCorrelationId(str3);
            }
            if (map2 != null) {
                workflow.setInput(map2);
            }
            this.queueDAO.push(Utils.DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), this.properties.getWorkflowOffsetTimeout().getSeconds());
            this.executionDAOFacade.updateWorkflow(workflow);
            decide(str);
            return true;
        }

        // Look for the task directly in this workflow first.
        TaskModel rerunFrom = null;
        for (TaskModel candidate : workflow.getTasks()) {
            if (candidate.getTaskId().equals(str2)) {
                rerunFrom = candidate;
                break;
            }
        }
        // Otherwise try each sub-workflow; the first one that accepts the re-run wins.
        if (rerunFrom == null) {
            for (TaskModel candidate : workflow.getTasks()) {
                if (candidate.getTaskType().equalsIgnoreCase("SUB_WORKFLOW") && rerunWF(candidate.getSubWorkflowId(), str2, map, null, null)) {
                    rerunFrom = candidate;
                    break;
                }
            }
        }
        if (rerunFrom == null) {
            return false;
        }

        workflow.setStatus(WorkflowModel.Status.RUNNING);
        workflow.setReasonForIncompletion(null);
        workflow.setFailedTaskId(null);
        workflow.setFailedReferenceTaskNames(new HashSet<>());
        workflow.setFailedTaskNames(new HashSet<>());
        if (str3 != null) {
            workflow.setCorrelationId(str3);
        }
        if (map2 != null) {
            workflow.setInput(map2);
        }
        this.queueDAO.push(Utils.DECIDER_QUEUE, workflow.getWorkflowId(), workflow.getPriority(), this.properties.getWorkflowOffsetTimeout().getSeconds());
        this.executionDAOFacade.updateWorkflow(workflow);
        this.executionDAOFacade.updateTasks(workflow.getTasks());

        // Keep tasks up to and including the re-run point; remove everything after it.
        List<TaskModel> retained = new ArrayList<>();
        for (TaskModel existing : workflow.getTasks()) {
            if (existing.getSeq() > rerunFrom.getSeq()) {
                this.executionDAOFacade.removeTask(existing.getTaskId());
            } else {
                retained.add(existing);
            }
        }
        workflow.setTasks(retained);

        rerunFrom.setScheduledTime(System.currentTimeMillis());
        rerunFrom.setStartTime(0L);
        rerunFrom.setUpdateTime(0L);
        rerunFrom.setEndTime(0L);
        rerunFrom.clearOutput();
        rerunFrom.setRetried(false);
        rerunFrom.setExecuted(false);

        if (rerunFrom.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")) {
            rerunFrom.setStatus(TaskModel.Status.IN_PROGRESS);
            rerunFrom.setStartTime(System.currentTimeMillis());
        } else {
            if (map != null) {
                rerunFrom.setInputData(map);
            }
            boolean synchronousSystemTask = this.systemTaskRegistry.isSystemTask(rerunFrom.getTaskType()) && !this.systemTaskRegistry.get(rerunFrom.getTaskType()).isAsync();
            if (synchronousSystemTask) {
                this.systemTaskRegistry.get(rerunFrom.getTaskType()).start(workflow, rerunFrom, this);
            } else {
                rerunFrom.setStatus(TaskModel.Status.SCHEDULED);
                addTaskToQueue(rerunFrom);
            }
        }
        this.executionDAOFacade.updateTask(rerunFrom);
        decide(workflow.getWorkflowId());
        return true;
    }

    /**
     * Schedules the tasks for the current iteration of a loop task.
     *
     * <p>The tasks are derived from the first entry of the loop task's {@code loopOver} list;
     * each receives the loop's iteration number and an iteration-suffixed reference name before
     * being scheduled and appended to the workflow.
     *
     * @param taskModel the loop task driving the iteration
     * @param workflowModel workflow the new tasks belong to
     */
    public void scheduleNextIteration(TaskModel taskModel, WorkflowModel workflowModel) {
        WorkflowTask firstLoopTask = (WorkflowTask) taskModel.getWorkflowTask().getLoopOver().get(0);
        List<TaskModel> iterationTasks = this.deciderService.getTasksToBeScheduled(workflowModel, firstLoopTask, taskModel.getRetryCount(), null);
        setTaskDomains(iterationTasks, workflowModel);
        for (TaskModel iterationTask : iterationTasks) {
            iterationTask.setReferenceTaskName(TaskUtils.appendIteration(iterationTask.getReferenceTaskName(), taskModel.getIteration()));
            iterationTask.setIteration(taskModel.getIteration());
        }
        scheduleTask(workflowModel, iterationTasks);
        workflowModel.getTasks().addAll(iterationTasks);
    }

    /**
     * Returns the task's own definition when it has one; otherwise looks the definition up in
     * the metadata DAO by the name of the task's workflow task.
     *
     * @param taskModel task whose definition is needed
     * @return the task definition
     * @throws TerminateWorkflowException if neither source provides a definition
     */
    public TaskDef getTaskDefinition(TaskModel taskModel) {
        Optional<TaskDef> embedded = taskModel.getTaskDefinition();
        if (embedded.isPresent()) {
            return embedded.get();
        }
        TaskDef registered = this.metadataDAO.getTaskDef(taskModel.getWorkflowTask().getName());
        if (registered == null) {
            throw new TerminateWorkflowException(String.format("Invalid task specified. Cannot find task by name %s in the task definitions", taskModel.getWorkflowTask().getName()));
        }
        return registered;
    }

    /**
     * Loads the parent workflow's task for the given sub-workflow, runs the SUB_WORKFLOW system
     * task against it and persists the task.
     *
     * @param workflowModel sub-workflow whose parent task is updated
     */
    @VisibleForTesting
    void updateParentWorkflowTask(WorkflowModel workflowModel) {
        String parentTaskId = workflowModel.getParentWorkflowTaskId();
        TaskModel parentTask = this.executionDAOFacade.getTaskModel(parentTaskId);
        executeSubworkflowTaskAndSyncData(workflowModel, parentTask);
        this.executionDAOFacade.updateTask(parentTask);
    }

    /**
     * Invokes {@code execute} of the system task registered as "SUB_WORKFLOW" with the given
     * workflow and task; the result of {@code execute} is ignored.
     *
     * @param workflowModel workflow passed to the system task
     * @param taskModel task passed to the system task
     */
    private void executeSubworkflowTaskAndSyncData(WorkflowModel workflowModel, TaskModel taskModel) {
        WorkflowSystemTask subWorkflowSystemTask = this.systemTaskRegistry.get("SUB_WORKFLOW");
        subWorkflowSystemTask.execute(workflowModel, taskModel, this);
    }

    /**
     * Makes the workflow available on the decider queue with expedited priority and no delay:
     * an existing queue message is postponed to zero seconds, otherwise a new one is pushed.
     *
     * @param str id of the workflow
     */
    private void expediteLazyWorkflowEvaluation(String str) {
        boolean alreadyQueued = this.queueDAO.containsMessage(Utils.DECIDER_QUEUE, str);
        if (!alreadyQueued) {
            this.queueDAO.push(Utils.DECIDER_QUEUE, str, EXPEDITED_PRIORITY, 0L);
        } else {
            this.queueDAO.postpone(Utils.DECIDER_QUEUE, str, EXPEDITED_PRIORITY, 0L);
        }
        LOGGER.info("Pushed workflow {} to {} for expedited evaluation", str, Utils.DECIDER_QUEUE);
    }
}
