package reactor.tcp;

import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.spec.Reactors;
import reactor.event.Event;
import reactor.event.registry.Registration;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.function.Consumer;
import reactor.io.Buffer;
import reactor.tcp.config.ClientSocketOptions;
import reactor.tcp.config.SslOptions;
import reactor.tcp.encoding.Codec;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

/* loaded from: input_file:reactor/tcp/TcpClient.class */
/**
 * Base class for TCP clients. It keeps a registry of the connections it has created, keyed by
 * the underlying channel object, and publishes open/close/error notifications on a
 * {@link Reactor}.
 *
 * <p>Connections are held through {@link WeakReference}s, so an entry in {@link #connections}
 * may refer to a connection that has already been garbage collected; every reader of the map
 * must therefore take a strong reference once and null-check it.
 *
 * @param <IN>  first type argument passed to {@link Codec} and {@link TcpConnection}
 *              (presumably the decoded inbound type - confirm against {@code Codec})
 * @param <OUT> second type argument passed to {@link Codec} and {@link TcpConnection}
 *              (presumably the outbound type - confirm against {@code Codec})
 */
public abstract class TcpClient<IN, OUT> {

    /* Named f0reactor rather than "reactor" to avoid colliding with the root package name. */
    private final Reactor f0reactor;
    private final Codec<Buffer, IN, OUT> codec;
    protected final Environment env;
    /* Selector/key pair from Selectors.$(); its T2 value is the key used by notifyOpen. */
    private final Tuple2<Selector, Object> open = Selectors.$();
    /* Selector/key pair from Selectors.$(); its T2 value is the key used by notifyClose. */
    private final Tuple2<Selector, Object> close = Selectors.$();
    /* Channel object -> weakly referenced connection created for that channel. */
    protected final Map<Object, WeakReference<TcpConnection<IN, OUT>>> connections =
            new ConcurrentHashMap<>();

    /**
     * Validates the mandatory arguments and stores the environment, reactor and codec.
     *
     * <p>The address, socket options and SSL options are not stored by this base class; the
     * address is only checked for {@code null} here.
     *
     * @param environment         environment used when creating deferred results; must not be null
     * @param reactor2            reactor on which notifications are published; must not be null
     * @param inetSocketAddress   address to connect to; must not be null
     * @param clientSocketOptions optional socket options (unused by this base class)
     * @param sslOptions          optional SSL options (unused by this base class)
     * @param codec               optional codec, exposed through {@link #getCodec()}
     */
    public TcpClient(@Nonnull Environment environment, @Nonnull Reactor reactor2, @Nonnull InetSocketAddress inetSocketAddress, @Nullable ClientSocketOptions clientSocketOptions, @Nullable SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        Assert.notNull(environment, "A TcpClient cannot be created without a properly-configured Environment.");
        Assert.notNull(reactor2, "A TcpClient cannot be created without a properly-configured Reactor.");
        Assert.notNull(inetSocketAddress, "A TcpClient cannot be created without a properly-configured connect InetSocketAddress.");
        this.env = environment;
        this.f0reactor = reactor2;
        this.codec = codec;
    }

    /**
     * Opens a connection.
     *
     * @return a promise for the opened connection
     */
    public abstract Promise<TcpConnection<IN, OUT>> open();

    /**
     * Opens a connection using the given reconnect strategy.
     *
     * @param reconnect the reconnect strategy handed to the implementation
     * @return a stream of connections
     */
    public abstract Stream<TcpConnection<IN, OUT>> open(Reconnect reconnect);

    /**
     * Schedules the shutdown of this client on its reactor: every registered connection that is
     * still reachable is closed, then {@link #doClose(Deferred)} is invoked with the deferred
     * backing the returned promise.
     *
     * @return the promise composed from the deferred passed to {@link #doClose(Deferred)}
     *         (presumably completed by the subclass once shutdown finishes)
     */
    public Promise<Void> close() {
        final Deferred<Void, Promise<Void>> deferred = Promises.defer(this.env, this.f0reactor.getDispatcher());
        Reactors.schedule(new Consumer<Void>() {
            @Override
            public void accept(Void ignored) {
                for (WeakReference<TcpConnection<IN, OUT>> ref : TcpClient.this.connections.values()) {
                    // Take a single strong reference: calling get() twice could observe the
                    // referent being collected between the null check and the close() call.
                    TcpConnection<IN, OUT> connection = ref.get();
                    if (null != connection) {
                        connection.close();
                    }
                }
                TcpClient.this.doClose(deferred);
            }
        }, null, this.f0reactor);
        return deferred.compose();
    }

    /**
     * Records {@code tcpConnection} as the connection for channel {@code c} and returns a
     * {@link Registration} handle for it.
     *
     * <p>Cancelling the registration removes the channel's entry from {@link #connections};
     * pause/resume and cancel-after-use are no-ops, and no selector is associated with it.
     *
     * @param c             the channel object used as the registry key; must not be null
     * @param tcpConnection the connection to register; must not be null
     * @param <C>           the channel type
     * @return a registration whose object is {@code tcpConnection}
     */
    protected <C> Registration<? extends TcpConnection<IN, OUT>> register(@Nonnull final C c, @Nonnull final TcpConnection<IN, OUT> tcpConnection) {
        Assert.notNull(c, "Channel cannot be null.");
        Assert.notNull(tcpConnection, "TcpConnection cannot be null.");
        this.connections.put(c, new WeakReference<>(tcpConnection));
        return new Registration<TcpConnection<IN, OUT>>() {
            @Override
            public Selector getSelector() {
                return null;
            }

            @Override
            public TcpConnection<IN, OUT> getObject() {
                return tcpConnection;
            }

            @Override
            public Registration<TcpConnection<IN, OUT>> cancelAfterUse() {
                return this;
            }

            @Override
            public boolean isCancelAfterUse() {
                return false;
            }

            @Override
            public Registration<TcpConnection<IN, OUT>> cancel() {
                TcpClient.this.connections.remove(c);
                return this;
            }

            @Override
            public boolean isCancelled() {
                // Reported as cancelled whenever the channel has no entry, however it was removed.
                return !TcpClient.this.connections.containsKey(c);
            }

            @Override
            public Registration<TcpConnection<IN, OUT>> pause() {
                return this;
            }

            @Override
            public boolean isPaused() {
                return false;
            }

            @Override
            public Registration<TcpConnection<IN, OUT>> resume() {
                return this;
            }
        };
    }

    /**
     * Returns the live connection registered for channel {@code c}, creating, registering and
     * announcing (via {@link #notifyOpen(TcpConnection)}) a new one when there is no entry or
     * the weakly referenced connection has already been collected.
     *
     * @param c   the channel object; must not be null
     * @param <C> the channel type
     * @return the existing or newly created connection for the channel
     */
    public <C> TcpConnection<IN, OUT> select(@Nonnull C c) {
        Assert.notNull(c, "Channel cannot be null.");
        // Single lookup instead of containsKey + get, so a concurrent removal cannot cause an
        // NPE, and a cleared reference is treated as "no connection" instead of yielding null.
        WeakReference<TcpConnection<IN, OUT>> ref = this.connections.get(c);
        TcpConnection<IN, OUT> existing = (null != ref) ? ref.get() : null;
        if (null != existing) {
            return existing;
        }
        TcpConnection<IN, OUT> created = createConnection(c);
        register(c, created);
        notifyOpen(created);
        return created;
    }

    /**
     * Removes the entry for channel {@code c} and closes its connection if it is still reachable.
     *
     * @param c   the channel object; must not be null
     * @param <C> the channel type
     */
    protected <C> void close(@Nonnull C c) {
        Assert.notNull(c, "Channel cannot be null");
        WeakReference<TcpConnection<IN, OUT>> removed = this.connections.remove(c);
        if (null == removed) {
            return;
        }
        // Dereference once; the referent may be collected between two separate get() calls.
        TcpConnection<IN, OUT> connection = removed.get();
        if (null != connection) {
            connection.close();
        }
    }

    /**
     * Creates the connection object for the given channel.
     *
     * @param c   the channel object
     * @param <C> the channel type
     * @return the connection wrapping the channel
     */
    protected abstract <C> TcpConnection<IN, OUT> createConnection(C c);

    /**
     * Publishes {@code th} on the reactor, using the throwable's class as the notification key.
     *
     * @param th the error to publish; must not be null
     */
    protected void notifyError(@Nonnull Throwable th) {
        Assert.notNull(th, "Error cannot be null.");
        this.f0reactor.notify(th.getClass(), Event.wrap(th));
    }

    /**
     * Publishes the connection on the reactor under this client's "open" key.
     *
     * @param tcpConnection the connection that was opened
     */
    protected void notifyOpen(@Nonnull TcpConnection<IN, OUT> tcpConnection) {
        this.f0reactor.notify(this.open.getT2(), Event.wrap(tcpConnection));
    }

    /**
     * Publishes the connection on the reactor under this client's "close" key.
     *
     * @param tcpConnection the connection that was closed
     */
    public void notifyClose(@Nonnull TcpConnection<IN, OUT> tcpConnection) {
        this.f0reactor.notify(this.close.getT2(), Event.wrap(tcpConnection));
    }

    /**
     * Returns the codec supplied at construction time.
     *
     * @return the codec, or {@code null} if none was supplied
     */
    @Nullable
    public Codec<Buffer, IN, OUT> getCodec() {
        return this.codec;
    }

    /**
     * Subclass hook invoked at the end of {@link #close()}, after the registered connections
     * have been closed.
     *
     * @param deferred the deferred backing the promise returned by {@link #close()}
     */
    protected abstract void doClose(Deferred<Void, Promise<Void>> deferred);
}
