package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/StreamInputProcessor.class */
public class StreamInputProcessor<IN> {
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
    private final CheckpointBarrierHandler barrierHandler;
    private int currentChannel = -1;
    private boolean isFinished;
    private final long[] watermarks;
    private long lastEmittedWatermark;
    private final DeserializationDelegate<StreamElement> deserializationDelegate;

    public StreamInputProcessor(InputGate[] inputGateArr, TypeSerializer<IN> typeSerializer, EventListener<CheckpointBarrier> eventListener, CheckpointingMode checkpointingMode, IOManager iOManager, boolean z) throws IOException {
        InputGate createInputGate = InputGateUtil.createInputGate(inputGateArr);
        if (checkpointingMode == CheckpointingMode.EXACTLY_ONCE) {
            this.barrierHandler = new BarrierBuffer(createInputGate, iOManager);
        } else {
            if (checkpointingMode != CheckpointingMode.AT_LEAST_ONCE) {
                throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointingMode);
            }
            this.barrierHandler = new BarrierTracker(createInputGate);
        }
        if (eventListener != null) {
            this.barrierHandler.registerCheckpointEventHandler(eventListener);
        }
        if (z) {
            this.deserializationDelegate = new NonReusingDeserializationDelegate(new MultiplexingStreamRecordSerializer(typeSerializer));
        } else {
            this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamRecordSerializer(typeSerializer));
        }
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[createInputGate.getNumberOfInputChannels()];
        for (int i = 0; i < this.recordDeserializers.length; i++) {
            this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
        }
        this.watermarks = new long[createInputGate.getNumberOfInputChannels()];
        for (int i2 = 0; i2 < createInputGate.getNumberOfInputChannels(); i2++) {
            this.watermarks[i2] = Long.MIN_VALUE;
        }
        this.lastEmittedWatermark = Long.MIN_VALUE;
    }

    public boolean processInput(OneInputStreamOperator<IN, ?> oneInputStreamOperator, Object obj) throws Exception {
        if (this.isFinished) {
            return false;
        }
        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult nextRecord = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                if (nextRecord.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycle();
                    this.currentRecordDeserializer = null;
                }
                if (nextRecord.isFullRecord()) {
                    StreamElement streamElement = (StreamElement) this.deserializationDelegate.getInstance();
                    if (!streamElement.isWatermark()) {
                        StreamRecord<?> asRecord = streamElement.asRecord();
                        synchronized (obj) {
                            oneInputStreamOperator.setKeyContextElement1(asRecord);
                            oneInputStreamOperator.processElement(asRecord);
                        }
                        return true;
                    }
                    long timestamp = streamElement.asWatermark().getTimestamp();
                    if (timestamp > this.watermarks[this.currentChannel]) {
                        this.watermarks[this.currentChannel] = timestamp;
                        long j = Long.MAX_VALUE;
                        for (long j2 : this.watermarks) {
                            j = Math.min(j2, j);
                        }
                        if (j > this.lastEmittedWatermark) {
                            this.lastEmittedWatermark = j;
                            synchronized (obj) {
                                oneInputStreamOperator.processWatermark(new Watermark(this.lastEmittedWatermark));
                            }
                        } else {
                            continue;
                        }
                    } else {
                        continue;
                    }
                }
            }
            BufferOrEvent nextNonBlocked = this.barrierHandler.getNextNonBlocked();
            if (nextNonBlocked == null) {
                this.isFinished = true;
                if (this.barrierHandler.isEmpty()) {
                    return false;
                }
                throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
            }
            if (nextNonBlocked.isBuffer()) {
                this.currentChannel = nextNonBlocked.getChannelIndex();
                this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
                this.currentRecordDeserializer.setNextBuffer(nextNonBlocked.getBuffer());
            } else {
                AbstractEvent event = nextNonBlocked.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
            }
        }
    }

    public void setReporter(AccumulatorRegistry.Reporter reporter) {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer : this.recordDeserializers) {
            recordDeserializer.setReporter(reporter);
        }
    }

    public void cleanup() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer : this.recordDeserializers) {
            Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
            if (currentBuffer != null && !currentBuffer.isRecycled()) {
                currentBuffer.recycle();
            }
        }
        this.barrierHandler.cleanup();
    }
}
