package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true)
/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.class */
public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
    // Name passed to createSession(...) by createTransaction/commitTransaction/rollbackTransaction.
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    // Lock renewal operations keyed by lock token; fed by renewMessageLock(message, duration) and
    // handed to FluxAutoLockRenew. Closed in close().
    private final LockContainer<LockRenewalOperation> renewalContainer;
    // Set to true by close(); most public operations fail fast once it is set.
    private final AtomicBoolean isDisposed;
    // Lock expiry times keyed by lock token; updated when deferred messages are received and when
    // a message lock is renewed. Queried by isManagementToken(...).
    private final LockContainer<OffsetDateTime> managementNodeLocks;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    // Source of the AMQP connection used to reach the management node and transaction sessions.
    private final ServiceBusConnectionProcessor connectionProcessor;
    private final ServiceBusReceiverInstrumentation instrumentation;
    // Obtained from instrumentation.getTracer() in the constructors.
    private final ServiceBusTracer tracer;
    private final MessageSerializer messageSerializer;
    // Callback run as the last step of close().
    private final Runnable onClientClose;
    // Null for the non-session constructor; non-null for the session constructor.
    private final ServiceBusSessionManager sessionManager;
    private final boolean isSessionEnabled;
    // Single-permit semaphore shared with FluxAutoComplete; close() tries to acquire it for up to 5 seconds.
    private final Semaphore completionLock;
    private final String identifier;
    // Starts at -1; peek operations without an explicit sequence number start from this value + 1.
    private final AtomicLong lastPeekedSequenceNumber;
    // Current consumer, if any; swapped out for null and closed in close().
    private final AtomicReference<ServiceBusAsyncConsumer> consumer;
    // Handle returned by instrumentation.startTrackingSettlementSequenceNumber(); closed in close().
    private final AutoCloseable trackSettlementSequenceNumber;
    // Options used by deadLetter(message) when the caller supplies none.
    private static final DeadLetterOptions DEFAULT_DEAD_LETTER_OPTIONS = new DeadLetterOptions();
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClient.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a receiver for an entity that is not session-enabled.
     *
     * @param str fully qualified namespace; must not be null.
     * @param str2 entity path; must not be null.
     * @param messagingEntityType entity type; must not be null.
     * @param receiverOptions receiver options; must not be null and must not carry a session id or a
     *     max-concurrent-sessions value.
     * @param serviceBusConnectionProcessor connection processor; must not be null.
     * @param duration value handed to the management-node lock container (presumably its cleanup
     *     interval; confirm against LockContainer).
     * @param serviceBusReceiverInstrumentation instrumentation providing the tracer; must not be null.
     * @param messageSerializer message serializer; must not be null.
     * @param runnable callback invoked at the end of {@link #close()}; must not be null.
     * @param str3 identifier reported by {@link #getIdentifier()}.
     * @throws NullPointerException if a required argument is null.
     * @throws IllegalStateException if {@code receiverOptions} contains session-specific options.
     */
    public ServiceBusReceiverAsyncClient(String str, String str2, MessagingEntityType messagingEntityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor serviceBusConnectionProcessor, Duration duration, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation, MessageSerializer messageSerializer, Runnable runnable, String str3) {
        this.isDisposed = new AtomicBoolean();
        this.completionLock = new Semaphore(1);
        this.lastPeekedSequenceNumber = new AtomicLong(-1L);
        this.consumer = new AtomicReference<>();
        this.fullyQualifiedNamespace = Objects.requireNonNull(str, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(messagingEntityType, "'entityType' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(serviceBusConnectionProcessor, "'connectionProcessor' cannot be null.");
        this.instrumentation = Objects.requireNonNull(serviceBusReceiverInstrumentation, "'instrumentation' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(runnable, "'onClientClose' cannot be null.");
        this.sessionManager = null;
        // This constructor is for session-unaware entities only; session options indicate a wiring mistake.
        if (receiverOptions.getSessionId() != null || receiverOptions.getMaxConcurrentSessions() != null) {
            throw new IllegalStateException("Session-specific options are not expected to be present on a client for session unaware entity.");
        }
        this.isSessionEnabled = false;
        this.managementNodeLocks = new LockContainer<>(duration);
        // Expired renewal operations are logged and closed by the container callback.
        this.renewalContainer = new LockContainer<>(Duration.ofMinutes(2L), renewal -> {
            LOGGER.atVerbose().addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, renewal.getLockToken()).addKeyValue("status", renewal.getStatus()).log("Closing expired renewal operation.", new Object[]{renewal.getThrowable()});
            renewal.close();
        });
        this.identifier = str3;
        this.tracer = serviceBusReceiverInstrumentation.getTracer();
        this.trackSettlementSequenceNumber = serviceBusReceiverInstrumentation.startTrackingSettlementSequenceNumber();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a receiver for a session-enabled entity, backed by a session manager.
     *
     * @param str fully qualified namespace; must not be null.
     * @param str2 entity path; must not be null.
     * @param messagingEntityType entity type; must not be null.
     * @param receiverOptions receiver options; must not be null.
     * @param serviceBusConnectionProcessor connection processor; must not be null.
     * @param duration value handed to the management-node lock container (presumably its cleanup
     *     interval; confirm against LockContainer).
     * @param serviceBusReceiverInstrumentation instrumentation providing the tracer; must not be null.
     * @param messageSerializer message serializer; must not be null.
     * @param runnable callback invoked at the end of {@link #close()}; must not be null.
     * @param serviceBusSessionManager session manager that supplies messages and the client identifier;
     *     must not be null.
     * @throws NullPointerException if a required argument is null.
     */
    public ServiceBusReceiverAsyncClient(String str, String str2, MessagingEntityType messagingEntityType, ReceiverOptions receiverOptions, ServiceBusConnectionProcessor serviceBusConnectionProcessor, Duration duration, ServiceBusReceiverInstrumentation serviceBusReceiverInstrumentation, MessageSerializer messageSerializer, Runnable runnable, ServiceBusSessionManager serviceBusSessionManager) {
        this.isDisposed = new AtomicBoolean();
        this.completionLock = new Semaphore(1);
        this.lastPeekedSequenceNumber = new AtomicLong(-1L);
        this.consumer = new AtomicReference<>();
        this.fullyQualifiedNamespace = Objects.requireNonNull(str, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(messagingEntityType, "'entityType' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiverOptions' cannot be null.");
        this.connectionProcessor = Objects.requireNonNull(serviceBusConnectionProcessor, "'connectionProcessor' cannot be null.");
        this.instrumentation = Objects.requireNonNull(serviceBusReceiverInstrumentation, "'instrumentation' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(runnable, "'onClientClose' cannot be null.");
        this.sessionManager = Objects.requireNonNull(serviceBusSessionManager, "'sessionManager' cannot be null.");
        this.isSessionEnabled = true;
        this.managementNodeLocks = new LockContainer<>(duration);
        // Expired renewal operations are logged (keyed by session id here) and closed by the container callback.
        this.renewalContainer = new LockContainer<>(Duration.ofMinutes(2L), renewal -> {
            LOGGER.atInfo().addKeyValue(ServiceBusConstants.SESSION_ID_KEY, renewal.getSessionId()).addKeyValue("status", renewal.getStatus()).log("Closing expired renewal operation.", new Object[]{renewal.getThrowable()});
            renewal.close();
        });
        this.identifier = serviceBusSessionManager.getIdentifier();
        this.tracer = serviceBusReceiverInstrumentation.getTracer();
        this.trackSettlementSequenceNumber = serviceBusReceiverInstrumentation.startTrackingSettlementSequenceNumber();
    }

    /**
     * Gets the fully qualified namespace this client was constructed with.
     *
     * @return the fully qualified namespace.
     */
    public String getFullyQualifiedNamespace() {
        return fullyQualifiedNamespace;
    }

    /**
     * Gets the entity path this client was constructed with.
     *
     * @return the entity path.
     */
    public String getEntityPath() {
        return entityPath;
    }

    /**
     * Gets the session id configured in the receiver options.
     *
     * @return the session id held by the receiver options (may be null).
     */
    public String getSessionId() {
        final String sessionId = receiverOptions.getSessionId();
        return sessionId;
    }

    /**
     * Gets the identifier assigned to this client at construction.
     *
     * @return the client identifier.
     */
    public String getIdentifier() {
        return identifier;
    }

    /**
     * Abandons the message without modifying properties and outside of any transaction.
     *
     * @param serviceBusReceivedMessage the message to abandon.
     * @return a Mono that completes when the disposition update finishes.
     */
    public Mono<Void> abandon(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        final DispositionStatus status = DispositionStatus.ABANDONED;
        return updateDisposition(serviceBusReceivedMessage, status, null, null, null, null);
    }

    /**
     * Abandons the message, applying the property modifications and transaction from the options.
     *
     * @param serviceBusReceivedMessage the message to abandon.
     * @param abandonOptions options carrying properties to modify and an optional transaction context.
     * @return a Mono that completes when the disposition update finishes, or errors with
     *     {@link NullPointerException} when the options are null or when a transaction context is
     *     present but has no transaction id.
     */
    public Mono<Void> abandon(ServiceBusReceivedMessage serviceBusReceivedMessage, AbandonOptions abandonOptions) {
        if (abandonOptions == null) {
            // Message names the parameter 'options', matching complete/defer/deadLetter and the
            // transaction-id message below.
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options' cannot be null."));
        }
        final ServiceBusTransactionContext transaction = abandonOptions.getTransactionContext();
        if (transaction != null && transaction.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return updateDisposition(serviceBusReceivedMessage, DispositionStatus.ABANDONED, null, null, abandonOptions.getPropertiesToModify(), transaction);
    }

    /**
     * Completes the message outside of any transaction.
     *
     * @param serviceBusReceivedMessage the message to complete.
     * @return a Mono that completes when the disposition update finishes.
     */
    public Mono<Void> complete(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        final DispositionStatus status = DispositionStatus.COMPLETED;
        return updateDisposition(serviceBusReceivedMessage, status, null, null, null, null);
    }

    /**
     * Completes the message, optionally as part of the transaction carried by the options.
     *
     * @param serviceBusReceivedMessage the message to complete.
     * @param completeOptions options carrying an optional transaction context.
     * @return a Mono that completes when the disposition update finishes, or errors with
     *     {@link NullPointerException} when the options are null or when a transaction context is
     *     present but has no transaction id.
     */
    public Mono<Void> complete(ServiceBusReceivedMessage serviceBusReceivedMessage, CompleteOptions completeOptions) {
        if (completeOptions == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options' cannot be null."));
        }
        final ServiceBusTransactionContext transaction = completeOptions.getTransactionContext();
        if (transaction != null && transaction.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return updateDisposition(serviceBusReceivedMessage, DispositionStatus.COMPLETED, null, null, null, transaction);
    }

    /**
     * Defers the message without modifying properties and outside of any transaction.
     *
     * @param serviceBusReceivedMessage the message to defer.
     * @return a Mono that completes when the disposition update finishes.
     */
    public Mono<Void> defer(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        final DispositionStatus status = DispositionStatus.DEFERRED;
        return updateDisposition(serviceBusReceivedMessage, status, null, null, null, null);
    }

    /**
     * Defers the message, applying the property modifications and transaction from the options.
     *
     * @param serviceBusReceivedMessage the message to defer.
     * @param deferOptions options carrying properties to modify and an optional transaction context.
     * @return a Mono that completes when the disposition update finishes, or errors with
     *     {@link NullPointerException} when the options are null or when a transaction context is
     *     present but has no transaction id.
     */
    public Mono<Void> defer(ServiceBusReceivedMessage serviceBusReceivedMessage, DeferOptions deferOptions) {
        if (deferOptions == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options' cannot be null."));
        }
        final ServiceBusTransactionContext transaction = deferOptions.getTransactionContext();
        if (transaction != null && transaction.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return updateDisposition(serviceBusReceivedMessage, DispositionStatus.DEFERRED, null, null, deferOptions.getPropertiesToModify(), transaction);
    }

    /**
     * Dead-letters the message using the shared default dead-letter options.
     *
     * @param serviceBusReceivedMessage the message to dead-letter.
     * @return a Mono that completes when the disposition update finishes.
     */
    public Mono<Void> deadLetter(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        final DeadLetterOptions defaults = DEFAULT_DEAD_LETTER_OPTIONS;
        return deadLetter(serviceBusReceivedMessage, defaults);
    }

    /**
     * Dead-letters the message with the reason, error description, property modifications and
     * transaction taken from the options. The disposition status sent is {@code SUSPENDED}.
     *
     * @param serviceBusReceivedMessage the message to dead-letter.
     * @param deadLetterOptions options describing the dead-letter operation.
     * @return a Mono that completes when the disposition update finishes, or errors with
     *     {@link NullPointerException} when the options are null or when a transaction context is
     *     present but has no transaction id.
     */
    public Mono<Void> deadLetter(ServiceBusReceivedMessage serviceBusReceivedMessage, DeadLetterOptions deadLetterOptions) {
        if (deadLetterOptions == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options' cannot be null."));
        }
        final ServiceBusTransactionContext transaction = deadLetterOptions.getTransactionContext();
        if (transaction != null && transaction.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return updateDisposition(serviceBusReceivedMessage, DispositionStatus.SUSPENDED, deadLetterOptions.getDeadLetterReason(), deadLetterOptions.getDeadLetterErrorDescription(), deadLetterOptions.getPropertiesToModify(), transaction);
    }

    /**
     * Gets the state of the session identified by the receiver options' session id.
     *
     * @return a Mono emitting the session state bytes.
     */
    public Mono<byte[]> getSessionState() {
        final String sessionId = receiverOptions.getSessionId();
        return getSessionState(sessionId);
    }

    /**
     * Peeks the next message, using the session id from the receiver options.
     *
     * @return a Mono emitting the peeked message.
     */
    public Mono<ServiceBusReceivedMessage> peekMessage() {
        final String sessionId = receiverOptions.getSessionId();
        return peekMessage(sessionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Peeks the message following the last peeked sequence number for the given session.
     *
     * <p>The peek starts at {@code lastPeekedSequenceNumber + 1}; when a message is emitted the
     * stored sequence number is raised to that message's sequence number if it is larger.</p>
     *
     * @param str session id passed to the management node and used to derive the link name.
     * @return a traced Mono emitting the peeked message, or an {@link IllegalStateException} error
     *     if this client is disposed.
     */
    public Mono<ServiceBusReceivedMessage> peekMessage(String str) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peek")));
        }
        final Mono<ServiceBusReceivedMessage> peeked = connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> {
                final long fromSequenceNumber = lastPeekedSequenceNumber.get() + 1;
                LOGGER.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, fromSequenceNumber).log("Peek message.");
                return managementNode.peek(fromSequenceNumber, str, getLinkName(str));
            })
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE))
            .handle((message, sink) -> {
                // Never move the cursor backwards: keep the larger of the stored and received numbers.
                final long updated = lastPeekedSequenceNumber.updateAndGet(current -> Math.max(current, message.getSequenceNumber()));
                LOGGER.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, updated).log("Updating last peeked sequence number.");
                sink.next(message);
            });
        return tracer.traceManagementReceive("ServiceBus.peekMessage", peeked);
    }

    /**
     * Peeks a message starting from the given sequence number, using the session id from the
     * receiver options.
     *
     * @param j sequence number to start peeking from.
     * @return a Mono emitting the peeked message.
     */
    public Mono<ServiceBusReceivedMessage> peekMessage(long j) {
        final String sessionId = receiverOptions.getSessionId();
        return peekMessage(j, sessionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Peeks a message starting from the given sequence number for the given session. The stored
     * last-peeked sequence number is not touched by this overload.
     *
     * @param j sequence number to start peeking from.
     * @param str session id passed to the management node and used to derive the link name.
     * @return a traced Mono emitting the peeked message, or an {@link IllegalStateException} error
     *     if this client is disposed.
     */
    public Mono<ServiceBusReceivedMessage> peekMessage(long j, String str) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt")));
        }
        return tracer.traceManagementReceive("ServiceBus.peekMessage", connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode.peek(j, str, getLinkName(str)))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE)));
    }

    /**
     * Peeks up to the given number of messages, using the session id from the receiver options,
     * and wraps the result in a trace span.
     *
     * @param i maximum number of messages to peek.
     * @return a traced Flux of peeked messages.
     */
    public Flux<ServiceBusReceivedMessage> peekMessages(int i) {
        final Flux<ServiceBusReceivedMessage> peeked = peekMessages(i, receiverOptions.getSessionId());
        return tracer.traceSyncReceive("ServiceBus.peekMessages", peeked);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Peeks up to the given number of messages following the last peeked sequence number for the
     * given session, advancing the stored sequence number as messages are emitted.
     *
     * @param i maximum number of messages to peek; must be positive.
     * @param str session id passed to the management node and used to derive the link name.
     * @return a Flux of peeked messages; errors with {@link IllegalStateException} if this client is
     *     disposed or {@link IllegalArgumentException} if {@code i} is not positive.
     */
    public Flux<ServiceBusReceivedMessage> peekMessages(int i, String str) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch")));
        }
        if (i <= 0) {
            return FluxUtil.fluxError(LOGGER, new IllegalArgumentException("'maxMessages' is not positive."));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(managementNode -> {
                final long fromSequenceNumber = lastPeekedSequenceNumber.get() + 1;
                LOGGER.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, fromSequenceNumber).log("Peek batch.");
                return managementNode.peek(fromSequenceNumber, str, getLinkName(str), i).doOnNext(message -> {
                    // Keep the larger of the stored and received sequence numbers.
                    final long updated = lastPeekedSequenceNumber.updateAndGet(current -> Math.max(current, message.getSequenceNumber()));
                    LOGGER.atVerbose().addKeyValue(ServiceBusConstants.SEQUENCE_NUMBER_KEY, updated).log("Last peeked sequence number in batch.");
                });
            })
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /**
     * Peeks up to the given number of messages starting from the given sequence number, using the
     * session id from the receiver options.
     *
     * @param i maximum number of messages to peek.
     * @param j sequence number to start peeking from.
     * @return a Flux of peeked messages.
     */
    public Flux<ServiceBusReceivedMessage> peekMessages(int i, long j) {
        final String sessionId = receiverOptions.getSessionId();
        return peekMessages(i, j, sessionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Peeks up to the given number of messages starting from the given sequence number for the
     * given session. The stored last-peeked sequence number is not touched by this overload.
     *
     * @param i maximum number of messages to peek; must be positive.
     * @param j sequence number to start peeking from.
     * @param str session id passed to the management node and used to derive the link name.
     * @return a traced Flux of peeked messages; errors with {@link IllegalStateException} if this
     *     client is disposed or {@link IllegalArgumentException} if {@code i} is not positive.
     */
    public Flux<ServiceBusReceivedMessage> peekMessages(int i, long j, String str) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt")));
        }
        if (i <= 0) {
            return FluxUtil.fluxError(LOGGER, new IllegalArgumentException("'maxMessages' is not positive."));
        }
        return tracer.traceSyncReceive("ServiceBus.peekMessages", connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(managementNode -> managementNode.peek(j, str, getLinkName(str), i))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE)));
    }

    /**
     * Receives messages, requesting them from upstream one at a time via {@code limitRate(1, 0)}.
     *
     * @return a Flux of received messages, or an {@link IllegalStateException} error if this client
     *     is disposed.
     */
    public Flux<ServiceBusReceivedMessage> receiveMessages() {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveMessages")));
        }
        return receiveMessagesNoBackPressure().limitRate(1, 0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Receives messages without applying a rate limit, unwrapping each message context: a context
     * with an error terminates the Flux with that error, otherwise its message is emitted.
     *
     * @return a Flux of received messages.
     */
    public Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
        return receiveMessagesWithContext(0).handle((context, sink) -> {
            if (!context.hasError()) {
                sink.next(context.getMessage());
                return;
            }
            sink.error(context.getThrowable());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Receives message contexts with a rate limit of one.
     *
     * @return a Flux of message contexts.
     */
    public Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
        final int highTide = 1;
        return receiveMessagesWithContext(highTide);
    }

    /**
     * Builds the receive pipeline: source, tracing, optional auto lock renewal, optional auto
     * complete, optional rate limit, and error mapping, in that order.
     *
     * <p>The source is the session manager when one is present, otherwise the consumer returned by
     * {@code getOrCreateConsumer()}. Auto lock renewal is added only for non-session receivers with
     * it enabled in the receiver options. Auto complete completes each emitted message or abandons
     * it, as decided by {@code FluxAutoComplete}; contexts without a message are skipped.</p>
     *
     * @param i when positive, applied as {@code limitRate(i, 0)}; otherwise no rate limit is added.
     * @return a Flux of message contexts whose errors are mapped with the RECEIVE error source.
     */
    Flux<ServiceBusMessageContext> receiveMessagesWithContext(int i) {
        final Flux<ServiceBusMessageContext> source = sessionManager != null
            ? sessionManager.receive()
            : getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);
        final Flux<ServiceBusMessageContext> traced = new FluxTrace(source, instrumentation);

        final Flux<ServiceBusMessageContext> withLockRenewal;
        if (isSessionEnabled || !receiverOptions.isAutoLockRenewEnabled()) {
            withLockRenewal = traced;
        } else {
            withLockRenewal = new FluxAutoLockRenew(traced, receiverOptions, renewalContainer, this::renewMessageLock, tracer);
        }

        Flux<ServiceBusMessageContext> pipeline;
        if (receiverOptions.isEnableAutoComplete()) {
            pipeline = new FluxAutoComplete(withLockRenewal, completionLock,
                context -> context.getMessage() != null ? complete(context.getMessage()) : Mono.empty(),
                context -> context.getMessage() != null ? abandon(context.getMessage()) : Mono.empty());
        } else {
            pipeline = withLockRenewal;
        }

        if (i > 0) {
            pipeline = pipeline.limitRate(i, 0);
        }
        return pipeline.onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /**
     * Receives the deferred message with the given sequence number, using the session id from the
     * receiver options.
     *
     * @param j sequence number of the deferred message.
     * @return a Mono emitting the deferred message.
     */
    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long j) {
        final String sessionId = receiverOptions.getSessionId();
        return receiveDeferredMessage(j, sessionId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Receives the deferred message with the given sequence number for the given session.
     *
     * <p>In PEEK_LOCK mode, a message with a non-empty lock token has its lock expiry recorded in
     * the management-node lock container, and its locked-until value is replaced by whatever the
     * container returns.</p>
     *
     * @param j sequence number of the deferred message.
     * @param str session id passed to the management node and used to derive the link name.
     * @return a traced Mono emitting the last message returned for that sequence number, or an
     *     {@link IllegalStateException} error if this client is disposed.
     */
    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long j, String str) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessage")));
        }
        return tracer.traceManagementReceive("ServiceBus.receiveDeferredMessage", connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode
                .receiveDeferredMessages(receiverOptions.getReceiveMode(), str, getLinkName(str), Collections.singleton(j))
                .last())
            .map(message -> {
                final boolean hasLockToken = !CoreUtils.isNullOrEmpty(message.getLockToken());
                if (hasLockToken && receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
                    message.setLockedUntil(managementNodeLocks.addOrUpdate(message.getLockToken(), message.getLockedUntil(), message.getLockedUntil()));
                }
                return message;
            })
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE)));
    }

    /**
     * Receives the deferred messages with the given sequence numbers, using the session id from
     * the receiver options, and wraps the result in a trace span.
     *
     * @param iterable sequence numbers of the deferred messages.
     * @return a traced Flux of deferred messages.
     */
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> iterable) {
        final Flux<ServiceBusReceivedMessage> deferred = receiveDeferredMessages(iterable, receiverOptions.getSessionId());
        return tracer.traceSyncReceive("ServiceBus.receiveDeferredMessages", deferred);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Receives the deferred messages with the given sequence numbers for the given session.
     *
     * <p>In PEEK_LOCK mode, each message with a non-empty lock token has its lock expiry recorded
     * in the management-node lock container, and its locked-until value is replaced by whatever the
     * container returns.</p>
     *
     * @param iterable sequence numbers of the deferred messages; must not be null.
     * @param str session id passed to the management node and used to derive the link name.
     * @return a Flux of deferred messages; errors with {@link IllegalStateException} if this client
     *     is disposed or {@link NullPointerException} if {@code iterable} is null.
     */
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> iterable, String str) {
        if (isDisposed.get()) {
            return FluxUtil.fluxError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessageBatch")));
        }
        if (iterable == null) {
            return FluxUtil.fluxError(LOGGER, new NullPointerException("'sequenceNumbers' cannot be null"));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMapMany(managementNode -> managementNode.receiveDeferredMessages(receiverOptions.getReceiveMode(), str, getLinkName(str), iterable))
            .map(message -> {
                final boolean hasLockToken = !CoreUtils.isNullOrEmpty(message.getLockToken());
                if (hasLockToken && receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
                    message.setLockedUntil(managementNodeLocks.addOrUpdate(message.getLockToken(), message.getLockedUntil(), message.getLockedUntil()));
                }
                return message;
            })
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Releases the message without modifying properties and outside of any transaction.
     *
     * @param serviceBusReceivedMessage the message to release.
     * @return a Mono that completes when the disposition update finishes.
     */
    public Mono<Void> release(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        final DispositionStatus status = DispositionStatus.RELEASED;
        return updateDisposition(serviceBusReceivedMessage, status, null, null, null, null);
    }

    /**
     * Renews the lock on the given message once, inside a trace span.
     *
     * @param serviceBusReceivedMessage the message whose lock should be renewed.
     * @return a Mono emitting the new lock expiry. It errors with {@link IllegalStateException} if
     *     this client is disposed or session-enabled, {@link NullPointerException} if the message or
     *     its lock token is null, and {@link IllegalArgumentException} if the lock token is empty.
     */
    public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock")));
        }
        if (isSessionEnabled) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException("Renewing message lock is an invalid operation when working with sessions."));
        }
        if (serviceBusReceivedMessage == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message' cannot be null."));
        }
        final String lockToken = serviceBusReceivedMessage.getLockToken();
        if (lockToken == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message.getLockToken()' cannot be null."));
        }
        if (lockToken.isEmpty()) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
        }
        return tracer.traceRenewMessageLock(renewMessageLock(lockToken), serviceBusReceivedMessage)
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RENEW_LOCK));
    }

    /**
     * Renews the lock identified by the given lock token through the management node and records
     * the returned expiry in the management-node lock container.
     *
     * @param str the lock token to renew.
     * @return a Mono emitting the value returned by the lock container for the renewed expiry, or
     *     an {@link IllegalStateException} error if this client is disposed.
     */
    Mono<OffsetDateTime> renewMessageLock(String str) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock")));
        }
        return connectionProcessor
            .flatMap(connection -> connection.getManagementNode(entityPath, entityType))
            .flatMap(managementNode -> managementNode.renewMessageLock(str, getLinkName(null)))
            .map(renewedUntil -> managementNodeLocks.addOrUpdate(str, renewedUntil, renewedUntil));
    }

    /**
     * Starts a lock renewal operation for the given message that may run for up to the given
     * duration, and registers it in the renewal container under the message's lock token.
     *
     * @param serviceBusReceivedMessage the message whose lock should be kept renewed.
     * @param duration maximum time to keep renewing; must be non-null and non-negative.
     * @return the completion Mono of the renewal operation. It errors with
     *     {@link IllegalStateException} if this client is disposed or session-enabled,
     *     {@link NullPointerException} if the message, its lock token or the duration is null, and
     *     {@link IllegalArgumentException} if the lock token is empty or the duration is negative.
     */
    public Mono<Void> renewMessageLock(ServiceBusReceivedMessage serviceBusReceivedMessage, Duration duration) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock")));
        }
        if (isSessionEnabled) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException("Renewing message lock is an invalid operation when working with sessions."));
        }
        if (serviceBusReceivedMessage == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message' cannot be null."));
        }
        final String lockToken = serviceBusReceivedMessage.getLockToken();
        if (lockToken == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message.getLockToken()' cannot be null."));
        }
        if (lockToken.isEmpty()) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
        }
        if (duration == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'maxLockRenewalDuration' cannot be null."));
        }
        if (duration.isNegative()) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }

        // The renewal callback ignores the token it is handed and renews via the message overload.
        final LockRenewalOperation operation = new LockRenewalOperation(lockToken, duration, false,
            ignored -> renewMessageLock(serviceBusReceivedMessage));
        renewalContainer.addOrUpdate(lockToken, OffsetDateTime.now().plus(duration), operation);
        return operation.getCompletionOperation()
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RENEW_LOCK));
    }

    /**
     * Renews the lock on the session identified by the receiver options' session id.
     *
     * @return a Mono emitting the new session lock expiry.
     */
    public Mono<OffsetDateTime> renewSessionLock() {
        final String sessionId = receiverOptions.getSessionId();
        return renewSessionLock(sessionId);
    }

    /**
     * Starts renewing the lock on the session identified by the receiver options' session id for
     * up to the given duration.
     *
     * @param duration maximum time to keep renewing the session lock.
     * @return a Mono that completes when the renewal operation completes.
     */
    public Mono<Void> renewSessionLock(Duration duration) {
        final String sessionId = receiverOptions.getSessionId();
        return renewSessionLock(sessionId, duration);
    }

    /**
     * Sets the state of the session identified by the receiver options' session id.
     *
     * @param bArr the session state bytes to store.
     * @return a Mono that completes when the state has been set.
     */
    public Mono<Void> setSessionState(byte[] bArr) {
        final String sessionId = receiverOptions.getSessionId();
        return setSessionState(sessionId, bArr);
    }

    /**
     * Creates a new transaction on the coordinator session and wraps its id in a transaction
     * context.
     *
     * <p>The trace span is named {@code ServiceBus.createTransaction}, so creating a transaction is
     * no longer recorded under the commit span name.</p>
     *
     * @return a Mono emitting the new transaction context, or an {@link IllegalStateException}
     *     error if this client is disposed. Other errors are mapped with the RECEIVE error source.
     */
    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "createTransaction")));
        }
        return tracer.traceMono("ServiceBus.createTransaction", connectionProcessor
                .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
                .flatMap(session -> session.createTransaction())
                .map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId())))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /**
     * Commits the transaction identified by the given context on the coordinator session.
     *
     * @param serviceBusTransactionContext the transaction to commit; it and its transaction id must
     *     not be null.
     * @return a Mono that completes when the commit finishes. It errors with
     *     {@link IllegalStateException} if this client is disposed and {@link NullPointerException}
     *     if the context or its transaction id is null. Other errors are mapped with the RECEIVE
     *     error source.
     */
    public Mono<Void> commitTransaction(ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "commitTransaction")));
        }
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return tracer.traceMono("ServiceBus.commitTransaction", connectionProcessor
                .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
                .flatMap(session -> session.commitTransaction(new AmqpTransaction(serviceBusTransactionContext.getTransactionId()))))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /**
     * Rolls back the transaction identified by the given context on the coordinator session.
     *
     * @param serviceBusTransactionContext the transaction to roll back; it and its transaction id
     *     must not be null.
     * @return a Mono that completes when the rollback finishes. It errors with
     *     {@link IllegalStateException} if this client is disposed and {@link NullPointerException}
     *     if the context or its transaction id is null. Other errors are mapped with the RECEIVE
     *     error source.
     */
    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext serviceBusTransactionContext) {
        if (isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "rollbackTransaction")));
        }
        if (serviceBusTransactionContext == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext' cannot be null."));
        }
        if (serviceBusTransactionContext.getTransactionId() == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return tracer.traceMono("ServiceBus.rollbackTransaction", connectionProcessor
                .flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME))
                .flatMap(session -> session.rollbackTransaction(new AmqpTransaction(serviceBusTransactionContext.getTransactionId()))))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /**
     * Disposes this client: closes the consumer and session manager (if any), both lock
     * containers and the settlement tracking handle, then runs the on-close callback.
     *
     * <p>Before disposing, the method waits up to 5 seconds for the completion lock; failing to
     * obtain it is logged and disposal continues. Calls after the first are no-ops. If the waiting
     * thread is interrupted, the interrupt status is restored once cleanup has finished so callers
     * can still observe it.</p>
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (isDisposed.get()) {
            return;
        }
        boolean interrupted = false;
        try {
            if (!completionLock.tryAcquire(5L, TimeUnit.SECONDS)) {
                LOGGER.info("Unable to obtain completion lock.");
            }
        } catch (InterruptedException e) {
            // tryAcquire cleared the interrupt flag; remember it and restore it after cleanup so
            // the remaining close steps are not affected by a pending interrupt.
            interrupted = true;
            LOGGER.info("Unable to obtain completion lock.", new Object[]{e});
        }
        try {
            if (isDisposed.getAndSet(true)) {
                return;
            }
            LOGGER.info("Removing receiver links.");
            final ServiceBusAsyncConsumer currentConsumer = consumer.getAndSet(null);
            if (currentConsumer != null) {
                currentConsumer.close();
            }
            if (sessionManager != null) {
                sessionManager.close();
            }
            managementNodeLocks.close();
            renewalContainer.close();
            if (trackSettlementSequenceNumber != null) {
                try {
                    trackSettlementSequenceNumber.close();
                } catch (Exception e2) {
                    LOGGER.info("Unable to close settlement sequence number subscription.", new Object[]{e2});
                }
            }
            onClientClose.run();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the options this receiver is operating with.
     *
     * @return the {@code receiverOptions} held by this client.
     */
    public ReceiverOptions getReceiverOptions() {
        return receiverOptions;
    }

    /**
     * Checks whether the given lock token is tracked, and not yet expired, in the management-node lock container.
     *
     * @param str the lock token to look up.
     * @return the result of {@code managementNodeLocks.containsUnexpired(str)}.
     */
    private boolean isManagementToken(String str) {
        final boolean trackedAndUnexpired = managementNodeLocks.containsUnexpired(str);
        return trackedAndUnexpired;
    }

    /**
     * Settles a received message with the given disposition.
     *
     * <p>The settlement is routed as follows:</p>
     * <ul>
     *   <li>When a session manager is present it is asked first; if it reports {@code false} the settlement falls
     *   back to the management node.</li>
     *   <li>Otherwise, when the lock token is tracked by the management-node lock container, or no consumer
     *   exists, the management node is used.</li>
     *   <li>Otherwise the settlement is performed through the current consumer.</li>
     * </ul>
     *
     * <p>On success the message is marked as settled and its lock token is removed from the renewal container
     * (and, on the management-node path, from the management-node lock container).</p>
     *
     * @param serviceBusReceivedMessage the message to settle.
     * @param dispositionStatus the disposition to apply.
     * @param str forwarded unchanged to the underlying {@code updateDisposition} calls
     *     (presumably the dead-letter reason; confirm against callers).
     * @param str2 forwarded unchanged to the underlying {@code updateDisposition} calls
     *     (presumably the dead-letter error description; confirm against callers).
     * @param map forwarded unchanged to the underlying {@code updateDisposition} calls
     *     (presumably the message properties to modify; confirm against callers).
     * @param serviceBusTransactionContext transaction context forwarded to the underlying calls.
     * @return a {@link Mono} that completes when the settlement finishes. It errors with
     *     {@link IllegalStateException} if the receiver is disposed, {@link NullPointerException} if the message
     *     is {@code null}, {@link UnsupportedOperationException} if the receiver is not in
     *     {@code PEEK_LOCK} mode or the message has no lock token, and {@link IllegalArgumentException} if the
     *     message is already settled. Any other failure that is not already a {@link ServiceBusException} is
     *     wrapped in one.
     */
    private Mono<Void> updateDisposition(ServiceBusReceivedMessage serviceBusReceivedMessage, DispositionStatus dispositionStatus, String str, String str2, Map<String, Object> map, ServiceBusTransactionContext serviceBusTransactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, dispositionStatus.getValue())));
        }
        if (Objects.isNull(serviceBusReceivedMessage)) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'message' cannot be null."));
        }
        final String lockToken = serviceBusReceivedMessage.getLockToken();
        final String sessionId = serviceBusReceivedMessage.getSessionId();
        if (this.receiverOptions.getReceiveMode() != ServiceBusReceiveMode.PEEK_LOCK) {
            return Mono.error(LOGGER.logExceptionAsError(new UnsupportedOperationException(String.format("'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", dispositionStatus))));
        }
        if (serviceBusReceivedMessage.isSettled()) {
            return Mono.error(LOGGER.logExceptionAsError(new IllegalArgumentException("The message has either been deleted or already settled.")));
        }
        if (lockToken == null) {
            return Mono.error(LOGGER.logExceptionAsError(new UnsupportedOperationException("This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled.")));
        }

        // Log the message's own session id when it has one; otherwise fall back to the receiver's configured one.
        final String sessionIdToLog = (sessionId != null || CoreUtils.isNullOrEmpty(this.receiverOptions.getSessionId()))
            ? sessionId
            : this.receiverOptions.getSessionId();
        LOGGER.atVerbose().addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue(ServiceBusConstants.SESSION_ID_KEY, sessionIdToLog).addKeyValue(ServiceBusConstants.DISPOSITION_STATUS_KEY, dispositionStatus).log("Update started.");

        // Typed as Mono<Void> (not a raw Mono) so the downstream operators keep their generic signatures.
        final Mono<Void> viaManagementNode = this.connectionProcessor
            .flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType))
            .flatMap(managementNode -> managementNode.updateDisposition(lockToken, dispositionStatus, str, str2, map, sessionId, getLinkName(sessionId), serviceBusTransactionContext))
            .then(Mono.fromRunnable(() -> {
                LOGGER.atInfo().addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue(ServiceBusConstants.DISPOSITION_STATUS_KEY, dispositionStatus).log("Management node Update completed.");
                serviceBusReceivedMessage.setIsSettled();
                this.managementNodeLocks.remove(lockToken);
                this.renewalContainer.remove(lockToken);
            }));

        final Mono<Void> settlement;
        if (this.sessionManager != null) {
            settlement = this.sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, map, str, str2, serviceBusTransactionContext).flatMap(handledBySession -> {
                if (!handledBySession.booleanValue()) {
                    LOGGER.info("Could not perform on session manger. Performing on management node.");
                    return viaManagementNode;
                }
                serviceBusReceivedMessage.setIsSettled();
                this.renewalContainer.remove(lockToken);
                return Mono.empty();
            });
        } else {
            final ServiceBusAsyncConsumer currentConsumer = this.consumer.get();
            if (isManagementToken(lockToken) || currentConsumer == null) {
                settlement = viaManagementNode;
            } else {
                settlement = currentConsumer.updateDisposition(lockToken, dispositionStatus, str, str2, map, serviceBusTransactionContext).then(Mono.fromRunnable(() -> {
                    LOGGER.atVerbose().addKeyValue(ServiceBusConstants.LOCK_TOKEN_KEY, lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue(ServiceBusConstants.DISPOSITION_STATUS_KEY, dispositionStatus).log("Update completed.");
                    serviceBusReceivedMessage.setIsSettled();
                    this.renewalContainer.remove(lockToken);
                }));
            }
        }

        return this.instrumentation.instrumentSettlement(settlement, serviceBusReceivedMessage, serviceBusReceivedMessage.getContext(), dispositionStatus).onErrorMap(th -> {
            if (th instanceof ServiceBusException) {
                return th;
            }
            // Attribute the failure to the operation that was attempted, where a matching error source exists.
            switch (dispositionStatus) {
                case COMPLETED:
                    return new ServiceBusException(th, ServiceBusErrorSource.COMPLETE);
                case ABANDONED:
                    return new ServiceBusException(th, ServiceBusErrorSource.ABANDON);
                default:
                    return new ServiceBusException(th, ServiceBusErrorSource.UNKNOWN);
            }
        });
    }

    /**
     * Returns the current consumer, creating and publishing a new one when none is set.
     *
     * <p>A new consumer is built on a receive link whose name comes from
     * {@code StringUtil.getRandomString(entityPath)}. Link creation is wrapped with the connection processor's
     * retry options, repeated, and links that are already disposed are filtered out before they reach the link
     * processor. If another consumer was published while this one was being built, the new one is closed and
     * whatever the consumer reference currently holds is returned.</p>
     *
     * @return the consumer held by the {@code consumer} reference after this call.
     * @throws IllegalStateException if this receiver is session enabled.
     */
    private ServiceBusAsyncConsumer getOrCreateConsumer() {
        if (this.isSessionEnabled) {
            throw LOGGER.logExceptionAsError(new IllegalStateException("The ServiceBusAsyncConsumer is expected to work only with session unaware entity."));
        }
        final ServiceBusAsyncConsumer existing = this.consumer.get();
        if (existing != null) {
            return existing;
        }

        final String linkName = StringUtil.getRandomString(this.entityPath);
        LOGGER.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", this.entityPath).log("Creating consumer.");

        final ServiceBusReceiveLinkProcessor linkProcessor = RetryUtil.withRetry(
                this.connectionProcessor
                    .flatMap(connection -> connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, this.identifier))
                    .doOnNext(link -> LOGGER.atVerbose().addKeyValue("linkName", linkName).addKeyValue("entityPath", link.getEntityPath()).addKeyValue("mode", this.receiverOptions.getReceiveMode()).addKeyValue("isSessionEnabled", false).addKeyValue(ServiceBusConstants.ENTITY_TYPE_KEY, this.entityType).log("Created consumer for Service Bus resource."))
                    // Re-surface a closed request/response channel as an AmqpException constructed with 'true'.
                    .onErrorMap(RequestResponseChannelClosedException.class, closed -> new AmqpException(true, closed.getMessage(), closed, (AmqpErrorContext) null)),
                this.connectionProcessor.getRetryOptions(),
                "Failed to create receive link " + linkName,
                true)
            .repeat()
            .filter(link -> !link.isDisposed())
            .subscribeWith(new ServiceBusReceiveLinkProcessor(this.receiverOptions.getPrefetchCount(), RetryUtil.getRetryPolicy(this.connectionProcessor.getRetryOptions())));

        final ServiceBusAsyncConsumer created = new ServiceBusAsyncConsumer(linkName, linkProcessor, this.messageSerializer, this.receiverOptions);

        // Publish only if nobody else did in the meantime; otherwise discard ours.
        if (!this.consumer.compareAndSet(null, created)) {
            created.close();
            return this.consumer.get();
        }
        return created;
    }

    /**
     * Resolves the name of the link associated with a session id, or with the current consumer.
     *
     * <p>For a non-empty session id the name is looked up through the session manager, provided the receiver is
     * session enabled and a session manager is available. For a {@code null} or empty session id the link name of
     * the current consumer is used.</p>
     *
     * @param str the session id; may be {@code null} or empty.
     * @return the link name, or {@code null} when none can be determined (non-session receiver asked about a
     *     session id, no session manager available, or no consumer created yet).
     */
    private String getLinkName(String str) {
        if (CoreUtils.isNullOrEmpty(str)) {
            final ServiceBusAsyncConsumer currentConsumer = this.consumer.get();
            return currentConsumer != null ? currentConsumer.getLinkName() : null;
        }
        // Other methods of this class treat sessionManager as possibly null even on a session-enabled receiver
        // (and call this method in exactly that case), so guard the dereference instead of throwing.
        if (this.isSessionEnabled && this.sessionManager != null) {
            return this.sessionManager.getLinkName(str);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Renews the lock of the given session once, through the management node.
     *
     * @param str the id of the session whose lock is renewed.
     * @return a {@link Mono} emitting the value produced by the management node's {@code renewSessionLock}.
     *     It errors with {@link IllegalStateException} when the receiver is disposed or is not session enabled;
     *     other failures that are not already a {@link ServiceBusException} are wrapped in one with source
     *     {@code RENEW_LOCK}.
     */
    public Mono<OffsetDateTime> renewSessionLock(String str) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
        } else if (!this.isSessionEnabled) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        }

        // Resolved eagerly, before the reactive chain is assembled.
        final String sessionLinkName = this.sessionManager.getLinkName(str);

        return this.connectionProcessor
            .flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType))
            .flatMap(managementNode -> this.tracer.traceMono("ServiceBus.renewSessionLock", managementNode.renewSessionLock(str, sessionLinkName)))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RENEW_LOCK));
    }

    /**
     * Starts a recurring lock renewal for the given session.
     *
     * <p>A {@code LockRenewalOperation} is created for the session, registered in the renewal container under
     * the session id with an expiry of "now plus {@code duration}", and its completion signal is returned.</p>
     *
     * @param str the session id; must be neither {@code null} nor empty.
     * @param duration the maximum lock renewal duration; must be neither {@code null} nor negative.
     * @return the completion {@link Mono} of the renewal operation. It errors with
     *     {@link IllegalStateException} when the receiver is disposed or is not session enabled,
     *     {@link NullPointerException} for a {@code null} argument, and {@link IllegalArgumentException} for a
     *     negative duration or an empty session id.
     */
    Mono<Void> renewSessionLock(String str, Duration duration) {
        // The order of these checks determines which error is reported first; keep it stable.
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
        } else if (!this.isSessionEnabled) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        } else if (duration == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'maxLockRenewalDuration' cannot be null."));
        } else if (duration.isNegative()) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        } else if (str == null) {
            return FluxUtil.monoError(LOGGER, new NullPointerException("'sessionId' cannot be null."));
        } else if (str.isEmpty()) {
            return FluxUtil.monoError(LOGGER, new IllegalArgumentException("'sessionId' cannot be empty."));
        }

        final LockRenewalOperation renewal = new LockRenewalOperation(str, duration, true, this::renewSessionLock);
        final OffsetDateTime renewUntil = OffsetDateTime.now().plus(duration);
        this.renewalContainer.addOrUpdate(str, renewUntil, renewal);
        return renewal.getCompletionOperation();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the state of the given session through the management node.
     *
     * @param str the id of the session whose state is set.
     * @param bArr the state bytes, forwarded unchanged to the management node.
     * @return a {@link Mono} that completes when the management node call finishes. It errors with
     *     {@link IllegalStateException} when the receiver is disposed or is not session enabled; other failures
     *     that are not already a {@link ServiceBusException} are wrapped in one with source {@code RECEIVE}.
     */
    public Mono<Void> setSessionState(String str, byte[] bArr) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState")));
        } else if (!this.isSessionEnabled) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException("Cannot set session state on a non-session receiver."));
        }

        // Without a session manager no link name is available; null is passed on in that case.
        final String sessionLinkName;
        if (this.sessionManager == null) {
            sessionLinkName = null;
        } else {
            sessionLinkName = this.sessionManager.getLinkName(str);
        }

        return this.tracer.traceMono("ServiceBus.setSessionState",
                this.connectionProcessor
                    .flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType))
                    .flatMap(managementNode -> managementNode.setSessionState(str, bArr, sessionLinkName)))
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the state of the given session.
     *
     * <p>The state is read through the session manager when one is present, and through the management node
     * otherwise. The operation is traced under the span name {@code "ServiceBus.getSessionState"}.</p>
     *
     * @param str the id of the session whose state is read.
     * @return a {@link Mono} emitting the session state bytes. It errors with {@link IllegalStateException} when
     *     the receiver is disposed or is not session enabled; other failures that are not already a
     *     {@link ServiceBusException} are wrapped in one with source {@code RECEIVE}.
     */
    public Mono<byte[]> getSessionState(String str) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState")));
        }
        if (!this.isSessionEnabled) {
            return FluxUtil.monoError(LOGGER, new IllegalStateException("Cannot get session state on a non-session receiver."));
        }

        final Mono<byte[]> stateLookup;
        if (this.sessionManager != null) {
            stateLookup = this.sessionManager.getSessionState(str);
        } else {
            stateLookup = this.connectionProcessor
                .flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType))
                .flatMap(managementNode -> managementNode.getSessionState(str, getLinkName(str)));
        }

        // The span was previously named "ServiceBus.setSessionState", which mislabelled reads as writes.
        return this.tracer.traceMono("ServiceBus.getSessionState", stateLookup)
            .onErrorMap(error -> mapError(error, ServiceBusErrorSource.RECEIVE));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the instrumentation helper used by this receiver.
     *
     * @return the {@code instrumentation} held by this client.
     */
    public ServiceBusReceiverInstrumentation getInstrumentation() {
        return instrumentation;
    }

    /**
     * Ensures an error is surfaced as a {@link ServiceBusException}.
     *
     * @param th the error to map.
     * @param serviceBusErrorSource the error source recorded when {@code th} has to be wrapped.
     * @return {@code th} itself when it already is a {@link ServiceBusException}; otherwise a new
     *     {@link ServiceBusException} wrapping it with the given source.
     */
    private Throwable mapError(Throwable th, ServiceBusErrorSource serviceBusErrorSource) {
        if (th instanceof ServiceBusException) {
            return th;
        }
        return new ServiceBusException(th, serviceBusErrorSource);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether the connection processor's channel is closed.
     *
     * @return the result of {@code connectionProcessor.isChannelClosed()}.
     */
    public boolean isConnectionClosed() {
        final boolean channelClosed = connectionProcessor.isChannelClosed();
        return channelClosed;
    }

    /**
     * Reports whether the management-node lock container has been closed.
     *
     * @return the result of {@code managementNodeLocks.isClosed()}.
     */
    boolean isManagementNodeLocksClosed() {
        final boolean locksClosed = managementNodeLocks.isClosed();
        return locksClosed;
    }

    /**
     * Reports whether the renewal container has been closed.
     *
     * @return the result of {@code renewalContainer.isClosed()}.
     */
    boolean isRenewalContainerClosed() {
        final boolean containerClosed = renewalContainer.isClosed();
        return containerClosed;
    }
}
