package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorExecutor.class */
/**
 * Drives a proton-j {@link Reactor} by repeatedly scheduling {@link Reactor#process()} on a {@link Scheduler},
 * and coordinates an orderly shutdown of that reactor.
 *
 * <p>Shutdown is coordinated through a single-permit semaphore: the permit is taken before the final
 * "complete pending tasks" job is scheduled and given back when that job finishes, so {@link #close()} can wait
 * (bounded by the configured timeout) for the job to complete.</p>
 */
class ReactorExecutor implements Closeable {
    private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";
    private static final String STOP_REASON =
        "Stopping the reactor because thread was interrupted or the reactor has no more events to process.";

    private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
    private final AtomicBoolean hasStarted = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    // Single permit: held while the "complete pending tasks" job is outstanding.
    private final Semaphore disposeSemaphore = new Semaphore(1);
    // Guards reactor.process() in run().
    private final Object lock = new Object();

    private final Reactor reactor;
    private final Scheduler scheduler;
    private final String connectionId;
    private final Duration timeout;
    private final AmqpExceptionHandler exceptionHandler;
    private final String hostname;

    /**
     * Creates an executor for the given reactor.
     *
     * @param reactor2 Reactor whose events are processed.
     * @param scheduler Scheduler on which reactor processing is run.
     * @param str Identifier of the connection; used in log messages.
     * @param amqpExceptionHandler Handler notified of connection errors and shutdown.
     * @param duration Maximum time to wait when acquiring the dispose semaphore.
     * @param str2 Hostname used to build the {@link AmqpErrorContext} for reported errors.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public ReactorExecutor(Reactor reactor2, Scheduler scheduler, String str,
        AmqpExceptionHandler amqpExceptionHandler, Duration duration, String str2) {
        this.reactor = Objects.requireNonNull(reactor2, "'reactor' cannot be null.");
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");
        this.connectionId = Objects.requireNonNull(str, "'connectionId' cannot be null.");
        this.timeout = Objects.requireNonNull(duration, "'timeout' cannot be null.");
        this.exceptionHandler = Objects.requireNonNull(amqpExceptionHandler, "'exceptionHandler' cannot be null.");
        this.hostname = Objects.requireNonNull(str2, "'hostname' cannot be null.");
    }

    /**
     * Starts the reactor and schedules the first processing pass. Calling this more than once only logs a
     * warning.
     */
    public void start() {
        if (hasStarted.getAndSet(true)) {
            logger.warning("ReactorExecutor has already started.");
            return;
        }

        logger.info(LOG_MESSAGE, connectionId, "Starting reactor.");
        reactor.start();
        scheduler.schedule(this::run);
    }

    /**
     * Performs one processing pass on the reactor and, if the reactor reports more work, reschedules itself.
     * When the pass is not rescheduled (no more work, interruption, scheduler rejection, or an error), the
     * executor is stopped exactly once in the {@code finally} block.
     */
    private void run() {
        if (!isDisposed.get() && !hasStarted.get()) {
            logger.warning(
                "Cannot run work items on ReactorExecutor if ReactorExecutor.start() has not been invoked.");
            return;
        }

        boolean rescheduledReactor = false;
        try {
            final boolean shouldReschedule;
            synchronized (lock) {
                // Thread.interrupted() also clears the interrupt flag of the current thread.
                shouldReschedule = hasStarted.get() && !Thread.interrupted() && reactor.process();
            }

            if (shouldReschedule) {
                try {
                    scheduler.schedule(this::run);
                    rescheduledReactor = true;
                } catch (RejectedExecutionException e) {
                    logger.warning(LOG_MESSAGE, connectionId,
                        "Scheduling reactor failed because the scheduler has been shut down.", e);
                    // Record the rejection on the reactor so it is visible to code inspecting its attachments.
                    reactor.attachments().set(RejectedExecutionException.class, RejectedExecutionException.class, e);
                }
            }
        } catch (HandlerException e) {
            reportHandlerException(e);
        } finally {
            // A single finally keeps the stop logic from running twice if it throws itself.
            if (!rescheduledReactor) {
                if (hasStarted.getAndSet(false)) {
                    scheduleCompletePendingTasks();
                } else {
                    logger.info(LOG_MESSAGE, connectionId, STOP_REASON);
                    close(false, STOP_REASON);
                }
            }
        }
    }

    /**
     * Logs an unhandled reactor exception and forwards it to the exception handler as an {@link AmqpException}.
     *
     * @param e Exception raised while the reactor was processing events.
     */
    private void reportHandlerException(HandlerException e) {
        final Throwable cause = e.getCause() == null ? e : e.getCause();

        logger.warning(LOG_MESSAGE, connectionId,
            "Unhandled exception while processing events in reactor, report this error.", e);

        // Prefer the root cause's message, then the wrapper's, then a generic fallback.
        final String message;
        if (!CoreUtils.isNullOrEmpty(cause.getMessage())) {
            message = cause.getMessage();
        } else if (!CoreUtils.isNullOrEmpty(e.getMessage())) {
            message = e.getMessage();
        } else {
            message = "Reactor encountered unrecoverable error";
        }

        final AmqpErrorContext errorContext = new AmqpErrorContext(hostname);
        final AmqpException exception;
        if (cause instanceof UnresolvedAddressException) {
            exception = new AmqpException(true,
                String.format(Locale.US,
                    "%s. This is usually caused by incorrect hostname or network configuration. Check correctness of namespace information. %s",
                    message, StringUtil.getTrackingIdAndTimeToLog()),
                cause, errorContext);
        } else {
            exception = new AmqpException(true,
                String.format(Locale.US, "%s, %s", message, StringUtil.getTrackingIdAndTimeToLog()),
                cause, errorContext);
        }

        exceptionHandler.onConnectionError(exception);
    }

    /**
     * Takes the dispose permit (bounded by the timeout) and schedules the job that stops the reactor, drains
     * its remaining events and frees it. The job is scheduled even if the permit could not be obtained.
     */
    private void scheduleCompletePendingTasks() {
        try {
            if (!disposeSemaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                logger.info("Unable to acquire dispose reactor semaphore within timeout to schedule pending tasks.");
            }
        } catch (InterruptedException e) {
            logger.warning("Could not acquire dispose semaphore to schedule pending tasks", e);
            // Restore the interrupt flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }

        scheduler.schedule(this::completePendingTasks);
    }

    /**
     * Stops the reactor, processes whatever is still pending, frees the reactor and releases the dispose
     * permit. The permit is released exactly once, regardless of how processing ends.
     */
    private void completePendingTasks() {
        logger.info(LOG_MESSAGE, connectionId, "Processing all pending tasks and closing old reactor.");
        try {
            reactor.stop();
            reactor.process();
        } catch (HandlerException e) {
            logger.warning(LOG_MESSAGE, connectionId, StringUtil.toStackTraceString(e,
                "scheduleCompletePendingTasks - exception occurred while processing events."));
        } finally {
            try {
                reactor.free();
            } catch (IllegalStateException ignored) {
                // Deliberately best-effort: presumably the reactor was already freed elsewhere, in which case
                // there is nothing left to clean up. NOTE(review): confirm against proton-j Reactor#free().
            } finally {
                disposeSemaphore.release();
            }
        }
    }

    /**
     * Disposes of this executor. The first call stops the reactor (if it was started) and then waits, up to the
     * configured timeout, for the dispose permit; subsequent calls return immediately.
     */
    @Override
    public void close() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        close(true, "ReactorExecutor.close() was called.");

        try {
            if (!disposeSemaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                logger.info("Unable to acquire dispose reactor semaphore within timeout.");
            }
        } catch (InterruptedException e) {
            logger.warning("Could not acquire semaphore to finish close operation.", e);
            // Restore the interrupt flag for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Marks the executor as stopped and notifies the exception handler of the shutdown. Does nothing if the
     * executor is not currently started.
     *
     * @param isInitiatedByClient When {@code true}, the pending-task completion job is scheduled first. The
     *     value is also passed as the second argument of {@link AmqpShutdownSignal} (presumably its
     *     "initiated by client" flag; confirm against that class).
     * @param reason Message carried by the shutdown signal.
     */
    private void close(boolean isInitiatedByClient, String reason) {
        if (!hasStarted.getAndSet(false)) {
            return;
        }

        if (isInitiatedByClient) {
            scheduleCompletePendingTasks();
        }
        exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, isInitiatedByClient, reason));
    }
}
