package com.azure.messaging.servicebus;

import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusProcessor.class */
/**
 * Package-internal processor that pumps Service Bus messages to user supplied handlers.
 *
 * <p>A processor is created for either a non-session or a session entity (see {@link Kind}). Each call to
 * {@link #start()} on a stopped processor creates a fresh {@link RollingMessagePump}; {@link #close()} (or
 * {@link #stop()}) disposes the current one. The running flag and the current pump reference are guarded by
 * an internal lock.</p>
 *
 * <p>Note: members declared {@code public} below were reported by the decompiler as originally package-private.
 * Because this class itself is package-private, the effective visibility is the same.</p>
 */
final class ServiceBusProcessor {
    /** Monitor guarding {@link #isRunning} and {@link #rollingMessagePump}. */
    private final Object lock;
    /** Whether this processor targets a session or a non-session entity. */
    private final Kind kind;
    /** Builder used for {@link Kind#NON_SESSION}; {@code null} for {@link Kind#SESSION}. */
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder nonSessionBuilder;
    /** Builder used for {@link Kind#SESSION}; {@code null} for {@link Kind#NON_SESSION}. */
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionBuilder;
    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final int concurrency;
    /** Only set for {@link Kind#NON_SESSION}; {@code null} for {@link Kind#SESSION}. */
    private final Boolean enableAutoDisposition;
    private boolean isRunning;
    private RollingMessagePump rollingMessagePump;

    /**
     * The kind of entity a processor (and its pump) works against.
     */
    public enum Kind {
        /** The processor uses the non-session receiver builder. */
        NON_SESSION,
        /** The processor uses the session receiver builder. */
        SESSION
    }

    /**
     * Streams messages through a pump and, when that pump terminates with a
     * {@code MessagePumpTerminatedException}, rolls over to a new pump after a fixed backoff.
     *
     * <p>The inherited {@link AtomicBoolean} state records whether {@link #begin()} has already been invoked,
     * so that streaming can be started at most once per instance.</p>
     */
    public static final class RollingMessagePump extends AtomicBoolean {
        /** Shared error used to terminate the retry loop once the pump has been disposed. */
        private static final RuntimeException DISPOSED_ERROR = new RuntimeException("The Processor closure disposed the RollingMessagePump.");
        /** Delay between the termination of one pump and the attempt to start the next one. */
        private static final Duration NEXT_PUMP_BACKOFF = Duration.ofSeconds(5);
        private final ClientLogger logger;
        private final Kind kind;
        private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder nonSessionBuilder;
        private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionBuilder;
        private final int concurrency;
        private final Consumer<ServiceBusReceivedMessageContext> processMessage;
        private final Consumer<ServiceBusErrorContext> processError;
        /** Only set for {@link Kind#NON_SESSION}; {@code null} for {@link Kind#SESSION}. */
        private final Boolean enableAutoDisposition;
        /** Holds the subscription created by {@link #begin()}; disposing it stops the streaming. */
        private final Disposable.Composite disposable;
        /** Identifier of the most recently created client/pump; {@code null} until one has been created. */
        private final AtomicReference<String> clientIdentifier;

        /**
         * Creates a rolling pump for a non-session entity.
         *
         * @param serviceBusReceiverClientBuilder builder used to create the receiver client for each pump.
         * @param consumer handler invoked for each received message.
         * @param consumer2 handler invoked with the error context when a pump terminates.
         * @param i the concurrency handed to each {@code MessagePump}.
         * @param z whether auto-disposition is enabled; handed to each {@code MessagePump}.
         */
        RollingMessagePump(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, int i, boolean z) {
            this.disposable = Disposables.composite();
            this.clientIdentifier = new AtomicReference<>();
            this.logger = new ClientLogger(RollingMessagePump.class);
            this.kind = Kind.NON_SESSION;
            this.nonSessionBuilder = serviceBusReceiverClientBuilder;
            this.sessionBuilder = null;
            this.concurrency = i;
            this.processError = consumer2;
            this.processMessage = consumer;
            this.enableAutoDisposition = Boolean.valueOf(z);
        }

        /**
         * Creates a rolling pump for a session entity.
         *
         * @param serviceBusSessionReceiverClientBuilder builder used to create the session pump.
         * @param consumer handler invoked for each received message.
         * @param consumer2 handler invoked with the error context when a pump terminates.
         * @param i the concurrency handed to the session pump.
         */
        RollingMessagePump(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder serviceBusSessionReceiverClientBuilder, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, int i) {
            this.disposable = Disposables.composite();
            this.clientIdentifier = new AtomicReference<>();
            this.logger = new ClientLogger(RollingMessagePump.class);
            this.kind = Kind.SESSION;
            this.sessionBuilder = serviceBusSessionReceiverClientBuilder;
            this.nonSessionBuilder = null;
            this.processError = consumer2;
            this.processMessage = consumer;
            this.concurrency = i;
            this.enableAutoDisposition = null;
        }

        /**
         * Subscribes to the streaming pipeline and registers the subscription for later disposal.
         *
         * @throws IllegalStateException if this method was already invoked on this instance, or if the
         *     subscription could not be registered because this pump has already been disposed.
         */
        void begin() {
            final boolean alreadyBegun = getAndSet(true);
            if (alreadyBegun) {
                throw this.logger.atInfo().log(new IllegalStateException("The streaming cannot begin more than once."));
            }
            final Disposable subscription = beginIntern().subscribe();
            if (!this.disposable.add(subscription)) {
                throw this.logger.atInfo().log(new IllegalStateException("Cannot begin streaming after the disposal."));
            }
        }

        /**
         * Builds the streaming pipeline without subscribing to it.
         *
         * <p>When the pipeline fails with a {@code MessagePumpTerminatedException}, the user's error handler is
         * notified first and the error is then handed to the retry spec, which decides whether a new pump
         * is started.</p>
         *
         * @return a {@link Mono} that pumps messages when subscribed.
         */
        Mono<Void> beginIntern() {
            final Mono<Void> pumping;
            if (this.kind == Kind.NON_SESSION) {
                pumping = Mono.using(
                    () -> this.nonSessionBuilder.buildAsyncClientForProcessor(),
                    client -> {
                        this.clientIdentifier.set(client.getIdentifier());
                        return new MessagePump(client, this.processMessage, this.processError, this.concurrency, this.enableAutoDisposition.booleanValue()).begin();
                    },
                    client -> client.close(),
                    true);
            } else {
                pumping = Mono.using(
                    () -> this.sessionBuilder.buildPumpForProcessor(this.logger, this.processMessage, this.processError, this.concurrency),
                    sessionPump -> {
                        this.clientIdentifier.set(sessionPump.getIdentifier());
                        return sessionPump.begin();
                    },
                    sessionPump -> {
                        // Intentionally no cleanup here.
                        // NOTE(review): presumably the session pump releases its own resources - confirm.
                    },
                    true);
            }
            return pumping
                .onErrorResume(MessagePumpTerminatedException.class,
                    terminated -> notifyError(terminated).then(Mono.error(terminated)))
                .retryWhen(retrySpecForNextPump());
        }

        /**
         * @return the identifier recorded from the most recently created client/pump, or {@code null} if
         *     none has been created yet.
         */
        String getClientIdentifier() {
            return this.clientIdentifier.get();
        }

        /**
         * Disposes the subscription registered by {@link #begin()}, stopping the streaming.
         */
        void dispose() {
            this.disposable.dispose();
        }

        /**
         * Delivers the error context carried by the given exception to the user's error handler.
         *
         * <p>The handler runs on the bounded elastic scheduler. Any {@link Exception} it throws is logged at
         * verbose level and otherwise ignored.</p>
         *
         * @param messagePumpTerminatedException the termination error of the pump.
         * @return a {@link Mono} that completes once the handler has run; an empty {@link Mono} when the
         *     exception carries no error context.
         */
        private Mono<Void> notifyError(MessagePumpTerminatedException messagePumpTerminatedException) {
            final ServiceBusErrorContext errorContext = messagePumpTerminatedException.getErrorContext();
            if (errorContext == null) {
                return Mono.empty();
            }
            // The explicit <Void> witness is required: without it the chained call infers Mono<Object>,
            // which is not compatible with this method's Mono<Void> return type.
            return Mono.<Void>fromRunnable(() -> {
                try {
                    this.processError.accept(errorContext);
                } catch (Exception e) {
                    // Kept as an explicit array so the logging call resolves exactly as before.
                    this.logger.atVerbose().log("Ignoring error from user processError handler.", new Object[]{e});
                }
            }).subscribeOn(Schedulers.boundedElastic());
        }

        /**
         * Creates the retry spec that decides, for each pipeline failure, whether the next pump is started.
         *
         * <ul>
         *   <li>A missing failure, or one that is not a {@code MessagePumpTerminatedException}, ends the
         *       retry loop with an {@link IllegalStateException}.</li>
         *   <li>If this pump is already disposed, the retry loop ends with the shared disposed error.</li>
         *   <li>Otherwise a retry is signalled after {@link #NEXT_PUMP_BACKOFF}, unless this pump was
         *       disposed during the backoff.</li>
         * </ul>
         *
         * @return the retry spec applied to the streaming pipeline.
         */
        private Retry retrySpecForNextPump() {
            return Retry.from(retrySignals -> retrySignals.concatMap(signal -> {
                final Throwable failure = signal.copy().failure();
                if (failure == null) {
                    return FluxUtil.monoError(this.logger, new IllegalStateException("RetrySignal::failure() not expected to be null."));
                }
                if (!(failure instanceof MessagePumpTerminatedException)) {
                    return FluxUtil.monoError(this.logger, new IllegalStateException("RetrySignal::failure() expected to be MessagePumpTerminatedException.", failure));
                }
                final MessagePumpTerminatedException terminated = (MessagePumpTerminatedException) failure;
                if (this.disposable.isDisposed()) {
                    terminated.log(this.logger, "The Processor closure disposed the streaming, canceling retry for the next MessagePump.", true);
                    return Mono.error(DISPOSED_ERROR);
                }
                terminated.log(this.logger, "The current MessagePump is terminated, scheduling retry for the next pump.", true);
                return Mono.delay(NEXT_PUMP_BACKOFF, Schedulers.boundedElastic()).handle((tick, sink) -> {
                    // Re-check after the backoff: the pump may have been disposed while waiting.
                    if (this.disposable.isDisposed()) {
                        terminated.log(this.logger, "During backoff, The Processor closure disposed the streaming, canceling retry for the next MessagePump.", false);
                        sink.error(DISPOSED_ERROR);
                    } else {
                        terminated.log(this.logger, "Retrying for the next MessagePump.", false);
                        sink.next(tick);
                    }
                });
            }));
        }
    }

    /**
     * Creates a processor for a non-session entity. The processor is not running until {@link #start()}.
     *
     * @param serviceBusReceiverClientBuilder builder used to create the receiver client for each pump.
     * @param consumer handler invoked for each received message.
     * @param consumer2 handler invoked with the error context when a pump terminates.
     * @param i the concurrency handed to the pump.
     * @param z whether auto-disposition is enabled; handed to the pump.
     */
    public ServiceBusProcessor(ServiceBusClientBuilder.ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, int i, boolean z) {
        this.lock = new Object();
        this.kind = Kind.NON_SESSION;
        this.nonSessionBuilder = serviceBusReceiverClientBuilder;
        this.sessionBuilder = null;
        this.processError = consumer2;
        this.processMessage = consumer;
        this.concurrency = i;
        this.enableAutoDisposition = Boolean.valueOf(z);
        // Written under the lock, like every other access to the flag.
        synchronized (this.lock) {
            this.isRunning = false;
        }
    }

    /**
     * Creates a processor for a session entity. The processor is not running until {@link #start()}.
     *
     * @param serviceBusSessionReceiverClientBuilder builder used to create the session pump.
     * @param consumer handler invoked for each received message.
     * @param consumer2 handler invoked with the error context when a pump terminates.
     * @param i the concurrency handed to the pump.
     */
    public ServiceBusProcessor(ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder serviceBusSessionReceiverClientBuilder, Consumer<ServiceBusReceivedMessageContext> consumer, Consumer<ServiceBusErrorContext> consumer2, int i) {
        this.lock = new Object();
        this.kind = Kind.SESSION;
        this.sessionBuilder = serviceBusSessionReceiverClientBuilder;
        this.nonSessionBuilder = null;
        this.processError = consumer2;
        this.processMessage = consumer;
        this.concurrency = i;
        this.enableAutoDisposition = null;
        // Written under the lock, like every other access to the flag.
        synchronized (this.lock) {
            this.isRunning = false;
        }
    }

    /**
     * Starts processing with a newly created {@link RollingMessagePump}. Does nothing if already running.
     *
     * <p>The state transition happens under the lock, but the pump is begun after the lock is released:
     * beginning subscribes to the streaming pipeline, and that work should not keep {@link #isRunning()},
     * {@link #getIdentifier()} or {@link #close()} callers waiting on the monitor.</p>
     *
     * @throws IllegalStateException if the new pump was disposed (for example by a concurrent
     *     {@link #close()}) before its streaming could begin.
     */
    public void start() {
        final RollingMessagePump pump;
        synchronized (this.lock) {
            if (this.isRunning) {
                return;
            }
            this.isRunning = true;
            if (this.kind == Kind.NON_SESSION) {
                pump = new RollingMessagePump(this.nonSessionBuilder, this.processMessage, this.processError, this.concurrency, this.enableAutoDisposition.booleanValue());
            } else {
                pump = new RollingMessagePump(this.sessionBuilder, this.processMessage, this.processError, this.concurrency);
            }
            this.rollingMessagePump = pump;
        }
        pump.begin();
    }

    /**
     * @return {@code true} if the processor has been started and not closed/stopped since.
     */
    public boolean isRunning() {
        synchronized (this.lock) {
            return this.isRunning;
        }
    }

    /**
     * Stops processing by disposing the current pump. Does nothing if the processor is not running.
     *
     * <p>The state transition happens under the lock, but the pump is disposed after the lock is released,
     * so that other callers of this processor do not wait on the monitor for whatever cleanup the
     * cancellation triggers. The pump reference is retained, so {@link #getIdentifier()} keeps reporting
     * its identifier until the next {@link #start()}.</p>
     */
    public void close() {
        final RollingMessagePump pump;
        synchronized (this.lock) {
            if (!this.isRunning) {
                return;
            }
            this.isRunning = false;
            pump = this.rollingMessagePump;
        }
        pump.dispose();
    }

    /**
     * Stops processing; equivalent to {@link #close()}.
     */
    public void stop() {
        close();
    }

    /**
     * @return the client identifier recorded by the most recent pump, or {@code null} if the processor was
     *     never started or the pump has not created a client yet.
     */
    public String getIdentifier() {
        final RollingMessagePump pump;
        synchronized (this.lock) {
            pump = this.rollingMessagePump;
        }
        return pump == null ? null : pump.getClientIdentifier();
    }
}
