/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable,
InputTypeConfigurable {
    private static final long serialVersionUID = 1L;

    /** Assigns each incoming element to the windows it belongs to (see {@code processElement}). */
    protected final WindowAssigner<? super IN, W> windowAssigner;

    /** Key selector handed in at construction; exposed through {@code getKeySelector()}. */
    protected final KeySelector<IN, K> keySelector;

    /** Trigger consulted on every element and on every firing timer. */
    protected final Trigger<? super IN, ? super W> trigger;

    /** Descriptor of the per-window state that holds the window contents. Not null-checked by the constructor. */
    protected final StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor;

    /** Set by {@code setInputType}; {@code open()} fails if it is still {@code null}. */
    protected TypeSerializer<IN> inputSerializer;

    /** Serializes timer keys when timers are snapshotted and restored. */
    protected final TypeSerializer<K> keySerializer;

    /** Serializes windows: used as the state namespace serializer and for timer snapshots. */
    protected final TypeSerializer<W> windowSerializer;

    /** Output wrapper created in {@code open()}; stamped with the window's max timestamp before a window is emitted. */
    protected transient TimestampedCollector<OUT> timestampedCollector;

    /** Timestamp of the last watermark processed; {@code -1} until the first watermark arrives. */
    protected transient long currentWatermark = -1L;

    /**
     * Reusable trigger context. Its {@code key} and {@code window} are re-pointed before every
     * trigger invocation, so it starts out without either.
     */
    protected transient Context context = new Context(null, null);

    /** Registered processing-time timers; the result of add/remove on this set gates changes to the queue. */
    protected transient Set<Timer<K, W>> processingTimeTimers;

    /** The same processing-time timers, ordered by timestamp. */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;

    /** Registered event-time timers; the result of add/remove on this set gates changes to the queue. */
    protected transient Set<Timer<K, W>> watermarkTimers;

    /** The same event-time timers, ordered by timestamp. */
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;

    /**
     * Creates a window operator from its building blocks.
     *
     * <p>{@code windowAssigner}, {@code keySelector}, {@code keySerializer} and {@code trigger}
     * must not be {@code null}; {@code windowSerializer} and {@code windowStateDescriptor} are
     * stored without a null check.
     *
     * @throws NullPointerException if one of the checked arguments is {@code null}
     */
    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends MergingState<IN, ACC>, ?> windowStateDescriptor, InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger) {
        super(windowFunction);

        // Mandatory collaborators, checked in the same order as before.
        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.keySelector = Objects.requireNonNull(keySelector);
        this.keySerializer = Objects.requireNonNull(keySerializer);
        this.trigger = Objects.requireNonNull(trigger);

        // Stored as given, without validation.
        this.windowSerializer = windowSerializer;
        this.windowStateDescriptor = windowStateDescriptor;

        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Java-serialization hook: restores the non-transient fields, then resets the watermark.
     *
     * @param in stream the operator is being read from
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // Initializers of transient fields do not run on deserialization, so re-apply the -1 start value.
        currentWatermark = -1L;
    }

    /**
     * Creates the input serializer from the given type information.
     *
     * <p>{@code open()} refuses to run until this has been called.
     *
     * @param type type information of the input elements
     * @param executionConfig configuration passed to the serializer factory
     */
    @Override
    @SuppressWarnings("unchecked")
    public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        // The wildcard hides the element type, so the cast cannot be checked; the caller is
        // presumably passing the type information of this operator's actual input.
        this.inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
    }

    /**
     * Prepares the operator for processing: wraps the output in a timestamped collector,
     * creates the timer collections unless they already exist (they are pre-populated by
     * {@code restoreState}), and installs a fresh trigger context.
     *
     * @throws IllegalStateException if {@code setInputType} was never called
     */
    @Override
    public final void open() throws Exception {
        super.open();
        this.timestampedCollector = new TimestampedCollector<>(this.output);
        if (this.inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }
        // Only create empty timer collections when no restored ones are present.
        if (this.watermarkTimers == null) {
            this.watermarkTimers = new HashSet<>();
            this.watermarkTimersQueue = new PriorityQueue<>(100);
        }
        if (this.processingTimeTimers == null) {
            this.processingTimeTimers = new HashSet<>();
            this.processingTimeTimersQueue = new PriorityQueue<>(100);
        }
        // Key and window are filled in right before each trigger invocation.
        this.context = new Context(null, null);
    }

    /** Closes the operator by delegating to the superclass; nothing window-specific is released here. */
    @Override
    public final void close() throws Exception {
        super.close();
    }

    /**
     * Adds the element to the state of every window it is assigned to and lets the trigger
     * decide, per window, whether that window fires and/or is purged.
     *
     * @param element the incoming record
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp());

        // The state backend hands the current key back untyped.
        @SuppressWarnings("unchecked")
        K key = (K) this.getStateBackend().getCurrentKey();

        for (W window : elementWindows) {
            MergingState<IN, ACC> windowState = this.getPartitionedState(window, this.windowSerializer, this.windowStateDescriptor);
            windowState.add(element.getValue());

            // Point the shared context at this key/window before the trigger sees it.
            this.context.key = key;
            this.context.window = window;
            TriggerResult triggerResult = this.context.onElement(element);
            this.processTriggerResult(triggerResult, key, window);
        }
    }

    /**
     * Acts on a trigger decision for one window.
     *
     * <p>On fire, the window contents are passed to the window function with the output
     * timestamp set to the window's max timestamp. On purge, the window state and the trigger
     * state are cleared. Both can happen in the same call, fire first.
     *
     * @param triggerResult decision returned by the trigger
     * @param key key the window belongs to
     * @param window window the decision applies to
     */
    protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception {
        final boolean fire = triggerResult.isFire();
        final boolean purge = triggerResult.isPurge();
        if (!(fire || purge)) {
            return;
        }

        if (fire) {
            // Stamp the output before the window state is looked up.
            timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        }

        MergingState<IN, ACC> state = getPartitionedState(window, windowSerializer, windowStateDescriptor);

        if (fire) {
            Object contents = state.get();
            // Key and window are taken from the shared context, not from the parameters.
            ((InternalWindowFunction)this.userFunction).apply(this.context.key, this.context.window, contents, this.timestampedCollector);
        }

        if (purge) {
            state.clear();
            context.clear();
        }
    }

    /**
     * Fires all event-time timers up to the watermark, forwards the watermark, and only then
     * records it as the current watermark.
     *
     * @param mark the watermark that arrived
     */
    @Override
    public final void processWatermark(Watermark mark) throws Exception {
        processTriggersFor(mark);
        output.emitWatermark(mark);
        // Updated last: triggers fired above still observed the previous watermark.
        currentWatermark = mark.getTimestamp();
    }

    /**
     * Drains every event-time timer whose timestamp is at or before the watermark, invoking
     * the trigger for each one in timestamp order.
     *
     * @param mark watermark bounding the timers to fire
     */
    private void processTriggersFor(Watermark mark) throws Exception {
        for (Timer<K, W> due = watermarkTimersQueue.peek();
                due != null && due.timestamp <= mark.getTimestamp();
                due = watermarkTimersQueue.peek()) {
            // Unregister first so the trigger may register the same timer again.
            watermarkTimers.remove(due);
            watermarkTimersQueue.remove();

            context.key = due.key;
            context.window = due.window;
            setKeyContext(due.key);

            TriggerResult result = context.onEventTime(due.timestamp);
            processTriggerResult(result, context.key, context.window);
        }
    }

    /**
     * Processing-time callback: fires every processing-time timer due at {@code time}, then
     * re-checks the event-time timers against the current watermark.
     *
     * @param time the processing time that was reached
     */
    @Override
    public final void trigger(long time) throws Exception {
        for (Timer<K, W> due = processingTimeTimersQueue.peek();
                due != null && due.timestamp <= time;
                due = processingTimeTimersQueue.peek()) {
            // Unregister first so the trigger may register the same timer again.
            processingTimeTimers.remove(due);
            processingTimeTimersQueue.remove();

            context.key = due.key;
            context.window = due.window;
            setKeyContext(due.key);

            TriggerResult result = context.onProcessingTime(due.timestamp);
            processTriggerResult(result, context.key, context.window);
        }

        // Event-time timers registered at or behind the watermark are woken through this callback.
        processTriggersFor(new Watermark(currentWatermark));
    }

    /**
     * Snapshots the pending timers on top of the superclass state.
     *
     * <p>Layout: the event-time timers followed by the processing-time timers, each section
     * written as a count and then {@code (key, window, timestamp)} per timer.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param timestamp timestamp of the checkpoint
     * @return the task state with the serialized timers attached as operator state
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
        AbstractStateBackend.CheckpointStateOutputView out = this.getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

        writeTimers(this.watermarkTimersQueue, out);
        writeTimers(this.processingTimeTimersQueue, out);

        taskState.setOperatorState(out.closeAndGetHandle());
        return taskState;
    }

    /**
     * Writes one timer section. The count comes from the same collection that is iterated,
     * so the header can never disagree with the number of entries that follow.
     */
    private void writeTimers(PriorityQueue<Timer<K, W>> timers, AbstractStateBackend.CheckpointStateOutputView out) throws IOException {
        out.writeInt(timers.size());
        for (Timer<K, W> timer : timers) {
            this.keySerializer.serialize(timer.key, out);
            this.windowSerializer.serialize(timer.window, out);
            out.writeLong(timer.timestamp);
        }
    }

    /**
     * Restores the timers written by {@code snapshotOperatorState}: first the event-time
     * section, then the processing-time section, each a count followed by
     * {@code (key, window, timestamp)} entries.
     *
     * @param taskState state to restore from
     * @param recoveryTimestamp timestamp passed through to the superclass
     */
    @Override
    public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
        super.restoreState(taskState, recoveryTimestamp);
        ClassLoader userClassloader = this.getUserCodeClassloader();
        StateHandle<?> inputState = taskState.getOperatorState();
        DataInputView in = (DataInputView) inputState.getState(userClassloader);

        int numWatermarkTimers = in.readInt();
        this.watermarkTimers = new HashSet<>(numWatermarkTimers);
        // PriorityQueue rejects an initial capacity below 1.
        this.watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
        readTimers(in, numWatermarkTimers, this.watermarkTimers, this.watermarkTimersQueue);

        int numProcessingTimeTimers = in.readInt();
        this.processingTimeTimers = new HashSet<>(numProcessingTimeTimers);
        this.processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
        readTimers(in, numProcessingTimeTimers, this.processingTimeTimers, this.processingTimeTimersQueue);
    }

    /** Reads {@code count} timers from {@code in} and registers each in both the set and the queue. */
    private void readTimers(DataInputView in, int count, Set<Timer<K, W>> timerSet, PriorityQueue<Timer<K, W>> timerQueue) throws IOException {
        for (int i = 0; i < count; ++i) {
            K key = this.keySerializer.deserialize(in);
            W window = this.windowSerializer.deserialize(in);
            long timestamp = in.readLong();
            Timer<K, W> timer = new Timer<>(timestamp, key, window);
            timerSet.add(timer);
            timerQueue.add(timer);
        }
    }

    /** Returns the trigger this operator was constructed with. */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    /** Returns the key selector this operator was constructed with. */
    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return keySelector;
    }

    /** Returns the window assigner this operator was constructed with. */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    /** Returns the window state descriptor this operator was constructed with; may be {@code null}, as the constructor does not check it. */
    @VisibleForTesting
    public StateDescriptor<? extends MergingState<IN, ACC>, ?> getStateDescriptor() {
        return windowStateDescriptor;
    }

    protected static class Timer<K, W extends Window>
    implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long timestamp, K key, W window) {
            this.timestamp = timestamp;
            this.key = key;
            this.window = window;
        }

        @Override
        public int compareTo(Timer<K, W> o) {
            return Long.compare(this.timestamp, o.timestamp);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Timer timer = (Timer)o;
            return this.timestamp == timer.timestamp && this.key.equals(timer.key) && this.window.equals(timer.window);
        }

        public int hashCode() {
            int result = (int)(this.timestamp ^ this.timestamp >>> 32);
            result = 31 * result + this.key.hashCode();
            result = 31 * result + this.window.hashCode();
            return result;
        }

        public String toString() {
            return "Timer{timestamp=" + this.timestamp + ", key=" + this.key + ", window=" + this.window + '}';
        }
    }

    protected static class Context
    implements Trigger.TriggerContext {
        protected K key;
        protected W window;
        final /* synthetic */ WindowOperator this$0;

        public Context(K key, W window) {
            this.this$0 = this$0;
            this.key = key;
            this.window = window;
        }

        @Override
        public long getCurrentWatermark() {
            return this.this$0.currentWatermark;
        }

        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
            TypeInformation typeInfo;
            Objects.requireNonNull(stateType, "The state type class must not be null");
            try {
                typeInfo = TypeExtractor.getForClass(stateType);
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + stateType.getName() + "' from the class alone, due to generic type parameters. " + "Please specify the TypeInformation directly.", e);
            }
            return this.getKeyValueState(name, typeInfo, defaultState);
        }

        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
            Objects.requireNonNull(name, "The name of the state must not be null");
            Objects.requireNonNull(stateType, "The state type information must not be null");
            ValueStateDescriptor stateDesc = new ValueStateDescriptor(name, stateType.createSerializer(this.this$0.getExecutionConfig()), defaultState);
            return (ValueState)this.getPartitionedState((StateDescriptor<S, ?>)stateDesc);
        }

        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S)this.this$0.getPartitionedState(this.window, this.this$0.windowSerializer, stateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.processingTimeTimers.add(timer)) {
                this.this$0.processingTimeTimersQueue.add(timer);
                this.this$0.getRuntimeContext().registerTimer(time, this.this$0);
            }
        }

        @Override
        public void registerEventTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.watermarkTimers.add(timer)) {
                this.this$0.watermarkTimersQueue.add(timer);
            }
            if (time <= this.this$0.currentWatermark) {
                this.this$0.getRuntimeContext().registerTimer(System.currentTimeMillis(), this.this$0);
            }
        }

        @Override
        public void deleteProcessingTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.processingTimeTimers.remove(timer)) {
                this.this$0.processingTimeTimersQueue.remove(timer);
            }
        }

        @Override
        public void deleteEventTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.watermarkTimers.remove(timer)) {
                this.this$0.watermarkTimersQueue.remove(timer);
            }
        }

        public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
            return this.this$0.trigger.onElement(element.getValue(), element.getTimestamp(), this.window, this);
        }

        public TriggerResult onProcessingTime(long time) throws Exception {
            return this.this$0.trigger.onProcessingTime(time, this.window, this);
        }

        public TriggerResult onEventTime(long time) throws Exception {
            return this.this$0.trigger.onEventTime(time, this.window, this);
        }

        public void clear() throws Exception {
            this.this$0.trigger.clear(this.window, this);
        }

        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }
}

