package reactor.io.net.impl.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.zmq.ZeroMQChannelStream;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQWorker;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;
import reactor.rx.stream.GroupedStream;

/* loaded from: input_file:reactor/io/net/impl/zmq/tcp/ZeroMQTcpServer.class */
public class ZeroMQTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(ZeroMQTcpServer.class);
    private final int ioThreadCount;
    private final ZeroMQServerSocketOptions zmqOpts;
    private final ExecutorService threadPool;
    private volatile ZeroMQWorker worker;
    private volatile Future<?> workerFuture;

    public ZeroMQTcpServer(Environment environment, Dispatcher dispatcher, InetSocketAddress inetSocketAddress, ServerSocketOptions serverSocketOptions, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(environment, dispatcher, inetSocketAddress, serverSocketOptions, sslOptions, codec);
        this.ioThreadCount = getDefaultEnvironment().getIntProperty("reactor.zmq.ioThreadCount", 1);
        if (serverSocketOptions instanceof ZeroMQServerSocketOptions) {
            this.zmqOpts = (ZeroMQServerSocketOptions) serverSocketOptions;
        } else {
            this.zmqOpts = null;
        }
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq-server"));
    }

    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler) {
        Assert.isNull(this.worker, "This ZeroMQ server has already been started");
        final Promise<Void> ready = Promises.ready(getDefaultEnvironment(), getDefaultDispatcher());
        final UUID random = UUIDUtils.random();
        final int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : 6;
        ZContext context = null != this.zmqOpts ? this.zmqOpts.context() : null;
        Broadcaster create = SerializedBroadcaster.create(getDefaultEnvironment());
        final Stream groupBy = create.groupBy(new Function<ZMsg, String>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.1
            public String apply(ZMsg zMsg) {
                String uuid;
                switch (socketType) {
                    case 6:
                        uuid = zMsg.popString();
                        break;
                    default:
                        uuid = random.toString();
                        break;
                }
                return uuid;
            }
        });
        this.worker = new ZeroMQWorker(random, socketType, this.ioThreadCount, context, create) { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2
            @Override // reactor.io.net.impl.zmq.ZeroMQWorker
            protected void configure(ZMQ.Socket socket) {
                socket.setReceiveBufferSize(ZeroMQTcpServer.this.getOptions().rcvbuf());
                socket.setSendBufferSize(ZeroMQTcpServer.this.getOptions().sndbuf());
                socket.setBacklog(ZeroMQTcpServer.this.getOptions().backlog());
                if (ZeroMQTcpServer.this.getOptions().keepAlive()) {
                    socket.setTCPKeepAlive(1);
                }
                if (null == ZeroMQTcpServer.this.zmqOpts || null == ZeroMQTcpServer.this.zmqOpts.socketConfigurer()) {
                    return;
                }
                ZeroMQTcpServer.this.zmqOpts.socketConfigurer().accept(socket);
            }

            @Override // reactor.io.net.impl.zmq.ZeroMQWorker
            protected void start(final ZMQ.Socket socket) {
                try {
                    String listenAddresses = (null == ZeroMQTcpServer.this.zmqOpts || null == ZeroMQTcpServer.this.zmqOpts.listenAddresses()) ? "tcp://" + ZeroMQTcpServer.this.getListenAddress().getHostString() + ":" + ZeroMQTcpServer.this.getListenAddress().getPort() : ZeroMQTcpServer.this.zmqOpts.listenAddresses();
                    if (ZeroMQTcpServer.log.isInfoEnabled()) {
                        ZeroMQTcpServer.log.info("BIND: starting ZeroMQ {} socket on {}", ZeroMQ.findSocketTypeName(socket.getType()), listenAddresses);
                    }
                    socket.bind(listenAddresses);
                    groupBy.consume(new Consumer<GroupedStream<String, ZMsg>>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1
                        public void accept(final GroupedStream<String, ZMsg> groupedStream) {
                            final ZeroMQChannelStream<IN, OUT> socket2 = ZeroMQTcpServer.this.bindChannel().setConnectionId((String) groupedStream.key()).setSocket(socket);
                            ((Publisher) reactorChannelHandler.apply(socket2)).subscribe(new DefaultSubscriber<Void>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1.1
                                public void onSubscribe(Subscription subscription) {
                                    subscription.request(Long.MAX_VALUE);
                                }

                                public void onComplete() {
                                    ZeroMQTcpServer.log.debug("Closing handler " + ((String) groupedStream.key()));
                                    socket2.close();
                                }

                                public void onError(Throwable th) {
                                    ZeroMQTcpServer.log.error("Error during registration " + ((String) groupedStream.key()), th);
                                    socket2.close();
                                }
                            });
                            groupedStream.consume(new Consumer<ZMsg>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1.2
                                public void accept(ZMsg zMsg) {
                                    while (true) {
                                        ZFrame pop = zMsg.pop();
                                        if (null == pop) {
                                            zMsg.destroy();
                                            return;
                                        } else if (socket2.getDecoder() != null) {
                                            socket2.getDecoder().apply(Buffer.wrap(pop.getData()));
                                        } else {
                                            socket2.doDecoded(Buffer.wrap(pop.getData()));
                                        }
                                    }
                                }
                            }, (Consumer) null, new Consumer<Void>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer.2.1.3
                                public void accept(Void r3) {
                                    socket2.close();
                                }
                            });
                        }
                    });
                    ready.onComplete();
                } catch (Exception e) {
                    ready.onError(e);
                }
            }
        };
        this.workerFuture = this.threadPool.submit(this.worker);
        return ready;
    }

    protected ZeroMQChannelStream<IN, OUT> bindChannel() {
        return new ZeroMQChannelStream<>(getDefaultEnvironment(), getDefaultPrefetchSize(), getDefaultDispatcher(), null, getDefaultCodec());
    }

    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doShutdown() {
        if (null == this.worker) {
            return Promises.error(new IllegalStateException("This ZeroMQ server has not been started"));
        }
        this.worker.shutdown();
        if (!this.workerFuture.isDone()) {
            this.workerFuture.cancel(true);
        }
        this.threadPool.shutdownNow();
        return Promises.success();
    }
}
