package org.apache.reef.wake.remote.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.BindException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.wake.EStage;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.DefaultThreadFactory;
import org.apache.reef.wake.remote.Encoder;
import org.apache.reef.wake.remote.RemoteConfiguration;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
import org.apache.reef.wake.remote.impl.TransportEvent;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.remote.transport.Link;
import org.apache.reef.wake.remote.transport.LinkListener;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.exception.TransportRuntimeException;

/* loaded from: input_file:org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.class */
/**
 * Netty-based implementation of {@link Transport}.
 *
 * <p>On construction the transport binds a server channel (the "acceptor") either to the
 * configured port or, when the configured port is {@code 0}, to the first port offered by the
 * {@link TcpPortProvider} that can be bound. Outgoing links are created lazily by
 * {@link #open(SocketAddress, Encoder, LinkListener)} and cached per remote address in a map that
 * is shared with the client and server event listeners.
 */
public class NettyMessagingTransport implements Transport {
    /** Host name value that makes the constructor ask the {@link LocalAddressProvider} instead. */
    public static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";

    private static final String CLASS_NAME = NettyMessagingTransport.class.getName();
    private static final Logger LOG = Logger.getLogger(CLASS_NAME);
    private static final int SERVER_BOSS_NUM_THREADS = 3;
    private static final int SERVER_WORKER_NUM_THREADS = 20;
    private static final int CLIENT_WORKER_NUM_THREADS = 10;

    // States of LinkReference.getConnectInProgress() as used by open().
    // NOTE(review): assumes a fresh LinkReference starts at CONNECT_IDLE (0) - confirm in LinkReference.
    private static final int CONNECT_IDLE = 0;
    private static final int CONNECT_IN_PROGRESS = 1;
    private static final int CONNECT_DONE = 2;

    private final EventLoopGroup clientWorkerGroup;
    private final EventLoopGroup serverBossGroup;
    private final EventLoopGroup serverWorkerGroup;
    private final Bootstrap clientBootstrap;
    private final ServerBootstrap serverBootstrap;
    private final Channel acceptor;
    private final int serverPort;
    private final SocketAddress localAddress;
    private final NettyClientEventListener clientEventListener;
    private final NettyServerEventListener serverEventListener;
    private final int numberOfTries;
    private final int retryTimeout;
    private final ConcurrentMap<SocketAddress, LinkReference> addrToLinkRefMap = new ConcurrentHashMap<>();
    private final ChannelGroup clientChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private final ChannelGroup serverChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Creates the transport and binds its server channel.
     *
     * @param str host address to bind to; {@link #UNKNOWN_HOST_NAME} means "ask
     *     {@code localAddressProvider}"
     * @param i server port; {@code 0} means "pick one from {@code tcpPortProvider}"
     * @param eStage stage handed to the client-side event listener
     * @param eStage2 stage handed to the server-side event listener
     * @param i2 number of connection attempts made by {@link #open}
     * @param i3 pause between connection attempts, passed to {@link Thread#sleep(long)}
     * @param tcpPortProvider source of candidate ports when {@code i} is {@code 0}
     * @param localAddressProvider source of the local host address when {@code str} is unknown
     * @throws RemoteRuntimeException if {@code i} is negative
     * @throws TransportRuntimeException if no port could be bound; all event loop groups are shut
     *     down before this is thrown
     */
    @Inject
    NettyMessagingTransport(@Parameter(RemoteConfiguration.HostAddress.class) String str, @Parameter(RemoteConfiguration.Port.class) int i, @Parameter(RemoteConfiguration.RemoteClientStage.class) EStage<TransportEvent> eStage, @Parameter(RemoteConfiguration.RemoteServerStage.class) EStage<TransportEvent> eStage2, @Parameter(RemoteConfiguration.NumberOfTries.class) int i2, @Parameter(RemoteConfiguration.RetryTimeout.class) int i3, TcpPortProvider tcpPortProvider, LocalAddressProvider localAddressProvider) {
        if (i < 0) {
            throw new RemoteRuntimeException("Invalid server port: " + i);
        }
        final String hostAddress = UNKNOWN_HOST_NAME.equals(str) ? localAddressProvider.getLocalAddress() : str;

        this.numberOfTries = i2;
        this.retryTimeout = i3;
        this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, eStage);
        this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, eStage2);

        this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerBoss"));
        this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerWorker"));
        this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ClientWorker"));

        this.clientBootstrap = new Bootstrap();
        this.clientBootstrap
                .group(this.clientWorkerGroup)
                .channel(NioSocketChannel.class)
                .handler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("client", this.clientChannelGroup, this.clientEventListener)))
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, true);

        this.serverBootstrap = new ServerBootstrap();
        this.serverBootstrap
                .group(this.serverBossGroup, this.serverWorkerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new NettyChannelInitializer(new NettyDefaultChannelHandlerFactory("server", this.serverChannelGroup, this.serverEventListener)))
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

        LOG.log(Level.FINE, "Binding to {0}", Integer.valueOf(i));

        int port = i;
        Channel boundChannel = null;
        try {
            if (port > 0) {
                boundChannel = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel();
            } else {
                final Iterator<Integer> candidatePorts = tcpPortProvider.iterator();
                while (boundChannel == null) {
                    if (!candidatePorts.hasNext()) {
                        throw new IllegalStateException("tcpPortProvider cannot find a free port.");
                    }
                    port = candidatePorts.next().intValue();
                    LOG.log(Level.FINEST, "Try port {0}", Integer.valueOf(port));
                    try {
                        boundChannel = this.serverBootstrap.bind(new InetSocketAddress(hostAddress, port)).sync().channel();
                    } catch (final Exception e) {
                        // sync() does not declare BindException, so it cannot be caught by type;
                        // anything other than "port already bound" is passed to the outer handler.
                        if (!(e instanceof BindException)) {
                            throw e;
                        }
                        LOG.log(Level.FINEST, "The port {0} is already bound. Try again", Integer.valueOf(port));
                    }
                }
            }
        } catch (final IllegalStateException e) {
            final TransportRuntimeException failure = new TransportRuntimeException("tcpPortProvider failed to return free ports.", e);
            LOG.log(Level.SEVERE, "Cannot find a free port with " + tcpPortProvider, failure);
            shutdownEventLoopGroups();
            throw failure;
        } catch (final Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller even though it is reported as a bind failure.
                Thread.currentThread().interrupt();
            }
            final TransportRuntimeException failure = new TransportRuntimeException("Cannot bind to port " + port, e);
            LOG.log(Level.SEVERE, "Cannot bind to port " + port, e);
            shutdownEventLoopGroups();
            throw failure;
        }

        this.acceptor = boundChannel;
        this.serverPort = port;
        this.localAddress = new InetSocketAddress(hostAddress, this.serverPort);
        LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress);
    }

    /**
     * Closes all client and server channels and the acceptor, then starts a graceful shutdown of
     * the event loop groups. The groups are shut down even if closing a channel fails.
     *
     * @throws Exception if waiting for the acceptor to close fails
     */
    @Override
    public void close() throws Exception {
        LOG.log(Level.FINE, "Closing netty transport socket address: {0}", this.localAddress);
        try {
            this.clientChannelGroup.close().awaitUninterruptibly();
            this.serverChannelGroup.close().awaitUninterruptibly();
            this.acceptor.close().sync();
        } finally {
            // Must run even when the close above throws, otherwise the loop threads are leaked.
            shutdownEventLoopGroups();
        }
        LOG.log(Level.FINE, "Closing netty transport socket address: {0} done", this.localAddress);
    }

    /**
     * Returns the cached link to {@code socketAddress}, or connects and caches a new one.
     *
     * <p>Up to {@code numberOfTries} connection attempts are made, pausing {@code retryTimeout}
     * between refused attempts. If another thread is already connecting to the same address this
     * call waits for it to finish. An interrupt received while waiting or pausing does not abort
     * the call; the thread's interrupt status is restored before the method exits.
     *
     * @param socketAddress remote address to connect to
     * @param encoder encoder given to the new link
     * @param linkListener listener given to the new link
     * @param <T> message type of the link
     * @return the link, or {@code null} if {@code numberOfTries} is negative
     * @throws ConnectException if every attempt was refused
     * @throws IOException declared by {@link Transport}
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Link<T> open(SocketAddress socketAddress, Encoder<? super T> encoder, LinkListener<? super T> linkListener) throws IOException {
        boolean interrupted = false;
        try {
            for (int attempt = 0; attempt <= this.numberOfTries; attempt++) {
                final LinkReference cachedRef = this.addrToLinkRefMap.get(socketAddress);
                if (cachedRef != null) {
                    final Link<T> cachedLink = (Link<T>) cachedRef.getLink();
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.log(Level.FINE, "Link {0} for {1} found", new Object[]{cachedLink, socketAddress});
                    }
                    if (cachedLink != null) {
                        return cachedLink;
                    }
                }

                // The extra iteration exists only to re-check the cache after the last attempt.
                if (attempt == this.numberOfTries) {
                    throw new ConnectException("Connection to " + socketAddress + " refused");
                }

                LOG.log(Level.FINE, "No cached link for {0} thread {1}", new Object[]{socketAddress, Thread.currentThread()});

                final LinkReference freshRef = new LinkReference();
                final LinkReference existingRef = this.addrToLinkRefMap.putIfAbsent(socketAddress, freshRef);
                final AtomicInteger connectFlag = existingRef != null ? existingRef.getConnectInProgress() : freshRef.getConnectInProgress();

                interrupted |= claimOrAwaitConnect(connectFlag);

                final LinkReference currentRef = this.addrToLinkRefMap.get(socketAddress);
                final Link<T> established = (Link<T>) currentRef.getLink();
                if (established != null) {
                    return established;
                }

                try {
                    final ChannelFuture connectFuture = this.clientBootstrap.connect(socketAddress);
                    connectFuture.syncUninterruptibly();
                    final Link<T> newLink = new NettyLink<T>(connectFuture.channel(), encoder, linkListener);
                    currentRef.setLink(newLink);
                    finishConnect(connectFlag, CONNECT_DONE);
                    return newLink;
                } catch (final Exception e) {
                    // Always release waiters, also for failures that are rethrown below;
                    // otherwise they would block in wait() forever.
                    finishConnect(connectFlag, CONNECT_IDLE);
                    if (!"ConnectException".equals(e.getClass().getSimpleName())) {
                        throw e;
                    }
                    LOG.log(Level.WARNING, "Connection refused. Retry {0} of {1}", new Object[]{Integer.valueOf(attempt + 1), Integer.valueOf(this.numberOfTries)});
                    // Pause outside the monitor so that notified waiters can proceed meanwhile.
                    try {
                        Thread.sleep(this.retryTimeout);
                    } catch (final InterruptedException ie) {
                        LOG.log(Level.WARNING, "Thread {0} interrupted while sleeping", Thread.currentThread());
                        interrupted = true;
                    }
                }
            }
            // Only reachable when numberOfTries is negative and the loop body never ran.
            return null;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Looks up the cached link for {@code socketAddress} without connecting.
     *
     * @param socketAddress remote address
     * @param <T> message type of the link
     * @return the cached link, or {@code null} if there is no entry or no link has been set on it
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> Link<T> get(SocketAddress socketAddress) {
        final LinkReference ref = this.addrToLinkRefMap.get(socketAddress);
        return ref == null ? null : (Link<T>) ref.getLink();
    }

    /**
     * @return the address (host and port) the server channel was bound to
     */
    @Override
    public SocketAddress getLocalAddress() {
        return this.localAddress;
    }

    /**
     * @return the port the server channel was bound to
     */
    @Override
    public int getListeningPort() {
        return this.serverPort;
    }

    /**
     * Registers {@code eventHandler} with both the client and the server event listener.
     *
     * @param eventHandler handler to register
     */
    @Override
    public void registerErrorHandler(EventHandler<Exception> eventHandler) {
        this.clientEventListener.registerErrorHandler(eventHandler);
        this.serverEventListener.registerErrorHandler(eventHandler);
    }

    /** Starts a graceful shutdown of the client worker, server boss and server worker groups. */
    private void shutdownEventLoopGroups() {
        this.clientWorkerGroup.shutdownGracefully();
        this.serverBossGroup.shutdownGracefully();
        this.serverWorkerGroup.shutdownGracefully();
    }

    /**
     * Marks a connect as in progress, or, if one already is, waits until it no longer is.
     *
     * @param connectFlag per-address state, also used as the monitor
     * @return {@code true} if the thread was interrupted while waiting; the wait itself continues
     */
    private static boolean claimOrAwaitConnect(final AtomicInteger connectFlag) {
        boolean interrupted = false;
        synchronized (connectFlag) {
            if (!connectFlag.compareAndSet(CONNECT_IDLE, CONNECT_IN_PROGRESS)) {
                while (connectFlag.get() == CONNECT_IN_PROGRESS) {
                    try {
                        connectFlag.wait();
                    } catch (final InterruptedException e) {
                        LOG.log(Level.WARNING, "Wait interrupted", e);
                        interrupted = true;
                    }
                }
            }
        }
        return interrupted;
    }

    /**
     * Moves the state from "in progress" to {@code newState} and wakes all waiting threads.
     *
     * @param connectFlag per-address state, also used as the monitor
     * @param newState {@link #CONNECT_DONE} on success, {@link #CONNECT_IDLE} on failure
     */
    private static void finishConnect(final AtomicInteger connectFlag, final int newState) {
        synchronized (connectFlag) {
            connectFlag.compareAndSet(CONNECT_IN_PROGRESS, newState);
            connectFlag.notifyAll();
        }
    }
}
