package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.ClientSettings;
import com.microsoft.azure.servicebus.amqp.BaseLinkHandler;
import com.microsoft.azure.servicebus.amqp.ConnectionHandler;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpConnection;
import com.microsoft.azure.servicebus.amqp.ProtonUtil;
import com.microsoft.azure.servicebus.amqp.ReactorDispatcher;
import com.microsoft.azure.servicebus.amqp.ReactorHandler;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.reactor.Reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MarkerFactory;

/* loaded from: input_file:com/microsoft/azure/servicebus/primitives/MessagingFactory.class */
public class MessagingFactory extends ClientEntity implements IAmqpConnection {
    /** Logger shared by the factory and its inner classes. */
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessagingFactory.class);
    /** Cached thread pool; within this class it is the executor for asynchronous future continuations. */
    public static final ExecutorService INTERNAL_THREAD_POOL = Executors.newCachedThreadPool();
    /** Prefix of the name given to every reactor thread; a random UUID is appended. */
    private static final String REACTOR_THREAD_NAME_PREFIX = "ReactorThread";
    /** Number of CBS link creation attempts after which {@code cbsLinkCreationFuture} is failed. */
    private static final int MAX_CBS_LINK_CREATION_ATTEMPTS = 3;
    /** Host part of the namespace endpoint URI; target of the AMQP connection. */
    private final String hostName;
    /** Returned by {@code onClose()}; completed when the connection is closed, or failed on close timeout. (Name is misspelled in the original.) */
    private final CompletableFuture<Void> connetionCloseFuture;
    /** Handler passed to the reactor whenever a connection to the host is created. */
    private final ConnectionHandler connectionHandler;
    /** Reactor handler that opens the connection when the reactor initializes. */
    private final ReactorHandler reactorHandler;
    /** Links that are closed and notified when the connection is closed because of an error. */
    private final LinkedList<Link> registeredLinks;
    /** Guards reads and writes of {@code reactor} and {@code reactorScheduler}. */
    private final Object reactorLock;
    /** Source of the request-response links handed out by {@code obtainRequestResponseLinkAsync}. */
    private final RequestResponseLinkcache managementLinksCache;
    /** Current reactor; replaced when the reactor is restarted after an error. */
    private Reactor reactor;
    /** Dispatcher bound to the current reactor; used to schedule work on the reactor thread. */
    private ReactorDispatcher reactorScheduler;
    /** Current AMQP connection; may be null before the reactor has initialized. */
    private Connection connection;
    /** Completed with this factory on the first successful connection open, or failed on error. */
    private CompletableFuture<MessagingFactory> factoryOpenFuture;
    /** Completed once the CBS link exists; token sends are chained on it. */
    private CompletableFuture<Void> cbsLinkCreationFuture;
    /** Request-response link to the CBS node; null until created. */
    private RequestResponseLink cbsLink;
    /** Number of CBS link creation attempts made so far. */
    private int cbsLinkCreationAttempts;
    /** Cause of the most recent CBS link creation failure, if any. */
    private Throwable lastCBSLinkCreationException;
    /** Settings supplying the operation timeout, retry policy and token provider. */
    private final ClientSettings clientSettings;
    /** Namespace endpoint URI this factory was created for. */
    private final URI namespaceEndpointUri;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/microsoft/azure/servicebus/primitives/MessagingFactory$RunReactor.class */
    /**
     * Drives the event loop of the reactor that was current when this instance was created.
     *
     * <p>The loop ends when the thread is interrupted, when {@link Reactor#process()} returns
     * {@code false}, or when the enclosing factory reports itself closed. A
     * {@link HandlerException} raised while processing is converted into a
     * {@link ServiceBusException} and handed to {@code onReactorError}. The reactor is freed
     * exactly once, whichever way the loop ends.
     */
    public class RunReactor implements Runnable {
        /** Timeout value handed to {@link Reactor#setTimeout(long)} before the reactor is started. */
        private static final long REACTOR_TIMEOUT = 3141L;

        /** Reactor captured from the enclosing factory at construction time. */
        private final Reactor rctr;

        public RunReactor() {
            this.rctr = MessagingFactory.this.getReactor();
        }

        @Override // java.lang.Runnable
        public void run() {
            MessagingFactory.TRACE_LOGGER.info("starting reactor instance.");
            try {
                this.rctr.setTimeout(REACTOR_TIMEOUT);
                this.rctr.start();
                boolean continueProcessing = true;
                while (!Thread.interrupted() && continueProcessing) {
                    // Once the factory is closed there is nothing left to process.
                    if (MessagingFactory.this.getIsClosed()) {
                        MessagingFactory.TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
                        break;
                    }
                    continueProcessing = this.rctr.process();
                }
                MessagingFactory.TRACE_LOGGER.info("Stopping reactor");
                this.rctr.stop();
            } catch (HandlerException e) {
                MessagingFactory.TRACE_LOGGER.warn("UnHandled exception while processing events in reactor:", e);
                MessagingFactory.this.onReactorError(toServiceBusException(e));
            } finally {
                // Single release point: a failing free() is no longer followed by a second free().
                this.rctr.free();
            }
        }

        /**
         * Converts a reactor handler failure into the exception reported to the factory.
         *
         * @param handlerException the failure raised by the reactor
         * @return a {@link CommunicationException} when the root cause is an
         *         {@link UnresolvedAddressException}, otherwise a {@link ServiceBusException}
         */
        private ServiceBusException toServiceBusException(HandlerException handlerException) {
            Throwable cause = handlerException.getCause();
            if (cause == null) {
                cause = handlerException;
            }

            // Prefer the root cause's message, then the wrapper's, then a generic fallback.
            String message;
            if (!StringUtil.isNullOrEmpty(cause.getMessage())) {
                message = cause.getMessage();
            } else if (!StringUtil.isNullOrEmpty(handlerException.getMessage())) {
                message = handlerException.getMessage();
            } else {
                message = "Reactor encountered unrecoverable error";
            }

            ServiceBusException serviceBusException = new ServiceBusException(true, String.format(Locale.US, "%s, %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()), cause);
            if (cause instanceof UnresolvedAddressException) {
                serviceBusException = new CommunicationException(String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network configuration. Please check to see if namespace information is correct. %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()), cause);
            }
            return serviceBusException;
        }
    }

    /**
     * Creates a factory for the given namespace endpoint. The reactor is not started here;
     * the static {@code createFrom...} methods do that after construction.
     *
     * @param uri namespace endpoint; its host becomes the connection target
     * @param clientSettings settings supplying operation timeout, retry policy and token provider
     */
    private MessagingFactory(URI uri, ClientSettings clientSettings) {
        // Client id is "MessagingFactory" followed by a short random suffix.
        super("MessagingFactory".concat(StringUtil.getShortRandomString()));
        this.cbsLinkCreationAttempts = 0;
        this.lastCBSLinkCreationException = null;
        this.namespaceEndpointUri = uri;
        this.clientSettings = clientSettings;
        this.hostName = uri.getHost();
        this.registeredLinks = new LinkedList<>();
        this.connetionCloseFuture = new CompletableFuture<>();
        this.reactorLock = new Object();
        // NOTE(review): 'this' escapes to ConnectionHandler (and to RequestResponseLinkcache below)
        // before construction finishes; keep the assignment order unless both constructors are
        // confirmed not to read factory state.
        this.connectionHandler = new ConnectionHandler(this);
        this.factoryOpenFuture = new CompletableFuture<>();
        this.cbsLinkCreationFuture = new CompletableFuture<>();
        this.managementLinksCache = new RequestResponseLinkcache(this);
        // On reactor initialization, open the connection to the host on the AMQPS port.
        this.reactorHandler = new ReactorHandler() { // from class: com.microsoft.azure.servicebus.primitives.MessagingFactory.1
            @Override // com.microsoft.azure.servicebus.amqp.ReactorHandler
            public void onReactorInit(Event event) {
                super.onReactorInit(event);
                Reactor reactor = event.getReactor();
                MessagingFactory.TRACE_LOGGER.info("Creating connection to host '{}:{}'", MessagingFactory.this.hostName, Integer.valueOf(ClientConstants.AMQPS_PORT));
                MessagingFactory.this.connection = reactor.connectionToHost(MessagingFactory.this.hostName, ClientConstants.AMQPS_PORT, MessagingFactory.this.connectionHandler);
            }
        };
        // Registered under the client id; the matching unregister happens when the connection closes.
        Timer.register(getClientId());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the host this factory connects to.
     *
     * @return the host part of the namespace endpoint URI
     */
    public String getHostName() {
        return hostName;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reads the current reactor while holding {@code reactorLock}.
     *
     * @return the current reactor, or null if none has been started
     */
    public Reactor getReactor() {
        synchronized (this.reactorLock) {
            return this.reactor;
        }
    }

    /**
     * Reads the dispatcher of the current reactor while holding {@code reactorLock}.
     *
     * @return the current dispatcher, or null if no reactor has been started
     */
    private ReactorDispatcher getReactorScheduler() {
        synchronized (this.reactorLock) {
            return this.reactorScheduler;
        }
    }

    /**
     * Creates a new reactor with the given handler, publishes it together with a fresh
     * dispatcher, and starts a new thread that runs its event loop.
     *
     * @param reactorHandler handler installed on the new reactor
     * @throws IOException if the reactor or its dispatcher cannot be created
     */
    private void startReactor(ReactorHandler reactorHandler) throws IOException {
        TRACE_LOGGER.info("Creating and starting reactor");
        final Reactor newReactor = ProtonUtil.reactor(reactorHandler);
        synchronized (this.reactorLock) {
            this.reactor = newReactor;
            this.reactorScheduler = new ReactorDispatcher(newReactor);
        }

        // RunReactor captures the reactor published above, so it must be created afterwards.
        final Runnable eventLoop = new RunReactor();
        final String threadName = REACTOR_THREAD_NAME_PREFIX + UUID.randomUUID().toString();
        final Thread reactorThread = new Thread(eventLoop, threadName);
        reactorThread.start();
        TRACE_LOGGER.info("Started reactor");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the current connection, creating a new one through the reactor when there is
     * none or when the existing one is closed on either the local or the remote side.
     *
     * @return the usable connection
     */
    public Connection getConnection() {
        if (this.connection != null
                && this.connection.getLocalState() != EndpointState.CLOSED
                && this.connection.getRemoteState() != EndpointState.CLOSED) {
            return this.connection;
        }

        TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.hostName, Integer.valueOf(ClientConstants.AMQPS_PORT));
        this.connection = getReactor().connectionToHost(this.hostName, ClientConstants.AMQPS_PORT, this.connectionHandler);
        return this.connection;
    }

    /**
     * Returns the operation timeout configured in the client settings.
     *
     * @return the operation timeout
     */
    public Duration getOperationTimeout() {
        return clientSettings.getOperationTimeout();
    }

    /**
     * Returns the retry policy configured in the client settings.
     *
     * @return the retry policy
     */
    public RetryPolicy getRetryPolicy() {
        return clientSettings.getRetryPolicy();
    }

    /**
     * Returns the client settings this factory was created with.
     *
     * @return the client settings
     */
    public ClientSettings getClientSetttings() {
        return clientSettings;
    }

    /**
     * Starts creating a factory for the given namespace name.
     *
     * @param str namespace name, converted to an endpoint URI
     * @param clientSettings settings for the new factory
     * @return future completed with the factory once its connection is open
     */
    public static CompletableFuture<MessagingFactory> createFromNamespaceNameAsyc(String str, ClientSettings clientSettings) {
        final URI endpoint = Util.convertNamespaceToEndPointURI(str);
        return createFromNamespaceEndpointURIAsyc(endpoint, clientSettings);
    }

    /**
     * Creates a factory for the given endpoint and starts its reactor.
     *
     * @param uri namespace endpoint URI
     * @param clientSettings settings for the new factory
     * @return the factory's open future; it is failed immediately if the reactor cannot be started
     */
    public static CompletableFuture<MessagingFactory> createFromNamespaceEndpointURIAsyc(URI uri, ClientSettings clientSettings) {
        if (TRACE_LOGGER.isInfoEnabled()) {
            TRACE_LOGGER.info("Creating messaging factory from namespace endpoint uri '{}'", uri.toString());
        }

        final MessagingFactory factory = new MessagingFactory(uri, clientSettings);
        try {
            factory.startReactor(factory.reactorHandler);
        } catch (IOException startFailure) {
            TRACE_LOGGER.error(MarkerFactory.getMarker(ClientConstants.FATAL_MARKER), "Starting reactor failed", startFailure);
            factory.factoryOpenFuture.completeExceptionally(startFailure);
        }
        return factory.factoryOpenFuture;
    }

    /**
     * Blocking variant of {@link #createFromNamespaceNameAsyc(String, ClientSettings)}.
     *
     * @param str namespace name
     * @param clientSettings settings for the new factory
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ServiceBusException if the factory could not be opened
     */
    public static MessagingFactory createFromNamespaceName(String str, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromNamespaceNameAsyc(str, clientSettings);
        return completeFuture(pendingFactory);
    }

    /**
     * Blocking variant of {@link #createFromNamespaceEndpointURIAsyc(URI, ClientSettings)}.
     *
     * @param uri namespace endpoint URI
     * @param clientSettings settings for the new factory
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ServiceBusException if the factory could not be opened
     */
    public static MessagingFactory createFromNamespaceEndpointURI(URI uri, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromNamespaceEndpointURIAsyc(uri, clientSettings);
        return completeFuture(pendingFactory);
    }

    /**
     * Starts creating a factory from the endpoint and settings carried by a connection string builder.
     *
     * @param connectionStringBuilder source of the endpoint and the client settings
     * @return future completed with the factory once its connection is open
     */
    public static CompletableFuture<MessagingFactory> createFromConnectionStringBuilderAsync(ConnectionStringBuilder connectionStringBuilder) {
        if (TRACE_LOGGER.isInfoEnabled()) {
            // Only the loggable form of the connection string is written to the log.
            TRACE_LOGGER.info("Creating messaging factory from connection string '{}'", connectionStringBuilder.toLoggableString());
        }
        final URI endpoint = connectionStringBuilder.getEndpoint();
        final ClientSettings settings = Util.getClientSettingsFromConnectionStringBuilder(connectionStringBuilder);
        return createFromNamespaceEndpointURIAsyc(endpoint, settings);
    }

    /**
     * Starts creating a factory from a connection string.
     *
     * @param str connection string
     * @return future completed with the factory once its connection is open
     */
    public static CompletableFuture<MessagingFactory> createFromConnectionStringAsync(String str) {
        final ConnectionStringBuilder builder = new ConnectionStringBuilder(str);
        return createFromConnectionStringBuilderAsync(builder);
    }

    /**
     * Blocking variant of {@link #createFromConnectionStringBuilderAsync(ConnectionStringBuilder)}.
     *
     * @param connectionStringBuilder source of the endpoint and the client settings
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if opening the factory failed; the cause carries the failure
     */
    public static MessagingFactory createFromConnectionStringBuilder(ConnectionStringBuilder connectionStringBuilder) throws InterruptedException, ExecutionException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromConnectionStringBuilderAsync(connectionStringBuilder);
        return pendingFactory.get();
    }

    /**
     * Blocking variant of {@link #createFromConnectionStringAsync(String)}.
     *
     * @param str connection string
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if opening the factory failed; the cause carries the failure
     */
    public static MessagingFactory createFromConnectionString(String str) throws InterruptedException, ExecutionException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromConnectionStringAsync(str);
        return pendingFactory.get();
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpConnection
    /**
     * Connection-opened callback. Completes the factory open future on the first open and
     * starts CBS link creation when no CBS link exists yet.
     */
    public void onConnectionOpen() {
        final boolean firstOpen = !this.factoryOpenFuture.isDone();
        if (firstOpen) {
            TRACE_LOGGER.info("MessagingFactory opened.");
            AsyncUtil.completeFuture(this.factoryOpenFuture, this);
        }

        TRACE_LOGGER.info("Connection opened to host.");

        if (this.cbsLink != null) {
            return;
        }
        createCBSLinkAsync();
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpConnection
    /**
     * Connection-error callback.
     *
     * <p>If the factory never finished opening, the open future is failed and the factory is
     * marked closed; otherwise the connection and its registered links are closed. When the
     * factory is closing or closed and the close future is still pending, that future is
     * completed and the factory's timer registration is released.
     *
     * @param errorCondition error reported for the connection; may be null
     */
    public void onConnectionError(ErrorCondition errorCondition) {
        if (errorCondition != null && errorCondition.getCondition() != null) {
            TRACE_LOGGER.error("Connection error. '{}'", errorCondition);
        }

        if (!this.factoryOpenFuture.isDone()) {
            AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, ExceptionUtil.toException(errorCondition));
            setClosed();
        } else {
            closeConnection(errorCondition, null);
        }

        if (getIsClosingOrClosed() && !this.connetionCloseFuture.isDone()) {
            TRACE_LOGGER.info("Connection to host closed.");
            AsyncUtil.completeFuture(this.connetionCloseFuture, null);
            Timer.unregister(getClientId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a failure of the reactor event loop.
     *
     * <p>Before the factory has opened, the open future is failed and the factory is marked
     * closed. After it has opened, and unless it is closing or closed, a new reactor is
     * started and the current connection is closed with the given cause.
     *
     * @param exc the failure that stopped the reactor
     */
    public void onReactorError(Exception exc) {
        if (!this.factoryOpenFuture.isDone()) {
            TRACE_LOGGER.error("Reactor error occured", exc);
            AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, exc);
            setClosed();
            return;
        }

        if (getIsClosingOrClosed()) {
            return;
        }

        TRACE_LOGGER.warn("Reactor error occured", exc);
        try {
            startReactor(this.reactorHandler);
        } catch (IOException restartFailure) {
            TRACE_LOGGER.error(MarkerFactory.getMarker(ClientConstants.FATAL_MARKER), "Re-starting reactor failed with exception.", restartFailure);
            // NOTE(review): retries by recursion with no bound; a persistent IOException would
            // recurse until the stack overflows - confirm whether a retry limit is intended.
            onReactorError(exc);
        }
        closeConnection(null, exc);
    }

    /**
     * Closes every registered link and the current connection, then tells each link's handler
     * why it was closed.
     *
     * <p>Does nothing when there is no connection. The registered-link list is emptied before
     * the links are closed.
     *
     * @param errorCondition AMQP error that caused the close; when non-null it is the reason
     *        passed to the link handlers
     * @param exc exception that caused the close; passed to the link handlers only when
     *        {@code errorCondition} is null (may itself be null)
     */
    private void closeConnection(ErrorCondition errorCondition, Exception exc) {
        final Connection currentConnection = this.connection;
        if (currentConnection == null) {
            return;
        }

        // Snapshot first, so the notification loop below still sees the links after clear().
        final Link[] links = this.registeredLinks.toArray(new Link[0]);
        this.registeredLinks.clear();

        TRACE_LOGGER.debug("Closing all links on the connection. Number of links '{}'", Integer.valueOf(links.length));
        for (Link link : links) {
            link.close();
        }
        TRACE_LOGGER.debug("Closed all links on the connection. Number of links '{}'", Integer.valueOf(links.length));

        if (currentConnection.getLocalState() != EndpointState.CLOSED) {
            TRACE_LOGGER.info("Closing connection to host");
            currentConnection.close();
        }

        for (Link link : links) {
            // getHandler returns the generic Handler type; only BaseLinkHandler instances are
            // notified. instanceof is false for null, so no separate null check is needed.
            final Handler handler = BaseHandler.getHandler(link);
            if (handler instanceof BaseLinkHandler) {
                final BaseLinkHandler linkHandler = (BaseLinkHandler) handler;
                if (errorCondition != null) {
                    linkHandler.processOnClose(link, errorCondition);
                } else {
                    linkHandler.processOnClose(link, exc);
                }
            }
        }
    }

    @Override // com.microsoft.azure.servicebus.primitives.ClientEntity
    /**
     * Closes the factory: closes the CBS link (if any), frees the management link cache, then
     * closes the connection.
     *
     * <p>A failure to close the CBS link is logged and does not stop the teardown. Without
     * that, the connection would never be closed, no timeout would be armed, and the returned
     * future would never complete.
     *
     * @return a completed future if the factory is already closed; otherwise the connection
     *         close future, which completes when the connection is closed and fails if
     *         scheduling the close fails or the close times out
     */
    protected CompletableFuture<Void> onClose() {
        if (getIsClosed()) {
            return CompletableFuture.completedFuture(null);
        }

        TRACE_LOGGER.info("Closing messaging factory");
        final CompletableFuture<Void> cbsLinkCloseFuture;
        if (this.cbsLink == null) {
            cbsLinkCloseFuture = CompletableFuture.completedFuture(null);
        } else {
            TRACE_LOGGER.info("Closing CBS link");
            cbsLinkCloseFuture = this.cbsLink.closeAsync();
        }

        cbsLinkCloseFuture.handle((ignored, cbsCloseFailure) -> {
            if (cbsCloseFailure != null) {
                TRACE_LOGGER.warn("Closing CBS link failed. Continuing to close the connection.", cbsCloseFailure);
            }
            return null;
        }).thenRun(() -> {
            this.managementLinksCache.freeAsync();
        }).thenRun(this::closeConnectionOnFactoryClose);
        return this.connetionCloseFuture;
    }

    /** Fails pending CBS link creation, then closes the connection on the reactor thread with a timeout. */
    private void closeConnectionOnFactoryClose() {
        if (this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) {
            this.cbsLinkCreationFuture.completeExceptionally(new Exception("Connection closed."));
        }

        if (this.connection == null || this.connection.getRemoteState() == EndpointState.CLOSED) {
            // Nothing left to close: finish immediately.
            this.connetionCloseFuture.complete(null);
            Timer.unregister(getClientId());
            return;
        }

        try {
            scheduleOnReactorThread(new DispatchHandler() { // from class: com.microsoft.azure.servicebus.primitives.MessagingFactory.2
                @Override // com.microsoft.azure.servicebus.amqp.DispatchHandler
                public void onEvent() {
                    if (MessagingFactory.this.connection == null || MessagingFactory.this.connection.getLocalState() == EndpointState.CLOSED) {
                        return;
                    }
                    MessagingFactory.TRACE_LOGGER.info("Closing connection to host");
                    MessagingFactory.this.connection.close();
                }
            });
        } catch (IOException e) {
            this.connetionCloseFuture.completeExceptionally(e);
        }

        // Fail the close future if the connection has not closed within the operation timeout.
        Timer.schedule(new Runnable() { // from class: com.microsoft.azure.servicebus.primitives.MessagingFactory.3
            @Override // java.lang.Runnable
            public void run() {
                if (MessagingFactory.this.connetionCloseFuture.isDone()) {
                    return;
                }
                MessagingFactory.TRACE_LOGGER.warn("Closing MessagingFactory timed out.");
                AsyncUtil.completeFutureExceptionally(MessagingFactory.this.connetionCloseFuture, new TimeoutException("Closing MessagingFactory timed out."));
            }
        }, this.clientSettings.getOperationTimeout(), TimerType.OneTimeRun);
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpConnection
    /**
     * Registers a link to be closed and notified when the connection is closed on error.
     *
     * @param link the link to register; null is ignored
     */
    public void registerForConnectionError(Link link) {
        if (link == null) {
            return;
        }
        this.registeredLinks.add(link);
    }

    @Override // com.microsoft.azure.servicebus.amqp.IAmqpConnection
    /**
     * Removes a link from the set notified when the connection is closed on error.
     *
     * @param link the link to remove; null is ignored
     */
    public void deregisterForConnectionError(Link link) {
        if (link == null) {
            return;
        }
        this.registeredLinks.remove(link);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands the handler to the current reactor's dispatcher.
     *
     * @param dispatchHandler work to dispatch
     * @throws IOException if the dispatcher fails to accept the work
     */
    public void scheduleOnReactorThread(DispatchHandler dispatchHandler) throws IOException {
        final ReactorDispatcher dispatcher = getReactorScheduler();
        dispatcher.invoke(dispatchHandler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Hands the handler to the current reactor's dispatcher together with an integer argument.
     *
     * @param i value forwarded unchanged to the dispatcher (presumably a delay - confirm against ReactorDispatcher)
     * @param dispatchHandler work to dispatch
     * @throws IOException if the dispatcher fails to accept the work
     */
    public void scheduleOnReactorThread(int i, DispatchHandler dispatchHandler) throws IOException {
        final ReactorDispatcher dispatcher = getReactorScheduler();
        dispatcher.invoke(i, dispatchHandler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Obtains a security token for the given audience, sends it over the CBS link once that
     * link exists, and schedules {@code runnable} to run later.
     *
     * <p>After a successful send, {@code runnable} is scheduled from the token's validity
     * (see {@code scheduleRenewTimer}). When {@code z} is true and the send fails, the failure
     * is logged and {@code runnable} is scheduled again after a fixed five seconds instead of
     * failing the returned future. When {@code z} is false, a send failure fails the returned
     * future.
     *
     * @param str audience the token is requested for
     * @param z whether a failed send should be retried by rescheduling {@code runnable}
     * @param runnable task to schedule for renewal or retry
     * @return future completed with the scheduled task's handle
     */
    public CompletableFuture<ScheduledFuture<?>> sendSecurityTokenAndSetRenewTimer(String str, boolean z, Runnable runnable) {
        final int retryDelaySeconds = 5;
        TRACE_LOGGER.debug("Sending token for {}", str);
        return this.clientSettings.getTokenProvider().getSecurityTokenAsync(str).thenComposeAsync(securityToken -> {
            // The send result is never read, so the wildcard avoids assuming its element type.
            final CompletableFuture<?> sendTokenFuture = this.cbsLinkCreationFuture.thenComposeAsync(linkReady -> {
                return CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), securityToken);
            }, (Executor) INTERNAL_THREAD_POOL);

            if (z) {
                return sendTokenFuture.handleAsync((sendResult, sendFailure) -> {
                    if (sendFailure == null) {
                        TRACE_LOGGER.debug("Sent token for {}", str);
                        return scheduleRenewTimer(securityToken.getValidUntil(), runnable);
                    }
                    TRACE_LOGGER.warn("Sending CBS Token for {} failed.", str, sendFailure);
                    TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", str, retryDelaySeconds);
                    return Timer.schedule(runnable, Duration.ofSeconds(retryDelaySeconds), TimerType.OneTimeRun);
                }, (Executor) INTERNAL_THREAD_POOL);
            } else {
                return sendTokenFuture.thenApply(sendResult -> {
                    TRACE_LOGGER.debug("Sent token for {}", str);
                    return scheduleRenewTimer(securityToken.getValidUntil(), runnable);
                });
            }
        }, (Executor) INTERNAL_THREAD_POOL);
    }

    /**
     * Schedules a one-time run of the task at the renew interval derived from the seconds
     * remaining until the given instant.
     *
     * @param instant instant until which the token is valid
     * @param runnable task to run
     * @return handle of the scheduled task
     */
    private static ScheduledFuture<?> scheduleRenewTimer(Instant instant, Runnable runnable) {
        final int secondsUntilExpiry = (int) Duration.between(Instant.now(), instant).getSeconds();
        final Duration renewDelay = Duration.ofSeconds(Util.getTokenRenewIntervalInSeconds(secondsUntilExpiry));
        return Timer.schedule(runnable, renewDelay, TimerType.OneTimeRun);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Obtains a request-response link from the management link cache.
     *
     * @param str passed unchanged to the cache to identify the link
     * @param messagingEntityType type of the messaging entity, passed to the cache
     * @return future completed with the link
     */
    public CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String str, MessagingEntityType messagingEntityType) {
        // Closed-state check comes first, before the cache is touched.
        throwIfClosed(null);
        return managementLinksCache.obtainRequestResponseLinkAsync(str, messagingEntityType);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Releases a request-response link back to the management link cache; does nothing once
     * the factory is closed.
     *
     * @param str passed unchanged to the cache to identify the link
     */
    public void releaseRequestResponseLink(String str) {
        if (!getIsClosed()) {
            this.managementLinksCache.releaseRequestResponseLink(str);
        }
    }

    /**
     * Attempts to create the CBS request-response link, retrying on failure.
     *
     * <p>Each call counts as one attempt. Once the count exceeds
     * {@code MAX_CBS_LINK_CREATION_ATTEMPTS}, {@code cbsLinkCreationFuture} is failed with the
     * last recorded failure (or a generic exception when none was recorded) and no further
     * attempt is made.
     *
     * @return future for this attempt; completion of the CBS link itself is signalled through
     *         {@code cbsLinkCreationFuture}
     */
    private CompletableFuture<Void> createCBSLinkAsync() {
        if (++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS) {
            Throwable failure = this.lastCBSLinkCreationException;
            if (failure == null) {
                failure = new Exception("CBS link creation failed multiple times.");
            }
            this.cbsLinkCreationFuture.completeExceptionally(failure);
            return CompletableFuture.completedFuture(null);
        }

        final String cbsNodePath = RequestResponseLink.getCBSNodeLinkPath();
        TRACE_LOGGER.info("Creating CBS link to {}", cbsNodePath);
        final String linkName = getClientId() + "-cbs";
        return RequestResponseLink.createAsync(this, linkName, cbsNodePath, null, null).handleAsync((createdLink, creationFailure) -> {
            if (creationFailure != null) {
                // Remember the cause and try again; the attempt counter bounds the retries.
                this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(creationFailure);
                TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", cbsNodePath, Integer.valueOf(this.cbsLinkCreationAttempts));
                createCBSLinkAsync();
                return null;
            }
            TRACE_LOGGER.info("Created CBS link to {}", cbsNodePath);
            this.cbsLink = createdLink;
            this.cbsLinkCreationFuture.complete(null);
            return null;
        }, (Executor) INTERNAL_THREAD_POOL);
    }

    /**
     * Waits for the future and unwraps its failure into a {@link ServiceBusException}.
     *
     * @param completableFuture future to wait on
     * @param <T> result type
     * @return the future's result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ServiceBusException the future's failure cause if it is one, otherwise a new
     *         exception wrapping that cause
     */
    private static <T> T completeFuture(CompletableFuture<T> completableFuture) throws InterruptedException, ServiceBusException {
        try {
            return completableFuture.get();
        } catch (ExecutionException executionFailure) {
            final Throwable rootCause = executionFailure.getCause();
            if (!(rootCause instanceof ServiceBusException)) {
                throw new ServiceBusException(true, rootCause);
            }
            throw (ServiceBusException) rootCause;
        }
    }
}
