package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnection.class */
public class ReactorConnection implements AmqpConnection {
    private static final String CBS_SESSION_NAME = "cbs-session";
    private static final String CBS_ADDRESS = "$cbs";
    private static final String CBS_LINK_NAME = "cbs";
    private final ReplayProcessor<AmqpEndpointState> endpointStates;
    private final String connectionId;
    private final ConnectionHandler handler;
    private final ReactorHandlerProvider handlerProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final ConnectionOptions connectionOptions;
    private final ReactorProvider reactorProvider;
    private final AmqpRetryPolicy retryPolicy;
    private final SenderSettleMode senderSettleMode;
    private final ReceiverSettleMode receiverSettleMode;
    private ReactorExecutor executor;
    private ReactorExceptionHandler reactorExceptionHandler;
    private volatile ClaimsBasedSecurityChannel cbsChannel;
    private volatile Connection connection;
    private final ClientLogger logger = new ClientLogger((Class<?>) ReactorConnection.class);
    private final ConcurrentMap<String, SessionSubscription> sessionMap = new ConcurrentHashMap();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final DirectProcessor<AmqpShutdownSignal> shutdownSignals = DirectProcessor.create();
    private final FluxSink<AmqpShutdownSignal> shutdownSignalsSink = this.shutdownSignals.sink();
    private final Mono<Connection> connectionMono = Mono.fromCallable(this::getOrCreateConnection);

    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnection$ReactorExceptionHandler.class */
    private final class ReactorExceptionHandler extends AmqpExceptionHandler {
        private ReactorExceptionHandler() {
        }

        @Override // com.azure.core.amqp.implementation.AmqpExceptionHandler
        public void onConnectionError(Throwable th) {
            if (ReactorConnection.this.isDisposed.get()) {
                super.onConnectionError(th);
                return;
            }
            ReactorConnection.this.logger.warning("onReactorError connectionId[{}], hostName[{}], message[Starting new reactor], error[{}]", ReactorConnection.this.getId(), ReactorConnection.this.getFullyQualifiedNamespace(), th.getMessage());
            ReactorConnection.this.endpointStates.onError(th);
            ReactorConnection.this.dispose();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // com.azure.core.amqp.implementation.AmqpExceptionHandler
        public void onConnectionShutdown(AmqpShutdownSignal amqpShutdownSignal) {
            if (ReactorConnection.this.isDisposed()) {
                super.onConnectionShutdown(amqpShutdownSignal);
                return;
            }
            ReactorConnection.this.logger.warning("onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]", ReactorConnection.this.getId(), ReactorConnection.this.getFullyQualifiedNamespace(), Boolean.valueOf(amqpShutdownSignal.isInitiatedByClient()), amqpShutdownSignal);
            ReactorConnection.this.dispose(new ErrorCondition(Symbol.getSymbol("onReactorError"), amqpShutdownSignal.toString()));
            ReactorConnection.this.shutdownSignalsSink.next(amqpShutdownSignal);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnection$SessionSubscription.class */
    public static final class SessionSubscription {
        private final AtomicBoolean isDisposed;
        private final AmqpSession session;
        private final Disposable subscription;

        private SessionSubscription(AmqpSession amqpSession, Disposable disposable) {
            this.isDisposed = new AtomicBoolean();
            this.session = amqpSession;
            this.subscription = disposable;
        }

        public AmqpSession getSession() {
            return this.session;
        }

        void dispose(ErrorCondition errorCondition) {
            if (this.isDisposed.getAndSet(true)) {
                return;
            }
            if (this.session instanceof ReactorSession) {
                ((ReactorSession) this.session).dispose(errorCondition);
            } else {
                this.session.dispose();
            }
            this.subscription.dispose();
        }
    }

    public ReactorConnection(String str, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, String str2, String str3, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
        this.connectionOptions = connectionOptions;
        this.reactorProvider = reactorProvider;
        this.connectionId = str;
        this.handlerProvider = reactorHandlerProvider;
        this.tokenManagerProvider = (TokenManagerProvider) Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null.");
        this.messageSerializer = messageSerializer;
        this.handler = reactorHandlerProvider.createConnectionHandler(str, connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getTransportType(), connectionOptions.getProxyOptions(), str2, str3, connectionOptions.getSslVerifyMode(), connectionOptions.getClientOptions());
        this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.getRetry());
        this.senderSettleMode = senderSettleMode;
        this.receiverSettleMode = receiverSettleMode;
        this.endpointStates = (ReplayProcessor) this.handler.getEndpointStates().takeUntilOther(this.shutdownSignals).map(endpointState -> {
            this.logger.verbose("connectionId[{}]: State {}", str, endpointState);
            return AmqpEndpointStateUtil.getConnectionState(endpointState);
        }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Flux<AmqpShutdownSignal> getShutdownSignals() {
        return this.shutdownSignals;
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Mono<ClaimsBasedSecurityNode> getClaimsBasedSecurityNode() {
        return isDisposed() ? Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format("connectionId[%s]: Connection is disposed. Cannot get CBS node.", this.connectionId)))) : this.connectionMono.then(RetryUtil.withRetry(getEndpointStates().takeUntil(amqpEndpointState -> {
            return amqpEndpointState == AmqpEndpointState.ACTIVE;
        }), this.connectionOptions.getRetry().getTryTimeout(), this.retryPolicy).then(Mono.fromCallable(this::getOrCreateCBSNode)));
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public String getId() {
        return this.connectionId;
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public String getFullyQualifiedNamespace() {
        return this.handler.getHostname();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public int getMaxFrameSize() {
        return this.handler.getMaxFrameSize();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Map<String, Object> getConnectionProperties() {
        return this.handler.getConnectionProperties();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Mono<AmqpSession> createSession(String str) {
        if (isDisposed()) {
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(String.format("connectionId[%s]: Connection is disposed. Cannot create session '%s'.", this.connectionId, str))));
        }
        SessionSubscription sessionSubscription = this.sessionMap.get(str);
        return sessionSubscription != null ? Mono.just(sessionSubscription.getSession()) : this.connectionMono.map(connection -> {
            return this.sessionMap.computeIfAbsent(str, str2 -> {
                SessionHandler createSessionHandler = this.handlerProvider.createSessionHandler(this.connectionId, getFullyQualifiedNamespace(), str2, this.connectionOptions.getRetry().getTryTimeout());
                Session session = connection.session();
                BaseHandler.setHandler(session, createSessionHandler);
                AmqpSession createSession = createSession(str2, session, createSessionHandler);
                return new SessionSubscription(createSession, createSession.getEndpointStates().subscribe(amqpEndpointState -> {
                }, th -> {
                    this.logger.info("connectionId[{}] sessionName[{}]: Error occurred. Removing and disposing session.", this.connectionId, str, th);
                    removeSession(str2);
                }, () -> {
                    this.logger.info("connectionId[{}] sessionName[{}]: Complete. Removing and disposing session.", this.connectionId, str);
                    removeSession(str2);
                }));
            }).getSession();
        });
    }

    protected AmqpSession createSession(String str, Session session, SessionHandler sessionHandler) {
        return new ReactorSession(session, sessionHandler, str, this.reactorProvider, this.handlerProvider, getClaimsBasedSecurityNode(), this.tokenManagerProvider, this.messageSerializer, this.connectionOptions.getRetry().getTryTimeout(), this.retryPolicy);
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public boolean removeSession(String str) {
        return removeSession(str, null);
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        dispose(null);
        this.shutdownSignalsSink.next(new AmqpShutdownSignal(false, true, "Disposed by client."));
    }

    void dispose(ErrorCondition errorCondition) {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        ClientLogger clientLogger = this.logger;
        Object[] objArr = new Object[2];
        objArr[0] = this.connectionId;
        objArr[1] = errorCondition != null ? errorCondition : ClientConstants.NOT_APPLICABLE;
        clientLogger.info("connectionId[{}], errorCondition[{}]: Disposing of ReactorConnection.", objArr);
        for (String str : (String[]) this.sessionMap.keySet().toArray(new String[0])) {
            this.logger.info("connectionId[{}]: Removing session '{}'", this.connectionId, str);
            removeSession(str, errorCondition);
        }
        if (this.connection != null) {
            this.connection.close();
        }
        if (this.executor != null) {
            this.executor.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Connection> getReactorConnection() {
        return this.connectionMono;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<RequestResponseChannel> createRequestResponseChannel(String str, String str2, String str3) {
        return (Mono) createSession(str).cast(ReactorSession.class).map(reactorSession -> {
            return new RequestResponseChannel(getId(), getFullyQualifiedNamespace(), str2, str3, reactorSession.session(), this.connectionOptions.getRetry(), this.handlerProvider, this.reactorProvider, this.messageSerializer, this.senderSettleMode, this.receiverSettleMode);
        }).doOnNext(requestResponseChannel -> {
            this.logger.info("Emitting new response channel. connectionId: {}. entityPath: {}. linkName: {}.", getId(), str3, str2);
        }).repeat().subscribeWith(new AmqpChannelProcessor(this.connectionId, str3, requestResponseChannel2 -> {
            return requestResponseChannel2.getEndpointStates();
        }, this.retryPolicy, new ClientLogger((Class<?>) RequestResponseChannel.class)));
    }

    private boolean removeSession(String str, ErrorCondition errorCondition) {
        if (str == null) {
            return false;
        }
        SessionSubscription remove = this.sessionMap.remove(str);
        if (remove != null) {
            remove.dispose(errorCondition);
        }
        return remove != null;
    }

    private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
        if (this.cbsChannel == null) {
            this.logger.info("Setting CBS channel.");
            this.cbsChannel = new ClaimsBasedSecurityChannel(createRequestResponseChannel(CBS_SESSION_NAME, CBS_LINK_NAME, CBS_ADDRESS), this.connectionOptions.getTokenCredential(), this.connectionOptions.getAuthorizationType(), this.connectionOptions.getRetry());
        }
        return this.cbsChannel;
    }

    private synchronized Connection getOrCreateConnection() throws IOException {
        if (this.connection == null) {
            this.logger.info("connectionId[{}]: Creating and starting connection to {}:{}", this.connectionId, this.handler.getHostname(), Integer.valueOf(this.handler.getProtocolPort()));
            Reactor createReactor = this.reactorProvider.createReactor(this.connectionId, this.handler.getMaxFrameSize());
            this.connection = createReactor.connectionToHost(this.handler.getHostname(), this.handler.getProtocolPort(), this.handler);
            this.reactorExceptionHandler = new ReactorExceptionHandler();
            this.executor = new ReactorExecutor(createReactor, Schedulers.single(), this.connectionId, this.reactorExceptionHandler, this.connectionOptions.getRetry().getTryTimeout(), this.connectionOptions.getFullyQualifiedNamespace());
            this.executor.start();
        }
        return this.connection;
    }
}
