package com.hazelcast.jet.core;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.tpcengine.net.AsyncSocket_RpcTest;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/RoutingPolicyDistributedTest.class */
public class RoutingPolicyDistributedTest extends SimpleTestInClusterSupport {
    private static final List<Integer>[] NUMBERS = {(List) IntStream.range(0, 4096).boxed().collect(Collectors.toList()), (List) IntStream.range(4096, 8192).boxed().collect(Collectors.toList()), (List) IntStream.range(8192, 12288).boxed().collect(Collectors.toList()), (List) IntStream.range(12288, AsyncSocket_RpcTest.SOCKET_BUFFER_SIZE).boxed().collect(Collectors.toList())};
    private static Address address1;
    private TestProcessors.CollectPerProcessorSink consumerSup;

    @BeforeClass
    public static void beforeClass() {
        initialize(2, null);
        address1 = instances()[1].getCluster().getLocalMember().getAddress();
    }

    @Before
    public void before() {
        TestProcessors.reset(1);
        this.consumerSup = new TestProcessors.CollectPerProcessorSink();
    }

    @Test
    public void when_distributedToOne_partitioned() {
        DAG dag = new DAG();
        Vertex producer = producer(NUMBERS);
        Vertex consumer = consumer();
        dag.vertex(producer).vertex(consumer).edge(Edge.between(producer, consumer).distributeTo(address1).partitioned(num -> {
            return Integer.valueOf(num.intValue() % 271);
        }));
        instance().getJet().newJob(dag).join();
        for (int i = 0; i < this.consumerSup.getLists().size(); i++) {
            this.logger.info("size" + i + ": " + this.consumerSup.getListAt(i).size());
        }
        this.logger.info("size of union: " + this.consumerSup.getLists().stream().flatMap((v0) -> {
            return v0.stream();
        }).distinct().count());
        Assert.assertEquals("list count", 4L, this.consumerSup.getLists().size());
        Set of = setOf(this.consumerSup.getListAt(0), this.consumerSup.getListAt(1));
        Set of2 = setOf(this.consumerSup.getListAt(2), this.consumerSup.getListAt(3));
        if (address1.equals(this.consumerSup.getMembers().get(0))) {
            of = of2;
            of2 = of;
        }
        Assert.assertEquals("items on member0", Collections.emptySet(), of);
        Assert.assertEquals("items on member1", setOf(NUMBERS), of2);
    }

    @Test
    public void when_distributedToOne_broadcast() {
        when_distributedToOne_notPartitioned((v0) -> {
            v0.broadcast();
        }, "must be partitioned");
    }

    @Test
    public void when_distributedToOne_unicast() {
        when_distributedToOne_notPartitioned((v0) -> {
            v0.unicast();
        }, "must be partitioned");
    }

    @Test
    public void when_distributedToOne_isolated() {
        when_distributedToOne_notPartitioned((v0) -> {
            v0.isolated();
        }, "Isolated edges must be local");
    }

    @Test
    public void when_distributedToOne_fanout() {
        when_distributedToOne_notPartitioned((v0) -> {
            v0.fanout();
        }, "must be partitioned");
    }

    private void when_distributedToOne_notPartitioned(Consumer<Edge> consumer, String str) {
        DAG dag = new DAG();
        Vertex producer = producer(NUMBERS);
        Vertex consumer2 = consumer();
        Edge distributeTo = Edge.between(producer, consumer2).distributeTo(address1);
        consumer.accept(distributeTo);
        dag.vertex(producer).vertex(consumer2).edge(distributeTo);
        Assertions.assertThatThrownBy(() -> {
            instance().getJet().newJob(dag).join();
        }).hasMessageContaining(str);
    }

    @Test
    public void when_distributedToOne_and_targetMemberMissing() throws Exception {
        DAG dag = new DAG();
        Vertex producer = producer(NUMBERS);
        Vertex consumer = consumer();
        dag.vertex(producer).vertex(consumer).edge(Edge.between(producer, consumer).distributeTo(new Address("1.2.3.4", 9999)).allToOne());
        Assertions.assertThatThrownBy(() -> {
            instance().getJet().newJob(dag).join();
        }).hasMessageContaining("The target member of an edge is not present in the cluster");
    }

    @Test
    public void when_local_fanout() {
        DAG dag = new DAG();
        Vertex localParallelism = new Vertex("producer", new TestProcessors.ListsSourceP(Arrays.asList(1, 2), Arrays.asList(3, 4))).localParallelism(1);
        Vertex localParallelism2 = new Vertex("consumer", this.consumerSup).localParallelism(2);
        dag.vertex(localParallelism).vertex(localParallelism2).edge(Edge.between(localParallelism, localParallelism2).fanout());
        instance().getJet().newJob(dag).join();
        Assert.assertEquals("items on member0-processor0", Set.of(1), new HashSet(this.consumerSup.getListAt(0)));
        Assert.assertEquals("items on member0-processor1", Set.of(2), new HashSet(this.consumerSup.getListAt(1)));
        Assert.assertEquals("items on member1-processor0", Set.of(3), new HashSet(this.consumerSup.getListAt(2)));
        Assert.assertEquals("items on member1-processor1", Set.of(4), new HashSet(this.consumerSup.getListAt(3)));
    }

    @Test
    public void when_distributed_fanout() {
        DAG dag = new DAG();
        Vertex localParallelism = new Vertex("producer", new TestProcessors.ListsSourceP(Arrays.asList(1, 2), Arrays.asList(3, 4))).localParallelism(1);
        Vertex localParallelism2 = new Vertex("consumer", this.consumerSup).localParallelism(2);
        dag.vertex(localParallelism).vertex(localParallelism2).edge(Edge.between(localParallelism, localParallelism2).distributed().fanout());
        instance().getJet().newJob(dag).join();
        Assert.assertEquals("items on member0-processor0", Set.of(1, 3), new HashSet(this.consumerSup.getListAt(0)));
        Assert.assertEquals("items on member0-processor1", Set.of(2, 4), new HashSet(this.consumerSup.getListAt(1)));
        Assert.assertEquals("items on member1-processor0", Set.of(1, 3), new HashSet(this.consumerSup.getListAt(2)));
        Assert.assertEquals("items on member1-processor1", Set.of(2, 4), new HashSet(this.consumerSup.getListAt(3)));
    }

    private Vertex consumer() {
        return new Vertex("consumer", this.consumerSup).localParallelism(2);
    }

    private static Vertex producer(List<?>... listArr) {
        Assert.assertEquals(0L, listArr.length % instances().length);
        return new Vertex("producer", new TestProcessors.ListsSourceP(listArr)).localParallelism(listArr.length / instances().length);
    }

    @SafeVarargs
    private static <T> Set<T> setOf(Collection<T>... collectionArr) {
        HashSet hashSet = new HashSet();
        int i = 0;
        for (Collection<T> collection : collectionArr) {
            hashSet.addAll(collection);
            i += collection.size();
        }
        Assert.assertEquals("there were some duplicates in the collections", i, hashSet.size());
        return hashSet;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -291824625:
                if (implMethodName.equals("lambda$when_distributedToOne_partitioned$a441ef18$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/RoutingPolicyDistributedTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Object;")) {
                    return num -> {
                        return Integer.valueOf(num.intValue() % 271);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
