package org.apache.flink.runtime.io.network.api.serialization;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.Random;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.class */
public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> implements RecordDeserializer<T> {
    private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE = "Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.";
    private static final int THRESHOLD_FOR_SPILLING = 5242880;
    private final NonSpanningWrapper nonSpanningWrapper;
    private final SpanningWrapper spanningWrapper;
    private Buffer currentBuffer;
    private AccumulatorRegistry.Reporter reporter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.class */
    public static final class NonSpanningWrapper implements DataInputView {
        private MemorySegment segment;
        private int limit;
        private int position;
        private byte[] utfByteBuffer;
        private char[] utfCharBuffer;

        private NonSpanningWrapper() {
        }

        int remaining() {
            return this.limit - this.position;
        }

        void clear() {
            this.segment = null;
            this.limit = 0;
            this.position = 0;
        }

        void initializeFromMemorySegment(MemorySegment memorySegment, int i, int i2) {
            this.segment = memorySegment;
            this.position = i;
            this.limit = i2;
        }

        public final void readFully(byte[] bArr) throws IOException {
            readFully(bArr, 0, bArr.length);
        }

        public final void readFully(byte[] bArr, int i, int i2) throws IOException {
            if (i < 0 || i2 < 0 || i + i2 > bArr.length) {
                throw new IndexOutOfBoundsException();
            }
            this.segment.get(this.position, bArr, i, i2);
            this.position += i2;
        }

        public final boolean readBoolean() throws IOException {
            return readByte() == 1;
        }

        public final byte readByte() throws IOException {
            MemorySegment memorySegment = this.segment;
            int i = this.position;
            this.position = i + 1;
            return memorySegment.get(i);
        }

        public final int readUnsignedByte() throws IOException {
            return readByte() & 255;
        }

        public final short readShort() throws IOException {
            short shortBigEndian = this.segment.getShortBigEndian(this.position);
            this.position += 2;
            return shortBigEndian;
        }

        public final int readUnsignedShort() throws IOException {
            int shortBigEndian = this.segment.getShortBigEndian(this.position) & 65535;
            this.position += 2;
            return shortBigEndian;
        }

        public final char readChar() throws IOException {
            char charBigEndian = this.segment.getCharBigEndian(this.position);
            this.position += 2;
            return charBigEndian;
        }

        public final int readInt() throws IOException {
            int intBigEndian = this.segment.getIntBigEndian(this.position);
            this.position += 4;
            return intBigEndian;
        }

        public final long readLong() throws IOException {
            long longBigEndian = this.segment.getLongBigEndian(this.position);
            this.position += 8;
            return longBigEndian;
        }

        public final float readFloat() throws IOException {
            return Float.intBitsToFloat(readInt());
        }

        public final double readDouble() throws IOException {
            return Double.longBitsToDouble(readLong());
        }

        public final String readLine() throws IOException {
            StringBuilder sb = new StringBuilder(32);
            while (true) {
                try {
                    int readUnsignedByte = readUnsignedByte();
                    if (readUnsignedByte == 10) {
                        break;
                    }
                    if (readUnsignedByte != 13) {
                        sb.append((char) readUnsignedByte);
                    }
                } catch (EOFException e) {
                }
            }
            if (sb.length() == 0) {
                return null;
            }
            int length = sb.length();
            if (length > 0 && sb.charAt(length - 1) == '\r') {
                sb.setLength(length - 1);
            }
            return sb.toString();
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:22:0x0092. Please report as an issue. */
        public final String readUTF() throws IOException {
            byte[] bArr;
            char[] cArr;
            int i;
            int readUnsignedShort = readUnsignedShort();
            if (this.utfByteBuffer == null || this.utfByteBuffer.length < readUnsignedShort) {
                bArr = new byte[readUnsignedShort];
                this.utfByteBuffer = bArr;
            } else {
                bArr = this.utfByteBuffer;
            }
            if (this.utfCharBuffer == null || this.utfCharBuffer.length < readUnsignedShort) {
                cArr = new char[readUnsignedShort];
                this.utfCharBuffer = cArr;
            } else {
                cArr = this.utfCharBuffer;
            }
            int i2 = 0;
            int i3 = 0;
            readFully(bArr, 0, readUnsignedShort);
            while (i2 < readUnsignedShort && (i = bArr[i2] & 255) <= 127) {
                i2++;
                int i4 = i3;
                i3++;
                cArr[i4] = (char) i;
            }
            while (i2 < readUnsignedShort) {
                int i5 = bArr[i2] & 255;
                switch (i5 >> 4) {
                    case 0:
                    case 1:
                    case 2:
                    case 3:
                    case 4:
                    case 5:
                    case 6:
                    case 7:
                        i2++;
                        int i6 = i3;
                        i3++;
                        cArr[i6] = (char) i5;
                    case 8:
                    case 9:
                    case 10:
                    case 11:
                    default:
                        throw new UTFDataFormatException("malformed input around byte " + i2);
                    case 12:
                    case 13:
                        i2 += 2;
                        if (i2 > readUnsignedShort) {
                            throw new UTFDataFormatException("malformed input: partial character at end");
                        }
                        byte b = bArr[i2 - 1];
                        if ((b & 192) != 128) {
                            throw new UTFDataFormatException("malformed input around byte " + i2);
                        }
                        int i7 = i3;
                        i3++;
                        cArr[i7] = (char) (((i5 & 31) << 6) | (b & 63));
                    case 14:
                        i2 += 3;
                        if (i2 > readUnsignedShort) {
                            throw new UTFDataFormatException("malformed input: partial character at end");
                        }
                        byte b2 = bArr[i2 - 2];
                        byte b3 = bArr[i2 - 1];
                        if ((b2 & 192) != 128 || (b3 & 192) != 128) {
                            throw new UTFDataFormatException("malformed input around byte " + (i2 - 1));
                        }
                        int i8 = i3;
                        i3++;
                        cArr[i8] = (char) (((i5 & 15) << 12) | ((b2 & 63) << 6) | ((b3 & 63) << 0));
                        break;
                }
            }
            return new String(cArr, 0, i3);
        }

        public final int skipBytes(int i) throws IOException {
            if (i < 0) {
                throw new IllegalArgumentException();
            }
            int min = Math.min(i, remaining());
            this.position += min;
            return min;
        }

        public void skipBytesToRead(int i) throws IOException {
            if (skipBytes(i) < i) {
                throw new EOFException("Could not skip " + i + " bytes.");
            }
        }

        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (bArr == null) {
                throw new NullPointerException("Byte array b cannot be null.");
            }
            if (i < 0) {
                throw new IllegalArgumentException("The offset off cannot be negative.");
            }
            if (i2 < 0) {
                throw new IllegalArgumentException("The length len cannot be negative.");
            }
            int min = Math.min(i2, remaining());
            this.segment.get(this.position, bArr, i, min);
            this.position += min;
            return min;
        }

        public int read(byte[] bArr) throws IOException {
            return read(bArr, 0, bArr.length);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.class */
    public static final class SpanningWrapper {
        private final String[] tempDirs;
        private final DataInputDeserializer serializationReadBuffer;
        private FileChannel spillingChannel;
        private byte[] buffer;
        private int recordLength;
        private int accumulatedRecordBytes;
        private MemorySegment leftOverData;
        private int leftOverStart;
        private int leftOverLimit;
        private File spillFile;
        private InputViewDataInputStreamWrapper spillFileReader;
        private AccumulatorRegistry.Reporter reporter;
        private final byte[] initialBuffer = new byte[1024];
        private final Random rnd = new Random();
        private final ByteBuffer lengthBuffer = ByteBuffer.allocate(4);

        public SpanningWrapper(String[] strArr) {
            this.tempDirs = strArr;
            this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
            this.recordLength = -1;
            this.serializationReadBuffer = new DataInputDeserializer();
            this.buffer = this.initialBuffer;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void initializeWithPartialRecord(NonSpanningWrapper nonSpanningWrapper, int i) throws IOException {
            this.recordLength = i;
            int remaining = nonSpanningWrapper.remaining();
            if (i > SpillingAdaptiveSpanningRecordDeserializer.THRESHOLD_FOR_SPILLING) {
                this.spillingChannel = createSpillingChannel();
                this.spillingChannel.write(nonSpanningWrapper.segment.wrap(nonSpanningWrapper.position, remaining));
            } else {
                ensureBufferCapacity(remaining);
                nonSpanningWrapper.segment.get(nonSpanningWrapper.position, this.buffer, 0, remaining);
            }
            this.accumulatedRecordBytes = remaining;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void initializeWithPartialLength(NonSpanningWrapper nonSpanningWrapper) throws IOException {
            nonSpanningWrapper.segment.get(nonSpanningWrapper.position, this.lengthBuffer, nonSpanningWrapper.remaining());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addNextChunkFromMemorySegment(MemorySegment memorySegment, int i) throws IOException {
            int i2 = 0;
            if (this.lengthBuffer.position() > 0) {
                int min = Math.min(this.lengthBuffer.remaining(), i);
                memorySegment.get(0, this.lengthBuffer, min);
                if (this.lengthBuffer.hasRemaining()) {
                    return;
                }
                this.recordLength = this.lengthBuffer.getInt(0);
                if (this.reporter != null) {
                    this.reporter.reportNumBytesIn(this.recordLength);
                }
                this.lengthBuffer.clear();
                i2 = min;
                if (this.recordLength > SpillingAdaptiveSpanningRecordDeserializer.THRESHOLD_FOR_SPILLING) {
                    this.spillingChannel = createSpillingChannel();
                }
            }
            int i3 = i - i2;
            int min2 = Math.min(this.recordLength - this.accumulatedRecordBytes, i3);
            if (this.spillingChannel != null) {
                this.spillingChannel.write(memorySegment.wrap(i2, min2));
            } else {
                ensureBufferCapacity(this.accumulatedRecordBytes + min2);
                memorySegment.get(i2, this.buffer, this.accumulatedRecordBytes, min2);
            }
            this.accumulatedRecordBytes += min2;
            if (min2 < i3) {
                this.leftOverData = memorySegment;
                this.leftOverStart = i2 + min2;
                this.leftOverLimit = i;
            }
            if (this.accumulatedRecordBytes == this.recordLength) {
                if (this.spillingChannel == null) {
                    this.serializationReadBuffer.setBuffer(this.buffer, 0, this.recordLength);
                } else {
                    this.spillingChannel.close();
                    this.spillFileReader = new InputViewDataInputStreamWrapper(new DataInputStream(new BufferedInputStream(new FileInputStream(this.spillFile), 2097152)));
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper nonSpanningWrapper) {
            nonSpanningWrapper.clear();
            if (this.leftOverData != null) {
                nonSpanningWrapper.initializeFromMemorySegment(this.leftOverData, this.leftOverStart, this.leftOverLimit);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean hasFullRecord() {
            return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getNumGatheredBytes() {
            return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : this.lengthBuffer.position());
        }

        public void clear() {
            this.buffer = this.initialBuffer;
            this.serializationReadBuffer.releaseArrays();
            this.recordLength = -1;
            this.lengthBuffer.clear();
            this.leftOverData = null;
            this.accumulatedRecordBytes = 0;
            if (this.spillingChannel != null) {
                try {
                    this.spillingChannel.close();
                } catch (Throwable th) {
                }
                this.spillingChannel = null;
            }
            if (this.spillFileReader != null) {
                try {
                    this.spillFileReader.close();
                } catch (Throwable th2) {
                }
                this.spillFileReader = null;
            }
            if (this.spillFile != null) {
                this.spillFile.delete();
                this.spillFile = null;
            }
        }

        public DataInputView getInputView() {
            return this.spillFileReader == null ? this.serializationReadBuffer : this.spillFileReader;
        }

        private void ensureBufferCapacity(int i) {
            if (this.buffer.length < i) {
                byte[] bArr = new byte[Math.max(i, this.buffer.length * 2)];
                System.arraycopy(this.buffer, 0, bArr, 0, this.accumulatedRecordBytes);
                this.buffer = bArr;
            }
        }

        private FileChannel createSpillingChannel() throws IOException {
            if (this.spillFile != null) {
                throw new IllegalStateException("Spilling file already exists.");
            }
            this.spillFile = new File(this.tempDirs[this.rnd.nextInt(this.tempDirs.length)], randomString(this.rnd) + ".inputchannel");
            return new RandomAccessFile(this.spillFile, "rw").getChannel();
        }

        private static String randomString(Random random) {
            byte[] bArr = new byte[20];
            random.nextBytes(bArr);
            return StringUtils.byteToHexString(bArr);
        }

        public void setReporter(AccumulatorRegistry.Reporter reporter) {
            this.reporter = reporter;
        }
    }

    public SpillingAdaptiveSpanningRecordDeserializer() {
        String[] split = GlobalConfiguration.getString("taskmanager.tmp.dirs", ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
        this.nonSpanningWrapper = new NonSpanningWrapper();
        this.spanningWrapper = new SpanningWrapper(split);
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public void setNextBuffer(Buffer buffer) throws IOException {
        this.currentBuffer = buffer;
        setNextMemorySegment(buffer.getMemorySegment(), buffer.getSize());
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public Buffer getCurrentBuffer() {
        Buffer buffer = this.currentBuffer;
        this.currentBuffer = null;
        return buffer;
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public void setNextMemorySegment(MemorySegment memorySegment, int i) throws IOException {
        if (this.spanningWrapper.getNumGatheredBytes() > 0) {
            this.spanningWrapper.addNextChunkFromMemorySegment(memorySegment, i);
        } else {
            this.nonSpanningWrapper.initializeFromMemorySegment(memorySegment, 0, i);
        }
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public RecordDeserializer.DeserializationResult getNextRecord(T t) throws IOException {
        int remaining = this.nonSpanningWrapper.remaining();
        if (remaining < 4) {
            if (remaining > 0) {
                this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
                this.nonSpanningWrapper.clear();
                return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
            }
            if (!this.spanningWrapper.hasFullRecord()) {
                return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
            }
            t.read(this.spanningWrapper.getInputView());
            if (this.reporter != null) {
                this.reporter.reportNumRecordsIn(1L);
            }
            this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
            this.spanningWrapper.clear();
            return this.nonSpanningWrapper.remaining() == 0 ? RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER : RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
        }
        int readInt = this.nonSpanningWrapper.readInt();
        if (this.reporter != null) {
            this.reporter.reportNumBytesIn(readInt);
        }
        if (readInt > remaining - 4) {
            this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, readInt);
            this.nonSpanningWrapper.clear();
            return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
        }
        try {
            t.read(this.nonSpanningWrapper);
            if (this.reporter != null) {
                this.reporter.reportNumRecordsIn(1L);
            }
            int remaining2 = this.nonSpanningWrapper.remaining();
            if (remaining2 > 0) {
                return RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
            }
            if (remaining2 == 0) {
                return RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
            }
            throw new IndexOutOfBoundsException("Remaining = " + remaining2);
        } catch (IndexOutOfBoundsException e) {
            throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
        }
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public void clear() {
        this.nonSpanningWrapper.clear();
        this.spanningWrapper.clear();
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public boolean hasUnfinishedData() {
        return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
    }

    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public void setReporter(AccumulatorRegistry.Reporter reporter) {
        this.reporter = reporter;
        this.spanningWrapper.setReporter(reporter);
    }
}
