/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq.tcp;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.Environment;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registries;
import reactor.bus.registry.Registry;
import reactor.bus.selector.Selectors;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.codec.StandardCodecs;
import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.ReactorPeer;
import reactor.io.net.Spec;
import reactor.io.net.impl.zmq.ZeroMQClientSocketOptions;
import reactor.io.net.impl.zmq.ZeroMQServerSocketOptions;
import reactor.io.net.impl.zmq.tcp.ZeroMQTcpClient;
import reactor.io.net.impl.zmq.tcp.ZeroMQTcpServer;
import reactor.io.net.tcp.TcpClient;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Streams;

/**
 * Factory for ZeroMQ-backed Reactor TCP clients and servers that all share a single
 * {@link ZContext}, {@link Environment}, {@link Dispatcher} and {@link Codec}.
 *
 * <p>Every peer created through this instance is remembered so that {@link #shutdown()} can
 * shut all of them down. All access to the peer list is guarded by the list's own monitor.
 *
 * @param <T> the message type read from and written to the created channels
 */
public class ZeroMQ<T> {
    /**
     * Maps the value of each static {@code int} constant declared on {@link ZMQ} to the name of
     * the field declaring it. Populated once by the static initializer below.
     */
    private static final Registry<Integer, String> SOCKET_TYPES = Registries.create();

    static {
        for (Field f : ZMQ.class.getDeclaredFields()) {
            // Only static int fields can be read with a null receiver; an instance field would
            // make getInt(null) throw NullPointerException and abort class initialization.
            if (!Integer.TYPE.isAssignableFrom(f.getType()) || !Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            f.setAccessible(true);
            try {
                int val = f.getInt(null);
                SOCKET_TYPES.register(Selectors.$(val), f.getName());
            }
            catch (IllegalAccessException ignored) {
                // Best effort: the field was just made accessible, so this is not expected.
                // A constant that cannot be read is simply left out of the registry.
            }
        }
    }

    private final Environment env;
    private final Dispatcher dispatcher;
    private final ZContext zmqCtx;
    /** Peers created by this instance; always accessed while synchronized on the list itself. */
    private final List<ReactorPeer<T, T, ChannelStream<T, T>>> peers = new ArrayList<ReactorPeer<T, T, ChannelStream<T, T>>>();
    // NOTE(review): kept exactly as decompiled; the pre-compilation source may have carried an
    // explicit unchecked cast here - confirm against the declared type of PASS_THROUGH_CODEC.
    private volatile Codec<Buffer, T, T> codec = StandardCodecs.PASS_THROUGH_CODEC;
    private volatile boolean shutdown = false;

    /**
     * Creates an instance that uses the environment's default dispatcher.
     *
     * @param env the Reactor environment handed to every created peer
     */
    public ZeroMQ(Environment env) {
        this(env, env.getDefaultDispatcher());
    }

    /**
     * Creates an instance that uses the dispatcher registered in {@code env} under the given name.
     *
     * @param env        the Reactor environment handed to every created peer
     * @param dispatcher the name passed to {@link Environment#getDispatcher(String)}
     */
    public ZeroMQ(Environment env, String dispatcher) {
        this(env, env.getDispatcher(dispatcher));
    }

    /**
     * Creates an instance with an explicit dispatcher and a fresh {@link ZContext}.
     *
     * @param env        the Reactor environment handed to every created peer
     * @param dispatcher the dispatcher handed to every created peer and promise
     */
    public ZeroMQ(Environment env, Dispatcher dispatcher) {
        this.env = env;
        this.dispatcher = dispatcher;
        this.zmqCtx = new ZContext();
        // Linger value of 100 (presumably milliseconds - confirm against the ZContext docs).
        this.zmqCtx.setLinger(100);
    }

    /**
     * Looks up the name of a {@link ZMQ} constant by its value.
     *
     * <p>The registry holds every static {@code int} constant of {@link ZMQ}, not only socket
     * types; when several constants share a value, the first registration returned by the
     * registry is used.
     *
     * @param socketType the constant value to look up
     * @return the declaring field's name, or an empty string when no constant has that value
     */
    public static String findSocketTypeName(int socketType) {
        List<? extends Registration<Integer, ? extends String>> registrations = SOCKET_TYPES.select(socketType);
        if (registrations.isEmpty()) {
            return "";
        }
        return registrations.get(0).getObject();
    }

    /**
     * Sets the codec used by peers created after this call.
     *
     * @param codec the codec to pass to subsequently created clients and servers
     * @return this instance, for chaining
     */
    public ZeroMQ<T> codec(Codec<Buffer, T, T> codec) {
        this.codec = codec;
        return this;
    }

    /** Creates a client of socket type {@link ZMQ#DEALER}; see {@link #createClient(String, int)}. */
    public Promise<ChannelStream<T, T>> dealer(String addrs) {
        return this.createClient(addrs, ZMQ.DEALER);
    }

    /** Creates a client of socket type {@link ZMQ#PUSH}; see {@link #createClient(String, int)}. */
    public Promise<ChannelStream<T, T>> push(String addrs) {
        return this.createClient(addrs, ZMQ.PUSH);
    }

    /** Creates a server of socket type {@link ZMQ#PULL}; see {@link #createServer(String, int)}. */
    public Promise<ChannelStream<T, T>> pull(String addrs) {
        return this.createServer(addrs, ZMQ.PULL);
    }

    /** Creates a client of socket type {@link ZMQ#REQ}; see {@link #createClient(String, int)}. */
    public Promise<ChannelStream<T, T>> request(String addrs) {
        return this.createClient(addrs, ZMQ.REQ);
    }

    /** Creates a server of socket type {@link ZMQ#REP}; see {@link #createServer(String, int)}. */
    public Promise<ChannelStream<T, T>> reply(String addrs) {
        return this.createServer(addrs, ZMQ.REP);
    }

    /** Creates a server of socket type {@link ZMQ#ROUTER}; see {@link #createServer(String, int)}. */
    public Promise<ChannelStream<T, T>> router(String addrs) {
        return this.createServer(addrs, ZMQ.ROUTER);
    }

    /**
     * Builds and starts a {@link ZeroMQTcpClient} configured with this instance's environment,
     * dispatcher, codec and ZeroMQ context, then registers it for {@link #shutdown()}.
     *
     * @param addrs      the value passed to {@code connectAddresses} on the client socket options
     * @param socketType the ZeroMQ socket type passed to the client socket options
     * @return a promise that receives the {@link ChannelStream} handed to the client's handler
     * @throws IllegalArgumentException presumably, via {@link Assert#isTrue}, when this instance
     *                                  has already been shut down - confirm the exception type
     */
    public Promise<ChannelStream<T, T>> createClient(final String addrs, final int socketType) {
        Assert.isTrue(!this.shutdown, "This ZeroMQ instance has been shut down");
        TcpClient<T, T> client = NetStreams.tcpClient(ZeroMQTcpClient.class, new NetStreams.TcpClientFactory<T, T>() {
            @Override
            public Spec.TcpClientSpec<T, T> apply(Spec.TcpClientSpec<T, T> spec) {
                return spec.env(ZeroMQ.this.env)
                        .dispatcher(ZeroMQ.this.dispatcher)
                        .codec(ZeroMQ.this.codec)
                        .options(new ZeroMQClientSocketOptions()
                                .context(ZeroMQ.this.zmqCtx)
                                .connectAddresses(addrs)
                                .socketType(socketType));
            }
        });
        final Promise<ChannelStream<T, T>> promise = Promises.ready(this.env, this.dispatcher);
        client.start(this.fulfilling(promise));
        this.trackPeer(client);
        return promise;
    }

    /**
     * Builds and starts a {@link ZeroMQTcpServer} configured with this instance's environment,
     * dispatcher, codec and ZeroMQ context, then registers it for {@link #shutdown()}.
     *
     * @param addrs      the value passed to {@code listenAddresses} on the server socket options
     * @param socketType the ZeroMQ socket type passed to the server socket options
     * @return a promise that receives the {@link ChannelStream} handed to the server's handler
     * @throws IllegalArgumentException presumably, via {@link Assert#isTrue}, when this instance
     *                                  has already been shut down - confirm the exception type
     */
    public Promise<ChannelStream<T, T>> createServer(final String addrs, final int socketType) {
        Assert.isTrue(!this.shutdown, "This ZeroMQ instance has been shut down");
        TcpServer<T, T> server = NetStreams.tcpServer(ZeroMQTcpServer.class, new NetStreams.TcpServerFactory<T, T>() {
            @Override
            public Spec.TcpServerSpec<T, T> apply(Spec.TcpServerSpec<T, T> spec) {
                return spec.env(ZeroMQ.this.env)
                        .dispatcher(ZeroMQ.this.dispatcher)
                        .codec(ZeroMQ.this.codec)
                        .options(new ZeroMQServerSocketOptions()
                                .context(ZeroMQ.this.zmqCtx)
                                .listenAddresses(addrs)
                                .socketType(socketType));
            }
        });
        final Promise<ChannelStream<T, T>> promise = Promises.ready(this.env, this.dispatcher);
        server.start(this.fulfilling(promise));
        this.trackPeer(server);
        return promise;
    }

    /**
     * Shuts down every peer created by this instance and removes each one from the peer list
     * once its shutdown succeeds. Calls after the first one return immediately.
     *
     * <p>The ZeroMQ context itself is not destroyed here.
     */
    public void shutdown() {
        final List<ReactorPeer<T, T, ChannelStream<T, T>>> snapshot;
        synchronized (this.peers) {
            // Check-and-set under the lock so that concurrent callers cannot both proceed.
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            // Iterate over a copy: the success callbacks below mutate the live list.
            snapshot = new ArrayList<ReactorPeer<T, T, ChannelStream<T, T>>>(this.peers);
        }
        Streams.from(snapshot).flatMap(new Function<ReactorPeer, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(final ReactorPeer peer) {
                return peer.shutdown().onSuccess(new Consumer() {
                    @Override
                    public void accept(Object o) {
                        // The callback may run on another thread; take the same lock as every
                        // other access to the (non-thread-safe) peer list.
                        synchronized (ZeroMQ.this.peers) {
                            ZeroMQ.this.peers.remove(peer);
                        }
                    }
                });
            }
        }).consume();
    }

    /** Returns a channel handler that pushes the received channel into {@code promise} and returns {@code Streams.never()}. */
    private ReactorChannelHandler<T, T, ChannelStream<T, T>> fulfilling(final Promise<ChannelStream<T, T>> promise) {
        return new ReactorChannelHandler<T, T, ChannelStream<T, T>>() {
            @Override
            public Publisher<Void> apply(ChannelStream<T, T> channelStream) {
                promise.onNext(channelStream);
                return Streams.never();
            }
        };
    }

    /** Records {@code peer} in the peer list, under the list's lock, so that {@link #shutdown()} can reach it. */
    private void trackPeer(ReactorPeer<T, T, ChannelStream<T, T>> peer) {
        synchronized (this.peers) {
            this.peers.add(peer);
        }
    }
}

