package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.class */
public class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
    private final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
    private final Queue<SequenceNumberingSubpartitionView> queue = new ArrayDeque();
    private final Set<InputChannelID> released = Sets.newHashSet();
    private SequenceNumberingSubpartitionView currentPartitionQueue;
    private boolean fatalError;
    private ChannelHandlerContext ctx;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestQueue$SequenceNumberingSubpartitionView.class */
    public class SequenceNumberingSubpartitionView implements ResultSubpartitionView, NotificationListener {
        private final ResultSubpartitionView queueIterator;
        private final InputChannelID receiverId;
        private int sequenceNumber;

        private SequenceNumberingSubpartitionView(ResultSubpartitionView resultSubpartitionView, InputChannelID inputChannelID) {
            this.sequenceNumber = -1;
            this.queueIterator = (ResultSubpartitionView) Preconditions.checkNotNull(resultSubpartitionView);
            this.receiverId = (InputChannelID) Preconditions.checkNotNull(inputChannelID);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public InputChannelID getReceiverId() {
            return this.receiverId;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getSequenceNumber() {
            return this.sequenceNumber;
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
        public Buffer getNextBuffer() throws IOException, InterruptedException {
            Buffer nextBuffer = this.queueIterator.getNextBuffer();
            if (nextBuffer != null) {
                this.sequenceNumber++;
            }
            return nextBuffer;
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
        public void notifySubpartitionConsumed() throws IOException {
            this.queueIterator.notifySubpartitionConsumed();
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
        public boolean isReleased() {
            return this.queueIterator.isReleased();
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
        public Throwable getFailureCause() {
            return this.queueIterator.getFailureCause();
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
        public boolean registerListener(NotificationListener notificationListener) throws IOException {
            return this.queueIterator.registerListener(this);
        }

        @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
        public void releaseAllResources() throws IOException {
            this.queueIterator.releaseAllResources();
        }

        @Override // org.apache.flink.runtime.util.event.NotificationListener
        public void onNotification() {
            PartitionRequestQueue.this.ctx.pipeline().fireUserEventTriggered(this);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestQueue$WriteAndFlushNextMessageIfPossibleListener.class */
    private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {
        private WriteAndFlushNextMessageIfPossibleListener() {
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            try {
                if (channelFuture.isSuccess()) {
                    PartitionRequestQueue.this.writeAndFlushNextMessageIfPossible(channelFuture.channel());
                } else if (channelFuture.cause() != null) {
                    PartitionRequestQueue.this.handleException(channelFuture.channel(), channelFuture.cause());
                } else {
                    PartitionRequestQueue.this.handleException(channelFuture.channel(), new IllegalStateException("Sending cancelled by user."));
                }
            } catch (Throwable th) {
                PartitionRequestQueue.this.handleException(channelFuture.channel(), th);
            }
        }
    }

    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.ctx == null) {
            this.ctx = channelHandlerContext;
        }
        super.channelRegistered(channelHandlerContext);
    }

    public void enqueue(ResultSubpartitionView resultSubpartitionView, InputChannelID inputChannelID) throws Exception {
        this.ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(resultSubpartitionView, inputChannelID));
    }

    public void cancel(InputChannelID inputChannelID) {
        this.ctx.pipeline().fireUserEventTriggered(inputChannelID);
    }

    public void close() {
        if (this.ctx != null) {
            this.ctx.channel().close();
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj.getClass() == SequenceNumberingSubpartitionView.class) {
            boolean isEmpty = this.queue.isEmpty();
            this.queue.add((SequenceNumberingSubpartitionView) obj);
            if (isEmpty) {
                writeAndFlushNextMessageIfPossible(channelHandlerContext.channel());
                return;
            }
            return;
        }
        if (obj.getClass() != InputChannelID.class) {
            channelHandlerContext.fireUserEventTriggered(obj);
            return;
        }
        InputChannelID inputChannelID = (InputChannelID) obj;
        if (this.released.contains(inputChannelID)) {
            return;
        }
        if (this.currentPartitionQueue != null && this.currentPartitionQueue.getReceiverId().equals(inputChannelID)) {
            this.currentPartitionQueue.releaseAllResources();
            markAsReleased(this.currentPartitionQueue.receiverId);
            this.currentPartitionQueue = null;
            return;
        }
        int size = this.queue.size();
        for (int i = 0; i < size; i++) {
            SequenceNumberingSubpartitionView poll = this.queue.poll();
            if (poll.getReceiverId().equals(inputChannelID)) {
                poll.releaseAllResources();
                markAsReleased(poll.receiverId);
            } else {
                this.queue.add(poll);
            }
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        writeAndFlushNextMessageIfPossible(channelHandlerContext.channel());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Code restructure failed: missing block: B:34:0x0097, code lost:
    
        r0 = new org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse(r9, r7.currentPartitionQueue.getSequenceNumber(), r7.currentPartitionQueue.getReceiverId());
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x00b2, code lost:
    
        if (r9.isBuffer() != false) goto L29;
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x00c5, code lost:
    
        if (org.apache.flink.runtime.io.network.api.serialization.EventSerializer.fromBuffer(r9, getClass().getClassLoader()).getClass() != org.apache.flink.runtime.io.network.api.EndOfPartitionEvent.class) goto L29;
     */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x00c8, code lost:
    
        r7.currentPartitionQueue.notifySubpartitionConsumed();
        r7.currentPartitionQueue.releaseAllResources();
        markAsReleased(r7.currentPartitionQueue.getReceiverId());
        r7.currentPartitionQueue = null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x00e6, code lost:
    
        r8.writeAndFlush(r0).addListener(r7.writeListener);
     */
    /* JADX WARN: Code restructure failed: missing block: B:40:0x00f7, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void writeAndFlushNextMessageIfPossible(io.netty.channel.Channel r8) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 274
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(io.netty.channel.Channel):void");
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        releaseAllResources();
        channelHandlerContext.fireChannelInactive();
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        handleException(channelHandlerContext.channel(), th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleException(Channel channel, Throwable th) throws IOException {
        this.fatalError = true;
        releaseAllResources();
        if (channel.isActive()) {
            channel.writeAndFlush(new NettyMessage.ErrorResponse(th)).addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void releaseAllResources() throws IOException {
        if (this.currentPartitionQueue != null) {
            this.currentPartitionQueue.releaseAllResources();
            markAsReleased(this.currentPartitionQueue.getReceiverId());
            this.currentPartitionQueue = null;
        }
        while (true) {
            SequenceNumberingSubpartitionView poll = this.queue.poll();
            this.currentPartitionQueue = poll;
            if (poll == null) {
                return;
            }
            this.currentPartitionQueue.releaseAllResources();
            markAsReleased(this.currentPartitionQueue.getReceiverId());
        }
    }

    private void markAsReleased(InputChannelID inputChannelID) {
        this.released.add(inputChannelID);
    }
}
