package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpReceiver;
import com.microsoft.azure.servicebus.amqp.ReceiveLinkHandler;
import com.microsoft.azure.servicebus.amqp.SessionHandler;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.class */
public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class);
    private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5);
    private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1);
    private static final Duration UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(500);
    private static final Duration ZERO_TIMEOUT_APPROXIMATION = Duration.ofMillis(200);
    private static final int CREDIT_FLOW_BATCH_SIZE = 50;
    private final Object requestResonseLinkCreationLock;
    private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
    private final ConcurrentHashMap<String, UpdateStateWorkItem> pendingUpdateStateRequests;
    private final ConcurrentHashMap<String, Delivery> tagsToDeliveriesMap;
    private final MessagingFactory underlyingFactory;
    private final String receivePath;
    private final String sasTokenAudienceURI;
    private final Duration operationTimeout;
    private final CompletableFuture<Void> linkClose;
    private final Object prefetchCountSync;
    private final SettleModePair settleModePair;
    private final RetryPolicy retryPolicy;
    private int prefetchCount;
    private String sessionId;
    private boolean isSessionReceiver;
    private boolean isBrowsableSession;
    private Instant sessionLockedUntilUtc;
    private boolean isSessionLockLost;
    private ConcurrentLinkedQueue<MessageWithDeliveryTag> prefetchedMessages;
    private Receiver receiveLink;
    private RequestResponseLink requestResponseLink;
    private WorkItem<CoreMessageReceiver> linkOpen;
    private Exception lastKnownLinkError;
    private Instant lastKnownErrorReportedAt;
    private final AtomicInteger creditToFlow;
    private final AtomicInteger creditNeededtoServePendingReceives;
    private final AtomicInteger currentPrefetechedMessagesCount;
    private ScheduledFuture<?> sasTokenRenewTimerFuture;
    private CompletableFuture<Void> requestResponseLinkCreationFuture;
    private CompletableFuture<Void> receiveLinkReopenFuture;
    private CompletableFuture<Void> ensureLinkReopenFutureToWaitOn;
    private final Runnable timedOutUpdateStateRequestsDaemon;
    private final Runnable returnMesagesLoopDaemon;
    private final ScheduledFuture<?> updateStateRequestsTimeoutChecker;
    private final ScheduledFuture<?> returnMessagesLoopRunner;
    private final MessagingEntityType entityType;
    private boolean shouldRetryLinkReopenOnTransientFailure;

    private CoreMessageReceiver(MessagingFactory messagingFactory, String str, String str2, String str3, int i, SettleModePair settleModePair, MessagingEntityType messagingEntityType) {
        super(str);
        this.requestResonseLinkCreationLock = new Object();
        this.shouldRetryLinkReopenOnTransientFailure = true;
        this.underlyingFactory = messagingFactory;
        this.operationTimeout = messagingFactory.getOperationTimeout();
        this.receivePath = str2;
        this.sasTokenAudienceURI = String.format("amqp://%s/%s", messagingFactory.getHostName(), str2);
        this.sessionId = str3;
        this.isSessionReceiver = false;
        this.isBrowsableSession = false;
        this.prefetchCount = i;
        this.settleModePair = settleModePair;
        this.prefetchedMessages = new ConcurrentLinkedQueue<>();
        this.linkClose = new CompletableFuture<>();
        this.lastKnownLinkError = null;
        this.prefetchCountSync = new Object();
        this.retryPolicy = messagingFactory.getRetryPolicy();
        this.pendingReceives = new ConcurrentLinkedQueue<>();
        this.pendingUpdateStateRequests = new ConcurrentHashMap<>();
        this.tagsToDeliveriesMap = new ConcurrentHashMap<>();
        this.lastKnownErrorReportedAt = Instant.now();
        this.receiveLinkReopenFuture = null;
        this.creditToFlow = new AtomicInteger();
        this.creditNeededtoServePendingReceives = new AtomicInteger();
        this.currentPrefetechedMessagesCount = new AtomicInteger();
        this.entityType = messagingEntityType;
        this.timedOutUpdateStateRequestsDaemon = new Runnable() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    CoreMessageReceiver.TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath);
                    for (Map.Entry entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) {
                        Duration remaining = ((UpdateStateWorkItem) entry.getValue()).getTimeoutTracker().remaining();
                        if (remaining.isZero() || remaining.isNegative()) {
                            CoreMessageReceiver.this.pendingUpdateStateRequests.remove(entry.getKey());
                            Exception lastKnownException = ((UpdateStateWorkItem) entry.getValue()).getLastKnownException();
                            if (lastKnownException == null) {
                                lastKnownException = new TimeoutException("Request timed out.");
                            }
                            CoreMessageReceiver.TRACE_LOGGER.error("UpdateState request timed out. Delivery:{}", entry.getKey(), lastKnownException);
                            AsyncUtil.completeFutureExceptionally(((UpdateStateWorkItem) entry.getValue()).getWork(), lastKnownException);
                        }
                    }
                    CoreMessageReceiver.TRACE_LOGGER.trace("'{}' core message receiver's internal loop to complete timed out update state requests stopped.", CoreMessageReceiver.this.receivePath);
                } catch (Throwable th) {
                }
            }
        };
        this.returnMesagesLoopDaemon = new Runnable() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.2
            @Override // java.lang.Runnable
            public void run() {
                ReceiveWorkItem receiveWorkItem;
                try {
                    CoreMessageReceiver.TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath);
                    while (!CoreMessageReceiver.this.prefetchedMessages.isEmpty() && (receiveWorkItem = (ReceiveWorkItem) CoreMessageReceiver.this.pendingReceives.poll()) != null) {
                        if (!receiveWorkItem.getWork().isDone()) {
                            CoreMessageReceiver.TRACE_LOGGER.debug("Returning the message received from '{}' to a pending receive request", CoreMessageReceiver.this.receivePath);
                            receiveWorkItem.cancelTimeoutTask(false);
                            List receiveCore = CoreMessageReceiver.this.receiveCore(receiveWorkItem.getMaxMessageCount());
                            CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
                            AsyncUtil.completeFuture(receiveWorkItem.getWork(), receiveCore);
                        }
                    }
                    CoreMessageReceiver.TRACE_LOGGER.trace("'{}' core message receiver's internal loop to return messages to waiting clients stopped.", CoreMessageReceiver.this.receivePath);
                } catch (Throwable th) {
                }
            }
        };
        this.updateStateRequestsTimeoutChecker = Timer.schedule(this.timedOutUpdateStateRequestsDaemon, UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
        this.returnMessagesLoopRunner = Timer.schedule(this.returnMesagesLoopDaemon, RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun);
    }

    @Deprecated
    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory messagingFactory, String str, String str2, int i, SettleModePair settleModePair) {
        return create(messagingFactory, str, str2, i, settleModePair, null);
    }

    @Deprecated
    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory messagingFactory, String str, String str2, String str3, boolean z, int i, SettleModePair settleModePair) {
        return create(messagingFactory, str, str2, str3, z, i, settleModePair, null);
    }

    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory messagingFactory, String str, String str2, int i, SettleModePair settleModePair, MessagingEntityType messagingEntityType) {
        TRACE_LOGGER.info("Creating core message receiver to '{}'", str2);
        return new CoreMessageReceiver(messagingFactory, str, str2, null, i, settleModePair, messagingEntityType).createLink();
    }

    public static CompletableFuture<CoreMessageReceiver> create(MessagingFactory messagingFactory, String str, String str2, String str3, boolean z, int i, SettleModePair settleModePair, MessagingEntityType messagingEntityType) {
        TRACE_LOGGER.info("Creating core session receiver to '{}', sessionId '{}', browseonly session '{}'", new Object[]{str2, str3, Boolean.valueOf(z)});
        CoreMessageReceiver coreMessageReceiver = new CoreMessageReceiver(messagingFactory, str, str2, str3, i, settleModePair, messagingEntityType);
        coreMessageReceiver.isSessionReceiver = true;
        coreMessageReceiver.isBrowsableSession = z;
        return coreMessageReceiver.createLink();
    }

    private CompletableFuture<CoreMessageReceiver> createLink() {
        this.linkOpen = new WorkItem<>(new CompletableFuture(), this.operationTimeout);
        scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
        sendTokenAndSetRenewTimer(false).handleAsync((r8, th) -> {
            if (th != null) {
                Throwable extractAsyncCompletionCause = ExceptionUtil.extractAsyncCompletionCause(th);
                TRACE_LOGGER.error("Sending SAS Token failed. ReceivePath:{}", this.receivePath, extractAsyncCompletionCause);
                this.linkOpen.getWork().completeExceptionally(extractAsyncCompletionCause);
                return null;
            }
            try {
                this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.3
                    @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                    public void onEvent() {
                        CoreMessageReceiver.this.createReceiveLink();
                    }
                });
                return null;
            } catch (IOException e) {
                cancelSASTokenRenewTimer();
                this.linkOpen.getWork().completeExceptionally(new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", e));
                return null;
            }
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        return this.linkOpen.getWork();
    }

    private CompletableFuture<Void> createRequestResponseLinkAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.requestResonseLinkCreationLock) {
            if (this.requestResponseLinkCreationFuture == null) {
                this.requestResponseLinkCreationFuture = new CompletableFuture<>();
                this.underlyingFactory.obtainRequestResponseLinkAsync(this.receivePath, this.entityType).handleAsync((requestResponseLink, th) -> {
                    if (th == null) {
                        this.requestResponseLink = requestResponseLink;
                        this.requestResponseLinkCreationFuture.complete(null);
                        return null;
                    }
                    this.requestResponseLinkCreationFuture.completeExceptionally(ExceptionUtil.extractAsyncCompletionCause(th));
                    synchronized (this.requestResonseLinkCreationLock) {
                        this.requestResponseLinkCreationFuture = null;
                    }
                    return null;
                }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
            }
            completableFuture = this.requestResponseLinkCreationFuture;
        }
        return completableFuture;
    }

    private void closeRequestResponseLink() {
        synchronized (this.requestResonseLinkCreationLock) {
            if (this.requestResponseLinkCreationFuture != null) {
                this.requestResponseLinkCreationFuture.thenRun(() -> {
                    this.underlyingFactory.releaseRequestResponseLink(this.receivePath);
                    this.requestResponseLink = null;
                });
                this.requestResponseLinkCreationFuture = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void createReceiveLink() {
        TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath);
        Connection activeConnectionOrNothing = this.underlyingFactory.getActiveConnectionOrNothing();
        if (activeConnectionOrNothing == null) {
            TRACE_LOGGER.warn("Idle connection closed by service just after sending CBS token. Very rare case. Will retry.");
            ServiceBusException serviceBusException = new ServiceBusException(true, "Idle connection closed by service just after sending CBS token. Please retry.");
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                AsyncUtil.completeFutureExceptionally(this.linkOpen.getWork(), serviceBusException);
            }
            if (this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) {
                return;
            }
            AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, serviceBusException);
            return;
        }
        Session session = activeConnectionOrNothing.session();
        session.setIncomingCapacity(Integer.MAX_VALUE);
        session.open();
        BaseHandler.setHandler(session, new SessionHandler(this.receivePath));
        String concat = "Receiver".concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(StringUtil.getShortRandomString());
        Receiver receiver = session.receiver(!StringUtil.isNullOrEmpty(activeConnectionOrNothing.getRemoteContainer()) ? concat.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(activeConnectionOrNothing.getRemoteContainer()) : concat);
        Source source = new Source();
        source.setAddress(this.receivePath);
        HashMap hashMap = new HashMap();
        hashMap.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis()));
        if (this.entityType != null) {
            hashMap.put(ClientConstants.ENTITY_TYPE_PROPERTY, Integer.valueOf(this.entityType.getIntValue()));
        }
        if (this.isSessionReceiver) {
            HashMap hashMap2 = new HashMap();
            hashMap2.put(ClientConstants.SESSION_FILTER, this.sessionId);
            source.setFilter(hashMap2);
            hashMap.put(ClientConstants.LINK_PEEKMODE_PROPERTY, Boolean.valueOf(this.isBrowsableSession));
        }
        receiver.setSource(source);
        receiver.setTarget(new Target());
        TRACE_LOGGER.debug("Receive link settle mode '{}'", this.settleModePair);
        receiver.setSenderSettleMode(this.settleModePair.getSenderSettleMode());
        receiver.setReceiverSettleMode(this.settleModePair.getReceiverSettleMode());
        receiver.setProperties(hashMap);
        BaseHandler.setHandler(receiver, new ReceiveLinkHandler(this));
        receiver.open();
        this.receiveLink = receiver;
        this.underlyingFactory.registerForConnectionError(this.receiveLink);
    }

    CompletableFuture<Void> sendTokenAndSetRenewTimer(boolean z) {
        return getIsClosingOrClosed() ? CompletableFuture.completedFuture(null) : this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, z, () -> {
            sendTokenAndSetRenewTimer(true);
        }).thenAccept(scheduledFuture -> {
            this.sasTokenRenewTimerFuture = scheduledFuture;
        });
    }

    private void throwIfInUnusableState() {
        if (this.isSessionReceiver && this.isSessionLockLost) {
            throw new IllegalStateException("Session lock lost and cannot be used. Close this session and accept another session.");
        }
        throwIfClosed(this.lastKnownLinkError);
    }

    private void cancelSASTokenRenewTimer() {
        if (this.sasTokenRenewTimerFuture == null || this.sasTokenRenewTimerFuture.isDone()) {
            return;
        }
        this.sasTokenRenewTimerFuture.cancel(true);
        TRACE_LOGGER.debug("Cancelled SAS Token renew timer");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<MessageWithDeliveryTag> receiveCore(int i) {
        LinkedList linkedList = null;
        MessageWithDeliveryTag poll = this.prefetchedMessages.poll();
        int i2 = 0;
        while (poll != null) {
            this.currentPrefetechedMessagesCount.decrementAndGet();
            if (linkedList == null) {
                linkedList = new LinkedList();
            }
            linkedList.add(poll);
            i2++;
            if (i2 >= i) {
                break;
            }
            poll = this.prefetchedMessages.poll();
        }
        return linkedList;
    }

    public int getPrefetchCount() {
        int i;
        synchronized (this.prefetchCountSync) {
            i = this.prefetchCount;
        }
        return i;
    }

    public String getSessionId() {
        return this.sessionId;
    }

    public Instant getSessionLockedUntilUtc() {
        if (this.isSessionReceiver) {
            return this.sessionLockedUntilUtc;
        }
        throw new RuntimeException("Object is not a session receiver");
    }

    public void setPrefetchCount(int i) throws ServiceBusException {
        final int i2;
        if (i < 0) {
            throw new IllegalArgumentException("Prefetch count cannot be negative.");
        }
        throwIfInUnusableState();
        synchronized (this.prefetchCountSync) {
            i2 = i - this.prefetchCount;
            this.prefetchCount = i;
            TRACE_LOGGER.info("Setting prefetch count to '{}' on recieve link to '{}'", Integer.valueOf(i), this.receivePath);
        }
        if (i2 > 0) {
            try {
                this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.4
                    @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                    public void onEvent() {
                        CoreMessageReceiver.this.sendFlow(i2);
                    }
                });
            } catch (IOException e) {
                throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", e);
            }
        }
    }

    public CompletableFuture<Collection<MessageWithDeliveryTag>> receiveAsync(int i, Duration duration) {
        throwIfInUnusableState();
        if (i <= 0) {
            throw new IllegalArgumentException("parameter 'maxMessageCount' should be a positive number");
        }
        TRACE_LOGGER.debug("Receiving maximum of '{}' messages from '{}'", Integer.valueOf(i), this.receivePath);
        CompletableFuture<Collection<MessageWithDeliveryTag>> completableFuture = new CompletableFuture<>();
        final ReceiveWorkItem receiveWorkItem = new ReceiveWorkItem(completableFuture, duration, i);
        this.creditNeededtoServePendingReceives.addAndGet(i);
        this.pendingReceives.add(receiveWorkItem);
        if (duration == Duration.ZERO) {
            duration = ZERO_TIMEOUT_APPROXIMATION;
        }
        Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.5
            @Override // java.lang.Runnable
            public void run() {
                if (CoreMessageReceiver.this.pendingReceives.remove(receiveWorkItem)) {
                    CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
                    CoreMessageReceiver.TRACE_LOGGER.info("No messages received from '{}'. Pending receive request timed out. Returning null to the client.", CoreMessageReceiver.this.receivePath);
                    AsyncUtil.completeFuture(receiveWorkItem.getWork(), null);
                }
            }
        }, duration, TimerType.OneTimeRun);
        ensureLinkIsOpen().thenRun(() -> {
            addCredit(receiveWorkItem);
        });
        return completableFuture;
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onOpenComplete(Exception exc) {
        if (exc == null) {
            TRACE_LOGGER.info("Receive link to '{}' opened.", this.receivePath);
            if (this.isSessionReceiver) {
                Map filter = this.receiveLink.getRemoteSource().getFilter();
                if (filter == null || !filter.containsKey(ClientConstants.SESSION_FILTER)) {
                    exc = new ServiceBusException(false, "SessionId filter not set on the remote source.");
                } else {
                    this.sessionId = (String) filter.get(ClientConstants.SESSION_FILTER);
                    if (this.receiveLink.getRemoteProperties() == null || !this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) {
                        TRACE_LOGGER.warn("Accepted a session with id '{}', from '{}' which didn't set '{}' property on the receive link.", new Object[]{this.sessionId, this.receivePath, ClientConstants.LOCKED_UNTIL_UTC});
                        this.sessionLockedUntilUtc = Instant.ofEpochMilli(0L);
                    } else {
                        this.sessionLockedUntilUtc = Util.convertDotNetTicksToInstant(((Long) this.receiveLink.getRemoteProperties().get(ClientConstants.LOCKED_UNTIL_UTC)).longValue());
                    }
                    TRACE_LOGGER.info("Accepted session with id '{}', lockedUntilUtc '{}' from '{}'.", new Object[]{this.sessionId, this.sessionLockedUntilUtc, this.receivePath});
                }
            }
        }
        if (exc == null) {
            if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
                AsyncUtil.completeFuture(this.linkOpen.getWork(), this);
            }
            if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
                AsyncUtil.completeFuture(this.receiveLinkReopenFuture, null);
            }
            this.lastKnownLinkError = null;
            this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());
            sendFlow(this.prefetchCount - this.currentPrefetechedMessagesCount.get());
            TRACE_LOGGER.debug("receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}", new Object[]{this.receivePath, this.receiveLink.getName(), Integer.valueOf(this.receiveLink.getCredit()), Integer.valueOf(this.prefetchCount)});
            return;
        }
        cancelSASTokenRenewTimer();
        if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) {
            TRACE_LOGGER.error("Opening receive link '{}' to '{}' failed.", new Object[]{this.receiveLink.getName(), this.receivePath, exc});
            setClosed();
            ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exc, this, true);
        }
        if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) {
            TRACE_LOGGER.warn("Opening receive link '{}' to '{}' failed.", new Object[]{this.receiveLink.getName(), this.receivePath, exc});
            AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exc);
        }
        this.lastKnownLinkError = exc;
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpReceiver
    public void onReceiveComplete(final Delivery delivery) {
        this.underlyingFactory.getRetryPolicy().resetRetryCount(getClientId());
        byte[] tag = delivery.getTag();
        String convertBytesToString = StringUtil.convertBytesToString(delivery.getTag());
        TRACE_LOGGER.debug("Received a delivery '{}' from '{}'", convertBytesToString, this.receivePath);
        if (tag == null || tag.length == 0 || !this.tagsToDeliveriesMap.containsKey(convertBytesToString)) {
            TRACE_LOGGER.debug("Received a message from '{}'. Adding to prefecthed messages.", this.receivePath);
            try {
                Message readMessageFromDelivery = Util.readMessageFromDelivery(this.receiveLink, delivery);
                if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.SETTLED) {
                    delivery.disposition(Accepted.getInstance());
                    delivery.settle();
                } else {
                    this.tagsToDeliveriesMap.put(StringUtil.convertBytesToString(delivery.getTag()), delivery);
                    this.receiveLink.advance();
                }
                this.currentPrefetechedMessagesCount.incrementAndGet();
                this.prefetchedMessages.add(new MessageWithDeliveryTag(readMessageFromDelivery, delivery.getTag()));
                return;
            } catch (Exception e) {
                TRACE_LOGGER.warn("Reading message from delivery '{}' from '{}', session '{}' failed with unexpected exception.", new Object[]{convertBytesToString, this.receivePath, this.sessionId, e});
                delivery.disposition(Released.getInstance());
                delivery.settle();
                return;
            }
        }
        Rejected remoteState = delivery.getRemoteState();
        TRACE_LOGGER.debug("Received a delivery '{}' with state '{}' from '{}'", new Object[]{convertBytesToString, remoteState, this.receivePath});
        if (remoteState instanceof Outcome) {
            Rejected rejected = (Outcome) remoteState;
            final UpdateStateWorkItem updateStateWorkItem = this.pendingUpdateStateRequests.get(convertBytesToString);
            if (updateStateWorkItem != null) {
                if (rejected.getClass().getName().equals(updateStateWorkItem.outcome.getClass().getName())) {
                    TRACE_LOGGER.debug("Completing a pending updateState operation for delivery '{}' from '{}'", convertBytesToString, this.receivePath);
                    completePendingUpdateStateWorkItem(delivery, convertBytesToString, updateStateWorkItem, null);
                    return;
                }
                TRACE_LOGGER.warn("Received delivery '{}' state '{}' doesn't match expected state '{}'", new Object[]{convertBytesToString, remoteState, updateStateWorkItem.outcome});
                if (!(rejected instanceof Rejected)) {
                    if (rejected instanceof Released) {
                        OperationCancelledException operationCancelledException = new OperationCancelledException(rejected.toString());
                        TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", convertBytesToString, operationCancelledException);
                        completePendingUpdateStateWorkItem(delivery, convertBytesToString, updateStateWorkItem, operationCancelledException);
                        return;
                    } else {
                        ServiceBusException serviceBusException = new ServiceBusException(false, rejected.toString());
                        TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", convertBytesToString, serviceBusException);
                        completePendingUpdateStateWorkItem(delivery, convertBytesToString, updateStateWorkItem, serviceBusException);
                        return;
                    }
                }
                ErrorCondition error = rejected.getError();
                Exception exception = ExceptionUtil.toException(error);
                if (ExceptionUtil.isGeneralError(error.getCondition())) {
                    this.lastKnownLinkError = exception;
                    this.lastKnownErrorReportedAt = Instant.now();
                }
                Duration nextRetryInterval = this.retryPolicy.getNextRetryInterval(getClientId(), exception, updateStateWorkItem.getTimeoutTracker().remaining());
                if (nextRetryInterval == null) {
                    TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", convertBytesToString, exception);
                    completePendingUpdateStateWorkItem(delivery, convertBytesToString, updateStateWorkItem, exception);
                    return;
                }
                updateStateWorkItem.setLastKnownException(exception);
                TRACE_LOGGER.debug("Pending updateState operation for delivery '{}' will be retried after '{}'", convertBytesToString, nextRetryInterval);
                try {
                    this.underlyingFactory.scheduleOnReactorThread((int) nextRetryInterval.toMillis(), new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.6
                        @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                        public void onEvent() {
                            delivery.disposition(updateStateWorkItem.getOutcome());
                        }
                    });
                } catch (IOException e2) {
                    completePendingUpdateStateWorkItem(delivery, convertBytesToString, updateStateWorkItem, new ServiceBusException(false, "Operation failed while scheduling a retry on Reactor, see cause for more details.", e2));
                }
            }
        }
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onError(Exception exc) {
        Duration nextRetryInterval;
        this.creditToFlow.set(0);
        cancelSASTokenRenewTimer();
        if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.UNSETTLED) {
            this.prefetchedMessages.clear();
            this.currentPrefetechedMessagesCount.set(0);
            this.tagsToDeliveriesMap.clear();
        }
        if (getIsClosingOrClosed()) {
            TRACE_LOGGER.info("Receive link to '{}', sessionId '{}' closed", this.receivePath, this.sessionId);
            AsyncUtil.completeFuture(this.linkClose, null);
            clearAllPendingWorkItems(exc);
            return;
        }
        this.underlyingFactory.deregisterForConnectionError(this.receiveLink);
        TRACE_LOGGER.warn("Receive link '{}' to '{}', sessionId '{}' closed with error.", new Object[]{this.receiveLink.getName(), this.receivePath, this.sessionId, exc});
        this.lastKnownLinkError = exc;
        if ((this.linkOpen != null && !this.linkOpen.getWork().isDone()) || (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone())) {
            onOpenComplete(exc);
        }
        if (exc == null || ((exc instanceof ServiceBusException) && ((ServiceBusException) exc).getIsTransient())) {
            ReceiveWorkItem peek = this.pendingReceives.peek();
            if (peek == null || peek.getTimeoutTracker() == null || (nextRetryInterval = this.underlyingFactory.getRetryPolicy().getNextRetryInterval(getClientId(), exc, peek.getTimeoutTracker().remaining())) == null) {
                return;
            }
            TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", new Object[]{this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval});
            Timer.schedule(() -> {
                ensureLinkIsOpen();
            }, nextRetryInterval, TimerType.OneTimeRun);
            return;
        }
        clearAllPendingWorkItems(exc);
        if (this.isSessionReceiver) {
            if ((exc instanceof SessionLockLostException) || (exc instanceof SessionCannotBeLockedException)) {
                TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId);
                this.isSessionLockLost = true;
                closeAsync();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reduceCreditForCompletedReceiveRequest(int i) {
        this.creditNeededtoServePendingReceives.updateAndGet(i2 -> {
            int i2 = i2 - i;
            if (i2 > 0) {
                return i2;
            }
            return 0;
        });
    }

    private void addCredit(ReceiveWorkItem receiveWorkItem) {
        int credit = (this.creditNeededtoServePendingReceives.get() - ((this.receiveLink.getCredit() + this.currentPrefetechedMessagesCount.get()) + this.creditToFlow.get())) + this.prefetchCount;
        if (credit > 0) {
            int addAndGet = this.creditToFlow.addAndGet(credit);
            if (addAndGet >= this.prefetchCount || addAndGet >= CREDIT_FLOW_BATCH_SIZE) {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.7
                        @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                        public void onEvent() {
                            CoreMessageReceiver.this.sendFlow(CoreMessageReceiver.this.creditToFlow.getAndSet(0));
                        }
                    });
                } catch (IOException e) {
                    this.pendingReceives.remove(receiveWorkItem);
                    reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount());
                    receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", e));
                    receiveWorkItem.cancelTimeoutTask(false);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendFlow(int i) {
        if (this.isBrowsableSession || i <= 0) {
            return;
        }
        this.receiveLink.flow(i);
        TRACE_LOGGER.debug("Sent flow to the service. receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}", new Object[]{this.receivePath, this.receiveLink.getName(), Integer.valueOf(this.receiveLink.getCredit()), Integer.valueOf(i)});
    }

    private void scheduleLinkOpenTimeout(TimeoutTracker timeoutTracker) {
        Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.8
            @Override // java.lang.Runnable
            public void run() {
                if (CoreMessageReceiver.this.linkOpen.getWork().isDone()) {
                    return;
                }
                CoreMessageReceiver.this.closeInternals(false);
                CoreMessageReceiver.this.setClosed();
                TimeoutException timeoutException = new TimeoutException(String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()), CoreMessageReceiver.this.lastKnownLinkError);
                CoreMessageReceiver.TRACE_LOGGER.warn(timeoutException.getMessage());
                ExceptionUtil.completeExceptionally(CoreMessageReceiver.this.linkOpen.getWork(), timeoutException, CoreMessageReceiver.this, true);
            }
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleLinkCloseTimeout(TimeoutTracker timeoutTracker) {
        Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.9
            @Override // java.lang.Runnable
            public void run() {
                if (CoreMessageReceiver.this.linkClose.isDone()) {
                    return;
                }
                TimeoutException timeoutException = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", CoreMessageReceiver.this.receiveLink.getName(), ZonedDateTime.now()));
                CoreMessageReceiver.TRACE_LOGGER.warn(timeoutException.getMessage());
                ExceptionUtil.completeExceptionally(CoreMessageReceiver.this.linkClose, timeoutException, CoreMessageReceiver.this, true);
            }
        }, timeoutTracker.remaining(), TimerType.OneTimeRun);
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpLink
    public void onClose(ErrorCondition errorCondition) {
        if (errorCondition == null) {
            onError(new ServiceBusException(true, String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", this.receiveLink.getName(), this.receivePath)));
        } else {
            onError(ExceptionUtil.toException(errorCondition));
        }
    }

    @Override // com.microsoft.azure.servicebus.primitives.IErrorContextProvider
    public ErrorContext getContext() {
        boolean z = this.linkOpen != null && this.linkOpen.getWork().isDone();
        return new ReceiverErrorContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.receivePath, (this.receiveLink == null || this.receiveLink.getRemoteProperties() == null || !this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY)) ? this.receiveLink != null ? this.receiveLink.getName() : null : this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString(), z ? Integer.valueOf(this.prefetchCount) : null, (!z || this.receiveLink == null) ? null : Integer.valueOf(this.receiveLink.getCredit()), Integer.valueOf(this.currentPrefetechedMessagesCount.get()));
    }

    @Override // com.microsoft.azure.servicebus.primitives.ClientEntity
    protected CompletableFuture<Void> onClose() {
        closeInternals(true);
        return this.linkClose;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeInternals(final boolean z) {
        if (getIsClosed()) {
            return;
        }
        if (this.receiveLink == null || this.receiveLink.getLocalState() == EndpointState.CLOSED) {
            AsyncUtil.completeFuture(this.linkClose, null);
        } else {
            try {
                this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.10
                    @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                    public void onEvent() {
                        if (CoreMessageReceiver.this.receiveLink == null || CoreMessageReceiver.this.receiveLink.getLocalState() == EndpointState.CLOSED) {
                            return;
                        }
                        CoreMessageReceiver.TRACE_LOGGER.info("Closing receive link to '{}'", CoreMessageReceiver.this.receivePath);
                        CoreMessageReceiver.this.receiveLink.close();
                        CoreMessageReceiver.this.underlyingFactory.deregisterForConnectionError(CoreMessageReceiver.this.receiveLink);
                        if (z) {
                            CoreMessageReceiver.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageReceiver.this.operationTimeout));
                        } else {
                            AsyncUtil.completeFuture(CoreMessageReceiver.this.linkClose, null);
                        }
                    }
                });
            } catch (IOException e) {
                AsyncUtil.completeFutureExceptionally(this.linkClose, e);
            }
        }
        cancelSASTokenRenewTimer();
        closeRequestResponseLink();
        this.updateStateRequestsTimeoutChecker.cancel(false);
        this.returnMessagesLoopRunner.cancel(false);
    }

    public CompletableFuture<Void> completeMessageAsync(byte[] bArr) {
        return updateMessageStateAsync(bArr, Accepted.getInstance());
    }

    public CompletableFuture<Void> completeMessageAsync(UUID uuid) {
        return updateDispositionAsync(new UUID[]{uuid}, ClientConstants.DISPOSITION_STATUS_COMPLETED, null, null, null);
    }

    public CompletableFuture<Void> abandonMessageAsync(byte[] bArr, Map<String, Object> map) {
        Modified modified = new Modified();
        if (map != null) {
            modified.setMessageAnnotations(map);
        }
        return updateMessageStateAsync(bArr, modified);
    }

    public CompletableFuture<Void> abandonMessageAsync(UUID uuid, Map<String, Object> map) {
        return updateDispositionAsync(new UUID[]{uuid}, ClientConstants.DISPOSITION_STATUS_ABANDONED, null, null, map);
    }

    public CompletableFuture<Void> deferMessageAsync(byte[] bArr, Map<String, Object> map) {
        Modified modified = new Modified();
        modified.setUndeliverableHere(true);
        if (map != null) {
            modified.setMessageAnnotations(map);
        }
        return updateMessageStateAsync(bArr, modified);
    }

    public CompletableFuture<Void> deferMessageAsync(UUID uuid, Map<String, Object> map) {
        return updateDispositionAsync(new UUID[]{uuid}, ClientConstants.DISPOSITION_STATUS_DEFERED, null, null, map);
    }

    public CompletableFuture<Void> deadLetterMessageAsync(byte[] bArr, String str, String str2, Map<String, Object> map) {
        Rejected rejected = new Rejected();
        ErrorCondition errorCondition = new ErrorCondition(ClientConstants.DEADLETTERNAME, (String) null);
        HashMap hashMap = new HashMap();
        if (!StringUtil.isNullOrEmpty(str)) {
            hashMap.put(ClientConstants.DEADLETTER_REASON_HEADER, str);
        }
        if (!StringUtil.isNullOrEmpty(str2)) {
            hashMap.put(ClientConstants.DEADLETTER_ERROR_DESCRIPTION_HEADER, str2);
        }
        if (map != null) {
            hashMap.putAll(map);
        }
        errorCondition.setInfo(hashMap);
        rejected.setError(errorCondition);
        return updateMessageStateAsync(bArr, rejected);
    }

    public CompletableFuture<Void> deadLetterMessageAsync(UUID uuid, String str, String str2, Map<String, Object> map) {
        return updateDispositionAsync(new UUID[]{uuid}, ClientConstants.DISPOSITION_STATUS_SUSPENDED, str, str2, map);
    }

    private CompletableFuture<Void> updateMessageStateAsync(byte[] bArr, Outcome outcome) {
        throwIfInUnusableState();
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String convertBytesToString = StringUtil.convertBytesToString(bArr);
        TRACE_LOGGER.debug("Updating message state of delivery '{}' to '{}'", convertBytesToString, outcome);
        Delivery delivery = this.tagsToDeliveriesMap.get(convertBytesToString);
        if (delivery == null) {
            TRACE_LOGGER.error("Delivery not found for delivery tag '{}'. Either receive link to '{}' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.", convertBytesToString, this.receivePath);
            completableFuture.completeExceptionally(generateDeliveryNotFoundException());
        } else {
            this.pendingUpdateStateRequests.put(convertBytesToString, new UpdateStateWorkItem(completableFuture, outcome, this.operationTimeout));
            ensureLinkIsOpen().thenRun(() -> {
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.11
                        @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                        public void onEvent() {
                            delivery.disposition(outcome);
                        }
                    });
                } catch (IOException e) {
                    completableFuture.completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", e));
                }
            });
        }
        return completableFuture;
    }

    private synchronized CompletableFuture<Void> ensureLinkIsOpen() {
        if (this.receiveLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) {
            TRACE_LOGGER.info("Recreating receive link to '{}'", this.receivePath);
            this.retryPolicy.incrementRetryCount(getClientId());
            this.receiveLinkReopenFuture = new CompletableFuture<>();
            CompletableFuture<Void> completableFuture = this.receiveLinkReopenFuture;
            Timer.schedule(() -> {
                if (completableFuture.isDone()) {
                    return;
                }
                cancelSASTokenRenewTimer();
                TimeoutException timeoutException = new TimeoutException(String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", this.receiveLink.getName(), this.receivePath, ZonedDateTime.now()));
                TRACE_LOGGER.warn(timeoutException.getMessage());
                AsyncUtil.completeFutureExceptionally(completableFuture, timeoutException);
            }, LINK_REOPEN_TIMEOUT, TimerType.OneTimeRun);
            cancelSASTokenRenewTimer();
            sendTokenAndSetRenewTimer(false).handleAsync((r6, th) -> {
                if (th != null) {
                    TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.receivePath, ExceptionUtil.extractAsyncCompletionCause(th));
                    this.receiveLinkReopenFuture.completeExceptionally(th);
                    clearAllPendingWorkItems(th);
                    return null;
                }
                try {
                    this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.CoreMessageReceiver.12
                        @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                        public void onEvent() {
                            CoreMessageReceiver.this.createReceiveLink();
                        }
                    });
                    return null;
                } catch (IOException e) {
                    this.receiveLinkReopenFuture.completeExceptionally(e);
                    return null;
                }
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }
        if (this.ensureLinkReopenFutureToWaitOn == null || this.ensureLinkReopenFutureToWaitOn.isDone()) {
            this.ensureLinkReopenFutureToWaitOn = new CompletableFuture<>();
            this.shouldRetryLinkReopenOnTransientFailure = true;
        }
        this.receiveLinkReopenFuture.handleAsync((r4, th2) -> {
            if (th2 == null) {
                this.ensureLinkReopenFutureToWaitOn.complete(null);
                return null;
            }
            if (!(th2 instanceof ServiceBusException) || !((ServiceBusException) th2).getIsTransient()) {
                this.ensureLinkReopenFutureToWaitOn.completeExceptionally(th2);
                return null;
            }
            if (!this.shouldRetryLinkReopenOnTransientFailure) {
                this.ensureLinkReopenFutureToWaitOn.completeExceptionally(th2);
                return null;
            }
            this.shouldRetryLinkReopenOnTransientFailure = false;
            ensureLinkIsOpen();
            return null;
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        return this.ensureLinkReopenFutureToWaitOn;
    }

    private void completePendingUpdateStateWorkItem(Delivery delivery, String str, UpdateStateWorkItem updateStateWorkItem, Exception exc) {
        delivery.settle();
        if (exc == null) {
            AsyncUtil.completeFuture(updateStateWorkItem.getWork(), null);
        } else {
            ExceptionUtil.completeExceptionally(updateStateWorkItem.getWork(), exc, this, true);
        }
        this.tagsToDeliveriesMap.remove(str);
        this.pendingUpdateStateRequests.remove(str);
    }

    private void clearAllPendingWorkItems(Throwable th) {
        TRACE_LOGGER.info("Completeing all pending receive and updateState operation on the receiver to '{}'", this.receivePath);
        boolean z = th == null || ((th instanceof ServiceBusException) && ((ServiceBusException) th).getIsTransient());
        Iterator<ReceiveWorkItem> it = this.pendingReceives.iterator();
        while (it.hasNext()) {
            ReceiveWorkItem next = it.next();
            it.remove();
            CompletableFuture<Collection<MessageWithDeliveryTag>> work = next.getWork();
            next.cancelTimeoutTask(false);
            reduceCreditForCompletedReceiveRequest(next.getMaxMessageCount());
            if (z) {
                AsyncUtil.completeFuture(work, null);
            } else {
                ExceptionUtil.completeExceptionally(work, th, this, true);
            }
        }
        for (Map.Entry<String, UpdateStateWorkItem> entry : this.pendingUpdateStateRequests.entrySet()) {
            this.pendingUpdateStateRequests.remove(entry.getKey());
            ExceptionUtil.completeExceptionally(entry.getValue().getWork(), th, this, true);
        }
    }

    private static IllegalArgumentException generateDeliveryNotFoundException() {
        return new IllegalArgumentException("Delivery not found on the receive link.");
    }

    private static ServiceBusException generateDispatacherSchedulingFailedException(String str, Exception exc) {
        return new ServiceBusException(false, str + " failed while dispatching to Reactor, see cause for more details.", exc);
    }

    public CompletableFuture<Collection<Instant>> renewMessageLocksAsync(UUID[] uuidArr) {
        throwIfInUnusableState();
        if (TRACE_LOGGER.isDebugEnabled()) {
            Logger logger = TRACE_LOGGER;
            Object[] objArr = new Object[3];
            objArr[0] = Arrays.toString(uuidArr);
            objArr[1] = this.receivePath;
            objArr[2] = this.isSessionReceiver ? getSessionId() : StringUtil.EMPTY;
            logger.debug("Renewing message locks for lock tokens '{}' of entity '{}', sesion '{}'", objArr);
        }
        return createRequestResponseLinkAsync().thenComposeAsync(r7 -> {
            HashMap hashMap = new HashMap();
            hashMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, uuidArr);
            if (this.isSessionReceiver) {
                hashMap.put("session-id", getSessionId());
            }
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEWLOCK_OPERATION, hashMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()), this.operationTimeout).thenComposeAsync(message -> {
                CompletableFuture completableFuture = new CompletableFuture();
                if (RequestResponseUtils.getResponseStatusCode(message) == 200) {
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        TRACE_LOGGER.debug("Message locks for lock tokens '{}' renewed", Arrays.toString(uuidArr));
                    }
                    completableFuture.complete(Arrays.stream((Date[]) RequestResponseUtils.getResponseBody(message).get(ClientConstants.REQUEST_RESPONSE_EXPIRATIONS)).map(date -> {
                        return date.toInstant();
                    }).collect(Collectors.toList()));
                } else {
                    Exception genereateExceptionFromResponse = RequestResponseUtils.genereateExceptionFromResponse(message);
                    TRACE_LOGGER.error("Renewing message locks for lock tokens '{}' on entity '{}' failed", new Object[]{Arrays.toString(uuidArr), this.receivePath, genereateExceptionFromResponse});
                    completableFuture.completeExceptionally(genereateExceptionFromResponse);
                }
                return completableFuture;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Collection<MessageWithLockToken>> receiveDeferredMessageBatchAsync(Long[] lArr) {
        throwIfInUnusableState();
        if (TRACE_LOGGER.isDebugEnabled()) {
            Logger logger = TRACE_LOGGER;
            Object[] objArr = new Object[3];
            objArr[0] = Arrays.toString(lArr);
            objArr[1] = this.receivePath;
            objArr[2] = this.isSessionReceiver ? getSessionId() : StringUtil.EMPTY;
            logger.debug("Receiving messages for sequence numbers '{}' from entity '{}', sesion '{}'", objArr);
        }
        return createRequestResponseLinkAsync().thenComposeAsync(r7 -> {
            HashMap hashMap = new HashMap();
            hashMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, lArr);
            hashMap.put(ClientConstants.REQUEST_RESPONSE_RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1));
            if (this.isSessionReceiver) {
                hashMap.put("session-id", getSessionId());
            }
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, hashMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()), this.operationTimeout).thenComposeAsync(message -> {
                Object obj;
                CompletableFuture completableFuture = new CompletableFuture();
                if (RequestResponseUtils.getResponseStatusCode(message) == 200) {
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        Logger logger2 = TRACE_LOGGER;
                        Object[] objArr2 = new Object[3];
                        objArr2[0] = Arrays.toString(lArr);
                        objArr2[1] = this.receivePath;
                        objArr2[2] = this.isSessionReceiver ? getSessionId() : StringUtil.EMPTY;
                        logger2.debug("Received messges for sequence numbers '{}' from entity '{}', sesion '{}'", objArr2);
                    }
                    ArrayList arrayList = new ArrayList();
                    Object value = message.getBody().getValue();
                    if (value != null && (value instanceof Map) && (obj = ((Map) value).get(ClientConstants.REQUEST_RESPONSE_MESSAGES)) != null && (obj instanceof Iterable)) {
                        for (Object obj2 : (Iterable) obj) {
                            if (obj2 instanceof Map) {
                                Message create = Message.Factory.create();
                                Binary binary = (Binary) ((Map) obj2).get(ClientConstants.REQUEST_RESPONSE_MESSAGE);
                                create.decode(binary.getArray(), binary.getArrayOffset(), binary.getLength());
                                UUID uuid = ClientConstants.ZEROLOCKTOKEN;
                                if (((Map) obj2).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN)) {
                                    uuid = (UUID) ((Map) obj2).get(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN);
                                }
                                arrayList.add(new MessageWithLockToken(create, uuid));
                            }
                        }
                    }
                    completableFuture.complete(arrayList);
                } else {
                    Exception genereateExceptionFromResponse = RequestResponseUtils.genereateExceptionFromResponse(message);
                    TRACE_LOGGER.error("Receiving messages by sequence numbers '{}' from entity '{}' failed", new Object[]{Arrays.toString(lArr), this.receivePath, genereateExceptionFromResponse});
                    completableFuture.completeExceptionally(genereateExceptionFromResponse);
                }
                return completableFuture;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> updateDispositionAsync(UUID[] uuidArr, String str, String str2, String str3, Map<String, Object> map) {
        throwIfInUnusableState();
        if (TRACE_LOGGER.isDebugEnabled()) {
            Logger logger = TRACE_LOGGER;
            Object[] objArr = new Object[4];
            objArr[0] = Arrays.toString(uuidArr);
            objArr[1] = str;
            objArr[2] = this.receivePath;
            objArr[3] = this.isSessionReceiver ? getSessionId() : StringUtil.EMPTY;
            logger.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}'", objArr);
        }
        return createRequestResponseLinkAsync().thenComposeAsync(r11 -> {
            HashMap hashMap = new HashMap();
            hashMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, uuidArr);
            hashMap.put(ClientConstants.REQUEST_RESPONSE_DISPOSITION_STATUS, str);
            if (str2 != null) {
                hashMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_REASON, str2);
            }
            if (str3 != null) {
                hashMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_DESCRIPTION, str3);
            }
            if (map != null && map.size() > 0) {
                hashMap.put(ClientConstants.REQUEST_RESPONSE_PROPERTIES_TO_MODIFY, map);
            }
            if (this.isSessionReceiver) {
                hashMap.put("session-id", getSessionId());
            }
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, hashMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()), this.operationTimeout).thenComposeAsync(message -> {
                CompletableFuture completableFuture = new CompletableFuture();
                if (RequestResponseUtils.getResponseStatusCode(message) == 200) {
                    if (TRACE_LOGGER.isDebugEnabled()) {
                        Logger logger2 = TRACE_LOGGER;
                        Object[] objArr2 = new Object[4];
                        objArr2[0] = Arrays.toString(uuidArr);
                        objArr2[1] = str;
                        objArr2[2] = this.receivePath;
                        objArr2[3] = this.isSessionReceiver ? getSessionId() : StringUtil.EMPTY;
                        logger2.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}' succeeded.", objArr2);
                    }
                    completableFuture.complete(null);
                } else {
                    Exception genereateExceptionFromResponse = RequestResponseUtils.genereateExceptionFromResponse(message);
                    TRACE_LOGGER.error("Update disposition on entity '{}' failed", this.receivePath, genereateExceptionFromResponse);
                    completableFuture.completeExceptionally(genereateExceptionFromResponse);
                }
                return completableFuture;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> renewSessionLocksAsync() {
        throwIfInUnusableState();
        TRACE_LOGGER.debug("Renewing session lock on entity '{}' of sesion '{}'", this.receivePath, getSessionId());
        return createRequestResponseLinkAsync().thenComposeAsync(r6 -> {
            HashMap hashMap = new HashMap();
            hashMap.put("session-id", getSessionId());
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEW_SESSIONLOCK_OPERATION, hashMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()), this.operationTimeout).thenComposeAsync(message -> {
                CompletableFuture completableFuture = new CompletableFuture();
                if (RequestResponseUtils.getResponseStatusCode(message) == 200) {
                    this.sessionLockedUntilUtc = ((Date) RequestResponseUtils.getResponseBody(message).get("expiration")).toInstant();
                    TRACE_LOGGER.debug("Session lock on entity '{}' of sesion '{}' renewed until '{}'", new Object[]{this.receivePath, getSessionId(), this.sessionLockedUntilUtc});
                    completableFuture.complete(null);
                } else {
                    Exception genereateExceptionFromResponse = RequestResponseUtils.genereateExceptionFromResponse(message);
                    TRACE_LOGGER.error("Renewing session lock on entity '{}' of sesion '{}' failed", new Object[]{this.receivePath, getSessionId(), genereateExceptionFromResponse});
                    completableFuture.completeExceptionally(genereateExceptionFromResponse);
                }
                return completableFuture;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<byte[]> getSessionStateAsync() {
        throwIfInUnusableState();
        TRACE_LOGGER.debug("Getting session state of sesion '{}' from entity '{}'", getSessionId(), this.receivePath);
        return createRequestResponseLinkAsync().thenComposeAsync(r6 -> {
            HashMap hashMap = new HashMap();
            hashMap.put("session-id", getSessionId());
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, hashMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()), this.operationTimeout).thenComposeAsync(message -> {
                Object obj;
                CompletableFuture completableFuture = new CompletableFuture();
                if (RequestResponseUtils.getResponseStatusCode(message) == 200) {
                    TRACE_LOGGER.debug("Got session state of sesion '{}' from entity '{}'", getSessionId(), this.receivePath);
                    byte[] bArr = null;
                    Map responseBody = RequestResponseUtils.getResponseBody(message);
                    if (responseBody.containsKey(ClientConstants.REQUEST_RESPONSE_SESSION_STATE) && (obj = responseBody.get(ClientConstants.REQUEST_RESPONSE_SESSION_STATE)) != null) {
                        bArr = ((Binary) obj).getArray();
                    }
                    completableFuture.complete(bArr);
                } else {
                    Exception genereateExceptionFromResponse = RequestResponseUtils.genereateExceptionFromResponse(message);
                    TRACE_LOGGER.error("Getting session state of sesion '{}' from entity '{}' failed", new Object[]{getSessionId(), this.receivePath, genereateExceptionFromResponse});
                    completableFuture.completeExceptionally(genereateExceptionFromResponse);
                }
                return completableFuture;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Void> setSessionStateAsync(byte[] bArr) {
        throwIfInUnusableState();
        TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}'", getSessionId(), this.receivePath);
        return createRequestResponseLinkAsync().thenComposeAsync(r8 -> {
            HashMap hashMap = new HashMap();
            hashMap.put("session-id", getSessionId());
            hashMap.put(ClientConstants.REQUEST_RESPONSE_SESSION_STATE, bArr == null ? null : new Binary(bArr));
            return this.requestResponseLink.requestAysnc(RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, hashMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()), this.operationTimeout).thenComposeAsync(message -> {
                CompletableFuture completableFuture = new CompletableFuture();
                if (RequestResponseUtils.getResponseStatusCode(message) == 200) {
                    TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}' succeeded", getSessionId(), this.receivePath);
                    completableFuture.complete(null);
                } else {
                    Exception genereateExceptionFromResponse = RequestResponseUtils.genereateExceptionFromResponse(message);
                    TRACE_LOGGER.error("Setting session state of sesion '{}' on entity '{}' failed", new Object[]{getSessionId(), this.receivePath, genereateExceptionFromResponse});
                    completableFuture.completeExceptionally(genereateExceptionFromResponse);
                }
                return completableFuture;
            }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }

    public CompletableFuture<Collection<Message>> peekMessagesAsync(long j, int i, String str) {
        throwIfInUnusableState();
        return createRequestResponseLinkAsync().thenComposeAsync(r13 -> {
            return CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, j, i, str, this.receiveLink.getName());
        }, (Executor) MessagingFactory.INTERNAL_THREAD_POOL);
    }
}
