/*
 * Decompiled with CFR 0.152.
 */
package reactor.tcp;

import java.util.NoSuchElementException;
import java.util.Queue;
import reactor.core.Environment;
import reactor.core.Observable;
import reactor.core.Reactor;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.composable.spec.Streams;
import reactor.core.spec.Reactors;
import reactor.core.support.NotifyConsumer;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.registry.Registration;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.event.support.EventConsumer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.batch.BatchConsumer;
import reactor.io.Buffer;
import reactor.queue.BlockingQueueFactory;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;
import reactor.tuple.Tuple2;

public abstract class AbstractTcpConnection<IN, OUT>
implements TcpConnection<IN, OUT> {
    protected final long created = System.currentTimeMillis();
    protected final Tuple2<Selector, Object> read = Selectors.$();
    protected final Environment env;
    protected final Dispatcher ioDispatcher;
    protected final Reactor ioReactor;
    protected final Reactor eventsReactor;
    protected final Function<Buffer, IN> decoder;
    protected final Function<OUT, Buffer> encoder;
    protected final Queue<Object> replyToKeys;

    protected AbstractTcpConnection(Environment env, Codec<Buffer, IN, OUT> codec, Dispatcher ioDispatcher, Dispatcher eventsDispatcher) {
        this.env = env;
        this.ioDispatcher = ioDispatcher;
        this.ioReactor = Reactors.reactor((Environment)env, (Dispatcher)ioDispatcher);
        this.eventsReactor = Reactors.reactor((Environment)env, (Dispatcher)eventsDispatcher);
        if (null != codec) {
            this.decoder = codec.decoder((Consumer<IN>)new NotifyConsumer(this.read.getT2(), (Observable)this.eventsReactor));
            this.encoder = codec.encoder();
        } else {
            this.decoder = null;
            this.encoder = null;
        }
        this.replyToKeys = BlockingQueueFactory.createQueue();
        this.consume(new Consumer<IN>(){

            public void accept(IN in) {
                try {
                    AbstractTcpConnection.this.eventsReactor.notify(AbstractTcpConnection.this.replyToKeys.remove(), Event.wrap(in));
                }
                catch (NoSuchElementException noSuchElementException) {
                    // empty catch block
                }
            }
        });
    }

    public long getCreated() {
        return this.created;
    }

    @Override
    public void close() {
        for (Registration reg : this.ioReactor.getConsumerRegistry()) {
            reg.cancel();
        }
        for (Registration reg : this.eventsReactor.getConsumerRegistry()) {
            reg.cancel();
        }
    }

    @Override
    public Stream<IN> in() {
        final Deferred d = Streams.defer((Environment)this.env, (Dispatcher)this.eventsReactor.getDispatcher());
        this.consume(new Consumer<IN>(){

            public void accept(IN in) {
                d.accept(in);
            }
        });
        return (Stream)d.compose();
    }

    @Override
    public BatchConsumer<OUT> out() {
        return new WriteConsumer(null);
    }

    @Override
    public <T extends Throwable> TcpConnection<IN, OUT> when(Class<T> errorType, Consumer<T> errorConsumer) {
        this.eventsReactor.on(Selectors.T(errorType), (Consumer)new EventConsumer(errorConsumer));
        return this;
    }

    @Override
    public TcpConnection<IN, OUT> consume(final Consumer<IN> consumer) {
        this.eventsReactor.on((Selector)this.read.getT1(), new Consumer<Event<IN>>(){

            public void accept(Event<IN> ev) {
                consumer.accept(ev.getData());
            }
        });
        return this;
    }

    @Override
    public TcpConnection<IN, OUT> receive(final Function<IN, OUT> fn) {
        this.consume(new Consumer<IN>(){

            public void accept(IN in) {
                AbstractTcpConnection.this.send(fn.apply(in));
            }
        });
        return this;
    }

    @Override
    public TcpConnection<IN, OUT> send(Stream<OUT> data) {
        data.consume(new Consumer<OUT>(){

            public void accept(OUT out) {
                AbstractTcpConnection.this.send(out, null);
            }
        });
        return this;
    }

    @Override
    public Promise<Void> send(OUT data) {
        Deferred d = Promises.defer((Environment)this.env, (Dispatcher)this.eventsReactor.getDispatcher());
        this.send(data, (Deferred<Void, Promise<Void>>)d);
        return (Promise)d.compose();
    }

    @Override
    public TcpConnection<IN, OUT> sendAndForget(OUT data) {
        this.send(data, null);
        return this;
    }

    @Override
    public Promise<IN> sendAndReceive(OUT data) {
        Deferred d = Promises.defer((Environment)this.env, (Dispatcher)this.eventsReactor.getDispatcher());
        Tuple2 tup = Selectors.$();
        this.eventsReactor.on((Selector)tup.getT1(), (Consumer)new EventConsumer((Consumer)d)).cancelAfterUse();
        this.replyToKeys.add(tup.getT2());
        this.send(data, null);
        return (Promise)d.compose();
    }

    protected void send(OUT data, Deferred<Void, Promise<Void>> onComplete) {
        Reactors.schedule((Consumer)new WriteConsumer(onComplete), data, (Observable)this.ioReactor);
    }

    public boolean read(Buffer data) {
        if (null != this.decoder && null != data.byteBuffer()) {
            this.decoder.apply((Object)data);
        } else {
            this.eventsReactor.notify(this.read.getT2(), Event.wrap((Object)data));
        }
        return data.remaining() > 0;
    }

    protected abstract void write(Buffer var1, Deferred<Void, Promise<Void>> var2, boolean var3);

    protected abstract void write(Object var1, Deferred<Void, Promise<Void>> var2, boolean var3);

    protected abstract void flush();

    private final class WriteConsumer
    implements BatchConsumer<OUT> {
        private final Deferred<Void, Promise<Void>> onComplete;
        private volatile boolean autoflush = true;

        private WriteConsumer(Deferred<Void, Promise<Void>> onComplete) {
            this.onComplete = onComplete;
        }

        public void start() {
            this.autoflush = false;
        }

        public void end() {
            AbstractTcpConnection.this.flush();
            this.autoflush = true;
        }

        public void accept(OUT data) {
            block7: {
                try {
                    if (null != AbstractTcpConnection.this.encoder) {
                        Buffer bytes = (Buffer)AbstractTcpConnection.this.encoder.apply(data);
                        if (bytes.remaining() > 0) {
                            AbstractTcpConnection.this.write(bytes, this.onComplete, this.autoflush);
                        }
                    } else if (Buffer.class.isInstance(data)) {
                        AbstractTcpConnection.this.write((Buffer)data, this.onComplete, this.autoflush);
                    } else {
                        AbstractTcpConnection.this.write(data, this.onComplete, this.autoflush);
                    }
                }
                catch (Throwable t) {
                    AbstractTcpConnection.this.eventsReactor.notify(t.getClass(), Event.wrap((Object)t));
                    if (null == this.onComplete) break block7;
                    this.onComplete.accept(t);
                }
            }
        }
    }
}

