/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class TaskManagerRegistrationTest
extends TestLogger {
    private static final Option<String> NONE_STRING = Option.empty();
    private static ActorSystem actorSystem;
    private static Configuration config;
    private static FiniteDuration timeout;

    @BeforeClass
    public static void startActorSystem() {
        config = new Configuration();
        config.setString("akka.ask.timeout", "5 s");
        config.setString("akka.watch.heartbeat.interval", "200 ms");
        config.setString("akka.watch.heartbeat.pause", "2 s");
        config.setDouble("akka.watch.threshold", 2.0);
        actorSystem = AkkaUtils.createLocalActorSystem((Configuration)config);
    }

    @AfterClass
    public static void shutdownActorSystem() {
        if (actorSystem != null) {
            actorSystem.shutdown();
        }
    }

    @Test
    public void testSimpleRegistration() {
        new JavaTestKit(actorSystem){
            {
                block5: {
                    super(x0);
                    ActorGateway jobManager = null;
                    ActorGateway taskManager1 = null;
                    ActorGateway taskManager2 = null;
                    try {
                        jobManager = TestingUtils.createJobManager(actorSystem, config);
                        taskManager1 = TestingUtils.createTaskManager(actorSystem, jobManager, config, true, false);
                        taskManager2 = TestingUtils.createTaskManager(actorSystem, jobManager, config, true, false);
                        Future responseFuture1 = taskManager1.ask((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
                        Future responseFuture2 = taskManager2.ask((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), timeout);
                        Object response1 = Await.result((Awaitable)responseFuture1, (Duration)timeout);
                        Object response2 = Await.result((Awaitable)responseFuture2, (Duration)timeout);
                        Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
                        Assert.assertTrue((response1 != null && confirmClass.isAssignableFrom(response1.getClass()) ? 1 : 0) != 0);
                        Assert.assertTrue((response2 != null && confirmClass.isAssignableFrom(response2.getClass()) ? 1 : 0) != 0);
                        Future numTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout);
                        Integer count = (Integer)Await.result((Awaitable)numTaskManagersFuture, (Duration)timeout);
                        Assert.assertEquals((long)2L, (long)count.intValue());
                        TestingUtils.stopActor(taskManager1);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                        break block5;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager1);
                        TestingUtils.stopActor(taskManager2);
                        TestingUtils.stopActor(jobManager);
                    }
                    TestingUtils.stopActor(taskManager2);
                    TestingUtils.stopActor(jobManager);
                }
            }
        };
    }

    @Test
    public void testDelayedRegistration() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                FiniteDuration delayedTimeout = timeout.$times(3L);
                try {
                    taskManager = TestingUtils.createTaskManager(actorSystem, JobManager.getLocalJobManagerAkkaURL((Option)Option.empty()), new Configuration(), true, false);
                    Thread.sleep(6000L);
                    jobManager = TestingUtils.createJobManager(actorSystem, new Configuration());
                    Future responseFuture = taskManager.ask((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), delayedTimeout);
                    Object response = Await.result((Awaitable)responseFuture, (Duration)delayedTimeout);
                    Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
                    Assert.assertTrue((response != null && confirmClass.isAssignableFrom(response.getClass()) ? 1 : 0) != 0);
                }
                catch (Exception e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jobManager);
                        throw throwable;
                    }
                    TestingUtils.stopActor(taskManager);
                    TestingUtils.stopActor(jobManager);
                }
                TestingUtils.stopActor(taskManager);
                TestingUtils.stopActor(jobManager);
            }
        };
    }

    @Test
    public void testShutdownAfterRegistrationDurationExpired() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway taskManager = null;
                try {
                    Configuration tmConfig = new Configuration();
                    tmConfig.setString("taskmanager.maxRegistrationDuration", "500 ms");
                    taskManager = TestingUtils.createTaskManager(actorSystem, JobManager.getLocalJobManagerAkkaURL((Option)Option.empty()), tmConfig, true, false);
                    this.watch(taskManager.actor());
                    final ActorGateway tm = taskManager;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectTerminated(tm.actor());
                        }
                    };
                }
                catch (Throwable e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                    }
                }
                TestingUtils.stopActor(taskManager);
            }
        };
    }

    @Test
    public void testTaskManagerResumesConnectAfterRefusedRegistration() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway jm = null;
                ActorGateway taskManager = null;
                try {
                    final ActorGateway jmGateway = jm = TestingUtils.createForwardingJobManager(actorSystem, this.getTestActor(), (Option<String>)Option.empty());
                    final ActorGateway taskManagerGateway = taskManager = TestingUtils.createTaskManager(actorSystem, jmGateway, config, true, false);
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            taskManagerGateway.tell((Object)new RegistrationMessages.RefuseRegistration("test reason"), jmGateway);
                        }
                    };
                    FiniteDuration maxDelay = (FiniteDuration)TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0);
                    new JavaTestKit.Within(maxDelay){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                        }
                    };
                }
                catch (Throwable e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                    finally {
                        TestingUtils.stopActor(taskManager);
                        TestingUtils.stopActor(jm);
                    }
                }
                TestingUtils.stopActor(taskManager);
                TestingUtils.stopActor(jm);
            }
        };
    }

    @Test
    public void testTaskManagerResumesConnectAfterJobManagerFailure() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway fakeJobManager1Gateway = null;
                ActorGateway fakeJobManager2Gateway = null;
                ActorGateway taskManagerGateway = null;
                String JOB_MANAGER_NAME = "ForwardingJobManager";
                try {
                    final ActorGateway fakeJM1Gateway = fakeJobManager1Gateway = TestingUtils.createForwardingJobManager(actorSystem, this.getTestActor(), (Option<String>)Option.apply((Object)"ForwardingJobManager"));
                    final ActorGateway tm = taskManagerGateway = TestingUtils.createTaskManager(actorSystem, fakeJobManager1Gateway, config, true, false);
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            tm.tell((Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 45234), fakeJM1Gateway);
                        }
                    };
                    this.watch(fakeJobManager1Gateway.actor());
                    TestingUtils.stopActor(fakeJobManager1Gateway.actor());
                    final ActorGateway gateway = fakeJobManager1Gateway;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            Object message = null;
                            while (message == null || !(message instanceof Terminated)) {
                                message = this.receiveOne((Duration)timeout);
                            }
                            Terminated terminatedMessage = (Terminated)message;
                            Assert.assertEquals((Object)gateway.actor(), (Object)terminatedMessage.actor());
                        }
                    };
                    fakeJobManager1Gateway = null;
                    long deadline = 20000000000L + System.nanoTime();
                    do {
                        try {
                            fakeJobManager2Gateway = TestingUtils.createForwardingJobManager(actorSystem, this.getTestActor(), (Option<String>)Option.apply((Object)"ForwardingJobManager"));
                        }
                        catch (InvalidActorNameException e) {
                            Thread.sleep(100L);
                        }
                    } while (fakeJobManager2Gateway == null && System.nanoTime() < deadline);
                    final ActorGateway fakeJM2GatewayClosure = fakeJobManager2Gateway;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            this.expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            tm.tell((Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 45234), fakeJM2GatewayClosure);
                        }
                    };
                }
                catch (Throwable e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        TestingUtils.stopActor(taskManagerGateway);
                        TestingUtils.stopActor(fakeJobManager1Gateway);
                        TestingUtils.stopActor(fakeJobManager2Gateway);
                        throw throwable;
                    }
                    TestingUtils.stopActor(taskManagerGateway);
                    TestingUtils.stopActor(fakeJobManager1Gateway);
                    TestingUtils.stopActor(fakeJobManager2Gateway);
                }
                TestingUtils.stopActor(taskManagerGateway);
                TestingUtils.stopActor(fakeJobManager1Gateway);
                TestingUtils.stopActor(fakeJobManager2Gateway);
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStartupWhenNetworkStackFailsToInitialize() {
        ServerSocket blocker = null;
        try {
            blocker = new ServerSocket(0, 50, InetAddress.getByName("localhost"));
            final Configuration cfg = new Configuration();
            cfg.setString("taskmanager.hostname", "localhost");
            cfg.setInteger("taskmanager.data.port", blocker.getLocalPort());
            cfg.setInteger("taskmanager.memory.size", 1);
            new JavaTestKit(actorSystem){
                {
                    block5: {
                        super(x0);
                        ActorRef taskManager = null;
                        ActorRef jobManager = null;
                        try {
                            jobManager = TaskManagerRegistrationTest.startJobManager(config);
                            taskManager = TaskManager.startTaskManagerComponentsAndActor((Configuration)cfg, (ActorSystem)actorSystem, (String)"localhost", (Option)NONE_STRING, (Option)new Some((Object)new StandaloneLeaderRetrievalService(jobManager.path().toString())), (boolean)false, TaskManager.class);
                            this.watch(taskManager);
                            this.expectTerminated((Duration)timeout, taskManager);
                            TestingUtils.stopActor(taskManager);
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail((String)e.getMessage());
                            break block5;
                        }
                        finally {
                            TestingUtils.stopActor(taskManager);
                            TestingUtils.stopActor(jobManager);
                        }
                        TestingUtils.stopActor(jobManager);
                    }
                }
            };
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (blocker != null) {
                try {
                    blocker.close();
                }
                catch (IOException iOException) {}
            }
        }
    }

    @Test
    public void testCheckForValidRegistrationSessionIDs() {
        new JavaTestKit(actorSystem){
            {
                ActorGateway taskManagerGateway = null;
                try {
                    taskManagerGateway = TestingUtils.createTaskManager(actorSystem, this.getTestActor(), config, true, false);
                    final ActorRef taskManager = taskManagerGateway.actor();
                    final UUID falseLeaderSessionID = UUID.randomUUID();
                    final UUID trueLeaderSessionID = null;
                    new JavaTestKit.Within(timeout){

                        protected void run() {
                            taskManager.tell((Object)TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), this.getTestActor());
                            JobManagerMessages.LeaderSessionMessage lsm = (JobManagerMessages.LeaderSessionMessage)this.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
                            Assert.assertTrue((lsm.leaderSessionID() == trueLeaderSessionID ? 1 : 0) != 0);
                            Assert.assertTrue((boolean)(lsm.message() instanceof RegistrationMessages.RegisterTaskManager));
                            ActorRef tm = this.getLastSender();
                            tm.tell((Object)new JobManagerMessages.LeaderSessionMessage(falseLeaderSessionID, (Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 1)), this.getTestActor());
                            tm.tell((Object)new JobManagerMessages.LeaderSessionMessage(trueLeaderSessionID, (Object)new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 1)), this.getTestActor());
                            Object message = null;
                            Class<?> confirmMessageClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
                            while (message == null || !message.getClass().equals(confirmMessageClass)) {
                                message = this.receiveOne((Duration)TestingUtils.TESTING_DURATION());
                            }
                            tm.tell(JobManagerMessages.getRequestLeaderSessionID(), this.getTestActor());
                            this.expectMsgEquals(new JobManagerMessages.ResponseLeaderSessionID(trueLeaderSessionID));
                        }
                    };
                }
                catch (Throwable e) {
                    try {
                        e.printStackTrace();
                        Assert.fail((String)e.getMessage());
                    }
                    catch (Throwable throwable) {
                        TestingUtils.stopActor(taskManagerGateway);
                        throw throwable;
                    }
                    TestingUtils.stopActor(taskManagerGateway);
                }
                TestingUtils.stopActor(taskManagerGateway);
            }
        };
    }

    private static ActorRef startJobManager(Configuration configuration) throws Exception {
        return (ActorRef)JobManager.startJobManagerActors((Configuration)configuration, (ActorSystem)actorSystem, NONE_STRING, NONE_STRING, JobManager.class, MemoryArchivist.class)._1();
    }

    static {
        timeout = new FiniteDuration(20L, TimeUnit.SECONDS);
    }
}

