/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;

/**
 * Tests the behavior of {@link ExecutionVertex#cancel()} when it is invoked in the different
 * execution states (CREATED, SCHEDULED, DEPLOYING, RUNNING, CANCELING, CANCELED), including the
 * cases where the cancel RPC races with the deploy RPC or fails altogether.
 *
 * <p>Unexpected exceptions are propagated out of the test methods so that the test runner reports
 * them with their original type and stack trace.
 */
public class ExecutionVertexCancelTest {

    /**
     * Asserts that the vertex recorded a (positive) timestamp for each of the given states.
     *
     * @param vertex the vertex whose state timestamps are checked
     * @param states the states that must have a timestamp greater than zero
     */
    private static void assertStateTimestampsSet(ExecutionVertex vertex, ExecutionState... states) {
        for (ExecutionState state : states) {
            Assert.assertTrue("No timestamp recorded for state " + state, vertex.getStateTimestamp(state) > 0L);
        }
    }

    /**
     * Verifies that cancelling a vertex in state CREATED moves it straight to CANCELED without
     * recording a failure cause.
     */
    @Test
    public void testCancelFromCreated() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());

        vertex.cancel();

        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Verifies that cancelling a vertex in state SCHEDULED moves it straight to CANCELED without
     * recording a failure cause.
     */
    @Test
    public void testCancelFromScheduled() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
        Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

        vertex.cancel();

        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Verifies cancelling while the vertex is DEPLOYING, with the queued actions executed in the
     * order in which they were enqueued.
     */
    @Test
    public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() throws Exception {
        JobVertexID jid = new JobVertexID();
        TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
        TestingUtils.ActionQueue actions = executionContext.actionQueue();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, executionContext);
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
        Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

        // the gateway answers the first cancel call with "success" and the second with "not found"
        CancelSequenceActorGateway actorGateway = new CancelSequenceActorGateway(
                executionContext,
                new TaskMessages.TaskOperationResult(execId, true),
                new TaskMessages.TaskOperationResult(execId, false));
        Instance instance = ExecutionGraphTestUtils.getInstance(actorGateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        vertex.cancel();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        // first queued action runs (presumably the deploy call, matching the pop order used in
        // the overtaking test); the vertex must remain in CANCELING
        actions.triggerNextAction();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
        // state is re-checked before the next queued action is triggered
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        // second queued action runs, then the completion of the cancellation is reported
        actions.triggerNextAction();
        vertex.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

        // a further queued action must not change the terminal state
        actions.triggerNextAction();
        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

        Assert.assertTrue(slot.isReleased());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Verifies cancelling while the vertex is DEPLOYING, with the cancel action executed before
     * the deploy action even though it was enqueued after it.
     */
    @Test
    public void testCancelConcurrentlyToDeploying_CallsOvertaking() throws Exception {
        JobVertexID jid = new JobVertexID();
        TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
        TestingUtils.ActionQueue actions = executionContext.actionQueue();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, executionContext);
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
        Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

        // the gateway answers the first cancel call with "not found" and the second with "success"
        CancelSequenceActorGateway actorGateway = new CancelSequenceActorGateway(
                executionContext,
                new TaskMessages.TaskOperationResult(execId, false),
                new TaskMessages.TaskOperationResult(execId, true));
        Instance instance = ExecutionGraphTestUtils.getInstance(actorGateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        vertex.cancel();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        // take both actions off the queue so that they can be run out of order
        Runnable deployAction = actions.popNextAction();
        Runnable cancelAction = actions.popNextAction();

        // the cancel call overtakes the deploy call
        cancelAction.run();
        // run whatever the cancel action enqueued in response
        actions.triggerNextAction();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        // the late deploy call must not move the vertex out of CANCELING
        deployAction.run();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        vertex.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

        Assert.assertTrue(slot.isReleased());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Verifies that a RUNNING vertex ends up in CANCELED, with its slot released, once the cancel
     * call succeeded and the completion of the cancellation was reported.
     */
    @Test
    public void testCancelFromRunning() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, (ExecutionContext) TestingUtils.directExecutionContext());
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();

        CancelSequenceActorGateway actorGateway = new CancelSequenceActorGateway(
                (ExecutionContext) TestingUtils.directExecutionContext(),
                new TaskMessages.TaskOperationResult(execId, true));
        Instance instance = ExecutionGraphTestUtils.getInstance(actorGateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

        vertex.cancel();
        vertex.getCurrentExecutionAttempt().cancelingComplete();

        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
        Assert.assertTrue(slot.isReleased());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Verifies that calling cancel a second time on a vertex that is already CANCELING leaves it
     * in CANCELING until the completion of the cancellation is reported.
     */
    @Test
    public void testRepeatedCancelFromRunning() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, (ExecutionContext) TestingUtils.directExecutionContext());
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();

        CancelSequenceActorGateway actorGateway = new CancelSequenceActorGateway(
                (ExecutionContext) TestingUtils.directExecutionContext(),
                new TaskMessages.TaskOperationResult(execId, true));
        Instance instance = ExecutionGraphTestUtils.getInstance(actorGateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

        vertex.cancel();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        vertex.cancel();
        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());

        vertex.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

        Assert.assertTrue(slot.isReleased());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING, ExecutionState.CANCELED);
    }

    /**
     * Verifies that the vertex stays in CANCELING, without a failure cause, when the cancel call
     * is answered with an unsuccessful {@link TaskMessages.TaskOperationResult}.
     */
    @Test
    public void testCancelFromRunningDidNotFindTask() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, (ExecutionContext) TestingUtils.directExecutionContext());
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId();

        CancelSequenceActorGateway actorGateway = new CancelSequenceActorGateway(
                (ExecutionContext) TestingUtils.directExecutionContext(),
                new TaskMessages.TaskOperationResult(execId, false));
        Instance instance = ExecutionGraphTestUtils.getInstance(actorGateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

        vertex.cancel();

        Assert.assertEquals(ExecutionState.CANCELING, vertex.getExecutionState());
        Assert.assertNull(vertex.getFailureCause());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING);
    }

    /**
     * Verifies that the vertex goes to CANCELED and releases its slot when the cancel call itself
     * fails. The gateway is created without any prepared results, so its first cancel request
     * throws an {@link IOException}.
     */
    @Test
    public void testCancelCallFails() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid, (ExecutionContext) TestingUtils.directExecutionContext());
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());

        CancelSequenceActorGateway gateway = new CancelSequenceActorGateway(
                (ExecutionContext) TestingUtils.directExecutionContext(),
                new TaskMessages.TaskOperationResult[0]);
        Instance instance = ExecutionGraphTestUtils.getInstance(gateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

        vertex.cancel();

        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
        Assert.assertTrue(slot.isReleased());
        assertStateTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.CANCELING);
    }

    /**
     * Verifies that a failure reported after cancel was called leaves the vertex in CANCELED or
     * FAILED, releases the slot, and leaves no registered executions in the execution graph.
     */
    @Test
    public void testSendCancelAndReceiveFail() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId();

        CancelSequenceActorGateway gateway = new CancelSequenceActorGateway(
                (ExecutionContext) TestingUtils.defaultExecutionContext(),
                new TaskMessages.TaskOperationResult(execID, true));
        Instance instance = ExecutionGraphTestUtils.getInstance(gateway);
        SimpleSlot slot = instance.allocateSimpleSlot(new JobID());

        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(vertex, slot);
        Assert.assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());

        vertex.cancel();
        // both states are accepted at this point
        Assert.assertTrue(vertex.getExecutionState() == ExecutionState.CANCELING
                || vertex.getExecutionState() == ExecutionState.FAILED);

        vertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
        Assert.assertTrue(vertex.getExecutionState() == ExecutionState.CANCELED
                || vertex.getExecutionState() == ExecutionState.FAILED);

        Assert.assertTrue(slot.isReleased());
        Assert.assertEquals(0, vertex.getExecutionGraph().getRegisteredExecutions().size());
    }

    /**
     * Verifies that scheduling a CANCELED vertex leaves it in CANCELED, and that deploying it to
     * a slot is rejected with an {@link IllegalStateException}.
     */
    @Test
    public void testScheduleOrDeployAfterCancel() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);
        ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELED);
        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

        // scheduling must not throw and must not change the state
        Scheduler scheduler = Mockito.mock(Scheduler.class);
        vertex.scheduleForExecution(scheduler, false);
        Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());

        // deploying must be rejected
        try {
            Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
            SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
            vertex.deployToSlot(slot);
            Assert.fail("Method should throw an exception");
        }
        catch (IllegalStateException expected) {
            Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
        }
    }

    /**
     * Verifies how a vertex in state CANCELING reacts to scheduling (tolerated), deployment
     * (rejected with an {@link IllegalStateException}) and a failure (vertex becomes CANCELED and
     * releases its slot).
     */
    @Test
    public void testActionsWhileCancelling() throws Exception {
        JobVertexID jid = new JobVertexID();
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionVertex(jid);

        // scheduling while CANCELING must not throw; any exception propagates and fails the test
        {
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELING);
            Scheduler scheduler = Mockito.mock(Scheduler.class);
            vertex.scheduleForExecution(scheduler, false);
        }

        // deploying while CANCELING must be rejected
        try {
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELING);
            Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
            SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
            vertex.deployToSlot(slot);
            Assert.fail("Method should throw an exception");
        }
        catch (IllegalStateException expected) {
            // expected: the deployment is refused in this state
        }

        // a failure while CANCELING completes the cancellation
        {
            ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
            SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexResource(vertex, slot);
            ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.CANCELING);

            Exception failureCause = new Exception("test exception");
            vertex.fail(failureCause);

            Assert.assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
            Assert.assertTrue(slot.isReleased());
        }
    }

    /**
     * Testing gateway that acknowledges every {@link TaskMessages.SubmitTask} and answers the
     * n-th {@link TaskMessages.CancelTask} with the n-th prepared result. Once the prepared
     * results are used up, further cancel requests fail with an {@link IOException}.
     */
    public static class CancelSequenceActorGateway
    extends BaseTestingActorGateway {
        /** Prepared answers for successive cancel requests, in order. */
        private final TaskMessages.TaskOperationResult[] results;
        /** Index of the most recently consumed result; -1 while no cancel request was handled. */
        private int index = -1;

        /**
         * @param executionContext execution context handed to the base gateway
         * @param result answers to return for successive cancel requests; may be empty
         */
        public CancelSequenceActorGateway(ExecutionContext executionContext, TaskMessages.TaskOperationResult ... result) {
            super(executionContext);
            this.results = result;
        }

        /**
         * Produces the reply for the given message.
         *
         * @param message the incoming message
         * @return an acknowledge for task submissions, the next prepared result for cancel
         *         requests, and {@code null} for any other message
         * @throws IOException if a cancel request arrives after all prepared results were used
         */
        @Override
        public Object handleMessage(Object message) throws Exception {
            if (message instanceof TaskMessages.SubmitTask) {
                return Messages.getAcknowledge();
            }
            if (message instanceof TaskMessages.CancelTask) {
                ++this.index;
                if (this.index >= this.results.length) {
                    throw new IOException("RPC call failed.");
                }
                // the reply is a TaskOperationResult, so the return value is handled as a plain
                // Object rather than as the acknowledge type
                return this.results[this.index];
            }
            return null;
        }
    }
}

