package com.hazelcast.jet.benchmark;

import com.hazelcast.cluster.Member;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.Functions;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.NightlyTest;
import java.io.PrintStream;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/jet/benchmark/BackpressureTest.class */
public class BackpressureTest extends JetTestSupport {
    private static final int CLUSTER_SIZE = 2;
    private static final int TOTAL_PARALLELISM = Math.min(8, Runtime.getRuntime().availableProcessors());
    private static final int PARALLELISM_PER_MEMBER = TOTAL_PARALLELISM / 2;
    private static final int KEY_COUNT = 1000;
    private static final int COUNT_PER_KEY_AND_SLICE = 10000;
    private HazelcastInstance hz1;
    private HazelcastInstance hz2;

    /* loaded from: input_file:com/hazelcast/jet/benchmark/BackpressureTest$CombineP.class */
    private static class CombineP extends AbstractProcessor {
        private Map<String, Long> counts = new HashMap();
        private Traverser<Map.Entry<String, Long>> resultTraverser = Traversers.lazy(() -> {
            return Traversers.traverseIterable(this.counts.entrySet());
        });

        private CombineP() {
        }

        protected boolean tryProcess(int i, @Nonnull Object obj) {
            Map.Entry entry = (Map.Entry) obj;
            this.counts.compute((String) entry.getKey(), (str, l) -> {
                return Long.valueOf(l == null ? ((Long) entry.getValue()).longValue() : l.longValue() + ((Long) entry.getValue()).longValue());
            });
            return true;
        }

        public boolean complete() {
            return emitFromTraverser(this.resultTraverser);
        }

        public String toString() {
            return "CombineP";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/benchmark/BackpressureTest$GenerateP.class */
    public static class GenerateP extends AbstractProcessor {
        private int key;
        private int itemsEmittedPerKey;
        private final Traverser<Map.Entry<String, Long>> trav = () -> {
            if (this.itemsEmittedPerKey == BackpressureTest.COUNT_PER_KEY_AND_SLICE) {
                return null;
            }
            try {
                return Util.entry(Integer.toString(this.key), 1L);
            } finally {
                int i = this.key + 1;
                this.key = i;
                if (i == 1000) {
                    this.itemsEmittedPerKey++;
                    this.key = 0;
                }
            }
        };

        private GenerateP() {
        }

        public boolean complete() {
            return emitFromTraverser(this.trav);
        }

        public String toString() {
            return "GenerateP";
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/benchmark/BackpressureTest$HiccupP.class */
    private static class HiccupP extends CombineP {
        private long startTime;
        private boolean isHiccuping;
        private long nextDeadline;
        private int itemsProcessed;

        private HiccupP() {
        }

        protected void init(@Nonnull Processor.Context context) throws Exception {
            super.init(context);
            long nanoTime = System.nanoTime();
            this.startTime = nanoTime;
            this.isHiccuping = true;
            this.nextDeadline = nanoTime;
        }

        @Override // com.hazelcast.jet.benchmark.BackpressureTest.CombineP
        protected boolean tryProcess(int i, @Nonnull Object obj) {
            updateHiccupStatus();
            if (this.isHiccuping) {
                return false;
            }
            this.itemsProcessed++;
            return super.tryProcess(i, obj);
        }

        private void updateHiccupStatus() {
            long nanoTime = System.nanoTime();
            if (nanoTime < this.nextDeadline) {
                return;
            }
            this.isHiccuping = !this.isHiccuping;
            long j = this.isHiccuping ? 301L : 700L;
            long j2 = this.isHiccuping ? 570L : 2000L;
            long j3 = this.nextDeadline;
            this.nextDeadline = nanoTime + TimeUnit.MILLISECONDS.toNanos(j) + TimeUnit.MILLISECONDS.toNanos(ThreadLocalRandom.current().nextLong(j2));
            PrintStream printStream = System.out;
            Object[] objArr = new Object[5];
            objArr[0] = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(nanoTime - this.startTime));
            objArr[1] = this.isHiccuping ? "Hiccup" : "Resume";
            objArr[2] = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(this.nextDeadline - nanoTime));
            objArr[3] = Long.valueOf(TimeUnit.NANOSECONDS.toMillis(nanoTime - j3));
            objArr[4] = Integer.valueOf(this.itemsProcessed);
            printStream.printf("==== %,7d %s for %,5d ms, late by %,d ms. Processed %,d items%n", objArr);
        }

        @Override // com.hazelcast.jet.benchmark.BackpressureTest.CombineP
        public String toString() {
            return "HiccupP";
        }
    }

    @Before
    public void setUp() {
        Config defaultInstanceConfigWithJetEnabled = defaultInstanceConfigWithJetEnabled();
        defaultInstanceConfigWithJetEnabled.getJetConfig().setCooperativeThreadCount(PARALLELISM_PER_MEMBER);
        this.hz1 = createHazelcastInstance(defaultInstanceConfigWithJetEnabled);
        this.hz2 = createHazelcastInstance(defaultInstanceConfigWithJetEnabled);
    }

    @Test
    public void testBackpressure() {
        DAG dag = new DAG();
        int port = this.hz1.getCluster().getLocalMember().getAddress().getPort();
        Member localMember = this.hz2.getCluster().getLocalMember();
        int intValue = ((Integer) this.hz1.getPartitionService().getPartitions().stream().filter(partition -> {
            return partition.getOwner().equals(localMember);
        }).map((v0) -> {
            return v0.getPartitionId();
        }).findAny().orElseThrow(() -> {
            return new RuntimeException("Can't find a partition owned by member " + this.hz2);
        })).intValue();
        Vertex newVertex = dag.newVertex("source", ProcessorMetaSupplier.of(address -> {
            return ProcessorSupplier.of(address.getPort() == port ? GenerateP::new : Processors.noopP());
        }));
        Vertex newVertex2 = dag.newVertex("hiccup", HiccupP::new);
        dag.edge(Edge.between(newVertex, newVertex2).distributed().partitioned(Functions.wholeItem(), (obj, i) -> {
            return intValue;
        })).edge(Edge.between(newVertex2, dag.newVertex("sink", SinkProcessors.writeMapP("counts"))));
        this.hz1.getJet().newJob(dag).join();
        assertCounts(this.hz1.getMap("counts"));
    }

    private static void assertCounts(Map<String, Long> map) {
        for (int i = 0; i < 1000; i++) {
            Long l = map.get(Integer.toString(i));
            Assert.assertNotNull("Missing count for " + i, l);
            Assert.assertEquals("The count for key " + i + " is not correct", COUNT_PER_KEY_AND_SLICE * PARALLELISM_PER_MEMBER, l.longValue());
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1171459060:
                if (implMethodName.equals("lambda$testBackpressure$d2eb3b95$1")) {
                    z = false;
                    break;
                }
                break;
            case 1271377522:
                if (implMethodName.equals("lambda$testBackpressure$de590dad$1")) {
                    z = true;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/benchmark/BackpressureTest") && serializedLambda.getImplMethodSignature().equals("(ILcom/hazelcast/cluster/Address;)Lcom/hazelcast/jet/core/ProcessorSupplier;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return address -> {
                        return ProcessorSupplier.of(address.getPort() == intValue ? GenerateP::new : Processors.noopP());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/Partitioner") && serializedLambda.getFunctionalInterfaceMethodName().equals("getPartition") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;I)I") && serializedLambda.getImplClass().equals("com/hazelcast/jet/benchmark/BackpressureTest") && serializedLambda.getImplMethodSignature().equals("(ILjava/lang/Object;I)I")) {
                    int intValue2 = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return (obj, i) -> {
                        return intValue2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/benchmark/BackpressureTest$GenerateP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return GenerateP::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/benchmark/BackpressureTest$HiccupP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return HiccupP::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
