/*
 * Decompiled with CFR 0.152.
 */
package reactor.net.zmq.tcp;

import com.gs.collections.api.block.function.Function0;
import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.list.MutableList;
import com.gs.collections.impl.block.function.checked.CheckedFunction0;
import com.gs.collections.impl.block.predicate.checked.CheckedPredicate;
import com.gs.collections.impl.list.mutable.FastList;
import com.gs.collections.impl.list.mutable.SynchronizedMutableList;
import com.gs.collections.impl.map.mutable.SynchronizedMutableMap;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.dispatch.Dispatcher;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.io.encoding.StandardCodecs;
import reactor.net.NetChannel;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.TcpServer;
import reactor.net.tcp.spec.TcpClientSpec;
import reactor.net.tcp.spec.TcpServerSpec;
import reactor.net.zmq.ZeroMQClientSocketOptions;
import reactor.net.zmq.ZeroMQServerSocketOptions;
import reactor.net.zmq.tcp.ZeroMQTcpClient;
import reactor.net.zmq.tcp.ZeroMQTcpServer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.Assert;

public class ZeroMQ<T> {
    private static final SynchronizedMutableMap<Integer, String> SOCKET_TYPES = SynchronizedMutableMap.of((Map)UnifiedMap.newMap());
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final MutableList<TcpClient<T, T>> clients = SynchronizedMutableList.of((List)FastList.newList());
    private final MutableList<TcpServer<T, T>> servers = SynchronizedMutableList.of((List)FastList.newList());
    private final ExecutorService threadPool = Executors.newCachedThreadPool((ThreadFactory)new NamedDaemonThreadFactory("zmq"));
    private final Environment env;
    private final Dispatcher dispatcher;
    private final Reactor reactor;
    private final ZContext zmqCtx;
    private volatile Codec<Buffer, T, T> codec = StandardCodecs.PASS_THROUGH_CODEC;
    private volatile boolean shutdown = false;

    public ZeroMQ(Environment env) {
        this(env, env.getDefaultDispatcher());
    }

    public ZeroMQ(Environment env, String dispatcher) {
        this(env, env.getDispatcher(dispatcher));
    }

    public ZeroMQ(Environment env, Dispatcher dispatcher) {
        this.env = env;
        this.dispatcher = dispatcher;
        this.reactor = Reactors.reactor((Environment)env, (Dispatcher)dispatcher);
        this.zmqCtx = new ZContext();
        this.zmqCtx.setLinger(100);
    }

    public static String findSocketTypeName(final int socketType) {
        return (String)SOCKET_TYPES.getIfAbsentPut((Object)socketType, (Function0)new CheckedFunction0<String>(){

            public String safeValue() throws Exception {
                for (Field f : ZMQ.class.getDeclaredFields()) {
                    if (!Integer.TYPE.isAssignableFrom(f.getType())) continue;
                    f.setAccessible(true);
                    try {
                        int val = f.getInt(null);
                        if (socketType != val) continue;
                        return f.getName();
                    }
                    catch (IllegalAccessException e) {
                        // empty catch block
                    }
                }
                return "";
            }
        });
    }

    public ZeroMQ<T> codec(Codec<Buffer, T, T> codec) {
        this.codec = codec;
        return this;
    }

    public Promise<NetChannel<T, T>> dealer(String addrs) {
        return this.createClient(addrs, 5);
    }

    public Promise<NetChannel<T, T>> push(String addrs) {
        return this.createClient(addrs, 8);
    }

    public Promise<NetChannel<T, T>> pull(String addrs) {
        return this.createServer(addrs, 7);
    }

    public Promise<NetChannel<T, T>> request(String addrs) {
        return this.createClient(addrs, 3);
    }

    public Promise<NetChannel<T, T>> reply(String addrs) {
        return this.createServer(addrs, 4);
    }

    public Promise<NetChannel<T, T>> router(String addrs) {
        return this.createServer(addrs, 6);
    }

    private Promise<NetChannel<T, T>> createClient(String addrs, int socketType) {
        Assert.isTrue((!this.shutdown ? 1 : 0) != 0, (String)"This ZeroMQ instance has been shut down");
        TcpClient client = (TcpClient)((TcpClientSpec)((TcpClientSpec)new TcpClientSpec(ZeroMQTcpClient.class).env(this.env)).dispatcher(this.dispatcher)).codec(this.codec).options(new ZeroMQClientSocketOptions().context(this.zmqCtx).connectAddresses(addrs).socketType(socketType)).get();
        this.clients.add((Object)client);
        return client.open();
    }

    public Promise<NetChannel<T, T>> createServer(String addrs, int socketType) {
        Assert.isTrue((!this.shutdown ? 1 : 0) != 0, (String)"This ZeroMQ instance has been shut down");
        Deferred d = Promises.defer((Environment)this.env, (Dispatcher)this.dispatcher);
        TcpServer server = (TcpServer)((TcpServerSpec)((Object)((TcpServerSpec)((Object)((TcpServerSpec)((Object)((TcpServerSpec)((TcpServerSpec)new TcpServerSpec(ZeroMQTcpServer.class).env(this.env)).dispatcher(this.dispatcher)).codec(this.codec))).options(new ZeroMQServerSocketOptions().context(this.zmqCtx).listenAddresses(addrs).socketType(socketType)))).consume(d))).get();
        this.servers.add((Object)server);
        server.start();
        return (Promise)d.compose();
    }

    public void shutdown() {
        if (this.shutdown) {
            return;
        }
        this.shutdown = true;
        this.servers.removeIf((Predicate)new CheckedPredicate<TcpServer<T, T>>(){

            public boolean safeAccept(TcpServer<T, T> server) throws Exception {
                Assert.isTrue((boolean)((Boolean)server.shutdown().await(60L, TimeUnit.SECONDS)), (String)("Server " + server + " not properly shut down"));
                return true;
            }
        });
        this.clients.removeIf((Predicate)new CheckedPredicate<TcpClient<T, T>>(){

            public boolean safeAccept(TcpClient<T, T> client) throws Exception {
                Assert.isTrue((boolean)((Boolean)client.close().await(60L, TimeUnit.SECONDS)), (String)("Client " + client + " not properly shut down"));
                return true;
            }
        });
    }
}

