package com.linecorp.armeria.common.stream;

import com.google.common.collect.ImmutableList;
import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.stream.AbstractStreamMessage;
import com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linecorp/armeria/common/stream/EventLoopStreamMessage.class */
/**
 * A stream message (and its writer) whose mutable state is confined to a single {@link EventLoop}.
 *
 * <p>Every operation that touches the internal queue, the demand counter or the lifecycle state is
 * either run immediately when the caller is already on {@link #eventLoop}, or handed over to it with
 * {@link EventLoop#execute(Runnable)}. Because of that confinement the non-volatile fields below need
 * no locking. Only the few values that are read or raced on from arbitrary threads
 * ({@code subscribed}, {@code aborted}, {@code isOpen}, {@code wroteAny}) are volatile.
 *
 * @param <T> the type of the elements written to and published by this stream
 */
public class EventLoopStreamMessage<T> extends AbstractStreamMessageAndWriter<T> {

    /**
     * Stack traces for which the "no EventLoop specified" warning has already been logged;
     * used to log each distinct creation site only once.
     */
    private static final ConcurrentHashMap<List<StackTraceElement>, Boolean> UNEXPECTED_EVENT_LOOP_STACK_TRACES = new ConcurrentHashMap<>();

    // The raw type is unavoidable here: a class literal of a generic class is always raw.
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<EventLoopStreamMessage> abortedUpdater = AtomicIntegerFieldUpdater.newUpdater(EventLoopStreamMessage.class, "aborted");

    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<EventLoopStreamMessage> subscribedUpdater = AtomicIntegerFieldUpdater.newUpdater(EventLoopStreamMessage.class, "subscribed");

    private static final Logger logger = LoggerFactory.getLogger(EventLoopStreamMessage.class);

    /** The event loop that owns all non-volatile state of this stream. */
    private final EventLoop eventLoop;

    /** Pending elements, close events and await-demand futures, in arrival order. */
    private final Queue<Object> queue;

    // ---- State below is read and written only from within eventLoop. ----

    @Nullable
    private AbstractStreamMessage.SubscriptionImpl subscription;

    /** Number of elements the subscriber has requested and not yet received. */
    private long demand;

    /** Set once the subscriber's onSubscribe() has been invoked (or an aborting subscription installed). */
    private boolean invokedOnSubscribe;

    /** True only while a direct (same-thread) onNext() call is in progress. */
    private boolean inOnNext;

    private AbstractStreamMessageAndWriter.State state;

    // ---- State below may be accessed from any thread. ----

    /** 0 until a subscriber (or abort()) claims the stream, then 1; updated via subscribedUpdater. */
    private volatile int subscribed;

    /** 0 until abort() is first called, then 1; updated via abortedUpdater. */
    private volatile int aborted;

    private volatile boolean isOpen;

    /** Becomes true as soon as addObject() is called for the first time. */
    private volatile boolean wroteAny;

    /**
     * Marker exception created only to capture the stack trace of a stream constructed without an
     * explicit {@link EventLoop}; it is logged, never thrown.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linecorp/armeria/common/stream/EventLoopStreamMessage$UnexpectedEventLoopException.class */
    public static class UnexpectedEventLoopException extends RuntimeException {
        private static final long serialVersionUID = 5610415039321743416L;

        private UnexpectedEventLoopException() {
        }
    }

    /**
     * Creates a new instance bound to the event loop of the current {@link RequestContext}.
     *
     * <p>When {@link RequestContext#mapCurrent} falls back to the supplier, an event loop is taken
     * from {@link CommonPools#workerGroup()} and a warning is logged once per distinct creation
     * stack trace.
     */
    public EventLoopStreamMessage() {
        this(RequestContext.mapCurrent(RequestContext::eventLoop, () -> {
            final UnexpectedEventLoopException marker = new UnexpectedEventLoopException();
            // computeIfAbsent() runs the logging function only for a stack trace not seen before.
            UNEXPECTED_EVENT_LOOP_STACK_TRACES.computeIfAbsent(ImmutableList.copyOf(marker.getStackTrace()), unused -> {
                logger.warn("Creating EventLoopStreamMessage without specifying EventLoop. This will be very slow if writer or subscriber run in a different EventLoop.", marker);
                return true;
            });
            return CommonPools.workerGroup().next();
        }));
    }

    /**
     * Creates a new instance bound to the specified event loop.
     *
     * @param eventLoop the event loop that will own this stream's state
     * @throws NullPointerException if {@code eventLoop} is {@code null}
     */
    public EventLoopStreamMessage(EventLoop eventLoop) {
        this.state = AbstractStreamMessageAndWriter.State.OPEN;
        this.isOpen = true;
        this.eventLoop = Objects.requireNonNull(eventLoop, "eventLoop");
        this.queue = new ArrayDeque<>();
    }

    /**
     * Returns {@code true} until the stream is closed, cancelled or aborted.
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public boolean isOpen() {
        return isOpen;
    }

    /**
     * Returns {@code true} only when the stream is no longer open and nothing was ever written to it.
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public boolean isEmpty() {
        return !isOpen() && !wroteAny;
    }

    /**
     * Returns the event loop this stream is bound to.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessage
    /* renamed from: defaultSubscriberExecutor */
    protected EventExecutor mo235defaultSubscriberExecutor() {
        return eventLoop;
    }

    /**
     * Registers the given subscription. Only the first caller to flip {@code subscribed} from 0 to 1
     * gets subscribed; any later caller is failed via {@code failLateSubscriber} on the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessage
    void subscribe(AbstractStreamMessage.SubscriptionImpl subscriptionImpl) {
        final Subscriber<Object> subscriber = subscriptionImpl.subscriber();
        if (!subscribedUpdater.compareAndSet(this, 0, 1)) {
            // this.subscription is event-loop-confined, so it is read from inside the event loop.
            eventLoop.execute(() -> failLateSubscriber(this.subscription, subscriber));
            return;
        }

        if (eventLoop.inEventLoop()) {
            doSubscribe(subscriptionImpl);
        } else {
            eventLoop.execute(() -> doSubscribe(subscriptionImpl));
        }
    }

    /**
     * Closes the stream successfully. Has no effect unless the stream is still in the OPEN state
     * when the close is processed on the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.StreamWriter
    public void close() {
        if (eventLoop.inEventLoop()) {
            doClose(null);
        } else {
            eventLoop.execute(() -> doClose(null));
        }
    }

    /**
     * Closes the stream with the specified cause.
     *
     * @param th the cause of the failure
     * @throws NullPointerException if {@code th} is {@code null}
     * @throws IllegalArgumentException if {@code th} is a {@link CancelledSubscriptionException}
     */
    @Override // com.linecorp.armeria.common.stream.StreamWriter
    public void close(Throwable th) {
        Objects.requireNonNull(th, "cause");
        if (th instanceof CancelledSubscriptionException) {
            throw new IllegalArgumentException("cause: " + th + " (must use Subscription.cancel())");
        }

        if (eventLoop.inEventLoop()) {
            doClose(th);
        } else {
            eventLoop.execute(() -> doClose(th));
        }
    }

    /**
     * Aborts the stream. Only the first call has any effect.
     *
     * <p>If nobody has subscribed yet, this call claims the {@code subscribed} flag itself and
     * installs an aborting subscription before processing the abort, so that the close event has a
     * subscription to be delivered to.
     */
    @Override // com.linecorp.armeria.common.stream.StreamMessage
    public void abort() {
        if (!abortedUpdater.compareAndSet(this, 0, 1)) {
            // Already aborted.
            return;
        }
        isOpen = false;

        if (!subscribedUpdater.compareAndSet(this, 0, 1)) {
            // A subscriber already exists; just abort it.
            cancelOrAbort(false);
            return;
        }

        if (eventLoop.inEventLoop()) {
            doSetAbortedSubscription();
            doCancelOrAbort(false);
        } else {
            eventLoop.execute(() -> {
                doSetAbortedSubscription();
                doCancelOrAbort(false);
            });
        }
    }

    /**
     * Returns the current demand. The field is not volatile, so the value is only reliable when
     * read from the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessage
    long demand() {
        return demand;
    }

    /**
     * Adds {@code j} to the demand, on the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessage
    void request(long j) {
        if (eventLoop.inEventLoop()) {
            doRequest(j);
        } else {
            eventLoop.execute(() -> doRequest(j));
        }
    }

    /**
     * Cancels the subscription, on the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessage
    void cancel() {
        cancelOrAbort(true);
    }

    /**
     * Delivers the close event to the subscriber, then clears the subscriber reference and cleans
     * up the queue, even if the notification throws.
     *
     * <p>With direct invocation everything happens on the calling thread. Otherwise the notification
     * runs on the subscription's executor and the queue cleanup is posted back to the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessage
    void notifySubscriberOfCloseEvent(AbstractStreamMessage.SubscriptionImpl subscriptionImpl, AbstractStreamMessage.CloseEvent closeEvent) {
        if (subscriptionImpl.needsDirectInvocation()) {
            try {
                closeEvent.notifySubscriber(subscriptionImpl, completionFuture());
            } finally {
                subscriptionImpl.clearSubscriber();
                cleanup();
            }
            return;
        }

        subscriptionImpl.executor().execute(() -> {
            try {
                closeEvent.notifySubscriber(subscriptionImpl, completionFuture());
            } finally {
                subscriptionImpl.clearSubscriber();
                // The queue is event-loop-confined, so cleanup must hop back to the event loop.
                eventLoop.execute(this::cleanup);
            }
        });
    }

    /**
     * Accepts a newly written element and marks the stream as having been written to.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter
    void addObject(T t) {
        wroteAny = true;
        if (eventLoop.inEventLoop()) {
            doAddObject(t);
        } else {
            eventLoop.execute(() -> doAddObject(t));
        }
    }

    /**
     * Enqueues an element or a control object (close event, await-demand future) on the event loop.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter
    void addObjectOrEvent(Object obj) {
        if (eventLoop.inEventLoop()) {
            doAddObjectOrEvent(obj);
        } else {
            eventLoop.execute(() -> doAddObjectOrEvent(obj));
        }
    }

    /**
     * Moves an OPEN stream to CLOSED and enqueues the matching close event; no-op in any other state.
     *
     * @param cause the failure cause, or {@code null} for a successful close
     */
    private void doClose(@Nullable Throwable cause) {
        if (state != AbstractStreamMessageAndWriter.State.OPEN) {
            return;
        }
        doSetState(AbstractStreamMessageAndWriter.State.CLOSED);
        doAddObjectOrEvent(cause == null ? SUCCESSFUL_CLOSE : new AbstractStreamMessage.CloseEvent(cause));
    }

    /**
     * Sets the lifecycle state and marks the stream as not open. Only called with non-OPEN states.
     */
    private void doSetState(AbstractStreamMessageAndWriter.State state) {
        this.state = state;
        this.isOpen = false;
    }

    /**
     * Stores the subscription and invokes the subscriber's {@code onSubscribe()}.
     *
     * <p>With direct invocation {@code invokedOnSubscribe} is set before the call. Otherwise it is
     * set by a task posted back to the event loop after {@code onSubscribe()} has returned on the
     * subscription's executor.
     */
    private void doSubscribe(AbstractStreamMessage.SubscriptionImpl newSubscription) {
        this.subscription = newSubscription;
        if (newSubscription.needsDirectInvocation()) {
            invokedOnSubscribe = true;
            newSubscription.subscriber().onSubscribe(newSubscription);
            return;
        }

        newSubscription.executor().execute(() -> {
            newSubscription.subscriber().onSubscribe(newSubscription);
            eventLoop.execute(() -> invokedOnSubscribe = true);
        });
    }

    /**
     * Adds {@code n} to the demand, saturating at {@link Long#MAX_VALUE}, and starts draining the
     * queue if the demand was previously zero.
     */
    private void doRequest(long n) {
        final long oldDemand = demand;
        // Compare against (MAX_VALUE - n) rather than adding first, to avoid overflow.
        // NOTE(review): assumes n is positive — presumably validated by the caller; confirm.
        if (oldDemand >= Long.MAX_VALUE - n) {
            demand = Long.MAX_VALUE;
        } else {
            demand = oldDemand + n;
        }

        if (oldDemand == 0) {
            doNotifySubscriberIfNotEmpty();
        }
    }

    /**
     * Delivers the element straight to the subscriber when nothing is queued ahead of it, there is
     * demand and no direct onNext() is in progress; otherwise enqueues it.
     */
    private void doAddObject(T element) {
        final boolean canBypassQueue = queue.isEmpty() && demand > 0 && !inOnNext;
        if (!canBypassQueue) {
            doAddObjectOrEvent(element);
            return;
        }

        final AbstractStreamMessage.SubscriptionImpl currentSubscription = subscription;
        demand--;
        doNotifySubscriberOfObject(currentSubscription, element);
    }

    /**
     * Appends to the queue and, if a subscription exists, attempts to drain the queue.
     */
    private void doAddObjectOrEvent(Object elementOrEvent) {
        queue.add(elementOrEvent);
        final AbstractStreamMessage.SubscriptionImpl currentSubscription = subscription;
        if (currentSubscription != null) {
            doNotifySubscriber(currentSubscription);
        }
    }

    /**
     * Attempts to drain the queue if there is both a subscription and something queued.
     */
    private void doNotifySubscriberIfNotEmpty() {
        final AbstractStreamMessage.SubscriptionImpl currentSubscription = subscription;
        if (currentSubscription != null && !queue.isEmpty()) {
            doNotifySubscriber(currentSubscription);
        }
    }

    /**
     * Drains the queue towards the subscriber for as long as the head can be processed:
     * close events are handled regardless of demand; await-demand futures are completed and
     * elements are delivered only while demand is non-zero.
     */
    private void doNotifySubscriber(AbstractStreamMessage.SubscriptionImpl currentSubscription) {
        if (inOnNext) {
            // A direct onNext() call is in progress; do not re-enter the drain loop from inside it.
            return;
        }
        if (!invokedOnSubscribe) {
            // onSubscribe() has not been signalled yet; retry later on the event loop.
            eventLoop.execute(() -> doNotifySubscriber(currentSubscription));
            return;
        }

        for (;;) {
            if (state == AbstractStreamMessageAndWriter.State.CLEANUP) {
                cleanup();
                return;
            }

            final Object head = queue.peek();
            if (head == null) {
                return;
            }
            if (head instanceof AbstractStreamMessage.CloseEvent) {
                doHandleCloseEvent(currentSubscription, (AbstractStreamMessage.CloseEvent) queue.remove());
                return;
            }
            if (demand == 0) {
                return;
            }

            if (head instanceof AbstractStreamMessageAndWriter.AwaitDemandFuture) {
                // Completing the future does not consume any demand.
                ((AbstractStreamMessageAndWriter.AwaitDemandFuture) queue.remove()).complete(null);
            } else {
                demand--;
                // Anything that is neither a CloseEvent nor an AwaitDemandFuture is treated as an
                // element; the visible code only adds elements through addObject(T).
                @SuppressWarnings("unchecked")
                final T element = (T) queue.remove();
                doNotifySubscriberOfObject(currentSubscription, element);
            }
        }
    }

    /**
     * Passes one element to the subscriber's {@code onNext()}, either directly on this thread
     * (guarded by {@code inOnNext}) or via the subscription's executor.
     */
    private void doNotifySubscriberOfObject(AbstractStreamMessage.SubscriptionImpl currentSubscription, T element) {
        final Subscriber<Object> subscriber = currentSubscription.subscriber();
        final T prepared = prepareObjectForNotification(currentSubscription, element);
        if (!currentSubscription.needsDirectInvocation()) {
            currentSubscription.executor().execute(() -> subscriber.onNext(prepared));
            return;
        }

        inOnNext = true;
        try {
            subscriber.onNext(prepared);
        } finally {
            // Always reset, even when onNext() throws, so that draining is not blocked forever.
            inOnNext = false;
        }
    }

    /**
     * Switches to CLEANUP and notifies the subscriber of the close event, deferring on the event
     * loop until {@code onSubscribe()} has been signalled.
     */
    private void doHandleCloseEvent(AbstractStreamMessage.SubscriptionImpl currentSubscription, AbstractStreamMessage.CloseEvent closeEvent) {
        if (!invokedOnSubscribe) {
            eventLoop.execute(() -> doHandleCloseEvent(currentSubscription, closeEvent));
            return;
        }
        doSetState(AbstractStreamMessageAndWriter.State.CLEANUP);
        notifySubscriberOfCloseEvent(currentSubscription, closeEvent);
    }

    /**
     * Runs {@link #doCancelOrAbort(boolean)} on the event loop.
     *
     * @param cancel {@code true} for a cancellation, {@code false} for an abort
     */
    private void cancelOrAbort(boolean cancel) {
        if (eventLoop.inEventLoop()) {
            doCancelOrAbort(cancel);
        } else {
            eventLoop.execute(() -> doCancelOrAbort(cancel));
        }
    }

    /**
     * Applies a cancellation or abort according to the current state:
     * OPEN enqueues the corresponding close event, CLOSED just cleans up the queue, and CLEANUP
     * does nothing. In every case the state ends up as CLEANUP.
     *
     * @param cancel {@code true} for a cancellation, {@code false} for an abort
     */
    private void doCancelOrAbort(boolean cancel) {
        switch (state) {
            case OPEN:
                doSetState(AbstractStreamMessageAndWriter.State.CLEANUP);
                final AbstractStreamMessage.CloseEvent closeEvent;
                // With verbose exceptions a fresh CloseEvent is built around the exception returned
                // by get(); otherwise the shared constant is reused.
                if (cancel) {
                    closeEvent = Flags.verboseExceptions() ? new AbstractStreamMessage.CloseEvent(CancelledSubscriptionException.get()) : CANCELLED_CLOSE;
                } else {
                    closeEvent = Flags.verboseExceptions() ? new AbstractStreamMessage.CloseEvent(AbortedStreamException.get()) : ABORTED_CLOSE;
                }
                doAddObjectOrEvent(closeEvent);
                return;
            case CLOSED:
                // A close event is already pending; drop it together with everything else queued.
                doSetState(AbstractStreamMessageAndWriter.State.CLEANUP);
                cleanup();
                return;
            case CLEANUP:
                return;
            default:
                // Should never reach here.
                throw new Error("unexpected state: " + state);
        }
    }

    /**
     * Installs a subscription backed by {@link AbortingSubscriber} and marks onSubscribe() as done,
     * so that a subsequent abort can be delivered without a real subscriber.
     */
    private void doSetAbortedSubscription() {
        subscription = new AbstractStreamMessage.SubscriptionImpl(this, AbortingSubscriber.get(), ImmediateEventExecutor.INSTANCE, false);
        invokedOnSubscribe = true;
    }

    /**
     * Hands the current subscription and the queue to {@code cleanupQueue}.
     */
    private void cleanup() {
        cleanupQueue(subscription, queue);
    }

    /**
     * Compiler-generated bridge that simply delegates to the inherited implementation.
     */
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter, com.linecorp.armeria.common.stream.StreamWriter
    public /* bridge */ /* synthetic */ CompletableFuture onDemand(Runnable runnable) {
        return super.onDemand(runnable);
    }

    /**
     * Compiler-generated bridge that simply delegates to the inherited implementation.
     */
    @SuppressWarnings("unchecked")
    @Override // com.linecorp.armeria.common.stream.AbstractStreamMessageAndWriter, com.linecorp.armeria.common.stream.StreamWriter
    public /* bridge */ /* synthetic */ boolean tryWrite(Object obj) {
        // The argument is an element of type T (erased to Object in the bridge signature),
        // not a stream, so it must be cast to T rather than to EventLoopStreamMessage<T>.
        return super.tryWrite((T) obj);
    }
}
