/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.processor.CancelException;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannel;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.PushSubscription;

public class NettyChannelStream<IN, OUT>
extends ChannelStream<IN, OUT> {
    private final Channel ioChannel;

    public NettyChannelStream(Environment env, Codec<Buffer, IN, OUT> codec, long prefetch, Dispatcher eventsDispatcher, Channel ioChannel) {
        super(env, codec, prefetch, eventsDispatcher);
        this.ioChannel = ioChannel;
    }

    public void subscribe(Subscriber<? super IN> subscriber) {
        this.ioChannel.pipeline().fireUserEventTriggered(new NettyChannelHandlerBridge.ChannelInputSubscriberEvent<IN>(subscriber));
    }

    @Override
    public void doSubscribeWriter(Publisher<? extends OUT> writer, final Subscriber<? super Void> postWriter) {
        Stream encodedWriter = this.getEncoder() != null ? Streams.wrap(writer).map(this.getEncoder()) : writer;
        this.ioChannel.write((Object)encodedWriter).addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    postWriter.onSubscribe(Broadcaster.HOT_SUBSCRIPTION);
                    postWriter.onComplete();
                } else {
                    postWriter.onError(future.cause());
                }
            }
        });
    }

    @Override
    public InetSocketAddress remoteAddress() {
        return (InetSocketAddress)this.ioChannel.remoteAddress();
    }

    @Override
    public ReactorChannel.ConsumerSpec on() {
        return new NettyConsumerSpec();
    }

    public Channel delegate() {
        return this.ioChannel;
    }

    @Override
    public void doDecoded(IN in) {
        PushSubscription subscription;
        NettyChannelHandlerBridge ch = (NettyChannelHandlerBridge)this.ioChannel.pipeline().get(NettyChannelHandlerBridge.class);
        PushSubscription pushSubscription = subscription = ch == null ? null : ch.subscription();
        if (subscription != null) {
            try {
                subscription.onNext(in);
            }
            catch (CancelException cancelException) {
                // empty catch block
            }
        }
    }

    public String toString() {
        return this.getClass().getName() + " {" + "channel=" + this.ioChannel + '}';
    }

    private class NettyConsumerSpec
    implements ReactorChannel.ConsumerSpec {
        private NettyConsumerSpec() {
        }

        @Override
        public ReactorChannel.ConsumerSpec close(final Consumer<Void> onClose) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new ChannelHandler[]{new ChannelDuplexHandler(){

                public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                    onClose.accept(null);
                    super.channelInactive(ctx);
                }
            }});
            return this;
        }

        @Override
        public ReactorChannel.ConsumerSpec readIdle(long idleTimeout, final Consumer<Void> onReadIdle) {
            NettyChannelStream.this.ioChannel.pipeline().addFirst(new ChannelHandler[]{new IdleStateHandler(idleTimeout, 0L, 0L, TimeUnit.MILLISECONDS){

                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                    if (evt.state() == IdleState.READER_IDLE) {
                        onReadIdle.accept(null);
                    }
                    super.channelIdle(ctx, evt);
                }
            }});
            return this;
        }

        @Override
        public ReactorChannel.ConsumerSpec writeIdle(long idleTimeout, final Consumer<Void> onWriteIdle) {
            NettyChannelStream.this.ioChannel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(0L, idleTimeout, 0L, TimeUnit.MILLISECONDS){

                protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                    if (evt.state() == IdleState.WRITER_IDLE) {
                        onWriteIdle.accept(null);
                    }
                    super.channelIdle(ctx, evt);
                }
            }});
            return this;
        }
    }
}

