/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.LinkErrorContext;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
import com.azure.core.util.logging.ClientLogger;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

public class AmqpReceiveLinkProcessor
extends FluxProcessor<AmqpReceiveLink, Message>
implements Subscription {
    private final ClientLogger logger = new ClientLogger(AmqpReceiveLinkProcessor.class);
    private final Object lock = new Object();
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final AtomicInteger retryAttempts = new AtomicInteger();
    private final Deque<Message> messageQueue = new ConcurrentLinkedDeque<Message>();
    private final Object creditsAdded = new Object();
    private final AtomicReference<CoreSubscriber<? super Message>> downstream = new AtomicReference();
    private final AtomicInteger wip = new AtomicInteger();
    private final int prefetch;
    private final String entityPath;
    private final Disposable parentConnection;
    private final int maxQueueSize;
    private volatile Throwable lastError;
    private volatile boolean isCancelled;
    private volatile AmqpReceiveLink currentLink;
    private volatile String currentLinkName;
    private volatile Disposable currentLinkSubscriptions;
    private volatile Subscription upstream;
    private static final AtomicReferenceFieldUpdater<AmqpReceiveLinkProcessor, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(AmqpReceiveLinkProcessor.class, Subscription.class, "upstream");
    private volatile long requested;
    private static final AtomicLongFieldUpdater<AmqpReceiveLinkProcessor> REQUESTED = AtomicLongFieldUpdater.newUpdater(AmqpReceiveLinkProcessor.class, "requested");

    public AmqpReceiveLinkProcessor(String entityPath, int prefetch, Disposable parentConnection) {
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.parentConnection = Objects.requireNonNull(parentConnection, "'parentConnection' cannot be null.");
        if (prefetch < 0) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalArgumentException("'prefetch' cannot be less than 0."));
        }
        this.prefetch = prefetch;
        this.maxQueueSize = prefetch * 2;
    }

    public Throwable getError() {
        return this.lastError;
    }

    public boolean isTerminated() {
        return this.isTerminated.get() || this.isCancelled;
    }

    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null");
        this.logger.info("Setting new subscription for receive link processor");
        if (!Operators.setOnce(UPSTREAM, (Object)((Object)this), (Subscription)subscription)) {
            throw this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("Cannot set upstream twice."));
        }
        this.requestUpstream();
    }

    public int getPrefetch() {
        return this.prefetch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onNext(AmqpReceiveLink next) {
        Disposable oldSubscription;
        AmqpReceiveLink oldChannel;
        Objects.requireNonNull(next, "'next' cannot be null.");
        if (this.isTerminated()) {
            this.logger.warning("linkName[{}] entityPath[{}]. Got another link when we have already terminated processor.", new Object[]{next.getLinkName(), next.getEntityPath()});
            Operators.onNextDropped((Object)next, (Context)this.currentContext());
            return;
        }
        String linkName = next.getLinkName();
        this.logger.info("linkName[{}] entityPath[{}]. Setting next AMQP receive link.", new Object[]{linkName, this.entityPath});
        Object object = this.lock;
        synchronized (object) {
            oldChannel = this.currentLink;
            oldSubscription = this.currentLinkSubscriptions;
            this.currentLink = next;
            this.currentLinkName = next.getLinkName();
            next.setEmptyCreditListener(this::getCreditsToAdd);
            this.currentLinkSubscriptions = Disposables.composite((Disposable[])new Disposable[]{next.getEndpointStates().filter(e -> e == AmqpEndpointState.ACTIVE).next().flatMap(state -> {
                int creditsToAdd = this.getCreditsToAdd();
                int total = Math.max(this.prefetch, creditsToAdd);
                this.logger.verbose("linkName[{}] prefetch[{}] creditsToAdd[{}] Adding initial credits.", new Object[]{linkName, this.prefetch, creditsToAdd});
                return next.addCredits(total);
            }).onErrorResume(IllegalStateException.class, error -> {
                this.logger.info("linkName[{}] was already closed. Could not add credits.", new Object[]{linkName});
                return Mono.empty();
            }).subscribe(), next.getEndpointStates().subscribeOn(Schedulers.boundedElastic()).subscribe(state -> {
                if (state == AmqpEndpointState.ACTIVE) {
                    this.logger.info("linkName[{}] credits[{}] is active.", new Object[]{linkName, next.getCredits()});
                    this.retryAttempts.set(0);
                }
            }, error -> {
                AmqpException amqpException;
                if (error instanceof AmqpException && (amqpException = (AmqpException)error).getErrorCondition() == AmqpErrorCondition.LINK_STOLEN && amqpException.getContext() != null && amqpException.getContext() instanceof LinkErrorContext) {
                    LinkErrorContext errorContext = (LinkErrorContext)amqpException.getContext();
                    if (this.currentLink != null && !this.currentLink.getLinkName().equals(errorContext.getTrackingId())) {
                        this.logger.info("linkName[{}] entityPath[{}] trackingId[{}] Link lost signal received for a link that is not current. Ignoring the error.", new Object[]{linkName, this.entityPath, errorContext.getTrackingId()});
                        return;
                    }
                }
                this.currentLink = null;
                this.onError((Throwable)error);
            }, () -> {
                if (this.parentConnection.isDisposed() || this.isTerminated() || UPSTREAM.get(this) == Operators.cancelledSubscription()) {
                    this.logger.info("linkName[{}] entityPath[{}] Terminal state reached. Disposing of link processor.", new Object[]{linkName, this.entityPath});
                    this.dispose();
                } else {
                    this.logger.info("linkName[{}] entityPath[{}] Receive link endpoint states are closed. Requesting another.", new Object[]{linkName, this.entityPath});
                    AmqpReceiveLink existing = this.currentLink;
                    this.currentLink = null;
                    this.currentLinkName = null;
                    this.disposeReceiver(existing);
                    this.requestUpstream();
                }
            }), next.receive().onBackpressureBuffer(this.maxQueueSize, BufferOverflowStrategy.ERROR).subscribe(message -> {
                this.messageQueue.add((Message)message);
                this.drain();
            })});
        }
        this.disposeReceiver(oldChannel);
        if (oldSubscription != null) {
            oldSubscription.dispose();
        }
    }

    public void subscribe(CoreSubscriber<? super Message> actual) {
        boolean terminateSubscriber;
        Objects.requireNonNull(actual, "'actual' cannot be null.");
        boolean bl = terminateSubscriber = this.isTerminated() || this.currentLink == null && this.upstream == Operators.cancelledSubscription();
        if (this.isTerminated()) {
            this.logger.info("linkName[{}] entityPath[{}]. AmqpReceiveLink is already terminated.", new Object[]{this.currentLinkName, this.entityPath});
        } else if (this.currentLink == null && this.upstream == Operators.cancelledSubscription()) {
            this.logger.info("There is no current link and upstream is terminated.");
        }
        if (terminateSubscriber) {
            actual.onSubscribe(Operators.emptySubscription());
            if (this.hasError()) {
                actual.onError(this.lastError);
            } else {
                actual.onComplete();
            }
            return;
        }
        if (this.downstream.compareAndSet(null, actual)) {
            actual.onSubscribe((Subscription)this);
            this.drain();
        } else {
            Operators.error(actual, (Throwable)this.logger.logExceptionAsError((RuntimeException)new IllegalStateException("There is already one downstream subscriber.'")));
        }
    }

    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable, "'throwable' is required.");
        this.logger.info("linkName[{}] Error on receive link.", new Object[]{this.currentLinkName, throwable});
        if (this.isTerminated() || this.isCancelled) {
            this.logger.info("linkName[{}] AmqpReceiveLinkProcessor is terminated. Cannot process another error.", new Object[]{this.currentLinkName, throwable});
            Operators.onErrorDropped((Throwable)throwable, (Context)this.currentContext());
            return;
        }
        if (this.parentConnection.isDisposed()) {
            this.logger.info("linkName[{}] Parent connection is disposed. Not reopening on error.", new Object[]{this.currentLinkName});
        }
        this.lastError = throwable;
        this.isTerminated.set(true);
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        if (subscriber != null) {
            subscriber.onError(throwable);
        }
        this.onDispose();
    }

    public void onComplete() {
        this.logger.info("linkName[{}] Receive link completed from upstream.", new Object[]{this.currentLinkName});
        UPSTREAM.set(this, Operators.cancelledSubscription());
    }

    public void dispose() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        this.logger.info("linkName[{}] Disposing receive link.", new Object[]{this.currentLinkName});
        this.drain();
        this.onDispose();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void request(long request) {
        if (!Operators.validate((long)request)) {
            this.logger.warning("Invalid request: {}", new Object[]{request});
            return;
        }
        Operators.addCap(REQUESTED, (Object)((Object)this), (long)request);
        Object object = this.creditsAdded;
        synchronized (object) {
            AmqpReceiveLink link = this.currentLink;
            int credits = this.getCreditsToAdd();
            this.logger.verbose("linkName[{}] entityPath[{}] request[{}] credits[{}] Backpressure request from downstream.", new Object[]{this.currentLinkName, this.entityPath, request, credits});
            if (link != null) {
                link.addCredits(credits).onErrorResume(IllegalStateException.class, error -> {
                    this.logger.info("linkName[{}] was already closed. Could not add credits.", new Object[]{link.getLinkName()});
                    return Mono.empty();
                }).subscribe();
            } else {
                this.logger.verbose("entityPath[{}] credits[{}] totalRequest[{}] totalSent[{}] totalCredits[{}] There is no link to add credits to, yet.", new Object[]{this.entityPath, credits});
            }
        }
        this.drain();
    }

    public void cancel() {
        if (this.isCancelled) {
            return;
        }
        this.isCancelled = true;
        this.drain();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void requestUpstream() {
        if (this.isTerminated()) {
            this.logger.info("Processor is terminated. Not requesting another link.");
            return;
        }
        if (UPSTREAM.get(this) == null) {
            this.logger.info("There is no upstream. Not requesting another link.");
            return;
        }
        if (UPSTREAM.get(this) == Operators.cancelledSubscription()) {
            this.logger.info("Upstream is cancelled or complete. Not requesting another link.");
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.currentLink != null) {
                this.logger.info("Current link exists. Not requesting another link.");
                return;
            }
        }
        this.logger.info("Requesting a new AmqpReceiveLink from upstream.");
        UPSTREAM.get(this).request(1L);
    }

    private void onDispose() {
        this.disposeReceiver(this.currentLink);
        this.currentLink = null;
        this.currentLinkName = null;
        if (this.currentLinkSubscriptions != null) {
            this.currentLinkSubscriptions.dispose();
        }
        Operators.onDiscardQueueWithClear(this.messageQueue, (Context)this.currentContext(), null);
    }

    private void drain() {
        if (this.wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        while (missed != 0) {
            this.drainQueue();
            missed = this.wip.addAndGet(-missed);
        }
    }

    private void drainQueue() {
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        if (subscriber == null || this.checkAndSetTerminated()) {
            return;
        }
        long numberRequested = REQUESTED.get(this);
        boolean isEmpty = this.messageQueue.isEmpty();
        while (numberRequested != 0L && !isEmpty && !this.checkAndSetTerminated()) {
            Message message;
            long numberEmitted;
            for (numberEmitted = 0L; !(numberRequested == numberEmitted || isEmpty && this.checkAndSetTerminated() || (message = this.messageQueue.poll()) == null); ++numberEmitted) {
                if (this.isCancelled) {
                    Operators.onDiscard((Object)message, (Context)subscriber.currentContext());
                    Operators.onDiscardQueueWithClear(this.messageQueue, (Context)subscriber.currentContext(), null);
                    return;
                }
                try {
                    subscriber.onNext((Object)message);
                }
                catch (Exception e) {
                    this.logger.error("Exception occurred while handling downstream onNext operation.", new Object[]{e});
                    throw this.logger.logExceptionAsError(Exceptions.propagate((Throwable)Operators.onOperatorError((Subscription)this.upstream, (Throwable)e, (Object)message, (Context)subscriber.currentContext())));
                }
                isEmpty = this.messageQueue.isEmpty();
            }
            long requestedMessages = REQUESTED.get(this);
            if (requestedMessages == Long.MAX_VALUE) continue;
            numberRequested = REQUESTED.addAndGet(this, -numberEmitted);
        }
    }

    private boolean checkAndSetTerminated() {
        if (!this.isTerminated()) {
            return false;
        }
        CoreSubscriber<? super Message> subscriber = this.downstream.get();
        Throwable error = this.lastError;
        if (error != null) {
            subscriber.onError(error);
        } else {
            subscriber.onComplete();
        }
        this.disposeReceiver(this.currentLink);
        this.messageQueue.clear();
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int getCreditsToAdd() {
        Object object = this.creditsAdded;
        synchronized (object) {
            CoreSubscriber<? super Message> subscriber = this.downstream.get();
            long request = REQUESTED.get(this);
            int credits = subscriber == null || request == 0L ? 0 : (request == Long.MAX_VALUE ? 1 : Long.valueOf(request).intValue());
            return credits;
        }
    }

    private void disposeReceiver(AmqpReceiveLink link) {
        if (link == null) {
            return;
        }
        try {
            if (link instanceof AsyncAutoCloseable) {
                ((AsyncAutoCloseable)link).closeAsync().subscribe();
            } else {
                link.dispose();
            }
        }
        catch (Exception error) {
            this.logger.warning("linkName[{}] entityPath[{}] Unable to dispose of link.", new Object[]{link.getLinkName(), link.getEntityPath(), error});
        }
    }
}

