package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.com.google.common.base.Function;
import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.class */
public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1;
    // Evictor invoked on the window contents immediately before and after the window function
    // is applied (see emitWindowContents). Never null: the constructor rejects a null evictor.
    private final Evictor<? super IN, ? super W> evictor;
    // Context object handed to the evictor. Transient: it is created in open() and reset to
    // null in close()/dispose(); its key/window fields are overwritten before each use.
    private transient EvictingWindowOperator<K, IN, OUT, W>.EvictorContext evictorContext;
    // Descriptor used to look up the per-window ListState that buffers the raw StreamRecords.
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;

    /**
     * Context passed to the {@link Evictor} on every eviction call.
     *
     * <p>A single instance is reused by the enclosing operator: {@code key} and {@code window}
     * are mutable and are overwritten by the operator before the trigger is evaluated, so the
     * values are only meaningful for the element or timer currently being processed.
     */
    public class EvictorContext implements Evictor.EvictorContext {
        /** Key of the element/timer currently being processed. */
        protected K key;
        /** Window currently being processed; forwarded to the evictor. */
        protected W window;

        /**
         * @param key initial key (may be {@code null}; it is reassigned before use)
         * @param window initial window (may be {@code null}; it is reassigned before use)
         */
        public EvictorContext(K key, W window) {
            this.key = key;
            this.window = window;
        }

        /** Returns the current processing time reported by the operator's timer service. */
        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        /** Returns the current watermark reported by the operator's timer service. */
        @Override
        public long getCurrentWatermark() {
            return internalTimerService.currentWatermark();
        }

        /** Returns the metric group of the enclosing operator. */
        @Override
        public MetricGroup getMetricGroup() {
            // Must stay qualified: an unqualified call would recurse into this very method.
            return EvictingWindowOperator.this.getMetricGroup();
        }

        /** Returns the key currently stored in this context. */
        public K getKey() {
            return key;
        }

        /**
         * Lets the evictor remove elements before the window function runs.
         *
         * @param elements view of the window contents handed to the evictor
         * @param size number of elements in {@code elements}
         */
        void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
            evictor.evictBefore(elements, size, window, this);
        }

        /**
         * Lets the evictor remove elements after the window function has run.
         *
         * @param elements view of the window contents handed to the evictor
         * @param size number of elements in {@code elements}
         */
        void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) {
            evictor.evictAfter(elements, size, window, this);
        }
    }

    /**
     * Creates an evicting window operator.
     *
     * <p>The superclass is deliberately given a {@code null} state descriptor; this class keeps
     * its own descriptor for the list state that buffers the raw records.
     *
     * @param windowAssigner assigner that maps each element to its windows
     * @param windowSerializer serializer for windows (used as state namespace serializer)
     * @param keySelector key selector forwarded to the superclass
     * @param keySerializer serializer for keys, forwarded to the superclass
     * @param windowStateDescriptor descriptor of the per-window list state of buffered records
     * @param windowFunction function applied to the window contents when the trigger fires
     * @param trigger trigger forwarded to the superclass
     * @param evictor evictor applied before and after the window function; must not be null
     * @param allowedLateness forwarded unchanged to the superclass constructor
     *     (presumably the allowed lateness in milliseconds; confirm against WindowOperator)
     * @throws NullPointerException if {@code evictor} is {@code null}
     */
    public EvictingWindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> windowSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
            InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
            Trigger<? super IN, ? super W> trigger,
            Evictor<? super IN, ? super W> evictor,
            long allowedLateness) {
        super(
                windowAssigner,
                windowSerializer,
                keySelector,
                keySerializer,
                null,
                windowFunction,
                trigger,
                allowedLateness);
        this.evictor = Objects.requireNonNull(evictor);
        this.windowStateDescriptor = windowStateDescriptor;
    }

    /**
     * Adds the incoming record to the state of every window it is assigned to and evaluates
     * the trigger for each of those windows.
     *
     * <p>For non-merging assigners each non-late window is handled directly. For merging
     * assigners the window is first added to the {@link MergingWindowSet}; when that causes a
     * merge, the trigger is notified, trigger state and cleanup timers of the merged windows
     * are removed and their buffered records are merged into the surviving state window. Late
     * windows are dropped (and, in the merging case, retired from the window set).
     *
     * @param streamRecord the record to process
     * @throws IllegalStateException if the merged window has no state window in the window set
     * @throws Exception if state access, the trigger or the window function fails
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final Collection<W> assignedWindows = windowAssigner.assignWindows(
                streamRecord.getValue(), streamRecord.getTimestamp(), windowAssignerContext);

        // The keyed backend is not parameterized with K at this call site.
        @SuppressWarnings("unchecked")
        final K key = (K) getKeyedStateBackend().getCurrentKey();

        if (!(windowAssigner instanceof MergingWindowAssigner)) {
            for (W window : assignedWindows) {
                if (!isLate(window)) {
                    // Without merging, the window is its own state namespace.
                    addElementAndEvaluateTrigger(key, window, window, streamRecord);
                }
            }
            return;
        }

        final MergingWindowSet<W> mergingWindows = getMergingWindowSet();
        for (W window : assignedWindows) {
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(
                        W mergeResult,
                        Collection<W> mergedWindows,
                        W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {
                    // Notify the trigger in the context of the merge result.
                    context.key = key;
                    context.window = mergeResult;
                    context.onMerge(mergedWindows);

                    // Drop trigger state and cleanup timers of every window that was merged away.
                    for (W merged : mergedWindows) {
                        context.window = merged;
                        context.clear();
                        deleteCleanupTimer(merged);
                    }

                    // Fold the buffered records of the merged state windows into the survivor.
                    getKeyedStateBackend().mergePartitionedStates(
                            stateWindowResult, mergedStateWindows, windowSerializer, windowStateDescriptor);
                }
            });

            if (isLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }
            addElementAndEvaluateTrigger(key, actualWindow, stateWindow, streamRecord);
        }
        mergingWindows.persist();
    }

    /**
     * Buffers the record in the state stored under {@code stateWindow}, evaluates the trigger
     * for {@code window}, emits/purges according to the trigger result and registers the
     * cleanup timer for {@code window}.
     */
    private void addElementAndEvaluateTrigger(
            K key, W window, W stateWindow, StreamRecord<IN> streamRecord) throws Exception {
        ListState<StreamRecord<IN>> windowState =
                getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
        windowState.add(streamRecord);

        context.key = key;
        context.window = window;
        evictorContext.key = key;
        evictorContext.window = window;

        TriggerResult triggerResult = context.onElement(streamRecord);
        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = windowState.get();
            if (contents != null) {
                emitWindowContents(window, contents, windowState);
            }
        }
        if (triggerResult.isPurge()) {
            windowState.clear();
        }
        registerCleanupTimer(window);
    }

    /**
     * Handles an event-time timer for the timer's key and window.
     *
     * <p>If the window still has buffered contents, the trigger is evaluated and the window is
     * emitted and/or purged accordingly. If the assigner is event-time based and the timer
     * marks the window's cleanup time, all state for the window is removed.
     *
     * <p>With a merging assigner, a timer may fire for a window that no longer has a state
     * window in the merging window set. In that case there is nothing to emit or clear, so the
     * method returns immediately; previously a {@code null} state was passed on to
     * {@code clearAllState}, which dereferences it and failed with a NullPointerException.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if state access, the trigger or the window function fails
     */
    @Override
    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        context.key = internalTimer.getKey();
        context.window = internalTimer.getNamespace();
        evictorContext.key = internalTimer.getKey();
        evictorContext.window = internalTimer.getNamespace();

        MergingWindowSet<W> mergingWindowSet = null;
        ListState<StreamRecord<IN>> listState;
        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            W stateWindow = mergingWindowSet.getStateWindow(context.window);
            if (stateWindow == null) {
                // Timer for a window that is no longer tracked: nothing to fire or clean up.
                return;
            }
            listState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
        } else {
            listState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
        }

        Iterable<StreamRecord<IN>> contents = null;
        if (listState != null) {
            contents = listState.get();
        }
        if (contents != null) {
            TriggerResult triggerResult = context.onEventTime(internalTimer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(context.window, contents, listState);
            }
            if (triggerResult.isPurge()) {
                listState.clear();
            }
        }

        // Only event-time assigners clean up on event-time timers.
        if (windowAssigner.isEventTime() && isCleanupTime(context.window, internalTimer.getTimestamp())) {
            clearAllState(context.window, listState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    /**
     * Handles a processing-time timer for the timer's key and window.
     *
     * <p>If the window still has buffered contents, the trigger is evaluated and the window is
     * emitted and/or purged accordingly. If the assigner is not event-time based and the timer
     * marks the window's cleanup time, all state for the window is removed.
     *
     * <p>With a merging assigner, a timer may fire for a window that no longer has a state
     * window in the merging window set. In that case there is nothing to emit or clear, so the
     * method returns immediately; previously a {@code null} state was passed on to
     * {@code clearAllState}, which dereferences it and failed with a NullPointerException.
     *
     * @param internalTimer the timer that fired
     * @throws Exception if state access, the trigger or the window function fails
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        context.key = internalTimer.getKey();
        context.window = internalTimer.getNamespace();
        evictorContext.key = internalTimer.getKey();
        evictorContext.window = internalTimer.getNamespace();

        MergingWindowSet<W> mergingWindowSet = null;
        ListState<StreamRecord<IN>> listState;
        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            W stateWindow = mergingWindowSet.getStateWindow(context.window);
            if (stateWindow == null) {
                // Timer for a window that is no longer tracked: nothing to fire or clean up.
                return;
            }
            listState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
        } else {
            listState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
        }

        Iterable<StreamRecord<IN>> contents = null;
        if (listState != null) {
            contents = listState.get();
        }
        if (contents != null) {
            TriggerResult triggerResult = context.onProcessingTime(internalTimer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(context.window, contents, listState);
            }
            if (triggerResult.isPurge()) {
                listState.clear();
            }
        }

        // Only processing-time assigners clean up on processing-time timers.
        if (!windowAssigner.isEventTime() && isCleanupTime(context.window, internalTimer.getTimestamp())) {
            clearAllState(context.window, listState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    /**
     * Runs the evictor and the window function over the buffered records of a window and
     * writes the records that remain afterwards back into the window state.
     *
     * <p>Steps, in order: set the output timestamp to the window's max timestamp; wrap the
     * records in {@link TimestampedValue}s; call {@code evictBefore}; apply the window function
     * to the unwrapped values; call {@code evictAfter}; clear the state and re-add every record
     * still present in the view, so the stored state matches what the evictor left behind.
     *
     * <p>NOTE(review): the view is iterated after {@code listState.clear()}, which relies on
     * the Iterable previously obtained from the state remaining readable after the clear.
     * Confirm this holds for the state backend in use.
     *
     * @param w window being emitted; only its max timestamp is read here
     * @param iterable buffered records of the window, as returned by the list state
     * @param listState state that is rewritten with the records surviving eviction
     * @throws Exception if the window function or state access fails
     */
    private void emitWindowContents(W w, Iterable<StreamRecord<IN>> iterable, ListState<StreamRecord<IN>> listState) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());

        // View of the records as TimestampedValues, which is the element type the evictor expects.
        FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
                .from(iterable)
                .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
                    @Override
                    public TimestampedValue<IN> apply(StreamRecord<IN> streamRecord) {
                        return TimestampedValue.from(streamRecord);
                    }
                });
        evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // View of the plain values, which is what the user's window function consumes.
        FluentIterable<IN> projectedContents = recordsWithTimestamp
                .transform(new Function<TimestampedValue<IN>, IN>() {
                    @Override
                    public IN apply(TimestampedValue<IN> timestampedValue) {
                        return timestampedValue.getValue();
                    }
                });
        userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
        evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // The state offers only add/clear here, so rewrite it from the remaining records.
        listState.clear();
        for (TimestampedValue<IN> remaining : recordsWithTimestamp) {
            listState.add(remaining.getStreamRecord());
        }
    }

    /**
     * Removes everything stored for a window: its buffered records and its trigger state, and,
     * for merging assigners, its entry in the merging window set (which is then persisted).
     *
     * @param window the window being cleaned up
     * @param windowState buffered records of the window; must not be {@code null}
     * @param mergingWindows merging window set, or {@code null} for non-merging assigners
     * @throws Exception if state access fails
     */
    private void clearAllState(W window, ListState<StreamRecord<IN>> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        context.clear();
        if (mergingWindows == null) {
            return;
        }
        mergingWindows.retireWindow(window);
        mergingWindows.persist();
    }

    /**
     * Opens the operator via the superclass and then creates the reusable evictor context.
     * The context starts with a {@code null} key and window; both are assigned before use.
     *
     * @throws Exception if the superclass fails to open
     */
    @Override
    public void open() throws Exception {
        super.open();
        evictorContext = new EvictorContext(null, null);
    }

    /**
     * Closes the operator via the superclass and then drops the evictor context.
     *
     * @throws Exception if the superclass fails to close
     */
    @Override
    public void close() throws Exception {
        super.close();
        evictorContext = null;
    }

    /**
     * Disposes the operator via the superclass and then drops the evictor context.
     *
     * @throws Exception if the superclass fails to dispose
     */
    @Override
    public void dispose() throws Exception {
        super.dispose();
        evictorContext = null;
    }

    /**
     * Exposes the configured evictor for tests.
     *
     * @return the evictor supplied to the constructor (never {@code null})
     */
    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return evictor;
    }

    /**
     * Exposes this operator's window state descriptor for tests.
     *
     * <p>The stored descriptor describes a {@code ListState<StreamRecord<IN>>}, while the
     * inherited signature declares an {@code AppendingState<IN, Iterable<IN>>} descriptor.
     * The two types differ only in their type arguments, so an explicit unchecked cast is
     * required to satisfy the overridden return type; callers must not rely on the declared
     * element type.
     *
     * @return the descriptor supplied to the constructor
     */
    @Override
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
    }
}
