package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.Optional;
import java.util.Random;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriter.class */
/**
 * Serializes records and writes them to the subpartitions ("channels") of a
 * {@link ResultPartitionWriter}.
 *
 * <p>For every channel the writer keeps one {@link RecordSerializer} and at most one
 * in-progress {@link BufferBuilder}. A record is serialized into the channel's current
 * buffer builder; whenever the serializer reports a full buffer, the builder is finished
 * and, if the record has not been written completely yet, a new one is requested from
 * the partition's buffer provider.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to the per-channel state,
 * so it presumably has to be used from a single thread — confirm against callers.
 *
 * @param <T> the type of the records that can be emitted with this writer
 */
public class RecordWriter<T extends IOReadableWritable> {

    /** The partition that receives the buffers and events produced by this writer. */
    protected final ResultPartitionWriter targetPartition;

    /** Decides to which channel(s) a record passed to {@link #emit} is sent. */
    private final ChannelSelector<T> channelSelector;

    /** Number of subpartitions of {@link #targetPartition}, read once at construction. */
    private final int numChannels;

    /** One serializer per channel, indexed by channel. */
    private final RecordSerializer<T>[] serializers;

    /** The in-progress buffer builder of each channel, or empty if there is none. */
    private final Optional<BufferBuilder>[] bufferBuilders;

    /** Source of channel indices for {@link #randomEmit}. */
    private final Random rng = new XORShiftRandom();

    /** Whether to flush the target partition after every record and every broadcast event. */
    private final boolean flushAlways;

    /** Sum of the values returned by {@code BufferBuilder#finish()}; replaceable via {@link #setMetricGroup}. */
    private Counter numBytesOut = new SimpleCounter();

    /** Number of buffer builders finished while writing; replaceable via {@link #setMetricGroup}. */
    private Counter numBuffersOut = new SimpleCounter();

    /**
     * Creates a writer that uses a {@link RoundRobinChannelSelector} and does not flush
     * after every record.
     *
     * @param resultPartitionWriter the partition to write to
     */
    public RecordWriter(ResultPartitionWriter resultPartitionWriter) {
        this(resultPartitionWriter, new RoundRobinChannelSelector<T>());
    }

    /**
     * Creates a writer that does not flush after every record.
     *
     * @param resultPartitionWriter the partition to write to
     * @param channelSelector selects the target channel(s) for {@link #emit}
     */
    public RecordWriter(ResultPartitionWriter resultPartitionWriter, ChannelSelector<T> channelSelector) {
        this(resultPartitionWriter, channelSelector, false);
    }

    /**
     * Creates a writer with one serializer and one (initially empty) buffer-builder slot
     * per subpartition of the given partition.
     *
     * @param resultPartitionWriter the partition to write to
     * @param channelSelector selects the target channel(s) for {@link #emit}
     * @param flushAlways if {@code true}, the target channel is flushed after every record
     *     and all channels are flushed after every broadcast event
     */
    @SuppressWarnings("unchecked") // Generic arrays cannot be created directly; only T-typed elements are stored.
    public RecordWriter(
            ResultPartitionWriter resultPartitionWriter,
            ChannelSelector<T> channelSelector,
            boolean flushAlways) {
        this.flushAlways = flushAlways;
        this.targetPartition = resultPartitionWriter;
        this.channelSelector = channelSelector;
        this.numChannels = resultPartitionWriter.getNumberOfSubpartitions();

        this.serializers = new SpanningRecordSerializer[this.numChannels];
        this.bufferBuilders = new Optional[this.numChannels];
        for (int channel = 0; channel < this.numChannels; channel++) {
            this.serializers[channel] = new SpanningRecordSerializer<T>();
            this.bufferBuilders[channel] = Optional.empty();
        }
    }

    /**
     * Sends the record to every channel chosen by the channel selector.
     *
     * @param t the record to emit
     * @throws IOException if writing the record fails
     * @throws InterruptedException if interrupted while requesting a buffer
     */
    public void emit(T t) throws IOException, InterruptedException {
        for (int targetChannel : this.channelSelector.selectChannels(t, this.numChannels)) {
            sendToTarget(t, targetChannel);
        }
    }

    /**
     * Sends the record to all channels, bypassing the channel selector.
     *
     * @param t the record to emit
     * @throws IOException if writing the record fails
     * @throws InterruptedException if interrupted while requesting a buffer
     */
    public void broadcastEmit(T t) throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
            sendToTarget(t, targetChannel);
        }
    }

    /**
     * Sends the record to a single channel picked by the internal random number generator.
     *
     * @param t the record to emit
     * @throws IOException if writing the record fails
     * @throws InterruptedException if interrupted while requesting a buffer
     */
    public void randomEmit(T t) throws IOException, InterruptedException {
        sendToTarget(t, this.rng.nextInt(this.numChannels));
    }

    /**
     * Serializes the record into the given channel, finishing full buffers and requesting
     * new ones until the serializer no longer reports a full buffer.
     *
     * @param t the record to write
     * @param targetChannel index of the channel to write to
     */
    private void sendToTarget(T t, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = this.serializers[targetChannel];
        RecordSerializer.SerializationResult result = serializer.addRecord(t);

        while (result.isFullBuffer()) {
            // A buffer was finished and the record is complete: stop here instead of
            // requesting another buffer that this record does not need.
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer) && result.isFullRecord()) {
                break;
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        Preconditions.checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (this.flushAlways) {
            this.targetPartition.flush(targetChannel);
        }
    }

    /**
     * Sends the event to all channels. For each channel the in-progress buffer builder
     * (if any) is finished first, then a copy of the serialized event is added to the
     * target partition.
     *
     * <p>The serialized event's buffer consumer is closed when this method exits, whether
     * normally or exceptionally, including when the final flush fails.
     *
     * @param abstractEvent the event to broadcast
     * @throws IOException if serializing or adding the event fails
     */
    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException {
        try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(abstractEvent)) {
            for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
                tryFinishCurrentBufferBuilder(targetChannel, this.serializers[targetChannel]);
                // Every channel gets its own copy; the consumer created above is closed by
                // the try-with-resources statement.
                this.targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
            }

            if (this.flushAlways) {
                flushAll();
            }
        }
    }

    /** Flushes all channels of the target partition. */
    public void flushAll() {
        this.targetPartition.flushAll();
    }

    /**
     * Finishes the in-progress buffer builder (if any) and clears the serializer of every
     * channel. Unlike the regular write path, this does not update the output counters.
     */
    public void clearBuffers() {
        for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
            RecordSerializer<T> serializer = this.serializers[targetChannel];
            closeBufferBuilder(targetChannel);
            serializer.clear();
        }
    }

    /**
     * Replaces the bytes-out and buffers-out counters with the ones of the given metric
     * group. Counts accumulated in the previous counters are not transferred.
     *
     * @param taskIOMetricGroup the metric group providing the counters
     */
    public void setMetricGroup(TaskIOMetricGroup taskIOMetricGroup) {
        this.numBytesOut = taskIOMetricGroup.getNumBytesOutCounter();
        this.numBuffersOut = taskIOMetricGroup.getNumBuffersOutCounter();
    }

    /**
     * Finishes the channel's in-progress buffer builder, if there is one, updates the
     * output counters and clears the given serializer.
     *
     * @param targetChannel index of the channel
     * @param serializer the serializer of that channel
     * @return {@code true} if a buffer builder was finished, {@code false} if the channel
     *     had none (in which case nothing is changed)
     */
    private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer<T> serializer) {
        if (!this.bufferBuilders[targetChannel].isPresent()) {
            return false;
        }
        BufferBuilder bufferBuilder = this.bufferBuilders[targetChannel].get();
        this.bufferBuilders[targetChannel] = Optional.empty();

        this.numBytesOut.inc(bufferBuilder.finish());
        this.numBuffersOut.inc();
        serializer.clear();
        return true;
    }

    /**
     * Requests a new buffer builder from the partition's buffer provider, stores it as the
     * channel's in-progress builder and adds its buffer consumer to the target partition.
     *
     * @param targetChannel index of the channel; must not have an in-progress builder
     * @return the newly requested buffer builder
     * @throws IllegalStateException if the channel still has an in-progress builder
     */
    private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        Preconditions.checkState(!this.bufferBuilders[targetChannel].isPresent());

        BufferBuilder bufferBuilder = this.targetPartition.getBufferProvider().requestBufferBuilderBlocking();
        this.bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
        this.targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
        return bufferBuilder;
    }

    /**
     * Finishes and forgets the channel's in-progress buffer builder, if there is one,
     * without touching the output counters or the serializer.
     *
     * @param targetChannel index of the channel
     */
    private void closeBufferBuilder(int targetChannel) {
        if (this.bufferBuilders[targetChannel].isPresent()) {
            this.bufferBuilders[targetChannel].get().finish();
            this.bufferBuilders[targetChannel] = Optional.empty();
        }
    }
}
