package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.util.CompositeException;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.util.PooledObjects;
import com.linecorp.armeria.internal.shaded.futures.CompletableFutures;
import com.linecorp.armeria.internal.shaded.guava.base.MoreObjects;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linecorp/armeria/common/stream/AbstractStreamMessage.class */
public abstract class AbstractStreamMessage<T> implements StreamMessage<T> {
    // Shared logger; also used by the nested CloseEvent to report misbehaving subscribers.
    static final Logger logger = LoggerFactory.getLogger(AbstractStreamMessage.class);
    // Close event with no cause (null).
    static final CloseEvent SUCCESSFUL_CLOSE = new CloseEvent(null);
    // Close event carrying the singleton CancelledSubscriptionException.INSTANCE.
    static final CloseEvent CANCELLED_CLOSE = new CloseEvent(CancelledSubscriptionException.INSTANCE);
    // Close event carrying the singleton AbortedStreamException.INSTANCE.
    static final CloseEvent ABORTED_CLOSE = new CloseEvent(AbortedStreamException.INSTANCE);
    // Future handed out by whenComplete(); CloseEvent.notifySubscriber() completes the future it is given.
    private final CompletableFuture<Void> completionFuture = new EventLoopCheckingFuture();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linecorp/armeria/common/stream/AbstractStreamMessage$CloseEvent.class */
    public static final class CloseEvent {

        /** The reason the stream was closed, or {@code null} when no cause was recorded. */
        @Nullable
        final Throwable cause;

        /**
         * Creates a close event.
         *
         * @param th the cause of the closure, or {@code null} if there is none
         */
        public CloseEvent(@Nullable Throwable th) {
            cause = th;
        }

        /**
         * Delivers the terminal signal of this event to the subscriber of the given subscription and
         * completes the given future accordingly. Does nothing if the future is already done.
         *
         * @param subscriptionImpl the subscription whose subscriber is notified
         * @param completableFuture the future completed with the outcome of the stream
         */
        public void notifySubscriber(SubscriptionImpl subscriptionImpl, CompletableFuture<?> completableFuture) {
            if (completableFuture.isDone()) {
                // The terminal signal was delivered already.
                return;
            }
            final Subscriber<Object> target = subscriptionImpl.subscriber();
            final Throwable failure = resolveCause(subscriptionImpl);
            if (failure != null) {
                signalError(subscriptionImpl, target, failure, completableFuture);
            } else {
                signalComplete(target, completableFuture);
            }
        }

        /**
         * Returns the cause to report: the recorded cause if any; otherwise a cancellation exception
         * when the subscription had its cancellation requested; otherwise {@code null}.
         */
        @Nullable
        private Throwable resolveCause(SubscriptionImpl subscription) {
            if (cause != null) {
                return cause;
            }
            if (subscription.cancelRequested()) {
                return CancelledSubscriptionException.get();
            }
            return null;
        }

        /** Invokes {@code onComplete()} and completes the future normally, or exceptionally if the callback throws. */
        private static void signalComplete(Subscriber<Object> target, CompletableFuture<?> future) {
            try {
                target.onComplete();
                future.complete(null);
            } catch (Throwable unexpected) {
                // The future is completed before a fatal error is rethrown.
                future.completeExceptionally(unexpected);
                Exceptions.throwIfFatal(unexpected);
                AbstractStreamMessage.logger.warn("Subscriber.onComplete() should not raise an exception. subscriber: {}", target, unexpected);
            }
        }

        /**
         * Completes the future exceptionally with {@code failure}, first invoking {@code onError()} unless
         * the failure is a cancellation and the subscription did not ask to be notified of cancellation.
         */
        private static void signalError(SubscriptionImpl subscription, Subscriber<Object> target,
                                        Throwable failure, CompletableFuture<?> future) {
            try {
                final boolean cancellation = failure instanceof CancelledSubscriptionException;
                if (!cancellation || subscription.notifyCancellation()) {
                    target.onError(failure);
                }
                future.completeExceptionally(failure);
            } catch (Throwable unexpected) {
                // Report both the callback failure and the original cause.
                final CompositeException combined = new CompositeException(unexpected, failure);
                future.completeExceptionally(combined);
                Exceptions.throwIfFatal(unexpected);
                AbstractStreamMessage.logger.warn("Subscriber.onError() should not raise an exception. subscriber: {}", target, combined);
            }
        }

        @Override
        public String toString() {
            if (cause == null) {
                return "CloseEvent";
            }
            return "CloseEvent(" + cause + ')';
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linecorp/armeria/common/stream/AbstractStreamMessage$SubscriptionImpl.class */
    public static final class SubscriptionImpl implements Subscription {
        private final AbstractStreamMessage<?> publisher;
        private Subscriber<Object> subscriber;
        private final EventExecutor executor;
        private final boolean withPooledObjects;
        private final boolean notifyCancellation;
        private volatile boolean cancelRequested;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SubscriptionImpl(AbstractStreamMessage<?> abstractStreamMessage, Subscriber<?> subscriber, EventExecutor eventExecutor, boolean z, boolean z2) {
            this.publisher = abstractStreamMessage;
            this.subscriber = subscriber;
            this.executor = eventExecutor;
            this.withPooledObjects = z;
            this.notifyCancellation = z2;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Subscriber<Object> subscriber() {
            return this.subscriber;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void clearSubscriber() {
            if (this.subscriber instanceof AbortingSubscriber) {
                return;
            }
            this.subscriber = NeverInvokedSubscriber.get();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public EventExecutor executor() {
            return this.executor;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean withPooledObjects() {
            return this.withPooledObjects;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean notifyCancellation() {
            return this.notifyCancellation;
        }

        boolean cancelRequested() {
            return this.cancelRequested;
        }

        public void request(long j) {
            if (j <= 0) {
                this.publisher.abort(new IllegalArgumentException("n: " + j + " (expected: > 0, see Reactive Streams specification rule 3.9)"));
            } else {
                this.publisher.request(j);
            }
        }

        public void cancel() {
            this.cancelRequested = true;
            this.publisher.cancel();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean needsDirectInvocation() {
            return this.executor.inEventLoop();
        }

        public String toString() {
            return MoreObjects.toStringHelper((Class<?>) Subscription.class).add("publisher", this.publisher).add("demand", this.publisher.demand()).add("executor", this.executor).toString();
        }
    }

    /**
     * Subscribes the given subscriber using the given executor, with both the pooled-objects and
     * the notify-cancellation options turned off.
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor eventExecutor) {
        final boolean withPooledObjects = false;
        final boolean notifyCancellation = false;
        subscribe(subscriber, eventExecutor, withPooledObjects, notifyCancellation);
    }

    /**
     * Subscribes the given subscriber using the given executor, deriving the pooled-objects and
     * notify-cancellation flags from the supplied options.
     *
     * @throws NullPointerException if {@code subscriptionOptionArr} is {@code null}
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public final void subscribe(Subscriber<? super T> subscriber, EventExecutor eventExecutor, SubscriptionOption... subscriptionOptionArr) {
        Objects.requireNonNull(subscriptionOptionArr, "options");
        final boolean withPooledObjects = StreamMessageUtil.containsWithPooledObjects(subscriptionOptionArr);
        final boolean notifyCancellation = StreamMessageUtil.containsNotifyCancellation(subscriptionOptionArr);
        subscribe(subscriber, eventExecutor, withPooledObjects, notifyCancellation);
    }

    /**
     * Creates a subscription for the given subscriber and tries to register it with this stream.
     * If the stream returns a different subscription than the one offered, the registration was
     * rejected and the rejected subscriber receives {@code onSubscribe()} followed by
     * {@code onError()} with the cause derived from the subscriber that is already in place.
     *
     * <p>The rejection is delivered via {@code eventExecutor}, i.e. the executor the rejected
     * subscriber asked for, not the executor of the subscription that is already registered.
     *
     * @param subscriber the subscriber to register
     * @param eventExecutor the executor on which {@code subscriber} is notified
     * @param withPooledObjects the pooled-objects flag passed to the new subscription
     * @param notifyCancellation the notify-cancellation flag passed to the new subscription
     * @throws NullPointerException if {@code subscriber} or {@code eventExecutor} is {@code null}
     */
    private void subscribe(Subscriber<? super T> subscriber, EventExecutor eventExecutor,
                           boolean withPooledObjects, boolean notifyCancellation) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(eventExecutor, "executor");
        final SubscriptionImpl requested =
                new SubscriptionImpl(this, subscriber, eventExecutor, withPooledObjects, notifyCancellation);
        final SubscriptionImpl actual = subscribe(requested);
        if (actual == requested) {
            // Registered successfully.
            return;
        }

        // Rejected: the cause depends on the subscriber that is already in place.
        final Throwable cause = StreamMessageUtil.abortedOrLate(actual.subscriber());
        // Dispatch on the rejected subscriber's own executor so the executor argument of
        // subscribe() is honored on the failure path too.
        if (requested.needsDirectInvocation()) {
            handleLateSubscriber(subscriber, cause);
        } else {
            eventExecutor.execute(() -> handleLateSubscriber(subscriber, cause));
        }
    }

    /**
     * Attempts to register the given subscription with this stream.
     *
     * @return the callers in this class treat the very same instance as success, and any other
     *         instance as the subscription that is already in place
     */
    abstract SubscriptionImpl subscribe(SubscriptionImpl subscriptionImpl);

    /**
     * Drains all elements of this stream using the given executor, with the pooled-objects
     * option turned off.
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public final CompletableFuture<List<T>> drainAll(EventExecutor eventExecutor) {
        final boolean withPooledObjects = false;
        return drainAll(eventExecutor, withPooledObjects);
    }

    /**
     * Drains all elements of this stream using the given executor, deriving the pooled-objects
     * flag from the supplied options.
     *
     * @throws NullPointerException if {@code subscriptionOptionArr} is {@code null}
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public final CompletableFuture<List<T>> drainAll(EventExecutor eventExecutor, SubscriptionOption... subscriptionOptionArr) {
        Objects.requireNonNull(subscriptionOptionArr, "options");
        final boolean withPooledObjects = StreamMessageUtil.containsWithPooledObjects(subscriptionOptionArr);
        return drainAll(eventExecutor, withPooledObjects);
    }

    /**
     * Subscribes a {@code StreamMessageDrainer} to this stream and returns its future. If the
     * stream rejects the subscription, returns a future already completed exceptionally with the
     * cause derived from the subscriber that is already in place.
     *
     * @param eventExecutor the executor passed to the drainer's subscription
     * @param z the pooled-objects flag, passed to both the drainer and its subscription
     * @throws NullPointerException if {@code eventExecutor} is {@code null}
     */
    private CompletableFuture<List<T>> drainAll(EventExecutor eventExecutor, boolean z) {
        Objects.requireNonNull(eventExecutor, "executor");
        final StreamMessageDrainer drainer = new StreamMessageDrainer(z);
        final SubscriptionImpl requested = new SubscriptionImpl(this, drainer, eventExecutor, z, false);
        final SubscriptionImpl actual = subscribe(requested);
        if (actual == requested) {
            // Registered successfully; the drainer completes its own future.
            return drainer.future();
        }
        final Throwable cause = StreamMessageUtil.abortedOrLate(actual.subscriber());
        return CompletableFutures.exceptionallyCompletedFuture(cause);
    }

    /** Returns the completion future of this stream; the same instance is returned on every call. */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public final CompletableFuture<Void> whenComplete() {
        return completionFuture;
    }

    /** Returns the current demand of this stream; reported as "demand" by {@code SubscriptionImpl.toString()}. */
    abstract long demand();

    /**
     * Handles a request for {@code j} more elements. {@code SubscriptionImpl.request()} forwards
     * only positive values here; non-positive values abort the stream instead.
     */
    abstract void request(long j);

    /** Handles a cancellation; invoked by {@code SubscriptionImpl.cancel()} after it has set its cancel-requested flag. */
    abstract void cancel();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked by {@code prepareObjectForNotification()} for each object before it is returned
     * for delivery. The default implementation does nothing.
     *
     * @param t the object being prepared for notification
     */
    public void onRemoval(T t) {
    }

    /**
     * Fails the given subscriber with the cause derived from the subscriber of the given
     * subscription. The notification runs directly when the current thread is in the event loop
     * of that subscription's executor, and is otherwise submitted to that executor.
     *
     * @param subscriptionImpl the subscription supplying both the cause and the executor
     * @param subscriber the subscriber to fail
     */
    static void failLateSubscriber(SubscriptionImpl subscriptionImpl, Subscriber<?> subscriber) {
        final Throwable cause = StreamMessageUtil.abortedOrLate(subscriptionImpl.subscriber());
        if (!subscriptionImpl.needsDirectInvocation()) {
            subscriptionImpl.executor().execute(() -> handleLateSubscriber(subscriber, cause));
            return;
        }
        handleLateSubscriber(subscriber, cause);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Signals {@code onSubscribe()} with a no-op subscription and then {@code onError()} with the
     * given cause. A non-fatal exception thrown by the subscriber is logged rather than propagated.
     *
     * @param subscriber the subscriber to fail
     * @param th the cause passed to {@code onError()}
     */
    public static void handleLateSubscriber(Subscriber<?> subscriber, Throwable th) {
        try {
            // onSubscribe() is signalled before onError().
            subscriber.onSubscribe(NoopSubscription.INSTANCE);
            subscriber.onError(th);
        } catch (Throwable unexpected) {
            Exceptions.throwIfFatal(unexpected);
            logger.warn("Subscriber should not throw an exception. subscriber: {}", subscriber, unexpected);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Prepares an object for delivery to the subscriber: touches it, invokes
     * {@link #onRemoval(Object)}, and converts it with {@code PooledObjects.toUnpooled()} when the
     * subscription did not opt into pooled objects.
     *
     * @param subscriptionImpl the subscription the object is delivered through
     * @param t the object to prepare
     * @return {@code t} itself for a pooled-objects subscription, otherwise the converted object
     */
    public T prepareObjectForNotification(SubscriptionImpl subscriptionImpl, T t) {
        ReferenceCountUtil.touch(t);
        onRemoval(t);
        if (subscriptionImpl.withPooledObjects()) {
            return t;
        }
        return PooledObjects.toUnpooled(t);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns a close event for the given cause, reusing the shared {@code CANCELLED_CLOSE} and
     * {@code ABORTED_CLOSE} instances when the cause is the corresponding singleton exception.
     *
     * @param th the cause of the closure
     */
    public static CloseEvent newCloseEvent(Throwable th) {
        // Identity comparison on purpose: only the singleton instances map to the shared events.
        if (th == CancelledSubscriptionException.INSTANCE) {
            return CANCELLED_CLOSE;
        }
        if (th == AbortedStreamException.INSTANCE) {
            return ABORTED_CLOSE;
        }
        return new CloseEvent(th);
    }
}
