package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.Functions;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor_GetLastMemberHeartbeatMillisTest;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.Repeat;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/processor/SessionWindowPTest.class */
public class SessionWindowPTest {
    private static final int SESSION_TIMEOUT = 10;
    private SupplierEx<Processor> supplier;
    private SessionWindowP<String, ?, Long, KeyedWindowResult<String, Long>> lastSuppliedProcessor;

    @Before
    public void before() {
        this.supplier = () -> {
            SessionWindowP<String, ?, Long, KeyedWindowResult<String, Long>> sessionWindowP = new SessionWindowP<>(10L, 0L, Collections.singletonList((v0) -> {
                return v0.getValue();
            }), Collections.singletonList(Functions.entryKey()), AggregateOperations.counting(), (v1, v2, v3, v4, v5) -> {
                return new KeyedWindowResult(v1, v2, v3, v4, v5);
            }, (byte) 0);
            this.lastSuppliedProcessor = sessionWindowP;
            return sessionWindowP;
        };
    }

    @After
    public void after() {
        Assert.assertTrue("keyToWindows not empty", this.lastSuppliedProcessor.keyToWindows.isEmpty());
        Assert.assertTrue("deadlineToKeys not empty", this.lastSuppliedProcessor.deadlineToKeys.isEmpty());
    }

    @Test
    public void when_orderedEventsWithOneKey() {
        assertCorrectness(eventsWithKey("a"));
    }

    @Test
    @Repeat(10)
    public void when_disorderedEventsWithOneKey() {
        List<Object> eventsWithKey = eventsWithKey("a");
        Collections.shuffle(eventsWithKey);
        assertCorrectness(eventsWithKey);
    }

    @Test
    public void when_orderedEventsWithThreeKeys() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(eventsWithKey("a"));
        arrayList.addAll(eventsWithKey("b"));
        arrayList.addAll(eventsWithKey("c"));
        assertCorrectness(arrayList);
    }

    @Test
    public void when_multiKeyWatermarkReceived_then_emitOnlySupportedWm() {
        TestSupport.verifyProcessor(this.supplier).disableCompleteCall().input(Arrays.asList(JetTestSupport.wm(1L), JetTestSupport.wm(1L, (byte) 1), JetTestSupport.wm(1L, (byte) 2))).expectOutput(Collections.singletonList(JetTestSupport.wm(1L)));
    }

    @Test
    @Repeat(10)
    public void when_disorderedEVentsWithThreeKeys() {
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(eventsWithKey("a"));
        arrayList.addAll(eventsWithKey("b"));
        arrayList.addAll(eventsWithKey("c"));
        Collections.shuffle(arrayList);
        assertCorrectness(arrayList);
    }

    @Test
    public void when_batchProcessing_then_flushEverything() {
        ArrayList arrayList = new ArrayList(eventsWithKey("a"));
        arrayList.add(new Watermark(25L));
        TestSupport.verifyProcessor(this.supplier).input(arrayList).expectOutput(Arrays.asList(new KeyedWindowResult(1L, 22L, "a", 3L, false), new Watermark(25L), new KeyedWindowResult(30L, 50L, "a", 3L, false)));
    }

    @Test
    public void when_lateEvent_then_dropped() {
        TestSupport.verifyProcessor(this.supplier).input(Arrays.asList(new Watermark(20L), Util.entry("key", 19L))).expectOutput(Collections.singletonList(new Watermark(20L)));
    }

    @Test
    public void when_sessionsTouch_then_shouldNotBeMerged() {
        TestSupport.verifyProcessor(this.supplier).input(Arrays.asList(Util.entry("key", 0L), Util.entry("key", 10L))).expectOutput(Arrays.asList(new KeyedWindowResult(0L, 10L, "key", 1L, false), new KeyedWindowResult(10L, 20L, "key", 1L, false)));
    }

    private void assertCorrectness(List<Object> list) {
        List list2 = (List) list.stream().map(obj -> {
            return (String) ((Map.Entry) obj).getKey();
        }).flatMap(SessionWindowPTest::expectedSessions).distinct().collect(Collectors.toList());
        list.add(new Watermark(100L));
        list2.add(new Watermark(100L));
        try {
            TestSupport.verifyProcessor(this.supplier).outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER).input(list).expectOutput(list2);
        } catch (AssertionError e) {
            System.err.println("Tested with events: " + list);
            throw e;
        }
    }

    public static void main(String[] strArr) throws Exception {
        for (int i = 0; i < 10; i++) {
            SessionWindowPTest sessionWindowPTest = new SessionWindowPTest();
            sessionWindowPTest.before();
            sessionWindowPTest.runBench();
        }
    }

    private void runBench() throws Exception {
        ThreadLocalRandom current = ThreadLocalRandom.current();
        long nanoTime = System.nanoTime();
        long j = 40000000 / 2000;
        System.out.format("keyCount %,d eventsPerKey %,d wmInterval %,d%n", 2000L, Long.valueOf(j), 100L);
        TestOutbox testOutbox = new TestOutbox(new int[]{1024});
        this.supplier.get();
        this.lastSuppliedProcessor.init(testOutbox, new TestProcessorContext());
        long j2 = 0;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                System.out.format("%nThroughput %,3d events/second%n", Long.valueOf((TimeUnit.SECONDS.toNanos(1L) * 40000000) / (System.nanoTime() - nanoTime)));
                return;
            }
            long j4 = j3 * 20;
            long j5 = (j4 / 10) % 2;
            while (true) {
                long j6 = j5;
                if (j6 >= 2000) {
                    break;
                }
                do {
                } while (!this.lastSuppliedProcessor.tryProcess(0, Util.entry(Long.valueOf(j6), Long.valueOf(j4 + current.nextInt(InvocationMonitor_GetLastMemberHeartbeatMillisTest.CALL_TIMEOUT)))));
                do {
                } while (!this.lastSuppliedProcessor.tryProcess(0, Util.entry(Long.valueOf(j6), Long.valueOf(j4 + current.nextInt(InvocationMonitor_GetLastMemberHeartbeatMillisTest.CALL_TIMEOUT)))));
                j5 = j6 + 2;
            }
            if (j3 % 100 == 0) {
                long j7 = j4 - 2000;
                int i = 0;
                while (!this.lastSuppliedProcessor.tryProcessWatermark(new Watermark(j7))) {
                    while (testOutbox.queue(0).poll() != null) {
                        i++;
                    }
                }
                while (testOutbox.queue(0).poll() != null) {
                    i++;
                }
            }
            j2 = j3 + 1;
        }
    }

    private static List<Object> eventsWithKey(String str) {
        return new ArrayList(Arrays.asList(Util.entry(str, 1L), Util.entry(str, 6L), Util.entry(str, 12L), Util.entry(str, 30L), Util.entry(str, 35L), Util.entry(str, 40L)));
    }

    private static Stream<KeyedWindowResult<String, Long>> expectedSessions(String str) {
        return Stream.of((Object[]) new KeyedWindowResult[]{new KeyedWindowResult(1L, 22L, str, 3L, false), new KeyedWindowResult(30L, 50L, str, 3L, false)});
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1092922539:
                if (implMethodName.equals("lambda$before$fb1a34a4$1")) {
                    z = true;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/function/KeyedWindowResultFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/datamodel/KeyedWindowResult") && serializedLambda.getImplMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)V")) {
                    return (v1, v2, v3, v4, v5) -> {
                        return new KeyedWindowResult(v1, v2, v3, v4, v5);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/SessionWindowPTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    SessionWindowPTest sessionWindowPTest = (SessionWindowPTest) serializedLambda.getCapturedArg(0);
                    return () -> {
                        SessionWindowP<String, ?, Long, KeyedWindowResult<String, Long>> sessionWindowP = new SessionWindowP<>(10L, 0L, Collections.singletonList((v0) -> {
                            return v0.getValue();
                        }), Collections.singletonList(Functions.entryKey()), AggregateOperations.counting(), (v1, v2, v3, v4, v5) -> {
                            return new KeyedWindowResult(v1, v2, v3, v4, v5);
                        }, (byte) 0);
                        this.lastSuppliedProcessor = sessionWindowP;
                        return sessionWindowP;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
