/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.Envelope;
import org.apache.flink.runtime.io.network.NetworkConnectionManager;
import org.apache.flink.runtime.io.network.RemoteReceiver;
import org.apache.flink.runtime.io.network.netty.InboundEnvelopeDecoder;
import org.apache.flink.runtime.io.network.netty.InboundEnvelopeDispatcher;
import org.apache.flink.runtime.io.network.netty.OutboundConnectionQueue;
import org.apache.flink.runtime.io.network.netty.OutboundEnvelopeEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyConnectionManager
implements NetworkConnectionManager {
    private static final Logger LOG = LoggerFactory.getLogger(NettyConnectionManager.class);
    private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
    private final ConcurrentMap<RemoteReceiver, Object> outConnections = new ConcurrentHashMap<RemoteReceiver, Object>();
    private final InetAddress bindAddress;
    private final int bindPort;
    private final int bufferSize;
    private final int numInThreads;
    private final int numOutThreads;
    private final int lowWaterMark;
    private final int highWaterMark;
    private ServerBootstrap in;
    private Bootstrap out;

    /**
     * Creates a connection manager; nothing is bound or connected until {@code start} is called.
     *
     * <p>For the last four parameters the value {@code -1} selects a default:
     * a quarter of the available processors (at least one) for the thread counts,
     * half of {@code bufferSize} for the low water mark and {@code bufferSize} for the
     * high water mark.
     *
     * @param bindAddress   local address the incoming server socket is bound to
     * @param bindPort      local port the incoming server socket is bound to
     * @param bufferSize    buffer size in bytes, used to derive allocator page size and default water marks
     * @param numInThreads  number of threads for incoming connections, or {@code -1} for the default
     * @param numOutThreads number of threads for outgoing connections, or {@code -1} for the default
     * @param lowWaterMark  write buffer low water mark in bytes, or {@code -1} for the default
     * @param highWaterMark write buffer high water mark in bytes, or {@code -1} for the default
     */
    public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads, int numOutThreads, int lowWaterMark, int highWaterMark) {
        this.bindAddress = bindAddress;
        this.bindPort = bindPort;
        this.bufferSize = bufferSize;

        // Fallback thread count: one quarter of the cores, but never fewer than one.
        final int quarterOfCores = Runtime.getRuntime().availableProcessors() / 4;
        final int fallbackThreads = quarterOfCores < 1 ? 1 : quarterOfCores;

        if (numInThreads == -1) {
            this.numInThreads = fallbackThreads;
        } else {
            this.numInThreads = numInThreads;
        }

        if (numOutThreads == -1) {
            this.numOutThreads = fallbackThreads;
        } else {
            this.numOutThreads = numOutThreads;
        }

        if (lowWaterMark == -1) {
            this.lowWaterMark = bufferSize / 2;
        } else {
            this.lowWaterMark = lowWaterMark;
        }

        if (highWaterMark == -1) {
            this.highWaterMark = bufferSize;
        } else {
            this.highWaterMark = highWaterMark;
        }
    }

    /**
     * Sets up the incoming (server) and outgoing (client) bootstraps and binds the server socket.
     *
     * <p>Both bootstraps share one {@link PooledByteBufAllocator} that is configured with direct
     * arenas only (one per incoming and outgoing thread, zero heap arenas). The allocator page size
     * is twice the configured buffer size and the target chunk size is 16 MiB.
     *
     * <p>If debug logging is enabled, a daemon thread is started that logs the number of queued
     * envelopes per outgoing connection every {@code DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS}
     * milliseconds until the incoming event loop group is shutting down.
     *
     * @param channelManager passed to the inbound decoder (as buffer provider broker) and to the
     *                       inbound dispatcher (as envelope dispatcher) of every accepted channel
     * @throws IOException if the calling thread is interrupted while waiting for the bind to complete
     */
    @Override
    public void start(ChannelManager channelManager) throws IOException {
        LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", this.numInThreads, this.numOutThreads));
        LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", this.lowWaterMark, this.highWaterMark));

        // The same object plays both roles; final because the anonymous initializers capture them.
        final ChannelManager bufferProviderBroker = channelManager;
        final ChannelManager envelopeDispatcher = channelManager;

        int numHeapArenas = 0;
        int numDirectArenas = this.numInThreads + this.numOutThreads;
        int pageSize = this.bufferSize << 1;
        int chunkSize = 0x1000000;
        // floor(log2(chunkSize / pageSize)) computed with integer arithmetic, so that no
        // floating-point rounding can truncate the order to a value that is one too small.
        int maxOrder = 31 - Integer.numberOfLeadingZeros(chunkSize / pageSize);

        PooledByteBufAllocator pooledByteBufAllocator = new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
        String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, page size (bytes): %d, chunk size (bytes): %d.", numDirectArenas, numHeapArenas, pageSize, pageSize << maxOrder);
        LOG.info(msg);

        // Incoming side: decode envelopes from the wire and hand them to the dispatcher.
        this.in = new ServerBootstrap();
        this.in.group(new NioEventLoopGroup(this.numInThreads))
                .channel(NioServerSocketChannel.class)
                .localAddress(this.bindAddress, this.bindPort)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline()
                                .addLast(new InboundEnvelopeDecoder(bufferProviderBroker))
                                .addLast(new InboundEnvelopeDispatcher(envelopeDispatcher));
                    }
                })
                .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(pageSize))
                .option(ChannelOption.ALLOCATOR, pooledByteBufAllocator);

        // Outgoing side: encode envelopes; the water marks bound the per-channel write buffer.
        this.out = new Bootstrap();
        this.out.group(new NioEventLoopGroup(this.numOutThreads))
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        channel.pipeline().addLast(new OutboundEnvelopeEncoder());
                    }
                })
                .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, this.lowWaterMark)
                .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, this.highWaterMark)
                .option(ChannelOption.ALLOCATOR, pooledByteBufAllocator)
                .option(ChannelOption.TCP_NODELAY, false)
                .option(ChannelOption.SO_KEEPALIVE, true);

        try {
            this.in.bind().sync();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }

        if (LOG.isDebugEnabled()) {
            Thread reporter = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!NettyConnectionManager.this.in.group().isShuttingDown()) {
                            Thread.sleep(DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS);
                            LOG.debug(NettyConnectionManager.this.getNonZeroNumQueuedEnvelopes());
                        }
                    }
                    catch (InterruptedException e) {
                        // An interrupt is the request to stop reporting; restore the flag and exit.
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Netty queued envelopes debug reporter");
            // Daemon, so that a pure diagnostics thread can never keep the JVM alive.
            reporter.setDaemon(true);
            reporter.start();
        }
    }

    /**
     * Enqueues the envelope on the outgoing connection to the given receiver, establishing the
     * connection first if none exists yet.
     *
     * <p>The {@code outConnections} map holds either an established {@link OutboundConnectionQueue}
     * or a {@code ChannelInBuildup} placeholder per receiver. The thread that installs the
     * placeholder starts the connect and later swaps in the established queue; all other threads
     * block on the placeholder until the connection attempt finishes.
     *
     * <p>If the connection attempt fails, the placeholder is removed again so that a later call
     * starts a fresh attempt instead of rethrowing the old failure forever.
     *
     * @param envelope the envelope to send
     * @param receiver the remote receiver identifying the target connection
     * @throws IOException if the connection could not be established or the placeholder was
     *                     unexpectedly replaced while the connection was being built up
     */
    @Override
    public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException {
        Object entry = this.outConnections.get(receiver);

        if (entry == null) {
            ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
            entry = this.outConnections.putIfAbsent(receiver, inBuildup);

            if (entry == null) {
                // This thread installed the placeholder, so it is responsible for connecting.
                this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);

                OutboundConnectionQueue established;
                try {
                    established = inBuildup.waitForChannel();
                }
                catch (IOException e) {
                    // Drop the failed placeholder (only if it is still ours) to allow reconnects.
                    this.outConnections.remove(receiver, inBuildup);
                    throw e;
                }

                Object previous = this.outConnections.put(receiver, established);
                if (inBuildup != previous) {
                    throw new IOException("Race condition during channel build up.");
                }

                established.enqueue(envelope);
                return;
            }
        }

        // Another thread owns the entry: it is either still in buildup or already established.
        OutboundConnectionQueue channel = entry instanceof ChannelInBuildup
                ? ((ChannelInBuildup) entry).waitForChannel()
                : (OutboundConnectionQueue) entry;
        channel.enqueue(envelope);
    }

    /**
     * Gracefully shuts down the event loop groups of the incoming and the outgoing bootstrap and
     * waits for each to finish.
     *
     * <p>Calling this method before {@code start} is a no-op. A failure while stopping the incoming
     * group does not prevent the shutdown of the outgoing group from being initiated; the first
     * failure is rethrown afterwards.
     *
     * @throws IOException if the calling thread is interrupted while waiting for a shutdown
     */
    @Override
    public void shutdown() throws IOException {
        IOException failure = null;

        try {
            shutdownGroup(this.in == null ? null : this.in.group(), "Shutting down incoming connections.");
        }
        catch (IOException e) {
            failure = e;
        }

        try {
            shutdownGroup(this.out == null ? null : this.out.group(), "Shutting down outgoing connections.");
        }
        catch (IOException e) {
            if (failure == null) {
                failure = e;
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Shuts down a single event loop group unless it is absent or already shutting down.
     *
     * @param group      the group to stop, may be {@code null}
     * @param logMessage info message logged right before the shutdown is triggered
     * @throws IOException if the calling thread is interrupted while waiting; the interrupt flag
     *                     is restored before the exception is thrown
     */
    private static void shutdownGroup(EventLoopGroup group, String logMessage) throws IOException {
        if (group == null || group.isShuttingDown()) {
            return;
        }

        LOG.info(logMessage);
        try {
            group.shutdownGracefully().sync();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Builds a multi-line debug report about the outgoing connections.
     *
     * <p>The report starts with the total number of entries in {@code outConnections}. It then
     * lists every established connection whose queue currently holds at least one envelope, and
     * every connection that is still being built up. Established connections with an empty queue
     * are omitted.
     *
     * @return the formatted report
     */
    private String getNonZeroNumQueuedEnvelopes() {
        final StringBuilder report = new StringBuilder();
        report.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));

        for (Map.Entry<RemoteReceiver, Object> connection : this.outConnections.entrySet()) {
            final RemoteReceiver receiver = connection.getKey();
            final Object state = connection.getValue();

            if (state instanceof OutboundConnectionQueue) {
                final OutboundConnectionQueue queue = (OutboundConnectionQueue) state;
                if (queue.getNumQueuedEnvelopes() > 0) {
                    report.append(String.format("%s> Number of queued envelopes for %s with channel %s: %d\n", Thread.currentThread().getId(), receiver, queue.toString(), queue.getNumQueuedEnvelopes()));
                }
            } else if (state instanceof ChannelInBuildup) {
                report.append(String.format("%s> Connection to %s is still in buildup\n", Thread.currentThread().getId(), receiver));
            }
        }

        return report.toString();
    }

    private static final class ChannelInBuildup
    implements ChannelFutureListener {
        private final Object lock = new Object();
        private volatile OutboundConnectionQueue channel;
        private volatile Throwable error;
        private int numRetries = 3;
        private final Bootstrap out;
        private final RemoteReceiver receiver;

        private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
            this.out = out;
            this.receiver = receiver;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void handInChannel(OutboundConnectionQueue c) {
            Object object = this.lock;
            synchronized (object) {
                this.channel = c;
                this.lock.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void notifyOfError(Throwable error) {
            Object object = this.lock;
            synchronized (object) {
                this.error = error;
                this.lock.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private OutboundConnectionQueue waitForChannel() throws IOException {
            Object object = this.lock;
            synchronized (object) {
                while (this.error == null && this.channel == null) {
                    try {
                        this.lock.wait(2000L);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException("Channel buildup interrupted.");
                    }
                }
            }
            if (this.error != null) {
                throw new IOException("Connecting the channel failed: " + this.error.getMessage(), this.error);
            }
            return this.channel;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Channel %s connected", future.channel()));
                }
                this.handInChannel(new OutboundConnectionQueue(future.channel()));
            } else if (this.numRetries > 0) {
                LOG.debug("Connection request did not succeed, retrying ({} attempts left)", (Object)this.numRetries);
                this.out.connect((SocketAddress)this.receiver.getConnectionAddress()).addListener((GenericFutureListener)this);
                --this.numRetries;
            } else if (future.getClass() != null) {
                this.notifyOfError(future.cause());
            } else {
                this.notifyOfError(new Exception("Connection could not be established."));
            }
        }
    }
}

