package reactor.net.zmq.tcp;

import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.function.checked.CheckedFunction0;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import com.gs.collections.impl.map.mutable.SynchronizedMutableMap;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.dispatch.Dispatcher;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.io.encoding.StandardCodecs;
import reactor.net.NetChannel;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.TcpServer;
import reactor.net.tcp.spec.TcpClientSpec;
import reactor.net.tcp.spec.TcpServerSpec;
import reactor.net.zmq.ZeroMQClientSocketOptions;
import reactor.net.zmq.ZeroMQServerSocketOptions;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.Assert;

/* loaded from: input_file:reactor/net/zmq/tcp/ZeroMQ.class */
public class ZeroMQ<T> {
    private static final SynchronizedMutableMap<Integer, String> SOCKET_TYPES = SynchronizedMutableMap.of(UnifiedMap.newMap());
    private final Logger log;
    private final MutableList<TcpClient<T, T>> clients;
    private final MutableList<TcpServer<T, T>> servers;
    private final ExecutorService threadPool;
    private final Environment env;
    private final Dispatcher dispatcher;

    /* renamed from: reactor, reason: collision with root package name */
    private final Reactor f1reactor;
    private final ZContext zmqCtx;
    private volatile Codec<Buffer, T, T> codec;
    private volatile boolean shutdown;

    public ZeroMQ(Environment environment) {
        this(environment, environment.getDefaultDispatcher());
    }

    public ZeroMQ(Environment environment, String str) {
        this(environment, environment.getDispatcher(str));
    }

    public ZeroMQ(Environment environment, Dispatcher dispatcher) {
        this.log = LoggerFactory.getLogger(getClass());
        this.clients = SynchronizedMutableList.of(FastList.newList());
        this.servers = SynchronizedMutableList.of(FastList.newList());
        this.threadPool = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("zmq"));
        this.codec = StandardCodecs.PASS_THROUGH_CODEC;
        this.shutdown = false;
        this.env = environment;
        this.dispatcher = dispatcher;
        this.f1reactor = Reactors.reactor(environment, dispatcher);
        this.zmqCtx = new ZContext();
        this.zmqCtx.setLinger(100);
    }

    public static String findSocketTypeName(final int i) {
        return (String) SOCKET_TYPES.getIfAbsentPut(Integer.valueOf(i), new CheckedFunction0<String>() { // from class: reactor.net.zmq.tcp.ZeroMQ.1
            /* renamed from: safeValue, reason: merged with bridge method [inline-methods] */
            public String m11safeValue() throws Exception {
                for (Field field : ZMQ.class.getDeclaredFields()) {
                    if (Integer.TYPE.isAssignableFrom(field.getType())) {
                        field.setAccessible(true);
                        try {
                            if (i == field.getInt(null)) {
                                return field.getName();
                            }
                            continue;
                        } catch (IllegalAccessException e) {
                        }
                    }
                }
                return "";
            }
        });
    }

    public ZeroMQ<T> codec(Codec<Buffer, T, T> codec) {
        this.codec = codec;
        return this;
    }

    public Promise<NetChannel<T, T>> dealer(String str) {
        return createClient(str, 5);
    }

    public Promise<NetChannel<T, T>> push(String str) {
        return createClient(str, 8);
    }

    public Promise<NetChannel<T, T>> pull(String str) {
        return createServer(str, 7);
    }

    public Promise<NetChannel<T, T>> request(String str) {
        return createClient(str, 3);
    }

    public Promise<NetChannel<T, T>> reply(String str) {
        return createServer(str, 4);
    }

    public Promise<NetChannel<T, T>> router(String str) {
        return createServer(str, 6);
    }

    private Promise<NetChannel<T, T>> createClient(String str, int i) {
        Assert.isTrue(!this.shutdown, "This ZeroMQ instance has been shut down");
        TcpClient tcpClient = (TcpClient) new TcpClientSpec(ZeroMQTcpClient.class).env(this.env).dispatcher(this.dispatcher).codec(this.codec).options(new ZeroMQClientSocketOptions().context(this.zmqCtx).connectAddresses(str).socketType(i)).get();
        this.clients.add(tcpClient);
        return tcpClient.open();
    }

    public Promise<NetChannel<T, T>> createServer(String str, int i) {
        Assert.isTrue(!this.shutdown, "This ZeroMQ instance has been shut down");
        Deferred defer = Promises.defer(this.env, this.dispatcher);
        TcpServer tcpServer = (TcpServer) new TcpServerSpec(ZeroMQTcpServer.class).env(this.env).dispatcher(this.dispatcher).codec(this.codec).options(new ZeroMQServerSocketOptions().context(this.zmqCtx).listenAddresses(str).socketType(i)).consume((Consumer) defer).get();
        this.servers.add(tcpServer);
        tcpServer.start();
        return defer.compose();
    }

    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        this.servers.removeIf(new CheckedPredicate<TcpServer<T, T>>() { // from class: reactor.net.zmq.tcp.ZeroMQ.2
            public boolean safeAccept(TcpServer<T, T> tcpServer) throws Exception {
                Assert.isTrue(((Boolean) tcpServer.shutdown().await(60L, TimeUnit.SECONDS)).booleanValue(), "Server " + tcpServer + " not properly shut down");
                return true;
            }
        });
        this.clients.removeIf(new CheckedPredicate<TcpClient<T, T>>() { // from class: reactor.net.zmq.tcp.ZeroMQ.3
            public boolean safeAccept(TcpClient<T, T> tcpClient) throws Exception {
                Assert.isTrue(((Boolean) tcpClient.close().await(60L, TimeUnit.SECONDS)).booleanValue(), "Client " + tcpClient + " not properly shut down");
                return true;
            }
        });
    }
}
