/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@code CheckpointCoordinator.restoreLatestCheckpointedState(...)}.
 *
 * <p>Each test builds a coordinator over mocked {@link ExecutionVertex} instances, completes at
 * most one checkpoint by acknowledging it for every vertex, and then checks how the restore call
 * behaves: state is handed to the executions that acknowledged with state, and the call fails
 * with an {@link IllegalStateException} when state is incomplete or no checkpoint exists.
 *
 * <p>The tests declare {@code throws Exception} so that an unexpected failure reaches JUnit with
 * its original type, message and stack trace intact.
 */
public class CheckpointStateRestoreTest {
    private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();

    /**
     * Completes a checkpoint in which the three "stateful" subtasks acknowledge with state and
     * the two "stateless" subtasks acknowledge without, then verifies that restoring calls
     * {@code setInitialState} exactly once (with the acknowledged state) on each stateful
     * execution and never on the stateless ones.
     */
    @Test
    public void testSetState() throws Exception {
        // Kept as raw types: the generic element type the coordinator expects is not imported
        // in this file.
        SerializedValue serializedState = new SerializedValue(new LocalStateHandle(new SerializableObject()));

        JobID jid = new JobID();
        JobVertexID statefulId = new JobVertexID();
        JobVertexID statelessId = new JobVertexID();

        Execution statefulExec1 = this.mockExecution();
        Execution statefulExec2 = this.mockExecution();
        Execution statefulExec3 = this.mockExecution();
        Execution statelessExec1 = this.mockExecution();
        Execution statelessExec2 = this.mockExecution();

        ExecutionVertex stateful1 = this.mockExecutionVertex(statefulExec1, statefulId, 0, 3);
        ExecutionVertex stateful2 = this.mockExecutionVertex(statefulExec2, statefulId, 1, 3);
        ExecutionVertex stateful3 = this.mockExecutionVertex(statefulExec3, statefulId, 2, 3);
        ExecutionVertex stateless1 = this.mockExecutionVertex(statelessExec1, statelessId, 0, 2);
        ExecutionVertex stateless2 = this.mockExecutionVertex(statelessExec2, statelessId, 1, 2);

        ExecutionJobVertex stateful = this.mockExecutionJobVertex(statefulId, new ExecutionVertex[]{stateful1, stateful2, stateful3});
        ExecutionJobVertex stateless = this.mockExecutionJobVertex(statelessId, new ExecutionVertex[]{stateless1, stateless2});

        Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<JobVertexID, ExecutionJobVertex>();
        tasks.put(statefulId, stateful);
        tasks.put(statelessId, stateless);

        ExecutionVertex[] allVertices = {stateful1, stateful2, stateful3, stateless1, stateless2};
        // The second array is a copy so the coordinator still receives two distinct array instances.
        CheckpointCoordinator coord = this.newCoordinator(jid, allVertices, allVertices.clone());

        // Trigger one checkpoint and look up the id it was assigned.
        final long timestamp = 34623786L;
        coord.triggerCheckpoint(timestamp);
        PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pending.getCheckpointId();

        // Stateful subtasks acknowledge with state, stateless subtasks without.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0L));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState, 0L));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0L));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));

        // All five acknowledgements are in, so the checkpoint must have moved from pending to retained.
        Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());

        coord.restoreLatestCheckpointedState(tasks, true, false);

        this.verifyStateRestoredOnce(statefulExec1, serializedState);
        this.verifyStateRestoredOnce(statefulExec2, serializedState);
        this.verifyStateRestoredOnce(statefulExec3, serializedState);
        this.verifyNoStateRestored(statelessExec1);
        this.verifyNoStateRestored(statelessExec2);
    }

    /**
     * Completes a checkpoint in which only two of the three "stateful" subtasks acknowledge with
     * state, then expects the restore call (third flag set to {@code true}) to fail with an
     * {@link IllegalStateException}.
     */
    @Test
    public void testStateOnlyPartiallyAvailable() throws Exception {
        // Kept as raw types: the generic element type the coordinator expects is not imported
        // in this file.
        SerializedValue serializedState = new SerializedValue(new LocalStateHandle(new SerializableObject()));

        JobID jid = new JobID();
        JobVertexID statefulId = new JobVertexID();
        JobVertexID statelessId = new JobVertexID();

        Execution statefulExec1 = this.mockExecution();
        Execution statefulExec2 = this.mockExecution();
        Execution statefulExec3 = this.mockExecution();
        Execution statelessExec1 = this.mockExecution();
        Execution statelessExec2 = this.mockExecution();

        ExecutionVertex stateful1 = this.mockExecutionVertex(statefulExec1, statefulId, 0, 3);
        ExecutionVertex stateful2 = this.mockExecutionVertex(statefulExec2, statefulId, 1, 3);
        ExecutionVertex stateful3 = this.mockExecutionVertex(statefulExec3, statefulId, 2, 3);
        ExecutionVertex stateless1 = this.mockExecutionVertex(statelessExec1, statelessId, 0, 2);
        ExecutionVertex stateless2 = this.mockExecutionVertex(statelessExec2, statelessId, 1, 2);

        ExecutionJobVertex stateful = this.mockExecutionJobVertex(statefulId, new ExecutionVertex[]{stateful1, stateful2, stateful3});
        ExecutionJobVertex stateless = this.mockExecutionJobVertex(statelessId, new ExecutionVertex[]{stateless1, stateless2});

        Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<JobVertexID, ExecutionJobVertex>();
        tasks.put(statefulId, stateful);
        tasks.put(statelessId, stateless);

        ExecutionVertex[] allVertices = {stateful1, stateful2, stateful3, stateless1, stateless2};
        // The second array is a copy so the coordinator still receives two distinct array instances.
        CheckpointCoordinator coord = this.newCoordinator(jid, allVertices, allVertices.clone());

        // Trigger one checkpoint and look up the id it was assigned.
        final long timestamp = 34623786L;
        coord.triggerCheckpoint(timestamp);
        PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pending.getCheckpointId();

        // The second stateful subtask deliberately acknowledges WITHOUT state.
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState, 0L));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState, 0L));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
        coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));

        // The checkpoint itself still completes; only the restore is expected to be rejected.
        Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
        Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());

        // NOTE(review): the third flag presumably demands state for every subtask of a vertex
        // -- confirm against CheckpointCoordinator.
        try {
            coord.restoreLatestCheckpointedState(tasks, true, true);
            Assert.fail("this should fail with an exception");
        }
        catch (IllegalStateException expected) {
            // Expected outcome: restore rejects the partially available state.
        }
    }

    /**
     * Calls restore on a coordinator that never completed a checkpoint and expects an
     * {@link IllegalStateException}.
     */
    @Test
    public void testNoCheckpointAvailable() throws Exception {
        ExecutionVertex[] firstVertices = {Mockito.mock(ExecutionVertex.class)};
        ExecutionVertex[] secondVertices = {Mockito.mock(ExecutionVertex.class)};
        CheckpointCoordinator coord = this.newCoordinator(new JobID(), firstVertices, secondVertices);

        try {
            coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
            Assert.fail("this should throw an exception");
        }
        catch (IllegalStateException expected) {
            // Expected outcome: there is no checkpoint to restore from.
        }
    }

    /**
     * Builds a coordinator with the fixed settings shared by every test in this class: standalone
     * ID counter, standalone completed-checkpoint store constructed with {@code 1}, an empty
     * third vertex array and {@link RecoveryMode#STANDALONE}.
     *
     * @param jobId id of the job the coordinator is created for
     * @param firstVertices first vertex array passed to the constructor (presumably the tasks to
     *     trigger -- confirm against the constructor)
     * @param secondVertices second vertex array passed to the constructor (presumably the tasks
     *     whose acknowledgement is awaited -- confirm against the constructor)
     * @return the newly constructed coordinator
     * @throws Exception if constructing the coordinator fails
     */
    private CheckpointCoordinator newCoordinator(JobID jobId, ExecutionVertex[] firstVertices, ExecutionVertex[] secondVertices) throws Exception {
        return new CheckpointCoordinator(
                jobId,
                200000L,
                200000L,
                42,
                firstVertices,
                secondVertices,
                new ExecutionVertex[0],
                cl,
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1, cl),
                RecoveryMode.STANDALONE);
    }

    /**
     * Verifies that {@code setInitialState} was called exactly once on the given mock with the
     * expected state as first argument and any map as second argument.
     */
    private void verifyStateRestoredOnce(Execution execution, SerializedValue expectedState) throws Exception {
        // The cast on any() pins the matcher's type without relying on target-type inference.
        Mockito.verify(execution, Mockito.times(1)).setInitialState(Mockito.eq(expectedState), (Map) Mockito.any());
    }

    /** Verifies that {@code setInitialState} was never called on the given mock. */
    private void verifyNoStateRestored(Execution execution) throws Exception {
        // The casts on any() pin the matchers' types without relying on target-type inference.
        Mockito.verify(execution, Mockito.never()).setInitialState((SerializedValue) Mockito.any(), (Map) Mockito.any());
    }

    /** Creates a mocked {@link Execution} that reports {@link ExecutionState#RUNNING}. */
    private Execution mockExecution() {
        return this.mockExecution(ExecutionState.RUNNING);
    }

    /**
     * Creates a mocked {@link Execution} with a fresh {@link ExecutionAttemptID}.
     *
     * @param state the value the mock returns from {@code getState()}
     * @return the configured mock
     */
    private Execution mockExecution(ExecutionState state) {
        Execution mock = Mockito.mock(Execution.class);
        Mockito.when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
        Mockito.when(mock.getState()).thenReturn(state);
        return mock;
    }

    /**
     * Creates a mocked {@link ExecutionVertex}.
     *
     * @param execution returned from {@code getCurrentExecutionAttempt()}
     * @param vertexId returned from {@code getJobvertexId()}
     * @param subtask returned from {@code getParallelSubtaskIndex()}
     * @param parallelism returned from {@code getTotalNumberOfParallelSubtasks()}
     * @return the configured mock
     */
    private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) {
        ExecutionVertex mock = Mockito.mock(ExecutionVertex.class);
        Mockito.when(mock.getJobvertexId()).thenReturn(vertexId);
        Mockito.when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
        Mockito.when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
        Mockito.when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
        return mock;
    }

    /**
     * Creates a mocked {@link ExecutionJobVertex} whose parallelism equals the number of given
     * vertices.
     *
     * @param id returned from {@code getJobVertexId()}
     * @param vertices returned from {@code getTaskVertices()}
     * @return the configured mock
     */
    private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
        ExecutionJobVertex vertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(vertex.getParallelism()).thenReturn(vertices.length);
        Mockito.when(vertex.getJobVertexId()).thenReturn(id);
        Mockito.when(vertex.getTaskVertices()).thenReturn(vertices);
        return vertex;
    }
}

