/*
 * Decompiled with CFR 0.152.
 */
package org.wso2.am.analytics.publisher.client;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
import org.wso2.am.analytics.publisher.client.ClientStatus;
import org.wso2.am.analytics.publisher.client.EventHubProducerClientFactory;
import org.wso2.am.analytics.publisher.exception.ConnectionRecoverableException;
import org.wso2.am.analytics.publisher.exception.ConnectionUnrecoverableException;
import org.wso2.am.analytics.publisher.reporter.cloud.DefaultAnalyticsThreadFactory;
import org.wso2.am.analytics.publisher.util.BackoffRetryCounter;

/**
 * Publishes analytics events to Azure Event Hubs through an {@link EventHubProducerClient}.
 *
 * <p>Events are accumulated in an {@link EventDataBatch}. When an event no longer fits, the
 * current batch is sent and a fresh one is started. If the producer cannot be created or a
 * send fails, the client moves to {@link ClientStatus#RETRYING} (a reconnect is attempted,
 * possibly on the internal scheduled executor) or {@link ClientStatus#NOT_CONNECTED}
 * (no further attempts are scheduled by this class).
 *
 * <p>Threads calling {@link #sendEvent(String)} while the client is not
 * {@link ClientStatus#CONNECTED} are parked until a producer is (re)created successfully.
 */
public class EventHubClient {
    private static final Logger log = Logger.getLogger(EventHubClient.class);
    /** Delay applied before retrying a send rejected with RESOURCE_LIMIT_EXCEEDED. */
    private static final long RESOURCE_LIMIT_BACKOFF_MILLIS = 60000L;
    private final String authEndpoint;
    private final String authToken;
    /** Read lock guards appends to the current batch; write lock guards sending/replacing it. */
    private final ReadWriteLock readWriteLock;
    private final BackoffRetryCounter producerRetryCounter;
    private final BackoffRetryCounter eventBatchRetryCounter;
    /** Guards {@link #waitCondition}, on which publishing threads park while disconnected. */
    private final Lock threadBarrier = new ReentrantLock();
    private final AmqpRetryOptions retryOptions;
    private final Condition waitCondition = this.threadBarrier.newCondition();
    private final ScheduledExecutorService scheduledExecutorService;
    private EventHubProducerClient producer;
    private EventDataBatch batch;
    /** Written by the reconnection thread and read by publishing threads, hence volatile. */
    private volatile ClientStatus clientStatus;

    /**
     * Creates the client and immediately attempts to create the producer and the first batch.
     * A failed attempt does not throw; it is reflected in {@link #getStatus()}.
     *
     * @param authEndpoint endpoint handed to {@link EventHubProducerClientFactory}
     * @param authToken    token handed to {@link EventHubProducerClientFactory}
     * @param retryOptions AMQP retry options handed to {@link EventHubProducerClientFactory}
     */
    public EventHubClient(String authEndpoint, String authToken, AmqpRetryOptions retryOptions) {
        this.readWriteLock = new ReentrantReadWriteLock(true);
        this.scheduledExecutorService = Executors.newScheduledThreadPool(2, new DefaultAnalyticsThreadFactory("Reconnection-Service"));
        this.producerRetryCounter = new BackoffRetryCounter();
        this.eventBatchRetryCounter = new BackoffRetryCounter();
        this.authEndpoint = authEndpoint;
        this.authToken = authToken;
        this.retryOptions = retryOptions;
        this.createProducerWithRetry(authEndpoint, authToken, retryOptions, true);
    }

    /**
     * Strips CR/LF characters from a value before it is written to the log.
     * A {@code null} value is rendered as {@code "null"} instead of raising an exception.
     */
    private static String sanitize(String value) {
        return value == null ? "null" : value.replaceAll("[\r\n]", "");
    }

    /**
     * Schedules another producer creation attempt after the delay currently reported by the
     * producer retry counter, then advances that counter.
     */
    private void retryWithBackoff(final String authEndpoint, final String authToken, final AmqpRetryOptions retryOptions, final boolean createBatch) {
        this.scheduledExecutorService.schedule(new Runnable(){

            @Override
            public void run() {
                EventHubClient.this.createProducerWithRetry(authEndpoint, authToken, retryOptions, createBatch);
            }
        }, this.producerRetryCounter.getTimeIntervalMillis(), TimeUnit.MILLISECONDS);
        this.producerRetryCounter.increment();
    }

    /**
     * Closes any existing producer, creates a new one and, on success, marks the client as
     * connected and wakes every thread parked in {@link #sendEvent(String)}.
     *
     * <p>A recoverable failure sets the status to RETRYING and schedules another attempt;
     * an unrecoverable failure sets the status to NOT_CONNECTED and schedules nothing.
     *
     * @param createBatch whether a new batch should replace the current one; {@code false}
     *                    retains the existing batch
     */
    private void createProducerWithRetry(String authEndpoint, String authToken, AmqpRetryOptions retryOptions, boolean createBatch) {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
            this.producer = EventHubProducerClientFactory.create(authEndpoint, authToken, retryOptions);
            try {
                if (createBatch) {
                    this.batch = this.producer.createBatch();
                }
            } catch (IllegalStateException e) {
                throw new ConnectionRecoverableException("Event batch creation failed. " + sanitize(e.getMessage()));
            }
            this.clientStatus = ClientStatus.CONNECTED;
            this.producerRetryCounter.reset();
            // Acquire the lock before the try so the finally never unlocks a lock that is not held.
            this.threadBarrier.lock();
            try {
                this.waitCondition.signalAll();
            } finally {
                this.threadBarrier.unlock();
            }
        } catch (ConnectionRecoverableException e) {
            this.clientStatus = ClientStatus.RETRYING;
            log.error("Recoverable error occurred when creating Eventhub Client. Retry attempts will be made. Reason :" + sanitize(e.getMessage()));
            if (log.isDebugEnabled()) {
                log.debug("Recoverable error occurred when creating Eventhub Client using following attributes. Auth endpoint: " + sanitize(authEndpoint) + ". Retry attempts will be made. Reason : " + sanitize(e.getMessage()), e);
            }
            this.retryWithBackoff(authEndpoint, authToken, retryOptions, createBatch);
        } catch (ConnectionUnrecoverableException e) {
            this.clientStatus = ClientStatus.NOT_CONNECTED;
            log.error("Unrecoverable error occurred when creating Eventhub Client. Analytics event publishing will be disabled until issue is rectified. Reason: " + sanitize(e.getMessage()));
            if (log.isDebugEnabled()) {
                log.debug("Unrecoverable error occurred when creating Eventhub Client using following attributes. Auth endpoint: " + sanitize(authEndpoint) + ". Analytics event publishing will be disabled until issue is rectified. Reason: " + sanitize(e.getMessage()), e);
            }
        }
    }

    /**
     * Adds the event to the current batch, sending the batch first when it is full.
     *
     * <p>Blocks while the client is not connected. Authentication failures, resource-limit
     * rejections and timeouts are retried; any other send failure re-creates the producer
     * and drops the event. If the calling thread is interrupted while waiting or backing
     * off, the interrupt flag is restored and the event is dropped.
     *
     * @param event payload of the event to publish
     */
    public void sendEvent(String event) {
        boolean retry = true;
        // Iterative retry: a long outage must not grow the call stack.
        while (retry) {
            if (!this.awaitConnected()) {
                log.error("Interrupted while waiting for EventHub Client to become active. Event will be dropped.");
                return;
            }
            retry = this.publishOnce(event);
        }
    }

    /**
     * Parks the calling thread until the client status is CONNECTED.
     *
     * @return {@code true} once connected; {@code false} if the thread was interrupted while
     *         waiting (the interrupt flag is restored)
     */
    private boolean awaitConnected() {
        if (this.clientStatus == ClientStatus.CONNECTED) {
            return true;
        }
        this.threadBarrier.lock();
        try {
            // Re-check under the lock so a signal sent between the check and the wait is not
            // missed, and loop to tolerate spurious wake-ups.
            while (this.clientStatus != ClientStatus.CONNECTED) {
                if (log.isDebugEnabled()) {
                    log.debug(sanitize(Thread.currentThread().getName()) + " will be parked as EventHub Client is inactive.");
                }
                this.waitCondition.await();
            }
            if (log.isDebugEnabled()) {
                log.debug(sanitize(Thread.currentThread().getName()) + " will be resumes as EventHub Client is active.");
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            this.threadBarrier.unlock();
        }
    }

    /**
     * Makes a single attempt to place the event in the batch, flushing a full batch if needed.
     *
     * @return {@code true} when the caller should try again, {@code false} when the event was
     *         either added or given up on
     */
    private boolean publishOnce(String event) {
        EventData eventData = new EventData(event);
        boolean isAdded;
        this.readWriteLock.readLock().lock();
        try {
            isAdded = this.batch.tryAdd(eventData);
        } finally {
            this.readWriteLock.readLock().unlock();
        }
        boolean recreateProducer = false;
        if (!isAdded) {
            // The batch looked full: take the write lock to send it and start a new one.
            this.readWriteLock.writeLock().lock();
            try {
                // Another thread may have swapped the batch while the lock was being acquired.
                isAdded = this.batch.tryAdd(eventData);
                if (!isAdded) {
                    int size = this.batch.getCount();
                    this.producer.send(this.batch);
                    this.batch = this.createBatchWithRetry();
                    isAdded = this.batch.tryAdd(eventData);
                    log.info("Published " + size + " events to Analytics cluster.");
                }
            } catch (AmqpException e) {
                if (this.isAuthenticationFailure(e)) {
                    log.error("Authentication issue happened. Producer client will be re-initialized retaining the EventDataBatch");
                    this.clientStatus = ClientStatus.RETRYING;
                    this.createProducerWithRetry(this.authEndpoint, this.authToken, this.retryOptions, false);
                    return true;
                }
                if (e.getErrorCondition() == AmqpErrorCondition.RESOURCE_LIMIT_EXCEEDED) {
                    log.error("Resource limit exceeded when publishing EventDataBatch. Operation will be retried after constant delay");
                    try {
                        // The write lock stays held during the pause, holding back other publishers too.
                        Thread.sleep(RESOURCE_LIMIT_BACKOFF_MILLIS);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        log.error("Interrupted while backing off after resource limit was exceeded. Event will be dropped.");
                        return false;
                    }
                    return true;
                }
                log.error("Unknown error occurred while publishing EventDataBatch. Producer client will be re-initialized. Events may be lost in the process.");
                this.clientStatus = ClientStatus.RETRYING;
                recreateProducer = true;
            } catch (Exception e) {
                if (e.getCause() instanceof TimeoutException) {
                    log.error("Timeout occurred after retrying " + this.retryOptions.getMaxRetries() + " times with an timeout of " + this.retryOptions.getTryTimeout().getSeconds() + " seconds while trying to publish EventDataBatch. Next retry cycle will begin shortly.");
                    return true;
                }
                log.error("Unknown error occurred while publishing EventDataBatch. Producer client will be re-initialized. Events may be lost in the process.");
                this.clientStatus = ClientStatus.RETRYING;
                recreateProducer = true;
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        }
        if (recreateProducer) {
            // Performed after the write lock has been released.
            this.createProducerWithRetry(this.authEndpoint, this.authToken, this.retryOptions, true);
        }
        if (log.isDebugEnabled()) {
            if (isAdded) {
                log.debug("Adding event: " + sanitize(event));
            } else {
                log.debug("Failed to add event: " + sanitize(event));
            }
        }
        return false;
    }

    /**
     * Tells whether the AMQP error condition is UNAUTHORIZED_ACCESS or PUBLISHER_REVOKED_ERROR.
     */
    private boolean isAuthenticationFailure(AmqpException exception) {
        AmqpErrorCondition condition = exception.getErrorCondition();
        return condition == AmqpErrorCondition.UNAUTHORIZED_ACCESS || condition == AmqpErrorCondition.PUBLISHER_REVOKED_ERROR;
    }

    /**
     * Creates a new batch from the current producer, sleeping for the interval reported by
     * the batch retry counter between failed attempts.
     *
     * @return the newly created batch
     * @throws IllegalStateException the last creation failure, rethrown if the thread is
     *                               interrupted while waiting to retry
     */
    private EventDataBatch createBatchWithRetry() {
        while (true) {
            try {
                EventDataBatch newBatch = this.producer.createBatch();
                this.eventBatchRetryCounter.reset();
                return newBatch;
            } catch (IllegalStateException e) {
                log.error("Error in creating EventDataBatch. Will be retried in " + sanitize(this.eventBatchRetryCounter.getTimeInterval()));
                try {
                    Thread.sleep(this.eventBatchRetryCounter.getTimeIntervalMillis());
                } catch (InterruptedException interrupted) {
                    // Stop retrying: with the flag set, every further sleep would fail immediately.
                    Thread.currentThread().interrupt();
                    throw e;
                }
                this.eventBatchRetryCounter.increment();
            }
        }
    }

    /**
     * Sends the current batch and replaces it with a new one. Does nothing when no producer
     * or batch has been created yet. Failures from the send or the batch creation propagate
     * to the caller.
     */
    public void flushEvents() {
        this.readWriteLock.writeLock().lock();
        try {
            if (this.producer == null || this.batch == null) {
                log.debug("Skipping flush as EventHub Client has not been initialized yet.");
                return;
            }
            int size = this.batch.getCount();
            this.producer.send(this.batch);
            this.batch = this.producer.createBatch();
            log.debug("Flushed " + size + " events to Analytics cluster.");
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Returns the most recently recorded connection status.
     */
    public ClientStatus getStatus() {
        return this.clientStatus;
    }
}

