/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import akka.actor.ActorRef;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.Tasks$BlockingOnceReceiver$;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.leaderelection.LeaderElectionRetrievalTestingCluster;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/**
 * Checks what happens to a running job when the JobManager that executes it
 * loses its leadership: the job is expected to reach a globally terminal state
 * or {@link JobStatus#SUSPENDED} within {@link #TIMEOUT}.
 *
 * <p>Each test runs against a fresh {@link LeaderElectionRetrievalTestingCluster}
 * that is configured and started in {@link #before()}.
 */
public class LeaderChangeJobRecoveryTest
extends TestLogger {
    /** Upper bound used for every ask/await performed by this test. */
    private static final FiniteDuration TIMEOUT = FiniteDuration.apply(30L, TimeUnit.SECONDS);

    /** Value written to "local.number-taskmanager". */
    private static final int NUM_TMS = 1;

    /** Value written to "taskmanager.numberOfTaskSlots". */
    private static final int NUM_SLOTS_PER_TM = 1;

    /** Parallelism of both job vertices; equals the total number of configured slots. */
    private static final int PARALLELISM = NUM_TMS * NUM_SLOTS_PER_TM;

    private Configuration configuration;
    private LeaderElectionRetrievalTestingCluster cluster;
    private JobGraph job;

    /**
     * Builds the blocking job, assembles the cluster configuration (fixed-delay
     * restart strategy with 9999 attempts and a "100 milli" delay) and starts the
     * testing cluster.
     *
     * @throws TimeoutException declared by the cluster calls made here
     * @throws InterruptedException declared by the cluster calls made here
     */
    @Before
    public void before() throws TimeoutException, InterruptedException {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

        // Created here rather than in a field initializer so that the public,
        // overridable createBlockingJob is not invoked while the object is
        // still being constructed.
        this.job = this.createBlockingJob(PARALLELISM);

        this.configuration = new Configuration();
        this.configuration.setInteger("local.number-jobmanager", 1);
        this.configuration.setInteger("local.number-taskmanager", NUM_TMS);
        this.configuration.setInteger("taskmanager.numberOfTaskSlots", NUM_SLOTS_PER_TM);
        this.configuration.setString("restart-strategy", "fixeddelay");
        this.configuration.setInteger("restart-strategy.fixed-delay.attempts", 9999);
        this.configuration.setString("restart-strategy.fixed-delay.delay", "100 milli");

        this.cluster = new LeaderElectionRetrievalTestingCluster(this.configuration, true, false);
        this.cluster.start(false);
        this.cluster.waitForActorsToBeAlive();
    }

    /**
     * Grants leadership to JobManager 0, submits the blocking job, waits until all
     * vertices are running or finished, then revokes leadership and asserts that
     * the registered status listener observes a terminal or suspended job status.
     *
     * @throws Exception if any await times out or the cluster reports an error
     */
    @Test
    public void testNotRestartedWhenLosingLeadership() throws Exception {
        try {
            UUID leaderSessionID = UUID.randomUUID();
            this.cluster.grantLeadership(0, leaderSessionID);
            this.cluster.notifyRetrievalListeners(0, leaderSessionID);
            this.cluster.waitForTaskManagersToBeRegistered(TIMEOUT);
            this.cluster.submitJobDetached(this.job);

            ActorGateway jm = this.cluster.getLeaderGateway(TIMEOUT);

            Future<Object> allVerticesRunning = jm.ask(
                new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(this.job.getJobID()), TIMEOUT);
            Await.ready(allVerticesRunning, TIMEOUT);

            Future<Object> futureExecutionGraph = jm.ask(
                new TestingJobManagerMessages.RequestExecutionGraph(this.job.getJobID()), TIMEOUT);
            // Kept as Object so that an unexpected reply fails the assertion below
            // instead of surfacing as a ClassCastException.
            Object responseExecutionGraph = Await.result(futureExecutionGraph, TIMEOUT);
            Assert.assertTrue(
                "The JobManager should have returned the execution graph of the submitted job.",
                responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound);
            ExecutionGraph executionGraph =
                ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();

            // The listener must be in place before leadership is revoked,
            // otherwise the resulting status change could be missed.
            TestActorGateway testActorGateway = new TestActorGateway();
            executionGraph.registerJobStatusListener(testActorGateway);

            this.cluster.revokeLeadership();

            Boolean reachedTerminalState = Await.result(testActorGateway.hasReachedTerminalState(), TIMEOUT);
            Assert.assertTrue("The job should have reached a terminal state.", reachedTerminalState);
        } finally {
            // Always release the cluster started in before(), even when an
            // assertion or await above fails.
            this.cluster.stop();
        }
    }

    /**
     * Creates a two-vertex job: a "sender" ({@link Tasks.Sender}) connected pointwise
     * to a "receiver" ({@link Tasks.BlockingOnceReceiver}). Both vertices share one
     * slot sharing group and run with the given parallelism.
     *
     * <p>As a side effect the {@code blocking} flag of the
     * {@code BlockingOnceReceiver} companion object is set to {@code true}.
     *
     * @param parallelism parallelism applied to the sender and the receiver vertex
     * @return the assembled job graph named "Blocking test job"
     */
    public JobGraph createBlockingJob(int parallelism) {
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);

        JobVertex sender = new JobVertex("sender");
        sender.setInvokableClass(Tasks.Sender.class);
        sender.setParallelism(parallelism);

        JobVertex receiver = new JobVertex("receiver");
        receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        receiver.setParallelism(parallelism);
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);

        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        sender.setSlotSharingGroup(slotSharingGroup);
        receiver.setSlotSharingGroup(slotSharingGroup);

        JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{sender, receiver});
        jobGraph.setExecutionConfig(new ExecutionConfig());
        return jobGraph;
    }

    /**
     * Minimal {@link ActorGateway} used as a job status listener. It completes a
     * promise as soon as it is told about a {@link ExecutionGraphMessages.JobStatusChanged}
     * whose new status is globally terminal or {@link JobStatus#SUSPENDED}. All
     * other gateway operations are inert stubs.
     */
    public static class TestActorGateway
    implements ActorGateway {
        private static final long serialVersionUID = -736146686160538227L;

        // The constructor is fully qualified because scala.concurrent.Promise and
        // scala.concurrent.impl.Promise share the simple name "Promise".
        // NOTE(review): the field is transient, so an instance that went through
        // Java serialization would come back with a null promise - presumably
        // this never happens in the test; confirm before reusing the class.
        private transient Promise<Boolean> terminalState =
            new scala.concurrent.impl.Promise.DefaultPromise<Boolean>();

        /**
         * @return a future that is completed with {@code true} once a terminal or
         *         suspended job status has been received via {@code tell}
         */
        public Future<Boolean> hasReachedTerminalState() {
            return this.terminalState.future();
        }

        /** Stub: always returns {@code null}. */
        public Future<Object> ask(Object message, FiniteDuration timeout) {
            return null;
        }

        /**
         * Delegates to {@link #tell(Object, ActorGateway)} with a placeholder
         * sender built from {@link ActorRef#noSender()}.
         */
        public void tell(Object message) {
            this.tell(message, (ActorGateway)new AkkaActorGateway(ActorRef.noSender(), null));
        }

        /**
         * Completes the terminal-state promise when {@code message} is a
         * {@link ExecutionGraphMessages.JobStatusChanged} carrying a globally
         * terminal status or {@link JobStatus#SUSPENDED}; every other message is
         * ignored. The {@code sender} is not used.
         */
        public void tell(Object message, ActorGateway sender) {
            if (!(message instanceof ExecutionGraphMessages.JobStatusChanged)) {
                return;
            }
            JobStatus newStatus = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();
            if (newStatus.isGloballyTerminalState() || newStatus == JobStatus.SUSPENDED) {
                // trySuccess instead of success: a second qualifying status update
                // must not throw because the promise is already completed.
                this.terminalState.trySuccess(Boolean.TRUE);
            }
        }

        /** Stub: does nothing. */
        public void forward(Object message, ActorGateway sender) {
        }

        /** Stub: always returns {@code null}. */
        public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) {
            return null;
        }

        /** Stub: always returns {@code null}. */
        public String path() {
            return null;
        }

        /** Stub: always returns {@code null}. */
        public ActorRef actor() {
            return null;
        }

        /** Stub: always returns {@code null}. */
        public UUID leaderSessionID() {
            return null;
        }
    }
}

