package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.RemoteReceiver;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.class */
public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
    private static final Log LOG = LogFactory.getLog(OutboundConnectionQueue.class);
    private final Channel channel;
    private final RemoteReceiver receiver;
    private final NetworkConnectionManager connectionManager;
    private final ChannelWriteListener writeListener = new ChannelWriteListener();
    private final Queue<Envelope> queuedEnvelopes = new ArrayDeque();
    private boolean hasRequestedClose = false;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue$ChannelCloseListener.class */
    private class ChannelCloseListener implements ChannelFutureListener {
        private ChannelCloseListener() {
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                return;
            }
            OutboundConnectionQueue.this.exceptionOccurred(channelFuture.cause() == null ? new Exception("Close failed.") : channelFuture.cause());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue$ChannelWriteListener.class */
    public class ChannelWriteListener implements ChannelFutureListener {
        private ChannelWriteListener() {
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                OutboundConnectionQueue.this.writeAndFlushNextEnvelopeIfPossible();
            } else {
                OutboundConnectionQueue.this.exceptionOccurred(channelFuture.cause() == null ? new Exception("Envelope send aborted.") : channelFuture.cause());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue$QueueEvent.class */
    private enum QueueEvent {
        TRIGGER_WRITE
    }

    public OutboundConnectionQueue(Channel channel, RemoteReceiver remoteReceiver, NetworkConnectionManager networkConnectionManager, int i) {
        this.channel = channel;
        this.receiver = remoteReceiver;
        this.connectionManager = networkConnectionManager;
        channel.pipeline().addFirst("Outbound Connection Queue", this);
        channel.pipeline().addFirst("Idle State Handler", new IdleStateHandler(0L, 0L, i, TimeUnit.MILLISECONDS));
    }

    public boolean enqueue(Envelope envelope) {
        synchronized (this.channel) {
            if (this.hasRequestedClose) {
                return false;
            }
            boolean isEmpty = this.queuedEnvelopes.isEmpty();
            this.queuedEnvelopes.add(envelope);
            if (!isEmpty) {
                return true;
            }
            this.channel.pipeline().fireUserEventTriggered(QueueEvent.TRIGGER_WRITE);
            return true;
        }
    }

    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj.getClass() == QueueEvent.class) {
            writeAndFlushNextEnvelopeIfPossible();
            return;
        }
        if (obj.getClass() != IdleStateEvent.class) {
            throw new IllegalStateException("Triggered unknown event.");
        }
        boolean z = false;
        synchronized (this.channel) {
            if (this.queuedEnvelopes.isEmpty() && !this.hasRequestedClose) {
                this.hasRequestedClose = true;
                z = true;
                this.connectionManager.close(this.receiver);
            }
        }
        if (z) {
            channelHandlerContext.close().addListener(new ChannelCloseListener());
        }
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        writeAndFlushNextEnvelopeIfPossible();
    }

    public int getNumQueuedEnvelopes() {
        int size;
        synchronized (this.channel) {
            size = this.queuedEnvelopes.size();
        }
        return size;
    }

    public String toString() {
        return this.channel.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeAndFlushNextEnvelopeIfPossible() {
        Envelope envelope = null;
        synchronized (this.channel) {
            if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
                envelope = this.queuedEnvelopes.poll();
            }
        }
        if (envelope != null) {
            this.channel.writeAndFlush(envelope).addListener(this.writeListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void exceptionOccurred(Throwable th) throws Exception {
        LOG.error(String.format("Exception in Channel %s: %s", this.channel, th.getMessage()));
        throw new Exception(th);
    }
}
