/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.scheduler.ExecutionDeployer;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

public class DefaultScheduler
extends SchedulerBase
implements SchedulerOperations {
    protected final Logger log;
    private final ClassLoader userCodeLoader;
    protected final ExecutionSlotAllocator executionSlotAllocator;
    private final ExecutionFailureHandler executionFailureHandler;
    private final ScheduledExecutor delayExecutor;
    protected final SchedulingStrategy schedulingStrategy;
    private final ExecutionOperations executionOperations;
    private final Set<ExecutionVertexID> verticesWaitingForRestart;
    protected final ShuffleMaster<?> shuffleMaster;
    private final Map<AllocationID, Long> reservedAllocationRefCounters;
    private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex;
    protected final ExecutionDeployer executionDeployer;
    protected final FailoverStrategy failoverStrategy;

    protected DefaultScheduler(Logger log, JobGraph jobGraph, Executor ioExecutor, Configuration jobMasterConfiguration, Consumer<ComponentMainThreadExecutor> startUpAction, ScheduledExecutor delayExecutor, ClassLoader userCodeLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time rpcTimeout, VertexParallelismStore vertexParallelismStore, ExecutionDeployer.Factory executionDeployerFactory) throws Exception {
        super(log, jobGraph, ioExecutor, jobMasterConfiguration, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, executionVertexVersioner, initializationTimestamp, mainThreadExecutor, jobStatusListener, executionGraphFactory, vertexParallelismStore);
        this.log = log;
        this.delayExecutor = (ScheduledExecutor)Preconditions.checkNotNull((Object)delayExecutor);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.executionOperations = (ExecutionOperations)Preconditions.checkNotNull((Object)executionOperations);
        this.shuffleMaster = (ShuffleMaster)Preconditions.checkNotNull(shuffleMaster);
        this.reservedAllocationRefCounters = new HashMap<AllocationID, Long>();
        this.reservedAllocationByExecutionVertex = new HashMap<ExecutionVertexID, AllocationID>();
        this.failoverStrategy = failoverStrategyFactory.create(this.getSchedulingTopology(), this.getResultPartitionAvailabilityChecker());
        log.info("Using failover strategy {} for {} ({}).", new Object[]{this.failoverStrategy, jobGraph.getName(), jobGraph.getJobID()});
        FailureEnricher.Context taskFailureCtx = DefaultFailureEnricherContext.forTaskFailure(this.jobInfo, jobManagerJobMetricGroup, ioExecutor, userCodeLoader);
        FailureEnricher.Context globalFailureCtx = DefaultFailureEnricherContext.forGlobalFailure(this.jobInfo, jobManagerJobMetricGroup, ioExecutor, userCodeLoader);
        this.executionFailureHandler = new ExecutionFailureHandler(jobMasterConfiguration, this.getSchedulingTopology(), this.failoverStrategy, restartBackoffTimeStrategy, mainThreadExecutor, failureEnrichers, taskFailureCtx, globalFailureCtx, jobManagerJobMetricGroup);
        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, this.getSchedulingTopology());
        this.executionSlotAllocator = ((ExecutionSlotAllocatorFactory)Preconditions.checkNotNull((Object)executionSlotAllocatorFactory)).createInstance(new DefaultExecutionSlotAllocationContext());
        this.verticesWaitingForRestart = new HashSet<ExecutionVertexID>();
        startUpAction.accept(mainThreadExecutor);
        this.executionDeployer = executionDeployerFactory.createInstance(log, this.executionSlotAllocator, executionOperations, executionVertexVersioner, rpcTimeout, this::startReserveAllocation, mainThreadExecutor);
    }

    @Override
    protected long getNumberOfRestarts() {
        return this.executionFailureHandler.getNumberOfRestarts();
    }

    @Override
    protected void cancelAllPendingSlotRequestsInternal() {
        this.getSchedulingTopology().getVertices().forEach(ev -> this.cancelAllPendingSlotRequestsForVertex((ExecutionVertexID)ev.getId()));
    }

    @Override
    protected void startSchedulingInternal() {
        this.log.info("Starting scheduling with scheduling strategy [{}]", (Object)this.schedulingStrategy.getClass().getName());
        this.transitionToRunning();
        this.schedulingStrategy.startScheduling();
    }

    @Override
    protected void onTaskFinished(Execution execution, IOMetrics ioMetrics) {
        Preconditions.checkState((execution.getState() == ExecutionState.FINISHED ? 1 : 0) != 0);
        ExecutionVertexID executionVertexId = execution.getVertex().getID();
        this.stopReserveAllocation(executionVertexId);
        this.schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED);
    }

    @Override
    protected void onTaskFailed(Execution execution) {
        Preconditions.checkState((execution.getState() == ExecutionState.FAILED ? 1 : 0) != 0);
        Preconditions.checkState((boolean)execution.getFailureInfo().isPresent());
        Throwable error = execution.getFailureInfo().get().getException().deserializeError(this.userCodeLoader);
        this.handleTaskFailure(execution, this.maybeTranslateToClusterDatasetException(error, execution.getVertex().getID()));
    }

    protected void handleTaskFailure(Execution failedExecution, @Nullable Throwable error) {
        this.maybeRestartTasks(this.recordTaskFailure(failedExecution, error));
    }

    protected FailureHandlingResult recordTaskFailure(Execution failedExecution, @Nullable Throwable error) {
        long timestamp = System.currentTimeMillis();
        this.setGlobalFailureCause(error, timestamp);
        this.notifyCoordinatorsAboutTaskFailure(failedExecution, error);
        return this.executionFailureHandler.getFailureHandlingResult(failedExecution, error, timestamp);
    }

    private Throwable maybeTranslateToClusterDatasetException(@Nullable Throwable cause, ExecutionVertexID failedVertex) {
        IntermediateResultPartitionID failedPartitionId;
        if (!(cause instanceof PartitionException)) {
            return cause;
        }
        List<IntermediateDataSetID> intermediateDataSetIdsToConsume = this.getExecutionJobVertex(failedVertex.getJobVertexId()).getJobVertex().getIntermediateDataSetIdsToConsume();
        if (!intermediateDataSetIdsToConsume.contains((failedPartitionId = ((PartitionException)cause).getPartitionId().getPartitionId()).getIntermediateDataSetID())) {
            return cause;
        }
        return new ClusterDatasetCorruptedException(cause, Collections.singletonList(failedPartitionId.getIntermediateDataSetID()));
    }

    protected void notifyCoordinatorsAboutTaskFailure(Execution execution, @Nullable Throwable error) {
        ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
        int subtaskIndex = execution.getParallelSubtaskIndex();
        int attemptNumber = execution.getAttemptNumber();
        jobVertex.getOperatorCoordinators().forEach(c -> c.executionAttemptFailed(subtaskIndex, attemptNumber, error));
    }

    @Override
    public void handleGlobalFailure(Throwable error) {
        long timestamp = System.currentTimeMillis();
        this.setGlobalFailureCause(error, timestamp);
        this.log.info("Trying to recover from a global failure.", error);
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp);
        this.maybeRestartTasks(failureHandlingResult);
    }

    protected void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        if (failureHandlingResult.canRestart()) {
            this.restartTasksWithDelay(failureHandlingResult);
        } else {
            this.failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp(), failureHandlingResult.getFailureLabels());
        }
    }

    private void restartTasksWithDelay(FailureHandlingResult failureHandlingResult) {
        Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
        HashSet<ExecutionVertexVersion> executionVertexVersions = new HashSet<ExecutionVertexVersion>(this.executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
        boolean globalRecovery = failureHandlingResult.isGlobalFailure();
        if (globalRecovery) {
            this.log.info("{} tasks will be restarted to recover from a global failure.", (Object)verticesToRestart.size());
        } else {
            Preconditions.checkArgument((boolean)failureHandlingResult.getFailedExecution().isPresent());
            this.log.info("{} tasks will be restarted to recover the failed task {}.", (Object)verticesToRestart.size(), (Object)failureHandlingResult.getFailedExecution().get().getAttemptId());
        }
        this.addVerticesToRestartPending(verticesToRestart);
        CompletableFuture<?> cancelFuture = this.cancelTasksAsync(verticesToRestart);
        this.archiveFromFailureHandlingResult(this.createFailureHandlingResultSnapshot(failureHandlingResult));
        this.delayExecutor.schedule(() -> FutureUtils.assertNoException((CompletableFuture)cancelFuture.thenRunAsync(() -> this.restartTasks(executionVertexVersions, globalRecovery), (Executor)this.getMainThreadExecutor())), failureHandlingResult.getRestartDelayMS(), TimeUnit.MILLISECONDS);
    }

    protected FailureHandlingResultSnapshot createFailureHandlingResultSnapshot(FailureHandlingResult failureHandlingResult) {
        return FailureHandlingResultSnapshot.create(failureHandlingResult, id -> this.getExecutionVertex((ExecutionVertexID)id).getCurrentExecutions());
    }

    private void addVerticesToRestartPending(Set<ExecutionVertexID> verticesToRestart) {
        this.verticesWaitingForRestart.addAll(verticesToRestart);
        this.transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
    }

    private void removeVerticesFromRestartPending(Set<ExecutionVertexID> verticesToRestart) {
        this.verticesWaitingForRestart.removeAll(verticesToRestart);
        if (this.verticesWaitingForRestart.isEmpty()) {
            this.transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
        }
    }

    private void restartTasks(Set<ExecutionVertexVersion> executionVertexVersions, boolean isGlobalRecovery) {
        Set<ExecutionVertexID> verticesToRestart = this.executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
        if (verticesToRestart.isEmpty()) {
            return;
        }
        this.removeVerticesFromRestartPending(verticesToRestart);
        this.resetForNewExecutions(verticesToRestart);
        try {
            this.restoreState(verticesToRestart, isGlobalRecovery);
        }
        catch (Throwable t) {
            this.handleGlobalFailure(t);
            return;
        }
        this.schedulingStrategy.restartTasks(verticesToRestart);
    }

    private CompletableFuture<?> cancelTasksAsync(Set<ExecutionVertexID> verticesToRestart) {
        this.cancelAllPendingSlotRequestsForVertices(verticesToRestart);
        List cancelFutures = verticesToRestart.stream().map(this::cancelExecutionVertex).collect(Collectors.toList());
        return FutureUtils.combineAll(cancelFutures);
    }

    private CompletableFuture<?> cancelExecutionVertex(ExecutionVertexID executionVertexId) {
        return FutureUtils.combineAll((Collection)this.getExecutionVertex(executionVertexId).getCurrentExecutions().stream().map(this::cancelExecution).collect(Collectors.toList()));
    }

    protected CompletableFuture<?> cancelExecution(Execution execution) {
        this.notifyCoordinatorOfCancellation(execution);
        return this.executionOperations.cancel(execution);
    }

    private void cancelAllPendingSlotRequestsForVertices(Set<ExecutionVertexID> executionVertices) {
        executionVertices.forEach(this::cancelAllPendingSlotRequestsForVertex);
    }

    protected void cancelAllPendingSlotRequestsForVertex(ExecutionVertexID executionVertexId) {
        this.getExecutionVertex(executionVertexId).getCurrentExecutions().forEach(e -> this.executionSlotAllocator.cancel(e.getAttemptId()));
    }

    private Execution getCurrentExecutionOfVertex(ExecutionVertexID executionVertexId) {
        return this.getExecutionVertex(executionVertexId).getCurrentExecutionAttempt();
    }

    @Override
    public void allocateSlotsAndDeploy(List<ExecutionVertexID> verticesToDeploy) {
        Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex = this.executionVertexVersioner.recordVertexModifications(verticesToDeploy);
        List<Execution> executionsToDeploy = verticesToDeploy.stream().map(this::getCurrentExecutionOfVertex).collect(Collectors.toList());
        this.executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
    }

    private void startReserveAllocation(ExecutionVertexID executionVertexId, AllocationID newAllocation) {
        this.stopReserveAllocation(executionVertexId);
        this.reservedAllocationByExecutionVertex.put(executionVertexId, newAllocation);
        this.reservedAllocationRefCounters.compute(newAllocation, (ignored, oldCount) -> oldCount == null ? 1L : oldCount + 1L);
    }

    private void stopReserveAllocation(ExecutionVertexID executionVertexId) {
        AllocationID priorAllocation = this.reservedAllocationByExecutionVertex.remove(executionVertexId);
        if (priorAllocation != null) {
            this.reservedAllocationRefCounters.compute(priorAllocation, (ignored, oldCount) -> oldCount > 1L ? Long.valueOf(oldCount - 1L) : null);
        }
    }

    private void notifyCoordinatorOfCancellation(Execution execution) {
        ExecutionState currentState = execution.getState();
        if (currentState == ExecutionState.FAILED || currentState == ExecutionState.CANCELING || currentState == ExecutionState.CANCELED) {
            return;
        }
        this.notifyCoordinatorsAboutTaskFailure(execution, null);
    }

    private class DefaultExecutionSlotAllocationContext
    implements ExecutionSlotAllocationContext {
        private DefaultExecutionSlotAllocationContext() {
        }

        @Override
        public ResourceProfile getResourceProfile(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.getExecutionVertex(executionVertexId).getResourceProfile();
        }

        @Override
        public Optional<AllocationID> findPriorAllocationId(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.getExecutionVertex(executionVertexId).findLastAllocation();
        }

        @Override
        public SchedulingTopology getSchedulingTopology() {
            return DefaultScheduler.this.getSchedulingTopology();
        }

        @Override
        public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
            return DefaultScheduler.this.getJobGraph().getSlotSharingGroups();
        }

        @Override
        public Set<CoLocationGroup> getCoLocationGroups() {
            return DefaultScheduler.this.getJobGraph().getCoLocationGroups();
        }

        @Override
        public Collection<ConsumedPartitionGroup> getConsumedPartitionGroups(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
        }

        @Override
        public Collection<ExecutionVertexID> getProducersOfConsumedPartitionGroup(ConsumedPartitionGroup consumedPartitionGroup) {
            return DefaultScheduler.this.inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(consumedPartitionGroup);
        }

        @Override
        public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.inputsLocationsRetriever.getTaskManagerLocation(executionVertexId);
        }

        @Override
        public Optional<TaskManagerLocation> getStateLocation(ExecutionVertexID executionVertexId) {
            return DefaultScheduler.this.stateLocationRetriever.getStateLocation(executionVertexId);
        }

        @Override
        public Set<AllocationID> getReservedAllocations() {
            return DefaultScheduler.this.reservedAllocationRefCounters.keySet();
        }
    }
}

