/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public class LocalInputSplitsTest {
    private static final FiniteDuration TIMEOUT = new FiniteDuration(100L, TimeUnit.SECONDS);

    @Test
    public void testNotEnoughSubtasks() {
        int numHosts = 3;
        int slotsPerHost = 1;
        int parallelism = 2;
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host2"), new TestLocatableInputSplit(3, "host3")};
        try {
            LocalInputSplitsTest.runTests(numHosts, slotsPerHost, parallelism, splits);
            Assert.fail((String)"should throw an exception");
        }
        catch (JobException jobException) {
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testDisallowMultipleLocations() {
        int numHosts = 2;
        int slotsPerHost = 1;
        int parallelism = 2;
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, new String[]{"host1", "host2"}), new TestLocatableInputSplit(2, new String[]{"host1", "host2"})};
        try {
            LocalInputSplitsTest.runTests(numHosts, slotsPerHost, parallelism, splits);
            Assert.fail((String)"should throw an exception");
        }
        catch (JobException jobException) {
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNonExistentHost() {
        int numHosts = 2;
        int slotsPerHost = 1;
        int parallelism = 2;
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "bogus_host")};
        try {
            LocalInputSplitsTest.runTests(numHosts, slotsPerHost, parallelism, splits);
            Assert.fail((String)"should throw an exception");
        }
        catch (JobException jobException) {
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testEqualSplitsPerHostAndSubtask() {
        int numHosts = 5;
        int slotsPerHost = 2;
        int parallelism = 10;
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(7, "host4"), new TestLocatableInputSplit(8, "host4"), new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host1"), new TestLocatableInputSplit(3, "host2"), new TestLocatableInputSplit(4, "host2"), new TestLocatableInputSplit(5, "host3"), new TestLocatableInputSplit(6, "host3"), new TestLocatableInputSplit(9, "host5"), new TestLocatableInputSplit(10, "host5")};
        try {
            String[] hostsForTasks = LocalInputSplitsTest.runTests(numHosts, slotsPerHost, parallelism, splits);
            Assert.assertEquals((Object)"host1", (Object)hostsForTasks[0]);
            Assert.assertEquals((Object)"host1", (Object)hostsForTasks[1]);
            Assert.assertEquals((Object)"host2", (Object)hostsForTasks[2]);
            Assert.assertEquals((Object)"host2", (Object)hostsForTasks[3]);
            Assert.assertEquals((Object)"host3", (Object)hostsForTasks[4]);
            Assert.assertEquals((Object)"host3", (Object)hostsForTasks[5]);
            Assert.assertEquals((Object)"host4", (Object)hostsForTasks[6]);
            Assert.assertEquals((Object)"host4", (Object)hostsForTasks[7]);
            Assert.assertEquals((Object)"host5", (Object)hostsForTasks[8]);
            Assert.assertEquals((Object)"host5", (Object)hostsForTasks[9]);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testNonEqualSplitsPerhost() {
        int numHosts = 3;
        int slotsPerHost = 2;
        int parallelism = 5;
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host3"), new TestLocatableInputSplit(2, "host1"), new TestLocatableInputSplit(3, "host1"), new TestLocatableInputSplit(4, "host1"), new TestLocatableInputSplit(5, "host1"), new TestLocatableInputSplit(6, "host1"), new TestLocatableInputSplit(7, "host2"), new TestLocatableInputSplit(8, "host2")};
        try {
            String[] hostsForTasks = LocalInputSplitsTest.runTests(numHosts, slotsPerHost, parallelism, splits);
            Assert.assertEquals((Object)"host1", (Object)hostsForTasks[0]);
            Assert.assertEquals((Object)"host1", (Object)hostsForTasks[1]);
            Assert.assertEquals((Object)"host2", (Object)hostsForTasks[2]);
            Assert.assertEquals((Object)"host2", (Object)hostsForTasks[3]);
            Assert.assertEquals((Object)"host3", (Object)hostsForTasks[4]);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testWithSubtasksEmpty() {
        int numHosts = 3;
        int slotsPerHost = 5;
        int parallelism = 7;
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host2"), new TestLocatableInputSplit(3, "host2"), new TestLocatableInputSplit(4, "host2"), new TestLocatableInputSplit(5, "host3"), new TestLocatableInputSplit(6, "host3"), new TestLocatableInputSplit(7, "host3"), new TestLocatableInputSplit(8, "host3"), new TestLocatableInputSplit(9, "host3"), new TestLocatableInputSplit(10, "host3"), new TestLocatableInputSplit(11, "host3"), new TestLocatableInputSplit(12, "host3"), new TestLocatableInputSplit(13, "host3")};
        try {
            String[] hostsForTasks = LocalInputSplitsTest.runTests(numHosts, slotsPerHost, parallelism, splits);
            Assert.assertEquals((Object)"host1", (Object)hostsForTasks[0]);
            Assert.assertEquals((Object)"host2", (Object)hostsForTasks[1]);
            Assert.assertEquals((Object)"host2", (Object)hostsForTasks[2]);
            Assert.assertEquals((Object)"host3", (Object)hostsForTasks[3]);
            Assert.assertEquals((Object)"host3", (Object)hostsForTasks[4]);
            Assert.assertTrue((hostsForTasks[5].equals("host1") || hostsForTasks[5].equals("host2") || hostsForTasks[5].equals("host3") ? 1 : 0) != 0);
            Assert.assertTrue((hostsForTasks[6].equals("host1") || hostsForTasks[6].equals("host2") || hostsForTasks[6].equals("host3") ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testMultipleInstancesPerHost() {
        TestLocatableInputSplit[] splits = new TestLocatableInputSplit[]{new TestLocatableInputSplit(1, "host1"), new TestLocatableInputSplit(2, "host1"), new TestLocatableInputSplit(3, "host2"), new TestLocatableInputSplit(4, "host2"), new TestLocatableInputSplit(5, "host3"), new TestLocatableInputSplit(6, "host3")};
        try {
            JobVertex vertex = new JobVertex("test vertex");
            vertex.setParallelism(6);
            vertex.setInvokableClass(DummyInvokable.class);
            vertex.setInputSplitSource((InputSplitSource)new TestInputSplitSource(splits));
            JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex});
            ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT, (RestartStrategy)new NoRestartStrategy());
            eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
            eg.setQueuedSchedulingAllowed(false);
            Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
            Instance i1 = LocalInputSplitsTest.getInstance(new byte[]{10, 0, 1, 1}, 12345, "host1", 1);
            Instance i2 = LocalInputSplitsTest.getInstance(new byte[]{10, 0, 1, 1}, 12346, "host1", 1);
            Instance i3 = LocalInputSplitsTest.getInstance(new byte[]{10, 0, 1, 2}, 12345, "host2", 1);
            Instance i4 = LocalInputSplitsTest.getInstance(new byte[]{10, 0, 1, 2}, 12346, "host2", 1);
            Instance i5 = LocalInputSplitsTest.getInstance(new byte[]{10, 0, 1, 3}, 12345, "host3", 1);
            Instance i6 = LocalInputSplitsTest.getInstance(new byte[]{10, 0, 1, 3}, 12346, "host4", 1);
            scheduler.newInstanceAvailable(i1);
            scheduler.newInstanceAvailable(i2);
            scheduler.newInstanceAvailable(i3);
            scheduler.newInstanceAvailable(i4);
            scheduler.newInstanceAvailable(i5);
            scheduler.newInstanceAvailable(i6);
            eg.scheduleForExecution(scheduler);
            ExecutionVertex[] tasks = ((ExecutionJobVertex)eg.getVerticesTopologically().iterator().next()).getTaskVertices();
            Assert.assertEquals((long)6L, (long)tasks.length);
            Instance taskInstance1 = tasks[0].getCurrentAssignedResource().getInstance();
            Instance taskInstance2 = tasks[1].getCurrentAssignedResource().getInstance();
            Instance taskInstance3 = tasks[2].getCurrentAssignedResource().getInstance();
            Instance taskInstance4 = tasks[3].getCurrentAssignedResource().getInstance();
            Instance taskInstance5 = tasks[4].getCurrentAssignedResource().getInstance();
            Instance taskInstance6 = tasks[5].getCurrentAssignedResource().getInstance();
            Assert.assertTrue((taskInstance1 == i1 || taskInstance1 == i2 ? 1 : 0) != 0);
            Assert.assertTrue((taskInstance2 == i1 || taskInstance2 == i2 ? 1 : 0) != 0);
            Assert.assertTrue((taskInstance3 == i3 || taskInstance3 == i4 ? 1 : 0) != 0);
            Assert.assertTrue((taskInstance4 == i3 || taskInstance4 == i4 ? 1 : 0) != 0);
            Assert.assertTrue((taskInstance5 == i5 || taskInstance5 == i6 ? 1 : 0) != 0);
            Assert.assertTrue((taskInstance6 == i5 || taskInstance6 == i6 ? 1 : 0) != 0);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private static String[] runTests(int numHosts, int slotsPerHost, int parallelism, TestLocatableInputSplit[] splits) throws Exception {
        JobVertex vertex = new JobVertex("test vertex");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(DummyInvokable.class);
        vertex.setInputSplitSource((InputSplitSource)new TestInputSplitSource(splits));
        JobGraph jobGraph = new JobGraph("test job", new JobVertex[]{vertex});
        ExecutionGraph eg = new ExecutionGraph((ExecutionContext)TestingUtils.defaultExecutionContext(), jobGraph.getJobID(), jobGraph.getName(), jobGraph.getJobConfiguration(), TIMEOUT, (RestartStrategy)new NoRestartStrategy());
        eg.setQueuedSchedulingAllowed(false);
        eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Scheduler scheduler = LocalInputSplitsTest.getScheduler(numHosts, slotsPerHost);
        eg.scheduleForExecution(scheduler);
        ExecutionVertex[] tasks = ((ExecutionJobVertex)eg.getVerticesTopologically().iterator().next()).getTaskVertices();
        Assert.assertEquals((long)parallelism, (long)tasks.length);
        String[] hostsForTasks = new String[parallelism];
        for (int i = 0; i < parallelism; ++i) {
            hostsForTasks[i] = tasks[i].getCurrentAssignedResourceLocation().getHostname();
        }
        return hostsForTasks;
    }

    private static Scheduler getScheduler(int numInstances, int numSlotsPerInstance) throws Exception {
        Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
        for (int i = 0; i < numInstances; ++i) {
            byte[] ipAddress = new byte[]{10, 0, 1, (byte)(1 + i)};
            int dataPort = 12001 + i;
            String host = "host" + (i + 1);
            Instance instance = LocalInputSplitsTest.getInstance(ipAddress, dataPort, host, numSlotsPerInstance);
            scheduler.newInstanceAvailable(instance);
        }
        return scheduler;
    }

    private static Instance getInstance(byte[] ipAddress, int dataPort, String hostname, int slots) throws Exception {
        HardwareDescription hardwareDescription = new HardwareDescription(4, 0x80000000L, 0x40000000L, 0x20000000L);
        InstanceConnectionInfo connection = (InstanceConnectionInfo)Mockito.mock(InstanceConnectionInfo.class);
        Mockito.when((Object)connection.address()).thenReturn((Object)InetAddress.getByAddress(ipAddress));
        Mockito.when((Object)connection.dataPort()).thenReturn((Object)dataPort);
        Mockito.when((Object)connection.getInetAdress()).thenReturn((Object)InetAddress.getByAddress(ipAddress).toString());
        Mockito.when((Object)connection.getHostname()).thenReturn((Object)hostname);
        Mockito.when((Object)connection.getFQDNHostname()).thenReturn((Object)hostname);
        return new Instance((ActorGateway)new ExecutionGraphTestUtils.SimpleActorGateway((ExecutionContext)TestingUtils.defaultExecutionContext()), connection, new InstanceID(), hardwareDescription, slots);
    }

    private static class TestInputSplitSource
    implements InputSplitSource<TestLocatableInputSplit>,
    StrictlyLocalAssignment {
        private static final long serialVersionUID = 1L;
        private final TestLocatableInputSplit[] splits;

        public TestInputSplitSource(TestLocatableInputSplit[] splits) {
            this.splits = splits;
        }

        public TestLocatableInputSplit[] createInputSplits(int minNumSplits) {
            return this.splits;
        }

        public InputSplitAssigner getInputSplitAssigner(TestLocatableInputSplit[] inputSplits) {
            Assert.fail((String)"This method should not be called on StrictlyLocalAssignment splits.");
            return null;
        }
    }

    private static class TestLocatableInputSplit
    extends LocatableInputSplit {
        private static final long serialVersionUID = 1L;

        public TestLocatableInputSplit(int splitNumber, String hostname) {
            super(splitNumber, hostname);
        }

        public TestLocatableInputSplit(int splitNumber, String[] hostnames) {
            super(splitNumber, hostnames);
        }
    }
}

