/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.typesafe.config.Config;
import java.util.UUID;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class FlinkUntypedActorTest {
    private static ActorSystem actorSystem;

    @BeforeClass
    public static void setup() {
        actorSystem = ActorSystem.create((String)"TestingActorSystem", (Config)TestingUtils.testConfig());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)actorSystem);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() {
        UUID leaderSessionID = UUID.randomUUID();
        UUID oldSessionID = UUID.randomUUID();
        TestActorRef actor = null;
        try {
            actor = TestActorRef.create((ActorSystem)actorSystem, (Props)Props.create(PlainFlinkUntypedActor.class, (Object[])new Object[]{leaderSessionID}));
            PlainFlinkUntypedActor underlyingActor = (PlainFlinkUntypedActor)actor.underlyingActor();
            actor.tell((Object)new JobManagerMessages.LeaderSessionMessage(leaderSessionID, (Object)1), ActorRef.noSender());
            actor.tell((Object)new JobManagerMessages.LeaderSessionMessage(oldSessionID, (Object)2), ActorRef.noSender());
            actor.tell((Object)new JobManagerMessages.LeaderSessionMessage(leaderSessionID, (Object)2), ActorRef.noSender());
            actor.tell((Object)1, ActorRef.noSender());
            Assert.assertEquals((long)3L, (long)underlyingActor.getMessageCounter());
        }
        catch (Throwable throwable) {
            FlinkUntypedActorTest.stopActor(actor);
            throw throwable;
        }
        FlinkUntypedActorTest.stopActor((ActorRef)actor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() {
        UUID leaderSessionID = UUID.randomUUID();
        TestActorRef actor = null;
        try {
            Props props = Props.create(PlainFlinkUntypedActor.class, (Object[])new Object[]{leaderSessionID});
            actor = TestActorRef.create((ActorSystem)actorSystem, (Props)props);
            actor.receive((Object)new JobManagerMessages.LeaderSessionMessage(leaderSessionID, (Object)1));
            try {
                actor.receive((Object)new PlainRequiresLeaderSessionID());
                Assert.fail((String)"Expected an exception to be thrown, because a RequiresLeaderSessionIDmessage was sent without being wrapped in LeaderSessionMessage.");
            }
            catch (Exception e) {
                Assert.assertEquals((Object)"Received a message PlainRequiresLeaderSessionID without a leader session ID, even though the message requires a leader session ID.", (Object)e.getMessage());
            }
        }
        catch (Throwable throwable) {
            FlinkUntypedActorTest.stopActor(actor);
            throw throwable;
        }
        FlinkUntypedActorTest.stopActor((ActorRef)actor);
    }

    private static void stopActor(ActorRef actor) {
        if (actor != null) {
            actor.tell((Object)Kill.getInstance(), ActorRef.noSender());
        }
    }

    static class PlainRequiresLeaderSessionID
    implements RequiresLeaderSessionID {
        PlainRequiresLeaderSessionID() {
        }

        public String toString() {
            return "PlainRequiresLeaderSessionID";
        }
    }

    static class PlainFlinkUntypedActor
    extends FlinkUntypedActor {
        private UUID leaderSessionID;
        private int messageCounter;

        public PlainFlinkUntypedActor(UUID leaderSessionID) {
            this.leaderSessionID = leaderSessionID;
            this.messageCounter = 0;
        }

        protected void handleMessage(Object message) throws Exception {
            ++this.messageCounter;
        }

        protected UUID getLeaderSessionID() {
            return this.leaderSessionID;
        }

        public int getMessageCounter() {
            return this.messageCounter;
        }
    }
}

