package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSession.class */
public class ReactorSession implements AmqpSession {
    /** Name used as both the link name and the entity path of the transaction coordinator send link. */
    private static final String TRANSACTION_LINK_NAME = "coordinator";

    /** Replays the latest session endpoint state; starts at UNINITIALIZED until the handler emits. */
    private final ReplayProcessor<AmqpEndpointState> endpointStates;
    /** Underlying proton-j session that links are created on. */
    private final Session session;
    private final SessionHandler sessionHandler;
    private final String sessionName;
    /** Supplies the reactor dispatcher that link creation is scheduled on. */
    private final ReactorProvider provider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    /** Message handed to the retry helper while waiting for the ACTIVE endpoint state. */
    private final String activeTimeoutMessage;
    private final AmqpRetryOptions retryOptions;
    private final ReactorHandlerProvider handlerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;

    // Open links keyed by link name. Diamond instantiation avoids the raw-type/unchecked conversion.
    private final ConcurrentMap<String, LinkSubscription<AmqpSendLink>> openSendLinks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, LinkSubscription<AmqpReceiveLink>> openReceiveLinks = new ConcurrentHashMap<>();

    /** Flipped to true exactly once, by the first call to dispose. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final ClientLogger logger = new ClientLogger(ReactorSession.class);
    /** Lazily created coordinator shared by all transaction operations on this session. */
    private final AtomicReference<TransactionCoordinator> transactionCoordinator = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSession$LinkSubscription.class */
    /**
     * Pairs an AMQP link with the subscription to its endpoint states so both can be torn down together.
     *
     * @param <T> Type of link being tracked.
     */
    public static final class LinkSubscription<T extends AmqpLink> {
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private final T link;
        private final Disposable subscription;

        private LinkSubscription(T link, Disposable subscription) {
            this.link = link;
            this.subscription = subscription;
        }

        /**
         * Gets the tracked link.
         *
         * @return The link held by this subscription.
         */
        public T getLink() {
            return link;
        }

        /**
         * Disposes of the link and then of the endpoint-state subscription. Only the first call has any effect.
         *
         * @param errorCondition Condition forwarded to the link when it is a {@code ReactorReceiver} or
         *     {@code ReactorSender}; other link types are disposed without it. May be null.
         */
        public void dispose(ErrorCondition errorCondition) {
            // Fails when the flag was already true, i.e. a previous call already disposed of this link.
            if (!isDisposed.compareAndSet(false, true)) {
                return;
            }

            if (link instanceof ReactorReceiver) {
                final ReactorReceiver receiver = (ReactorReceiver) link;
                receiver.dispose(errorCondition);
            } else if (link instanceof ReactorSender) {
                final ReactorSender sender = (ReactorSender) link;
                sender.dispose(errorCondition);
            } else {
                link.dispose();
            }

            subscription.dispose();
        }
    }

    /**
     * Creates a session wrapper, starts tracking the handler's endpoint states and opens the proton-j session.
     *
     * @param session Underlying proton-j session; it is opened by this constructor.
     * @param sessionHandler Handler whose endpoint states and connection id are used by this session.
     * @param str Name of the session.
     * @param reactorProvider Supplies the reactor dispatcher used when creating links.
     * @param reactorHandlerProvider Creates send and receive link handlers.
     * @param mono CBS node supplier handed to the token manager provider.
     * @param tokenManagerProvider Creates token managers for links that require authorization.
     * @param messageSerializer Serializer passed to senders and to the transaction coordinator.
     * @param amqpRetryOptions Retry options for this session's operations.
     */
    public ReactorSession(Session session, SessionHandler sessionHandler, String str, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, Mono<ClaimsBasedSecurityNode> mono, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions amqpRetryOptions) {
        this.session = session;
        this.sessionHandler = sessionHandler;
        this.handlerProvider = reactorHandlerProvider;
        this.sessionName = str;
        this.provider = reactorProvider;
        this.cbsNodeSupplier = mono;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.retryOptions = amqpRetryOptions;
        this.activeTimeoutMessage = String.format("ReactorSession connectionId[%s], session[%s]: Retries exhausted waiting for ACTIVE endpoint state.", sessionHandler.getConnectionId(), str);

        // subscribeWith returns the processor it was given, so no (raw) cast is needed to keep its type.
        this.endpointStates = sessionHandler.getEndpointStates()
            .map(state -> {
                this.logger.verbose("connectionId[{}], sessionName[{}], state[{}]", sessionHandler.getConnectionId(), str, state);
                return AmqpEndpointStateUtil.getConnectionState(state);
            })
            .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));

        // Opened only after the state subscription above is in place.
        session.open();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Gets the proton-j session wrapped by this instance.
     *
     * @return The underlying session.
     */
    public Session session() {
        return session;
    }

    /**
     * Gets the endpoint states of this session, as declared by {@code AmqpSession}.
     *
     * @return A stream backed by the replay processor that caches the most recent state.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return endpointStates;
    }

    /**
     * Reports whether dispose has been called on this session, as declared by {@code Disposable}.
     *
     * @return {@code true} once the session has been disposed; {@code false} otherwise.
     */
    @Override
    public boolean isDisposed() {
        return isDisposed.get();
    }

    /**
     * Disposes of this session without an error condition, as declared by {@code Disposable}.
     */
    @Override
    public void dispose() {
        final ErrorCondition noCondition = null;
        dispose(noCondition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Disposes of this session: closes the proton-j session if it is not already locally closed, then disposes
     * of every open receive link followed by every open send link. Only the first call has any effect.
     *
     * @param errorCondition Condition to associate with the closure and to pass to each link; may be null.
     */
    public void dispose(ErrorCondition errorCondition) {
        // Fails when the flag was already set, meaning an earlier call has handled disposal.
        if (!isDisposed.compareAndSet(false, true)) {
            return;
        }

        final Object loggedCondition = errorCondition != null ? errorCondition : ClientConstants.NOT_APPLICABLE;
        logger.info("connectionId[{}], sessionId[{}], errorCondition[{}]: Disposing of session.",
            sessionHandler.getConnectionId(), sessionName, loggedCondition);

        if (session.getLocalState() != EndpointState.CLOSED) {
            session.close();

            // NOTE(review): the condition is applied after close() and only when none is reported yet;
            // confirm with proton-j that this ordering still carries the condition to the peer.
            if (session.getCondition() == null) {
                session.setCondition(errorCondition);
            }
        }

        for (LinkSubscription<AmqpReceiveLink> receiveLink : openReceiveLinks.values()) {
            receiveLink.dispose(errorCondition);
        }
        for (LinkSubscription<AmqpSendLink> sendLink : openSendLinks.values()) {
            sendLink.dispose(errorCondition);
        }
    }

    /**
     * Gets the name this session was created with, as declared by {@code AmqpSession}.
     *
     * @return The session name.
     */
    @Override
    public String getSessionName() {
        return sessionName;
    }

    /**
     * Gets the operation timeout, as declared by {@code AmqpSession}.
     *
     * @return The try timeout configured on this session's retry options.
     */
    @Override
    public Duration getOperationTimeout() {
        final Duration tryTimeout = retryOptions.getTryTimeout();
        return tryTimeout;
    }

    /**
     * Creates a transaction through this session's transaction coordinator, obtaining the coordinator first.
     *
     * @return A Mono that emits the transaction created by the coordinator.
     */
    @Override
    public Mono<AmqpTransaction> createTransaction() {
        final Mono<TransactionCoordinator> coordinatorMono = createTransactionCoordinator();
        return coordinatorMono.flatMap(coordinator -> coordinator.createTransaction());
    }

    /**
     * Commits the given transaction by asking the transaction coordinator to complete it with {@code true}.
     *
     * @param amqpTransaction Transaction to commit.
     * @return A Mono that follows the coordinator's completion of the transaction.
     */
    @Override
    public Mono<Void> commitTransaction(AmqpTransaction amqpTransaction) {
        final boolean isCommit = true;
        final Mono<TransactionCoordinator> coordinatorMono = createTransactionCoordinator();
        return coordinatorMono.flatMap(coordinator -> coordinator.completeTransaction(amqpTransaction, isCommit));
    }

    /**
     * Rolls back the given transaction by asking the transaction coordinator to complete it with {@code false}.
     *
     * @param amqpTransaction Transaction to roll back.
     * @return A Mono that follows the coordinator's completion of the transaction.
     */
    @Override
    public Mono<Void> rollbackTransaction(AmqpTransaction amqpTransaction) {
        final boolean isCommit = false;
        final Mono<TransactionCoordinator> coordinatorMono = createTransactionCoordinator();
        return coordinatorMono.flatMap(coordinator -> coordinator.completeTransaction(amqpTransaction, isCommit));
    }

    /**
     * Creates (or returns the existing) send link without any link properties.
     *
     * @param str Name of the link.
     * @param str2 Entity path the link targets.
     * @param duration Try timeout to apply to the link's retry options; may be null.
     * @param amqpRetryPolicy Policy whose retry options are copied for the link; may be null.
     * @return A Mono that emits the send link.
     */
    @Override
    public Mono<AmqpLink> createProducer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy) {
        final Map<Symbol, Object> noLinkProperties = null;
        return createProducer(str, str2, duration, amqpRetryPolicy, noLinkProperties);
    }

    /**
     * Creates (or returns the existing) receive link with no source filter, no link properties and no desired
     * capabilities, using sender settle mode UNSETTLED and receiver settle mode SECOND.
     *
     * @param str Name of the link.
     * @param str2 Entity path the link reads from.
     * @param duration Forwarded to the full overload.
     * @param amqpRetryPolicy Forwarded to the full overload.
     * @return A Mono that emits the receive link.
     */
    @Override
    public Mono<AmqpLink> createConsumer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy) {
        final Mono<AmqpReceiveLink> receiveLink = createConsumer(str, str2, duration, amqpRetryPolicy,
            null, null, null, SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND);
        return receiveLink.cast(AmqpLink.class);
    }

    /**
     * Removes and disposes of the link with the given name. Send links are checked first; receive links are
     * only consulted when no send link with that name was removed.
     *
     * @param str Name of the link to remove.
     * @return {@code true} if a link was removed; {@code false} otherwise.
     */
    @Override
    public boolean removeLink(String str) {
        if (removeLink(openSendLinks, str)) {
            return true;
        }
        return removeLink(openReceiveLinks, str);
    }

    /**
     * Gets the transaction coordinator for this session, creating its send link on first use.
     *
     * @return A Mono that emits the shared coordinator, or errors with {@code IllegalStateException} when the
     *     session is already disposed.
     */
    private Mono<TransactionCoordinator> createTransactionCoordinator() {
        if (isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format("Cannot create coordinator send link '%s' from a closed session.", TRANSACTION_LINK_NAME))));
        }

        final TransactionCoordinator existing = this.transactionCoordinator.get();
        if (existing != null) {
            this.logger.verbose("Coordinator[{}]: Returning existing transaction coordinator.", TRANSACTION_LINK_NAME);
            return Mono.just(existing);
        }

        // The coordinator link is created with the authorization flag set to false.
        return createProducer(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME, new Coordinator(), this.retryOptions, null, false)
            .map(sendLink -> {
                final TransactionCoordinator created = new TransactionCoordinator(sendLink, this.messageSerializer);
                // If another caller published a coordinator first, hand back theirs instead of ours.
                if (this.transactionCoordinator.compareAndSet(null, created)) {
                    return created;
                }
                return this.transactionCoordinator.get();
            });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a receive link, or returns the one already open under the same link name.
     *
     * <p>The link is created on the reactor dispatcher once the session endpoint is active and the token
     * manager has authorized. {@code duration} and {@code amqpRetryPolicy} are not used by this method.</p>
     *
     * @param str Name of the link.
     * @param str2 Entity path used as the source address and for authorization.
     * @param duration Unused.
     * @param amqpRetryPolicy Unused.
     * @param map Source filter; ignored when null or empty.
     * @param map2 Link properties; ignored when null or empty.
     * @param symbolArr Desired capabilities; ignored when null or empty.
     * @param senderSettleMode Sender settle mode set on the receiver.
     * @param receiverSettleMode Receiver settle mode set on the receiver.
     * @return A Mono that emits the receive link, or errors with {@code IllegalStateException} when the session
     *     is disposed, or with {@code IOException} when the work cannot be scheduled on the dispatcher.
     */
    public Mono<AmqpReceiveLink> createConsumer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy, Map<Symbol, Object> map, Map<Symbol, Object> map2, Symbol[] symbolArr, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
        if (isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format("Cannot create receive link '%s' from a closed session. entityPath[%s]", str, str2))));
        }

        final LinkSubscription<AmqpReceiveLink> cached = this.openReceiveLinks.get(str);
        if (cached != null) {
            this.logger.info("linkName[{}] entityPath[{}]: Returning existing receive link.", str, str2);
            return Mono.just(cached.getLink());
        }

        final TokenManager tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, str2);

        return Mono.when(onActiveEndpoint(), tokenManager.authorize()).then(Mono.create(sink -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    final LinkSubscription<AmqpReceiveLink> computed = this.openReceiveLinks.compute(str, (key, existing) -> {
                        if (existing == null) {
                            this.logger.info("Creating a new receiver link with linkName {}", str);
                            return getSubscription(key, str2, map, map2, symbolArr, senderSettleMode, receiverSettleMode, tokenManager);
                        }

                        // Lost the race to another caller: this token manager will never back a link.
                        this.logger.info("linkName[{}]: Another receive link exists. Disposing of new one.", str);
                        tokenManager.close();
                        return existing;
                    });
                    sink.success(computed.getLink());
                });
            } catch (IOException e) {
                // The link was never created, so nothing owns the token manager; close it as the
                // "another link exists" path does instead of leaking it.
                tokenManager.close();
                sink.error(e);
            }
        }));
    }

    /**
     * Factory for the {@code ReactorReceiver} that wraps a newly opened proton-j receiver.
     *
     * @param str Entity path passed to the receiver.
     * @param receiver Proton-j receiver to wrap.
     * @param receiveLinkHandler Handler registered for the receiver.
     * @param tokenManager Token manager associated with the link.
     * @param reactorProvider Provider whose reactor dispatcher is handed to the receiver.
     * @return A new {@code ReactorReceiver}.
     */
    protected ReactorReceiver createConsumer(String str, Receiver receiver, ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
        return new ReactorReceiver(
            str,
            receiver,
            receiveLinkHandler,
            tokenManager,
            reactorProvider.getReactorDispatcher());
    }

    /**
     * Creates (or returns the existing) authorized send link targeting the given entity path.
     *
     * @param str Name of the link.
     * @param str2 Entity path; used as the target address.
     * @param duration Try timeout applied to the link's retry options when non-null.
     * @param amqpRetryPolicy Policy whose retry options are copied; default options are used when null.
     * @param map Link properties; may be null.
     * @return A Mono that emits the send link.
     */
    protected Mono<AmqpLink> createProducer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy, Map<Symbol, Object> map) {
        final Target target = new Target();
        target.setAddress(str2);

        final AmqpRetryOptions options;
        if (amqpRetryPolicy == null) {
            options = new AmqpRetryOptions();
        } else {
            // Copy so that overriding the try timeout below does not touch the policy's own options.
            options = new AmqpRetryOptions(amqpRetryPolicy.getRetryOptions());
        }

        if (duration != null) {
            options.setTryTimeout(duration);
        }

        final Mono<AmqpSendLink> sendLink = createProducer(str, str2, target, options, map, true);
        return sendLink.cast(AmqpLink.class);
    }

    /**
     * Creates a send link, or returns the one already open under the same link name.
     *
     * <p>The link is created on the reactor dispatcher once the session endpoint is active and, when
     * {@code z} is true, the token manager has authorized.</p>
     *
     * @param str Name of the link.
     * @param str2 Entity path used for authorization and passed to the sender.
     * @param target Target set on the proton-j sender.
     * @param amqpRetryOptions Retry options handed to the sender.
     * @param map Link properties; ignored when null or empty.
     * @param z Whether the link requires authorization. When false no token manager is created.
     * @return A Mono that emits the send link, or errors with {@code IllegalStateException} when the session is
     *     disposed, or with {@code IOException} when the work cannot be scheduled on the dispatcher.
     */
    private Mono<AmqpSendLink> createProducer(String str, String str2, org.apache.qpid.proton.amqp.transport.Target target, AmqpRetryOptions amqpRetryOptions, Map<Symbol, Object> map, boolean z) {
        if (isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format("Cannot create send link '%s' from a closed session. entityPath[%s]", str, str2))));
        }

        final LinkSubscription<AmqpSendLink> cached = this.openSendLinks.get(str);
        if (cached != null) {
            this.logger.verbose("linkName[{}]: Returning existing send link.", str);
            return Mono.just(cached.getLink());
        }

        final TokenManager tokenManager;
        final Mono<Long> authorization;
        if (z) {
            tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, str2);
            authorization = tokenManager.authorize();
        } else {
            tokenManager = null;
            authorization = Mono.empty();
        }

        return Mono.when(onActiveEndpoint(), authorization).then(Mono.create(sink -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    final LinkSubscription<AmqpSendLink> computed = this.openSendLinks.compute(str, (key, existing) -> {
                        if (existing == null) {
                            this.logger.info("Creating a new sender link with linkName {}", str);
                            return getSubscription(str, str2, target, map, amqpRetryOptions, tokenManager);
                        }

                        // Lost the race to another caller: this token manager will never back a link.
                        this.logger.info("linkName[{}]: Another send link exists. Disposing of new one.", str);
                        if (tokenManager != null) {
                            tokenManager.close();
                        }
                        return existing;
                    });
                    sink.success(computed.getLink());
                });
            } catch (IOException e) {
                // The link was never created, so nothing owns the token manager; close it as the
                // "another link exists" path does instead of leaking it.
                if (tokenManager != null) {
                    tokenManager.close();
                }
                sink.error(e);
            }
        }));
    }

    /**
     * Opens a proton-j sender on this session and wraps it, together with a subscription to its endpoint
     * states, in a {@code LinkSubscription}. When the endpoint states error or complete, the link is removed
     * from the open send links and disposed.
     *
     * @param str Name of the link.
     * @param str2 Entity path passed to the link handler and the sender.
     * @param target Target set on the sender.
     * @param map Link properties; applied only when non-null and non-empty.
     * @param amqpRetryOptions Retry options handed to the sender.
     * @param tokenManager Token manager handed to the sender; may be null.
     * @return The subscription tracking the new send link.
     */
    private LinkSubscription<AmqpSendLink> getSubscription(String str, String str2, org.apache.qpid.proton.amqp.transport.Target target, Map<Symbol, Object> map, AmqpRetryOptions amqpRetryOptions, TokenManager tokenManager) {
        final Sender protonSender = this.session.sender(str);
        protonSender.setTarget(target);
        protonSender.setSource(new Source());
        protonSender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        if (map != null && !map.isEmpty()) {
            protonSender.setProperties(map);
        }

        final SendLinkHandler linkHandler = this.handlerProvider.createSendLinkHandler(
            this.sessionHandler.getConnectionId(), this.sessionHandler.getHostname(), str, str2);
        BaseHandler.setHandler(protonSender, linkHandler);
        protonSender.open();

        final ReactorSender sendLink = new ReactorSender(str2, protonSender, linkHandler, this.provider, tokenManager, this.messageSerializer, amqpRetryOptions);

        // Individual states are ignored; only termination of the stream matters here.
        final Disposable stateSubscription = sendLink.getEndpointStates().subscribe(
            state -> { },
            error -> {
                this.logger.info("linkName[{}]: Error occurred. Removing and disposing send link.", str, error);
                removeLink(this.openSendLinks, str);
            },
            () -> {
                this.logger.info("linkName[{}]: Complete. Removing and disposing send link.", str);
                removeLink(this.openSendLinks, str);
            });

        return new LinkSubscription<>(sendLink, stateSubscription);
    }

    /**
     * Opens a proton-j receiver on this session and wraps it, together with a subscription to its endpoint
     * states, in a {@code LinkSubscription}. When the endpoint states error or complete, the link is removed
     * from the open receive links and disposed.
     *
     * @param str Name of the link.
     * @param str2 Entity path; used as the source address.
     * @param map Source filter; applied only when non-null and non-empty.
     * @param map2 Link properties; applied only when non-null and non-empty.
     * @param symbolArr Desired capabilities; applied only when non-null and non-empty.
     * @param senderSettleMode Sender settle mode set on the receiver.
     * @param receiverSettleMode Receiver settle mode set on the receiver.
     * @param tokenManager Token manager handed to the receive link.
     * @return The subscription tracking the new receive link.
     */
    private LinkSubscription<AmqpReceiveLink> getSubscription(String str, String str2, Map<Symbol, Object> map, Map<Symbol, Object> map2, Symbol[] symbolArr, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, TokenManager tokenManager) {
        final Receiver protonReceiver = this.session.receiver(str);

        final Source linkSource = new Source();
        linkSource.setAddress(str2);
        if (map != null && !map.isEmpty()) {
            linkSource.setFilter(map);
        }

        protonReceiver.setSource(linkSource);
        protonReceiver.setTarget(new Target());
        protonReceiver.setSenderSettleMode(senderSettleMode);
        protonReceiver.setReceiverSettleMode(receiverSettleMode);
        if (map2 != null && !map2.isEmpty()) {
            protonReceiver.setProperties(map2);
        }
        if (symbolArr != null && symbolArr.length > 0) {
            protonReceiver.setDesiredCapabilities(symbolArr);
        }

        final ReceiveLinkHandler linkHandler = this.handlerProvider.createReceiveLinkHandler(
            this.sessionHandler.getConnectionId(), this.sessionHandler.getHostname(), str, str2);
        BaseHandler.setHandler(protonReceiver, linkHandler);
        protonReceiver.open();

        final ReactorReceiver receiveLink = createConsumer(str2, protonReceiver, linkHandler, tokenManager, this.provider);

        // Individual states are ignored; only termination of the stream matters here.
        final Disposable stateSubscription = receiveLink.getEndpointStates().subscribe(
            state -> { },
            error -> {
                this.logger.info("linkName[{}] entityPath[{}]: Error occurred. Removing receive link.", str, str2, error);
                removeLink(this.openReceiveLinks, str);
            },
            () -> {
                this.logger.info("linkName[{}] entityPath[{}]: Complete. Removing receive link.", str, str2);
                removeLink(this.openReceiveLinks, str);
            });

        return new LinkSubscription<>(receiveLink, stateSubscription);
    }

    /**
     * Waits for this session's endpoint to report ACTIVE.
     *
     * <p>The endpoint states are consumed up to and including the first ACTIVE state, wrapped by
     * {@code RetryUtil.withRetry} with this session's retry options and timeout message (presumably the
     * message surfaces when retries are exhausted; see RetryUtil).</p>
     *
     * @return A Mono that completes without a value once the wrapped stream completes.
     */
    private Mono<Void> onActiveEndpoint() {
        final Flux<AmqpEndpointState> untilActive = getEndpointStates()
            .takeUntil(state -> state == AmqpEndpointState.ACTIVE);

        return RetryUtil.withRetry(untilActive, this.retryOptions, this.activeTimeoutMessage).then();
    }

    /**
     * Removes the link with the given name from the map and disposes of it without an error condition.
     *
     * @param concurrentMap Map of open links keyed by link name.
     * @param str Name of the link to remove; a null name removes nothing.
     * @param <T> Type of link held in the map.
     * @return {@code true} if a link was found and disposed; {@code false} otherwise.
     */
    private <T extends AmqpLink> boolean removeLink(ConcurrentMap<String, LinkSubscription<T>> concurrentMap, String str) {
        if (str == null) {
            return false;
        }

        final LinkSubscription<T> removed = concurrentMap.remove(str);
        if (removed == null) {
            return false;
        }

        removed.dispose(null);
        return true;
    }
}
