/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.UnrecoverableException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class ExecutionGraphRestartTest
extends TestLogger {
    private static final int NUM_TASKS = 31;

    @Test
    public void testNoManualRestart() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex sender = new JobVertex("Task");
        sender.setInvokableClass(Tasks.NoOpInvokable.class);
        sender.setParallelism(31);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender});
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new NoRestartStrategy());
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).fail((Throwable)new Exception("Test Exception"));
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
        eg.restart();
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
    }

    @Test
    public void testConstraintsAfterRestart() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex groupVertex = new JobVertex("Task1");
        groupVertex.setInvokableClass(Tasks.NoOpInvokable.class);
        groupVertex.setParallelism(31);
        JobVertex groupVertex2 = new JobVertex("Task2");
        groupVertex2.setInvokableClass(Tasks.NoOpInvokable.class);
        groupVertex2.setParallelism(31);
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        groupVertex.setSlotSharingGroup(sharingGroup);
        groupVertex2.setSlotSharingGroup(sharingGroup);
        groupVertex.setStrictlyCoLocatedWith(groupVertex2);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{groupVertex, groupVertex2});
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(1, 0L));
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        this.validateConstraints(eg);
        ExecutionGraphRestartTest.restartAfterFailure(eg, new FiniteDuration(2L, TimeUnit.MINUTES), false);
        this.validateConstraints(eg);
        ExecutionGraphRestartTest.haltExecution(eg);
    }

    private void validateConstraints(ExecutionGraph eg) {
        ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
        for (int i = 0; i < 31; ++i) {
            CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint();
            CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint();
            Assert.assertNotNull((Object)constr1.getSharedSlot());
            Assert.assertTrue((boolean)constr1.isAssigned());
            Assert.assertEquals((Object)constr1, (Object)constr2);
        }
    }

    @Test
    public void testRestartAutomatically() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex sender = new JobVertex("Task");
        sender.setInvokableClass(Tasks.NoOpInvokable.class);
        sender.setParallelism(31);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender});
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(1, 1000L));
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        ExecutionGraphRestartTest.restartAfterFailure(eg, new FiniteDuration(2L, TimeUnit.MINUTES), true);
    }

    @Test
    public void testCancelWhileRestarting() throws Exception {
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 31);
        scheduler.newInstanceAvailable(instance);
        ExecutionGraph executionGraph = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "TestJob", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
        JobVertex jobVertex = new JobVertex("NoOpInvokable");
        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
        jobVertex.setParallelism(31);
        JobGraph jobGraph = new JobGraph("TestJob", new JobVertex[]{jobVertex});
        executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)executionGraph.getState());
        executionGraph.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)executionGraph.getState());
        instance.markDead();
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft() && executionGraph.getState() != JobStatus.RESTARTING) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)JobStatus.RESTARTING, (Object)executionGraph.getState());
        executionGraph.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)executionGraph.getState());
        executionGraph.restart();
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)executionGraph.getState());
    }

    @Test
    public void testCancelWhileFailing() throws Exception {
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 31);
        scheduler.newInstanceAvailable(instance);
        ExecutionGraph executionGraph = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "TestJob", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
        executionGraph = (ExecutionGraph)Mockito.spy((Object)executionGraph);
        ((ExecutionGraph)Mockito.doNothing().when((Object)executionGraph)).jobVertexInFinalState();
        JobVertex jobVertex = new JobVertex("NoOpInvokable");
        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
        jobVertex.setParallelism(31);
        JobGraph jobGraph = new JobGraph("TestJob", new JobVertex[]{jobVertex});
        executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)executionGraph.getState());
        executionGraph.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)executionGraph.getState());
        instance.markDead();
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        boolean success = false;
        block0: while (deadline.hasTimeLeft() && !success) {
            success = true;
            for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
                ExecutionState state = vertex.getExecutionState();
                if (state == ExecutionState.FAILED || state == ExecutionState.CANCELED) continue;
                success = false;
                Thread.sleep(100L);
                continue block0;
            }
        }
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)executionGraph.getState());
        executionGraph.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)executionGraph.getState());
        ((ExecutionGraph)Mockito.doCallRealMethod().when((Object)executionGraph)).jobVertexInFinalState();
        executionGraph.jobVertexInFinalState();
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)executionGraph.getState());
    }

    @Test
    public void testNoRestartOnUnrecoverableException() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 31);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex sender = new JobVertex("Task");
        sender.setInvokableClass(Tasks.NoOpInvokable.class);
        sender.setParallelism(31);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender});
        ExecutionGraph eg = (ExecutionGraph)Mockito.spy((Object)new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "Test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(1, 1000L)));
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).fail((Throwable)new UnrecoverableException((Throwable)new Exception("Test Exception")));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)eg.getState());
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)JobStatus.FAILED, (Object)eg.getState());
        ((ExecutionGraph)Mockito.verify((Object)eg, (VerificationMode)Mockito.never())).restart();
        RestartStrategy restartStrategy = eg.getRestartStrategy();
        Assert.assertTrue((boolean)(restartStrategy instanceof FixedDelayRestartStrategy));
        Assert.assertEquals((long)0L, (long)((FixedDelayRestartStrategy)restartStrategy).getCurrentRestartAttempt());
    }

    @Test
    public void testFailingExecutionAfterRestart() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 2);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex sender = new JobVertex("Task1");
        sender.setInvokableClass(Tasks.NoOpInvokable.class);
        sender.setParallelism(1);
        JobVertex receiver = new JobVertex("Task2");
        receiver.setInvokableClass(Tasks.NoOpInvokable.class);
        receiver.setParallelism(1);
        JobGraph jobGraph = new JobGraph("Pointwise job", new JobVertex[]{sender, receiver});
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(1, 1000L));
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        Iterator executionVertices = eg.getAllExecutionVertices().iterator();
        Execution finishedExecution = ((ExecutionVertex)executionVertices.next()).getCurrentExecutionAttempt();
        Execution failedExecution = ((ExecutionVertex)executionVertices.next()).getCurrentExecutionAttempt();
        finishedExecution.markFinished();
        failedExecution.fail((Throwable)new Exception("Test Exception"));
        failedExecution.cancelingComplete();
        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);
        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        deadline = timeout.fromNow();
        boolean success = false;
        block1: while (deadline.hasTimeLeft() && !success) {
            success = true;
            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                if (vertex.getCurrentExecutionAttempt().getAssignedResource() != null) continue;
                success = false;
                Thread.sleep(100L);
                continue block1;
            }
        }
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            Assert.assertNotNull((String)"No assigned resource (test instability).", (Object)vertex.getCurrentAssignedResource());
            vertex.getCurrentExecutionAttempt().switchToRunning();
        }
        finishedExecution.fail((Throwable)new Exception("This should have no effect"));
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }
        Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)finishedExecution.getState());
        Assert.assertEquals((Object)JobStatus.FINISHED, (Object)eg.getState());
    }

    @Test
    public void testFailExecutionAfterCancel() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 2);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex vertex = new JobVertex("Test Vertex");
        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
        vertex.setParallelism(1);
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{vertex});
        jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)Integer.MAX_VALUE));
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(1, 1000000L));
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        eg.cancel();
        for (ExecutionVertex v : eg.getAllExecutionVertices()) {
            v.getCurrentExecutionAttempt().fail((Throwable)new Exception("Test Exception"));
        }
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)eg.getState());
        Execution execution = ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt();
        execution.cancelingComplete();
        Assert.assertEquals((Object)JobStatus.CANCELED, (Object)eg.getState());
    }

    @Test
    public void testFailExecutionGraphAfterCancel() throws Exception {
        Instance instance = ExecutionGraphTestUtils.getInstance(new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.directExecutionContext()), 2);
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);
        JobVertex vertex = new JobVertex("Test Vertex");
        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
        vertex.setParallelism(1);
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{vertex});
        jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)Integer.MAX_VALUE));
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), new JobID(), "test job", new Configuration(), AkkaUtils.getDefaultTimeout(), (RestartStrategy)new FixedDelayRestartStrategy(1, 1000000L));
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        eg.scheduleForExecution(scheduler);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        eg.cancel();
        Assert.assertEquals((Object)JobStatus.CANCELLING, (Object)eg.getState());
        eg.fail((Throwable)new Exception("Test Exception"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)eg.getState());
        Execution execution = ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt();
        execution.cancelingComplete();
        Assert.assertEquals((Object)JobStatus.RESTARTING, (Object)eg.getState());
    }

    private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
        ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).fail((Throwable)new Exception("Test Exception"));
        Assert.assertEquals((Object)JobStatus.FAILING, (Object)eg.getState());
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
            Thread.sleep(100L);
        }
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        deadline = timeout.fromNow();
        boolean success = false;
        block2: while (deadline.hasTimeLeft() && !success) {
            success = true;
            for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
                if (vertex.getCurrentExecutionAttempt().getAssignedResource() != null) continue;
                success = false;
                Thread.sleep(100L);
                continue block2;
            }
        }
        if (haltAfterRestart) {
            if (deadline.hasTimeLeft()) {
                ExecutionGraphRestartTest.haltExecution(eg);
            } else {
                Assert.fail((String)"Failed to wait until all execution attempts left the state DEPLOYING.");
            }
        }
    }

    private static void haltExecution(ExecutionGraph eg) {
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }
        Assert.assertEquals((Object)JobStatus.FINISHED, (Object)eg.getState());
    }
}

