/*
 * Decompiled with CFR 0.152.
 */
package reactor.tcp.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import reactor.core.Environment;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.io.Buffer;
import reactor.tcp.AbstractTcpConnection;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;

/**
 * A TCP connection that performs its I/O through a Netty {@link SocketChannel}.
 *
 * <p>Outbound data is handed to the channel; the outcome of each write is reported to the
 * optional {@link Deferred} supplied by the caller and, on failure, to the events reactor.
 * Lifecycle and idle callbacks are registered through the spec returned by {@link #on()}.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public class NettyTcpConnection<IN, OUT>
extends AbstractTcpConnection<IN, OUT> {
    private final SocketChannel channel;
    private final NettyTcpConnectionConsumerSpec consumerSpec;
    private final InetSocketAddress remoteAddress;
    private volatile boolean closing = false;

    /**
     * Creates a connection without a known remote address; {@link #remoteAddress()} will
     * return {@code null}.
     */
    NettyTcpConnection(Environment env, Codec<Buffer, IN, OUT> codec, Dispatcher ioDispatcher, Dispatcher eventsDispatcher, SocketChannel channel) {
        this(env, codec, ioDispatcher, eventsDispatcher, channel, null);
    }

    /**
     * Creates a connection bound to the given channel.
     *
     * @param env              environment passed to the superclass
     * @param codec            codec passed to the superclass
     * @param ioDispatcher     I/O dispatcher passed to the superclass
     * @param eventsDispatcher events dispatcher passed to the superclass
     * @param channel          the Netty channel used for all I/O of this connection
     * @param remoteAddress    the value reported by {@link #remoteAddress()}; may be {@code null}
     */
    NettyTcpConnection(Environment env, Codec<Buffer, IN, OUT> codec, Dispatcher ioDispatcher, Dispatcher eventsDispatcher, SocketChannel channel, InetSocketAddress remoteAddress) {
        super(env, codec, ioDispatcher, eventsDispatcher);
        this.channel = channel;
        this.consumerSpec = new NettyTcpConnectionConsumerSpec(channel);
        this.remoteAddress = remoteAddress;
    }

    /**
     * @return the underlying Netty channel
     */
    public SocketChannel channel() {
        return this.channel;
    }

    /**
     * @return the spec used to register close and idle callbacks on the underlying channel
     */
    @Override
    public TcpConnection.ConsumerSpec on() {
        return this.consumerSpec;
    }

    /**
     * Closes this connection: runs the superclass close logic, marks the connection as
     * closing, then disconnects and closes the channel, waiting (uninterruptibly) for each
     * operation to complete.
     */
    @Override
    public void close() {
        super.close();
        // NOTE(review): the flag is raised only after super.close() returns — confirm that
        // nothing triggered by super.close() relies on isClosing().
        this.closing = true;
        // NOTE(review): both waits block the calling thread; presumably this must not be
        // invoked from the channel's own event loop — TODO confirm.
        this.channel.disconnect().awaitUninterruptibly();
        this.channel.close().awaitUninterruptibly();
    }

    /**
     * @return {@code true} unless the input side of the channel has been shut down
     */
    @Override
    public boolean consumable() {
        return !this.channel.isInputShutdown();
    }

    /**
     * @return {@code true} unless the output side of the channel has been shut down
     */
    @Override
    public boolean writable() {
        return !this.channel.isOutputShutdown();
    }

    /**
     * @return the remote address given at construction time, or {@code null} if none was given
     */
    @Override
    public InetSocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    /**
     * @return {@code true} once {@link #close()} has marked this connection as closing
     */
    boolean isClosing() {
        return this.closing;
    }

    /**
     * Publishes inbound data on the events reactor under the connection's read key.
     *
     * @param obj the data to publish; passed through unchanged if it already is an
     *            {@link Event}, otherwise wrapped in one
     */
    void notifyRead(Object obj) {
        Event<?> event = obj instanceof Event ? (Event<?>) obj : Event.wrap(obj);
        this.eventsReactor.notify(this.read.getT2(), event);
    }

    /**
     * Publishes an error on the events reactor, keyed by the error's runtime class.
     *
     * @param throwable the error to publish, wrapped in an {@link Event}
     */
    void notifyError(Throwable throwable) {
        this.eventsReactor.notify(throwable.getClass(), Event.wrap(throwable));
    }

    /**
     * Writes the contents of a reactor {@link Buffer} by delegating to
     * {@link #write(ByteBuffer, Deferred, boolean)} with its backing {@link ByteBuffer}.
     */
    @Override
    protected void write(Buffer data, Deferred<Void, Promise<Void>> onComplete, boolean flush) {
        this.write(data.byteBuffer(), onComplete, flush);
    }

    /**
     * Copies the remaining bytes of {@code data} into a {@link ByteBuf} obtained from the
     * channel's allocator and writes that buffer.
     *
     * @param data       source bytes
     * @param onComplete completed with the outcome of the write; may be {@code null}
     * @param flush      whether to flush the channel together with this write
     */
    protected void write(ByteBuffer data, Deferred<Void, Promise<Void>> onComplete, boolean flush) {
        ByteBuf buf = this.channel.alloc().buffer(data.remaining());
        buf.writeBytes(data);
        this.write(buf, onComplete, flush);
    }

    /**
     * Hands {@code data} to the channel and reports the outcome asynchronously.
     *
     * <p>On success {@code onComplete} (if present) is completed with {@code null}. On failure
     * the cause is first published through {@link #notifyError(Throwable)} — so that it is
     * keyed by exception class like every other error of this connection — and then passed
     * to {@code onComplete} (if present).
     *
     * @param data       the object to write to the channel
     * @param onComplete completed with the outcome of the write; may be {@code null}
     * @param flush      {@code true} to write and flush, {@code false} to only write
     */
    @Override
    protected void write(Object data, final Deferred<Void, Promise<Void>> onComplete, boolean flush) {
        ChannelFuture writeFuture = flush ? this.channel.writeAndFlush(data) : this.channel.write(data);
        writeFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    if (null != onComplete) {
                        onComplete.accept((Void) null);
                    }
                    return;
                }
                Throwable cause = future.cause();
                // Use the shared error path so the event key is the exception class rather
                // than the exception instance.
                NettyTcpConnection.this.notifyError(cause);
                if (null != onComplete) {
                    onComplete.accept(cause);
                }
            }
        });
    }

    /**
     * Flushes any pending writes on the channel.
     */
    @Override
    protected void flush() {
        this.channel.flush();
    }

    @Override
    public String toString() {
        return "NettyTcpConnection{channel=" + this.channel + ", remoteAddress=" + this.remoteAddress + '}';
    }

    /**
     * {@link TcpConnection.ConsumerSpec} implementation that attaches the requested callbacks
     * directly to the Netty channel (its close future and its pipeline).
     */
    private static class NettyTcpConnectionConsumerSpec<IN, OUT>
    implements TcpConnection.ConsumerSpec<IN, OUT> {
        private final SocketChannel channel;

        private NettyTcpConnectionConsumerSpec(SocketChannel channel) {
            this.channel = channel;
        }

        /**
         * Registers a callback on the channel's close future.
         *
         * @param onClose callback to run when the close future completes; may be {@code null}
         * @return this spec
         */
        @Override
        public TcpConnection.ConsumerSpec close(Runnable onClose) {
            this.channel.closeFuture().addListener(new ChannelCloseListener(onClose));
            return this;
        }

        /**
         * Installs a read-idle handler at the head of the channel pipeline.
         *
         * @param idleTimeout read-idle timeout, in milliseconds
         * @param onReadIdle  callback to run each time the channel is reported read-idle
         * @return this spec
         */
        @Override
        public TcpConnection.ConsumerSpec readIdle(long idleTimeout, Runnable onReadIdle) {
            IdleReadListener readListener = new IdleReadListener(idleTimeout, onReadIdle);
            this.deactivateOnClose(readListener);
            this.channel.pipeline().addFirst(readListener);
            return this;
        }

        /**
         * Installs a write-idle handler at the tail of the channel pipeline.
         *
         * @param idleTimeout write-idle timeout, in milliseconds
         * @param onWriteIdle callback to run each time the channel is reported write-idle
         * @return this spec
         */
        @Override
        public TcpConnection.ConsumerSpec writeIdle(long idleTimeout, Runnable onWriteIdle) {
            IdleWriteListener writeListener = new IdleWriteListener(idleTimeout, onWriteIdle);
            this.deactivateOnClose(writeListener);
            this.channel.pipeline().addLast(writeListener);
            return this;
        }

        /**
         * Signals "channel inactive" to the given idle handler once the channel's close
         * future completes.
         */
        private void deactivateOnClose(final IdleStateHandler idleHandler) {
            this.channel.closeFuture().addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    try {
                        // No handler context is available here, so null is passed.
                        idleHandler.channelInactive(null);
                    }
                    catch (NullPointerException ignored) {
                        // Deliberately ignored. NOTE(review): presumably the handler has
                        // already stopped its idle timers by the time it dereferences the
                        // null context to propagate the event — TODO confirm against the
                        // Netty version in use.
                    }
                }
            });
        }
    }

    /**
     * Idle handler that only tracks writer idleness and runs a callback when it occurs.
     */
    private static class IdleWriteListener
    extends IdleStateHandler {
        private final Runnable onWriteIdle;

        /**
         * @param writerIdleTimeMillis writer idle time, in milliseconds; reader and all-idle
         *                             times are passed as {@code 0}
         * @param onWriteIdle          callback to run on every writer-idle event
         */
        private IdleWriteListener(long writerIdleTimeMillis, Runnable onWriteIdle) {
            super(0L, writerIdleTimeMillis, 0L, TimeUnit.MILLISECONDS);
            this.onWriteIdle = onWriteIdle;
        }

        /**
         * Runs the callback for {@link IdleState#WRITER_IDLE} events, then lets the
         * superclass handle the event as usual.
         */
        @Override
        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            if (evt.state() == IdleState.WRITER_IDLE) {
                this.onWriteIdle.run();
            }
            super.channelIdle(ctx, evt);
        }
    }

    /**
     * Idle handler that only tracks reader idleness and runs a callback when it occurs.
     */
    private static class IdleReadListener
    extends IdleStateHandler {
        private final Runnable onReadIdle;

        /**
         * @param readerIdleTimeMillis reader idle time, in milliseconds; writer and all-idle
         *                             times are passed as {@code 0}
         * @param onReadIdle           callback to run on every reader-idle event
         */
        private IdleReadListener(long readerIdleTimeMillis, Runnable onReadIdle) {
            super(readerIdleTimeMillis, 0L, 0L, TimeUnit.MILLISECONDS);
            this.onReadIdle = onReadIdle;
        }

        /**
         * Runs the callback for {@link IdleState#READER_IDLE} events, then lets the
         * superclass handle the event as usual.
         */
        @Override
        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            if (evt.state() == IdleState.READER_IDLE) {
                this.onReadIdle.run();
            }
            super.channelIdle(ctx, evt);
        }
    }

    /**
     * Future listener that runs an optional callback when the observed future completes.
     */
    private static class ChannelCloseListener
    implements ChannelFutureListener {
        private final Runnable onClose;

        /**
         * @param onClose callback to run on completion; {@code null} makes this a no-op
         */
        private ChannelCloseListener(Runnable onClose) {
            this.onClose = onClose;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (null != this.onClose) {
                this.onClose.run();
            }
        }
    }
}

