package org.wso2.am.choreo.extensions.asb;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.exception.AzureException;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.eventing.EventPublisher;
import org.wso2.carbon.apimgt.eventing.EventPublisherEvent;
import org.wso2.carbon.apimgt.eventing.EventPublisherException;

/* loaded from: input_file:org/wso2/am/choreo/extensions/asb/AsbEventPublisher.class */
/**
 * {@link EventPublisher} implementation that sends events to an Azure Service Bus topic
 * through a {@link ServiceBusSenderClient}.
 *
 * <p>The sender client is created once in the constructor. When a publish attempt fails with an
 * {@link IllegalStateException} the publisher marks the client as disposed, and the next publish
 * call closes it and builds a replacement before sending.</p>
 */
public class AsbEventPublisher implements EventPublisher {
    private static final Log log = LogFactory.getLog(AsbEventPublisher.class);
    private final String topicName;
    private final String connectionString;
    private final AmqpRetryOptions amqpRetryOptions;
    private final AsbMessageBuilder asbMessageBuilder;
    // Both fields below are read outside the synchronized block in publish(ServiceBusMessage),
    // so they are volatile to make a re-initialization visible to every publishing thread.
    private volatile ServiceBusSenderClient serviceBusSenderClient;
    private volatile boolean isDisposed = false;

    /**
     * Creates the publisher and eagerly builds the underlying sender client.
     *
     * @param topicName         name of the Service Bus topic events are sent to
     * @param connectionString  connection string handed to the {@link ServiceBusClientBuilder}
     * @param amqpRetryOptions  retry options handed to the {@link ServiceBusClientBuilder}
     * @param asbMessageBuilder converts an {@link EventPublisherEvent} into a {@link ServiceBusMessage}
     * @throws EventPublisherException if the sender client cannot be built
     */
    public AsbEventPublisher(String topicName, String connectionString, AmqpRetryOptions amqpRetryOptions, AsbMessageBuilder asbMessageBuilder) throws EventPublisherException {
        this.topicName = topicName;
        this.connectionString = connectionString;
        this.amqpRetryOptions = amqpRetryOptions;
        this.asbMessageBuilder = asbMessageBuilder;
        // Must run after the assignments above: the client is built from these fields. A field
        // initializer would execute before this constructor body and see them all as null.
        this.serviceBusSenderClient = getNewServiceBusSenderClient();
    }

    /**
     * Builds a new sender client for the configured topic, connection string and retry options.
     *
     * <p>On failure the publisher is flagged as disposed before the exception is rethrown.</p>
     *
     * @return a newly built {@link ServiceBusSenderClient}
     * @throws EventPublisherException wrapping any {@link AzureException} raised while building
     */
    public ServiceBusSenderClient getNewServiceBusSenderClient() throws EventPublisherException {
        try {
            ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                    .connectionString(this.connectionString)
                    .retryOptions(this.amqpRetryOptions)
                    .sender()
                    .topicName(this.topicName)
                    .buildClient();
            if (log.isDebugEnabled()) {
                log.debug("ServiceBusSenderClient initialized for topic: " + this.topicName);
            }
            return senderClient;
        } catch (AzureException e) {
            this.isDisposed = true;
            throw new EventPublisherException(e);
        }
    }

    /**
     * Sends a message with the current sender client, first replacing the client if it has been
     * flagged as disposed.
     *
     * @param serviceBusMessage message to send
     * @throws EventPublisherException if a replacement sender client cannot be built
     */
    private void publish(ServiceBusMessage serviceBusMessage) throws EventPublisherException {
        if (this.isDisposed) {
            synchronized (this) {
                // Re-check under the lock so that only one thread rebuilds the client.
                if (this.isDisposed) {
                    log.info("Re-initializing disposed ServiceBusSenderClient.");
                    this.serviceBusSenderClient.close();
                    this.serviceBusSenderClient = getNewServiceBusSenderClient();
                    this.isDisposed = false;
                }
            }
        }
        this.serviceBusSenderClient.sendMessage(serviceBusMessage);
    }

    /**
     * Logs that the publisher is initialized; performs no other work.
     */
    public void init() {
        log.info("ASBEventPublisher is initialized.");
    }

    /**
     * Converts the event into a Service Bus message and sends it to the topic.
     *
     * @param eventPublisherEvent event to publish
     * @throws EventPublisherException if sending fails with an {@link IllegalStateException},
     *                                 {@link AmqpException} or {@link ServiceBusException}, or if
     *                                 the sender client has to be rebuilt and that fails
     */
    public void publish(EventPublisherEvent eventPublisherEvent) throws EventPublisherException {
        if (log.isDebugEnabled()) {
            log.debug("Event to be sent: " + stripLineBreaks(eventPublisherEvent));
        }
        ServiceBusMessage message = this.asbMessageBuilder.buildMessage(eventPublisherEvent);
        try {
            long startMillis = System.currentTimeMillis();
            publish(message);
            if (log.isDebugEnabled()) {
                log.debug("Event:" + stripLineBreaks(eventPublisherEvent) + " published in " + (System.currentTimeMillis() - startMillis) + "ms.");
            }
        } catch (IllegalStateException e) {
            // Flag the client so the next publish call closes and rebuilds it.
            // NOTE(review): presumably the SDK raises this when the sender is closed - confirm.
            this.isDisposed = true;
            throw new EventPublisherException(e);
        } catch (AmqpException | ServiceBusException e) {
            throw new EventPublisherException(e);
        }
    }

    /**
     * Returns the event's string form with all CR and LF characters removed, so that a logged
     * event always stays on a single log line.
     */
    private static String stripLineBreaks(EventPublisherEvent eventPublisherEvent) {
        return eventPublisherEvent.toString().replaceAll("[\r\n]", "");
    }
}
