package org.wso2.am.choreo.extensions.cleanup.service.listeners;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusFailureReason;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.am.choreo.extensions.cleanup.service.publishers.CleanupEventPublisher;

/* loaded from: input_file:org/wso2/am/choreo/extensions/cleanup/service/listeners/CleanupEventReceiver.class */
/**
 * Receives organization cleanup events from an Azure Service Bus topic subscription.
 *
 * <p>Incoming messages are delegated to
 * {@code OrganizationCleanupServiceStartupListener.processMessage}; errors reported by the
 * processor are handled by {@link #processError(ServiceBusErrorContext, CountDownLatch)}.
 *
 * <p>Note: the processor client is held in a static field, so it is shared by every instance of
 * this class. Starting a second listener replaces the reference to the first one.
 */
public class CleanupEventReceiver {
    // Logger is bound to this class so that log entries are attributed to the receiver.
    private static final Log log = LogFactory.getLog(CleanupEventReceiver.class);
    private static ServiceBusProcessorClient processorClient = null;
    private final String connectionString;
    private final AmqpRetryOptions amqpRetryOptions;
    private final String topic;
    private final String subscription;

    /**
     * Creates a receiver for the given topic subscription.
     *
     * @param str              Service Bus connection string
     * @param amqpRetryOptions retry options applied to the Service Bus client
     * @param str2             name of the topic to listen on
     * @param str3             name of the subscription within the topic
     */
    public CleanupEventReceiver(String str, AmqpRetryOptions amqpRetryOptions, String str2, String str3) {
        this.connectionString = str;
        this.amqpRetryOptions = amqpRetryOptions;
        this.topic = str2;
        this.subscription = str3;
    }

    /**
     * Builds the Service Bus processor client for the configured topic/subscription and starts it.
     */
    public void startCleanupListener() {
        // The latch is handed to the error handler, which counts it down on unrecoverable errors.
        // Nothing in this class awaits it.
        CountDownLatch countDownLatch = new CountDownLatch(1);
        processorClient = new ServiceBusClientBuilder()
                .connectionString(this.connectionString)
                .retryOptions(this.amqpRetryOptions)
                .processor()
                .topicName(this.topic)
                .subscriptionName(this.subscription)
                .processMessage(OrganizationCleanupServiceStartupListener::processMessage)
                .processError(serviceBusErrorContext -> processError(serviceBusErrorContext, countDownLatch))
                .buildProcessorClient();
        processorClient.start();
        log.info("Starting the organization cleanup receiver.");
    }

    /**
     * Closes the processor client if one has been created. Does nothing when the listener was
     * never started.
     */
    public void closeCleanupReceiver() {
        ServiceBusProcessorClient client = processorClient;
        if (client != null) {
            client.close();
        }
    }

    /**
     * Handles an error reported by the Service Bus processor.
     *
     * <ul>
     *   <li>Exceptions that are not {@link ServiceBusException}s are logged and ignored.</li>
     *   <li>Entity disabled / entity not found / unauthorized: logged as unrecoverable and the
     *       latch is counted down.</li>
     *   <li>Message lock lost: logged.</li>
     *   <li>Service busy: the calling thread sleeps for one second before returning.</li>
     *   <li>Any other reason: logged together with the error source.</li>
     * </ul>
     *
     * @param serviceBusErrorContext context describing the error
     * @param countDownLatch         latch counted down when an unrecoverable error occurs
     */
    public static void processError(ServiceBusErrorContext serviceBusErrorContext, CountDownLatch countDownLatch) {
        log.error("Error when receiving messages from namespace: " + serviceBusErrorContext.getFullyQualifiedNamespace() + ". Entity: " + serviceBusErrorContext.getEntityPath());
        Throwable cause = serviceBusErrorContext.getException();
        if (!(cause instanceof ServiceBusException)) {
            log.error("Non-ServiceBusException occurred: " + cause);
            return;
        }
        // getException() is typed as Throwable; the instanceof check above makes this cast safe.
        ServiceBusException exception = (ServiceBusException) cause;
        ServiceBusFailureReason reason = exception.getReason();

        boolean unrecoverable = reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
                || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
                || reason == ServiceBusFailureReason.UNAUTHORIZED;
        if (unrecoverable) {
            log.error("An unrecoverable error occurred. Stopping processing with reason " + reason + ": " + exception.getMessage());
            countDownLatch.countDown();
            return;
        }
        if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            log.error("Message lock lost for message: " + cause);
            return;
        }
        if (reason != ServiceBusFailureReason.SERVICE_BUSY) {
            log.error("Error source: " + serviceBusErrorContext.getErrorSource() + " reason " + reason + ", message: ", cause);
            return;
        }
        // SERVICE_BUSY: back off briefly before returning control to the processor.
        try {
            TimeUnit.SECONDS.sleep(1L);
        } catch (InterruptedException e) {
            log.error("Unable to sleep for period of time", e);
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }
}
