package org.apache.flink.runtime.io.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.runtime.io.network.Buffer;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.jobgraph.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.class */
public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
    private static final Logger LOG = LoggerFactory.getLogger(InboundEnvelopeDecoder.class);

    // Looked up per (job ID, source channel ID) to find the provider of target buffers.
    private final BufferProviderBroker bufferProviderBroker;

    // Task submitted to the channel's event loop whenever bufferAvailable(...) is called.
    private final BufferAvailabilityChangedTask bufferAvailabilityChangedTask = new BufferAvailabilityChangedTask(this, null);

    // Hand-over queue: bufferAvailable(...) offers buffers, the task above polls them.
    private final ConcurrentLinkedQueue<Buffer> bufferBroker = new ConcurrentLinkedQueue<>();

    // Accumulates the fixed-size (48 byte) envelope header, which may arrive split across reads.
    private final ByteBuffer headerBuffer = ByteBuffer.allocateDirect(48);

    // Envelope currently being decoded; null while no header has been fully read.
    private Envelope currentEnvelope;

    // Collects the serialized events of the current envelope; null if there are none (left) to read.
    private ByteBuffer currentEventsBuffer;

    // View onto the target buffer's memory segment that receives the envelope's data payload;
    // null while no payload is being copied.
    private ByteBuffer currentDataBuffer;

    // Payload size for which a target buffer still has to be obtained; 0 once it has been obtained
    // (or if the envelope carries no payload).
    private int currentBufferRequestSize;

    // Buffer provider cached for the (lastJobId, lastSourceId) pair below.
    private BufferProvider currentBufferProvider;
    private JobID lastJobId;
    private ChannelID lastSourceId;

    // Netty buffer that could not be fully decoded because no target buffer was available;
    // decoding resumes from it once a buffer is handed over.
    private ByteBuf stagedBuffer;

    // Context captured in channelActive(...); used to reach the channel and its event loop.
    private ChannelHandlerContext channelHandlerContext;

    // Number of payload bytes that still have to be discarded from upcoming reads.
    private int bytesToSkip;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.io.network.netty.InboundEnvelopeDecoder$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder$1.class */
    /**
     * Switch-map holder (marked synthetic by the decompiler): translates the ordinal of a
     * {@link BufferProvider.BufferAvailabilityRegistration} constant into a dense case label
     * (1, 2 or 3). Ordinals without an entry keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$io$network$bufferprovider$BufferProvider$BufferAvailabilityRegistration = buildSwitchMap();

        /** Builds the ordinal-to-case-label table; a constant missing at runtime is simply left unmapped. */
        private static int[] buildSwitchMap() {
            final int[] caseLabels = new int[BufferProvider.BufferAvailabilityRegistration.values().length];
            try {
                caseLabels[BufferProvider.BufferAvailabilityRegistration.SUCCEEDED_REGISTERED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime: leave it unmapped
            }
            try {
                caseLabels[BufferProvider.BufferAvailabilityRegistration.FAILED_BUFFER_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime: leave it unmapped
            }
            try {
                caseLabels[BufferProvider.BufferAvailabilityRegistration.FAILED_BUFFER_POOL_DESTROYED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime: leave it unmapped
            }
            return caseLabels;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder$BufferAvailabilityChangedTask.class */
    private class BufferAvailabilityChangedTask implements Runnable {
        private BufferAvailabilityChangedTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Buffer buffer = (Buffer) InboundEnvelopeDecoder.this.bufferBroker.poll();
            if (buffer == null) {
                throw new IllegalStateException("The BufferAvailabilityChangedTaskshould only be executed when a Buffer has been offeredto the Buffer broker (after becoming available).");
            }
            buffer.limitSize(InboundEnvelopeDecoder.this.currentBufferRequestSize);
            InboundEnvelopeDecoder.this.currentEnvelope.setBuffer(buffer);
            InboundEnvelopeDecoder.this.currentDataBuffer = buffer.getMemorySegment().wrap(0, InboundEnvelopeDecoder.this.currentBufferRequestSize);
            InboundEnvelopeDecoder.this.currentBufferRequestSize = 0;
            InboundEnvelopeDecoder.this.stagedBuffer.release();
            try {
                if (InboundEnvelopeDecoder.this.decodeBuffer(InboundEnvelopeDecoder.this.stagedBuffer, InboundEnvelopeDecoder.this.channelHandlerContext)) {
                    InboundEnvelopeDecoder.this.stagedBuffer = null;
                    InboundEnvelopeDecoder.this.channelHandlerContext.channel().config().setAutoRead(true);
                    if (InboundEnvelopeDecoder.LOG.isDebugEnabled()) {
                        InboundEnvelopeDecoder.LOG.debug(String.format("Set channel %s auto read to true.", InboundEnvelopeDecoder.this.channelHandlerContext.channel()));
                    }
                }
            } catch (IOException e) {
                buffer.recycleBuffer();
            }
        }

        /* synthetic */ BufferAvailabilityChangedTask(InboundEnvelopeDecoder inboundEnvelopeDecoder, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder$DecoderState.class */
    /** Result of one {@code decodeEnvelope(ByteBuf)} invocation. */
    public enum DecoderState {
        /** Header, events and data payload of the current envelope have all been read. */
        COMPLETE,
        /** The header, events or data payload is still incomplete after copying from the input. */
        PENDING,
        /** The envelope carries a data payload, but the buffer request for it returned {@code null}. */
        NO_BUFFER_AVAILABLE
    }

    /**
     * Creates a decoder that obtains its target buffers through the given broker.
     *
     * @param bufferProviderBroker broker queried for the buffer provider of each (job ID,
     *        source channel ID) pair found in an envelope header
     */
    public InboundEnvelopeDecoder(BufferProviderBroker bufferProviderBroker) {
        this.bufferProviderBroker = bufferProviderBroker;
    }

    /**
     * Remembers the handler context of the first activation, so that buffer availability
     * notifications can later reach the channel and its event loop, then delegates to the
     * superclass implementation.
     *
     * @param channelHandlerContext context of this handler in the channel pipeline
     * @throws Exception if the superclass implementation fails
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        // Only the first context is kept; later activations do not replace it.
        if (this.channelHandlerContext == null) {
            this.channelHandlerContext = channelHandlerContext;
        }
        super.channelActive(channelHandlerContext);
    }

    /**
     * Decodes the envelopes contained in an inbound Netty buffer.
     *
     * <p>Bytes that are still marked to be skipped are discarded first; if that uses up the
     * whole buffer, it is released and nothing is decoded. Otherwise the remaining bytes are
     * passed to {@code decodeBuffer}.
     *
     * @param channelHandlerContext context of this handler in the channel pipeline
     * @param obj the inbound message; must be a {@link ByteBuf}
     * @throws IllegalStateException if a buffer is still staged for later consumption
     * @throws Exception if decoding fails (e.g. a corrupted stream)
     */
    @Override // io.netty.channel.ChannelInboundHandlerAdapter
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (this.stagedBuffer != null) {
            throw new IllegalStateException("No channel read event should be fired as long as the a buffer is staged.");
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        if (this.bytesToSkip > 0) {
            this.bytesToSkip = skipBytes(byteBuf, this.bytesToSkip);
            if (this.bytesToSkip > 0) {
                // The whole buffer belonged to the payload being skipped.
                byteBuf.release();
                return;
            }
        }
        try {
            decodeBuffer(byteBuf, channelHandlerContext);
        } catch (IOException e) {
            // decodeBuffer(...) returns right after releasing or staging the buffer, so when an
            // IOException escapes, the buffer is still owned here. Release it to avoid a leak.
            byteBuf.release();
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decodes all envelopes contained in the given Netty buffer and forwards every complete
     * envelope to the next handler in the pipeline.
     *
     * <p>If no target buffer is available for an envelope's data payload, this decoder is
     * registered as a buffer availability listener. Depending on the registration outcome:
     * <ul>
     *   <li>registered: auto-read is switched off, the input buffer is retained and staged for
     *       later consumption, and {@code false} is returned;</li>
     *   <li>buffer available: decoding is retried right away;</li>
     *   <li>buffer pool destroyed: the payload of the current envelope is skipped and the
     *       envelope is dropped.</li>
     * </ul>
     *
     * @param byteBuf input buffer; released by this method when fully consumed
     * @param channelHandlerContext context used to forward envelopes and to toggle auto-read
     * @return {@code true} if the buffer was fully consumed and released, {@code false} if it
     *         was staged
     * @throws IOException if an envelope cannot be decoded
     * @throws IllegalStateException if unread bytes remain although decoding reported that more
     *         input is needed
     */
    public boolean decodeBuffer(ByteBuf byteBuf, ChannelHandlerContext channelHandlerContext) throws IOException {
        DecoderState state;
        while ((state = decodeEnvelope(byteBuf)) != DecoderState.PENDING) {
            if (state == DecoderState.COMPLETE) {
                channelHandlerContext.fireChannelRead(this.currentEnvelope);
                this.currentEnvelope = null;
            } else if (state == DecoderState.NO_BUFFER_AVAILABLE) {
                switch (this.currentBufferProvider.registerBufferAvailabilityListener(this)) {
                    case SUCCEEDED_REGISTERED:
                        if (channelHandlerContext.channel().config().isAutoRead()) {
                            channelHandlerContext.channel().config().setAutoRead(false);
                            LOG.debug("Set channel {} auto read to false.", channelHandlerContext.channel());
                        }
                        // Keep the unread remainder until bufferAvailable(...) is called.
                        this.stagedBuffer = byteBuf;
                        this.stagedBuffer.retain();
                        return false;

                    case FAILED_BUFFER_AVAILABLE:
                        // Presumably a buffer became available in the meantime: loop and retry.
                        break;

                    case FAILED_BUFFER_POOL_DESTROYED:
                        // Drop the current envelope and discard its payload bytes; whatever is
                        // not in this buffer is skipped on subsequent reads.
                        this.bytesToSkip = skipBytes(byteBuf, this.currentBufferRequestSize);
                        this.currentBufferRequestSize = 0;
                        this.currentEventsBuffer = null;
                        this.currentEnvelope = null;
                        break;

                    default:
                        // Any other registration result: retry, as for an unmapped value before.
                        break;
                }
            }
        }

        if (byteBuf.isReadable()) {
            throw new IllegalStateException("Every buffer should have been fullyconsumed after *successfully* decoding it (if it was not successful, the buffer will be staged for later consumption).");
        }
        byteBuf.release();
        return true;
    }

    /**
     * Accepts a buffer that has become available and schedules the continuation of decoding.
     *
     * <p>The buffer is put into the hand-over queue and the availability task is submitted to
     * the event loop of this decoder's channel.
     *
     * @param buffer the buffer that has become available
     */
    @Override // org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener
    public void bufferAvailable(Buffer buffer) throws Exception {
        this.bufferBroker.offer(buffer);

        final Runnable resumeDecoding = this.bufferAvailabilityChangedTask;
        this.channelHandlerContext.channel().eventLoop().execute(resumeDecoding);
    }

    /**
     * Advances the decoding of the current envelope with the bytes readable in {@code byteBuf}.
     *
     * <p>Decoding proceeds in stages, each of which may be interrupted when the input runs out
     * and is resumed on the next call: header, serialized events, target buffer request, data
     * payload.
     *
     * @param byteBuf input to read from
     * @return {@code COMPLETE} when the envelope is fully decoded, {@code PENDING} when more
     *         input is needed, {@code NO_BUFFER_AVAILABLE} when no target buffer was obtained
     * @throws IOException if the header does not start with the expected magic number, or if
     *         requesting the target buffer fails
     */
    private DecoderState decodeEnvelope(ByteBuf byteBuf) throws IOException {
        // Stage 1: header (only while no envelope is in progress).
        if (this.currentEnvelope == null) {
            copy(byteBuf, this.headerBuffer);
            if (this.headerBuffer.hasRemaining()) {
                return DecoderState.PENDING;
            }
            this.headerBuffer.flip();

            final int magicNumber = this.headerBuffer.getInt();
            if (magicNumber != 0xBADC0FFE) {
                throw new IOException("Network stream corrupted: invalid magicnumber in current envelope header.");
            }

            // Header layout after the magic number: an int (presumably the sequence number),
            // job ID, source channel ID, events size, data size.
            this.currentEnvelope = new Envelope(
                    this.headerBuffer.getInt(),
                    JobID.fromByteBuffer(this.headerBuffer),
                    ChannelID.fromByteBuffer(this.headerBuffer));

            final int eventsSize = this.headerBuffer.getInt();
            final int dataSize = this.headerBuffer.getInt();

            if (eventsSize > 0) {
                this.currentEventsBuffer = ByteBuffer.allocate(eventsSize);
            } else {
                this.currentEventsBuffer = null;
            }
            this.currentBufferRequestSize = Math.max(dataSize, 0);

            // Make the header buffer reusable for the next envelope.
            this.headerBuffer.clear();
        }

        // Stage 2: serialized events.
        if (this.currentEventsBuffer != null) {
            copy(byteBuf, this.currentEventsBuffer);
            if (this.currentEventsBuffer.hasRemaining()) {
                return DecoderState.PENDING;
            }
            this.currentEventsBuffer.flip();
            this.currentEnvelope.setEventsSerialized(this.currentEventsBuffer);
            this.currentEventsBuffer = null;
        }

        // Stage 3: obtain the target buffer for the data payload.
        if (this.currentBufferRequestSize > 0) {
            final Buffer targetBuffer = requestBufferForTarget(
                    this.currentEnvelope.getJobID(),
                    this.currentEnvelope.getSource(),
                    this.currentBufferRequestSize);
            if (targetBuffer == null) {
                return DecoderState.NO_BUFFER_AVAILABLE;
            }
            this.currentEnvelope.setBuffer(targetBuffer);
            this.currentDataBuffer = targetBuffer.getMemorySegment().wrap(0, this.currentBufferRequestSize);
            this.currentBufferRequestSize = 0;
        }

        // Stage 4: data payload.
        if (this.currentDataBuffer != null) {
            copy(byteBuf, this.currentDataBuffer);
            if (this.currentDataBuffer.hasRemaining()) {
                return DecoderState.PENDING;
            }
            this.currentDataBuffer = null;
        }

        return DecoderState.COMPLETE;
    }

    /**
     * Requests a buffer of the given size from the buffer provider responsible for the given
     * job and source channel.
     *
     * <p>The provider is looked up through the broker only when the (job, source channel) pair
     * differs from the one of the previous request; otherwise the cached provider is reused.
     *
     * @param jobID job ID from the envelope
     * @param channelID source channel ID from the envelope
     * @param i requested buffer size
     * @return the buffer returned by the provider; the caller treats {@code null} as
     *         "no buffer available"
     * @throws IOException if the provider lookup or the buffer request fails
     */
    private Buffer requestBufferForTarget(JobID jobID, ChannelID channelID, int i) throws IOException {
        final boolean sameTargetAsBefore = jobID.equals(this.lastJobId) && channelID.equals(this.lastSourceId);
        if (!sameTargetAsBefore) {
            this.lastJobId = jobID;
            this.lastSourceId = channelID;
            this.currentBufferProvider = this.bufferProviderBroker.getBufferProvider(jobID, channelID);
        }
        return this.currentBufferProvider.requestBuffer(i);
    }

    /**
     * Copies as many bytes as possible from the Netty buffer into the NIO buffer: the minimum
     * of the bytes readable in the source and the space remaining in the destination.
     *
     * @param byteBuf source; its reader index advances by the number of bytes copied
     * @param byteBuffer destination; its position advances, its limit is left unchanged
     */
    private void copy(ByteBuf byteBuf, ByteBuffer byteBuffer) {
        if (!byteBuf.isReadable()) {
            return;
        }

        // readBytes(ByteBuffer) fills the destination up to its limit, so temporarily cap the
        // limit at what the source can actually deliver and restore it afterwards.
        final int originalLimit = byteBuffer.limit();
        final int transferable = Math.min(byteBuf.readableBytes(), byteBuffer.remaining());
        byteBuffer.limit(byteBuffer.position() + transferable);
        byteBuf.readBytes(byteBuffer);
        byteBuffer.limit(originalLimit);
    }

    /**
     * Discards up to {@code i} readable bytes from the given buffer.
     *
     * @param byteBuf buffer whose reader index is advanced
     * @param i number of bytes to skip
     * @return the number of bytes that could not be skipped because the buffer ran out, or
     *         {@code 0} if all {@code i} bytes were skipped
     */
    private int skipBytes(ByteBuf byteBuf, int i) {
        // Use skipBytes(...) rather than readBytes(int): the latter copies the bytes into a
        // newly created buffer, which was never released here.
        final int skippable = Math.min(i, byteBuf.readableBytes());
        byteBuf.skipBytes(skippable);
        return i - skippable;
    }
}
