package karate.com.linecorp.armeria.server;

import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import karate.com.linecorp.armeria.common.AggregatedHttpResponse;
import karate.com.linecorp.armeria.common.ClosedSessionException;
import karate.com.linecorp.armeria.common.HttpData;
import karate.com.linecorp.armeria.common.HttpHeaders;
import karate.com.linecorp.armeria.common.HttpObject;
import karate.com.linecorp.armeria.common.HttpStatus;
import karate.com.linecorp.armeria.common.ResponseHeaders;
import karate.com.linecorp.armeria.common.logging.RequestLog;
import karate.com.linecorp.armeria.common.logging.RequestLogBuilder;
import karate.com.linecorp.armeria.common.stream.ClosedStreamException;
import karate.com.linecorp.armeria.common.util.Exceptions;
import karate.com.linecorp.armeria.common.util.SafeCloseable;
import karate.com.linecorp.armeria.internal.common.CancellationScheduler;
import karate.com.linecorp.armeria.internal.common.Http1ObjectEncoder;
import karate.com.linecorp.armeria.internal.common.RequestContextUtil;
import karate.com.linecorp.armeria.server.logging.AccessLogWriter;
import karate.io.netty.channel.ChannelFuture;
import karate.io.netty.channel.ChannelFutureListener;
import karate.io.netty.channel.ChannelHandlerContext;
import karate.io.netty.handler.codec.http2.Http2Error;
import karate.io.netty.util.concurrent.Future;
import karate.io.netty.util.concurrent.GenericFutureListener;
import karate.org.reactivestreams.Subscriber;
import karate.org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:karate/com/linecorp/armeria/server/HttpResponseSubscriber.class */
public final class HttpResponseSubscriber implements Subscriber<HttpObject> {
    private static final Logger logger;
    private static final AggregatedHttpResponse internalServerErrorResponse;
    private static final AggregatedHttpResponse serviceUnavailableResponse;
    private final ChannelHandlerContext ctx;
    private final ServerHttpObjectEncoder responseEncoder;
    private final DecodedHttpRequest req;
    private final DefaultServiceRequestContext reqCtx;

    @Nullable
    private Subscription subscription;
    private State state = State.NEEDS_HEADERS;
    private boolean isComplete;
    private boolean loggedResponseHeadersFirstBytesTransferred;

    @Nullable
    private WriteHeadersFutureListener cachedWriteHeadersListener;

    @Nullable
    private WriteDataFutureListener cachedWriteDataListener;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:karate/com/linecorp/armeria/server/HttpResponseSubscriber$State.class */
    public enum State {
        NEEDS_HEADERS,
        NEEDS_DATA_OR_TRAILERS,
        NEEDS_TRAILERS,
        DONE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:karate/com/linecorp/armeria/server/HttpResponseSubscriber$WriteDataFutureListener.class */
    public class WriteDataFutureListener implements ChannelFutureListener {
        private final boolean endOfStream;
        private final boolean wroteEmptyData;

        WriteDataFutureListener(boolean z, boolean z2) {
            this.endOfStream = z;
            this.wroteEmptyData = z2;
        }

        @Override // karate.io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            boolean z;
            SafeCloseable pop = RequestContextUtil.pop();
            try {
                if (channelFuture.isSuccess()) {
                    z = true;
                } else {
                    z = this.endOfStream && this.wroteEmptyData && (channelFuture.cause() instanceof ClosedChannelException) && (HttpResponseSubscriber.this.responseEncoder instanceof Http1ObjectEncoder);
                }
                HttpResponseSubscriber.this.handleWriteComplete(channelFuture, this.endOfStream, z);
                if (pop != null) {
                    pop.close();
                }
            } catch (Throwable th) {
                if (pop != null) {
                    try {
                        pop.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:karate/com/linecorp/armeria/server/HttpResponseSubscriber$WriteHeadersFutureListener.class */
    public class WriteHeadersFutureListener implements ChannelFutureListener {
        private final boolean endOfStream;

        WriteHeadersFutureListener(boolean z) {
            this.endOfStream = z;
        }

        @Override // karate.io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            SafeCloseable pop = RequestContextUtil.pop();
            try {
                HttpResponseSubscriber.this.handleWriteComplete(channelFuture, this.endOfStream, channelFuture.isSuccess());
                if (pop != null) {
                    pop.close();
                }
            } catch (Throwable th) {
                if (pop != null) {
                    try {
                        pop.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HttpResponseSubscriber(ChannelHandlerContext channelHandlerContext, ServerHttpObjectEncoder serverHttpObjectEncoder, DefaultServiceRequestContext defaultServiceRequestContext, DecodedHttpRequest decodedHttpRequest) {
        this.ctx = channelHandlerContext;
        this.responseEncoder = serverHttpObjectEncoder;
        this.req = decodedHttpRequest;
        this.reqCtx = defaultServiceRequestContext;
    }

    private HttpService service() {
        return this.reqCtx.config().service();
    }

    private RequestLogBuilder logBuilder() {
        return this.reqCtx.logBuilder();
    }

    @Override // karate.org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (!$assertionsDisabled && this.subscription != null) {
            throw new AssertionError();
        }
        this.subscription = subscription;
        if (this.state == State.DONE) {
            subscription.cancel();
        } else {
            this.reqCtx.requestCancellationScheduler().init(this.reqCtx.eventLoop(), newCancellationTask(), 0L, RequestTimeoutException.get());
            subscription.request(1L);
        }
    }

    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:13:0x005c. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:53:0x01f7  */
    /* JADX WARN: Removed duplicated region for block: B:58:0x027e  */
    @Override // karate.org.reactivestreams.Subscriber
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void onNext(karate.com.linecorp.armeria.common.HttpObject r8) {
        /*
            Method dump skipped, instructions count: 839
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: karate.com.linecorp.armeria.server.HttpResponseSubscriber.onNext(karate.com.linecorp.armeria.common.HttpObject):void");
    }

    private boolean failIfStreamOrSessionClosed() {
        if (this.responseEncoder.isWritable(this.req.id(), this.req.streamId())) {
            return false;
        }
        if (this.reqCtx.sessionProtocol().isMultiplex()) {
            fail(ClosedStreamException.get());
            return true;
        }
        fail(ClosedSessionException.get());
        return true;
    }

    private State setDone(boolean z) {
        if (z && this.subscription != null) {
            this.subscription.cancel();
        }
        this.reqCtx.requestCancellationScheduler().clearTimeout(false);
        State state = this.state;
        this.state = State.DONE;
        return state;
    }

    @Override // karate.org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (th instanceof HttpResponseException) {
            ((HttpResponseException) th).httpResponse().aggregate(this.ctx.executor()).handleAsync((aggregatedHttpResponse, th2) -> {
                if (th2 != null) {
                    failAndRespond(th2, internalServerErrorResponse, Http2Error.CANCEL, false);
                    return null;
                }
                failAndRespond(th, aggregatedHttpResponse, Http2Error.CANCEL, false);
                return null;
            }, (Executor) this.ctx.executor());
            return;
        }
        if (th instanceof HttpStatusException) {
            failAndRespond(th, AggregatedHttpResponse.of(((HttpStatusException) th).httpStatus()), Http2Error.CANCEL, false);
        } else if (Exceptions.isStreamCancelling(th)) {
            failAndReset(th);
        } else {
            logger.warn("{} Unexpected exception from a service or a response publisher: {}", new Object[]{this.ctx.channel(), service(), th});
            failAndRespond(th, internalServerErrorResponse, Http2Error.INTERNAL_ERROR, false);
        }
    }

    @Override // karate.org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.reqCtx.requestCancellationScheduler().isFinished()) {
            return;
        }
        State done = setDone(false);
        if (done == State.NEEDS_HEADERS) {
            logger.warn("{} Published nothing (or only informational responses): {}", this.ctx.channel(), service());
            this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.INTERNAL_ERROR);
            this.ctx.flush();
        } else if (done != State.DONE) {
            HttpHeaders additionalResponseTrailers = this.reqCtx.additionalResponseTrailers();
            if (!additionalResponseTrailers.isEmpty()) {
                logBuilder().responseTrailers(additionalResponseTrailers);
                this.responseEncoder.writeTrailers(this.req.id(), this.req.streamId(), additionalResponseTrailers).addListener2((GenericFutureListener<? extends Future<? super Void>>) writeHeadersFutureListener(true));
                this.ctx.flush();
            } else if (this.responseEncoder.isWritable(this.req.id(), this.req.streamId())) {
                this.responseEncoder.writeData(this.req.id(), this.req.streamId(), HttpData.empty(), true).addListener2((GenericFutureListener<? extends Future<? super Void>>) writeDataFutureListener(true, true));
                this.ctx.flush();
            }
        }
    }

    private void fail(Throwable th) {
        if (tryComplete()) {
            setDone(true);
            logBuilder().endRequest(th);
            logBuilder().endResponse(th);
            ServiceConfig config = this.reqCtx.config();
            if (config.transientServiceOptions().contains(TransientServiceOption.WITH_ACCESS_LOGGING)) {
                CompletableFuture<RequestLog> whenComplete = this.reqCtx.log().whenComplete();
                AccessLogWriter accessLogWriter = config.accessLogWriter();
                Objects.requireNonNull(accessLogWriter);
                whenComplete.thenAccept(accessLogWriter::log);
            }
        }
    }

    private boolean tryComplete() {
        if (this.isComplete) {
            return false;
        }
        this.isComplete = true;
        return true;
    }

    private void failAndRespond(Throwable th) {
        failAndRespond(th, internalServerErrorResponse, Http2Error.INTERNAL_ERROR, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void failAndRespond(Throwable th, AggregatedHttpResponse aggregatedHttpResponse, Http2Error http2Error, boolean z) {
        ChannelFuture writeReset;
        boolean z2;
        State done = setDone(z);
        int id = this.req.id();
        int streamId = this.req.streamId();
        if (done == State.NEEDS_HEADERS) {
            ResponseHeaders headers = aggregatedHttpResponse.headers();
            logBuilder().responseHeaders(headers);
            HttpData content = aggregatedHttpResponse.content();
            if (content.isEmpty()) {
                writeReset = this.responseEncoder.writeHeaders(id, streamId, headers, true);
            } else {
                this.responseEncoder.writeHeaders(id, streamId, headers, false);
                logBuilder().increaseResponseLength(content);
                writeReset = this.responseEncoder.writeData(id, streamId, content, true);
            }
            z2 = false;
        } else {
            writeReset = this.responseEncoder.writeReset(id, streamId, http2Error);
            z2 = true;
        }
        addCallbackAndFlush(th, done, writeReset, z2);
    }

    private void failAndReset(Throwable th) {
        addCallbackAndFlush(th, setDone(false), this.responseEncoder.writeReset(this.req.id(), this.req.streamId(), Http2Error.CANCEL), true);
    }

    private void addCallbackAndFlush(Throwable th, State state, ChannelFuture channelFuture, boolean z) {
        if (state != State.DONE) {
            channelFuture.addListener2(future -> {
                SafeCloseable pop = RequestContextUtil.pop();
                try {
                    if (future.isSuccess() && !z) {
                        maybeLogFirstResponseBytesTransferred();
                    }
                    if (tryComplete()) {
                        logBuilder().endRequest(th);
                        logBuilder().endResponse(th);
                        ServiceConfig config = this.reqCtx.config();
                        if (config.transientServiceOptions().contains(TransientServiceOption.WITH_ACCESS_LOGGING)) {
                            CompletableFuture<RequestLog> whenComplete = this.reqCtx.log().whenComplete();
                            AccessLogWriter accessLogWriter = config.accessLogWriter();
                            Objects.requireNonNull(accessLogWriter);
                            whenComplete.thenAccept(accessLogWriter::log);
                        }
                    }
                    if (pop != null) {
                        pop.close();
                    }
                } catch (Throwable th2) {
                    if (pop != null) {
                        try {
                            pop.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            });
        }
        this.ctx.flush();
    }

    private CancellationScheduler.CancellationTask newCancellationTask() {
        return new CancellationScheduler.CancellationTask() { // from class: karate.com.linecorp.armeria.server.HttpResponseSubscriber.1
            static final /* synthetic */ boolean $assertionsDisabled;

            @Override // karate.com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask
            public boolean canSchedule() {
                return HttpResponseSubscriber.this.state != State.DONE;
            }

            @Override // karate.com.linecorp.armeria.internal.common.CancellationScheduler.CancellationTask
            public void run(Throwable th) {
                if (!$assertionsDisabled && HttpResponseSubscriber.this.state == State.DONE) {
                    throw new AssertionError();
                }
                HttpResponseSubscriber.this.failAndRespond(th, HttpResponseSubscriber.serviceUnavailableResponse, Http2Error.INTERNAL_ERROR, true);
            }

            static {
                $assertionsDisabled = !HttpResponseSubscriber.class.desiredAssertionStatus();
            }
        };
    }

    private WriteHeadersFutureListener writeHeadersFutureListener(boolean z) {
        if (z) {
            return new WriteHeadersFutureListener(true);
        }
        if (this.cachedWriteHeadersListener == null) {
            this.cachedWriteHeadersListener = new WriteHeadersFutureListener(false);
        }
        return this.cachedWriteHeadersListener;
    }

    private WriteDataFutureListener writeDataFutureListener(boolean z, boolean z2) {
        if (z || z2) {
            return new WriteDataFutureListener(z, z2);
        }
        if (this.cachedWriteDataListener == null) {
            this.cachedWriteDataListener = new WriteDataFutureListener(false, false);
        }
        return this.cachedWriteDataListener;
    }

    void handleWriteComplete(ChannelFuture channelFuture, boolean z, boolean z2) throws Exception {
        if (!z2) {
            fail(channelFuture.cause());
            HttpServerHandler.CLOSE_ON_FAILURE.operationComplete(channelFuture);
            return;
        }
        maybeLogFirstResponseBytesTransferred();
        if (!z) {
            if (!$assertionsDisabled && this.subscription == null) {
                throw new AssertionError();
            }
            this.subscription.request(1L);
            return;
        }
        if (tryComplete()) {
            logBuilder().endRequest();
            logBuilder().endResponse();
            ServiceConfig config = this.reqCtx.config();
            if (config.transientServiceOptions().contains(TransientServiceOption.WITH_ACCESS_LOGGING)) {
                CompletableFuture<RequestLog> whenComplete = this.reqCtx.log().whenComplete();
                AccessLogWriter accessLogWriter = config.accessLogWriter();
                Objects.requireNonNull(accessLogWriter);
                whenComplete.thenAccept(accessLogWriter::log);
            }
        }
    }

    private void maybeLogFirstResponseBytesTransferred() {
        if (this.loggedResponseHeadersFirstBytesTransferred) {
            return;
        }
        this.loggedResponseHeadersFirstBytesTransferred = true;
        logBuilder().responseFirstBytesTransferred();
    }

    static {
        $assertionsDisabled = !HttpResponseSubscriber.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(HttpResponseSubscriber.class);
        internalServerErrorResponse = AggregatedHttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR);
        serviceUnavailableResponse = AggregatedHttpResponse.of(HttpStatus.SERVICE_UNAVAILABLE);
    }
}
