package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnectionCache.class */
public final class ReactorConnectionCache<T extends ReactorConnection> implements Disposable {
    private static final AmqpException TERMINATED_ERROR = new AmqpException(false, "Connection recovery support is terminated.", null);
    private static final String TRY_COUNT_KEY = "tryCount";
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final AmqpRetryOptions retryOptions;
    private final AmqpErrorContext errorContext;
    private final ClientLogger logger;
    private final Mono<T> createOrGetCachedConnection;
    private final Object lock = new Object();
    private volatile boolean terminated;
    private volatile T currentConnection;

    public ReactorConnectionCache(Supplier<T> supplier, String str, String str2, AmqpRetryPolicy amqpRetryPolicy, Map<String, Object> map) {
        this.fullyQualifiedNamespace = (String) Objects.requireNonNull(str, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = str2;
        Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");
        this.retryOptions = amqpRetryPolicy.getRetryOptions();
        this.errorContext = new AmqpErrorContext(str);
        this.logger = new ClientLogger(getClass(), (Map) Objects.requireNonNull(map, "'loggingContext' cannot be null."));
        Objects.requireNonNull(supplier, "'connectionSupplier' cannot be null.");
        this.createOrGetCachedConnection = Mono.fromSupplier(() -> {
            if (!this.terminated) {
                return (ReactorConnection) supplier.get();
            }
            this.logger.info("Connection recovery support is terminated, dropping the request for new connection.");
            throw TERMINATED_ERROR;
        }).flatMap(reactorConnection -> {
            this.logger.atInfo().addKeyValue(ClientConstants.CONNECTION_ID_KEY, reactorConnection.getId()).log("Waiting to connect and become active.");
            return reactorConnection.connectAndAwaitToActive().doOnCancel(() -> {
                if (reactorConnection.isDisposed()) {
                    return;
                }
                reactorConnection.closeAsync(createShutdownSignal("The connection request was canceled while waiting to active.")).subscribe();
            });
        }).retryWhen(retryWhenSpec(amqpRetryPolicy)).handle((reactorConnection2, synchronousSink) -> {
            boolean z;
            synchronized (this.lock) {
                z = this.terminated;
                this.currentConnection = reactorConnection2;
            }
            if (z) {
                reactorConnection2.closeAsync(createShutdownSignal("Connection recovery support is terminated.")).subscribe();
                synchronousSink.error(TERMINATED_ERROR);
            } else {
                this.logger.atInfo().addKeyValue(ClientConstants.CONNECTION_ID_KEY, reactorConnection2.getId()).log("Emitting the new active connection.");
                synchronousSink.next(reactorConnection2);
            }
        }).cacheInvalidateIf(reactorConnection3 -> {
            if (!reactorConnection3.isDisposed()) {
                return false;
            }
            this.logger.atInfo().addKeyValue(ClientConstants.CONNECTION_ID_KEY, reactorConnection3.getId()).log("The connection is closed, requesting a new connection.");
            return true;
        });
    }

    public Mono<T> get() {
        return this.createOrGetCachedConnection;
    }

    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    public String getEntityPath() {
        return this.entityPath;
    }

    public AmqpRetryOptions getRetryOptions() {
        return this.retryOptions;
    }

    public boolean isCurrentConnectionClosed() {
        return (this.currentConnection != null && this.currentConnection.isDisposed()) || this.terminated;
    }

    public void dispose() {
        synchronized (this.lock) {
            if (this.terminated) {
                return;
            }
            this.terminated = true;
            T t = this.currentConnection;
            if (t == null || t.isDisposed()) {
                this.logger.info("Terminating the connection recovery support.");
            } else {
                t.closeAsync(createShutdownSignal("Terminating the connection recovery support.")).subscribe();
            }
        }
    }

    public boolean isDisposed() {
        return this.terminated;
    }

    Retry retryWhenSpec(AmqpRetryPolicy amqpRetryPolicy) {
        return Retry.from(flux -> {
            return flux.concatMap(retrySignal -> {
                Retry.RetrySignal copy = retrySignal.copy();
                AzureException failure = copy.failure();
                long j = copy.totalRetriesInARow();
                if (failure == null) {
                    return Mono.error(new IllegalStateException("RetrySignal::failure() not expected to be null."));
                }
                if (!((failure instanceof TimeoutException) || ((failure instanceof AmqpException) && ((AmqpException) failure).isTransient()) || (failure instanceof IllegalStateException) || (failure instanceof RejectedExecutionException))) {
                    this.logger.atError().addKeyValue(TRY_COUNT_KEY, j).log("Exception is non-retriable, not retrying for a new connection.", new Object[]{failure});
                    return Mono.error(failure);
                }
                Duration calculateRetryDelay = amqpRetryPolicy.calculateRetryDelay(failure instanceof AmqpException ? failure : new AmqpException(true, "Non-AmqpException occurred upstream.", (Throwable) failure, this.errorContext), (int) Math.min(j, amqpRetryPolicy.getMaxRetries()));
                if (calculateRetryDelay == null) {
                    this.logger.atError().addKeyValue(TRY_COUNT_KEY, j).log("Retry is disabled, not retrying for a new connection.", new Object[]{failure});
                    return Mono.error(failure);
                }
                if (this.terminated) {
                    return Mono.error(TERMINATED_ERROR);
                }
                this.logger.atInfo().addKeyValue(TRY_COUNT_KEY, j).addKeyValue(ClientConstants.INTERVAL_KEY, calculateRetryDelay.toMillis()).log("Transient error occurred. Retrying.", new Object[]{failure});
                return Mono.delay(calculateRetryDelay);
            });
        });
    }

    private static AmqpShutdownSignal createShutdownSignal(String str) {
        return new AmqpShutdownSignal(false, false, str);
    }
}
