package org.apache.flink.runtime.io.network.api;

import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/RecordWriter.class */
/**
 * A {@link BufferWriter} that serializes records of type {@code T} and passes the resulting
 * buffers on, channel by channel, via the inherited send methods.
 *
 * <p>One {@link RecordSerializer} is kept per output channel. The serializers are only created
 * by {@link #initializeSerializers()}; until that method has run, the channel count is
 * {@code 0} and the serializer array is {@code null}, so it must be called before records are
 * emitted.
 *
 * @param <T> the type of record written by this writer
 */
public class RecordWriter<T extends IOReadableWritable> extends BufferWriter {
    /** Source of the buffers that are handed to the serializers. */
    private final BufferProvider bufferPool;

    /** Decides which output channels a given record is written to. */
    private final ChannelSelector<T> channelSelector;

    /** Number of output channels; set by {@link #initializeSerializers()}. */
    private int numChannels;

    /** One serializer per output channel, indexed by channel number. */
    private RecordSerializer<T>[] serializers;

    /**
     * Creates a writer that uses a {@link RoundRobinChannelSelector} to pick the target channels.
     *
     * @param abstractInvokable the task this writer belongs to
     */
    public RecordWriter(AbstractInvokable abstractInvokable) {
        this(abstractInvokable, new RoundRobinChannelSelector<T>());
    }

    /**
     * Creates a writer that uses the given selector to pick the target channels.
     *
     * @param abstractInvokable the task this writer belongs to; its environment supplies the
     *     output buffer provider
     * @param channelSelector the selector consulted for every emitted record
     */
    public RecordWriter(AbstractInvokable abstractInvokable, ChannelSelector<T> channelSelector) {
        super(abstractInvokable);
        this.bufferPool = abstractInvokable.getEnvironment().getOutputBufferProvider();
        this.channelSelector = channelSelector;
    }

    /**
     * Reads the channel count from the output gate and creates a fresh
     * {@link SpanningRecordSerializer} for every channel. Calling it again replaces all
     * previously created serializers.
     */
    // Java cannot create a generic array directly; the raw array is only ever populated with
    // RecordSerializer<T> instances below, so the unchecked conversion is safe.
    @SuppressWarnings("unchecked")
    public void initializeSerializers() {
        final int channelCount = this.outputGate.getNumChannels();
        final RecordSerializer<T>[] created = new RecordSerializer[channelCount];
        for (int channel = 0; channel < channelCount; channel++) {
            created[channel] = new SpanningRecordSerializer<T>();
        }
        this.numChannels = channelCount;
        this.serializers = created;
    }

    /**
     * Adds the record to the serializer of every channel chosen by the channel selector. While a
     * serializer reports a full buffer, its current buffer (if any) is sent to that channel and
     * a newly requested buffer is installed.
     *
     * @param t the record to write
     * @throws IOException if sending a buffer or requesting a new one fails
     * @throws InterruptedException if the thread is interrupted while sending or while waiting
     *     for a buffer
     */
    public void emit(T t) throws IOException, InterruptedException {
        for (int targetChannel : this.channelSelector.selectChannels(t, this.numChannels)) {
            final RecordSerializer<T> serializer = this.serializers[targetChannel];

            RecordSerializer.SerializationResult result = serializer.addRecord(t);
            while (result.isFullBuffer()) {
                final Buffer filled = serializer.getCurrentBuffer();
                // The serializer may not hold a buffer yet, in which case there is nothing to send.
                if (filled != null) {
                    sendBuffer(filled, targetChannel);
                }
                result = serializer.setNextBuffer(requestNextBuffer());
            }
        }
    }

    /**
     * Sends the current buffer of every channel's serializer (if it has one) and then clears
     * that serializer.
     *
     * @throws IOException if sending a buffer fails
     * @throws InterruptedException if the thread is interrupted while sending
     */
    public void flush() throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
            final RecordSerializer<T> serializer = this.serializers[targetChannel];
            final Buffer pending = serializer.getCurrentBuffer();
            if (pending != null) {
                sendBuffer(pending, targetChannel);
            }
            serializer.clear();
        }
    }

    /**
     * Sends the event to every output channel. A channel whose serializer currently holds a
     * buffer gets that buffer sent together with the event and then receives a new buffer.
     *
     * @param abstractEvent the event to send to all channels
     * @throws IOException if sending or requesting a buffer fails
     * @throws InterruptedException if the thread is interrupted while sending or while waiting
     *     for a buffer
     */
    @Override
    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException, InterruptedException {
        sendEventToAllChannels(abstractEvent);
    }

    /**
     * Sends {@link EndOfSuperstepEvent#INSTANCE} to every output channel, in the same way as
     * {@link #broadcastEvent(AbstractEvent)} sends an arbitrary event.
     *
     * @throws IOException if sending or requesting a buffer fails
     * @throws InterruptedException if the thread is interrupted while sending or while waiting
     *     for a buffer
     */
    @Override
    public void sendEndOfSuperstep() throws IOException, InterruptedException {
        sendEventToAllChannels(EndOfSuperstepEvent.INSTANCE);
    }

    /** Shared implementation of the two event-sending entry points. */
    private void sendEventToAllChannels(AbstractEvent event) throws IOException, InterruptedException {
        for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
            final RecordSerializer<T> serializer = this.serializers[targetChannel];
            final Buffer pending = serializer.getCurrentBuffer();

            // The superclass implementations are invoked explicitly (not virtually), exactly as
            // the two public methods did before this logic was shared.
            if (pending == null) {
                super.sendEvent(event, targetChannel);
            } else {
                super.sendBufferAndEvent(pending, event, targetChannel);
                // The buffer has been handed off, so the serializer needs a new one.
                serializer.setNextBuffer(requestNextBuffer());
            }
        }
    }

    /** Requests one buffer of the provider's buffer size via its blocking request call. */
    private Buffer requestNextBuffer() throws IOException, InterruptedException {
        return this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
    }
}
