/*
 * Decompiled with CFR 0.152.
 */
package reactor.net.zmq.tcp;

import com.gs.collections.api.block.procedure.Procedure2;
import com.gs.collections.api.map.MutableMap;
import com.gs.collections.impl.block.procedure.checked.CheckedProcedure2;
import com.gs.collections.impl.map.mutable.SynchronizedMutableMap;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.config.ClientSocketOptions;
import reactor.net.config.SslOptions;
import reactor.net.tcp.TcpClient;
import reactor.net.zmq.ZeroMQClientSocketOptions;
import reactor.net.zmq.ZeroMQNetChannel;
import reactor.net.zmq.ZeroMQWorker;
import reactor.net.zmq.tcp.ZeroMQ;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.UUIDUtils;

public class ZeroMQTcpClient<IN, OUT>
extends TcpClient<IN, OUT> {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final MutableMap<ZeroMQWorker<IN, OUT>, Future<?>> workers = SynchronizedMutableMap.of((Map)UnifiedMap.newMap());
    private final int ioThreadCount;
    private final ZeroMQClientSocketOptions zmqOpts;
    private final ExecutorService threadPool;

    public ZeroMQTcpClient(@Nonnull Environment env, @Nonnull Reactor reactor, @Nonnull InetSocketAddress connectAddress, @Nullable ClientSocketOptions options, @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec, @Nonnull Collection<Consumer<NetChannel<IN, OUT>>> consumers) {
        super(env, reactor, connectAddress, options, sslOptions, codec, consumers);
        this.ioThreadCount = (Integer)env.getProperty("reactor.zmq.ioThreadCount", Integer.class, (Object)1);
        this.zmqOpts = options instanceof ZeroMQClientSocketOptions ? (ZeroMQClientSocketOptions)options : null;
        this.threadPool = Executors.newCachedThreadPool((ThreadFactory)new NamedDaemonThreadFactory("zmq-client"));
    }

    @Override
    public Promise<NetChannel<IN, OUT>> open() {
        Deferred d = Promises.defer((Environment)this.getEnvironment(), (Dispatcher)this.getReactor().getDispatcher());
        this.doOpen((Consumer<NetChannel<IN, OUT>>)d);
        return (Promise)d.compose();
    }

    @Override
    public Stream<NetChannel<IN, OUT>> open(Reconnect reconnect) {
        throw new IllegalStateException("Reconnects are handled transparently by the ZeroMQ network library");
    }

    @Override
    public void close(@Nullable Consumer<Boolean> onClose) {
        if (this.workers.isEmpty()) {
            throw new IllegalStateException("This ZeroMQ server has not been started");
        }
        super.close(null);
        this.workers.forEachKeyValue((Procedure2)new CheckedProcedure2<ZeroMQWorker<IN, OUT>, Future<?>>(){

            public void safeValue(ZeroMQWorker<IN, OUT> w, Future<?> f) throws Exception {
                w.shutdown();
                if (!f.isDone()) {
                    f.cancel(true);
                }
            }
        });
        this.threadPool.shutdownNow();
        this.getReactor().schedule(onClose, (Object)true);
        this.notifyShutdown();
    }

    @Override
    protected <C> NetChannel<IN, OUT> createChannel(C ioChannel) {
        final ZeroMQNetChannel ch = new ZeroMQNetChannel(this.getEnvironment(), this.getReactor(), this.getReactor().getDispatcher(), this.getCodec());
        ch.on().close(new Runnable(){

            @Override
            public void run() {
                ZeroMQTcpClient.this.notifyClose(ch);
            }
        });
        return ch;
    }

    private void doOpen(final Consumer<NetChannel<IN, OUT>> consumer) {
        final UUID id = UUIDUtils.random();
        int socketType = null != this.zmqOpts ? this.zmqOpts.socketType() : 5;
        ZContext zmq = null != this.zmqOpts ? this.zmqOpts.context() : null;
        ZeroMQWorker worker = new ZeroMQWorker<IN, OUT>(id, socketType, this.ioThreadCount, zmq){

            @Override
            protected void configure(ZMQ.Socket socket) {
                socket.setReceiveBufferSize((long)ZeroMQTcpClient.this.getOptions().rcvbuf());
                socket.setSendBufferSize((long)ZeroMQTcpClient.this.getOptions().sndbuf());
                if (ZeroMQTcpClient.this.getOptions().keepAlive()) {
                    socket.setTCPKeepAlive(1);
                }
                if (null != ZeroMQTcpClient.this.zmqOpts && null != ZeroMQTcpClient.this.zmqOpts.socketConfigurer()) {
                    ZeroMQTcpClient.this.zmqOpts.socketConfigurer().accept((Object)socket);
                }
            }

            @Override
            protected void start(final ZMQ.Socket socket) {
                String addr = ZeroMQTcpClient.this.createConnectAddress();
                if (ZeroMQTcpClient.this.log.isInfoEnabled()) {
                    String type = ZeroMQ.findSocketTypeName(socket.getType());
                    ZeroMQTcpClient.this.log.info("CONNECT: connecting ZeroMQ {} socket to {}", (Object)type, (Object)addr);
                }
                socket.connect(addr);
                ZeroMQTcpClient.this.notifyStart(new Runnable(){

                    @Override
                    public void run() {
                        ZeroMQNetChannel ch = this.select(id.toString()).setConnectionId(id.toString()).setSocket(socket);
                        consumer.accept(ch);
                    }
                });
            }

            @Override
            protected ZeroMQNetChannel<IN, OUT> select(Object id2) {
                return (ZeroMQNetChannel)ZeroMQTcpClient.this.select(id2);
            }
        };
        this.workers.put((Object)worker, this.threadPool.submit(worker));
    }

    private String createConnectAddress() {
        String addrs = null != this.zmqOpts && null != this.zmqOpts.connectAddresses() ? this.zmqOpts.connectAddresses() : "tcp://" + this.getConnectAddress().getHostString() + ":" + this.getConnectAddress().getPort();
        return addrs;
    }
}

