/*
 * Decompiled with CFR 0.152.
 */
package io.micronaut.http.client.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.client.exceptions.ResponseClosedException;
import io.micronaut.http.netty.reactive.HotObservable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Internal
abstract class ReactiveClientReader
extends ChannelInboundHandlerAdapter
implements HotObservable<HttpContent>,
Subscription {
    private EventLoop eventLoop;
    private ChannelHandlerContext ctx;
    private Subscriber<? super HttpContent> subscriber;
    private long demand;
    private boolean cancelled = false;

    ReactiveClientReader() {
    }

    public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        this.eventLoop = ctx.channel().eventLoop();
    }

    public final void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        if (!this.cancelled) {
            this.cancelled = true;
            this.subscriber.onError((Throwable)new ResponseClosedException("Connection closed before full response body was transferred"));
        }
    }

    public final void subscribe(Subscriber<? super HttpContent> s) {
        if (this.subscriber != null) {
            throw new IllegalStateException("Already subscribed");
        }
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.execute(() -> this.subscribe(s));
            return;
        }
        this.subscriber = s;
        s.onSubscribe((Subscription)this);
    }

    public final void request(long n) {
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.execute(() -> this.request(n));
            return;
        }
        long oldDemand = this.demand;
        long newDemand = oldDemand + n;
        if (newDemand < 0L) {
            newDemand = Long.MAX_VALUE;
        }
        this.demand = newDemand;
        if (oldDemand == 0L) {
            this.ctx.read();
        }
    }

    public final void cancel() {
        if (!this.eventLoop.inEventLoop()) {
            this.eventLoop.execute(this::cancel);
            return;
        }
        this.cancelled = true;
        if (this.demand == 0L) {
            this.ctx.read();
        }
    }

    public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean last = msg instanceof LastHttpContent;
        if (this.cancelled) {
            ((HttpContent)msg).release();
            if (last) {
                this.remove(ctx);
            } else {
                ctx.read();
            }
        } else {
            assert (this.demand > 0L) : "should be ensured by FlowControlHandler";
            this.subscriber.onNext((Object)((HttpContent)msg));
            if (last) {
                this.cancelled = true;
                this.remove(ctx);
                this.subscriber.onComplete();
            } else if (--this.demand > 0L) {
                ctx.read();
            }
        }
    }

    public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.cancelled) {
            ctx.fireExceptionCaught(cause);
        } else {
            this.cancelled = true;
            this.remove(ctx);
            this.subscriber.onError(cause);
        }
    }

    public final void closeIfNoSubscriber() {
        this.cancel();
    }

    protected abstract void remove(ChannelHandlerContext var1);
}

