/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Set;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.shaded.com.google.common.collect.Sets;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound channel handler that writes the buffers of queued subpartition views to its channel.
 *
 * <p>Views are handed to the handler as pipeline user events (see {@link #enqueue} and
 * {@link #cancel}); the queue state itself is only modified from the handler callbacks and from
 * the write-completion listener. At most one view is the "current" one; its buffers are written
 * one at a time, and the next write is triggered when the previous one completes.
 */
class PartitionRequestQueue extends ChannelInboundHandlerAdapter {

    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);

    /** Listener attached to every buffer write; continues with the next buffer on success. */
    private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();

    /** Views waiting to be served, excluding {@link #currentPartitionQueue}. */
    private final Queue<SequenceNumberingSubpartitionView> queue = new ArrayDeque<SequenceNumberingSubpartitionView>();

    /**
     * Receiver IDs whose views have been released. Cancel requests for these IDs are ignored.
     * Entries are never removed by this class.
     */
    private final Set<InputChannelID> released = Sets.newHashSet();

    /** The view currently being drained, or {@code null} if none is selected. */
    private SequenceNumberingSubpartitionView currentPartitionQueue;

    /** Set once an exception has been handled; no further buffers are written afterwards. */
    private boolean fatalError;

    /** Context captured on the first {@link #channelRegistered} call. */
    private ChannelHandlerContext ctx;

    PartitionRequestQueue() {
    }

    /**
     * Remembers the first context this handler is registered with and forwards the event.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (this.ctx == null) {
            this.ctx = ctx;
        }
        super.channelRegistered(ctx);
    }

    /**
     * Wraps the given view and hands it to this handler through a pipeline user event.
     *
     * <p>Requires that the handler has already been registered with a channel; otherwise the
     * captured context is still {@code null}.
     *
     * @param partitionQueue the subpartition view to serve
     * @param receiverId the ID of the input channel that receives the view's buffers
     */
    public void enqueue(ResultSubpartitionView partitionQueue, InputChannelID receiverId) throws Exception {
        ctx.pipeline().fireUserEventTriggered(new SequenceNumberingSubpartitionView(partitionQueue, receiverId));
    }

    /**
     * Requests cancellation of the view served to the given receiver through a pipeline user event.
     *
     * @param receiverId the ID of the input channel whose view should be released
     */
    public void cancel(InputChannelID receiverId) {
        ctx.pipeline().fireUserEventTriggered(receiverId);
    }

    /**
     * Closes the channel of the captured context; does nothing if no context was captured yet.
     */
    public void close() {
        if (ctx != null) {
            ctx.channel().close();
        }
    }

    /**
     * Dispatches user events: a {@link SequenceNumberingSubpartitionView} is queued (and a write is
     * triggered if the queue was empty), an {@link InputChannelID} cancels the matching view(s),
     * and anything else is forwarded along the pipeline.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
        Class<?> eventType = msg.getClass();

        if (eventType == SequenceNumberingSubpartitionView.class) {
            // Only kick off a write when the queue was empty before adding the view.
            boolean triggerWrite = queue.isEmpty();
            queue.add((SequenceNumberingSubpartitionView) msg);
            if (triggerWrite) {
                writeAndFlushNextMessageIfPossible(ctx.channel());
            }
        } else if (eventType == InputChannelID.class) {
            cancelReceiver((InputChannelID) msg);
        } else {
            ctx.fireUserEventTriggered(msg);
        }
    }

    /**
     * Releases the view(s) that belong to the given receiver, unless it was already released.
     *
     * <p>NOTE(review): a view that registered a listener in
     * {@link #writeAndFlushNextMessageIfPossible} is held neither in {@link #queue} nor in
     * {@link #currentPartitionQueue}, so a cancel for it is a no-op here and the view is queued
     * again on its next notification. Confirm whether this is intended.
     */
    private void cancelReceiver(InputChannelID toCancel) throws IOException {
        if (released.contains(toCancel)) {
            return;
        }

        if (currentPartitionQueue != null && currentPartitionQueue.getReceiverId().equals(toCancel)) {
            currentPartitionQueue.releaseAllResources();
            markAsReleased(currentPartitionQueue.getReceiverId());
            currentPartitionQueue = null;
            return;
        }

        // Rotate the queue exactly once: every element is polled and appended again unless it
        // belongs to the cancelled receiver, which keeps the relative order of the survivors.
        int remaining = queue.size();
        while (remaining-- > 0) {
            SequenceNumberingSubpartitionView candidate = queue.poll();
            if (candidate.getReceiverId().equals(toCancel)) {
                candidate.releaseAllResources();
                markAsReleased(candidate.getReceiverId());
            } else {
                queue.add(candidate);
            }
        }
    }

    /**
     * Tries to write the next buffer on every writability change of the channel.
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        writeAndFlushNextMessageIfPossible(ctx.channel());
    }

    /**
     * Writes at most one buffer of the current (or next queued) view to the channel.
     *
     * <p>Nothing is written after a fatal error or while the channel is not writable. The
     * follow-up write is triggered by {@link #writeListener} once the issued write completes.
     *
     * @param channel the channel to write to
     * @throws IOException wrapping any failure; a buffer that was fetched but is still referenced
     *     locally is recycled before the exception is thrown
     */
    private void writeAndFlushNextMessageIfPossible(Channel channel) throws IOException {
        if (fatalError) {
            return;
        }

        Buffer buffer = null;
        try {
            if (!channel.isWritable()) {
                return;
            }

            while (true) {
                if (currentPartitionQueue == null && (currentPartitionQueue = queue.poll()) == null) {
                    // Nothing left to serve.
                    return;
                }

                buffer = currentPartitionQueue.getNextBuffer();

                if (buffer == null) {
                    if (currentPartitionQueue.registerListener(null)) {
                        // The view queues itself again via onNotification(); move on to the next.
                        currentPartitionQueue = null;
                    } else if (currentPartitionQueue.isReleased()) {
                        markAsReleased(currentPartitionQueue.getReceiverId());

                        Throwable cause = currentPartitionQueue.getFailureCause();
                        if (cause != null) {
                            ctx.writeAndFlush(new NettyMessage.ErrorResponse(
                                    new ProducerFailedException(cause),
                                    currentPartitionQueue.getReceiverId()));
                        }

                        currentPartitionQueue = null;
                    }
                    // Otherwise the listener was rejected and the view is not released:
                    // loop around and ask the same view for a buffer again.
                } else {
                    NettyMessage.BufferResponse resp = new NettyMessage.BufferResponse(
                            buffer,
                            currentPartitionQueue.getSequenceNumber(),
                            currentPartitionQueue.getReceiverId());

                    // An end-of-partition event is the last message of a view: release the view
                    // before the event itself is written out.
                    if (!buffer.isBuffer()
                            && EventSerializer.fromBuffer(buffer, getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
                        currentPartitionQueue.notifySubpartitionConsumed();
                        currentPartitionQueue.releaseAllResources();
                        markAsReleased(currentPartitionQueue.getReceiverId());
                        currentPartitionQueue = null;
                    }

                    channel.writeAndFlush(resp).addListener(writeListener);
                    return;
                }
            }
        } catch (Throwable t) {
            if (buffer != null) {
                buffer.recycle();
            }
            throw new IOException(t.getMessage(), t);
        }
    }

    /**
     * Releases all held views when the channel becomes inactive and forwards the event.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        releaseAllResources();
        ctx.fireChannelInactive();
    }

    /**
     * Treats every exception reaching this handler as fatal.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        handleException(ctx.channel(), cause);
    }

    /**
     * Marks the handler as failed, releases all views and reports the cause to the remote side.
     *
     * <p>The error response is written (and the channel closed afterwards) even if releasing the
     * views fails, so that the remote side is not left without any response.
     *
     * @param channel the channel on which the failure occurred
     * @param cause the failure to report
     * @throws IOException if releasing the views failed
     */
    private void handleException(Channel channel, Throwable cause) throws IOException {
        fatalError = true;
        try {
            releaseAllResources();
        } finally {
            if (channel.isActive()) {
                channel.writeAndFlush(new NettyMessage.ErrorResponse(cause)).addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    /**
     * Releases the current view and every queued view.
     *
     * <p>A failure to release one view does not prevent the remaining views from being released;
     * the first failure is rethrown once all views have been processed. Views whose release failed
     * are dropped without being marked as released.
     *
     * <p>NOTE(review): views that are parked on a registered listener are not reachable from here
     * (they are in neither {@link #queue} nor {@link #currentPartitionQueue}) and are therefore
     * not released by this method.
     *
     * @throws IOException the first failure encountered while releasing
     */
    private void releaseAllResources() throws IOException {
        IOException firstFailure = null;

        SequenceNumberingSubpartitionView view = currentPartitionQueue;
        currentPartitionQueue = null;
        if (view == null) {
            view = queue.poll();
        }

        while (view != null) {
            try {
                view.releaseAllResources();
                markAsReleased(view.getReceiverId());
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
            view = queue.poll();
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Records that the view of the given receiver has been released. */
    private void markAsReleased(InputChannelID receiverId) {
        released.add(receiverId);
    }

    /**
     * Decorates a subpartition view with its receiver ID and a sequence number that is advanced
     * for every non-null buffer handed out. The wrapper also acts as the notification listener of
     * the wrapped view and queues itself again when notified.
     */
    private class SequenceNumberingSubpartitionView implements ResultSubpartitionView, NotificationListener {

        private final ResultSubpartitionView queueIterator;

        private final InputChannelID receiverId;

        /** Sequence number of the most recently returned buffer; -1 before the first one. */
        private int sequenceNumber = -1;

        private SequenceNumberingSubpartitionView(ResultSubpartitionView queueIterator, InputChannelID receiverId) {
            this.queueIterator = Preconditions.checkNotNull(queueIterator);
            this.receiverId = Preconditions.checkNotNull(receiverId);
        }

        private InputChannelID getReceiverId() {
            return receiverId;
        }

        private int getSequenceNumber() {
            return sequenceNumber;
        }

        /**
         * Returns the next buffer of the wrapped view, advancing the sequence number if one was
         * available.
         */
        @Override
        public Buffer getNextBuffer() throws IOException, InterruptedException {
            Buffer next = queueIterator.getNextBuffer();
            if (next != null) {
                sequenceNumber++;
            }
            return next;
        }

        @Override
        public void notifySubpartitionConsumed() throws IOException {
            queueIterator.notifySubpartitionConsumed();
        }

        @Override
        public boolean isReleased() {
            return queueIterator.isReleased();
        }

        @Override
        public Throwable getFailureCause() {
            return queueIterator.getFailureCause();
        }

        /**
         * Registers this wrapper (not the passed listener, which is ignored) with the wrapped view.
         */
        @Override
        public boolean registerListener(NotificationListener ignored) throws IOException {
            return queueIterator.registerListener(this);
        }

        @Override
        public void releaseAllResources() throws IOException {
            queueIterator.releaseAllResources();
        }

        /**
         * Queues this view again by firing it as a user event through the pipeline.
         */
        @Override
        public void onNotification() {
            ctx.pipeline().fireUserEventTriggered(this);
        }
    }

    /**
     * Write-completion listener: continues with the next buffer after a successful write and
     * escalates failed or cancelled writes to {@link #handleException}.
     */
    private class WriteAndFlushNextMessageIfPossibleListener implements ChannelFutureListener {

        private WriteAndFlushNextMessageIfPossibleListener() {
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            try {
                if (future.isSuccess()) {
                    writeAndFlushNextMessageIfPossible(future.channel());
                } else if (future.cause() != null) {
                    handleException(future.channel(), future.cause());
                } else {
                    handleException(future.channel(), new IllegalStateException("Sending cancelled by user."));
                }
            } catch (Throwable t) {
                handleException(future.channel(), t);
            }
        }
    }
}

