package reactor.io.net.impl.zmq.tcp;

import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.core.support.UUIDUtils;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.zmq.ZeroMQChannelStream;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQWorker;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.action.support.DefaultSubscriber;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.broadcast.SerializedBroadcaster;

/* loaded from: input_file:reactor/io/net/impl/zmq/tcp/ZeroMQTcpClient.class */
/**
 * {@link TcpClient} implementation that connects to a remote endpoint through a ZeroMQ socket.
 *
 * <p>Every call to {@link #doStart(ReactorChannelHandler)} submits a {@link ZeroMQWorker} to an
 * internal cached thread pool (threads come from a {@code NamedDaemonThreadFactory} named
 * {@code "zmq-client"}). The worker connects the socket, hands a {@link ZeroMQChannelStream} to the
 * supplied handler and forwards every frame of each inbound {@link ZMsg} to that channel.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data
 */
public class ZeroMQTcpClient<IN, OUT> extends TcpClient<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(ZeroMQTcpClient.class);

    /** I/O thread count passed to each worker; read from {@code reactor.zmq.ioThreadCount}, default 1. */
    private final int ioThreadCount;

    /** ZeroMQ-specific options, or {@code null} when the supplied options are not a {@link ZeroMQClientSocketOptions}. */
    private final ZeroMQClientSocketOptions zmqOpts;

    /** Pool on which the ZeroMQ workers run; shut down by {@link #doShutdown()}. */
    private final ExecutorService threadPool;

    /**
     * Creates the client and its worker thread pool.
     *
     * @param environment         environment forwarded to the parent client
     * @param dispatcher          dispatcher forwarded to the parent client
     * @param supplier            supplier of the address to connect to
     * @param clientSocketOptions socket options; ZeroMQ-specific settings are only honoured when this
     *                            is a {@link ZeroMQClientSocketOptions}
     * @param sslOptions          SSL options forwarded to the parent client
     * @param codec               codec forwarded to the parent client
     */
    public ZeroMQTcpClient(@Nonnull Environment environment, @Nonnull Dispatcher dispatcher, @Nonnull Supplier<InetSocketAddress> supplier, @Nullable ClientSocketOptions clientSocketOptions, @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(environment, dispatcher, supplier, clientSocketOptions, sslOptions, codec);
        this.ioThreadCount = getDefaultEnvironment().getIntProperty("reactor.zmq.ioThreadCount", 1);
        this.zmqOpts = clientSocketOptions instanceof ZeroMQClientSocketOptions
                ? (ZeroMQClientSocketOptions) clientSocketOptions
                : null;
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq-client"));
    }

    /**
     * Reconnect-aware start is not supported by this client.
     *
     * @throws IllegalStateException always
     */
    @Override // reactor.io.net.ReactorClient
    protected Stream<Tuple2<InetSocketAddress, Integer>> doStart(ReactorChannelHandler reactorChannelHandler, Reconnect reconnect) {
        throw new IllegalStateException("Reconnects are handled transparently by the ZeroMQ network library");
    }

    /**
     * Interrupts the worker pool via {@code shutdownNow()} and returns an already completed promise.
     *
     * @return a promise that has been completed before it is returned
     */
    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doShutdown() {
        final Promise<Void> done = Promises.prepare();
        this.threadPool.shutdownNow();
        done.onComplete();
        return done;
    }

    /**
     * Creates the channel stream for a new connection from this client's default environment,
     * prefetch size, dispatcher, connect address and codec.
     *
     * @return a new, not yet socket-bound, channel stream
     */
    protected ZeroMQChannelStream<IN, OUT> bindChannel() {
        return new ZeroMQChannelStream<>(getDefaultEnvironment(), getDefaultPrefetchSize(), getDefaultDispatcher(), getConnectAddress(), getDefaultCodec());
    }

    /**
     * Submits a worker that connects a ZeroMQ socket and wires it to the given handler.
     *
     * @param reactorChannelHandler handler applied to the connected channel; the publisher it returns
     *                              is subscribed with unbounded demand and the channel is closed when
     *                              that publisher completes or fails
     * @return a promise completed once the worker has connected and wired the channel, or failed with
     *         the exception raised while doing so
     */
    @Override // reactor.io.net.ReactorPeer
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> reactorChannelHandler) {
        final UUID workerId = UUIDUtils.random();
        final Promise<Void> started = Promises.prepare();
        // Without ZeroMQ-specific options fall back to a DEALER socket and let the worker pick its context.
        final int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : ZMQ.DEALER;
        final ZContext context = null != this.zmqOpts ? this.zmqOpts.context() : null;
        final Broadcaster<ZMsg> inbound = SerializedBroadcaster.create(getDefaultEnvironment());
        this.threadPool.submit(new ZeroMQWorker(workerId, socketType, this.ioThreadCount, context, inbound) { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpClient.1
            @Override // reactor.io.net.impl.zmq.ZeroMQWorker
            protected void configure(ZMQ.Socket socket) {
                // The constructor accepts null socket options; skip the generic tuning in that case
                // instead of failing the worker with a NullPointerException.
                if (null != ZeroMQTcpClient.this.getOptions()) {
                    socket.setReceiveBufferSize(ZeroMQTcpClient.this.getOptions().rcvbuf());
                    socket.setSendBufferSize(ZeroMQTcpClient.this.getOptions().sndbuf());
                    if (ZeroMQTcpClient.this.getOptions().keepAlive()) {
                        socket.setTCPKeepAlive(1);
                    }
                }
                // A user-supplied configurer runs last so it can override the settings above.
                if (null != ZeroMQTcpClient.this.zmqOpts && null != ZeroMQTcpClient.this.zmqOpts.socketConfigurer()) {
                    ZeroMQTcpClient.this.zmqOpts.socketConfigurer().accept(socket);
                }
            }

            @Override // reactor.io.net.impl.zmq.ZeroMQWorker
            protected void start(ZMQ.Socket socket) {
                try {
                    final String address = ZeroMQTcpClient.this.createConnectAddress();
                    if (ZeroMQTcpClient.log.isInfoEnabled()) {
                        ZeroMQTcpClient.log.info("CONNECT: connecting ZeroMQ {} socket to {}", ZeroMQ.findSocketTypeName(socket.getType()), address);
                    }
                    socket.connect(address);
                    final ZeroMQChannelStream<IN, OUT> channel = ZeroMQTcpClient.this.bindChannel().setConnectionId(workerId.toString()).setSocket(socket);
                    final Publisher<Void> registration = reactorChannelHandler.apply(channel);
                    registration.subscribe(new DefaultSubscriber<Void>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpClient.1.1
                        @Override
                        public void onSubscribe(Subscription subscription) {
                            subscription.request(Long.MAX_VALUE);
                        }

                        @Override
                        public void onComplete() {
                            ZeroMQTcpClient.log.debug("Closing Client Worker {}", workerId);
                            channel.close();
                        }

                        @Override
                        public void onError(Throwable th) {
                            ZeroMQTcpClient.log.error("Error during registration", th);
                            channel.close();
                        }
                    });
                    inbound.consume(new Consumer<ZMsg>() { // from class: reactor.io.net.impl.zmq.tcp.ZeroMQTcpClient.1.2
                        @Override
                        public void accept(ZMsg zMsg) {
                            try {
                                // Drain the message frame by frame; pop() yields null once it is empty.
                                for (ZFrame frame = zMsg.pop(); null != frame; frame = zMsg.pop()) {
                                    if (channel.getDecoder() != null) {
                                        channel.getDecoder().apply(Buffer.wrap(frame.getData()));
                                    } else {
                                        channel.doDecoded(Buffer.wrap(frame.getData()));
                                    }
                                }
                            } finally {
                                // Release the message even when decoding a frame throws.
                                zMsg.destroy();
                            }
                        }
                    });
                    started.onComplete();
                } catch (Exception e) {
                    started.onError(e);
                }
            }
        });
        return started;
    }

    /**
     * Resolves the ZeroMQ address to connect to.
     *
     * @return the addresses configured on the ZeroMQ options when present, otherwise
     *         {@code tcp://host:port} built from this client's connect address
     */
    /* JADX INFO: Access modifiers changed from: private */
    public String createConnectAddress() {
        if (null != this.zmqOpts && null != this.zmqOpts.connectAddresses()) {
            return this.zmqOpts.connectAddresses();
        }
        return "tcp://" + getConnectAddress().getHostString() + ":" + getConnectAddress().getPort();
    }
}
