package org.wso2.am.analytics.publisher.client;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
import org.wso2.am.analytics.publisher.exception.ConnectionRecoverableException;
import org.wso2.am.analytics.publisher.exception.ConnectionUnrecoverableException;
import org.wso2.am.analytics.publisher.reporter.cloud.DefaultAnalyticsThreadFactory;
import org.wso2.am.analytics.publisher.util.BackoffRetryCounter;

/* loaded from: input_file:org/wso2/am/analytics/publisher/client/EventHubClient.class */
/**
 * Publishes analytics events to Azure Event Hubs through an {@link EventHubProducerClient}.
 *
 * <p>Events are accumulated in an {@link EventDataBatch}. When an event no longer fits into the
 * current batch, the batch is sent and a new one is started. Producer creation failures that are
 * reported as recoverable are retried on a scheduled executor using a backoff counter; callers of
 * {@link #sendEvent(String)} are parked until the client reports {@link ClientStatus#CONNECTED}.
 *
 * <p>Locking overview:
 * <ul>
 *   <li>{@code threadBarrier}/{@code waitCondition} park publishing threads while the client is
 *       not connected.</li>
 *   <li>{@code readWriteLock} lets several threads add to the current batch under the read lock,
 *       while sending/replacing the batch is done under the write lock.</li>
 * </ul>
 */
public class EventHubClient {
    private static final Logger log = Logger.getLogger(EventHubClient.class);

    /** Result of trying to place an event into a batch under the write lock. */
    private enum AddOutcome {
        /** The event is in the current batch. */
        ADDED,
        /** The event could not be added and will not be retried. */
        DROPPED,
        /** The attempt failed in a way that warrants another full attempt. */
        RETRY
    }

    private final String authEndpoint;
    private final String authToken;
    private final AmqpRetryOptions retryOptions;
    // Written by the reconnection thread and read by publishing threads, hence volatile.
    private volatile EventHubProducerClient producer;
    private volatile EventDataBatch batch;
    private volatile ClientStatus clientStatus;
    private final Lock threadBarrier = new ReentrantLock();
    private final Condition waitCondition = this.threadBarrier.newCondition();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(2, new DefaultAnalyticsThreadFactory("Reconnection-Service"));
    private final BackoffRetryCounter producerRetryCounter = new BackoffRetryCounter();
    private final BackoffRetryCounter eventBatchRetryCounter = new BackoffRetryCounter();

    /**
     * Creates the client and immediately attempts to create the producer and the first batch.
     *
     * @param str              auth endpoint handed to {@code EventHubProducerClientFactory}
     * @param str2             auth token handed to {@code EventHubProducerClientFactory}
     * @param amqpRetryOptions AMQP retry options handed to {@code EventHubProducerClientFactory}
     */
    public EventHubClient(String str, String str2, AmqpRetryOptions amqpRetryOptions) {
        this.authEndpoint = str;
        this.authToken = str2;
        this.retryOptions = amqpRetryOptions;
        createProducerWithRetry(str, str2, amqpRetryOptions, true);
    }

    /**
     * Schedules another producer creation attempt after the current backoff interval and then
     * advances the backoff counter.
     */
    private void retryWithBackoff(final String str, final String str2, final AmqpRetryOptions amqpRetryOptions,
                                  final boolean z) {
        this.scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                EventHubClient.this.createProducerWithRetry(str, str2, amqpRetryOptions, z);
            }
        }, this.producerRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
        this.producerRetryCounter.increment();
    }

    /**
     * Closes any existing producer, creates a new one and, on success, marks the client as
     * connected and wakes every thread parked in {@link #sendEvent(String)}.
     *
     * <p>A {@link ConnectionRecoverableException} sets the status to {@code RETRYING} and schedules
     * another attempt; a {@link ConnectionUnrecoverableException} sets the status to
     * {@code NOT_CONNECTED} and no further attempt is scheduled.
     *
     * @param str              auth endpoint
     * @param str2             auth token
     * @param amqpRetryOptions AMQP retry options
     * @param z                {@code true} to also create a fresh batch (discarding the current one),
     *                         {@code false} to keep the current batch
     */
    public void createProducerWithRetry(String str, String str2, AmqpRetryOptions amqpRetryOptions, boolean z) {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
            this.producer = EventHubProducerClientFactory.create(str, str2, amqpRetryOptions);
            if (z) {
                try {
                    this.batch = this.producer.createBatch();
                } catch (IllegalStateException e) {
                    throw new ConnectionRecoverableException("Event batch creation failed. "
                            + sanitize(e.getMessage()));
                }
            }
            this.clientStatus = ClientStatus.CONNECTED;
            this.producerRetryCounter.reset();
            // Acquire the lock before entering the try so that unlock() only runs when it is held.
            this.threadBarrier.lock();
            try {
                this.waitCondition.signalAll();
            } finally {
                this.threadBarrier.unlock();
            }
        } catch (ConnectionRecoverableException e2) {
            this.clientStatus = ClientStatus.RETRYING;
            log.error("Recoverable error occurred when creating Eventhub Client. Retry attempts will be made. Reason :"
                    + sanitize(e2.getMessage()));
            if (log.isDebugEnabled()) {
                log.debug("Recoverable error occurred when creating Eventhub Client using following attributes. Auth endpoint: "
                        + sanitize(str) + ". Retry attempts will be made. Reason : " + sanitize(e2.getMessage()), e2);
            }
            retryWithBackoff(str, str2, amqpRetryOptions, z);
        } catch (ConnectionUnrecoverableException e3) {
            this.clientStatus = ClientStatus.NOT_CONNECTED;
            log.error("Unrecoverable error occurred when creating Eventhub Client. Analytics event publishing will be disabled until issue is rectified. Reason: "
                    + sanitize(e3.getMessage()));
            if (log.isDebugEnabled()) {
                log.debug("Unrecoverable error occurred when creating Eventhub Client using following attributes. Auth endpoint: "
                        + sanitize(str) + ". Analytics event publishing will be disabled until issue is rectified. Reason: "
                        + sanitize(e3.getMessage()), e3);
            }
        }
    }

    /**
     * Adds an event to the current batch, sending the batch first when the event does not fit.
     *
     * <p>The calling thread is parked while the client is not connected. If the thread is
     * interrupted while parked, its interrupt flag is restored and the event is dropped. Failures
     * classified as authentication issues, resource-limit errors or timeouts cause the whole
     * attempt to be repeated; any other failure re-initializes the producer with a new batch and
     * drops the event.
     *
     * @param str the event payload
     */
    public void sendEvent(String str) {
        // Retries are a loop rather than recursion so that a long outage cannot exhaust the stack.
        while (true) {
            if (!awaitConnected()) {
                log.warn("Interrupted while waiting for EventHub Client to become active. Event will be dropped.");
                return;
            }
            EventData eventData = new EventData(str);
            boolean added;
            this.readWriteLock.readLock().lock();
            try {
                added = this.batch.tryAdd(eventData);
            } finally {
                this.readWriteLock.readLock().unlock();
            }
            AddOutcome outcome = added ? AddOutcome.ADDED : flushAndAdd(eventData);
            if (outcome == AddOutcome.RETRY) {
                continue;
            }
            if (log.isDebugEnabled()) {
                if (outcome == AddOutcome.ADDED) {
                    log.debug("Adding event: " + sanitize(str));
                } else {
                    log.debug("Failed to add event: " + sanitize(str));
                }
            }
            return;
        }
    }

    /**
     * Blocks until the client status is {@code CONNECTED}.
     *
     * @return {@code true} once connected; {@code false} if the thread was interrupted while
     *         waiting (the interrupt flag is restored)
     */
    private boolean awaitConnected() {
        if (this.clientStatus == ClientStatus.CONNECTED) {
            return true;
        }
        this.threadBarrier.lock();
        try {
            // Re-check under the lock and loop: the signalling side takes the same lock, so a
            // signal cannot slip in between this check and await().
            while (this.clientStatus != ClientStatus.CONNECTED) {
                if (log.isDebugEnabled()) {
                    log.debug(sanitize(Thread.currentThread().getName())
                            + " will be parked as EventHub Client is inactive.");
                }
                this.waitCondition.await();
                if (log.isDebugEnabled()) {
                    log.debug(sanitize(Thread.currentThread().getName())
                            + " will be resumes as EventHub Client is active.");
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            this.threadBarrier.unlock();
        }
    }

    /**
     * Under the write lock, retries adding the event and, if it still does not fit, sends the
     * current batch, starts a new one and adds the event to it.
     *
     * @param eventData the event that did not fit under the read lock
     * @return how the attempt ended; {@code RETRY} asks the caller to start over
     */
    private AddOutcome flushAndAdd(EventData eventData) {
        AddOutcome outcome = AddOutcome.DROPPED;
        boolean reinitialize = false;
        this.readWriteLock.writeLock().lock();
        try {
            // Another thread may have sent the batch while this one waited for the write lock.
            boolean added = this.batch.tryAdd(eventData);
            if (!added) {
                int count = this.batch.getCount();
                this.producer.send(this.batch);
                this.batch = createBatchWithRetry();
                added = this.batch.tryAdd(eventData);
                log.info("Published " + count + " events to Analytics cluster.");
            }
            outcome = added ? AddOutcome.ADDED : AddOutcome.DROPPED;
        } catch (AmqpException e) {
            if (isAuthenticationFailure(e)) {
                log.error("Authentication issue happened. Producer client will be re-initialized retaining the EventDataBatch");
                this.clientStatus = ClientStatus.RETRYING;
                // Done while still holding the write lock so the retained batch is not touched.
                createProducerWithRetry(this.authEndpoint, this.authToken, this.retryOptions, false);
                outcome = AddOutcome.RETRY;
            } else if (e.getErrorCondition() == AmqpErrorCondition.RESOURCE_LIMIT_EXCEEDED) {
                log.error("Resource limit exceeded when publishing EventDataBatch. Operation will be retried after constant delay");
                try {
                    Thread.sleep(60000L);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                }
                outcome = AddOutcome.RETRY;
            } else {
                log.error("Unknown error occurred while publishing EventDataBatch. Producer client will be re-initialized. Events may be lost in the process.");
                this.clientStatus = ClientStatus.RETRYING;
                reinitialize = true;
                outcome = AddOutcome.DROPPED;
            }
        } catch (Exception e) {
            if (e.getCause() instanceof TimeoutException) {
                log.error("Timeout occurred after retrying " + this.retryOptions.getMaxRetries()
                        + " times with an timeout of " + this.retryOptions.getTryTimeout()
                        + " seconds while trying to publish EventDataBatch. Next retry cycle will begin shortly.");
                outcome = AddOutcome.RETRY;
            } else {
                log.error("Unknown error occurred while publishing EventDataBatch. Producer client will be re-initialized. Events may be lost in the process.");
                this.clientStatus = ClientStatus.RETRYING;
                reinitialize = true;
                outcome = AddOutcome.DROPPED;
            }
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
        if (reinitialize) {
            // Full re-initialization (new producer and new batch) happens after the write lock is released.
            createProducerWithRetry(this.authEndpoint, this.authToken, this.retryOptions, true);
        }
        return outcome;
    }

    /**
     * Tells whether the AMQP error condition is one this client treats as an authentication failure.
     */
    private boolean isAuthenticationFailure(AmqpException amqpException) {
        AmqpErrorCondition errorCondition = amqpException.getErrorCondition();
        return errorCondition == AmqpErrorCondition.UNAUTHORIZED_ACCESS
                || errorCondition == AmqpErrorCondition.PUBLISHER_REVOKED_ERROR;
    }

    /**
     * Creates a new batch, retrying with backoff for as long as the producer throws
     * {@link IllegalStateException}.
     *
     * @return the newly created batch
     * @throws IllegalStateException the last creation failure, if the thread is interrupted while
     *                               backing off (the interrupt flag is restored)
     */
    private EventDataBatch createBatchWithRetry() {
        while (true) {
            try {
                EventDataBatch created = this.producer.createBatch();
                this.eventBatchRetryCounter.reset();
                return created;
            } catch (IllegalStateException e) {
                log.error("Error in creating EventDataBatch. Will be retried in "
                        + sanitize(this.eventBatchRetryCounter.getTimeInterval()));
                try {
                    Thread.sleep(this.eventBatchRetryCounter.getTimeIntervalMillis());
                } catch (InterruptedException interrupted) {
                    // With the interrupt flag set every further sleep would return immediately,
                    // turning the backoff into a busy loop; give up instead.
                    Thread.currentThread().interrupt();
                    throw e;
                }
                this.eventBatchRetryCounter.increment();
            }
        }
    }

    /**
     * Sends the current batch and replaces it with a new one, under the write lock. Does nothing
     * when no producer or batch has been created yet.
     */
    public void flushEvents() {
        this.readWriteLock.writeLock().lock();
        try {
            if (this.producer == null || this.batch == null) {
                log.debug("No EventDataBatch available to flush as EventHub Client is not initialized.");
                return;
            }
            int count = this.batch.getCount();
            this.producer.send(this.batch);
            this.batch = this.producer.createBatch();
            log.debug("Flushed " + count + " events to Analytics cluster.");
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * @return the current connection status of this client
     */
    public ClientStatus getStatus() {
        return this.clientStatus;
    }

    /**
     * Removes CR/LF characters so that the value cannot break a log line; a {@code null} value is
     * rendered as {@code "null"}, as plain string concatenation would.
     */
    private static String sanitize(String value) {
        return String.valueOf(value).replaceAll("[\r\n]", "");
    }
}
