/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.operator.StreamRecordComparator;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Migration;
import org.apache.flink.util.Preconditions;

public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>,
Triggerable<KEY, VoidNamespace>,
CheckpointedRestoringOperator {
    private static final long serialVersionUID = -4166778210774160757L;
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    private final boolean isProcessingTime;
    private final TypeSerializer<IN> inputSerializer;
    private final TypeSerializer<KEY> keySerializer;
    private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorStateName";
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueuesStateName";
    private transient ValueState<NFA<IN>> nfaOperatorState;
    private transient MapState<Long, List<IN>> elementQueueState;
    private final NFACompiler.NFAFactory<IN> nfaFactory;
    private transient InternalTimerService<VoidNamespace> timerService;
    private long lastWatermark;
    private final boolean migratingFromOldKeyedOperator;

    public AbstractKeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory, boolean migratingFromOldKeyedOperator) {
        this.inputSerializer = (TypeSerializer)Preconditions.checkNotNull(inputSerializer);
        this.isProcessingTime = (Boolean)Preconditions.checkNotNull((Object)isProcessingTime);
        this.keySerializer = (TypeSerializer)Preconditions.checkNotNull(keySerializer);
        this.nfaFactory = (NFACompiler.NFAFactory)Preconditions.checkNotNull(nfaFactory);
        this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        if (this.nfaOperatorState == null) {
            this.nfaOperatorState = this.getRuntimeContext().getState(new ValueStateDescriptor(NFA_OPERATOR_STATE_NAME, new NFA.NFASerializer<IN>(this.inputSerializer)));
        }
        if (this.elementQueueState == null) {
            this.elementQueueState = this.getRuntimeContext().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)new ListSerializer(this.inputSerializer)));
        }
    }

    public void open() throws Exception {
        super.open();
        this.timerService = this.getInternalTimerService("watermark-callbacks", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (this.isProcessingTime) {
            NFA<IN> nfa = this.getNFA();
            this.processEvent(nfa, element.getValue(), this.getProcessingTimeService().getCurrentProcessingTime());
            this.updateNFA(nfa);
        } else {
            long timestamp = element.getTimestamp();
            Object value = element.getValue();
            if (timestamp >= this.lastWatermark) {
                this.saveRegisterWatermarkTimer();
                ArrayList<Object> elementsForTimestamp = (ArrayList<Object>)this.elementQueueState.get((Object)timestamp);
                if (elementsForTimestamp == null) {
                    elementsForTimestamp = new ArrayList<Object>();
                }
                if (this.getExecutionConfig().isObjectReuseEnabled()) {
                    elementsForTimestamp.add(this.inputSerializer.copy(value));
                } else {
                    elementsForTimestamp.add(element.getValue());
                }
                this.elementQueueState.put((Object)timestamp, elementsForTimestamp);
            }
        }
    }

    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.timerService.currentWatermark();
        if (currentWatermark + 1L > currentWatermark) {
            this.timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, currentWatermark + 1L);
        }
    }

    public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
        PriorityQueue<Long> sortedTimestamps = this.getSortedTimestamps();
        NFA<IN> nfa = this.getNFA();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= this.timerService.currentWatermark()) {
            long timestamp = sortedTimestamps.poll();
            for (Object element : (List)this.elementQueueState.get((Object)timestamp)) {
                this.processEvent(nfa, element, timestamp);
            }
            this.elementQueueState.remove((Object)timestamp);
        }
        this.advanceTime(nfa, this.timerService.currentWatermark());
        if (sortedTimestamps.isEmpty()) {
            this.elementQueueState.clear();
        }
        this.updateNFA(nfa);
        if (!sortedTimestamps.isEmpty() || !nfa.isEmpty()) {
            this.saveRegisterWatermarkTimer();
        }
        this.updateLastSeenWatermark(this.timerService.currentWatermark());
    }

    public void onProcessingTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
    }

    private void updateLastSeenWatermark(long timestamp) {
        this.lastWatermark = timestamp;
    }

    private NFA<IN> getNFA() throws IOException {
        NFA<IN> nfa = (NFA<IN>)this.nfaOperatorState.value();
        return nfa != null ? nfa : this.nfaFactory.createNFA();
    }

    private void updateNFA(NFA<IN> nfa) throws IOException {
        if (nfa.isEmpty()) {
            this.nfaOperatorState.clear();
        } else {
            this.nfaOperatorState.update(nfa);
        }
    }

    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<Long>();
        for (Long timestamp : this.elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        return sortedTimestamps;
    }

    protected abstract void processEvent(NFA<IN> var1, IN var2, long var3);

    protected abstract void advanceTime(NFA<IN> var1, long var2);

    public void restoreState(FSDataInputStream in) throws Exception {
        int hasUdfState;
        if (in instanceof Migration && (hasUdfState = in.read()) == 1) {
            throw new Exception("Found UDF state but CEPOperator is not an UDF operator.");
        }
        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper((InputStream)in);
        this.timerService = this.getInternalTimerService("watermark-callbacks", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, this);
        ValueState oldNfaOperatorState = this.getRuntimeContext().getState(new ValueStateDescriptor("nfaOperatorState", new NFA.Serializer()));
        ValueState oldPriorityQueueOperatorState = this.getRuntimeContext().getState(new ValueStateDescriptor("priorityQueueStateName", new PriorityQueueSerializer(new StreamElementSerializer(this.inputSerializer), new PriorityQueueStreamRecordFactory())));
        if (this.migratingFromOldKeyedOperator) {
            int numberEntries = inputView.readInt();
            for (int i = 0; i < numberEntries; ++i) {
                Object key = this.keySerializer.deserialize((DataInputView)inputView);
                this.setCurrentKey(key);
                this.saveRegisterWatermarkTimer();
                NFA nfa = (NFA)oldNfaOperatorState.value();
                oldNfaOperatorState.clear();
                this.nfaOperatorState.update((Object)nfa);
                PriorityQueue priorityQueue = (PriorityQueue)oldPriorityQueueOperatorState.value();
                if (priorityQueue == null || priorityQueue.isEmpty()) continue;
                HashMap<Long, ArrayList<Object>> elementMap = new HashMap<Long, ArrayList<Object>>();
                for (StreamRecord streamRecord : priorityQueue) {
                    long timestamp = streamRecord.getTimestamp();
                    Object element = streamRecord.getValue();
                    ArrayList<Object> elements = (ArrayList<Object>)elementMap.get(timestamp);
                    if (elements == null) {
                        elements = new ArrayList<Object>();
                        elementMap.put(timestamp, elements);
                    }
                    elements.add(element);
                }
                for (Map.Entry entry : elementMap.entrySet()) {
                    this.elementQueueState.put(entry.getKey(), entry.getValue());
                }
                oldPriorityQueueOperatorState.clear();
            }
        } else {
            ObjectInputStream ois = new ObjectInputStream((InputStream)in);
            NFA nfa = (NFA)ois.readObject();
            MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer(this.inputSerializer);
            HashMap<Long, ArrayList<Object>> elementMap = new HashMap<Long, ArrayList<Object>>();
            int entries = ois.readInt();
            for (int i = 0; i < entries; ++i) {
                StreamElement streamElement = recordSerializer.deserialize((DataInputView)inputView);
                StreamRecord streamRecord = streamElement.asRecord();
                long timestamp = streamRecord.getTimestamp();
                Object element = streamRecord.getValue();
                ArrayList<Object> elements = (ArrayList<Object>)elementMap.get(timestamp);
                if (elements == null) {
                    elements = new ArrayList<Object>();
                    elementMap.put(timestamp, elements);
                }
                elements.add(element);
            }
            this.setCurrentKey((byte)0);
            this.nfaOperatorState.update((Object)nfa);
            for (Map.Entry entry : elementMap.entrySet()) {
                this.elementQueueState.put(entry.getKey(), entry.getValue());
            }
            if (!this.isProcessingTime) {
                this.setCurrentKey((byte)0);
                this.saveRegisterWatermarkTimer();
            }
            ois.close();
        }
    }

    @VisibleForTesting
    public boolean hasNonEmptyNFA(KEY key) throws IOException {
        this.setCurrentKey(key);
        return this.nfaOperatorState.value() != null;
    }

    @VisibleForTesting
    public boolean hasNonEmptyPQ(KEY key) throws Exception {
        this.setCurrentKey(key);
        return this.elementQueueState.keys().iterator().hasNext();
    }

    @VisibleForTesting
    public int getPQSize(KEY key) throws Exception {
        this.setCurrentKey(key);
        int counter = 0;
        for (List elements : this.elementQueueState.values()) {
            counter += elements.size();
        }
        return counter;
    }

    private static class PriorityQueueStreamRecordFactory<T>
    implements PriorityQueueFactory<StreamRecord<T>> {
        private static final long serialVersionUID = 1254766984454616593L;

        private PriorityQueueStreamRecordFactory() {
        }

        @Override
        public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
            return new PriorityQueue<StreamRecord<T>>(11, new StreamRecordComparator());
        }

        public boolean equals(Object obj) {
            return obj instanceof PriorityQueueStreamRecordFactory;
        }

        public int hashCode() {
            return this.getClass().hashCode();
        }
    }

    private static interface PriorityQueueFactory<T>
    extends Serializable {
        public PriorityQueue<T> createPriorityQueue();
    }

    private static class PriorityQueueSerializer<T>
    extends TypeSerializer<PriorityQueue<T>> {
        private static final long serialVersionUID = -231980397616187715L;
        private final TypeSerializer<T> elementSerializer;
        private final PriorityQueueFactory<T> factory;

        PriorityQueueSerializer(TypeSerializer<T> elementSerializer, PriorityQueueFactory<T> factory) {
            this.elementSerializer = elementSerializer;
            this.factory = factory;
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<PriorityQueue<T>> duplicate() {
            return new PriorityQueueSerializer<T>(this.elementSerializer.duplicate(), this.factory);
        }

        public PriorityQueue<T> createInstance() {
            return this.factory.createPriorityQueue();
        }

        public PriorityQueue<T> copy(PriorityQueue<T> from) {
            PriorityQueue<T> result = this.factory.createPriorityQueue();
            for (T element : from) {
                result.offer(this.elementSerializer.copy(element));
            }
            return result;
        }

        public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
            reuse.clear();
            for (T element : from) {
                reuse.offer(this.elementSerializer.copy(element));
            }
            return reuse;
        }

        public int getLength() {
            return 0;
        }

        public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
            target.writeInt(record.size());
            for (T element : record) {
                this.elementSerializer.serialize(element, target);
            }
        }

        public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
            PriorityQueue<T> result = this.factory.createPriorityQueue();
            return this.deserialize(result, source);
        }

        public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
            reuse.clear();
            int numberEntries = source.readInt();
            for (int i = 0; i < numberEntries; ++i) {
                reuse.offer(this.elementSerializer.deserialize(source));
            }
            return reuse;
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
        }

        public boolean equals(Object obj) {
            if (obj instanceof PriorityQueueSerializer) {
                PriorityQueueSerializer other = (PriorityQueueSerializer)((Object)obj);
                return this.factory.equals(other.factory) && this.elementSerializer.equals(other.elementSerializer);
            }
            return false;
        }

        public boolean canEqual(Object obj) {
            return obj instanceof PriorityQueueSerializer;
        }

        public int hashCode() {
            return Objects.hash(this.factory, this.elementSerializer);
        }

        public TypeSerializerConfigSnapshot snapshotConfiguration() {
            return new CollectionSerializerConfigSnapshot(this.elementSerializer);
        }

        public CompatibilityResult<PriorityQueue<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
            if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
                Tuple2 previousElemSerializerAndConfig = ((CollectionSerializerConfigSnapshot)configSnapshot).getSingleNestedSerializerAndConfig();
                CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult((TypeSerializer)((TypeSerializer)previousElemSerializerAndConfig.f0), UnloadableDummyTypeSerializer.class, (TypeSerializerConfigSnapshot)((TypeSerializerConfigSnapshot)previousElemSerializerAndConfig.f1), this.elementSerializer);
                if (!compatResult.isRequiresMigration()) {
                    return CompatibilityResult.compatible();
                }
                if (compatResult.getConvertDeserializer() != null) {
                    return CompatibilityResult.requiresMigration(new PriorityQueueSerializer<T>(new TypeDeserializerAdapter(compatResult.getConvertDeserializer()), this.factory));
                }
            }
            return CompatibilityResult.requiresMigration();
        }
    }
}

