/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Int;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobManagerHARecoveryTest {
    private static ActorSystem system;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)system);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobRecoveryWhenLosingLeadership() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        FiniteDuration jobRecoveryTimeout = new FiniteDuration(3L, TimeUnit.SECONDS);
        Deadline deadline = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        Configuration flinkConfiguration = new Configuration();
        UUID leaderSessionID = UUID.randomUUID();
        UUID newLeaderSessionID = UUID.randomUUID();
        int slots = 2;
        ActorRef archive = null;
        ActorRef jobManager = null;
        ActorRef taskManager = null;
        flinkConfiguration.setString("recovery.mode", "zookeeper");
        flinkConfiguration.setString("recovery.zookeeper.storageDir", this.temporaryFolder.newFolder().toString());
        flinkConfiguration.setInteger("taskmanager.numberOfTaskSlots", slots);
        try {
            long[] recoveredStates;
            Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
            MySubmittedJobGraphStore mySubmittedJobGraphStore = new MySubmittedJobGraphStore();
            MyCheckpointStore checkpointStore = new MyCheckpointStore();
            StandaloneCheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter();
            MyCheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, (CheckpointIDCounter)checkpointCounter);
            TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService();
            TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService();
            InstanceManager instanceManager = new InstanceManager();
            instanceManager.addInstanceListener((InstanceListener)scheduler);
            archive = system.actorOf(Props.create(MemoryArchivist.class, (Object[])new Object[]{10}), "archive");
            Props jobManagerProps = Props.create(TestingJobManager.class, (Object[])new Object[]{flinkConfiguration, new ForkJoinPool(), instanceManager, scheduler, new BlobLibraryCacheManager((BlobService)new BlobServer(flinkConfiguration), 3600000L), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100L), timeout, myLeaderElectionService, mySubmittedJobGraphStore, checkpointStateFactory, new HeapSavepointStore(), jobRecoveryTimeout, Option.apply(null)});
            jobManager = system.actorOf(jobManagerProps, "jobmanager");
            AkkaActorGateway gateway = new AkkaActorGateway(jobManager, leaderSessionID);
            taskManager = TaskManager.startTaskManagerComponentsAndActor((Configuration)flinkConfiguration, (ResourceID)ResourceID.generate(), (ActorSystem)system, (String)"localhost", (Option)Option.apply((Object)"taskmanager"), (Option)Option.apply((Object)myLeaderRetrievalService), (boolean)true, TestingTaskManager.class);
            AkkaActorGateway tmGateway = new AkkaActorGateway(taskManager, leaderSessionID);
            Future tmAlive = tmGateway.ask(TestingMessages.getAlive(), deadline.timeLeft());
            Await.ready((Awaitable)tmAlive, (Duration)deadline.timeLeft());
            JobVertex sourceJobVertex = new JobVertex("Source");
            sourceJobVertex.setInvokableClass(BlockingStatefulInvokable.class);
            sourceJobVertex.setParallelism(slots);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceJobVertex});
            List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID());
            jobGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexId, vertexId, vertexId, 100L, 600000L, 0L, 1));
            BlockingStatefulInvokable.initializeStaticHelpers(slots);
            Future isLeader = gateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), deadline.timeLeft());
            Future isConnectedToJobManager = tmGateway.ask((Object)new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager), deadline.timeLeft());
            myLeaderElectionService.isLeader(leaderSessionID);
            myLeaderRetrievalService.notifyListener(gateway.path(), leaderSessionID);
            Await.ready((Awaitable)isLeader, (Duration)deadline.timeLeft());
            Await.ready((Awaitable)isConnectedToJobManager, (Duration)deadline.timeLeft());
            Future jobSubmitted = gateway.ask((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), deadline.timeLeft());
            Await.ready((Awaitable)jobSubmitted, (Duration)deadline.timeLeft());
            BlockingStatefulInvokable.awaitCompletedCheckpoints();
            Future jobRemoved = gateway.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
            myLeaderElectionService.notLeader();
            Await.ready((Awaitable)jobRemoved, (Duration)deadline.timeLeft());
            Assert.assertTrue((boolean)mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
            Future jobRunning = gateway.ask((Object)new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.RUNNING), deadline.timeLeft());
            myLeaderElectionService.isLeader(newLeaderSessionID);
            myLeaderRetrievalService.notifyListener(gateway.path(), newLeaderSessionID);
            Await.ready((Awaitable)jobRunning, (Duration)deadline.timeLeft());
            Future jobFinished = gateway.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
            BlockingInvokable.unblock();
            Await.ready((Awaitable)jobFinished, (Duration)deadline.timeLeft());
            Assert.assertFalse((boolean)mySubmittedJobGraphStore.contains(jobGraph.getJobID()));
            for (long state : recoveredStates = BlockingStatefulInvokable.getRecoveredStates()) {
                boolean isExpected = state >= 5L;
                Assert.assertTrue((String)("Did not recover checkpoint state correctly, expecting >= 5, but state was " + state), (boolean)isExpected);
            }
        }
        finally {
            if (archive != null) {
                archive.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
            }
        }
    }

    public static class BlockingStatefulInvokable
    extends BlockingInvokable
    implements StatefulTask<StateHandle<Long>> {
        private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
        private static volatile CountDownLatch completedCheckpointsLatch = new CountDownLatch(1);
        private static volatile long[] recoveredStates = new long[0];
        private int completedCheckpoints = 0;

        public void setInitialState(StateHandle<Long> stateHandle) throws Exception {
            int subtaskIndex = this.getIndexInSubtaskGroup();
            if (subtaskIndex < recoveredStates.length) {
                BlockingStatefulInvokable.recoveredStates[subtaskIndex] = (Long)stateHandle.getState(this.getUserCodeClassLoader());
            }
        }

        public boolean triggerCheckpoint(long checkpointId, long timestamp) {
            LocalStateHandle state = new LocalStateHandle((Serializable)Long.valueOf(checkpointId));
            this.getEnvironment().acknowledgeCheckpoint(checkpointId, (StateHandle)state);
            return true;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            if (this.completedCheckpoints++ > 5) {
                completedCheckpointsLatch.countDown();
            }
        }

        public static void initializeStaticHelpers(int numSubtasks) {
            completedCheckpointsLatch = new CountDownLatch(numSubtasks);
            recoveredStates = new long[numSubtasks];
        }

        public static void awaitCompletedCheckpoints() throws InterruptedException {
            completedCheckpointsLatch.await();
        }

        public static long[] getRecoveredStates() {
            return recoveredStates;
        }
    }

    public static class BlockingInvokable
    extends AbstractInvokable {
        private static boolean blocking = true;
        private static Object lock = new Object();

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            while (blocking) {
                Object object = lock;
                synchronized (object) {
                    lock.wait();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void unblock() {
            blocking = false;
            Object object = lock;
            synchronized (object) {
                lock.notifyAll();
            }
        }
    }

    static class MySubmittedJobGraphStore
    implements SubmittedJobGraphStore {
        Map<JobID, SubmittedJobGraph> storedJobs = new HashMap<JobID, SubmittedJobGraph>();

        MySubmittedJobGraphStore() {
        }

        public void start(SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener) throws Exception {
        }

        public void stop() throws Exception {
        }

        public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
            return new ArrayList<SubmittedJobGraph>(this.storedJobs.values());
        }

        public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
            if (this.storedJobs.containsKey(jobId)) {
                return Option.apply((Object)this.storedJobs.get(jobId));
            }
            return Option.apply(null);
        }

        public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
            this.storedJobs.put(jobGraph.getJobId(), jobGraph);
        }

        public void removeJobGraph(JobID jobId) throws Exception {
            this.storedJobs.remove(jobId);
        }

        boolean contains(JobID jobId) {
            return this.storedJobs.containsKey(jobId);
        }
    }

    static class MyCheckpointRecoveryFactory
    implements CheckpointRecoveryFactory {
        private final CompletedCheckpointStore store;
        private final CheckpointIDCounter counter;

        public MyCheckpointRecoveryFactory(CompletedCheckpointStore store, CheckpointIDCounter counter) {
            this.store = store;
            this.counter = counter;
        }

        public void start() {
        }

        public void stop() {
        }

        public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception {
            return this.store;
        }

        public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception {
            return this.counter;
        }
    }

    static class MyCheckpointStore
    implements CompletedCheckpointStore {
        private final ArrayDeque<CompletedCheckpoint> checkpoints = new ArrayDeque(2);
        private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque(2);

        MyCheckpointStore() {
        }

        public void recover() throws Exception {
            this.checkpoints.addAll(this.suspended);
            this.suspended.clear();
        }

        public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
            this.checkpoints.addLast(checkpoint);
            if (this.checkpoints.size() > 1) {
                this.checkpoints.removeFirst().discard(ClassLoader.getSystemClassLoader());
            }
        }

        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            return this.checkpoints.isEmpty() ? null : this.checkpoints.getLast();
        }

        public void shutdown() throws Exception {
            this.checkpoints.clear();
            this.suspended.clear();
        }

        public void suspend() throws Exception {
            this.suspended.addAll(this.checkpoints);
            this.checkpoints.clear();
        }

        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return new ArrayList<CompletedCheckpoint>(this.checkpoints);
        }

        public int getNumberOfRetainedCheckpoints() {
            return this.checkpoints.size();
        }
    }
}

