package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriter.class */
public abstract class RecordWriter<T extends IOReadableWritable> implements AvailabilityProvider {

    @VisibleForTesting
    public static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
    private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
    protected final ResultPartitionWriter targetPartition;
    protected final int numberOfChannels;
    private final boolean flushAlways;

    @Nullable
    private final RecordWriter<T>.OutputFlusher outputFlusher;
    private Throwable flusherException;
    protected final Random rng = new XORShiftRandom();
    private Counter numBytesOut = new SimpleCounter();
    private Counter numBuffersOut = new SimpleCounter();
    protected final RecordSerializer<T> serializer = new SpanningRecordSerializer();

    /* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriter$OutputFlusher.class */
    private class OutputFlusher extends Thread {
        private final long timeout;
        private volatile boolean running;

        OutputFlusher(String str, long j) {
            super(str);
            this.running = true;
            setDaemon(true);
            this.timeout = j;
        }

        public void terminate() {
            this.running = false;
            interrupt();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    try {
                        Thread.sleep(this.timeout);
                    } catch (InterruptedException e) {
                        if (this.running) {
                            throw new Exception(e);
                        }
                    }
                    RecordWriter.this.flushAll();
                } catch (Throwable th) {
                    RecordWriter.this.notifyFlusherException(th);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordWriter(ResultPartitionWriter resultPartitionWriter, long j, String str) {
        this.targetPartition = resultPartitionWriter;
        this.numberOfChannels = resultPartitionWriter.getNumberOfSubpartitions();
        Preconditions.checkArgument(j >= -1);
        this.flushAlways = j == 0;
        if (j == -1 || j == 0) {
            this.outputFlusher = null;
        } else {
            this.outputFlusher = new OutputFlusher(str == null ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "OutputFlusher for " + str, j);
            this.outputFlusher.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emit(T t, int i) throws IOException, InterruptedException {
        checkErroneous();
        this.serializer.serializeRecord(t);
        if (copyFromSerializerToTargetChannel(i)) {
            this.serializer.prune();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean copyFromSerializerToTargetChannel(int i) throws IOException, InterruptedException {
        this.serializer.reset();
        boolean z = false;
        BufferBuilder bufferBuilder = getBufferBuilder(i);
        RecordSerializer.SerializationResult copyToBufferBuilder = this.serializer.copyToBufferBuilder(bufferBuilder);
        while (true) {
            RecordSerializer.SerializationResult serializationResult = copyToBufferBuilder;
            if (!serializationResult.isFullBuffer()) {
                break;
            }
            finishBufferBuilder(bufferBuilder);
            if (serializationResult.isFullRecord()) {
                z = true;
                emptyCurrentBufferBuilder(i);
                break;
            }
            bufferBuilder = requestNewBufferBuilder(i);
            copyToBufferBuilder = this.serializer.copyToBufferBuilder(bufferBuilder);
        }
        Preconditions.checkState(!this.serializer.hasSerializedData(), "All data should be written at once");
        if (this.flushAlways) {
            flushTargetPartition(i);
        }
        return z;
    }

    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException {
        BufferConsumer bufferConsumer = EventSerializer.toBufferConsumer(abstractEvent);
        Throwable th = null;
        for (int i = 0; i < this.numberOfChannels; i++) {
            try {
                try {
                    tryFinishCurrentBufferBuilder(i);
                    this.targetPartition.addBufferConsumer(bufferConsumer.copy(), i);
                } catch (Throwable th2) {
                    th = th2;
                    throw th2;
                }
            } catch (Throwable th3) {
                if (bufferConsumer != null) {
                    if (th != null) {
                        try {
                            bufferConsumer.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        bufferConsumer.close();
                    }
                }
                throw th3;
            }
        }
        if (this.flushAlways) {
            flushAll();
        }
        if (bufferConsumer != null) {
            if (0 == 0) {
                bufferConsumer.close();
                return;
            }
            try {
                bufferConsumer.close();
            } catch (Throwable th5) {
                th.addSuppressed(th5);
            }
        }
    }

    public void flushAll() {
        this.targetPartition.flushAll();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void flushTargetPartition(int i) {
        this.targetPartition.flush(i);
    }

    public void setMetricGroup(TaskIOMetricGroup taskIOMetricGroup) {
        this.numBytesOut = taskIOMetricGroup.getNumBytesOutCounter();
        this.numBuffersOut = taskIOMetricGroup.getNumBuffersOutCounter();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void finishBufferBuilder(BufferBuilder bufferBuilder) {
        this.numBytesOut.inc(bufferBuilder.finish());
        this.numBuffersOut.inc();
    }

    @Override // org.apache.flink.runtime.io.AvailabilityProvider
    public CompletableFuture<?> getAvailableFuture() {
        return this.targetPartition.getAvailableFuture();
    }

    public abstract void emit(T t) throws IOException, InterruptedException;

    public abstract void randomEmit(T t) throws IOException, InterruptedException;

    public abstract void broadcastEmit(T t) throws IOException, InterruptedException;

    abstract BufferBuilder getBufferBuilder(int i) throws IOException, InterruptedException;

    abstract BufferBuilder requestNewBufferBuilder(int i) throws IOException, InterruptedException;

    abstract void tryFinishCurrentBufferBuilder(int i);

    abstract void emptyCurrentBufferBuilder(int i);

    abstract void closeBufferBuilder(int i);

    public abstract void clearBuffers();

    public void close() {
        clearBuffers();
        if (this.outputFlusher != null) {
            this.outputFlusher.terminate();
            try {
                this.outputFlusher.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyFlusherException(Throwable th) {
        if (this.flusherException == null) {
            LOG.error("An exception happened while flushing the outputs", th);
            this.flusherException = th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkErroneous() throws IOException {
        if (this.flusherException != null) {
            throw new IOException("An exception happened while flushing the outputs", this.flusherException);
        }
    }

    @VisibleForTesting
    ResultPartitionWriter getTargetPartition() {
        return this.targetPartition;
    }
}
