package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.test.HazelcastParallelParametersRunnerFactory;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastParallelParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/processor/InsertWatermarksPTest.class */
public class InsertWatermarksPTest {
    // Lag passed to WatermarkPolicy.limitingLag() for the default watermark policy.
    private static final long LAG = 3;

    // Capacity of the single outbox queue; populated through the
    // @Parameterized.Parameter annotation, so it must stay public and non-final.
    @Parameterized.Parameter
    public int outboxCapacity;
    // Processor under test; assigned by createProcessor().
    private InsertWatermarksP<Item> p;
    // Outbox and context handed to the processor's init(); rebuilt in setUp().
    private TestOutbox outbox;
    private Processor.Context context;
    // Manually advanced clock, driven by Tick inputs in doTest().
    private final MockClock clock = new MockClock(100);
    // Everything drained from the outbox so far (plus echoed ticks), in order.
    private final List<Object> resultToCheck = new ArrayList<>();
    // Individual tests may replace the policy / frame size before the processor is created.
    private SupplierEx<WatermarkPolicy> wmPolicy = WatermarkPolicy.limitingLag(LAG);
    private long watermarkThrottlingFrameSize = 1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/processor/InsertWatermarksPTest$Item.class */
    /**
     * Test event that carries nothing but a timestamp. Two items are equal
     * exactly when their timestamps are equal.
     */
    public static class Item {
        final long timestamp;

        /**
         * @param j the timestamp of this item
         */
        Item(long j) {
            this.timestamp = j;
        }

        /**
         * @return the timestamp given at construction
         */
        long getTimestamp() {
            return this.timestamp;
        }

        @Override
        public String toString() {
            return "Item{timestamp=" + this.timestamp + '}';
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Item)) {
                return false;
            }
            return this.timestamp == ((Item) obj).timestamp;
        }

        @Override
        public int hashCode() {
            // Same value as the hand-written (int) (ts ^ (ts >>> 32)).
            return Long.hashCode(this.timestamp);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/processor/InsertWatermarksPTest$MockClock.class */
    public static class MockClock {
        long now;
        static final /* synthetic */ boolean $assertionsDisabled;

        MockClock(long j) {
            this.now = j;
        }

        void set(long j) {
            if (!$assertionsDisabled && j < this.now) {
                throw new AssertionError();
            }
            this.now = j;
        }

        static {
            $assertionsDisabled = !InsertWatermarksPTest.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/processor/InsertWatermarksPTest$Tick.class */
    /**
     * Marker input for {@code doTest}: instead of an event, it carries the time
     * the mock clock should be set to.
     */
    public static final class Tick {
        final long timestamp;

        private Tick(long j) {
            this.timestamp = j;
        }

        @Override
        public String toString() {
            return "-- at " + this.timestamp;
        }
    }

    /**
     * Supplies the outbox capacities the whole suite is run with: 1 and 1024.
     *
     * @return the parameter values, one per test-class instantiation
     */
    @Parameterized.Parameters(name = "outboxCapacity={0}")
    public static Collection<Object> parameters() {
        Object[] capacities = {1, 1024};
        return Arrays.asList(capacities);
    }

    /**
     * Creates a fresh outbox with a single queue of {@code outboxCapacity} and a
     * fresh processor context before every test.
     */
    @Before
    public void setUp() {
        int[] queueCapacities = {outboxCapacity};
        outbox = new TestOutbox(queueCapacities);
        context = new TestProcessorContext();
    }

    /**
     * Two events with timestamp 10: expects a single watermark (7) ahead of the
     * first event and nothing extra before the second.
     */
    @Test
    public void when_manyEvents_then_oneWm() throws Exception {
        List<Object> input = Arrays.asList(item(10L), item(10L));
        List<Object> expected = Arrays.asList(JetTestSupport.wm(7L), item(10L), item(10L));
        doTest(input, expected);
    }

    /**
     * Events at 10 and 11: expects a watermark before each of them (7, then 8).
     */
    @Test
    public void when_eventsIncrease_then_wmIncreases() throws Exception {
        List<Object> input = Arrays.asList(item(10L), item(11L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(7L), item(10L),
                JetTestSupport.wm(8L), item(11L));
        doTest(input, expected);
    }

    /**
     * Events at 11 and then 10: expects only the watermark (8) triggered by the
     * first event; the older second event adds none.
     */
    @Test
    public void when_eventsDecrease_then_oneWm() throws Exception {
        List<Object> input = Arrays.asList(item(11L), item(10L));
        List<Object> expected = Arrays.asList(JetTestSupport.wm(8L), item(11L), item(10L));
        doTest(input, expected);
    }

    /**
     * Event at 11 followed by an event at 7, which is behind the emitted
     * watermark (8): expects the late event to still appear in the output.
     */
    @Test
    public void when_lateEvent_then_notDropped() throws Exception {
        List<Object> input = Arrays.asList(item(11L), item(7L));
        List<Object> expected = Arrays.asList(JetTestSupport.wm(8L), item(11L), item(7L));
        doTest(input, expected);
    }

    /**
     * Events at 10 and 13: expects exactly one watermark per event (7, then 10),
     * with no intermediate watermarks for the skipped timestamps.
     */
    @Test
    public void when_gapBetweenEvents_then_oneWm() throws Exception {
        List<Object> input = Arrays.asList(item(10L), item(13L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(7L), item(10L),
                JetTestSupport.wm(10L), item(13L));
        doTest(input, expected);
    }

    /**
     * With a limiting lag of 0, expects each watermark to equal the timestamp of
     * the event that follows it (10, then 13).
     */
    @Test
    public void when_zeroLag() throws Exception {
        wmPolicy = WatermarkPolicy.limitingLag(0L);
        List<Object> input = Arrays.asList(item(10L), item(13L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(10L), item(10L),
                JetTestSupport.wm(13L), item(13L));
        doTest(input, expected);
    }

    /**
     * Throttling frame size 2, events at 10..13: expects watermarks 6, 8 and 10,
     * with no watermark between the events at 11 and 12.
     */
    @Test
    public void emitByFrame_when_eventsIncrease_then_wmIncreases() throws Exception {
        watermarkThrottlingFrameSize = 2L;
        List<Object> input = Arrays.asList(item(10L), item(11L), item(12L), item(13L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(6L), item(10L),
                JetTestSupport.wm(8L), item(11L), item(12L),
                JetTestSupport.wm(10L), item(13L));
        doTest(input, expected);
    }

    /**
     * Throttling frame size 2, events at 11..14: expects watermarks 8 and 10, each
     * followed by two events.
     */
    @Test
    public void emitByFrame_when_eventsIncreaseAndStartAtVergeOfFrame_then_wmIncreases() throws Exception {
        watermarkThrottlingFrameSize = 2L;
        List<Object> input = Arrays.asList(item(11L), item(12L), item(13L), item(14L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(8L), item(11L), item(12L),
                JetTestSupport.wm(10L), item(13L), item(14L));
        doTest(input, expected);
    }

    /**
     * Throttling frame size 10, events at 14, 15 and 24: expects watermark 10
     * before the first event and watermark 20 before the last one.
     */
    @Test
    public void emitByFrame_when_eventsNotAtTheVergeOfFrame_then_wmEmittedCorrectly() throws Exception {
        watermarkThrottlingFrameSize = 10L;
        List<Object> input = Arrays.asList(item(14L), item(15L), item(24L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(10L), item(14L), item(15L),
                JetTestSupport.wm(20L), item(24L));
        doTest(input, expected);
    }

    /**
     * Throttling frame size 2, events at 11 and 15: expects watermarks 8 and 12
     * only, i.e. no watermark for the frame in between.
     */
    @Test
    public void emitByFrame_when_gapBetweenEvents_then_gapInWms() throws Exception {
        watermarkThrottlingFrameSize = 2L;
        List<Object> input = Arrays.asList(item(11L), item(15L));
        List<Object> expected = Arrays.asList(
                JetTestSupport.wm(8L), item(11L),
                JetTestSupport.wm(12L), item(15L));
        doTest(input, expected);
    }

    /**
     * Creates the processor with an idle timeout of 100 ms, processes one event
     * and then polls {@code tryProcess()} in real time. While less than 100 ms
     * have elapsed the outbox must stay empty; once something shows up after
     * that, it must be exactly {@link WatermarkCoalescer#IDLE_MESSAGE}.
     *
     * <p>If nothing is emitted within 1000 ms the test fails explicitly, rather
     * than leaving the polling loop without having verified anything.
     */
    @Test
    public void when_idleTimeout_then_idleMessageAfterTimeout() throws Exception {
        final long idleTimeoutMs = 100L;
        final long giveUpAfterMs = 1000L;
        createProcessor(idleTimeoutMs);

        // Process one event first and expect a regular watermark ahead of it.
        this.resultToCheck.clear();
        long start = System.nanoTime();
        doAndDrain(() -> this.p.tryProcess(0, item(10L)));
        Assert.assertEquals(Arrays.asList(JetTestSupport.wm(10L - LAG), item(10L)), this.resultToCheck);

        // With no further events, wait for the idle message.
        this.resultToCheck.clear();
        long elapsedMs;
        do {
            Assert.assertTrue(this.p.tryProcess());
            // Elapsed time is sampled after tryProcess(), so "elapsed < timeout" implies
            // the call itself also happened before the timeout.
            elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            this.outbox.drainQueueAndReset(0, this.resultToCheck, false);
            if (elapsedMs < idleTimeoutMs) {
                Assert.assertTrue("outbox should be empty, elapsedMs=" + elapsedMs, this.resultToCheck.isEmpty());
            } else if (!this.resultToCheck.isEmpty()) {
                System.out.println("WM emitted after " + elapsedMs + "ms (shortly after 100 was expected)");
                Assert.assertEquals(Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), this.resultToCheck);
                return;
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
        } while (elapsedMs < giveUpAfterMs);

        // Reaching this point means the idle message never arrived; previously the
        // test ended here and passed without checking it.
        Assert.fail("idle message not emitted after " + elapsedMs + "ms (idle timeout is "
                + idleTimeoutMs + "ms)");
    }

    /**
     * Instantiates the processor under test and initializes it with the current
     * outbox and context. The event-time policy reads timestamps via
     * {@link Item#getTimestamp()} and uses the test's current {@code wmPolicy}
     * and {@code watermarkThrottlingFrameSize}.
     *
     * @param j value passed as the last argument of
     *          {@code EventTimePolicy.eventTimePolicy}; the idle-timeout test
     *          passes 100 here, {@code doTest} passes 0
     */
    private void createProcessor(long j) throws Exception {
        p = new InsertWatermarksP<>(ctx ->
                EventTimePolicy.eventTimePolicy(
                        Item::getTimestamp, wmPolicy, watermarkThrottlingFrameSize, 0L, j));
        p.init(outbox, context);
    }

    /**
     * Feeds the given inputs to the processor one by one and compares everything
     * collected in {@code resultToCheck} with the expected output, using the
     * string form produced by {@code listToString}.
     *
     * <p>A {@link Tick} input sets the mock clock, is echoed into the result and
     * triggers the no-argument {@code tryProcess()}; any other input must be an
     * {@link Item} and is passed to {@code tryProcess(0, item)}. The processor is
     * created with a last policy argument of 0 if no test created it before.
     *
     * @param list  inputs: {@link Item}s and {@link Tick}s
     * @param list2 expected contents of {@code resultToCheck} afterwards
     */
    private void doTest(List<Object> list, List<Object> list2) throws Exception {
        if (p == null) {
            createProcessor(0L);
        }
        for (Object input : list) {
            if (!(input instanceof Tick)) {
                Assert.assertTrue(input instanceof Item);
                doAndDrain(() -> p.tryProcess(0, input));
                continue;
            }
            clock.set(((Tick) input).timestamp);
            resultToCheck.add(tick(clock.now));
            doAndDrain(p::tryProcess);
        }
        String expected = listToString(list2);
        String actual = listToString(resultToCheck);
        Assert.assertEquals(expected, actual);
    }

    /**
     * Runs the action repeatedly until it returns {@code true}, draining outbox
     * queue 0 into {@code resultToCheck} after every attempt.
     *
     * <p>The attempt counter is asserted to stay below 10 after each attempt,
     * including the one that succeeds.
     *
     * @param booleanSupplier the action to retry; {@code true} means done
     */
    private void doAndDrain(BooleanSupplier booleanSupplier) {
        int attempts = 0;
        while (true) {
            boolean done = booleanSupplier.getAsBoolean();
            outbox.drainQueueAndReset(0, resultToCheck, false);
            // The message reports the count before this attempt is added.
            String failureMessage = "action not done in " + attempts + " attempts";
            attempts++;
            Assert.assertTrue(failureMessage, attempts < 10);
            if (done) {
                return;
            }
        }
    }

    /**
     * Renders one output element for comparison: a {@link Watermark} is printed
     * as {@code Watermark{timestamp=N}}, anything else via its own
     * {@code toString()}.
     */
    private String myToString(Object obj) {
        if (!(obj instanceof Watermark)) {
            return obj.toString();
        }
        Watermark watermark = (Watermark) obj;
        return "Watermark{timestamp=" + watermark.timestamp() + '}';
    }

    /**
     * Renders every element with {@code myToString} and joins the results with
     * newlines; an empty list yields an empty string.
     */
    private String listToString(List<?> list) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (Object element : list) {
            if (!first) {
                joined.append("\n");
            }
            joined.append(myToString(element));
            first = false;
        }
        return joined.toString();
    }

    /**
     * Shorthand factory for an {@link Item}.
     *
     * @param j the item's timestamp
     */
    private static Item item(long j) {
        Item created = new Item(j);
        return created;
    }

    /**
     * Shorthand factory for a {@link Tick}.
     *
     * @param j the time carried by the tick
     */
    private static Tick tick(long j) {
        Tick created = new Tick(j);
        return created;
    }

    /*
     * Lambda-deserialization hook as reproduced by the decompiler (note the
     * "synthetic" marker in the signature). It dispatches on the implementation
     * method name of the serialized lambda and rebuilds one of two lambdas:
     * the policy factory created in createProcessor(), or the timestamp
     * extractor bound to Item.getTimestamp(). Any other input ends in an
     * IllegalArgumentException.
     *
     * NOTE(review): this body is not valid Java as written -- it assigns -1 to a
     * boolean, switches on a boolean, and uses `this` inside a static method.
     * Presumably the method is generated by the compiler for serializable lambdas
     * and is not meant to be kept in hand-maintained source; confirm before
     * relying on or editing it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name to a selector value.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -664195380:
                if (implMethodName.equals("lambda$createProcessor$ba9d5ed$1")) {
                    z = false;
                    break;
                }
                break;
            case 45521504:
                if (implMethodName.equals("getTimestamp")) {
                    z = true;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda signature, then rebuild the lambda.
        switch (z) {
            case false:
                // Policy-factory lambda from createProcessor(); captured args are the
                // test instance and the long passed to createProcessor().
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/InsertWatermarksPTest") && serializedLambda.getImplMethodSignature().equals("(JLcom/hazelcast/jet/core/ProcessorSupplier$Context;)Lcom/hazelcast/jet/core/EventTimePolicy;")) {
                    InsertWatermarksPTest insertWatermarksPTest = (InsertWatermarksPTest) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    // NOTE(review): the decompiled lambda refers to `this`, not to the
                    // captured insertWatermarksPTest instance read above.
                    return context -> {
                        return EventTimePolicy.eventTimePolicy((v0) -> {
                            return v0.getTimestamp();
                        }, this.wmPolicy, this.watermarkThrottlingFrameSize, 0L, longValue);
                    };
                }
                break;
            case true:
                // Timestamp extractor implemented by InsertWatermarksPTest$Item.getTimestamp().
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/InsertWatermarksPTest$Item") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTimestamp();
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
