/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobInfo;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ZooKeeperSubmittedJobGraphsStoreITCase
extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final StateStorageHelper<SubmittedJobGraph> localStateStorage = new StateStorageHelper<SubmittedJobGraph>(){

        public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws Exception {
            return new LocalStateHandle((Serializable)state);
        }
    };

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZooKeeper != null) {
            ZooKeeper.shutdown();
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPutAndRemoveJobGraph() throws Exception {
        ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testPutAndRemoveJobGraph", localStateStorage);
        try {
            SubmittedJobGraphStore.SubmittedJobGraphListener listener = (SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.mock(SubmittedJobGraphStore.SubmittedJobGraphListener.class);
            jobGraphs.start(listener);
            SubmittedJobGraph jobGraph = this.createSubmittedJobGraph(new JobID(), 0L);
            Assert.assertEquals((long)0L, (long)jobGraphs.recoverJobGraphs().size());
            jobGraphs.putJobGraph(jobGraph);
            List actual = jobGraphs.recoverJobGraphs();
            Assert.assertEquals((long)1L, (long)actual.size());
            this.verifyJobGraphs(jobGraph, (SubmittedJobGraph)actual.get(0));
            jobGraph = this.createSubmittedJobGraph(jobGraph.getJobId(), 1L);
            jobGraphs.putJobGraph(jobGraph);
            actual = jobGraphs.recoverJobGraphs();
            Assert.assertEquals((long)1L, (long)actual.size());
            this.verifyJobGraphs(jobGraph, (SubmittedJobGraph)actual.get(0));
            jobGraphs.removeJobGraph(jobGraph.getJobId());
            Assert.assertEquals((long)0L, (long)jobGraphs.recoverJobGraphs().size());
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)1))).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
            jobGraphs.removeJobGraph(jobGraph.getJobId());
        }
        finally {
            jobGraphs.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoverJobGraphs() throws Exception {
        ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testRecoverJobGraphs", localStateStorage);
        try {
            SubmittedJobGraphStore.SubmittedJobGraphListener listener = (SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.mock(SubmittedJobGraphStore.SubmittedJobGraphListener.class);
            jobGraphs.start(listener);
            HashMap<JobID, SubmittedJobGraph> expected = new HashMap<JobID, SubmittedJobGraph>();
            JobID[] jobIds = new JobID[]{new JobID(), new JobID(), new JobID()};
            expected.put(jobIds[0], this.createSubmittedJobGraph(jobIds[0], 0L));
            expected.put(jobIds[1], this.createSubmittedJobGraph(jobIds[1], 1L));
            expected.put(jobIds[2], this.createSubmittedJobGraph(jobIds[2], 2L));
            for (SubmittedJobGraph jobGraph : expected.values()) {
                jobGraphs.putJobGraph(jobGraph);
            }
            List actual = jobGraphs.recoverJobGraphs();
            Assert.assertEquals((long)expected.size(), (long)actual.size());
            for (SubmittedJobGraph jobGraph : actual) {
                Assert.assertTrue((boolean)expected.containsKey(jobGraph.getJobId()));
                this.verifyJobGraphs((SubmittedJobGraph)expected.get(jobGraph.getJobId()), jobGraph);
                jobGraphs.removeJobGraph(jobGraph.getJobId());
            }
            Assert.assertEquals((long)0L, (long)jobGraphs.recoverJobGraphs().size());
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)expected.size()))).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
        }
        finally {
            jobGraphs.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentAddJobGraph() throws Exception {
        ZooKeeperSubmittedJobGraphStore jobGraphs = null;
        ZooKeeperSubmittedJobGraphStore otherJobGraphs = null;
        try {
            jobGraphs = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
            otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testConcurrentAddJobGraph", localStateStorage);
            SubmittedJobGraph jobGraph = this.createSubmittedJobGraph(new JobID(), 0L);
            SubmittedJobGraph otherJobGraph = this.createSubmittedJobGraph(new JobID(), 0L);
            SubmittedJobGraphStore.SubmittedJobGraphListener listener = (SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.mock(SubmittedJobGraphStore.SubmittedJobGraphListener.class);
            final JobID[] actualOtherJobId = new JobID[1];
            final CountDownLatch sync = new CountDownLatch(1);
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.doAnswer((Answer)new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    actualOtherJobId[0] = (JobID)invocation.getArguments()[0];
                    sync.countDown();
                    return null;
                }
            }).when((Object)listener)).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            jobGraphs.start(listener);
            otherJobGraphs.start(null);
            jobGraphs.putJobGraph(jobGraph);
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
            otherJobGraphs.putJobGraph(otherJobGraph);
            sync.await();
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((SubmittedJobGraphStore.SubmittedJobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
            Assert.assertEquals((Object)otherJobGraph.getJobId(), (Object)actualOtherJobId[0]);
        }
        finally {
            if (jobGraphs != null) {
                jobGraphs.stop();
            }
            if (otherJobGraphs != null) {
                otherJobGraphs.stop();
            }
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
        ZooKeeperSubmittedJobGraphStore jobGraphs = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
        ZooKeeperSubmittedJobGraphStore otherJobGraphs = new ZooKeeperSubmittedJobGraphStore(ZooKeeper.createClient(), "/testUpdateJobGraphYouDidNotGetOrAdd", localStateStorage);
        jobGraphs.start(null);
        otherJobGraphs.start(null);
        SubmittedJobGraph jobGraph = this.createSubmittedJobGraph(new JobID(), 0L);
        jobGraphs.putJobGraph(jobGraph);
        otherJobGraphs.putJobGraph(jobGraph);
    }

    private SubmittedJobGraph createSubmittedJobGraph(JobID jobId, long start) {
        JobGraph jobGraph = new JobGraph(jobId, "Test JobGraph");
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        jobGraph.addVertex(jobVertex);
        JobInfo jobInfo = new JobInfo(ActorRef.noSender(), ListeningBehaviour.DETACHED, start, Integer.MAX_VALUE);
        return new SubmittedJobGraph(jobGraph, jobInfo);
    }

    protected void verifyJobGraphs(SubmittedJobGraph expected, SubmittedJobGraph actual) throws Exception {
        JobGraph expectedJobGraph = expected.getJobGraph();
        JobGraph actualJobGraph = actual.getJobGraph();
        Assert.assertEquals((Object)expectedJobGraph.getName(), (Object)actualJobGraph.getName());
        Assert.assertEquals((Object)expectedJobGraph.getJobID(), (Object)actualJobGraph.getJobID());
        JobInfo expectedJobInfo = expected.getJobInfo();
        JobInfo actualJobInfo = actual.getJobInfo();
        Assert.assertEquals((Object)expectedJobInfo.listeningBehaviour(), (Object)actualJobInfo.listeningBehaviour());
        Assert.assertEquals((long)expectedJobInfo.start(), (long)actualJobInfo.start());
    }
}

