package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.TimestampKind;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.test.HazelcastParallelParametersRunnerFactory;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastParallelParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/processor/SlidingWindowPTest.class */
public class SlidingWindowPTest {
    private static final Long KEY = 77L;

    @Parameterized.Parameter
    public boolean hasDeduct;

    @Parameterized.Parameter(1)
    public boolean singleStageProcessor;
    private SupplierEx<Processor> supplier;
    private SlidingWindowP lastSuppliedProcessor;

    @Parameterized.Parameters(name = "hasDeduct={0}, singleStageProcessor={1}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{true, true}, new Object[]{true, false}, new Object[]{false, true}, new Object[]{false, false});
    }

    @Before
    public void before() {
        SlidingWindowPolicy slidingWinPolicy = SlidingWindowPolicy.slidingWinPolicy(4L, 1L);
        AggregateOperation1 andExportFinish = AggregateOperation.withCreate(LongAccumulator::new).andAccumulate((longAccumulator, entry) -> {
            longAccumulator.add(((Long) entry.getValue()).longValue());
        }).andCombine((v0, v1) -> {
            v0.add(v1);
        }).andDeduct(this.hasDeduct ? (v0, v1) -> {
            v0.subtract(v1);
        } : null).andExportFinish((v0) -> {
            return v0.get();
        });
        SupplierEx aggregateToSlidingWindowP = this.singleStageProcessor ? Processors.aggregateToSlidingWindowP(Collections.singletonList(obj -> {
            return KEY;
        }), Collections.singletonList((v0) -> {
            return v0.getKey();
        }), TimestampKind.EVENT, slidingWinPolicy, 0L, andExportFinish, (v1, v2, v3, v4, v5) -> {
            return new KeyedWindowResult(v1, v2, v3, v4, v5);
        }) : Processors.combineToSlidingWindowP(slidingWinPolicy, andExportFinish, (v1, v2, v3, v4, v5) -> {
            return new KeyedWindowResult(v1, v2, v3, v4, v5);
        });
        this.supplier = () -> {
            SlidingWindowP slidingWindowP = (SlidingWindowP) aggregateToSlidingWindowP.get();
            this.lastSuppliedProcessor = slidingWindowP;
            return slidingWindowP;
        };
    }

    @After
    public void after() {
        Assert.assertTrue("tsToKeyToFrame is not empty: " + this.lastSuppliedProcessor.tsToKeyToAcc, this.lastSuppliedProcessor.tsToKeyToAcc.isEmpty());
        Assert.assertTrue("slidingWindow is not empty: " + this.lastSuppliedProcessor.slidingWindow, this.lastSuppliedProcessor.slidingWindow == null || this.lastSuppliedProcessor.slidingWindow.isEmpty());
    }

    @Test
    public void when_noFramesReceived_then_onlyEmitWm() {
        List singletonList = Collections.singletonList(JetTestSupport.wm(1L));
        TestSupport.verifyProcessor(this.supplier).disableCompleteCall().input(singletonList).expectOutput(singletonList);
    }

    @Test
    public void when_multiKeyWatermarkReceived_then_emitOnlySupportedWm() {
        TestSupport.verifyProcessor(this.supplier).disableCompleteCall().input(Arrays.asList(JetTestSupport.wm(1L, (byte) 0), JetTestSupport.wm(1L, (byte) 1), JetTestSupport.wm(1L, (byte) 2))).expectOutput(Collections.singletonList(JetTestSupport.wm(1L)));
    }

    @Test
    public void simple_smokeTest() {
        TestSupport.verifyProcessor(this.supplier).disableCompleteCall().disableLogging().input(Arrays.asList(event(0L, 1L), JetTestSupport.wm(3L))).expectOutput(Arrays.asList(outboxFrame(0L, 1L), outboxFrame(1L, 1L), outboxFrame(2L, 1L), outboxFrame(3L, 1L), JetTestSupport.wm(3L)));
    }

    @Test
    public void when_receiveAscendingTimestamps_then_emitAscending() {
        TestSupport.verifyProcessor(this.supplier).disableCompleteCall().input(Arrays.asList(event(0L, 1L), event(1L, 1L), event(2L, 1L), event(3L, 1L), event(4L, 1L), JetTestSupport.wm(0L), JetTestSupport.wm(1L), JetTestSupport.wm(2L), JetTestSupport.wm(3L), JetTestSupport.wm(4L), JetTestSupport.wm(5L), JetTestSupport.wm(6L), JetTestSupport.wm(7L))).expectOutput(Arrays.asList(outboxFrame(0L, 1L), JetTestSupport.wm(0L), outboxFrame(1L, 2L), JetTestSupport.wm(1L), outboxFrame(2L, 3L), JetTestSupport.wm(2L), outboxFrame(3L, 4L), JetTestSupport.wm(3L), outboxFrame(4L, 4L), JetTestSupport.wm(4L), outboxFrame(5L, 3L), JetTestSupport.wm(5L), outboxFrame(6L, 2L), JetTestSupport.wm(6L), outboxFrame(7L, 1L), JetTestSupport.wm(7L)));
    }

    @Test
    public void when_receiveDescendingTimestamps_then_emitAscending() {
        TestSupport.verifyProcessor(this.supplier).disableCompleteCall().input(Arrays.asList(event(4L, 1L), event(3L, 1L), event(2L, 1L), event(1L, 1L), event(0L, 1L), JetTestSupport.wm(0L), JetTestSupport.wm(1L), JetTestSupport.wm(2L), JetTestSupport.wm(3L), JetTestSupport.wm(4L), JetTestSupport.wm(5L), JetTestSupport.wm(6L), JetTestSupport.wm(7L))).expectOutput(Arrays.asList(outboxFrame(0L, 1L), JetTestSupport.wm(0L), outboxFrame(1L, 2L), JetTestSupport.wm(1L), outboxFrame(2L, 3L), JetTestSupport.wm(2L), outboxFrame(3L, 4L), JetTestSupport.wm(3L), outboxFrame(4L, 4L), JetTestSupport.wm(4L), outboxFrame(5L, 3L), JetTestSupport.wm(5L), outboxFrame(6L, 2L), JetTestSupport.wm(6L), outboxFrame(7L, 1L), JetTestSupport.wm(7L)));
    }

    @Test
    public void when_receiveRandomTimestamps_then_emitAscending() {
        List list = (List) LongStream.range(0L, 100L).boxed().collect(Collectors.toList());
        Collections.shuffle(list);
        ArrayList arrayList = new ArrayList();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(event(((Long) it.next()).longValue(), 1L));
        }
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 > 105) {
                break;
            }
            arrayList.add(JetTestSupport.wm(j2));
            j = j2 + 1;
        }
        ArrayList arrayList2 = new ArrayList();
        arrayList2.addAll(Arrays.asList(outboxFrame(0L, 1L), JetTestSupport.wm(0L), outboxFrame(1L, 2L), JetTestSupport.wm(1L), outboxFrame(2L, 3L), JetTestSupport.wm(2L), outboxFrame(3L, 4L), JetTestSupport.wm(3L)));
        long j3 = 4;
        while (true) {
            long j4 = j3;
            if (j4 >= 100) {
                arrayList2.addAll(Arrays.asList(outboxFrame(100L, 3L), JetTestSupport.wm(100L), outboxFrame(101L, 2L), JetTestSupport.wm(101L), outboxFrame(102L, 1L), JetTestSupport.wm(102L), JetTestSupport.wm(103L), JetTestSupport.wm(104L), JetTestSupport.wm(105L)));
                TestSupport.verifyProcessor(this.supplier).disableCompleteCall().disableLogging().input(arrayList).expectOutput(arrayList2);
                return;
            } else {
                arrayList2.add(outboxFrame(j4, 4L));
                arrayList2.add(JetTestSupport.wm(j4));
                j3 = j4 + 1;
            }
        }
    }

    @Test
    public void when_wmNeverReceived_then_emitEverythingInComplete() {
        TestSupport.verifyProcessor(this.supplier).input(Arrays.asList(event(0L, 1L), event(1L, 1L))).expectOutput(Arrays.asList(outboxFrame(0L, 1L), outboxFrame(1L, 2L), outboxFrame(2L, 2L), outboxFrame(3L, 2L), outboxFrame(4L, 1L)));
    }

    @Test
    public void when_lateEvent_then_ignored() {
        TestSupport.verifyProcessor(this.supplier).input(Arrays.asList(JetTestSupport.wm(10L), event(7L, 1L), event(8L, 1L), event(9L, 1L), event(10L, 1L), event(11L, 123L))).expectOutput(Arrays.asList(JetTestSupport.wm(10L), outboxFrame(11L, 123L), outboxFrame(12L, 123L), outboxFrame(13L, 123L), outboxFrame(14L, 123L)));
    }

    private Map.Entry<Long, ?> event(long j, long j2) {
        return this.singleStageProcessor ? Util.entry(Long.valueOf(j - 1), Long.valueOf(j2)) : new KeyedWindowResult(j - 4, j, KEY, new LongAccumulator(j2));
    }

    private static KeyedWindowResult<Long, ?> outboxFrame(long j, long j2) {
        return new KeyedWindowResult<>(j - 4, j, KEY, Long.valueOf(j2));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2060248300:
                if (implMethodName.equals("subtract")) {
                    z = 4;
                    break;
                }
                break;
            case -1787405271:
                if (implMethodName.equals("lambda$before$52c622ae$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1668311716:
                if (implMethodName.equals("lambda$before$c059caa9$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = 2;
                    break;
                }
                break;
            case 96417:
                if (implMethodName.equals("add")) {
                    z = false;
                    break;
                }
                break;
            case 102230:
                if (implMethodName.equals("get")) {
                    z = 5;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = 6;
                    break;
                }
                break;
            case 1897471561:
                if (implMethodName.equals("lambda$before$df8434c$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/accumulator/LongAccumulator") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/accumulator/LongAccumulator;)Lcom/hazelcast/jet/accumulator/LongAccumulator;")) {
                    return (v0, v1) -> {
                        v0.add(v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/SlidingWindowPTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/function/SupplierEx;)Lcom/hazelcast/jet/core/Processor;")) {
                    SlidingWindowPTest slidingWindowPTest = (SlidingWindowPTest) serializedLambda.getCapturedArg(0);
                    SupplierEx supplierEx = (SupplierEx) serializedLambda.getCapturedArg(1);
                    return () -> {
                        SlidingWindowP slidingWindowP = (SlidingWindowP) supplierEx.get();
                        this.lastSuppliedProcessor = slidingWindowP;
                        return slidingWindowP;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/SlidingWindowPTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/accumulator/LongAccumulator;Ljava/util/Map$Entry;)V")) {
                    return (longAccumulator, entry) -> {
                        longAccumulator.add(((Long) entry.getValue()).longValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/accumulator/LongAccumulator") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/accumulator/LongAccumulator;)Lcom/hazelcast/jet/accumulator/LongAccumulator;")) {
                    return (v0, v1) -> {
                        v0.subtract(v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/accumulator/LongAccumulator") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.get();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/accumulator/LongAccumulator") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return LongAccumulator::new;
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/function/KeyedWindowResultFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/datamodel/KeyedWindowResult") && serializedLambda.getImplMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)V")) {
                    return (v1, v2, v3, v4, v5) -> {
                        return new KeyedWindowResult(v1, v2, v3, v4, v5);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/function/KeyedWindowResultFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/datamodel/KeyedWindowResult") && serializedLambda.getImplMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)V")) {
                    return (v1, v2, v3, v4, v5) -> {
                        return new KeyedWindowResult(v1, v2, v3, v4, v5);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/SlidingWindowPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Long;")) {
                    return obj -> {
                        return KEY;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
