/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.savepoint;

import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinatorDeActivator;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV0;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.impl.Promise;

public class SavepointCoordinator
extends CheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointCoordinator.class);
    private final SavepointStore savepointStore;
    private final Map<Long, Promise<String>> savepointPromises;
    private volatile String savepointRestorePath;

    public SavepointCoordinator(JobID jobId, long baseInterval, long checkpointTimeout, int numberKeyGroups, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, SavepointStore savepointStore, CheckpointStatsTracker statsTracker, Executor executor) {
        super(jobId, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, numberKeyGroups, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader, checkpointIDCounter, IgnoreCheckpointsStore.INSTANCE, RecoveryMode.STANDALONE, statsTracker, executor);
        this.savepointStore = (SavepointStore)Preconditions.checkNotNull((Object)savepointStore);
        this.savepointPromises = new ConcurrentHashMap<Long, Promise<String>>();
    }

    public String getSavepointRestorePath() {
        return this.savepointRestorePath;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Future<String> triggerSavepoint(long timestamp) throws Exception {
        Promise.DefaultPromise promise;
        block8: {
            promise = new Promise.DefaultPromise();
            try {
                long checkpointId = this.getAndIncrementCheckpointId();
                if (checkpointId == -1L) {
                    throw new IllegalStateException("Failed to get checkpoint Id");
                }
                LOG.info("Triggering savepoint with ID " + checkpointId);
                if (this.savepointPromises.put(checkpointId, (Promise<String>)promise) == null) {
                    boolean success = false;
                    try {
                        success = this.triggerCheckpoint(timestamp, checkpointId);
                        break block8;
                    }
                    finally {
                        if (!success) {
                            this.savepointPromises.remove(checkpointId);
                            promise.failure((Throwable)new Exception("Failed to trigger savepoint"));
                        }
                    }
                }
                throw new IllegalStateException("Duplicate checkpoint ID");
            }
            catch (Throwable t) {
                promise.failure((Throwable)new Exception("Failed to trigger savepoint", t));
            }
        }
        return promise.future();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restoreSavepoint(Map<JobVertexID, ExecutionJobVertex> tasks, String savepointPath, boolean allowNonRestoredState) throws Exception {
        Preconditions.checkNotNull((Object)savepointPath, (String)"Savepoint path");
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            LOG.info("Rolling back to savepoint '{}'.", (Object)savepointPath);
            Savepoint savepoint = this.savepointStore.loadSavepoint(savepointPath);
            for (TaskState taskState : savepoint.getTaskStates()) {
                String msg;
                ExecutionJobVertex executionJobVertex = tasks.get((Object)taskState.getJobVertexID());
                if (executionJobVertex != null) {
                    if (executionJobVertex.getParallelism() != taskState.getParallelism()) {
                        msg = String.format("Failed to rollback to savepoint %s. Parallelism mismatch between savepoint state and new program. Cannot map operator %s with parallelism %d to new program with parallelism %d. You cannot change parallelism of Flink 1.1 programs.", new Object[]{savepoint, taskState.getJobVertexID(), taskState.getParallelism(), executionJobVertex.getParallelism()});
                        throw new IllegalStateException(msg);
                    }
                    List<Set<Integer>> keyGroupPartitions = this.createKeyGroupPartitions(this.numberKeyGroups, executionJobVertex.getParallelism());
                    for (int i = 0; i < executionJobVertex.getTaskVertices().length; ++i) {
                        SubtaskState subtaskState = taskState.getState(i);
                        SerializedValue<StateHandle<?>> state = null;
                        if (subtaskState != null) {
                            state = subtaskState.getState();
                        }
                        Map<Integer, SerializedValue<StateHandle<?>>> kvStateForTaskMap = taskState.getUnwrappedKvStates(keyGroupPartitions.get(i));
                        Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt();
                        currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
                    }
                    continue;
                }
                if (allowNonRestoredState) {
                    LOG.info("Ignoring checkpoint state for operator {}.", (Object)taskState.getJobVertexID());
                    continue;
                }
                msg = String.format("Failed to rollback to savepoint %s. Cannot map savepoint state for operator %s to the new program, because the operator is not available in the new program. If you want to allow this, you can set the --allowNonRestoredState option on the CLI.", new Object[]{savepointPath, taskState.getJobVertexID()});
                throw new IllegalStateException(msg);
            }
            long nextCheckpointId = savepoint.getCheckpointId() + 1L;
            this.checkpointIdCounter.start();
            this.checkpointIdCounter.setCount(nextCheckpointId);
            LOG.info("Reset the checkpoint ID to {}", (Object)nextCheckpointId);
            if (this.savepointRestorePath == null) {
                this.savepointRestorePath = savepointPath;
            }
        }
    }

    @Override
    protected void onShutdown() {
        for (Promise<String> promise : this.savepointPromises.values()) {
            promise.failure((Throwable)new Exception("Checkpoint coordinator shutdown"));
        }
        this.savepointPromises.clear();
    }

    @Override
    protected void onCancelCheckpoint(long canceledCheckpointId) {
        LOG.info("Cancelling savepoint with checkpoint ID " + canceledCheckpointId);
        Promise<String> promise = this.savepointPromises.remove(canceledCheckpointId);
        if (promise != null) {
            promise.failure((Throwable)new Exception("Savepoint expired before completing"));
        }
    }

    @Override
    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
        Promise<String> promise = this.savepointPromises.remove(checkpoint.getCheckpointID());
        if (promise == null) {
            LOG.warn("Pending savepoint with ID " + checkpoint.getCheckpointID() + "  has been " + "removed before receiving acknowledgment.");
            return;
        }
        if (promise.isCompleted()) {
            throw new IllegalStateException("Savepoint promise completed");
        }
        try {
            SavepointV0 savepoint = new SavepointV0(checkpoint.getCheckpointID(), checkpoint.getTaskStates().values());
            String path = this.savepointStore.storeSavepoint(savepoint);
            promise.success((Object)path);
        }
        catch (Exception e) {
            LOG.warn("Failed to store savepoint.", (Throwable)e);
            promise.failure((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.getJobStatusListener() == null) {
                Props props = Props.create(SavepointCoordinatorDeActivator.class, (Object[])new Object[]{this, leaderSessionID});
                this.setJobStatusListener(new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID));
            }
            return this.getJobStatusListener();
        }
    }

    private static class IgnoreCheckpointsStore
    implements CompletedCheckpointStore {
        private static final CompletedCheckpointStore INSTANCE = new IgnoreCheckpointsStore();

        private IgnoreCheckpointsStore() {
        }

        @Override
        public void recover() throws Exception {
        }

        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
        }

        @Override
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            return null;
        }

        @Override
        public void shutdown() throws Exception {
        }

        @Override
        public void suspend() throws Exception {
        }

        @Override
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return Collections.emptyList();
        }

        @Override
        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }
    }
}

