package reactor.tcp;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import reactor.core.Environment;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.core.spec.Reactors;
import reactor.core.support.NotifyConsumer;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.registry.Registration;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.event.support.EventConsumer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.batch.BatchConsumer;
import reactor.io.Buffer;
import reactor.queue.BlockingQueueFactory;
import reactor.tcp.encoding.Codec;
import reactor.tuple.Tuple2;

/* loaded from: input_file:reactor/tcp/AbstractTcpConnection.class */
public abstract class AbstractTcpConnection<IN, OUT> implements TcpConnection<IN, OUT> {
    /** Wall-clock time, from {@link System#currentTimeMillis()}, captured when this instance is initialized. */
    protected final long created = System.currentTimeMillis();
    /**
     * Selector/key pair for inbound data: consumers are registered on the selector (T1) and
     * inbound data is published on the events reactor under the key (T2).
     */
    protected final Tuple2<Selector, Object> read = Selectors.$();
    /** Environment supplied to the constructor; used when creating the reactors, streams and promises. */
    protected final Environment env;
    /** First dispatcher supplied to the constructor; {@link #ioReactor} is built on it. */
    protected final Dispatcher ioDispatcher;
    /** Reactor on which outbound writes are scheduled. */
    protected final Reactor ioReactor;
    /** Reactor on which inbound data, replies and write errors are published to consumers. */
    protected final Reactor eventsReactor;
    /** Decoder obtained from the codec, or {@code null} when the connection was created without a codec. */
    protected final Function<Buffer, IN> decoder;
    /** Encoder obtained from the codec, or {@code null} when the connection was created without a codec. */
    protected final Function<OUT, Buffer> encoder;
    /** Keys of pending {@code sendAndReceive} requests; one key is taken per inbound item, in queue order. */
    protected final Queue<Object> replyToKeys;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/tcp/AbstractTcpConnection$WriteConsumer.class */
    public final class WriteConsumer implements BatchConsumer<OUT> {
        private final Deferred<Void, Promise<Void>> onComplete;
        private volatile boolean autoflush;

        private WriteConsumer(Deferred<Void, Promise<Void>> deferred) {
            this.autoflush = true;
            this.onComplete = deferred;
        }

        public void start() {
            this.autoflush = false;
        }

        public void end() {
            AbstractTcpConnection.this.flush();
            this.autoflush = true;
        }

        public void accept(OUT out) {
            try {
                if (null != AbstractTcpConnection.this.encoder) {
                    Buffer buffer = (Buffer) AbstractTcpConnection.this.encoder.apply(out);
                    if (buffer.remaining() > 0) {
                        AbstractTcpConnection.this.write(buffer, this.onComplete, this.autoflush);
                    }
                } else if (Buffer.class.isInstance(out)) {
                    AbstractTcpConnection.this.write((Buffer) out, this.onComplete, this.autoflush);
                } else {
                    AbstractTcpConnection.this.write(out, this.onComplete, this.autoflush);
                }
            } catch (Throwable th) {
                AbstractTcpConnection.this.eventsReactor.notify(th.getClass(), Event.wrap(th));
                if (null != this.onComplete) {
                    this.onComplete.accept(th);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a connection, wiring up its reactors, optional codec and reply routing.
     *
     * @param environment the environment used to create the reactors
     * @param codec       codec supplying the decoder and encoder, or {@code null} to pass data through undecoded
     * @param dispatcher  dispatcher backing the IO reactor on which writes are scheduled
     * @param dispatcher2 dispatcher backing the events reactor on which consumers are notified
     */
    public AbstractTcpConnection(Environment environment, Codec<Buffer, IN, OUT> codec, Dispatcher dispatcher, Dispatcher dispatcher2) {
        this.env = environment;
        this.ioDispatcher = dispatcher;
        this.ioReactor = Reactors.reactor(environment, dispatcher);
        this.eventsReactor = Reactors.reactor(environment, dispatcher2);
        if (null != codec) {
            // Decoded items are published on the events reactor under the read key.
            this.decoder = codec.decoder(new NotifyConsumer(this.read.getT2(), this.eventsReactor));
            this.encoder = codec.encoder();
        } else {
            this.decoder = null;
            this.encoder = null;
        }
        this.replyToKeys = BlockingQueueFactory.createQueue();
        // Route each inbound item to the oldest pending sendAndReceive request, if there is one.
        consume(new Consumer<IN>() {
            @Override
            public void accept(IN in) {
                // poll() returns null on an empty queue, so no exception is needed for the
                // "nobody is waiting for a reply" case and unrelated NoSuchElementExceptions
                // raised by downstream consumers are no longer silently swallowed.
                Object replyKey = AbstractTcpConnection.this.replyToKeys.poll();
                if (null != replyKey) {
                    AbstractTcpConnection.this.eventsReactor.notify(replyKey, Event.wrap(in));
                }
            }
        });
    }

    /**
     * Returns the creation timestamp of this connection.
     *
     * @return the value of {@link System#currentTimeMillis()} captured at initialization
     */
    public long getCreated() {
        return created;
    }

    /**
     * Cancels every consumer registration held by the IO reactor and then by the events reactor.
     */
    @Override // reactor.tcp.TcpConnection
    public void close() {
        // IO registrations first, then event registrations.
        for (Iterator ioRegistrations = this.ioReactor.getConsumerRegistry().iterator(); ioRegistrations.hasNext(); ) {
            Registration registration = (Registration) ioRegistrations.next();
            registration.cancel();
        }
        for (Iterator eventRegistrations = this.eventsReactor.getConsumerRegistry().iterator(); eventRegistrations.hasNext(); ) {
            Registration registration = (Registration) eventRegistrations.next();
            registration.cancel();
        }
    }

    /**
     * Exposes inbound data as a {@link Stream}.
     *
     * <p>Each call registers a new consumer on this connection and returns a new stream fed by it.
     *
     * @return a stream that receives every item delivered to this connection's consumers
     */
    @Override // reactor.tcp.TcpConnection
    public Stream<IN> in() {
        // Parameterised instead of raw so compose() is statically known to yield Stream<IN>.
        final Deferred<IN, Stream<IN>> inbound = Streams.defer(this.env, this.eventsReactor.getDispatcher());
        consume(new Consumer<IN>() {
            @Override
            public void accept(IN in) {
                inbound.accept(in);
            }
        });
        return inbound.compose();
    }

    /**
     * Creates a batch-aware consumer that writes accepted items to this connection.
     *
     * @return a new {@code WriteConsumer} with no completion handle attached
     */
    @Override // reactor.tcp.TcpConnection
    public BatchConsumer<OUT> out() {
        final Deferred<Void, Promise<Void>> noCompletion = null;
        return new WriteConsumer(noCompletion);
    }

    /**
     * Registers a consumer on the events reactor for errors selected by the given class.
     *
     * @param cls      the throwable type used to build the selector
     * @param consumer the consumer to invoke with the error
     * @return this connection
     */
    @Override // reactor.tcp.TcpConnection
    public <T extends Throwable> TcpConnection<IN, OUT> when(Class<T> cls, Consumer<T> consumer) {
        EventConsumer errorHandler = new EventConsumer(consumer);
        this.eventsReactor.on(Selectors.T(cls), errorHandler);
        return this;
    }

    /**
     * Registers a consumer for inbound data.
     *
     * <p>The consumer is attached to the events reactor under the read selector and receives
     * the payload of each event rather than the event itself.
     *
     * @param consumer the consumer to invoke with each inbound item
     * @return this connection
     */
    @Override // reactor.tcp.TcpConnection
    public TcpConnection<IN, OUT> consume(final Consumer<IN> consumer) {
        final Selector readSelector = this.read.getT1();
        final Consumer<Event<IN>> unwrapping = new Consumer<Event<IN>>() {
            @Override
            public void accept(Event<IN> event) {
                IN payload = event.getData();
                consumer.accept(payload);
            }
        };
        this.eventsReactor.on(readSelector, unwrapping);
        return this;
    }

    /**
     * Registers a request/response handler: every inbound item is passed to {@code function}
     * and the value it returns is sent back over this connection.
     *
     * @param function maps an inbound item to the reply to send
     * @return this connection
     */
    @Override // reactor.tcp.TcpConnection
    public TcpConnection<IN, OUT> receive(final Function<IN, OUT> function) {
        consume(new Consumer<IN>() {
            @Override
            public void accept(IN in) {
                // The reply is sent as an OUT value; the decompiler's cast to
                // AbstractTcpConnection matched no send(...) overload and would have thrown
                // ClassCastException for any real payload.
                OUT reply = function.apply(in);
                AbstractTcpConnection.this.send(reply);
            }
        });
        return this;
    }

    /**
     * Sends every item emitted by the given stream over this connection, without completion tracking.
     *
     * @param stream the source of outbound items
     * @return this connection
     */
    @Override // reactor.tcp.TcpConnection
    public TcpConnection<IN, OUT> send(Stream<OUT> stream) {
        final Consumer<OUT> forwarder = new Consumer<OUT>() {
            @Override
            public void accept(OUT out) {
                // No completion handle: each item is written fire-and-forget.
                AbstractTcpConnection.this.send(out, null);
            }
        };
        stream.consume(forwarder);
        return this;
    }

    /**
     * Sends a single item and returns a promise tied to the write.
     *
     * @param out the item to send
     * @return the promise composed from the completion handle passed along with the write
     */
    @Override // reactor.tcp.TcpConnection
    public Promise<Void> send(OUT out) {
        final Deferred<Void, Promise<Void>> completion =
                Promises.defer(this.env, this.eventsReactor.getDispatcher());
        send(out, completion);
        Promise<Void> written = completion.compose();
        return written;
    }

    /**
     * Sends a single item without any completion tracking.
     *
     * @param out the item to send
     * @return this connection
     */
    @Override // reactor.tcp.TcpConnection
    public TcpConnection<IN, OUT> sendAndForget(OUT out) {
        final Deferred<Void, Promise<Void>> noCompletion = null;
        send(out, noCompletion);
        return this;
    }

    /**
     * Sends a single item and returns a promise for the reply.
     *
     * <p>A fresh selector/key pair is created for the request. A one-shot consumer feeding the
     * promise is registered under the selector, and the key is appended to the reply queue so
     * that an inbound item can be routed to it.
     *
     * @param out the request to send
     * @return a promise fulfilled with the inbound item routed to this request
     */
    @Override // reactor.tcp.TcpConnection
    public Promise<IN> sendAndReceive(OUT out) {
        // Parameterised instead of raw so the selector, key and promise types are checked.
        final Deferred<IN, Promise<IN>> reply = Promises.defer(this.env, this.eventsReactor.getDispatcher());
        final Tuple2<Selector, Object> replyKey = Selectors.$();
        // Register before enqueueing the key so a reply cannot be notified with no listener.
        this.eventsReactor.on(replyKey.getT1(), new EventConsumer<IN>(reply)).cancelAfterUse();
        this.replyToKeys.add(replyKey.getT2());
        send(out, null);
        return reply.compose();
    }

    /**
     * Schedules a write of {@code out} on the IO reactor.
     *
     * @param out      the item to write
     * @param deferred completion handle passed to the write, or {@code null} when none is needed
     */
    protected void send(OUT out, Deferred<Void, Promise<Void>> deferred) {
        final WriteConsumer writer = new WriteConsumer(deferred);
        Reactors.schedule(writer, out, this.ioReactor);
    }

    /**
     * Dispatches inbound bytes to this connection's consumers.
     *
     * <p>When a decoder is configured and the buffer has a backing byte buffer, the buffer is
     * handed to the decoder; otherwise the buffer itself is published under the read key.
     *
     * @param buffer the inbound data
     * @return {@code true} when the buffer still has bytes remaining after dispatch
     */
    public boolean read(Buffer buffer) {
        if (null != this.decoder && null != buffer.byteBuffer()) {
            this.decoder.apply(buffer);
        } else {
            // No decoder, or nothing to decode: publish the buffer as-is.
            this.eventsReactor.notify(this.read.getT2(), Event.wrap(buffer));
        }
        boolean hasLeftover = buffer.remaining() > 0;
        return hasLeftover;
    }

    /**
     * Writes a buffer to the underlying transport.
     *
     * <p>Within this class it is called by {@code WriteConsumer} with encoded output, or with an
     * outbound item that is already a {@link Buffer} when no encoder is configured.
     *
     * @param buffer   the bytes to write
     * @param deferred completion handle for the write; callers in this class may pass {@code null}
     * @param z        the consumer's autoflush flag at the time of the call
     *                 (presumably {@code true} means flush right after writing; confirm against implementations)
     */
    protected abstract void write(Buffer buffer, Deferred<Void, Promise<Void>> deferred, boolean z);

    /**
     * Writes an arbitrary object to the underlying transport.
     *
     * <p>Within this class it is called by {@code WriteConsumer} when no encoder is configured
     * and the outbound item is not a {@link Buffer}.
     *
     * @param obj      the item to write
     * @param deferred completion handle for the write; callers in this class may pass {@code null}
     * @param z        the consumer's autoflush flag at the time of the call
     *                 (presumably {@code true} means flush right after writing; confirm against implementations)
     */
    protected abstract void write(Object obj, Deferred<Void, Promise<Void>> deferred, boolean z);

    /**
     * Flushes the underlying transport.
     *
     * <p>Within this class it is called by {@code WriteConsumer.end()} when a batch of writes ends.
     */
    protected abstract void flush();
}
