package com.hazelcast.jet.core;

import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.Functions;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.test.HazelcastParallelParametersRunnerFactory;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastParallelParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/Processors_slidingWindowingIntegrationTest.class */
public class Processors_slidingWindowingIntegrationTest extends JetTestSupport {

    @Parameterized.Parameter
    public boolean singleStageProcessor;

    @Parameterized.Parameter(1)
    public boolean isBatch;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/core/Processors_slidingWindowingIntegrationTest$EmitListP.class */
    public static class EmitListP extends AbstractProcessor {
        private Traverser<Object> traverser;
        private final boolean complete;

        EmitListP(List<?> list, boolean z) {
            this.traverser = Traversers.traverseIterable(list);
            this.complete = z;
            if (z) {
                return;
            }
            this.traverser = this.traverser.append(new Watermark(100000L));
        }

        public boolean isCooperative() {
            return false;
        }

        public boolean complete() {
            if (emitFromTraverser(this.traverser)) {
                return this.complete;
            }
            return false;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/core/Processors_slidingWindowingIntegrationTest$MyEvent.class */
    private static class MyEvent extends AbstractMap.SimpleImmutableEntry<String, Long> {
        private final long timestamp;

        MyEvent(long j, String str, Long l) {
            super(str, l);
            this.timestamp = j;
        }

        long getTimestamp() {
            return this.timestamp;
        }
    }

    @Parameterized.Parameters(name = "singleStageProcessor={0}, isBatch={1}")
    public static Collection<Object> parameters() {
        return Arrays.asList(new Object[]{true, true}, new Object[]{true, false}, new Object[]{false, true}, new Object[]{false, false});
    }

    @Test
    public void smokeTest() throws Exception {
        HazelcastInstance createHazelcastInstance = createHazelcastInstance();
        SlidingWindowPolicy slidingWinPolicy = SlidingWindowPolicy.slidingWinPolicy(2000L, 1000L);
        DAG dag = new DAG();
        boolean z = this.isBatch;
        FunctionEx functionEx = (v0) -> {
            return v0.getKey();
        };
        ToLongFunctionEx toLongFunctionEx = (v0) -> {
            return v0.getTimestamp();
        };
        List singletonList = Collections.singletonList(new MyEvent(10L, "a", 1L));
        Vertex localParallelism = dag.newVertex("source", () -> {
            return new EmitListP(singletonList, z);
        }).localParallelism(1);
        Vertex localParallelism2 = dag.newVertex("insertWmP", Processors.insertWatermarksP(EventTimePolicy.eventTimePolicy(toLongFunctionEx, WatermarkPolicy.limitingLag(0L), slidingWinPolicy.frameSize(), slidingWinPolicy.frameOffset(), 0L))).localParallelism(1);
        Vertex newVertex = dag.newVertex("sink", SinkProcessors.writeListP("sink"));
        dag.edge(Edge.between(localParallelism, localParallelism2).isolated());
        AggregateOperation1 counting = AggregateOperations.counting();
        if (this.singleStageProcessor) {
            Vertex newVertex2 = dag.newVertex("slidingWin", Processors.aggregateToSlidingWindowP(Collections.singletonList(functionEx), Collections.singletonList(toLongFunctionEx), TimestampKind.EVENT, slidingWinPolicy, 0L, counting, (v1, v2, v3, v4, v5) -> {
                return new KeyedWindowResult(v1, v2, v3, v4, v5);
            }));
            dag.edge(Edge.between(localParallelism2, newVertex2).partitioned((v0) -> {
                return v0.getKey();
            }).distributed()).edge(Edge.between(newVertex2, newVertex));
        } else {
            Vertex newVertex3 = dag.newVertex("accumulateByFrame", Processors.accumulateByFrameP(Collections.singletonList(functionEx), Collections.singletonList(toLongFunctionEx), TimestampKind.EVENT, slidingWinPolicy, counting.withIdentityFinish()));
            Vertex newVertex4 = dag.newVertex("slidingWin", Processors.combineToSlidingWindowP(slidingWinPolicy, counting, (v1, v2, v3, v4, v5) -> {
                return new KeyedWindowResult(v1, v2, v3, v4, v5);
            }));
            dag.edge(Edge.between(localParallelism2, newVertex3).partitioned(functionEx)).edge(Edge.between(newVertex3, newVertex4).partitioned(Functions.entryKey()).distributed()).edge(Edge.between(newVertex4, newVertex));
        }
        Job newJob = createHazelcastInstance.getJet().newJob(dag);
        if (this.isBatch) {
            newJob.join();
        }
        IList list = createHazelcastInstance.getList("sink");
        List asList = Arrays.asList(new KeyedWindowResult(-1000L, 1000L, "a", 1L), new KeyedWindowResult(0L, 2000L, "a", 1L));
        assertTrueEventually(() -> {
            Assert.assertEquals(streamToString(asList.stream()), streamToString(new ArrayList((Collection) list).stream()));
        }, 5L);
        Thread.sleep(1000L);
        Assert.assertEquals(asList.size(), list.size());
    }

    private static String streamToString(Stream<?> stream) {
        return (String) stream.map(String::valueOf).collect(Collectors.joining("\n"));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = false;
                    break;
                }
                break;
            case -231044167:
                if (implMethodName.equals("lambda$smokeTest$715602f5$1")) {
                    z = true;
                    break;
                }
                break;
            case 45521504:
                if (implMethodName.equals("getTimestamp")) {
                    z = 2;
                    break;
                }
                break;
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/AbstractMap$SimpleImmutableEntry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/AbstractMap$SimpleImmutableEntry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/Processors_slidingWindowingIntegrationTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Z)Lcom/hazelcast/jet/core/Processor;")) {
                    List list = (List) serializedLambda.getCapturedArg(0);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    return () -> {
                        return new EmitListP(list, booleanValue);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/Processors_slidingWindowingIntegrationTest$MyEvent") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTimestamp();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/function/KeyedWindowResultFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/datamodel/KeyedWindowResult") && serializedLambda.getImplMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)V")) {
                    return (v1, v2, v3, v4, v5) -> {
                        return new KeyedWindowResult(v1, v2, v3, v4, v5);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/function/KeyedWindowResultFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/datamodel/KeyedWindowResult") && serializedLambda.getImplMethodSignature().equals("(JJLjava/lang/Object;Ljava/lang/Object;Z)V")) {
                    return (v1, v2, v3, v4, v5) -> {
                        return new KeyedWindowResult(v1, v2, v3, v4, v5);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
