package com.hazelcast.jet.core.test;

import com.hazelcast.config.NetworkConfig;
import com.hazelcast.instance.BuildInfoProvider;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingServiceImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/core/test/TestSupport.class */
/**
 * Fluent harness that drives a single {@link Processor} through its lifecycle
 * and compares what it emitted with an expected output list.
 *
 * <p>A run initializes the processor with a {@link TestOutbox}, feeds the
 * configured input one item at a time through {@code process(0, inbox)},
 * optionally calls {@code complete()} until it returns {@code true}, and,
 * when snapshots are enabled, repeatedly saves the processor state and
 * optionally restores it into a fresh processor instance obtained from the
 * supplier. Every call is timed, and (unless disabled) each call is asserted
 * to have made progress.
 *
 * <p>Typical usage: {@code TestSupport.verifyProcessor(supplier).input(in).expectOutput(out)}.
 */
public final class TestSupport {

    private static final Address LOCAL_ADDRESS;
    private static final LoggingServiceImpl LOGGING_SERVICE;

    /** Default upper bound, in milliseconds, for a single call on a cooperative processor. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_FAIL = 1000;
    /** A call on a cooperative processor longer than this many milliseconds prints a warning. */
    private static final long COOPERATIVE_TIME_LIMIT_MS_WARN = 5;
    /** A call on a non-cooperative processor longer than this many milliseconds prints a warning. */
    private static final long BLOCKING_TIME_LIMIT_MS_WARN = 10000;

    private final Supplier<Processor> supplier;
    private List<?> input = Collections.emptyList();
    private List<?> expectedOutput = Collections.emptyList();
    private boolean assertProgress = true;
    private boolean doSnapshots = true;
    private boolean logInputOutput = true;
    private boolean callComplete = true;
    private long cooperativeTimeout = COOPERATIVE_TIME_LIMIT_MS_FAIL;
    private long runUntilCompletedTimeout;
    private BiPredicate<? super List<?>, ? super List<?>> outputChecker = Objects::equals;

    private TestSupport(@Nonnull Supplier<Processor> supplier) {
        this.supplier = supplier;
    }

    /**
     * Creates a harness for one concrete processor instance.
     *
     * <p>Snapshots are disabled for the returned harness: restoring needs a
     * second processor instance, and the supplier wrapping {@code processor}
     * throws when asked for more than one.
     *
     * @param processor the processor to test
     * @return a new harness with snapshots disabled
     */
    public static TestSupport verifyProcessor(Processor processor) {
        return new TestSupport(singletonSupplier(processor)).disableSnapshots();
    }

    /**
     * Creates a harness that obtains processor instances from {@code supplier}.
     *
     * @param supplier source of processor instances; called once per run and
     *                 once more for every snapshot restore
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull Supplier<Processor> supplier) {
        return new TestSupport(supplier);
    }

    /**
     * Creates a harness for processors produced by {@code processorSupplier}.
     * The supplier is initialized immediately, see {@link #supplierFrom(ProcessorSupplier)}.
     *
     * @param processorSupplier the processor supplier to test
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorSupplier processorSupplier) {
        return new TestSupport(supplierFrom(processorSupplier));
    }

    /**
     * Creates a harness for processors produced by {@code processorMetaSupplier}.
     * The meta-supplier is initialized immediately, see
     * {@link #supplierFrom(ProcessorMetaSupplier)}.
     *
     * @param processorMetaSupplier the meta-supplier to test
     * @return a new harness
     */
    public static TestSupport verifyProcessor(@Nonnull ProcessorMetaSupplier processorMetaSupplier) {
        return new TestSupport(supplierFrom(processorMetaSupplier));
    }

    /**
     * Sets the items that will be fed to the processor, one at a time, on ordinal 0.
     * Defaults to an empty list.
     *
     * @param input the input items
     * @return {@code this}
     */
    public TestSupport input(@Nonnull List<?> input) {
        this.input = input;
        return this;
    }

    /**
     * Sets the expected output and runs the test.
     *
     * <p>With snapshots enabled the test runs three times: without snapshots,
     * with snapshots only, and with snapshots followed by a restore.
     *
     * @param expectedOutput the output the processor is expected to emit
     */
    public void expectOutput(@Nonnull List<?> expectedOutput) {
        this.expectedOutput = expectedOutput;
        runTest(doSnapshots, doSnapshots);
    }

    /**
     * Disables the assertion that every processor call makes progress.
     *
     * @return {@code this}
     */
    public TestSupport disableProgressAssertion() {
        this.assertProgress = false;
        return this;
    }

    /**
     * Stops calling {@code complete()} once the given time has elapsed instead
     * of looping until it returns {@code true}. When the timeout is positive,
     * the run additionally asserts that {@code complete()} did not return
     * {@code true}.
     *
     * @param timeoutMillis how long to keep calling {@code complete()}, in
     *                      milliseconds; a value {@code <= 0} leaves the
     *                      run-until-completed behaviour in place
     * @return {@code this}
     */
    public TestSupport disableRunUntilCompleted(long timeoutMillis) {
        this.runUntilCompletedTimeout = timeoutMillis;
        return this;
    }

    /**
     * Disables saving and restoring of snapshots during the run.
     *
     * @return {@code this}
     */
    public TestSupport disableSnapshots() {
        this.doSnapshots = false;
        return this;
    }

    /**
     * Disables printing of input and output items to standard output.
     *
     * @return {@code this}
     */
    public TestSupport disableLogging() {
        this.logInputOutput = false;
        return this;
    }

    /**
     * Skips the {@code complete()} phase entirely.
     *
     * @return {@code this}
     */
    public TestSupport disableCompleteCall() {
        this.callComplete = false;
        return this;
    }

    /**
     * Sets the maximum duration of a single call on a cooperative processor.
     * A longer call fails the test.
     *
     * @param timeoutMillis the limit in milliseconds; a value {@code <= 0}
     *                      disables the check
     * @return {@code this}
     */
    public TestSupport cooperativeTimeout(long timeoutMillis) {
        this.cooperativeTimeout = timeoutMillis;
        return this;
    }

    /**
     * Replaces the predicate that decides whether the actual output is
     * acceptable. It is called with the expected list first and the actual
     * list second. The default is {@link Objects#equals(Object, Object)}.
     *
     * @param outputChecker the predicate to use
     * @return {@code this}
     */
    public TestSupport outputChecker(@Nonnull BiPredicate<? super List<?>, ? super List<?>> outputChecker) {
        this.outputChecker = outputChecker;
        return this;
    }

    /**
     * Executes one full pass over the input and verifies the output.
     *
     * @param withSnapshots whether to save a snapshot whenever the inbox is empty
     * @param withRestore   whether each saved snapshot is restored into a fresh
     *                      processor; only legal together with {@code withSnapshots}
     */
    private void runTest(boolean withSnapshots, boolean withRestore) {
        assert withSnapshots || !withRestore : "Illegal combination: don't do snapshots, but do restore";

        IdleStrategy idler = new BackoffIdleStrategy(0L, 0L,
                TimeUnit.MICROSECONDS.toNanos(1L), TimeUnit.MILLISECONDS.toNanos(1L));
        int idleCount = 0;

        if (withSnapshots && withRestore) {
            // Run the two simpler variants first; this invocation then continues
            // as the snapshot-and-restore variant.
            System.out.println("### Running the test with doSnapshots=false");
            runTest(false, false);
            System.out.println("### Running the test with doSnapshots=true, doRestore=false");
            runTest(true, false);
            System.out.println("### Running the test with doSnapshots=true, doRestore=true");
        }

        TestInbox inbox = new TestInbox();
        // Single-element array so lambdas can see the instance swapped in by a restore.
        Processor[] processor = {supplier.get()};
        boolean isCooperative = processor[0].isCooperative();
        TestOutbox outbox = new TestOutbox(new int[]{1}, 1);
        List<Object> actualOutput = new ArrayList<>();

        initProcessor(processor[0], outbox);
        snapshotAndRestore(processor, outbox, actualOutput, withSnapshots, withRestore);

        Iterator<?> inputIterator = input.iterator();
        while (inputIterator.hasNext() || !inbox.isEmpty()) {
            if (inbox.isEmpty()) {
                inbox.add(inputIterator.next());
                if (logInputOutput) {
                    System.out.println("Input: " + inbox.peek());
                }
            }
            checkTime("process", isCooperative, () -> processor[0].process(0, inbox));
            boolean madeProgress = inbox.isEmpty() || !outbox.queueWithOrdinal(0).isEmpty();
            JetAssert.assertTrue("process() call without progress", !assertProgress || madeProgress);
            idleCount = idle(idler, idleCount, madeProgress);
            if (outbox.queueWithOrdinal(0).size() == 1 && !inbox.isEmpty()) {
                // The item is still in the inbox while one output item is queued;
                // call process() again before draining.
                // NOTE(review): presumably this exercises the "outbox full" path
                // (the outbox is created with {1}, 1) - confirm against TestOutbox.
                checkTime("process", isCooperative, () -> processor[0].process(0, inbox));
            }
            drainOutbox(outbox.queueWithOrdinal(0), actualOutput, logInputOutput);
            if (inbox.isEmpty()) {
                snapshotAndRestore(processor, outbox, actualOutput, withSnapshots, withRestore);
            }
        }

        if (callComplete) {
            long completeStart = System.nanoTime();
            boolean[] done = {false};
            do {
                checkTime("complete", isCooperative, () -> done[0] = processor[0].complete());
                boolean madeProgress = done[0] || !outbox.queueWithOrdinal(0).isEmpty();
                JetAssert.assertTrue("complete() call without progress", !assertProgress || madeProgress);
                drainOutbox(outbox.queueWithOrdinal(0), actualOutput, logInputOutput);
                // Snapshot only between productive, non-final complete() calls.
                snapshotAndRestore(processor, outbox, actualOutput,
                        madeProgress && withSnapshots && !done[0], withRestore);
                idleCount = idle(idler, idleCount, madeProgress);
                if (runUntilCompletedTimeout > 0
                        && toMillis(System.nanoTime() - completeStart) > runUntilCompletedTimeout) {
                    break;
                }
            } while (!done[0]);
            JetAssert.assertTrue("complete returned true", !done[0] || runUntilCompletedTimeout <= 0);
        }

        if (!outputChecker.test(expectedOutput, actualOutput)) {
            JetAssert.assertEquals("processor output with doSnapshots=" + withSnapshots + " doesn't match",
                    listToString(expectedOutput), listToString(actualOutput));
        }
    }

    /**
     * Backs off when the last call made no progress.
     *
     * @param idler        the strategy used to idle
     * @param idleCount    number of consecutive calls without progress so far
     * @param madeProgress whether the last call made progress
     * @return the new idle count: 0 after progress, otherwise {@code idleCount + 1}
     */
    private int idle(IdleStrategy idler, int idleCount, boolean madeProgress) {
        if (madeProgress) {
            return 0;
        }
        int nextIdleCount = idleCount + 1;
        idler.idle(nextIdleCount);
        return nextIdleCount;
    }

    /**
     * Saves the processor state to a snapshot and optionally restores it into
     * a new processor instance, which replaces {@code processor[0]}.
     *
     * @param processor    single-element holder of the processor under test
     * @param outbox       the outbox the processor emits to
     * @param actualOutput collector for items emitted during save or restore
     * @param doSnapshot   when {@code false} the method does nothing
     * @param doRestore    whether to restore the saved snapshot into a fresh processor
     */
    private void snapshotAndRestore(Processor[] processor, TestOutbox outbox, List<Object> actualOutput,
                                    boolean doSnapshot, boolean doRestore) {
        if (!doSnapshot) {
            return;
        }

        TestInbox snapshotInbox = new TestInbox();
        boolean[] done = {false};
        boolean isCooperative = processor[0].isCooperative();
        HashSet<Object> keysSoFar = new HashSet<>();

        do {
            checkTime("saveSnapshot", isCooperative, () -> done[0] = processor[0].saveToSnapshot());
            for (Map.Entry<TestOutbox.MockData, TestOutbox.MockData> entry : outbox.snapshotQueue()) {
                Object key = entry.getKey().getObject();
                // The message is built before add() runs, so "Keys so far" excludes the duplicate.
                JetAssert.assertTrue("Duplicate key produced in saveToSnapshot()\n  Duplicate: " + key
                        + "\n  Keys so far: " + keysSoFar, keysSoFar.add(key));
                snapshotInbox.add(Util.entry(key, entry.getValue().getObject()));
            }
            JetAssert.assertTrue("saveToSnapshot() call without progress",
                    !assertProgress
                            || done[0]
                            || !outbox.snapshotQueue().isEmpty()
                            || !outbox.queueWithOrdinal(0).isEmpty());
            drainOutbox(outbox.queueWithOrdinal(0), actualOutput, logInputOutput);
            outbox.snapshotQueue().clear();
        } while (!done[0]);

        if (!doRestore) {
            return;
        }

        // Restore into a brand-new instance; timing keeps using the cooperativeness
        // read from the instance that produced the snapshot.
        processor[0] = supplier.get();
        initProcessor(processor[0], outbox);

        int lastInboxSize = snapshotInbox.size();
        while (!snapshotInbox.isEmpty()) {
            checkTime("restoreSnapshot", isCooperative, () -> processor[0].restoreFromSnapshot(snapshotInbox));
            JetAssert.assertTrue("restoreFromSnapshot() call without progress",
                    !assertProgress
                            || lastInboxSize > snapshotInbox.size()
                            || !outbox.queueWithOrdinal(0).isEmpty());
            drainOutbox(outbox.queueWithOrdinal(0), actualOutput, logInputOutput);
            lastInboxSize = snapshotInbox.size();
        }

        do {
            checkTime("finishSnapshotRestore", isCooperative,
                    () -> done[0] = processor[0].finishSnapshotRestore());
            JetAssert.assertTrue("finishSnapshotRestore() call without progress",
                    !assertProgress || done[0] || !outbox.queueWithOrdinal(0).isEmpty());
            drainOutbox(outbox.queueWithOrdinal(0), actualOutput, logInputOutput);
        } while (!done[0]);
    }

    /**
     * Runs {@code call} and checks how long it took.
     *
     * <p>For a cooperative processor the call fails the test when it takes at
     * least {@link #cooperativeTimeout(long) cooperativeTimeout} milliseconds
     * (if that limit is positive), and prints a warning above
     * {@value #COOPERATIVE_TIME_LIMIT_MS_WARN} ms. For a non-cooperative
     * processor only a warning is printed, above
     * {@value #BLOCKING_TIME_LIMIT_MS_WARN} ms.
     *
     * @param methodName    processor method name used in messages
     * @param isCooperative whether the processor declared itself cooperative
     * @param call          the processor call to time
     */
    private void checkTime(String methodName, boolean isCooperative, Runnable call) {
        long start = System.nanoTime();
        call.run();
        long elapsed = System.nanoTime() - start;

        if (isCooperative) {
            if (cooperativeTimeout > 0) {
                // Compare against the configured limit, not the default constant.
                JetAssert.assertTrue(String.format("call to %s() took %.1fms, it should be <%dms",
                        methodName, toMillis(elapsed), cooperativeTimeout),
                        elapsed < TimeUnit.MILLISECONDS.toNanos(cooperativeTimeout));
            }
            if (elapsed > TimeUnit.MILLISECONDS.toNanos(COOPERATIVE_TIME_LIMIT_MS_WARN)) {
                System.out.println(String.format(
                        "Warning: call to %s() took %.2fms, it should be <%dms normally",
                        methodName, toMillis(elapsed), COOPERATIVE_TIME_LIMIT_MS_WARN));
            }
        } else if (elapsed > TimeUnit.MILLISECONDS.toNanos(BLOCKING_TIME_LIMIT_MS_WARN)) {
            System.out.println(String.format(
                    "Warning: call to %s() took %.2fms in non-cooperative processor. Is this to be expected?",
                    methodName, toMillis(elapsed)));
        }
    }

    /** Initializes {@code processor} with the outbox and a test context carrying a logger named after its class. */
    private void initProcessor(Processor processor, TestOutbox outbox) {
        ILogger logger = getLogger(processor.getClass().getName());
        processor.init(outbox, new TestProcessorContext().setLogger(logger));
    }

    /**
     * Converts nanoseconds to fractional milliseconds.
     *
     * @param nanos a duration in nanoseconds
     * @return the same duration in milliseconds, including the fractional part
     */
    private static double toMillis(long nanos) {
        // Divide by a double: long/long division would drop the sub-millisecond part.
        return nanos / (double) TimeUnit.MILLISECONDS.toNanos(1L);
    }

    /**
     * Moves every item from {@code queue} to {@code target}, in polling order,
     * until {@code queue.poll()} returns {@code null}.
     *
     * @param queue    the queue to drain
     * @param target   the collection receiving the items
     * @param logItems whether to print each moved item to standard output
     * @param <T>      the item type
     */
    public static <T> void drainOutbox(Queue<T> queue, Collection<? super T> target, boolean logItems) {
        for (T item = queue.poll(); item != null; item = queue.poll()) {
            target.add(item);
            if (logItems) {
                System.out.println("Output: " + item);
            }
        }
    }

    /**
     * Initializes {@code processorSupplier} with a test context and wraps it in
     * a plain supplier. Each {@code get()} asks the processor supplier for one
     * processor and returns it.
     *
     * @param processorSupplier the supplier to wrap
     * @return a supplier of single processors
     */
    public static Supplier<Processor> supplierFrom(ProcessorSupplier processorSupplier) {
        processorSupplier.init(new TestProcessorSupplierContext());
        return () -> processorSupplier.get(1).iterator().next();
    }

    /**
     * Initializes {@code processorMetaSupplier} with a test context, resolves
     * the {@link ProcessorSupplier} for a single local address and wraps it via
     * {@link #supplierFrom(ProcessorSupplier)}.
     *
     * @param processorMetaSupplier the meta-supplier to wrap
     * @return a supplier of single processors
     */
    public static Supplier<Processor> supplierFrom(ProcessorMetaSupplier processorMetaSupplier) {
        processorMetaSupplier.init(new TestProcessorMetaSupplierContext());
        ProcessorSupplier localSupplier = processorMetaSupplier
                .get(Collections.singletonList(LOCAL_ADDRESS))
                .apply(LOCAL_ADDRESS);
        return supplierFrom(localSupplier);
    }

    /**
     * Returns a logger with the given name from the shared test logging service.
     *
     * <p>NOTE(review): the decompiler reports this method was originally
     * package-private; it is kept public so existing callers keep compiling.
     *
     * @param name the logger name
     * @return the logger
     */
    public static ILogger getLogger(String name) {
        return LOGGING_SERVICE.getLogger(name);
    }

    /**
     * Returns a logger for the given class from the shared test logging service.
     *
     * <p>NOTE(review): the decompiler reports this method was originally
     * package-private; it is kept public so existing callers keep compiling.
     *
     * @param clazz the class the logger is for
     * @return the logger
     */
    public static ILogger getLogger(Class<?> clazz) {
        return LOGGING_SERVICE.getLogger(clazz);
    }

    /** Renders the list one item per line, using {@link String#valueOf(Object)} for each item. */
    private static String listToString(List<?> list) {
        return list.stream().map(String::valueOf).collect(Collectors.joining("\n"));
    }

    /**
     * Wraps one processor in a supplier that hands it out exactly once.
     *
     * @param processor the only instance the supplier will ever return
     * @return a supplier that throws {@link RuntimeException} on the second and later calls
     */
    private static Supplier<Processor> singletonSupplier(Processor processor) {
        Processor[] holder = {processor};
        return () -> {
            Processor instance = holder[0];
            if (instance == null) {
                throw new RuntimeException("More than one instance requested");
            }
            holder[0] = null;
            return instance;
        };
    }

    static {
        LOGGING_SERVICE = new LoggingServiceImpl("test-group", null, BuildInfoProvider.getBuildInfo());
        try {
            LOCAL_ADDRESS = new Address("localhost", NetworkConfig.DEFAULT_PORT);
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }
}
