/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.FileSystemStateStore;
import org.apache.flink.runtime.checkpoint.HeapStateStore;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.SavepointCoordinator;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.checkpoint.StateStore;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class SavepointCoordinatorTest {
    /**
     * Triggers one savepoint, acknowledges it from both tasks and verifies the outcome:
     * the pending checkpoint is discarded, nothing is added to the coordinator's
     * successful checkpoints, the tasks receive a completion notification, and the
     * future yields a path under which the savepoint can be read back from the store.
     */
    @Test
    public void testSimpleTriggerSavepoint() throws Exception {
        JobID jid = new JobID();
        long timeout = 60000L;
        long triggerTime = 1272635L;
        ExecutionVertex[] tasks = new ExecutionVertex[]{mockExecutionVertex(jid), mockExecutionVertex(jid)};
        MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
        HeapStateStore store = new HeapStateStore();
        SavepointCoordinator coordinator = createSavepointCoordinator(jid, timeout, tasks, tasks, tasks, idCounter, (StateStore<CompletedCheckpoint>)store);

        // Triggering alone must not complete the future.
        Future pathFuture = coordinator.triggerSavepoint(triggerTime);
        Assert.assertFalse(pathFuture.isCompleted());

        // The first ID handed out by the mock counter is expected to be 0.
        long id = idCounter.getLastReturnedCount();
        Assert.assertEquals(0L, id);
        for (int t = 0; t < tasks.length; t++) {
            verifyTriggerCheckpoint(tasks[t], id, triggerTime);
        }

        // Nothing acknowledged yet: 0 acked, 2 outstanding, 0 collected states.
        PendingCheckpoint pending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(id);
        verifyPendingCheckpoint(pending, jid, id, triggerTime, 0, 2, 0, false, false);

        // Acknowledge from every task.
        for (int t = 0; t < tasks.length; t++) {
            ExecutionVertex task = tasks[t];
            ExecutionAttemptID attempt = task.getCurrentExecutionAttempt().getAttemptId();
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attempt, id, createSerializedStateHandle(task), 0L));
        }

        Assert.assertTrue(pending.isDiscarded());
        Assert.assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
        for (int t = 0; t < tasks.length; t++) {
            verifyNotifyCheckpointComplete(tasks[t], id, triggerTime);
        }

        // The future now carries the path of the stored savepoint.
        Assert.assertTrue(pathFuture.isCompleted());
        Duration noWait = FiniteDuration.Zero();
        String path = (String)Await.result((Awaitable)pathFuture, noWait);
        CompletedCheckpoint stored = (CompletedCheckpoint)store.getState(path);
        verifySavepoint(stored, jid, id, triggerTime, tasks);

        // No promise may be left behind.
        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Takes a savepoint across two mocked job vertices and restores it again. After the
     * restore, every acknowledging execution must have had {@code setInitialState}
     * called exactly once, no savepoint promise may remain, and the ID counter must
     * report that it was started.
     */
    @Test
    public void testSimpleRollbackSavepoint() throws Exception {
        JobID jid = new JobID();
        ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[]{mockExecutionJobVertex(jid, new JobVertexID(), 4), mockExecutionJobVertex(jid, new JobVertexID(), 4)};
        ExecutionVertex[] triggerTasks = jobVertices[0].getTaskVertices();

        // Flatten the task vertices of both job vertices (2 x 4) into one ack array.
        ExecutionVertex[] ackTasks = new ExecutionVertex[8];
        int next = 0;
        for (ExecutionJobVertex jv : jobVertices) {
            ExecutionVertex[] subtasks = jv.getTaskVertices();
            for (int s = 0; s < subtasks.length; s++) {
                ackTasks[next] = subtasks[s];
                next++;
            }
        }

        MockCheckpointIdCounter counter = new MockCheckpointIdCounter();
        HeapStateStore store = new HeapStateStore();
        SavepointCoordinator coordinator = createSavepointCoordinator(jid, 60000L, triggerTasks, ackTasks, new ExecutionVertex[0], counter, (StateStore<CompletedCheckpoint>)store);

        // Trigger and fully acknowledge checkpoint 0.
        Future pathFuture = coordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex task : ackTasks) {
            ExecutionAttemptID attempt = task.getCurrentExecutionAttempt().getAttemptId();
            AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(jid, attempt, 0L, createSerializedStateHandle(task), 0L);
            coordinator.receiveAcknowledgeMessage(ack);
        }
        String path = (String)Await.result((Awaitable)pathFuture, (Duration)FiniteDuration.Zero());
        Assert.assertNotNull(path);

        // Roll back to the savepoint that was just written.
        coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), path);
        for (ExecutionVertex task : ackTasks) {
            Execution attempt = task.getCurrentExecutionAttempt();
            Mockito.verify(attempt, Mockito.times(1)).setInitialState(Matchers.any(SerializedValue.class), Matchers.anyLong());
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        Assert.assertTrue(counter.isStarted());
        coordinator.shutdown();
    }

    /**
     * Takes a savepoint with job vertices mocked with 4, then tries to restore it into
     * job vertices that keep the same IDs but are mocked with 2. The restore is
     * expected to throw, and no savepoint promise may be left behind.
     */
    @Test
    public void testRollbackParallelismMismatch() throws Exception {
        JobID jid = new JobID();
        ExecutionJobVertex[] jobVertices = new ExecutionJobVertex[]{mockExecutionJobVertex(jid, new JobVertexID(), 4), mockExecutionJobVertex(jid, new JobVertexID(), 4)};
        ExecutionVertex[] triggerTasks = jobVertices[0].getTaskVertices();

        // Flatten the task vertices of both job vertices (2 x 4) into one ack array.
        ExecutionVertex[] ackTasks = new ExecutionVertex[8];
        int next = 0;
        for (ExecutionJobVertex jv : jobVertices) {
            ExecutionVertex[] subtasks = jv.getTaskVertices();
            for (int s = 0; s < subtasks.length; s++) {
                ackTasks[next] = subtasks[s];
                next++;
            }
        }

        HeapStateStore store = new HeapStateStore();
        SavepointCoordinator coordinator = createSavepointCoordinator(jid, 60000L, triggerTasks, ackTasks, new ExecutionVertex[0], new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)store);

        // Trigger and fully acknowledge checkpoint 0.
        Future pathFuture = coordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex task : ackTasks) {
            ExecutionAttemptID attempt = task.getCurrentExecutionAttempt().getAttemptId();
            AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(jid, attempt, 0L, createSerializedStateHandle(task), 0L);
            coordinator.receiveAcknowledgeMessage(ack);
        }
        String path = (String)Await.result((Awaitable)pathFuture, (Duration)FiniteDuration.Zero());
        Assert.assertNotNull(path);

        // Replace each job vertex by one with the same ID but a different last argument (2 instead of 4).
        int v = 0;
        while (v < jobVertices.length) {
            JobVertexID sameId = jobVertices[v].getJobVertexId();
            jobVertices[v] = mockExecutionJobVertex(jid, sameId, 2);
            v++;
        }

        try {
            coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertices), path);
            Assert.fail("Did not throw expected Exception after rollback with parallelism mismatch.");
        }
        catch (Exception expected) {
            // Expected: the restore has to be rejected.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Takes a savepoint successfully, then makes the spied store throw from
     * {@code getState} and checks that restoring the savepoint fails with an exception
     * while leaving no savepoint promise behind.
     */
    @Test
    public void testRollbackStateStoreFailure() throws Exception {
        JobID jid = new JobID();
        ExecutionJobVertex jobVertex = mockExecutionJobVertex(jid, new JobVertexID(), 4);
        HeapStateStore spiedStore = Mockito.spy(new HeapStateStore());
        SavepointCoordinator coordinator = createSavepointCoordinator(jid, 60000L, jobVertex.getTaskVertices(), jobVertex.getTaskVertices(), new ExecutionVertex[0], new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)spiedStore);

        // Trigger and fully acknowledge checkpoint 0 while the store still works.
        Future pathFuture = coordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex task : jobVertex.getTaskVertices()) {
            ExecutionAttemptID attempt = task.getCurrentExecutionAttempt().getAttemptId();
            AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(jid, attempt, 0L, createSerializedStateHandle(task), 0L);
            coordinator.receiveAcknowledgeMessage(ack);
        }
        String path = (String)Await.result((Awaitable)pathFuture, (Duration)FiniteDuration.Zero());
        Assert.assertNotNull(path);

        // From here on every read from the store fails.
        Mockito.doThrow(new Exception("TestException")).when(spiedStore).getState(Matchers.anyString());

        try {
            coordinator.restoreSavepoint(createExecutionJobVertexMap(jobVertex), path);
            Assert.fail("Did not throw expected Exception after rollback with savepoint store failure.");
        }
        catch (Exception expected) {
            // Expected: the store failure has to surface to the caller.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Restores a mocked savepoint whose checkpoint ID is 12312312 and verifies that the
     * checkpoint ID counter is set to 12312313, i.e. the savepoint's ID plus one.
     */
    @Test
    public void testRollbackSetsCheckpointID() throws Exception {
        // A savepoint without any task state and a fixed checkpoint ID.
        CompletedCheckpoint mockedSavepoint = Mockito.mock(CompletedCheckpoint.class);
        Mockito.when((Object)mockedSavepoint.getStates()).thenReturn(Collections.emptyList());
        Mockito.when(mockedSavepoint.getCheckpointID()).thenReturn(12312312L);

        CheckpointIDCounter counter = Mockito.mock(CheckpointIDCounter.class);

        // The store hands out the mocked savepoint for whatever path is requested.
        StateStore store = Mockito.mock(StateStore.class);
        Mockito.when((Object)store.getState(Matchers.anyString())).thenReturn(mockedSavepoint);

        ExecutionVertex[] noTriggerTasks = new ExecutionVertex[0];
        ExecutionVertex[] noAckTasks = new ExecutionVertex[0];
        ExecutionVertex[] noCommitTasks = new ExecutionVertex[0];
        SavepointCoordinator coordinator = createSavepointCoordinator(new JobID(), 60000L, noTriggerTasks, noAckTasks, noCommitTasks, counter, (StateStore<CompletedCheckpoint>)store);

        coordinator.restoreSavepoint(createExecutionJobVertexMap(new ExecutionJobVertex[0]), "any");

        Mockito.verify(counter).setCount(Matchers.eq(12312313L));
        coordinator.shutdown();
    }

    /**
     * Uses plain, unstubbed {@code ExecutionVertex} mocks as trigger tasks and checks
     * that {@code triggerSavepoint} returns an already completed future that fails when
     * awaited, with no savepoint promise left behind.
     */
    @Test
    public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception {
        JobID jobId = new JobID();
        // Unstubbed mocks: presumably these report no current execution attempt.
        ExecutionVertex[] triggerVertices = new ExecutionVertex[]{Mockito.mock(ExecutionVertex.class), Mockito.mock(ExecutionVertex.class)};
        ExecutionVertex[] ackVertices = new ExecutionVertex[]{mockExecutionVertex(jobId), mockExecutionVertex(jobId)};
        SavepointCoordinator coordinator = createSavepointCoordinator(jobId, 60000L, triggerVertices, ackVertices, new ExecutionVertex[0], new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)new HeapStateStore());

        Future savepointPathFuture = coordinator.triggerSavepoint(1238123L);
        Assert.assertTrue(savepointPathFuture.isCompleted());
        try {
            Await.result((Awaitable)savepointPathFuture, (Duration)FiniteDuration.Zero());
            // The previous message blamed a shutdown that never happens in this test.
            Assert.fail("Did not throw expected Exception for savepoint with trigger tasks that are not executed");
        }
        catch (Exception expected) {
            // Expected: the savepoint was aborted, so the future carries a failure.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Uses one trigger task mocked with {@code ExecutionState.FINISHED} and checks that
     * {@code triggerSavepoint} returns an already completed future that fails when
     * awaited, with no savepoint promise left behind.
     */
    @Test
    public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception {
        JobID jobId = new JobID();
        // The second trigger task is mocked in state FINISHED.
        ExecutionVertex[] triggerVertices = new ExecutionVertex[]{mockExecutionVertex(jobId), mockExecutionVertex(jobId, ExecutionState.FINISHED)};
        ExecutionVertex[] ackVertices = new ExecutionVertex[]{mockExecutionVertex(jobId), mockExecutionVertex(jobId)};
        SavepointCoordinator coordinator = createSavepointCoordinator(jobId, 60000L, triggerVertices, ackVertices, new ExecutionVertex[0], new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)new HeapStateStore());

        Future savepointPathFuture = coordinator.triggerSavepoint(1238123L);
        Assert.assertTrue(savepointPathFuture.isCompleted());
        try {
            Await.result((Awaitable)savepointPathFuture, (Duration)FiniteDuration.Zero());
            // The previous message blamed a shutdown that never happens in this test.
            Assert.fail("Did not throw expected Exception for savepoint with a finished trigger task");
        }
        catch (Exception expected) {
            // Expected: the savepoint was aborted, so the future carries a failure.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Uses plain, unstubbed {@code ExecutionVertex} mocks as acknowledging tasks and
     * checks that {@code triggerSavepoint} returns an already completed future that
     * fails when awaited, with no savepoint promise left behind.
     */
    @Test
    public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception {
        JobID jobId = new JobID();
        ExecutionVertex[] triggerVertices = new ExecutionVertex[]{mockExecutionVertex(jobId), mockExecutionVertex(jobId)};
        // Unstubbed mocks: presumably these report no current execution attempt.
        ExecutionVertex[] ackVertices = new ExecutionVertex[]{Mockito.mock(ExecutionVertex.class), Mockito.mock(ExecutionVertex.class)};
        SavepointCoordinator coordinator = createSavepointCoordinator(jobId, 60000L, triggerVertices, ackVertices, new ExecutionVertex[0], new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)new HeapStateStore());

        Future savepointPathFuture = coordinator.triggerSavepoint(1238123L);
        Assert.assertTrue(savepointPathFuture.isCompleted());
        try {
            Await.result((Awaitable)savepointPathFuture, (Duration)FiniteDuration.Zero());
            // The previous message blamed a shutdown that never happens in this test.
            Assert.fail("Did not throw expected Exception for savepoint with ack tasks that are not executed");
        }
        catch (Exception expected) {
            // Expected: the savepoint was aborted, so the future carries a failure.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Creates a coordinator with a 20 ms timeout, acknowledges the savepoint from only
     * one of two tasks and waits for the pending checkpoint to be discarded. Afterwards
     * no pending or retained checkpoint may exist, the commit vertex must not have been
     * sent a {@code NotifyCheckpointComplete}, and the savepoint future must be
     * completed with a failure.
     */
    @Test
    public void testAbortOnCheckpointTimeout() throws Exception {
        JobID jobId = new JobID();
        ExecutionVertex[] vertices = new ExecutionVertex[]{mockExecutionVertex(jobId), mockExecutionVertex(jobId)};
        ExecutionVertex commitVertex = mockExecutionVertex(jobId);
        MockCheckpointIdCounter checkpointIdCounter = new MockCheckpointIdCounter();
        SavepointCoordinator coordinator = createSavepointCoordinator(jobId, 20L, vertices, vertices, new ExecutionVertex[]{commitVertex}, checkpointIdCounter, (StateStore<CompletedCheckpoint>)new HeapStateStore());

        Future savepointPathFuture = coordinator.triggerSavepoint(12731273L);
        Assert.assertFalse(savepointPathFuture.isCompleted());
        long checkpointId = checkpointIdCounter.getLastReturnedCount();

        // Acknowledge from the first task only, so the checkpoint can never complete.
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, vertices[0].getCurrentExecutionAttempt().getAttemptId(), checkpointId, createSerializedStateHandle(vertices[0]), 0L));
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(checkpointId);
        Assert.assertFalse(pendingCheckpoint.isDiscarded());

        // Poll until EVERY post-condition asserted below holds (or the deadline passes).
        // The conditions were previously joined with &&, which stopped waiting as soon
        // as any single one was satisfied and could let the assertions race with the
        // coordinator's asynchronous timeout handling.
        Deadline deadline = FiniteDuration.apply(5L, "s").fromNow();
        while (deadline.hasTimeLeft()
                && (!pendingCheckpoint.isDiscarded()
                        || coordinator.getNumberOfPendingCheckpoints() > 0
                        || !savepointPathFuture.isCompleted())) {
            Thread.sleep(250L);
        }

        Assert.assertTrue(pendingCheckpoint.isDiscarded());
        Assert.assertEquals(0, coordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0, coordinator.getNumberOfRetainedSuccessfulCheckpoints());
        Mockito.verify(commitVertex, Mockito.times(0)).sendMessageToCurrentExecution((Serializable)Matchers.any(NotifyCheckpointComplete.class), Matchers.any(ExecutionAttemptID.class));

        Assert.assertTrue(savepointPathFuture.isCompleted());
        try {
            Await.result((Awaitable)savepointPathFuture, (Duration)FiniteDuration.Zero());
            Assert.fail("Did not throw expected Exception after timeout");
        }
        catch (Exception expected) {
            // Expected: the timed-out savepoint fails its future.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Triggers two savepoints, shuts the coordinator down without acknowledging either
     * and checks that both futures are then completed with a failure and that no
     * savepoint promise remains.
     */
    @Test
    public void testAbortSavepointsOnShutdown() throws Exception {
        JobID jid = new JobID();
        ExecutionVertex[] tasks = new ExecutionVertex[]{mockExecutionVertex(jid), mockExecutionVertex(jid)};
        SavepointCoordinator coordinator = createSavepointCoordinator(jid, 60000L, tasks, tasks, tasks, new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)new HeapStateStore());

        List<Future> pending = new ArrayList<Future>();
        pending.add(coordinator.triggerSavepoint(12731273L));
        pending.add(coordinator.triggerSavepoint(12731396L));

        // Neither savepoint has been acknowledged, so both futures are still open.
        for (int f = 0; f < pending.size(); f++) {
            Assert.assertFalse(pending.get(f).isCompleted());
        }

        coordinator.shutdown();

        for (int f = 0; f < pending.size(); f++) {
            Future aborted = pending.get(f);
            Assert.assertTrue(aborted.isCompleted());
            try {
                Await.result((Awaitable)aborted, (Duration)FiniteDuration.Zero());
                Assert.fail("Did not throw expected Exception after shutdown");
            }
            catch (Exception expected) {
                // Expected: shutdown fails every outstanding savepoint future.
            }
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
    }

    /**
     * Makes the spied store throw from {@code putState}, triggers a savepoint and
     * acknowledges it from every task. Awaiting the savepoint future must then fail,
     * and no savepoint promise may be left behind.
     */
    @Test
    public void testAbortSavepointOnStateStoreFailure() throws Exception {
        JobID jobId = new JobID();
        ExecutionJobVertex jobVertex = mockExecutionJobVertex(jobId, new JobVertexID(), 4);
        HeapStateStore savepointStore = Mockito.spy(new HeapStateStore());
        SavepointCoordinator coordinator = createSavepointCoordinator(jobId, 60000L, jobVertex.getTaskVertices(), jobVertex.getTaskVertices(), new ExecutionVertex[0], new MockCheckpointIdCounter(), (StateStore<CompletedCheckpoint>)savepointStore);

        // Every attempt to write a savepoint into the store fails.
        Mockito.doThrow(new Exception("TestException")).when(savepointStore).putState((Serializable)Matchers.any(CompletedCheckpoint.class));

        Future savepointPathFuture = coordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, 0L, createSerializedStateHandle(vertex), 0L));
        }

        try {
            Await.result((Awaitable)savepointPathFuture, (Duration)FiniteDuration.Zero());
            // The previous message mentioned a rollback, but this test never restores
            // a savepoint; the failure is injected while the savepoint is stored.
            Assert.fail("Did not throw expected Exception after savepoint store failure while storing the savepoint.");
        }
        catch (Exception expected) {
            // Expected: the store failure fails the savepoint future.
        }

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /**
     * Triggers two savepoints and fully acknowledges only the second one (plus a single
     * late acknowledgement for the first). Both pending checkpoints must end up
     * discarded; the first future must fail, while the second must yield the path of a
     * savepoint that can be read back from the store.
     */
    @Test
    public void testAbortSavepointIfSubsumed() throws Exception {
        JobID jid = new JobID();
        long timeout = 60000L;
        long firstTimestamp = 1272635L;
        long secondTimestamp = 1272645L;
        ExecutionVertex[] tasks = new ExecutionVertex[]{mockExecutionVertex(jid), mockExecutionVertex(jid)};
        MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
        HeapStateStore store = new HeapStateStore();
        SavepointCoordinator coordinator = createSavepointCoordinator(jid, timeout, tasks, tasks, tasks, idCounter, (StateStore<CompletedCheckpoint>)store);

        // Trigger both savepoints back to back, remembering the ID each one received.
        Future firstFuture = coordinator.triggerSavepoint(firstTimestamp);
        long firstId = idCounter.getLastReturnedCount();
        Future secondFuture = coordinator.triggerSavepoint(secondTimestamp);
        long secondId = idCounter.getLastReturnedCount();
        Assert.assertFalse(firstFuture.isCompleted());
        Assert.assertFalse(secondFuture.isCompleted());

        for (ExecutionVertex task : tasks) {
            verifyTriggerCheckpoint(task, firstId, firstTimestamp);
            verifyTriggerCheckpoint(task, secondId, secondTimestamp);
        }

        // Both checkpoints are pending with nothing acknowledged yet.
        PendingCheckpoint firstPending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(firstId);
        PendingCheckpoint secondPending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(secondId);
        verifyPendingCheckpoint(firstPending, jid, firstId, firstTimestamp, 0, 2, 0, false, false);
        verifyPendingCheckpoint(secondPending, jid, secondId, secondTimestamp, 0, 2, 0, false, false);

        // Fully acknowledge the second savepoint first ...
        for (ExecutionVertex task : tasks) {
            ExecutionAttemptID attempt = task.getCurrentExecutionAttempt().getAttemptId();
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attempt, secondId, createSerializedStateHandle(task), 0L));
        }
        // ... and only then send one acknowledgement for the first.
        ExecutionAttemptID lateAttempt = tasks[0].getCurrentExecutionAttempt().getAttemptId();
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, lateAttempt, firstId, createSerializedStateHandle(tasks[0]), 0L));

        Assert.assertTrue(firstPending.isDiscarded());
        Assert.assertTrue(secondPending.isDiscarded());
        Assert.assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
        for (ExecutionVertex task : tasks) {
            verifyNotifyCheckpointComplete(task, secondId, secondTimestamp);
        }

        // The first savepoint must have been failed.
        Assert.assertTrue(firstFuture.isCompleted());
        try {
            Await.result((Awaitable)firstFuture, (Duration)FiniteDuration.Zero());
            Assert.fail("Did not throw expected exception");
        }
        catch (Exception expected) {
            // Expected: the first savepoint did not complete successfully.
        }

        // The second savepoint completed and can be read back.
        Assert.assertTrue(secondFuture.isCompleted());
        String secondPath = (String)Await.result((Awaitable)secondFuture, (Duration)FiniteDuration.Zero());
        CompletedCheckpoint secondSavepoint = (CompletedCheckpoint)store.getState(secondPath);
        verifySavepoint(secondSavepoint, jid, secondId, secondTimestamp, tasks);

        Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());
        coordinator.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the simple trigger/acknowledge scenario against a {@code FileSystemStateStore}
     * rooted in a temporary directory and checks that the savepoint can still be read
     * from the store after the coordinator has been shut down. The temporary directory
     * is deleted afterwards regardless of the test outcome.
     */
    @Test
    public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception {
        JobID jid = new JobID();
        long timeout = 60000L;
        long triggerTime = 1272635L;
        ExecutionVertex[] tasks = new ExecutionVertex[]{mockExecutionVertex(jid), mockExecutionVertex(jid)};
        MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
        File storeDir = CommonTestUtils.createTempDirectory();
        try {
            FileSystemStateStore store = new FileSystemStateStore(storeDir.toURI().toString(), "sp-");
            SavepointCoordinator coordinator = createSavepointCoordinator(jid, timeout, tasks, tasks, tasks, idCounter, (StateStore<CompletedCheckpoint>)store);

            // Triggering alone must not complete the future.
            Future pathFuture = coordinator.triggerSavepoint(triggerTime);
            Assert.assertFalse(pathFuture.isCompleted());

            long id = idCounter.getLastReturnedCount();
            Assert.assertEquals(0L, id);
            for (int t = 0; t < tasks.length; t++) {
                verifyTriggerCheckpoint(tasks[t], id, triggerTime);
            }

            // Nothing acknowledged yet: 0 acked, 2 outstanding, 0 collected states.
            PendingCheckpoint pending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(id);
            verifyPendingCheckpoint(pending, jid, id, triggerTime, 0, 2, 0, false, false);

            // Acknowledge from every task.
            for (int t = 0; t < tasks.length; t++) {
                ExecutionVertex task = tasks[t];
                ExecutionAttemptID attempt = task.getCurrentExecutionAttempt().getAttemptId();
                coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attempt, id, createSerializedStateHandle(task), 0L));
            }

            Assert.assertTrue(pending.isDiscarded());
            Assert.assertEquals(0, coordinator.getSuccessfulCheckpoints().size());
            for (int t = 0; t < tasks.length; t++) {
                verifyNotifyCheckpointComplete(tasks[t], id, triggerTime);
            }

            Assert.assertTrue(pathFuture.isCompleted());
            String path = (String)Await.result((Awaitable)pathFuture, (Duration)FiniteDuration.Zero());
            Assert.assertEquals(0, this.getSavepointPromises(coordinator).size());

            // Shut down first, then read the savepoint: it must still be there.
            coordinator.shutdown();
            CompletedCheckpoint stored = (CompletedCheckpoint)store.getState(path);
            verifySavepoint(stored, jid, id, triggerTime, tasks);
        }
        finally {
            FileUtils.deleteDirectory(storeDir);
        }
    }

    /**
     * Builds a {@link SavepointCoordinator} for the tests, using the calling thread's
     * context class loader and a {@link DisabledCheckpointStatsTracker}.
     *
     * <p>{@code checkpointTimeout} is passed for both the second and the third
     * constructor argument (presumably base interval and timeout; confirm against the
     * constructor).
     *
     * @param jobId job the coordinator belongs to
     * @param checkpointTimeout value used for both time-related constructor arguments
     * @param triggerVertices vertices passed as the trigger set
     * @param ackVertices vertices passed as the acknowledge set
     * @param commitVertices vertices passed as the commit set
     * @param checkpointIdCounter counter handed to the coordinator
     * @param savepointStore store handed to the coordinator
     * @return the newly constructed coordinator
     * @throws Exception if the constructor throws
     */
    private static SavepointCoordinator createSavepointCoordinator(JobID jobId, long checkpointTimeout, ExecutionVertex[] triggerVertices, ExecutionVertex[] ackVertices, ExecutionVertex[] commitVertices, CheckpointIDCounter checkpointIdCounter, StateStore<CompletedCheckpoint> savepointStore) throws Exception {
        return new SavepointCoordinator(
                jobId,
                checkpointTimeout,
                checkpointTimeout,
                triggerVertices,
                ackVertices,
                commitVertices,
                Thread.currentThread().getContextClassLoader(),
                checkpointIdCounter,
                savepointStore,
                (CheckpointStatsTracker)new DisabledCheckpointStatsTracker());
    }

    /**
     * Indexes the given job vertices by their {@link JobVertexID}.
     *
     * @param jobVertices vertices to index; if two share an ID, the later one wins
     * @return a new mutable map from job vertex ID to job vertex
     */
    private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(ExecutionJobVertex ... jobVertices) {
        Map<JobVertexID, ExecutionJobVertex> byId = new HashMap<JobVertexID, ExecutionJobVertex>();
        for (int v = 0; v < jobVertices.length; v++) {
            ExecutionJobVertex current = jobVertices[v];
            byId.put(current.getJobVertexId(), current);
        }
        return byId;
    }

    /**
     * Creates a serialized state handle for the given vertex. The handle is a
     * {@link LocalStateHandle} wrapping the attempt ID of the vertex's current
     * execution attempt.
     *
     * @param vertex vertex whose current attempt ID becomes the state payload
     * @return the state handle wrapped in a {@link SerializedValue}
     * @throws IOException declared for the {@link SerializedValue} construction
     */
    private static SerializedValue<StateHandle<?>> createSerializedStateHandle(ExecutionVertex vertex) throws IOException {
        ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
        LocalStateHandle handle = new LocalStateHandle((Serializable)attemptId);
        return new SerializedValue((Object)handle);
    }

    /**
     * Reads the coordinator's private {@code savepointPromises} field via reflection
     * so tests can check that no promise is left behind.
     *
     * @param coordinator coordinator to inspect
     * @return the value of the {@code savepointPromises} field
     * @throws NoSuchFieldException if the field does not exist on {@link SavepointCoordinator}
     * @throws IllegalAccessException if the field cannot be read
     */
    @SuppressWarnings("unchecked")
    private Map<Long, Promise<String>> getSavepointPromises(SavepointCoordinator coordinator) throws NoSuchFieldException, IllegalAccessException {
        Field promisesField = SavepointCoordinator.class.getDeclaredField("savepointPromises");
        // The field is private, so access checks have to be bypassed.
        promisesField.setAccessible(true);
        Object promises = promisesField.get(coordinator);
        return (Map<Long, Promise<String>>)promises;
    }

    /**
     * Verifies that the mocked vertex was sent a {@link TriggerCheckpoint} message
     * equal to one built from the vertex's job ID, its current attempt ID and the
     * given checkpoint ID and timestamp, addressed to that same attempt.
     *
     * @param mockExecutionVertex Mockito mock to verify against
     * @param expectedCheckpointId checkpoint ID the message must carry
     * @param expectedTimestamp timestamp the message must carry
     */
    private static void verifyTriggerCheckpoint(ExecutionVertex mockExecutionVertex, long expectedCheckpointId, long expectedTimestamp) {
        ExecutionAttemptID currentAttempt = mockExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        TriggerCheckpoint expected = new TriggerCheckpoint(mockExecutionVertex.getJobId(), currentAttempt, expectedCheckpointId, expectedTimestamp);
        ExecutionVertex verified = Mockito.verify(mockExecutionVertex);
        verified.sendMessageToCurrentExecution((Serializable)Matchers.eq(expected), Matchers.eq(currentAttempt));
    }

    /**
     * Verifies that the mocked vertex was sent a {@link NotifyCheckpointComplete}
     * message equal to one built from the vertex's job ID, its current attempt ID and
     * the given checkpoint ID and timestamp, addressed to that same attempt.
     *
     * @param mockExecutionVertex Mockito mock to verify against
     * @param expectedCheckpointId checkpoint ID the message must carry
     * @param expectedTimestamp timestamp the message must carry
     */
    private static void verifyNotifyCheckpointComplete(ExecutionVertex mockExecutionVertex, long expectedCheckpointId, long expectedTimestamp) {
        ExecutionAttemptID currentAttempt = mockExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
        NotifyCheckpointComplete expected = new NotifyCheckpointComplete(mockExecutionVertex.getJobId(), currentAttempt, expectedCheckpointId, expectedTimestamp);
        ExecutionVertex verified = Mockito.verify(mockExecutionVertex);
        verified.sendMessageToCurrentExecution((Serializable)Matchers.eq(expected), Matchers.eq(currentAttempt));
    }

    /**
     * Asserts that a pending checkpoint is non-null and matches every expected
     * property. The checks run in declaration order, so the first mismatch is the one
     * that gets reported.
     *
     * @param checkpoint pending checkpoint under test
     * @param expectedJobId expected job ID
     * @param expectedCheckpointId expected checkpoint ID
     * @param expectedTimestamp expected checkpoint timestamp
     * @param expectedNumberOfAcknowledgedTasks expected count of acknowledged tasks
     * @param expectedNumberOfNonAcknowledgedTasks expected count of tasks still outstanding
     * @param expectedNumberOfCollectedStates expected size of the collected states
     * @param expectedIsDiscarded expected discarded flag
     * @param expectedIsFullyAcknowledged expected fully-acknowledged flag
     */
    private static void verifyPendingCheckpoint(PendingCheckpoint checkpoint, JobID expectedJobId, long expectedCheckpointId, long expectedTimestamp, int expectedNumberOfAcknowledgedTasks, int expectedNumberOfNonAcknowledgedTasks, int expectedNumberOfCollectedStates, boolean expectedIsDiscarded, boolean expectedIsFullyAcknowledged) {
        Assert.assertNotNull(checkpoint);

        // Identity of the checkpoint.
        Assert.assertEquals(expectedJobId, checkpoint.getJobId());
        Assert.assertEquals(expectedCheckpointId, checkpoint.getCheckpointId());
        Assert.assertEquals(expectedTimestamp, checkpoint.getCheckpointTimestamp());

        // Acknowledgement progress.
        Assert.assertEquals(expectedNumberOfAcknowledgedTasks, checkpoint.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(expectedNumberOfNonAcknowledgedTasks, checkpoint.getNumberOfNonAcknowledgedTasks());
        Assert.assertEquals(expectedNumberOfCollectedStates, checkpoint.getCollectedStates().size());

        // Lifecycle flags.
        Assert.assertEquals(expectedIsDiscarded, checkpoint.isDiscarded());
        Assert.assertEquals(expectedIsFullyAcknowledged, checkpoint.isFullyAcknowledged());
    }

    /**
     * Verifies a savepoint by applying the same checks used for a completed checkpoint.
     *
     * @param savepoint the savepoint to verify
     * @param expectedJobId expected job ID
     * @param expectedCheckpointId expected checkpoint ID
     * @param expectedTimestamp expected timestamp
     * @param expectedVertices vertices for which state is expected
     * @throws Exception if the delegated verification throws
     */
    private static void verifySavepoint(CompletedCheckpoint savepoint, JobID expectedJobId, long expectedCheckpointId, long expectedTimestamp, ExecutionVertex[] expectedVertices) throws Exception {
        verifyCompletedCheckpoint(savepoint, expectedJobId, expectedCheckpointId, expectedTimestamp, expectedVertices);
    }

    /**
     * Asserts that a completed checkpoint is non-null, carries the expected job ID, checkpoint ID
     * and timestamp, holds as many states as there are expected vertices, and that for every
     * expected vertex a state with the vertex's operator ID exists whose deserialized content
     * equals the vertex's current execution attempt ID.
     *
     * <p>For each vertex only the first state with a matching operator ID is inspected.
     *
     * @param checkpoint the completed checkpoint under inspection
     * @param expectedJobId expected job ID
     * @param expectedCheckpointId expected checkpoint ID
     * @param expectedTimestamp expected timestamp
     * @param expectedVertices vertices for which a matching state must be present
     * @throws Exception if deserializing a state or reading its content fails
     */
    private static void verifyCompletedCheckpoint(CompletedCheckpoint checkpoint, JobID expectedJobId, long expectedCheckpointId, long expectedTimestamp, ExecutionVertex[] expectedVertices) throws Exception {
        Assert.assertNotNull(checkpoint);
        Assert.assertEquals(expectedJobId, checkpoint.getJobId());
        Assert.assertEquals(expectedCheckpointId, checkpoint.getCheckpointID());
        Assert.assertEquals(expectedTimestamp, checkpoint.getTimestamp());

        // Parameterized on purpose: iterating a raw List yields Object elements, which cannot be
        // bound to the StateForTask loop variable below.
        List<StateForTask> states = checkpoint.getStates();
        Assert.assertEquals(expectedVertices.length, states.size());

        // Loop-invariant: the same class loader is used for every deserialization.
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        for (ExecutionVertex vertex : expectedVertices) {
            JobVertexID expectedOperatorId = vertex.getJobvertexId();
            boolean success = false;
            for (StateForTask state : states) {
                if (!state.getOperatorId().equals(expectedOperatorId)) {
                    continue;
                }
                ExecutionAttemptID vertexAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
                StateHandle<?> handle = (StateHandle<?>) state.getState().deserializeValue(classLoader);
                ExecutionAttemptID stateAttemptId = (ExecutionAttemptID) handle.getState(classLoader);
                Assert.assertEquals(vertexAttemptId, stateAttemptId);
                success = true;
                break;
            }
            Assert.assertTrue("No state found for operator ID " + expectedOperatorId, success);
        }
    }

    /**
     * Creates a Mockito mock of {@link ExecutionJobVertex} that reports the given job ID, job
     * vertex ID and parallelism, and whose task vertices are {@code parallelism} mocked
     * execution vertices in state {@link ExecutionState#RUNNING}, with subtask indices
     * {@code 0..parallelism-1}.
     *
     * @param jobId job ID reported by the mock and its task vertices
     * @param jobVertexId job vertex ID reported by the mock and its task vertices
     * @param parallelism parallelism reported by the mock; also the number of task vertices
     * @return the stubbed mock
     */
    private static ExecutionJobVertex mockExecutionJobVertex(JobID jobId, JobVertexID jobVertexId, int parallelism) {
        ExecutionJobVertex mockedJobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(mockedJobVertex.getJobId()).thenReturn(jobId);
        Mockito.when(mockedJobVertex.getJobVertexId()).thenReturn(jobVertexId);
        Mockito.when(mockedJobVertex.getParallelism()).thenReturn(parallelism);

        ExecutionVertex[] taskVertices = new ExecutionVertex[parallelism];
        int subtask = 0;
        while (subtask < taskVertices.length) {
            taskVertices[subtask] = mockExecutionVertex(jobId, jobVertexId, subtask, ExecutionState.RUNNING);
            subtask++;
        }
        Mockito.when(mockedJobVertex.getTaskVertices()).thenReturn(taskVertices);

        return mockedJobVertex;
    }

    /**
     * Creates a mocked execution vertex for the given job whose execution state is
     * {@link ExecutionState#RUNNING}.
     *
     * @param jobId job ID reported by the mock
     * @return the stubbed mock
     */
    private static ExecutionVertex mockExecutionVertex(JobID jobId) {
        return mockExecutionVertex(jobId, ExecutionState.RUNNING);
    }

    /**
     * Creates a mocked execution vertex for the given job in the given execution state, using a
     * newly created job vertex ID and subtask index {@code 0}.
     *
     * @param jobId job ID reported by the mock
     * @param state execution state reported by the mock's current execution attempt
     * @return the stubbed mock
     */
    private static ExecutionVertex mockExecutionVertex(JobID jobId, ExecutionState state) {
        JobVertexID freshJobVertexId = new JobVertexID();
        return mockExecutionVertex(jobId, freshJobVertexId, 0, state);
    }

    /**
     * Creates a Mockito mock of {@link ExecutionVertex} together with a mocked current
     * {@link Execution}. The execution reports a newly created attempt ID and the given state;
     * the vertex reports the given job ID, job vertex ID and subtask index, and returns the
     * mocked execution as its current attempt.
     *
     * @param jobId job ID reported by the vertex mock
     * @param jobVertexId job vertex ID reported by the vertex mock
     * @param subtaskIndex parallel subtask index reported by the vertex mock
     * @param executionState state reported by the mocked execution
     * @return the stubbed vertex mock
     */
    private static ExecutionVertex mockExecutionVertex(JobID jobId, JobVertexID jobVertexId, int subtaskIndex, ExecutionState executionState) {
        // Current execution attempt of the vertex.
        Execution currentAttempt = Mockito.mock(Execution.class);
        ExecutionAttemptID attemptId = new ExecutionAttemptID();
        Mockito.when(currentAttempt.getAttemptId()).thenReturn(attemptId);
        Mockito.when(currentAttempt.getState()).thenReturn(executionState);

        // The vertex itself.
        ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(mockedVertex.getJobId()).thenReturn(jobId);
        Mockito.when(mockedVertex.getJobvertexId()).thenReturn(jobVertexId);
        Mockito.when(mockedVertex.getParallelSubtaskIndex()).thenReturn(subtaskIndex);
        Mockito.when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(currentAttempt);
        return mockedVertex;
    }

    /**
     * In-memory {@link CheckpointIDCounter} for tests. It records whether it has been started
     * and remembers the value most recently handed out by {@link #getAndIncrement()}.
     */
    private static class MockCheckpointIdCounter
    implements CheckpointIDCounter {
        /** {@code true} between a call to {@link #start()} and the next call to {@link #stop()}. */
        private boolean started;
        /** Value that the next call to {@link #getAndIncrement()} returns. */
        private long count;
        /** Value returned by the most recent call to {@link #getAndIncrement()}. */
        private long lastReturnedCount;

        private MockCheckpointIdCounter() {
        }

        /** Marks the counter as started. */
        public void start() throws Exception {
            this.started = true;
        }

        /** Marks the counter as not started. */
        public void stop() throws Exception {
            this.started = false;
        }

        /**
         * Returns the current count, records it as the last returned value, and advances the
         * count by one.
         *
         * @return the count before the increment
         */
        public long getAndIncrement() throws Exception {
            long current = this.count;
            this.lastReturnedCount = current;
            this.count = current + 1;
            return current;
        }

        /**
         * Overwrites the current count.
         *
         * @param newCount value the next {@link #getAndIncrement()} call returns
         */
        public void setCount(long newCount) {
            this.count = newCount;
        }

        /** @return the value returned by the most recent {@link #getAndIncrement()} call */
        long getLastReturnedCount() {
            return this.lastReturnedCount;
        }

        /** @return whether the counter is currently marked as started */
        public boolean isStarted() {
            return this.started;
        }
    }
}

