package org.apache.flink.runtime.io.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyConnectionManager.class */
public class NettyConnectionManager implements NetworkConnectionManager {
    private static final Logger LOG = LoggerFactory.getLogger(NettyConnectionManager.class);
    private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
    private final ConcurrentMap<RemoteReceiver, Object> outConnections = new ConcurrentHashMap();
    private final InetAddress bindAddress;
    private final int bindPort;
    private final int bufferSize;
    private final int numInThreads;
    private final int numOutThreads;
    private final int lowWaterMark;
    private final int highWaterMark;
    private ServerBootstrap in;
    private Bootstrap out;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyConnectionManager$ChannelInBuildup.class */
    public static final class ChannelInBuildup implements ChannelFutureListener {
        private final Object lock;
        private volatile OutboundConnectionQueue channel;
        private volatile Throwable error;
        private int numRetries;
        private final Bootstrap out;
        private final RemoteReceiver receiver;

        private ChannelInBuildup(Bootstrap bootstrap, RemoteReceiver remoteReceiver) {
            this.lock = new Object();
            this.numRetries = 3;
            this.out = bootstrap;
            this.receiver = remoteReceiver;
        }

        private void handInChannel(OutboundConnectionQueue outboundConnectionQueue) {
            synchronized (this.lock) {
                this.channel = outboundConnectionQueue;
                this.lock.notifyAll();
            }
        }

        private void notifyOfError(Throwable th) {
            synchronized (this.lock) {
                this.error = th;
                this.lock.notifyAll();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public OutboundConnectionQueue waitForChannel() throws IOException {
            synchronized (this.lock) {
                while (this.error == null && this.channel == null) {
                    try {
                        this.lock.wait(2000L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Channel buildup interrupted.");
                    }
                }
            }
            if (this.error != null) {
                throw new IOException("Connecting the channel failed: " + this.error.getMessage(), this.error);
            }
            return this.channel;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                if (NettyConnectionManager.LOG.isDebugEnabled()) {
                    NettyConnectionManager.LOG.debug(String.format("Channel %s connected", channelFuture.channel()));
                }
                handInChannel(new OutboundConnectionQueue(channelFuture.channel()));
            } else if (this.numRetries > 0) {
                NettyConnectionManager.LOG.debug("Connection request did not succeed, retrying ({} attempts left)", Integer.valueOf(this.numRetries));
                this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
                this.numRetries--;
            } else if (channelFuture.getClass() != null) {
                notifyOfError(channelFuture.cause());
            } else {
                notifyOfError(new Exception("Connection could not be established."));
            }
        }
    }

    public NettyConnectionManager(InetAddress inetAddress, int i, int i2, int i3, int i4, int i5, int i6) {
        this.bindAddress = inetAddress;
        this.bindPort = i;
        this.bufferSize = i2;
        int max = Math.max(Runtime.getRuntime().availableProcessors() / 4, 1);
        this.numInThreads = i3 == -1 ? max : i3;
        this.numOutThreads = i4 == -1 ? max : i4;
        this.lowWaterMark = i5 == -1 ? i2 / 2 : i5;
        this.highWaterMark = i6 == -1 ? i2 : i6;
    }

    @Override // org.apache.flink.runtime.io.network.NetworkConnectionManager
    public void start(final ChannelManager channelManager) throws IOException {
        LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", Integer.valueOf(this.numInThreads), Integer.valueOf(this.numOutThreads)));
        LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", Integer.valueOf(this.lowWaterMark), Integer.valueOf(this.highWaterMark)));
        int i = this.numInThreads + this.numOutThreads;
        int i2 = this.bufferSize << 1;
        int log = (int) (Math.log(16777216 / i2) / Math.log(2.0d));
        PooledByteBufAllocator pooledByteBufAllocator = new PooledByteBufAllocator(true, 0, i, i2, log);
        LOG.info(String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, page size (bytes): %d, chunk size (bytes): %d.", Integer.valueOf(i), 0, Integer.valueOf(i2), Integer.valueOf(i2 << log)));
        this.in = new ServerBootstrap();
        this.in.group(new NioEventLoopGroup(this.numInThreads)).channel(NioServerSocketChannel.class).localAddress(this.bindAddress, this.bindPort).childHandler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.io.network.netty.NettyConnectionManager.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ChannelHandler[]{new InboundEnvelopeDecoder(channelManager)}).addLast(new ChannelHandler[]{new InboundEnvelopeDispatcher(channelManager)});
            }
        }).option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(i2)).option(ChannelOption.ALLOCATOR, pooledByteBufAllocator);
        this.out = new Bootstrap();
        this.out.group(new NioEventLoopGroup(this.numOutThreads)).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.io.network.netty.NettyConnectionManager.2
            public void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ChannelHandler[]{new OutboundEnvelopeEncoder()});
            }
        }).option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, Integer.valueOf(this.lowWaterMark)).option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, Integer.valueOf(this.highWaterMark)).option(ChannelOption.ALLOCATOR, pooledByteBufAllocator).option(ChannelOption.TCP_NODELAY, false).option(ChannelOption.SO_KEEPALIVE, true);
        try {
            this.in.bind().sync();
            if (LOG.isDebugEnabled()) {
                new Thread(new Runnable() { // from class: org.apache.flink.runtime.io.network.netty.NettyConnectionManager.3
                    @Override // java.lang.Runnable
                    public void run() {
                        Date date = new Date();
                        while (true) {
                            try {
                                Thread.sleep(10000L);
                                date.setTime(System.currentTimeMillis());
                                System.out.println(date);
                                System.out.println(NettyConnectionManager.this.getNonZeroNumQueuedEnvelopes());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override // org.apache.flink.runtime.io.network.NetworkConnectionManager
    public void enqueue(Envelope envelope, RemoteReceiver remoteReceiver) throws IOException {
        OutboundConnectionQueue waitForChannel;
        Object obj = this.outConnections.get(remoteReceiver);
        if (obj != null) {
            waitForChannel = obj instanceof OutboundConnectionQueue ? (OutboundConnectionQueue) obj : ((ChannelInBuildup) obj).waitForChannel();
        } else {
            ChannelInBuildup channelInBuildup = new ChannelInBuildup(this.out, remoteReceiver);
            Object putIfAbsent = this.outConnections.putIfAbsent(remoteReceiver, channelInBuildup);
            if (putIfAbsent == null) {
                this.out.connect(remoteReceiver.getConnectionAddress()).addListener(channelInBuildup);
                waitForChannel = channelInBuildup.waitForChannel();
                if (channelInBuildup != this.outConnections.put(remoteReceiver, waitForChannel)) {
                    throw new IOException("Race condition during channel build up.");
                }
            } else {
                waitForChannel = putIfAbsent instanceof ChannelInBuildup ? ((ChannelInBuildup) putIfAbsent).waitForChannel() : (OutboundConnectionQueue) putIfAbsent;
            }
        }
        waitForChannel.enqueue(envelope);
    }

    @Override // org.apache.flink.runtime.io.network.NetworkConnectionManager
    public void shutdown() throws IOException {
        if (!this.in.group().isShuttingDown()) {
            LOG.info("Shutting down incoming connections.");
            try {
                this.in.group().shutdownGracefully().sync();
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }
        if (this.out.group().isShuttingDown()) {
            return;
        }
        LOG.info("Shutting down outgoing connections.");
        try {
            this.out.group().shutdownGracefully().sync();
        } catch (InterruptedException e2) {
            throw new IOException(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getNonZeroNumQueuedEnvelopes() {
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("==== %d outgoing connections ===\n", Integer.valueOf(this.outConnections.size())));
        for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
            RemoteReceiver key = entry.getKey();
            Object value = entry.getValue();
            if (value instanceof OutboundConnectionQueue) {
                OutboundConnectionQueue outboundConnectionQueue = (OutboundConnectionQueue) value;
                if (outboundConnectionQueue.getNumQueuedEnvelopes() > 0) {
                    sb.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n", Long.valueOf(Thread.currentThread().getId()), key, outboundConnectionQueue.toString(), Integer.valueOf(outboundConnectionQueue.getNumQueuedEnvelopes())));
                }
            } else if (value instanceof ChannelInBuildup) {
                sb.append(String.format("%s> Connection to %s is still in buildup\n", Long.valueOf(Thread.currentThread().getId()), key));
            }
        }
        return sb.toString();
    }
}
