package com.linecorp.armeria.internal;

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpResponseWriter;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:com/linecorp/armeria/internal/ResponseConversionUtil.class */
public final class ResponseConversionUtil {

    /* loaded from: input_file:com/linecorp/armeria/internal/ResponseConversionUtil$StreamingSubscriber.class */
    private static final class StreamingSubscriber<T> implements Subscriber<T> {
        private final HttpResponseWriter writer;
        private final HttpHeaders headers;
        private final HttpHeaders trailingHeaders;
        private final Function<T, HttpData> contentConverter;

        @Nullable
        private Subscription subscription;
        private boolean headersSent;
        static final /* synthetic */ boolean $assertionsDisabled;

        StreamingSubscriber(HttpResponseWriter httpResponseWriter, HttpHeaders httpHeaders, HttpHeaders httpHeaders2, Function<T, HttpData> function) {
            this.writer = (HttpResponseWriter) Objects.requireNonNull(httpResponseWriter, "writer");
            this.headers = (HttpHeaders) Objects.requireNonNull(httpHeaders, "headers");
            this.trailingHeaders = (HttpHeaders) Objects.requireNonNull(httpHeaders2, "trailingHeaders");
            this.contentConverter = (Function) Objects.requireNonNull(function, "contentConverter");
        }

        public void onSubscribe(final Subscription subscription) {
            if (!$assertionsDisabled && this.subscription != null) {
                throw new AssertionError();
            }
            this.subscription = subscription;
            this.writer.closeFuture().handle((r3, th) -> {
                if (th == null) {
                    return null;
                }
                subscription.cancel();
                return null;
            });
            try {
                this.writer.onDemand(new Runnable() { // from class: com.linecorp.armeria.internal.ResponseConversionUtil.StreamingSubscriber.1
                    @Override // java.lang.Runnable
                    public void run() {
                        subscription.request(1L);
                        StreamingSubscriber.this.writer.onDemand(this);
                    }
                });
            } catch (Exception e) {
                onError(e);
            }
        }

        public void onNext(T t) {
            if (this.writer.isOpen()) {
                try {
                    HttpData apply = this.contentConverter.apply(t);
                    if (!this.headersSent) {
                        this.writer.write((HttpResponseWriter) this.headers);
                        this.headersSent = true;
                    }
                    this.writer.write((HttpResponseWriter) apply);
                } catch (Exception e) {
                    onError(e);
                }
            }
        }

        public void onError(Throwable th) {
            if (this.writer.isOpen()) {
                try {
                    this.writer.close(th);
                } catch (Exception e) {
                    if (!$assertionsDisabled && this.subscription == null) {
                        throw new AssertionError();
                    }
                    this.subscription.cancel();
                }
            }
        }

        public void onComplete() {
            if (this.writer.isOpen()) {
                if (!this.trailingHeaders.isEmpty()) {
                    this.writer.write((HttpResponseWriter) this.trailingHeaders);
                }
                this.writer.close();
            }
        }

        static {
            $assertionsDisabled = !ResponseConversionUtil.class.desiredAssertionStatus();
        }
    }

    public static HttpResponseWriter aggregateFrom(Stream<?> stream, HttpHeaders httpHeaders, HttpHeaders httpHeaders2, Function<Object, HttpData> function, Executor executor) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(httpHeaders, "headers");
        Objects.requireNonNull(httpHeaders2, "trailingHeaders");
        Objects.requireNonNull(function, "contentConverter");
        Objects.requireNonNull(executor, "executor");
        return aggregateFrom((CompletableFuture<?>) ObjectCollectingUtil.collectFrom(stream, executor), httpHeaders, httpHeaders2, function);
    }

    public static HttpResponseWriter aggregateFrom(Publisher<?> publisher, HttpHeaders httpHeaders, HttpHeaders httpHeaders2, Function<Object, HttpData> function) {
        Objects.requireNonNull(publisher, "publisher");
        Objects.requireNonNull(httpHeaders, "headers");
        Objects.requireNonNull(httpHeaders2, "trailingHeaders");
        Objects.requireNonNull(function, "contentConverter");
        return aggregateFrom((CompletableFuture<?>) ObjectCollectingUtil.collectFrom(publisher), httpHeaders, httpHeaders2, function);
    }

    private static HttpResponseWriter aggregateFrom(CompletableFuture<?> completableFuture, HttpHeaders httpHeaders, HttpHeaders httpHeaders2, Function<Object, HttpData> function) {
        HttpResponseWriter streaming = HttpResponse.streaming();
        completableFuture.handle((obj, th) -> {
            if (th != null) {
                streaming.close(th);
                return null;
            }
            try {
                HttpData httpData = (HttpData) function.apply(obj);
                streaming.write((HttpResponseWriter) httpHeaders);
                streaming.write((HttpResponseWriter) httpData);
                if (!httpHeaders2.isEmpty()) {
                    streaming.write((HttpResponseWriter) httpHeaders2);
                }
                streaming.close();
                return null;
            } catch (Exception e) {
                streaming.close((Throwable) e);
                return null;
            }
        });
        return streaming;
    }

    public static <T> HttpResponseWriter streamingFrom(Stream<T> stream, HttpHeaders httpHeaders, HttpHeaders httpHeaders2, Function<T, HttpData> function, Executor executor) {
        Objects.requireNonNull(stream, "stream");
        Objects.requireNonNull(httpHeaders, "headers");
        Objects.requireNonNull(httpHeaders2, "trailingHeaders");
        Objects.requireNonNull(function, "contentConverter");
        Objects.requireNonNull(executor, "executor");
        HttpResponseWriter streaming = HttpResponse.streaming();
        executor.execute(() -> {
            try {
                Iterator it = ((Stream) stream.sequential()).iterator();
                boolean z = false;
                while (it.hasNext()) {
                    HttpData httpData = (HttpData) function.apply(it.next());
                    if (!z) {
                        streaming.write((HttpResponseWriter) httpHeaders);
                        z = true;
                    }
                    streaming.write((HttpResponseWriter) httpData);
                }
                if (!httpHeaders2.isEmpty()) {
                    streaming.write((HttpResponseWriter) httpHeaders2);
                }
                streaming.close();
            } catch (Exception e) {
                streaming.close((Throwable) e);
            }
        });
        return streaming;
    }

    public static <T> HttpResponseWriter streamingFrom(Publisher<T> publisher, HttpHeaders httpHeaders, HttpHeaders httpHeaders2, Function<T, HttpData> function) {
        HttpResponseWriter streaming = HttpResponse.streaming();
        publisher.subscribe(new StreamingSubscriber(streaming, httpHeaders, httpHeaders2, function));
        return streaming;
    }

    private ResponseConversionUtil() {
    }
}
