package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.internal.common.stream.ByteBufsInputStream;
import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linecorp/armeria/common/stream/StreamMessageInputStream.class */
public final class StreamMessageInputStream<T> extends InputStream {
    private final StreamMessage<T> source;
    private final EventExecutor executor;
    private final StreamMessageInputStreamSubscriber<T> subscriber;
    private boolean subscribed;
    private boolean closed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linecorp/armeria/common/stream/StreamMessageInputStream$StreamMessageInputStreamSubscriber.class */
    public static final class StreamMessageInputStreamSubscriber<T> implements Subscriber<T> {
        private final Function<? super T, ? extends HttpData> httpDataConverter;
        private final CompletableFuture<Void> whenSubscribed = new CompletableFuture<>();
        private final ByteBufsInputStream byteBufsInputStream = new ByteBufsInputStream();

        @Nullable
        private volatile Subscription upstream;
        static final /* synthetic */ boolean $assertionsDisabled;

        StreamMessageInputStreamSubscriber(Function<? super T, ? extends HttpData> function) {
            Objects.requireNonNull(function, "httpDataConverter");
            this.httpDataConverter = function;
        }

        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription, "subscription");
            this.upstream = subscription;
            this.whenSubscribed.complete(null);
        }

        public void onNext(T t) {
            Objects.requireNonNull(t, "item");
            if (this.byteBufsInputStream.isEos()) {
                com.linecorp.armeria.internal.common.stream.StreamMessageUtil.closeOrAbort(t);
                return;
            }
            try {
                HttpData apply = this.httpDataConverter.apply(t);
                if (!apply.isEmpty()) {
                    this.byteBufsInputStream.add(apply.byteBuf());
                } else {
                    apply.close();
                    request();
                }
            } catch (Throwable th) {
                com.linecorp.armeria.internal.common.stream.StreamMessageUtil.closeOrAbort(t, th);
                onError(th);
                Subscription subscription = this.upstream;
                if (!$assertionsDisabled && subscription == null) {
                    throw new AssertionError();
                }
                subscription.cancel();
            }
        }

        public void onError(Throwable th) {
            this.byteBufsInputStream.interrupt(th);
        }

        public void onComplete() {
            this.byteBufsInputStream.setEos();
        }

        public void request() {
            if (this.byteBufsInputStream.isEos()) {
                return;
            }
            Subscription subscription = this.upstream;
            if (!$assertionsDisabled && subscription == null) {
                throw new AssertionError();
            }
            subscription.request(1L);
        }

        static {
            $assertionsDisabled = !StreamMessageInputStream.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamMessageInputStream(StreamMessage<T> streamMessage, Function<? super T, ? extends HttpData> function, EventExecutor eventExecutor) {
        Objects.requireNonNull(streamMessage, "source");
        Objects.requireNonNull(function, "httpDataConverter");
        Objects.requireNonNull(eventExecutor, "executor");
        this.source = streamMessage;
        this.executor = eventExecutor;
        this.subscriber = new StreamMessageInputStreamSubscriber<>(function);
    }

    @Override // java.io.InputStream
    public int read() throws IOException {
        ensureOpen();
        ensureSubscribed();
        maybeRequest();
        return byteBufsInputStream().read();
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        ensureOpen();
        ensureSubscribed();
        maybeRequest();
        return byteBufsInputStream().read(bArr, i, i2);
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.source.abort();
        byteBufsInputStream().close();
    }

    @Override // java.io.InputStream
    public int available() throws IOException {
        ensureOpen();
        return byteBufsInputStream().available();
    }

    private void ensureOpen() throws IOException {
        if (this.closed) {
            throw new IOException("Stream closed");
        }
    }

    private void ensureSubscribed() {
        if (this.subscribed) {
            return;
        }
        this.subscribed = true;
        this.source.subscribe(this.subscriber, this.executor);
        ((StreamMessageInputStreamSubscriber) this.subscriber).whenSubscribed.join();
    }

    private void maybeRequest() throws IOException {
        if (available() == 0) {
            this.subscriber.request();
        }
    }

    private ByteBufsInputStream byteBufsInputStream() {
        return ((StreamMessageInputStreamSubscriber) this.subscriber).byteBufsInputStream;
    }
}
