/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.CheckpointSchedulingProvider;
import org.apache.flink.runtime.scheduler.adaptive.ResourceConsumer;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/**
 * Adaptive-scheduler state that wraps an {@link ExecutionGraph} which is in
 * {@link JobStatus#RUNNING}.
 *
 * <p>On construction the state deploys every execution vertex that is still in
 * {@link ExecutionState#CREATED} or {@link ExecutionState#SCHEDULED}. Failures (global or
 * task-level) are passed to {@link Context#howToHandleFailure(Throwable)}, and the answer decides
 * whether {@link Context#goToRestarting} or {@link Context#goToFailing} is invoked.
 */
class Executing extends StateWithExecutionGraph implements ResourceConsumer {

    private final Context context;

    /** Class loader handed to {@link TaskExecutionStateTransition#getError(ClassLoader)}. */
    private final ClassLoader userCodeClassLoader;

    /**
     * Creates the state, deploys the pending vertices and registers a zero-delay callback for
     * {@link #notifyNewResourcesAvailable()}.
     *
     * @param executionGraph execution graph to run; its state must be {@link JobStatus#RUNNING}
     * @param executionGraphHandler handler forwarded to the super class and to follow-up states
     * @param operatorCoordinatorHandler handler forwarded to the super class and to follow-up
     *     states
     * @param logger logger used by this state
     * @param context callbacks used to query decisions and to request state transitions
     * @param userCodeClassLoader class loader used when extracting the error of a failed task
     * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} if the
     *     execution graph is not running (exception type not visible here; confirm)
     */
    Executing(
            ExecutionGraph executionGraph,
            ExecutionGraphHandler executionGraphHandler,
            OperatorCoordinatorHandler operatorCoordinatorHandler,
            Logger logger,
            Context context,
            ClassLoader userCodeClassLoader) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger);
        this.context = context;
        this.userCodeClassLoader = userCodeClassLoader;
        Preconditions.checkState(
                executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");

        deploy();

        // Ask the context to call notifyNewResourcesAvailable() with zero delay, guarded by this
        // state instance (see Context#runIfState).
        context.runIfState(this, this::notifyNewResourcesAvailable, Duration.ZERO);
    }

    /** Always reports {@link JobStatus#RUNNING}. */
    @Override
    public JobStatus getJobStatus() {
        return JobStatus.RUNNING;
    }

    /** Hands the execution graph and its handlers to {@link Context#goToCanceling}. */
    @Override
    public void cancel() {
        context.goToCanceling(
                getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler());
    }

    /**
     * Treats a global failure exactly like a task failure.
     *
     * @param cause reason of the global failure
     */
    @Override
    public void handleGlobalFailure(Throwable cause) {
        handleAnyFailure(cause);
    }

    /**
     * Asks the context how to react to {@code cause} and requests the matching transition:
     * restarting (with the returned backoff) if the result is restartable, failing otherwise.
     */
    private void handleAnyFailure(Throwable cause) {
        final FailureResult failureResult = context.howToHandleFailure(cause);

        if (failureResult.canRestart()) {
            getLogger().info("Restarting job.", failureResult.getFailureCause());
            context.goToRestarting(
                    getExecutionGraph(),
                    getExecutionGraphHandler(),
                    getOperatorCoordinatorHandler(),
                    failureResult.getBackoffTime());
        } else {
            getLogger().info("Failing job.", failureResult.getFailureCause());
            context.goToFailing(
                    getExecutionGraph(),
                    getExecutionGraphHandler(),
                    getOperatorCoordinatorHandler(),
                    failureResult.getFailureCause());
        }
    }

    /**
     * Applies the task state transition to the execution graph. If the update was accepted and
     * the task is now {@link ExecutionState#FAILED}, failure handling is triggered with the
     * task's error, or with a placeholder {@link FlinkException} when no error is attached.
     *
     * @param taskExecutionState transition reported for a task
     * @return whether the execution graph accepted the update
     */
    @Override
    boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
        final boolean successfulUpdate = getExecutionGraph().updateState(taskExecutionState);

        if (successfulUpdate && taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
            final Throwable cause = taskExecutionState.getError(userCodeClassLoader);
            handleAnyFailure(
                    cause != null
                            ? cause
                            : new FlinkException(
                                    "Unknown failure cause. Probably related to FLINK-21376."));
        }

        return successfulUpdate;
    }

    /**
     * Archives the execution graph and passes it to {@link Context#goToFinished}.
     *
     * @param globallyTerminalState terminal status that was reached; not used by this state
     */
    @Override
    void onGloballyTerminalState(JobStatus globallyTerminalState) {
        context.goToFinished(ArchivedExecutionGraph.createFrom(getExecutionGraph()));
    }

    /**
     * Walks all job vertices in topological order and deploys each execution vertex that is
     * still in {@link ExecutionState#CREATED} or {@link ExecutionState#SCHEDULED}.
     */
    private void deploy() {
        for (ExecutionJobVertex executionJobVertex :
                getExecutionGraph().getVerticesTopologically()) {
            for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                if (executionVertex.getExecutionState() == ExecutionState.CREATED
                        || executionVertex.getExecutionState() == ExecutionState.SCHEDULED) {
                    deploySafely(executionVertex);
                }
            }
        }
    }

    /** Deploys a single vertex; a {@link JobException} marks the vertex as failed instead of propagating. */
    private void deploySafely(ExecutionVertex executionVertex) {
        try {
            executionVertex.deploy();
        } catch (JobException e) {
            handleDeploymentFailure(executionVertex, e);
        }
    }

    /** Marks the vertex as failed with the deployment exception as cause. */
    private void handleDeploymentFailure(ExecutionVertex executionVertex, JobException e) {
        executionVertex.markFailed(e);
    }

    /**
     * Requests an immediate (zero backoff) restart if the context reports that the current
     * execution graph can be scaled up; otherwise does nothing.
     */
    @Override
    public void notifyNewResourcesAvailable() {
        if (context.canScaleUp(getExecutionGraph())) {
            getLogger().info("New resources are available. Restarting job to scale up.");
            context.goToRestarting(
                    getExecutionGraph(),
                    getExecutionGraphHandler(),
                    getOperatorCoordinatorHandler(),
                    Duration.ofMillis(0L));
        }
    }

    /**
     * Stops checkpoint scheduling, triggers a synchronous savepoint and hands everything over to
     * {@link Context#goToStopWithSavepoint}.
     *
     * @param targetDirectory savepoint target directory; may be {@code null}
     * @param terminate forwarded to the checkpoint coordinator's synchronous savepoint trigger
     * @return the future returned by {@link Context#goToStopWithSavepoint}
     */
    CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate) {
        final ExecutionGraph executionGraph = getExecutionGraph();

        StopWithSavepointTerminationManager.checkStopWithSavepointPreconditions(
                executionGraph.getCheckpointCoordinator(),
                targetDirectory,
                executionGraph.getJobID(),
                getLogger());

        getLogger().info("Triggering stop-with-savepoint for job {}.", executionGraph.getJobID());

        // Scheduling is stopped before the savepoint is triggered; the provider is passed on to
        // the next state (presumably so scheduling can be resumed there -- confirm with caller).
        final CheckpointSchedulingProvider schedulingProvider =
                new CheckpointSchedulingProvider(executionGraph);
        schedulingProvider.stopCheckpointScheduler();

        final CompletableFuture<String> savepointFuture =
                executionGraph
                        .getCheckpointCoordinator()
                        .triggerSynchronousSavepoint(terminate, targetDirectory)
                        .thenApply(CompletedCheckpoint::getExternalPointer);

        return context.goToStopWithSavepoint(
                executionGraph,
                getExecutionGraphHandler(),
                getOperatorCoordinatorHandler(),
                schedulingProvider,
                savepointFuture);
    }

    /** Factory that captures all constructor arguments and creates an {@link Executing} on demand. */
    static class Factory implements StateFactory<Executing> {

        private final Context context;
        private final Logger log;
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final ClassLoader userCodeClassLoader;

        /** Stores the arguments that are later passed unchanged to the {@link Executing} constructor. */
        Factory(
                ExecutionGraph executionGraph,
                ExecutionGraphHandler executionGraphHandler,
                OperatorCoordinatorHandler operatorCoordinatorHandler,
                Logger log,
                Context context,
                ClassLoader userCodeClassLoader) {
            this.context = context;
            this.log = log;
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.userCodeClassLoader = userCodeClassLoader;
        }

        @Override
        public Class<Executing> getStateClass() {
            return Executing.class;
        }

        /** Creates a new {@link Executing} instance on every call. */
        @Override
        public Executing getState() {
            return new Executing(
                    executionGraph,
                    executionGraphHandler,
                    operatorCoordinatorHandler,
                    log,
                    context,
                    userCodeClassLoader);
        }
    }

    /**
     * Answer of {@link Context#howToHandleFailure(Throwable)}: the failure cause plus, if the job
     * may be restarted, the backoff time. A {@code null} backoff time encodes "cannot restart".
     */
    static final class FailureResult {

        @Nullable private final Duration backoffTime;

        private final Throwable failureCause;

        private FailureResult(Throwable failureCause, @Nullable Duration backoffTime) {
            this.backoffTime = backoffTime;
            this.failureCause = failureCause;
        }

        /** Returns {@code true} iff a backoff time is present. */
        boolean canRestart() {
            return backoffTime != null;
        }

        /**
         * Returns the backoff time of a restartable result.
         *
         * <p>Calling this on a non-restartable result fails the {@code Preconditions.checkState}
         * guard.
         */
        Duration getBackoffTime() {
            Preconditions.checkState(
                    canRestart(), "Failure result must be restartable to return a backoff time.");
            return backoffTime;
        }

        Throwable getFailureCause() {
            return failureCause;
        }

        /**
         * Creates a result carrying the given backoff time.
         *
         * <p>NOTE(review): {@code backoffTime} is not null-checked; passing {@code null} yields a
         * result for which {@link #canRestart()} is {@code false}.
         */
        static FailureResult canRestart(Throwable failureCause, Duration backoffTime) {
            return new FailureResult(failureCause, backoffTime);
        }

        /** Creates a result without backoff time, i.e. one that is not restartable. */
        static FailureResult canNotRestart(Throwable failureCause) {
            return new FailureResult(failureCause, null);
        }
    }

    /** Callbacks required by {@link Executing} in addition to the super state's context. */
    interface Context extends StateWithExecutionGraph.Context {

        /** Invoked by {@link Executing#cancel()} with the current graph and handlers. */
        void goToCanceling(
                ExecutionGraph executionGraph,
                ExecutionGraphHandler executionGraphHandler,
                OperatorCoordinatorHandler operatorCoordinatorHandler);

        /**
         * Decides how a failure is to be handled.
         *
         * @param failure the failure that occurred
         * @return result telling whether a restart is possible and with which backoff
         */
        FailureResult howToHandleFailure(Throwable failure);

        /** Queried by {@link Executing#notifyNewResourcesAvailable()} before requesting a restart. */
        boolean canScaleUp(ExecutionGraph executionGraph);

        /** Invoked for restartable failures and for scale-up, together with the backoff time. */
        void goToRestarting(
                ExecutionGraph executionGraph,
                ExecutionGraphHandler executionGraphHandler,
                OperatorCoordinatorHandler operatorCoordinatorHandler,
                Duration backoffTime);

        /** Invoked for non-restartable failures, together with the failure cause. */
        void goToFailing(
                ExecutionGraph executionGraph,
                ExecutionGraphHandler executionGraphHandler,
                OperatorCoordinatorHandler operatorCoordinatorHandler,
                Throwable failureCause);

        /**
         * Invoked by {@link Executing#stopWithSavepoint(String, boolean)} after the savepoint has
         * been triggered.
         *
         * @param checkpointScheduling object on which checkpoint scheduling was stopped
         * @param savepointFuture future of the savepoint's external pointer
         * @return future that {@code stopWithSavepoint} returns to its caller
         */
        CompletableFuture<String> goToStopWithSavepoint(
                ExecutionGraph executionGraph,
                ExecutionGraphHandler executionGraphHandler,
                OperatorCoordinatorHandler operatorCoordinatorHandler,
                CheckpointScheduling checkpointScheduling,
                CompletableFuture<String> savepointFuture);

        /**
         * Used by the {@link Executing} constructor to schedule {@code action} after {@code delay}.
         * Judging by the name, the action only runs if {@code expectedState} is still the current
         * state -- confirm against the implementation.
         */
        ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay);
    }
}

