/*
 * Decompiled with CFR 0.152.
 */
package reactor.tcp;

import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Environment;
import reactor.core.Observable;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.registry.Registration;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpConnection;
import reactor.tcp.config.ClientSocketOptions;
import reactor.tcp.config.SslOptions;
import reactor.tcp.encoding.Codec;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

/**
 * Base class for TCP clients.
 *
 * <p>Keeps a map from an implementation-specific channel object to the
 * {@link TcpConnection} that wraps it. Connections are held through
 * {@link WeakReference}s, so an entry's referent may disappear at any time;
 * every access in this class therefore reads the referent once into a strong
 * local before using it. Open, close and error notifications are published on
 * the {@link Reactor} supplied at construction time.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public abstract class TcpClient<IN, OUT> {
    /** Selector/key pair under which "connection opened" events are published. */
    private final Tuple2<Selector, Object> open = Selectors.$();
    /** Selector/key pair under which "connection closed" events are published. */
    private final Tuple2<Selector, Object> close = Selectors.$();
    private final Reactor reactor;
    private final Codec<Buffer, IN, OUT> codec;
    /** Channel to weakly-held connection. Referents may be cleared by the garbage collector. */
    protected final Map<Object, WeakReference<TcpConnection<IN, OUT>>> connections = new ConcurrentHashMap<Object, WeakReference<TcpConnection<IN, OUT>>>();
    protected final Environment env;

    /**
     * Creates the client base.
     *
     * @param env            environment to use; must not be {@code null}
     * @param reactor        reactor on which events are published; must not be {@code null}
     * @param connectAddress address to connect to; must not be {@code null}
     *                       (validated here but not retained by this base class)
     * @param options        socket options, may be {@code null} (not retained by this base class)
     * @param sslOptions     SSL options, may be {@code null} (not retained by this base class)
     * @param codec          codec exposed through {@link #getCodec()}, may be {@code null}
     */
    protected TcpClient(@Nonnull Environment env, @Nonnull Reactor reactor, @Nonnull InetSocketAddress connectAddress, @Nullable ClientSocketOptions options, @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        Assert.notNull(env, "A TcpClient cannot be created without a properly-configured Environment.");
        Assert.notNull(reactor, "A TcpClient cannot be created without a properly-configured Reactor.");
        Assert.notNull(connectAddress, "A TcpClient cannot be created without a properly-configured connect InetSocketAddress.");
        this.env = env;
        this.reactor = reactor;
        this.codec = codec;
    }

    /**
     * Opens a connection.
     *
     * @return a promise for the connection
     */
    public abstract Promise<TcpConnection<IN, OUT>> open();

    /**
     * Opens a connection using the given reconnect strategy.
     *
     * @param var1 the reconnect strategy
     * @return a stream of connections
     */
    public abstract Stream<TcpConnection<IN, OUT>> open(Reconnect var1);

    /**
     * Schedules a task on this client's reactor that closes every connection
     * still reachable through {@link #connections} and then delegates to
     * {@link #doClose(Deferred)}.
     *
     * @return the promise composed from the deferred handed to {@link #doClose(Deferred)}
     */
    public Promise<Void> close() {
        final Deferred<Void, Promise<Void>> d = Promises.defer(this.env, this.reactor.getDispatcher());
        Reactors.schedule(new Consumer<Void>() {

            public void accept(Void v) {
                for (WeakReference<TcpConnection<IN, OUT>> ref : TcpClient.this.connections.values()) {
                    // Read the referent once: it can be cleared between a null
                    // check and a second get().
                    TcpConnection<IN, OUT> conn = ref.get();
                    if (null != conn) {
                        conn.close();
                    }
                }
                TcpClient.this.doClose(d);
            }
        }, null, this.reactor);
        return d.compose();
    }

    /**
     * Associates {@code connection} with {@code channel} in {@link #connections}.
     *
     * @param channel    the channel used as the map key; must not be {@code null}
     * @param connection the connection to track; must not be {@code null}
     * @param <C>        the channel type
     * @return a registration whose {@code cancel()} removes the entry for
     *         {@code channel}; selector, pause/resume and cancel-after-use are no-ops
     */
    protected <C> Registration<? extends TcpConnection<IN, OUT>> register(final @Nonnull C channel, final @Nonnull TcpConnection<IN, OUT> connection) {
        Assert.notNull(channel, "Channel cannot be null.");
        Assert.notNull(connection, "TcpConnection cannot be null.");
        this.connections.put(channel, new WeakReference<TcpConnection<IN, OUT>>(connection));
        return new Registration<TcpConnection<IN, OUT>>() {

            public Selector getSelector() {
                return null;
            }

            public TcpConnection<IN, OUT> getObject() {
                return connection;
            }

            public Registration<TcpConnection<IN, OUT>> cancelAfterUse() {
                return this;
            }

            public boolean isCancelAfterUse() {
                return false;
            }

            public Registration<TcpConnection<IN, OUT>> cancel() {
                TcpClient.this.connections.remove(channel);
                return this;
            }

            public boolean isCancelled() {
                return !TcpClient.this.connections.containsKey(channel);
            }

            public Registration<TcpConnection<IN, OUT>> pause() {
                return this;
            }

            public boolean isPaused() {
                return false;
            }

            public Registration<TcpConnection<IN, OUT>> resume() {
                return this;
            }
        };
    }

    /**
     * Returns the connection tracked for {@code channel}, creating, registering
     * and announcing (via {@link #notifyOpen(TcpConnection)}) a new one when
     * there is no live connection for it.
     *
     * <p>An entry whose weak referent has been cleared is treated the same as a
     * missing entry, so this method never returns {@code null} for a cleared
     * reference.
     *
     * @param channel the channel to look up; must not be {@code null}
     * @param <C>     the channel type
     * @return the existing or newly created connection
     */
    protected <C> TcpConnection<IN, OUT> select(@Nonnull C channel) {
        Assert.notNull(channel, "Channel cannot be null.");
        // Single lookup: a containsKey()/get() pair can race with a concurrent
        // remove() and dereference null.
        WeakReference<TcpConnection<IN, OUT>> ref = this.connections.get(channel);
        TcpConnection<IN, OUT> existing = (null != ref) ? ref.get() : null;
        if (null != existing) {
            return existing;
        }
        TcpConnection<IN, OUT> conn = this.createConnection(channel);
        this.register(channel, conn);
        this.notifyOpen(conn);
        return conn;
    }

    /**
     * Stops tracking {@code channel} and closes its connection if one is still
     * reachable.
     *
     * @param channel the channel whose connection should be closed; must not be {@code null}
     * @param <C>     the channel type
     */
    protected <C> void close(@Nonnull C channel) {
        Assert.notNull(channel, "Channel cannot be null");
        WeakReference<TcpConnection<IN, OUT>> ref = this.connections.remove(channel);
        if (null == ref) {
            return;
        }
        // Hold a strong reference so the referent cannot vanish between the
        // null check and the close() call.
        TcpConnection<IN, OUT> conn = ref.get();
        if (null != conn) {
            conn.close();
        }
    }

    /**
     * Creates a connection object for the given channel.
     *
     * @param var1 the channel to wrap
     * @param <C>  the channel type
     * @return the new connection
     */
    protected abstract <C> TcpConnection<IN, OUT> createConnection(C var1);

    /**
     * Publishes {@code error} on the reactor, keyed by the error's runtime class.
     *
     * @param error the error to publish; must not be {@code null}
     */
    protected void notifyError(@Nonnull Throwable error) {
        Assert.notNull(error, "Error cannot be null.");
        this.reactor.notify(error.getClass(), Event.wrap(error));
    }

    /**
     * Publishes {@code conn} on the reactor under this client's "open" key.
     *
     * @param conn the connection that was opened
     */
    protected void notifyOpen(@Nonnull TcpConnection<IN, OUT> conn) {
        this.reactor.notify(this.open.getT2(), Event.wrap(conn));
    }

    /**
     * Publishes {@code conn} on the reactor under this client's "close" key.
     *
     * @param conn the connection that was closed
     */
    protected void notifyClose(@Nonnull TcpConnection<IN, OUT> conn) {
        this.reactor.notify(this.close.getT2(), Event.wrap(conn));
    }

    /**
     * @return the codec supplied at construction time, or {@code null} if none was given
     */
    @Nullable
    protected Codec<Buffer, IN, OUT> getCodec() {
        return this.codec;
    }

    /**
     * Implementation-specific shutdown, invoked from {@link #close()} after the
     * tracked connections have been closed.
     *
     * @param var1 the deferred backing the promise returned by {@link #close()};
     *             presumably the implementation is expected to complete it — confirm against subclasses
     */
    protected abstract void doClose(Deferred<Void, Promise<Void>> var1);
}

