package com.hazelcast.jet.core.test;

import com.hazelcast.config.NetworkConfig;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingServiceImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.lang.invoke.SerializedLambda;
import java.net.UnknownHostException;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/core/test/TestSupport.class */
public final class TestSupport {
    /**
     * Output checker that accepts two lists of equal size whose items occur
     * with the same multiplicities, regardless of order. Assigned in the
     * static initializer.
     */
    public static final BiPredicate<List<?>, List<?>> SAME_ITEMS_ANY_ORDER;
    /** Address ("localhost" on the default port) handed to meta-suppliers as the only member. */
    private static final Address LOCAL_ADDRESS;
    /** Default value of {@link #cooperativeTimeout}, in milliseconds. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_FAIL = 1000;
    /** Duration, in milliseconds, above which a cooperative call only produces a printed warning. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_WARN = 5;
    /** Warning threshold, in milliseconds, for calls on non-cooperative processors. */
    private static final long BLOCKING_TIME_LIMIT_MS_WARN = 10000;
    /** Logging service backing {@code getLogger(...)}; created in the static initializer. */
    private static final LoggingServiceImpl LOGGING_SERVICE;
    /** Supplier of the processor instances under test. */
    private ProcessorSupplier supplier;
    /** Optional Jet instance passed to the processor context; {@code null} unless configured. */
    private JetInstance jetInstance;
    /** When positive, upper bound in milliseconds for the {@code complete()} loop in {@code runTest}. */
    private long runUntilCompletedTimeout;
    // Compiler-generated flag backing the `assert` statements of this class.
    static final /* synthetic */ boolean $assertionsDisabled;
    /** One list of items per input ordinal. */
    private List<List<?>> inputs = Collections.emptyList();
    /** One list of expected items per output ordinal. */
    private List<List<?>> expectedOutputs = Collections.emptyList();
    /** Priority of each input ordinal; same length as {@link #inputs}. */
    private int[] priorities = new int[0];
    /** Whether each processor call is asserted to have made progress. */
    private boolean assertProgress = true;
    /** Whether snapshots are saved (and possibly restored) during the test. */
    private boolean doSnapshots = true;
    /** Whether input and output items are printed to standard output. */
    private boolean logInputOutput = true;
    /** Whether {@code complete()} is called after all input has been processed. */
    private boolean callComplete = true;
    /** Cooperative call time limit in milliseconds; a non-positive value disables the failing check. */
    private long cooperativeTimeout = COOPERATIVE_TIME_LIMIT_MS_FAIL;
    /** Compares an expected output list with the actual one; defaults to {@link Objects#equals}. */
    private BiPredicate<? super List<?>, ? super List<?>> outputChecker = (v0, v1) -> {
        return Objects.equals(v0, v1);
    };

    /**
     * Immutable pairing of an input item with the ordinal of the input it
     * belongs to. Produced by {@code mixInputs} so the test loop knows which
     * ordinal to pass to the processor for each item.
     */
    public static class ObjectWithOrdinal {
        /** Ordinal of the input the item was taken from. */
        final int ordinal;
        /** The input item itself. */
        final Object item;

        ObjectWithOrdinal(int ordinal, Object item) {
            this.item = item;
            this.ordinal = ordinal;
        }
    }

    /**
     * Creates a test harness around the given supplier. Instances are obtained
     * through the static {@code verifyProcessor(...)} factories.
     *
     * @param processorSupplier supplier of the processors to test
     */
    private TestSupport(@Nonnull ProcessorSupplier processorSupplier) {
        supplier = processorSupplier;
    }

    /**
     * Creates a harness for a single, already constructed processor instance.
     * Snapshots are disabled because the wrapping supplier can hand out the
     * instance only once.
     *
     * @param processor the processor to test
     * @return a new harness with snapshots disabled
     */
    public static TestSupport verifyProcessor(Processor processor) {
        DistributedSupplier<Processor> oneShot = singletonSupplier(processor);
        TestSupport harness = new TestSupport(ProcessorSupplier.of(oneShot));
        return harness.disableSnapshots();
    }

    /**
     * Creates a harness for processors created by the given simple supplier.
     *
     * @param distributedSupplier supplier of processor instances
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull DistributedSupplier<Processor> distributedSupplier) {
        ProcessorSupplier wrapped = ProcessorSupplier.of(distributedSupplier);
        return new TestSupport(wrapped);
    }

    /**
     * Creates a harness for processors created by the given processor supplier.
     *
     * @param processorSupplier supplier of processor instances
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorSupplier processorSupplier) {
        return new TestSupport(processorSupplier);
    }

    /**
     * Creates a harness from a meta-supplier: initializes it with a test
     * context and asks it for the supplier of the single local address.
     *
     * @param processorMetaSupplier the meta-supplier to resolve
     * @return a new harness around the supplier for the local address
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier processorMetaSupplier) {
        processorMetaSupplier.init(new TestProcessorMetaSupplierContext());
        List<Address> onlyLocal = Collections.singletonList(LOCAL_ADDRESS);
        ProcessorSupplier localSupplier = processorMetaSupplier.get(onlyLocal).apply(LOCAL_ADDRESS);
        return new TestSupport(localSupplier);
    }

    /**
     * Configures a single input (ordinal 0, priority 0) holding the given items.
     *
     * @param list items to feed to the processor
     * @return {@code this}, for call chaining
     */
    public TestSupport input(@Nonnull List<?> list) {
        // A fresh one-element array is zero-initialized, i.e. priority 0.
        priorities = new int[1];
        inputs = Collections.singletonList(list);
        return this;
    }

    /**
     * Configures multiple inputs, all with priority 0.
     *
     * @param list one list of items per input ordinal
     * @return {@code this}, for call chaining
     */
    public TestSupport inputs(@Nonnull List<List<?>> list) {
        int[] allZeroPriorities = new int[list.size()];
        return inputs(list, allZeroPriorities);
    }

    /**
     * Configures multiple inputs together with their priorities.
     *
     * @param list one list of items per input ordinal
     * @param iArr priority of each input, indexed by ordinal
     * @return {@code this}, for call chaining
     * @throws IllegalArgumentException if the two arguments differ in length
     */
    public TestSupport inputs(@Nonnull List<List<?>> list, int[] iArr) {
        if (list.size() == iArr.length) {
            inputs = list;
            priorities = iArr;
            return this;
        }
        throw new IllegalArgumentException("Number of inputs must be equal to number of priorities");
    }

    /**
     * Runs the test expecting a single output (ordinal 0) with the given items.
     *
     * @param list expected items of the only output
     */
    public void expectOutput(@Nonnull List<?> list) {
        List<List<?>> singleOutput = Collections.singletonList(list);
        expectOutputs(singleOutput);
    }

    /**
     * Initializes the supplier, runs the test against the expected outputs and
     * closes the supplier.
     *
     * <p>The supplier is closed on every path once it has been initialized:
     * with {@code null} after a successful run, or with the failure (including
     * assertion errors) when the run throws. An exception thrown by
     * {@code close} while handling a failure is attached to that failure as
     * suppressed, so the original cause is never masked.
     *
     * @param list one list of expected items per output ordinal
     */
    public void expectOutputs(@Nonnull List<List<?>> list) {
        try {
            this.supplier.init(new TestProcessorSupplierContext());
            this.expectedOutputs = list;
            try {
                runTest(this.doSnapshots, this.doSnapshots ? 1 : 0);
            } catch (Throwable failure) {
                // Give the supplier a chance to release its resources and
                // tell it why the run failed.
                try {
                    this.supplier.close(failure);
                } catch (Exception closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
                throw failure;
            }
            this.supplier.close(null);
        } catch (Exception e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Turns off the assertions that every processor call makes progress.
     *
     * @return {@code this}, for call chaining
     */
    public TestSupport disableProgressAssertion() {
        assertProgress = false;
        return this;
    }

    /**
     * Sets the timeout that bounds the {@code complete()} loop. When the value
     * is positive, the test run stops calling {@code complete()} once that many
     * milliseconds have elapsed and asserts that it never returned
     * {@code true}.
     *
     * @param j timeout in milliseconds
     * @return {@code this}, for call chaining
     */
    public TestSupport disableRunUntilCompleted(long j) {
        runUntilCompletedTimeout = j;
        return this;
    }

    /**
     * Turns off saving and restoring of snapshots during the test.
     *
     * @return {@code this}, for call chaining
     */
    public TestSupport disableSnapshots() {
        doSnapshots = false;
        return this;
    }

    /**
     * Turns off printing of input and output items to standard output.
     *
     * @return {@code this}, for call chaining
     */
    public TestSupport disableLogging() {
        logInputOutput = false;
        return this;
    }

    /**
     * Makes the test skip the {@code complete()} phase entirely.
     *
     * @return {@code this}, for call chaining
     */
    public TestSupport disableCompleteCall() {
        callComplete = false;
        return this;
    }

    /**
     * Stores the cooperative timeout. The timing check skips its failing
     * assertion for cooperative processors when this value is not positive.
     *
     * @param j timeout in milliseconds
     * @return {@code this}, for call chaining
     */
    public TestSupport cooperativeTimeout(long j) {
        cooperativeTimeout = j;
        return this;
    }

    /**
     * Replaces the predicate used to compare each expected output list (first
     * argument) with the actual one (second argument).
     *
     * @param biPredicate the comparison to use
     * @return {@code this}, for call chaining
     */
    public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>, ? super List<?>> biPredicate) {
        outputChecker = biPredicate;
        return this;
    }

    /**
     * Sets the Jet instance that is passed to the processor context when a
     * processor is initialized.
     *
     * @param jetInstance the instance to expose to processors
     * @return {@code this}, for call chaining
     */
    public TestSupport jetInstance(@Nonnull JetInstance jetInstance) {
        this.jetInstance = jetInstance;
        return this;
    }

    /**
     * Returns a human-readable description of a test mode.
     *
     * @param z whether snapshots are taken
     * @param i restore interval: 0 with snapshots off; 1, 2 or
     *          {@link Integer#MAX_VALUE} with snapshots on
     * @return the description of the mode
     * @throws IllegalArgumentException for any other combination
     */
    private static String modeDescription(boolean z, int i) {
        if (z) {
            switch (i) {
                case 1:
                    return "snapshots enabled, restoring every snapshot";
                case 2:
                    return "snapshots enabled, restoring every other snapshot";
                case Integer.MAX_VALUE:
                    return "snapshots enabled, never restoring them";
                default:
                    break;
            }
        } else if (i == 0) {
            return "snapshots disabled";
        }
        throw new IllegalArgumentException("Unknown mode, doSnapshots=" + z + ", doRestoreEvery=" + i);
    }

    /**
     * Runs the configured inputs through a processor in one test mode and
     * compares the collected output with the expected output.
     *
     * <p>When called with {@code z == true} and {@code i == 1}, the three other
     * modes (no snapshots, snapshots never restored, every other snapshot
     * restored) are run first through recursive calls, then this mode follows.
     *
     * @param z whether snapshots are saved during the run
     * @param i restore interval: a snapshot is restored whenever the running
     *          snapshot count is a multiple of this value; must be 0 when
     *          {@code z} is {@code false}
     * @throws Exception whatever the processor or the snapshot handling throws
     */
    private void runTest(boolean z, int i) throws Exception {
        String processInbox;
        // Decompiled form of: assert z || i == 0 : "Illegal combination: ..."
        if (!$assertionsDisabled && !z && i != 0) {
            throw new AssertionError("Illegal combination: don't do snapshots, but do restore");
        }
        IdleStrategy backoffIdleStrategy = new BackoffIdleStrategy(0L, 0L, TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
        // Number of consecutive calls without progress; fed to the idle strategy.
        int i2 = 0;
        if (z && i == 1) {
            // The "restore every snapshot" entry mode also exercises all other modes first.
            runTest(false, 0);
            runTest(true, Integer.MAX_VALUE);
            runTest(true, 2);
        }
        System.out.println("### Running the test, mode=" + modeDescription(z, i));
        TestInbox testInbox = new TestInbox();
        // Ordinal of the item currently in the inbox; -1 until the first item is queued.
        int i3 = -1;
        // Single-element arrays act as mutable holders: snapshotAndRestore may
        // replace the processor and outbox, and lambdas need to capture them.
        Processor[] processorArr = {newProcessorFromSupplier()};
        boolean isCooperative = processorArr[0].isCooperative();
        TestOutbox[] testOutboxArr = {createOutbox()};
        // One list of actual output items per expected output ordinal.
        ArrayList arrayList = new ArrayList(this.expectedOutputs.size());
        for (int i4 = 0; i4 < this.expectedOutputs.size(); i4++) {
            arrayList.add(new ArrayList());
        }
        initProcessor(processorArr[0], testOutboxArr[0]);
        // Running count of snapshots taken, shared with snapshotAndRestore.
        int[] iArr = {0};
        snapshotAndRestore(processorArr, testOutboxArr, arrayList, z, i, iArr);
        Iterator<ObjectWithOrdinal> it = mixInputs(this.inputs, this.priorities).iterator();
        // Watermark the processor has accepted but that is not yet offered to the outbox.
        Watermark[] watermarkArr = {null};
        while (true) {
            // Done once no input is left, the inbox is drained and no watermark is pending.
            if (!it.hasNext() && testInbox.isEmpty() && watermarkArr[0] == null) {
                break;
            }
            // Feed exactly one item at a time into the inbox.
            if (testInbox.isEmpty() && watermarkArr[0] == null && it.hasNext()) {
                ObjectWithOrdinal next = it.next();
                testInbox.queue().add(next.item);
                i3 = next.ordinal;
                if (this.logInputOutput) {
                    System.out.println(LocalTime.now() + " Input-" + next.ordinal + ": " + testInbox.peek());
                }
            }
            if (watermarkArr[0] != null) {
                // Forward the accepted watermark; retry on the next iteration if the offer is rejected.
                processInbox = "offer";
                if (testOutboxArr[0].offer(watermarkArr[0])) {
                    watermarkArr[0] = null;
                }
            } else {
                processInbox = processInbox(testInbox, i3, isCooperative, processorArr, watermarkArr);
            }
            // Progress means: the inbox item was consumed or something was emitted to output 0.
            boolean z2 = testInbox.isEmpty() || !testOutboxArr[0].queue(0).isEmpty();
            JetAssert.assertTrue(processInbox + "() call without progress", !this.assertProgress || z2);
            i2 = idle(backoffIdleStrategy, i2, z2);
            // One item was emitted while the inbox item is still pending: reset the
            // outbox and call the processor once more before draining.
            // NOTE(review): presumably exercises the "outbox was full" path — confirm against TestOutbox.
            if (testOutboxArr[0].queue(0).size() == 1 && !testInbox.isEmpty()) {
                testOutboxArr[0].reset();
                processInbox(testInbox, i3, isCooperative, processorArr, watermarkArr);
            }
            testOutboxArr[0].drainQueuesAndReset(arrayList, this.logInputOutput);
            // Snapshot only between items, never with a half-processed item or pending watermark.
            if (testInbox.isEmpty() && watermarkArr[0] == null) {
                snapshotAndRestore(processorArr, testOutboxArr, arrayList, z, i, iArr);
            }
        }
        if (this.logInputOutput && !this.inputs.isEmpty()) {
            System.out.println(LocalTime.now() + " Input processed, calling complete()");
        }
        if (this.callComplete) {
            long nanoTime = System.nanoTime();
            // Holder for the result of the last complete() call.
            boolean[] zArr = {false};
            do {
                checkTime("complete", isCooperative, () -> {
                    zArr[0] = processorArr[0].complete();
                });
                // Progress means: complete() finished or something was emitted to output 0.
                boolean z3 = zArr[0] || !testOutboxArr[0].queue(0).isEmpty();
                JetAssert.assertTrue("complete() call without progress", !this.assertProgress || z3);
                testOutboxArr[0].drainQueuesAndReset(arrayList, this.logInputOutput);
                // Snapshot only after a call that made progress and did not finish the processor.
                snapshotAndRestore(processorArr, testOutboxArr, arrayList, z3 && z && !zArr[0], i, iArr);
                i2 = idle(backoffIdleStrategy, i2, z3);
                // With a positive timeout configured, stop calling complete() once it has elapsed.
                if (this.runUntilCompletedTimeout > 0 && toMillis(System.nanoTime() - nanoTime) > this.runUntilCompletedTimeout) {
                    break;
                }
            } while (!zArr[0]);
            // With a positive timeout configured, complete() must not have returned true.
            JetAssert.assertTrue("complete returned true", !zArr[0] || this.runUntilCompletedTimeout <= 0);
        }
        processorArr[0].close(null);
        // Compare every output ordinal; on mismatch assertEquals reports both lists as text.
        for (int i5 = 0; i5 < this.expectedOutputs.size(); i5++) {
            List<?> list = this.expectedOutputs.get(i5);
            List list2 = (List) arrayList.get(i5);
            if (!this.outputChecker.test(list, list2)) {
                JetAssert.assertEquals("processor output in mode \"" + modeDescription(z, i) + "\" doesn't match", listToString(list), listToString(list2));
            }
        }
    }

    /**
     * Asks the supplier for one processor and returns it.
     *
     * @return the first processor of a batch of size one
     */
    private Processor newProcessorFromSupplier() {
        Iterator<? extends Processor> created = supplier.get(1).iterator();
        return created.next();
    }

    /**
     * Merges the per-ordinal inputs into a single sequence of items tagged
     * with their ordinal.
     *
     * <p>Ordinals are grouped by priority and the groups are handled in
     * ascending order of the priority value. Within a group the inputs are
     * interleaved round-robin: the first item of every input, then the second
     * item of every input, and so on, skipping inputs that are exhausted.
     *
     * @param list one list of items per input ordinal
     * @param iArr priority of each input, indexed by ordinal
     * @return all items in the order they are to be fed to the processor
     */
    private static List<ObjectWithOrdinal> mixInputs(List<List<?>> list, int[] iArr) {
        // TreeMap iterates its keys in ascending order, i.e. lowest priority value first.
        TreeMap<Integer, List<Integer>> ordinalsByPriority = new TreeMap<>();
        for (int ordinal = 0; ordinal < iArr.length; ordinal++) {
            ordinalsByPriority.computeIfAbsent(iArr[ordinal], priority -> new ArrayList<>()).add(ordinal);
        }
        List<ObjectWithOrdinal> mixed = new ArrayList<>();
        for (List<Integer> ordinals : ordinalsByPriority.values()) {
            int index = 0;
            boolean exhausted;
            do {
                // Stays true only if no input of this group still has an item at `index`.
                exhausted = true;
                for (int ordinal : ordinals) {
                    List<?> input = list.get(ordinal);
                    if (input.size() > index) {
                        mixed.add(new ObjectWithOrdinal(ordinal, input.get(index)));
                        exhausted = false;
                    }
                }
                index++;
            } while (!exhausted);
        }
        return mixed;
    }

    /**
     * Creates a test outbox whose first constructor argument is an array with
     * one entry of value 1 per expected output, and whose second argument is 1.
     *
     * @return a fresh outbox
     */
    private TestOutbox createOutbox() {
        int[] onePerOutput = new int[expectedOutputs.size()];
        for (int k = 0; k < onePerOutput.length; k++) {
            onePerOutput[k] = 1;
        }
        return new TestOutbox(onePerOutput, 1);
    }

    /**
     * Hands the head of the inbox to the processor under a timing check.
     *
     * <p>A {@link Watermark} at the head goes to {@code tryProcessWatermark};
     * if the processor accepts it, it is removed from the inbox and stored in
     * {@code watermarkArr[0]} for the caller to forward. Anything else goes
     * through {@code process}.
     *
     * @param testInbox    inbox holding the pending item
     * @param i            ordinal passed to {@code process}
     * @param z            whether the processor is cooperative
     * @param processorArr holder of the current processor
     * @param watermarkArr holder that receives an accepted watermark
     * @return name of the processor method that was called
     */
    private String processInbox(TestInbox testInbox, int i, boolean z, Processor[] processorArr, Watermark[] watermarkArr) {
        Object head = testInbox.peek();
        if (head instanceof Watermark) {
            Watermark pending = (Watermark) head;
            checkTime("tryProcessWatermark", z, () -> {
                boolean accepted = processorArr[0].tryProcessWatermark(pending);
                if (accepted) {
                    testInbox.remove();
                    watermarkArr[0] = pending;
                }
            });
            return "tryProcessWatermark";
        }
        checkTime("process", z, () -> processorArr[0].process(i, testInbox));
        return "process";
    }

    /**
     * Maintains the idle counter and backs off when no progress was made.
     *
     * @param idleStrategy strategy to idle with
     * @param i            current count of consecutive calls without progress
     * @param z            whether the last call made progress
     * @return 0 after progress; otherwise the incremented count, after idling
     */
    private int idle(IdleStrategy idleStrategy, int i, boolean z) {
        if (z) {
            return 0;
        }
        int idleCount = i + 1;
        idleStrategy.idle(idleCount);
        return idleCount;
    }

    /**
     * Saves a snapshot of the current processor and, depending on the restore
     * interval, replaces the processor with a fresh one restored from it.
     *
     * @param processorArr  holder of the current processor; replaced on restore
     * @param testOutboxArr holder of the current outbox; replaced on restore
     * @param list          per-ordinal lists collecting the actual output
     * @param z             whether to do anything at all; the method is a no-op when {@code false}
     * @param i             restore interval: restore when the snapshot count is a multiple of it
     * @param iArr          holder of the running snapshot count; incremented here
     * @throws Exception whatever closing the replaced processor throws
     */
    private void snapshotAndRestore(Processor[] processorArr, TestOutbox[] testOutboxArr, List<List<Object>> list, boolean z, int i, int[] iArr) throws Exception {
        if (z) {
            iArr[0] = iArr[0] + 1;
            // Whether this particular snapshot is also restored.
            boolean z2 = iArr[0] % i == 0;
            if (this.logInputOutput) {
                System.out.println(LocalTime.now() + (z2 ? " Saving & restoring snapshot" : " Saving snapshot without restoring it"));
            }
            // Collects the snapshot entries; later serves as the inbox for the restore.
            TestInbox testInbox = new TestInbox();
            // Holder for the boolean result of the call made inside the lambdas.
            boolean[] zArr = {false};
            boolean isCooperative = processorArr[0].isCooperative();
            // Snapshot keys seen so far, used for the duplicate check below.
            HashSet hashSet = new HashSet();
            // Call saveToSnapshot() until it reports completion, draining both queues after each call.
            do {
                checkTime("saveSnapshot", isCooperative, () -> {
                    zArr[0] = processorArr[0].saveToSnapshot();
                });
                // Progress means: finished, or something was put into the snapshot queue or output 0.
                JetAssert.assertTrue("saveToSnapshot() call without progress", (this.assertProgress && !zArr[0] && testOutboxArr[0].snapshotQueue().isEmpty() && testOutboxArr[0].queue(0).isEmpty()) ? false : true);
                testOutboxArr[0].drainSnapshotQueueAndReset(testInbox.queue(), false);
                testOutboxArr[0].drainQueuesAndReset(list, this.logInputOutput);
            } while (!zArr[0]);
            // Every snapshot entry must have a distinct key.
            Iterator<Object> it = testInbox.queue().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                JetAssert.assertTrue("Duplicate key produced in saveToSnapshot()\n  Duplicate: " + entry.getKey() + "\n  Keys so far: " + hashSet, hashSet.add(entry.getKey()));
            }
            if (z2) {
                // Decompiled `assert` statements: both queues must be drained before the processor is replaced.
                if (!$assertionsDisabled && !testOutboxArr[0].queue(0).isEmpty()) {
                    throw new AssertionError();
                }
                if (!$assertionsDisabled && !testOutboxArr[0].snapshotQueue().isEmpty()) {
                    throw new AssertionError();
                }
                // Replace the processor and its outbox with fresh, initialized instances.
                processorArr[0].close(null);
                processorArr[0] = newProcessorFromSupplier();
                testOutboxArr[0] = createOutbox();
                initProcessor(processorArr[0], testOutboxArr[0]);
                int size = testInbox.queue().size();
                // Feed the saved entries back until the new processor has consumed them all.
                while (true) {
                    // Inbox size before the call, for the progress check.
                    int i2 = size;
                    if (testInbox.isEmpty()) {
                        break;
                    }
                    checkTime("restoreSnapshot", isCooperative, () -> {
                        processorArr[0].restoreFromSnapshot(testInbox);
                    });
                    // Progress means: the inbox shrank or something was emitted to output 0.
                    JetAssert.assertTrue("restoreFromSnapshot() call without progress", (this.assertProgress && i2 <= testInbox.queue().size() && testOutboxArr[0].queue(0).isEmpty()) ? false : true);
                    testOutboxArr[0].drainQueuesAndReset(list, this.logInputOutput);
                    size = testInbox.queue().size();
                }
                // Call finishSnapshotRestore() until it reports completion.
                do {
                    checkTime("finishSnapshotRestore", isCooperative, () -> {
                        zArr[0] = processorArr[0].finishSnapshotRestore();
                    });
                    JetAssert.assertTrue("finishSnapshotRestore() call without progress", (this.assertProgress && !zArr[0] && testOutboxArr[0].queue(0).isEmpty()) ? false : true);
                    testOutboxArr[0].drainQueuesAndReset(list, this.logInputOutput);
                } while (!zArr[0]);
            }
        }
    }

    /**
     * Runs the given action and checks how long it took.
     *
     * <p>For a non-cooperative processor only a warning is printed when the
     * call exceeds {@code BLOCKING_TIME_LIMIT_MS_WARN}. For a cooperative
     * processor the call must finish within the configured
     * {@code cooperativeTimeout} (the check is skipped when that value is not
     * positive), and a warning is printed when it exceeds
     * {@code COOPERATIVE_TIME_LIMIT_MS_WARN}.
     *
     * @param str      name of the processor method being timed, used in messages
     * @param z        whether the processor is cooperative
     * @param runnable the call to time
     */
    private void checkTime(String str, boolean z, Runnable runnable) {
        long start = System.nanoTime();
        runnable.run();
        long elapsedNanos = System.nanoTime() - start;
        if (!z) {
            if (elapsedNanos > TimeUnit.MILLISECONDS.toNanos(BLOCKING_TIME_LIMIT_MS_WARN)) {
                System.out.println(String.format("Warning: call to %s() took %.2fms in non-cooperative processor. Is this expected?", str, toMillis(elapsedNanos)));
            }
            return;
        }
        if (this.cooperativeTimeout > 0) {
            // Enforce the limit the user configured, not the default constant.
            JetAssert.assertTrue(String.format("call to %s() took %.1fms, it should be <%dms", str, toMillis(elapsedNanos), this.cooperativeTimeout), elapsedNanos < TimeUnit.MILLISECONDS.toNanos(this.cooperativeTimeout));
        }
        if (elapsedNanos > TimeUnit.MILLISECONDS.toNanos(COOPERATIVE_TIME_LIMIT_MS_WARN)) {
            System.out.println(String.format("Warning: call to %s() took %.2fms, it should be <%dms normally", str, toMillis(elapsedNanos), COOPERATIVE_TIME_LIMIT_MS_WARN));
        }
    }

    /**
     * Initializes a processor with the given outbox and a test context that
     * carries a logger named after the processor's class and, when configured,
     * the Jet instance.
     *
     * @param processor  the processor to initialize
     * @param testOutbox the outbox the processor will emit to
     */
    private void initProcessor(Processor processor, TestOutbox testOutbox) {
        String loggerName = processor.getClass().getName();
        TestProcessorContext context = new TestProcessorContext().setLogger(getLogger(loggerName));
        if (jetInstance != null) {
            context.setJetInstance(jetInstance);
        }
        processor.init(testOutbox, context);
    }

    /**
     * Converts nanoseconds to milliseconds, keeping the fractional part.
     *
     * @param j duration in nanoseconds
     * @return the same duration in milliseconds
     */
    private static double toMillis(long j) {
        // Divide in floating point; long / long would truncate to whole milliseconds.
        return j / (double) TimeUnit.MILLISECONDS.toNanos(1L);
    }

    /**
     * Initializes the processor supplier with a test context and adapts it to
     * a plain {@link Supplier} that creates one processor per call.
     *
     * @param processorSupplier the supplier to adapt
     * @return a supplier of single processor instances
     */
    public static Supplier<Processor> supplierFrom(ProcessorSupplier processorSupplier) {
        processorSupplier.init(new TestProcessorSupplierContext());
        return () -> processorSupplier.get(1).iterator().next();
    }

    /**
     * Initializes the meta-supplier with a test context, resolves the supplier
     * for the single local address and adapts it to a plain {@link Supplier}.
     *
     * @param processorMetaSupplier the meta-supplier to adapt
     * @return a supplier of single processor instances
     */
    public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier processorMetaSupplier) {
        processorMetaSupplier.init(new TestProcessorMetaSupplierContext());
        List<Address> onlyLocal = Collections.singletonList(LOCAL_ADDRESS);
        ProcessorSupplier localSupplier = processorMetaSupplier.get(onlyLocal).apply(LOCAL_ADDRESS);
        return supplierFrom(localSupplier);
    }

    /**
     * Returns a logger with the given name from the test logging service.
     *
     * @param str logger name
     * @return the logger
     */
    static ILogger getLogger(String str) {
        return LOGGING_SERVICE.getLogger(str);
    }

    /**
     * Returns a logger for the given class from the test logging service.
     *
     * @param cls class to obtain the logger for
     * @return the logger
     */
    static ILogger getLogger(Class<?> cls) {
        return LOGGING_SERVICE.getLogger(cls);
    }

    /**
     * Renders a list as text, one item per line, each item converted with
     * {@link String#valueOf(Object)}.
     *
     * @param list the items to render
     * @return the items joined by {@code "\n"}; an empty string for an empty list
     */
    public static String listToString(List<?> list) {
        StringBuilder text = new StringBuilder();
        Iterator<?> items = list.iterator();
        while (items.hasNext()) {
            text.append(String.valueOf(items.next()));
            if (items.hasNext()) {
                text.append("\n");
            }
        }
        return text.toString();
    }

    /**
     * Wraps a processor instance in a supplier that hands it out exactly once.
     *
     * @param processor the instance to supply
     * @return a supplier whose first call returns the instance and whose later
     *         calls throw {@link RuntimeException}
     */
    private static DistributedSupplier<Processor> singletonSupplier(Processor processor) {
        // One-element array: a mutable slot the lambda can clear after the first call.
        Processor[] slot = new Processor[]{processor};
        return () -> {
            Processor only = slot[0];
            if (only == null) {
                throw new RuntimeException("More than one instance requested");
            }
            slot[0] = null;
            return only;
        };
    }

    /**
     * Compiler-generated hook that re-creates serializable lambdas of this
     * class from their serialized form. The only lambda handled is the one
     * whose implementation method is named
     * {@code lambda$singletonSupplier$8b83db1b$1}.
     *
     * <p>NOTE(review): this is decompiler output, not hand-written code.
     * {@code boolean z = -1} and a {@code switch} over a {@code boolean} are
     * not valid Java source; the compiler regenerates this method, so it
     * should not be maintained by hand.
     *
     * @param serializedLambda the serialized form of the lambda
     * @return the re-created lambda
     * @throws IllegalArgumentException if the serialized form matches no lambda of this class
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name to an index.
        switch (implMethodName.hashCode()) {
            case 1528439691:
                if (implMethodName.equals("lambda$singletonSupplier$8b83db1b$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda signature, then rebuild it from the captured argument.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/DistributedSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/test/TestSupport") && serializedLambda.getImplMethodSignature().equals("([Lcom/hazelcast/jet/core/Processor;)Lcom/hazelcast/jet/core/Processor;")) {
                    Processor[] processorArr = (Processor[]) serializedLambda.getCapturedArg(0);
                    return () -> {
                        if (processorArr[0] == null) {
                            throw new RuntimeException("More than one instance requested");
                        }
                        try {
                            return processorArr[0];
                        } finally {
                            processorArr[0] = null;
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !TestSupport.class.desiredAssertionStatus();
        SAME_ITEMS_ANY_ORDER = (list, list2) -> {
            if (list.size() != list2.size()) {
                return false;
            }
            return ((Map) list.stream().collect(Collectors.toMap(DistributedFunction.identity(), obj -> {
                return 1;
            }, (v0, v1) -> {
                return Integer.sum(v0, v1);
            }))).equals((Map) list2.stream().collect(Collectors.toMap(DistributedFunction.identity(), obj2 -> {
                return 1;
            }, (v0, v1) -> {
                return Integer.sum(v0, v1);
            })));
        };
        LOGGING_SERVICE = new LoggingServiceImpl("test-group", null, BuildInfoProvider.getBuildInfo());
        try {
            LOCAL_ADDRESS = new Address("localhost", NetworkConfig.DEFAULT_PORT);
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }
}
