package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.com.google.common.base.Function;
import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A {@link WindowOperator} variant that keeps every element of a window in a {@link ListState}
 * and consults an {@link Evictor} each time the window fires.
 *
 * <p>When a window fires, the evictor reports how many elements to drop from the head of the
 * buffered contents. Only the remaining elements are handed to the window function, and (unless
 * the trigger also purges) only those remaining elements are written back to the window state.
 *
 * @param <K> type of the key
 * @param <IN> type of the incoming elements
 * @param <OUT> type of the elements emitted by the window function
 * @param <W> type of {@link Window} produced by the window assigner
 */
@Internal
public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1;

    /** Decides how many leading elements are dropped when a window fires. */
    private final Evictor<? super IN, ? super W> evictor;

    /** Descriptor of the per-window list state that buffers the raw stream records. */
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;

    /**
     * Creates the operator.
     *
     * <p>The superclass is given {@code null} as its state descriptor; this class keeps its own
     * list-state descriptor and uses it for every state access.
     *
     * @param windowAssigner assigns incoming elements to windows
     * @param typeSerializer serializer for windows
     * @param keySelector extracts the key from an element
     * @param typeSerializer2 serializer for keys
     * @param stateDescriptor descriptor of the list state holding the buffered records
     * @param internalWindowFunction function invoked with the non-evicted window contents
     * @param trigger decides when a window fires and/or purges
     * @param evictor decides how many elements to evict on firing; must not be {@code null}
     * @throws NullPointerException if {@code evictor} is {@code null}
     */
    public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> stateDescriptor, InternalWindowFunction<Iterable<IN>, OUT, K, W> internalWindowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor) {
        super(windowAssigner, typeSerializer, keySelector, typeSerializer2, null, internalWindowFunction, trigger);
        this.evictor = Objects.requireNonNull(evictor, "evictor");
        this.windowStateDescriptor = stateDescriptor;
    }

    /**
     * Appends the record to the list state of every window it is assigned to and evaluates the
     * trigger for each of those windows.
     *
     * @param streamRecord the incoming record
     * @throws Exception if state access, the trigger, or the window function fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<W> assignedWindows = windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp());
        K key = (K) getStateBackend().getCurrentKey();
        for (W window : assignedWindows) {
            getPartitionedState(window, windowSerializer, windowStateDescriptor).add(streamRecord);
            // The trigger context must point at the current key/window before it is consulted.
            context.key = key;
            context.window = window;
            processTriggerResult(context.onElement(streamRecord), key, window);
        }
    }

    /**
     * Acts on a trigger decision for one key/window pair.
     *
     * <ul>
     *   <li>Neither fire nor purge: nothing happens.</li>
     *   <li>Purge without fire: the window state is cleared.</li>
     *   <li>Fire: the evictor is asked how many leading elements to drop, the window function is
     *       applied to the rest, and the state is then either cleared (purge) or replaced by the
     *       non-evicted elements.</li>
     * </ul>
     *
     * @param triggerResult the decision returned by the trigger
     * @param k the key the window belongs to
     * @param w the window the decision applies to
     * @throws Exception if state access, the evictor, or the window function fails
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void processTriggerResult(TriggerResult triggerResult, K k, W w) throws Exception {
        if (!triggerResult.isFire()) {
            if (triggerResult.isPurge()) {
                setKeyContext(k);
                getPartitionedState(w, windowSerializer, windowStateDescriptor).clear();
            }
            return;
        }

        timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());
        setKeyContext(k);

        ListState<StreamRecord<IN>> windowState = getPartitionedState(w, windowSerializer, windowStateDescriptor);
        Iterable<StreamRecord<IN>> contents = windowState.get();

        // The raw cast works around the wildcard in the evictor's type: an
        // Iterable<StreamRecord<IN>> cannot be passed as Iterable<StreamRecord<? super IN>> directly.
        int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);

        // Lazy view over everything that survives eviction; it is traversed once by the window
        // function and, if the window is not purged, once more to rebuild the state.
        FluentIterable<StreamRecord<IN>> retained = FluentIterable.from(contents).skip(toEvict);

        userFunction.apply(context.key, context.window, retained.transform(new Function<StreamRecord<IN>, IN>() {
            @Override
            public IN apply(StreamRecord<IN> streamRecord) {
                return streamRecord.getValue();
            }
        }), timestampedCollector);

        // The state is always emptied after firing; when not purging it is refilled with the
        // elements that were not evicted.
        windowState.clear();
        if (!triggerResult.isPurge()) {
            // NOTE(review): this iterates the contents obtained before clear(); it relies on the
            // Iterable returned by ListState.get() not being emptied by clear() - confirm for the
            // state backends in use.
            for (StreamRecord<IN> record : retained) {
                windowState.add(record);
            }
        }
    }

    /**
     * Returns the evictor this operator was created with.
     *
     * @return the evictor, never {@code null}
     */
    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return evictor;
    }

    /**
     * Returns the list-state descriptor used for the window contents, viewed through the
     * superclass's declared return type.
     *
     * @return the window state descriptor passed to the constructor
     */
    @Override
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    public StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        // The state really holds StreamRecord<IN>, not IN; the cast only satisfies the inherited
        // signature and is unchecked.
        return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
    }
}
