/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ExponentialAmqpRetryPolicy;
import com.azure.core.amqp.FixedAmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.TimeoutException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RetryUtil {
    private static final ClientLogger LOGGER = new ClientLogger(RetryUtil.class);

    private RetryUtil() {
    }

    public static AmqpRetryPolicy getRetryPolicy(AmqpRetryOptions options) {
        switch (options.getMode()) {
            case FIXED: {
                return new FixedAmqpRetryPolicy(options);
            }
            case EXPONENTIAL: {
                return new ExponentialAmqpRetryPolicy(options);
            }
        }
        throw new IllegalArgumentException(String.format(Locale.ROOT, "Mode is not supported: %s", new Object[]{options.getMode()}));
    }

    public static <T> Flux<T> withRetry(Flux<T> source, Duration operationTimeout, AmqpRetryPolicy retryPolicy) {
        return Flux.defer(() -> source.timeout(operationTimeout)).retryWhen(errors -> RetryUtil.retry(errors, retryPolicy));
    }

    public static <T> Mono<T> withRetry(Mono<T> source, Duration operationTimeout, AmqpRetryPolicy retryPolicy) {
        return Mono.defer(() -> source.timeout(operationTimeout)).retryWhen(errors -> RetryUtil.retry(errors, retryPolicy));
    }

    private static Flux<Long> retry(Flux<Throwable> source, AmqpRetryPolicy retryPolicy) {
        return source.zipWith(Flux.range(1, retryPolicy.getMaxRetries() + 1), (error, attempt) -> {
            if (attempt > retryPolicy.getMaxRetries()) {
                LOGGER.warning("Retry attempts are exhausted. Current: {}. Max: {}.", attempt, retryPolicy.getMaxRetries());
                throw Exceptions.propagate(error);
            }
            if (error instanceof TimeoutException) {
                LOGGER.info("TimeoutException error occurred. Retrying operation. Attempt: {}.", attempt, error);
                return retryPolicy.calculateRetryDelay((Throwable)error, (int)attempt);
            }
            if (error instanceof AmqpException && ((AmqpException)error).isTransient()) {
                LOGGER.info("Retryable error occurred. Retrying operation. Attempt: {}. Error condition: {}", new Object[]{attempt, ((AmqpException)error).getErrorCondition(), error});
                return retryPolicy.calculateRetryDelay((Throwable)error, (int)attempt);
            }
            LOGGER.warning("Error is not a TimeoutException nor is it a retryable AMQP exception.", error);
            throw Exceptions.propagate(error);
        }).flatMap(Mono::delay);
    }
}

