/*
 * Decompiled with CFR 0.152.
 */
package io.nflow.engine.workflow.curated;

import com.fasterxml.jackson.databind.JsonNode;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.curated.State;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.definition.StateVar;
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import jakarta.inject.Inject;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Workflow definition that processes a set of child workflow instances in bulk while limiting
 * how many of them are running at the same time.
 *
 * <p>Permitted transitions: {@link #SPLIT_WORK} to {@link #WAIT_FOR_CHILDREN_TO_FINISH}, and
 * {@link #WAIT_FOR_CHILDREN_TO_FINISH} to {@link #DONE}. {@link #ERROR} is passed to the
 * superclass constructor alongside the start state.
 */
@Component
public class BulkWorkflow
extends WorkflowDefinition {
    /** Workflow type used by the no-argument constructor. */
    public static final String BULK_WORKFLOW_TYPE = "bulk";
    /** Name of the state variable handed to {@link #splitWork} as {@code data}. */
    public static final String VAR_CHILD_DATA = "childData";
    /** Name of the state variable handed to {@link #waitForChildrenToFinish} as {@code concurrency}. */
    public static final String VAR_CONCURRENCY = "concurrency";
    /** Every instance status except {@code finished} and {@code created}. */
    private static final EnumSet<WorkflowInstance.WorkflowInstanceStatus> RUNNING_STATES = EnumSet.complementOf(EnumSet.of(WorkflowInstance.WorkflowInstanceStatus.finished, WorkflowInstance.WorkflowInstanceStatus.created));
    private static final Logger logger = LoggerFactory.getLogger(BulkWorkflow.class);
    public static final WorkflowState SPLIT_WORK = new State("splitWork", WorkflowStateType.start, "Create new child workflows");
    public static final WorkflowState WAIT_FOR_CHILDREN_TO_FINISH = new State("waitForChildrenToFinish", WorkflowStateType.wait, "Wait for all child workflows to finish, start new child workflows if possible");
    public static final WorkflowState DONE = new State("done", WorkflowStateType.end, "All child workflows have been processed");
    public static final WorkflowState ERROR = new State("error", WorkflowStateType.manual, "Processing failed, waiting for manual actions");
    @Inject
    private WorkflowInstanceService instanceService;

    /**
     * Creates a bulk workflow definition registered under the given type.
     *
     * @param type workflow type name passed to the superclass
     */
    protected BulkWorkflow(String type) {
        super(type, SPLIT_WORK, ERROR, new WorkflowSettings.Builder().setMaxRetries(Integer.MAX_VALUE).build());
        this.setDescription("Executes child workflows in bulk but gracefully without effecting non-bulk tasks.");
        this.permit(SPLIT_WORK, WAIT_FOR_CHILDREN_TO_FINISH);
        this.permit(WAIT_FOR_CHILDREN_TO_FINISH, DONE);
    }

    /** Creates a bulk workflow definition with the type {@link #BULK_WORKFLOW_TYPE}. */
    public BulkWorkflow() {
        this(BULK_WORKFLOW_TYPE);
    }

    /**
     * State method for {@link #SPLIT_WORK}. Delegates to {@link #splitWorkImpl}.
     *
     * @param execution current state execution
     * @param data value of the {@link #VAR_CHILD_DATA} state variable (read-only)
     * @return a move to {@link #WAIT_FOR_CHILDREN_TO_FINISH} when {@link #splitWorkImpl} returns
     *         {@code true}, otherwise a retry at the time given by {@link #waitForChildrenUntil()}
     */
    public NextAction splitWork(StateExecution execution, @StateVar(value = VAR_CHILD_DATA, readOnly = true) JsonNode data) {
        boolean childrenFound = this.splitWorkImpl(execution, data);
        if (childrenFound) {
            return NextAction.moveToState(WAIT_FOR_CHILDREN_TO_FINISH, "Running");
        }
        return NextAction.retryAfter(this.waitForChildrenUntil(), "Waiting for child workflows");
    }

    /**
     * Hook deciding whether the child workflows are in place. This default implementation
     * creates nothing: it only verifies that the instance already has child workflows.
     *
     * @param execution current state execution
     * @param data value of the {@link #VAR_CHILD_DATA} state variable; unused by this implementation
     * @return {@code true} when child workflows exist
     * @throws RuntimeException if the instance has no child workflows
     */
    protected boolean splitWorkImpl(StateExecution execution, JsonNode data) {
        if (execution.getAllChildWorkflows().isEmpty()) {
            // The message names this method so the reader knows which hook to override.
            throw new RuntimeException("No child workflows found for workflow instance " + execution.getWorkflowInstanceId() + " - either add them before starting the parent or implement splitWorkImpl");
        }
        return true;
    }

    /**
     * @return the retry time used by {@link #splitWork} when no children were found:
     *         one hour from now
     */
    protected DateTime waitForChildrenUntil() {
        return DateTime.now().plusHours(1);
    }

    /**
     * State method for {@link #WAIT_FOR_CHILDREN_TO_FINISH}. Counts finished and running
     * children, wakes up children that are still in the {@code created} status while the
     * concurrency limit allows, and finishes once every child is finished.
     *
     * @param execution current state execution
     * @param concurrency value of the {@link #VAR_CONCURRENCY} state variable (read-only);
     *        values below 1 are treated as 1
     * @return a move to {@link #DONE} when all children are finished, otherwise a retry at the
     *         time given by {@link #waitForChildrenToCompleteUntil()} with a progress percentage
     */
    public NextAction waitForChildrenToFinish(StateExecution execution, @StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) {
        List<WorkflowInstance> childWorkflows = execution.getAllChildWorkflows();
        long completed = 0L;
        long running = 0L;
        for (WorkflowInstance child : childWorkflows) {
            if (child.status == WorkflowInstance.WorkflowInstanceStatus.finished) {
                ++completed;
            } else if (this.isRunning(child)) {
                ++running;
            }
        }
        // Also covers an empty child list (0 == 0), so the division below never sees size 0.
        if (completed == childWorkflows.size()) {
            return NextAction.moveToState(DONE, "All children completed");
        }
        // Upper bound only: free concurrency slots, capped by the number of unfinished children.
        long toStart = Math.min(Math.max(1, concurrency) - running, childWorkflows.size() - completed);
        if (toStart > 0L) {
            long started = this.wakeupInitialChildren(childWorkflows, toStart);
            // Log the number of wake-ups actually issued; it can be lower than the upper bound.
            if (started > 0L) {
                logger.info("Started {} child workflows", started);
            }
        }
        long progress = completed * 100L / childWorkflows.size();
        return NextAction.retryAfter(this.waitForChildrenToCompleteUntil(), "Waiting for child workflows to complete - " + progress + "% done");
    }

    /**
     * Wakes up at most {@code limit} children that are still in the {@code created} status,
     * in list order.
     *
     * @return the number of children for which a wake-up was issued
     */
    private long wakeupInitialChildren(List<WorkflowInstance> children, long limit) {
        long started = 0L;
        for (WorkflowInstance child : children) {
            if (started >= limit) {
                break;
            }
            if (this.isInInitialState(child)) {
                this.wakeup(child);
                ++started;
            }
        }
        return started;
    }

    /**
     * Asks the instance service to wake up the given instance, passing an empty list as the
     * second argument (presumably "no expected-state restriction" - confirm against the
     * service contract).
     */
    private void wakeup(WorkflowInstance instance) {
        this.instanceService.wakeupWorkflowInstance(instance.id, Collections.emptyList());
    }

    /**
     * @param instance child workflow instance
     * @return {@code true} when the instance status is neither {@code finished} nor {@code created}
     */
    protected boolean isRunning(WorkflowInstance instance) {
        return RUNNING_STATES.contains(instance.status);
    }

    /** @return {@code true} when the instance status is {@code created} */
    private boolean isInInitialState(WorkflowInstance instance) {
        return instance.status == WorkflowInstance.WorkflowInstanceStatus.created;
    }

    /**
     * @return the retry time used by {@link #waitForChildrenToFinish} while children are still
     *         unfinished: fifteen minutes from now
     */
    protected DateTime waitForChildrenToCompleteUntil() {
        return DateTime.now().plusMinutes(15);
    }
}

