package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Arrays;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.class */
public class RegularWindowOperatorContractTest extends WindowOperatorContractTest {
    @Test
    public void testReducingWindow() throws Exception {
        WindowAssigner mockTimeWindowAssigner = mockTimeWindowAssigner();
        Trigger mockTrigger = mockTrigger();
        InternalWindowFunction mockWindowFunction = mockWindowFunction();
        ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("int-reduce", new ReduceFunction<Integer>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.RegularWindowOperatorContractTest.1
            private static final long serialVersionUID = 1;

            public Integer reduce(Integer num, Integer num2) throws Exception {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }
        }, IntSerializer.INSTANCE);
        final ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("string-state", StringSerializer.INSTANCE);
        KeyedOneInputStreamOperatorTestHarness createWindowOperator = createWindowOperator(mockTimeWindowAssigner, mockTrigger, 0L, (StateDescriptor) reducingStateDescriptor, mockWindowFunction);
        createWindowOperator.open();
        Mockito.when(mockTimeWindowAssigner.assignWindows(Integer.valueOf(Matchers.anyInt()), Matchers.anyLong(), anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals(0L, createWindowOperator.getOutput().size());
        Assert.assertEquals(0L, createWindowOperator.numKeyedStateEntries());
        createWindowOperator.processElement(new StreamRecord(1, 0L));
        createWindowOperator.processElement(new StreamRecord(1, 0L));
        ((Trigger) Mockito.doAnswer(new Answer<TriggerResult>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.RegularWindowOperatorContractTest.2
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TriggerResult m60answer(InvocationOnMock invocationOnMock) throws Exception {
                TimeWindow timeWindow = (TimeWindow) invocationOnMock.getArguments()[2];
                Trigger.TriggerContext triggerContext = (Trigger.TriggerContext) invocationOnMock.getArguments()[3];
                triggerContext.registerEventTimeTimer(timeWindow.getEnd());
                triggerContext.getPartitionedState(valueStateDescriptor).update("hello");
                return TriggerResult.FIRE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), anyTimeWindow(), anyTriggerContext());
        createWindowOperator.processElement(new StreamRecord(1, 0L));
        ((InternalWindowFunction) Mockito.verify(mockWindowFunction, Mockito.times(2))).process(Integer.valueOf(Matchers.eq(1)), anyTimeWindow(), anyInternalWindowContext(), Integer.valueOf(Matchers.anyInt()), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction) Mockito.verify(mockWindowFunction, Mockito.times(1))).process(Integer.valueOf(Matchers.eq(1)), (Window) Matchers.eq(new TimeWindow(0L, 2L)), anyInternalWindowContext(), Integer.valueOf(Matchers.eq(3)), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction) Mockito.verify(mockWindowFunction, Mockito.times(1))).process(Integer.valueOf(Matchers.eq(1)), (Window) Matchers.eq(new TimeWindow(2L, 4L)), anyInternalWindowContext(), Integer.valueOf(Matchers.eq(3)), WindowOperatorContractTest.anyCollector());
        ((Trigger) Mockito.verify(mockTrigger, Mockito.never())).clear(anyTimeWindow(), anyTriggerContext());
        Assert.assertEquals(4L, createWindowOperator.numKeyedStateEntries());
        Assert.assertEquals(4L, createWindowOperator.numEventTimeTimers());
    }

    @Test
    public void testFoldingWindow() throws Exception {
        WindowAssigner mockTimeWindowAssigner = mockTimeWindowAssigner();
        Trigger mockTrigger = mockTrigger();
        InternalWindowFunction mockWindowFunction = mockWindowFunction();
        FoldingStateDescriptor foldingStateDescriptor = new FoldingStateDescriptor("int-fold", 0, new FoldFunction<Integer, Integer>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.RegularWindowOperatorContractTest.3
            private static final long serialVersionUID = 1;

            public Integer fold(Integer num, Integer num2) throws Exception {
                return Integer.valueOf(num.intValue() + num2.intValue());
            }
        }, IntSerializer.INSTANCE);
        final ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("string-state", StringSerializer.INSTANCE);
        KeyedOneInputStreamOperatorTestHarness createWindowOperator = createWindowOperator(mockTimeWindowAssigner, mockTrigger, 0L, (StateDescriptor) foldingStateDescriptor, mockWindowFunction);
        createWindowOperator.open();
        Mockito.when(mockTimeWindowAssigner.assignWindows(Integer.valueOf(Matchers.anyInt()), Matchers.anyLong(), anyAssignerContext())).thenReturn(Arrays.asList(new TimeWindow(2L, 4L), new TimeWindow(0L, 2L)));
        Assert.assertEquals(0L, createWindowOperator.getOutput().size());
        Assert.assertEquals(0L, createWindowOperator.numKeyedStateEntries());
        createWindowOperator.processElement(new StreamRecord(1, 0L));
        createWindowOperator.processElement(new StreamRecord(1, 0L));
        ((Trigger) Mockito.doAnswer(new Answer<TriggerResult>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.RegularWindowOperatorContractTest.4
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public TriggerResult m61answer(InvocationOnMock invocationOnMock) throws Exception {
                TimeWindow timeWindow = (TimeWindow) invocationOnMock.getArguments()[2];
                Trigger.TriggerContext triggerContext = (Trigger.TriggerContext) invocationOnMock.getArguments()[3];
                triggerContext.registerEventTimeTimer(timeWindow.getEnd());
                triggerContext.getPartitionedState(valueStateDescriptor).update("hello");
                return TriggerResult.FIRE;
            }
        }).when(mockTrigger)).onElement(Matchers.anyObject(), Matchers.anyLong(), anyTimeWindow(), anyTriggerContext());
        createWindowOperator.processElement(new StreamRecord(1, 0L));
        ((InternalWindowFunction) Mockito.verify(mockWindowFunction, Mockito.times(2))).process(Integer.valueOf(Matchers.eq(1)), anyTimeWindow(), anyInternalWindowContext(), Integer.valueOf(Matchers.anyInt()), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction) Mockito.verify(mockWindowFunction, Mockito.times(1))).process(Integer.valueOf(Matchers.eq(1)), (Window) Matchers.eq(new TimeWindow(0L, 2L)), anyInternalWindowContext(), Integer.valueOf(Matchers.eq(3)), WindowOperatorContractTest.anyCollector());
        ((InternalWindowFunction) Mockito.verify(mockWindowFunction, Mockito.times(1))).process(Integer.valueOf(Matchers.eq(1)), (Window) Matchers.eq(new TimeWindow(2L, 4L)), anyInternalWindowContext(), Integer.valueOf(Matchers.eq(3)), WindowOperatorContractTest.anyCollector());
        ((Trigger) Mockito.verify(mockTrigger, Mockito.never())).clear(anyTimeWindow(), anyTriggerContext());
        Assert.assertEquals(4L, createWindowOperator.numKeyedStateEntries());
        Assert.assertEquals(4L, createWindowOperator.numEventTimeTimers());
    }

    private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> windowAssigner, Trigger<Integer, W> trigger, long j, StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, OUT, Integer, W> internalWindowFunction) throws Exception {
        KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.RegularWindowOperatorContractTest.5
            private static final long serialVersionUID = 1;

            public Integer getKey(Integer num) throws Exception {
                return num;
            }
        };
        return new KeyedOneInputStreamOperatorTestHarness<>(new WindowOperator(windowAssigner, windowAssigner.getWindowSerializer(new ExecutionConfig()), keySelector, IntSerializer.INSTANCE, stateDescriptor, internalWindowFunction, trigger, j, (OutputTag) null), keySelector, BasicTypeInfo.INT_TYPE_INFO);
    }

    @Override // org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest
    protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> windowAssigner, Trigger<Integer, W> trigger, long j, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> internalWindowFunction, OutputTag<Integer> outputTag) throws Exception {
        KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.RegularWindowOperatorContractTest.6
            private static final long serialVersionUID = 1;

            public Integer getKey(Integer num) throws Exception {
                return num;
            }
        };
        return new KeyedOneInputStreamOperatorTestHarness<>(new WindowOperator(windowAssigner, windowAssigner.getWindowSerializer(new ExecutionConfig()), keySelector, IntSerializer.INSTANCE, new ListStateDescriptor("int-list", IntSerializer.INSTANCE), internalWindowFunction, trigger, j, outputTag), keySelector, BasicTypeInfo.INT_TYPE_INFO);
    }

    @Override // org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest
    protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(WindowAssigner<Integer, W> windowAssigner, Trigger<Integer, W> trigger, long j, InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> internalWindowFunction) throws Exception {
        return createWindowOperator(windowAssigner, trigger, j, internalWindowFunction, (OutputTag<Integer>) null);
    }
}
