package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil;
import com.linecorp.armeria.internal.common.stream.NoopSubscription;
import com.linecorp.armeria.internal.shaded.guava.base.Preconditions;
import com.linecorp.armeria.internal.shaded.guava.math.LongMath;
import com.linecorp.armeria.internal.shaded.guava.primitives.Ints;
import com.linecorp.armeria.server.ServiceRequestContext;
import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linecorp/armeria/common/stream/InputStreamStreamMessage.class */
public final class InputStreamStreamMessage implements ByteStreamMessage {
    private static final Logger logger = LoggerFactory.getLogger(InputStreamStreamMessage.class);
    private static final AtomicIntegerFieldUpdater<InputStreamStreamMessage> subscribedUpdater = AtomicIntegerFieldUpdater.newUpdater(InputStreamStreamMessage.class, "subscribed");
    private final InputStream inputStream;

    @Nullable
    private final ExecutorService blockingTaskExecutor;
    private final int bufferSize;
    private long offset;
    private volatile int subscribed;

    @Nullable
    private volatile InputStreamSubscription inputStreamSubscription;
    private long length = Long.MAX_VALUE;
    private final CompletableFuture<Void> completionFuture = new EventLoopCheckingFuture();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linecorp/armeria/common/stream/InputStreamStreamMessage$InputStreamSubscription.class */
    public final class InputStreamSubscription implements Subscription {
        private Subscriber<? super HttpData> downstream;
        private final EventExecutor executor;
        private final ExecutorService blockingTaskExecutor;
        private final int bufferSize;
        private final long offset;
        private final long end;
        private long position;
        private volatile boolean written;
        private final boolean notifyCancellation;
        private boolean closed;
        private volatile long requested;
        private boolean reading;

        private InputStreamSubscription(Subscriber<? super HttpData> subscriber, EventExecutor eventExecutor, ExecutorService executorService, int i, long j, long j2, boolean z) {
            Objects.requireNonNull(subscriber, "downstream");
            Objects.requireNonNull(eventExecutor, "executor");
            Objects.requireNonNull(executorService, "blockingTaskExecutor");
            this.downstream = subscriber;
            this.executor = eventExecutor;
            this.blockingTaskExecutor = executorService;
            this.bufferSize = i;
            this.offset = j;
            this.end = LongMath.saturatedAdd(j, j2);
            this.notifyCancellation = z;
        }

        public void request(long j) {
            if (j <= 0) {
                close(new IllegalArgumentException("Rule §3.9 violated: non-positive subscription requests are forbidden."));
            } else if (this.executor.inEventLoop()) {
                request0(j);
            } else {
                this.executor.execute(() -> {
                    request0(j);
                });
            }
        }

        private void request0(long j) {
            if (this.closed) {
                return;
            }
            if (this.offset >= this.end) {
                close0(null);
                return;
            }
            long j2 = this.requested;
            if (j2 == Long.MAX_VALUE) {
                return;
            }
            if (j == Long.MAX_VALUE) {
                this.requested = Long.MAX_VALUE;
            } else {
                this.requested = LongMath.saturatedAdd(j2, j);
            }
            if (j2 > 0) {
                return;
            }
            readBytes();
        }

        private void readBytes() {
            if (this.reading || this.closed || this.requested <= 0) {
                return;
            }
            this.reading = true;
            this.requested--;
            this.blockingTaskExecutor.execute(() -> {
                if (this.position >= this.end) {
                    close(null);
                    return;
                }
                if (this.position < this.offset) {
                    long j = this.offset - this.position;
                    try {
                        if (InputStreamStreamMessage.this.inputStream.skip(j) < j) {
                            close(null);
                            return;
                        }
                        this.position += j;
                    } catch (Exception e) {
                        close(e);
                        return;
                    }
                }
                byte[] bArr = new byte[Math.min(this.bufferSize, Ints.saturatedCast(this.end - this.position))];
                try {
                    int read = InputStreamStreamMessage.this.inputStream.read(bArr);
                    if (read == -1) {
                        close(null);
                        return;
                    }
                    HttpData wrap = HttpData.wrap(bArr, 0, read);
                    this.position += read;
                    if (!this.written) {
                        this.written = true;
                    }
                    this.executor.execute(() -> {
                        publishDownstream(wrap);
                    });
                } catch (Exception e2) {
                    close(e2);
                }
            });
        }

        private void publishDownstream(HttpData httpData) {
            if (this.closed) {
                return;
            }
            this.downstream.onNext(httpData);
            this.reading = false;
            readBytes();
        }

        public void cancel() {
            if (this.executor.inEventLoop()) {
                cancel0();
            } else {
                this.executor.execute(this::cancel0);
            }
        }

        private void cancel0() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            CancelledSubscriptionException cancelledSubscriptionException = CancelledSubscriptionException.get();
            if (this.notifyCancellation) {
                this.downstream.onError(cancelledSubscriptionException);
            }
            this.downstream = NoopSubscriber.get();
            InputStreamStreamMessage.this.completionFuture.completeExceptionally(cancelledSubscriptionException);
            closeInputStream();
        }

        private void closeInputStream() {
            try {
                InputStreamStreamMessage.this.inputStream.close();
            } catch (IOException e) {
                InputStreamStreamMessage.logger.warn("Unexpected exception while closing input stream {}.", InputStreamStreamMessage.this.inputStream, e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close(@Nullable Throwable th) {
            if (this.executor.inEventLoop()) {
                close0(th);
            } else {
                this.executor.execute(() -> {
                    close0(th);
                });
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close0(@Nullable Throwable th) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (th == null) {
                this.downstream.onComplete();
                InputStreamStreamMessage.this.completionFuture.complete(null);
            } else {
                this.downstream.onError(th);
                InputStreamStreamMessage.this.completionFuture.completeExceptionally(th);
            }
            closeInputStream();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InputStreamStreamMessage(InputStream inputStream, @Nullable ExecutorService executorService, int i) {
        Objects.requireNonNull(inputStream, "inputStream");
        this.inputStream = inputStream;
        this.blockingTaskExecutor = executorService;
        this.bufferSize = i;
    }

    @Override // com.linecorp.armeria.common.stream.ByteStreamMessage
    public ByteStreamMessage range(long j, long j2) {
        Preconditions.checkArgument(j >= 0, "offset: %s (expected: >= 0)", j);
        Preconditions.checkArgument(j2 >= 0, "length: %s (expected: >= 0)", j2);
        Preconditions.checkState(this.subscribed == 0, "cannot specify range(%s, %s) once this stream is subscribed", j, j2);
        this.offset = j;
        this.length = j2;
        return this;
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public boolean isOpen() {
        return !this.completionFuture.isDone();
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public boolean isEmpty() {
        if (isOpen()) {
            return false;
        }
        InputStreamSubscription inputStreamSubscription = this.inputStreamSubscription;
        return inputStreamSubscription == null || !inputStreamSubscription.written;
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public long demand() {
        InputStreamSubscription inputStreamSubscription = this.inputStreamSubscription;
        if (inputStreamSubscription != null) {
            return inputStreamSubscription.requested;
        }
        return 0L;
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public CompletableFuture<Void> whenComplete() {
        return this.completionFuture;
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public void subscribe(Subscriber<? super HttpData> subscriber, EventExecutor eventExecutor, SubscriptionOption... subscriptionOptionArr) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(eventExecutor, "executor");
        Objects.requireNonNull(subscriptionOptionArr, "options");
        if (!subscribedUpdater.compareAndSet(this, 0, 1)) {
            subscriber.onSubscribe(NoopSubscription.get());
            subscriber.onError(new IllegalStateException("Only single subscriber is allowed!"));
        } else if (eventExecutor.inEventLoop()) {
            subscribe0(subscriber, eventExecutor, subscriptionOptionArr);
        } else {
            eventExecutor.execute(() -> {
                subscribe0(subscriber, eventExecutor, subscriptionOptionArr);
            });
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17, types: [java.util.concurrent.ExecutorService] */
    private void subscribe0(Subscriber<? super HttpData> subscriber, EventExecutor eventExecutor, SubscriptionOption... subscriptionOptionArr) {
        ScheduledExecutorService blockingTaskExecutor;
        if (this.blockingTaskExecutor != null) {
            blockingTaskExecutor = this.blockingTaskExecutor;
        } else {
            ServiceRequestContext currentOrNull = ServiceRequestContext.currentOrNull();
            blockingTaskExecutor = currentOrNull != null ? currentOrNull.blockingTaskExecutor() : CommonPools.blockingTaskExecutor();
        }
        InputStreamSubscription inputStreamSubscription = new InputStreamSubscription(subscriber, eventExecutor, blockingTaskExecutor, this.bufferSize, this.offset, this.length, InternalStreamMessageUtil.containsNotifyCancellation(subscriptionOptionArr));
        this.inputStreamSubscription = inputStreamSubscription;
        subscriber.onSubscribe(inputStreamSubscription);
        if (this.completionFuture.isCompletedExceptionally()) {
            this.completionFuture.exceptionally(th -> {
                inputStreamSubscription.close0(th);
                return null;
            });
        }
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public void abort() {
        abort(AbortedStreamException.get());
    }

    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public void abort(Throwable th) {
        Objects.requireNonNull(th, "cause");
        this.completionFuture.completeExceptionally(th);
        InputStreamSubscription inputStreamSubscription = this.inputStreamSubscription;
        if (inputStreamSubscription != null) {
            inputStreamSubscription.close(th);
        }
    }
}
