/*
 * Decompiled with CFR 0.152.
 */
package reactor.net.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.config.ServerSocketOptions;
import reactor.net.config.SslOptions;
import reactor.net.tcp.TcpServer;
import reactor.net.zmq.ZeroMQNetChannel;
import reactor.net.zmq.ZeroMQServerSocketOptions;
import reactor.net.zmq.ZeroMQWorker;
import reactor.net.zmq.tcp.ZeroMQ;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.Assert;
import reactor.util.UUIDUtils;

/**
 * A {@link TcpServer} implementation that binds a ZeroMQ socket and services it
 * from a {@link ZeroMQWorker} submitted to an internal thread pool.
 *
 * <p>If the supplied {@link ServerSocketOptions} are an instance of
 * {@link ZeroMQServerSocketOptions}, the socket type, ZeroMQ context, listen
 * addresses and socket configurer are taken from them; otherwise a
 * {@code ZMQ.ROUTER} socket is bound to {@code tcp://host:port} built from the
 * server's listen address.
 *
 * @param <IN>  type of the data read from channels
 * @param <OUT> type of the data written to channels
 */
public class ZeroMQTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final int ioThreadCount;
    /** ZeroMQ-specific options, or {@code null} when plain {@link ServerSocketOptions} were supplied. */
    private final ZeroMQServerSocketOptions zmqOpts;
    private final ExecutorService threadPool;
    /** Set by {@link #start(Runnable)}; {@code null} means the server has not been started. */
    private volatile ZeroMQWorker<IN, OUT> worker;
    /** Handle of the submitted {@link #worker}; assigned right after {@link #worker} in {@link #start(Runnable)}. */
    private volatile Future<?> workerFuture;

    /**
     * Creates the server. The number of ZeroMQ I/O threads is read from the
     * environment property {@code reactor.zmq.ioThreadCount} (default {@code 1}).
     *
     * @param env           environment used for property lookup and passed to the superclass
     * @param reactor       reactor passed to the superclass
     * @param listenAddress address to bind to; only used when no ZeroMQ listen addresses are configured
     * @param options       socket options; ZeroMQ-specific settings are honoured only when this is a
     *                      {@link ZeroMQServerSocketOptions}
     * @param sslOptions    passed through to the superclass
     * @param codec         passed through to the superclass and to created channels
     * @param consumers     passed through to the superclass
     */
    public ZeroMQTcpServer(@Nonnull Environment env, @Nonnull Reactor reactor, @Nullable InetSocketAddress listenAddress, ServerSocketOptions options, SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> consumers) {
        super(env, reactor, listenAddress, options, sslOptions, codec, consumers);
        this.ioThreadCount = env.getProperty("reactor.zmq.ioThreadCount", Integer.class, 1);
        this.zmqOpts = options instanceof ZeroMQServerSocketOptions ? (ZeroMQServerSocketOptions) options : null;
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq-server"));
    }

    /**
     * Creates the ZeroMQ worker and submits it to the internal thread pool. The
     * socket is configured and bound from within the worker's callbacks, and
     * {@code started} is passed to {@code notifyStart} once the socket is bound.
     *
     * @param started optional callback handed to {@code notifyStart} after the bind
     * @return this server
     */
    @Override
    public TcpServer<IN, OUT> start(final @Nullable Runnable started) {
        Assert.isNull(this.worker, "This ZeroMQ server has already been started");
        UUID id = UUIDUtils.random();
        // ZMQ.ROUTER is the named constant for the previously hard-coded socket type 6.
        int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : ZMQ.ROUTER;
        ZContext zmq = null != this.zmqOpts ? this.zmqOpts.context() : null;
        this.worker = new ZeroMQWorker<IN, OUT>(id, socketType, this.ioThreadCount, zmq) {

            @Override
            protected void configure(ZMQ.Socket socket) {
                // The explicit long casts keep the same setter overloads selected as before.
                socket.setReceiveBufferSize((long) ZeroMQTcpServer.this.getOptions().rcvbuf());
                socket.setSendBufferSize((long) ZeroMQTcpServer.this.getOptions().sndbuf());
                socket.setBacklog((long) ZeroMQTcpServer.this.getOptions().backlog());
                if (ZeroMQTcpServer.this.getOptions().keepAlive()) {
                    socket.setTCPKeepAlive(1);
                }
                // User-supplied configurer runs last so it can override the settings above.
                if (null != ZeroMQTcpServer.this.zmqOpts && null != ZeroMQTcpServer.this.zmqOpts.socketConfigurer()) {
                    ZeroMQTcpServer.this.zmqOpts.socketConfigurer().accept(socket);
                }
            }

            @Override
            protected void start(ZMQ.Socket socket) {
                String addr = ZeroMQTcpServer.this.resolveBindAddress();
                if (ZeroMQTcpServer.this.log.isInfoEnabled()) {
                    String type = ZeroMQ.findSocketTypeName(socket.getType());
                    ZeroMQTcpServer.this.log.info("BIND: starting ZeroMQ {} socket on {}", type, addr);
                }
                socket.bind(addr);
                ZeroMQTcpServer.this.notifyStart(started);
            }

            @Override
            protected ZeroMQNetChannel<IN, OUT> select(Object id) {
                return (ZeroMQNetChannel<IN, OUT>) ZeroMQTcpServer.this.select(id);
            }
        };
        this.workerFuture = this.threadPool.submit(this.worker);
        return this;
    }

    /**
     * Creates a new {@link ZeroMQNetChannel} using this server's environment,
     * reactor, the reactor's dispatcher and codec. The {@code ioChannel}
     * argument is not used.
     */
    @Override
    protected <C> NetChannel<IN, OUT> createChannel(C ioChannel) {
        return new ZeroMQNetChannel<IN, OUT>(this.getEnvironment(), this.getReactor(), this.getReactor().getDispatcher(), this.getCodec());
    }

    /**
     * Shuts the server down: closes the peer, shuts down the worker, cancels its
     * future if it is still running, stops the thread pool and calls
     * {@code notifyShutdown}.
     *
     * @return a promise accepted with {@code true} once the steps above have run,
     *         or a promise carrying an {@link IllegalStateException} if the server
     *         was never started
     */
    @Override
    public Promise<Boolean> shutdown() {
        // Snapshot the volatile field so the null check and the use see the same value.
        final ZeroMQWorker<IN, OUT> activeWorker = this.worker;
        if (null == activeWorker) {
            return Promises.<Boolean>error(new IllegalStateException("This ZeroMQ server has not been started")).get();
        }
        Deferred<Boolean, Promise<Boolean>> d = Promises.defer(this.getEnvironment(), this.getReactor().getDispatcher());
        super.close(null);
        activeWorker.shutdown();
        // start() publishes the worker before its future, so the future may still be null here.
        final Future<?> future = this.workerFuture;
        if (null != future && !future.isDone()) {
            future.cancel(true);
        }
        this.threadPool.shutdownNow();
        this.notifyShutdown();
        d.accept(true);
        return d.compose();
    }

    /**
     * Determines the address string to bind to: the ZeroMQ options' listen
     * addresses when configured, otherwise {@code tcp://host:port} built from
     * the server's listen address.
     *
     * @throws IllegalStateException if neither source provides an address
     */
    private String resolveBindAddress() {
        if (null != this.zmqOpts) {
            String configured = this.zmqOpts.listenAddresses();
            if (null != configured) {
                return configured;
            }
        }
        InetSocketAddress listenAddress = this.getListenAddress();
        if (null == listenAddress) {
            throw new IllegalStateException("No listen address configured for this ZeroMQ server");
        }
        return "tcp://" + listenAddress.getHostString() + ":" + listenAddress.getPort();
    }
}

