package org.wso2.carbon.apimgt.hybrid.gateway.usage.publisher;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.hybrid.gateway.usage.publisher.util.MicroGatewayAPIUsageConstants;
import org.wso2.carbon.apimgt.hybrid.gateway.usage.publisher.util.UsageFileWriter;
import org.wso2.carbon.apimgt.hybrid.gateway.usage.publisher.util.UsagePublisherException;
import org.wso2.carbon.apimgt.hybrid.gateway.usage.publisher.util.WrappedEventFactory;
import org.wso2.carbon.databridge.agent.AgentHolder;
import org.wso2.carbon.databridge.agent.exception.DataEndpointAgentConfigurationException;
import org.wso2.carbon.databridge.agent.exception.EventQueueFullException;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.utils.DataBridgeThreadFactory;

/* loaded from: input_file:org/wso2/carbon/apimgt/hybrid/gateway/usage/publisher/FileDataPublisher.class */
public class FileDataPublisher {
    private static final Log log = LogFactory.getLog(FileDataPublisher.class);
    private EventQueue eventQueue;
    private static final int FAILED_EVENT_LOG_INTERVAL = 10000;
    private long lastFailedEventTime;
    private long failedEventCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/apimgt/hybrid/gateway/usage/publisher/FileDataPublisher$EventQueue.class */
    public static class EventQueue {
        private RingBuffer<WrappedEventFactory.WrappedEvent> ringBuffer;
        private Disruptor<WrappedEventFactory.WrappedEvent> eventQueueDisruptor;
        private ExecutorService eventQueuePool;

        EventQueue(int i) {
            this.ringBuffer = null;
            this.eventQueueDisruptor = null;
            this.eventQueuePool = null;
            this.eventQueuePool = Executors.newSingleThreadExecutor(new DataBridgeThreadFactory("EventQueue"));
            this.eventQueueDisruptor = new Disruptor<>(new WrappedEventFactory(), i, this.eventQueuePool, ProducerType.MULTI, new BlockingWaitStrategy());
            this.eventQueueDisruptor.handleEventsWith(new EventHandler[]{new EventQueueWorker()});
            this.ringBuffer = this.eventQueueDisruptor.start();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void tryPut(Event event) throws EventQueueFullException {
            try {
                long tryNext = this.ringBuffer.tryNext(1);
                ((WrappedEventFactory.WrappedEvent) this.ringBuffer.get(tryNext)).setEvent(event);
                this.ringBuffer.publish(tryNext);
            } catch (InsufficientCapacityException e) {
                throw new EventQueueFullException("Cannot persist events because the event queue is full", e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void shutdown() {
            this.eventQueuePool.shutdown();
            this.eventQueueDisruptor.shutdown();
        }
    }

    /* loaded from: input_file:org/wso2/carbon/apimgt/hybrid/gateway/usage/publisher/FileDataPublisher$EventQueueWorker.class */
    static class EventQueueWorker implements EventHandler<WrappedEventFactory.WrappedEvent> {
        EventQueueWorker() {
        }

        public void onEvent(WrappedEventFactory.WrappedEvent wrappedEvent, long j, boolean z) {
            Event event = wrappedEvent.getEvent();
            StringBuilder sb = new StringBuilder();
            sb.append(MicroGatewayAPIUsageConstants.STREAM_ID).append(MicroGatewayAPIUsageConstants.KEY_VALUE_SEPARATOR).append(event.getStreamId()).append(MicroGatewayAPIUsageConstants.EVENT_SEPARATOR);
            sb.append(MicroGatewayAPIUsageConstants.TIME_STAMP).append(MicroGatewayAPIUsageConstants.KEY_VALUE_SEPARATOR).append(event.getTimeStamp()).append(MicroGatewayAPIUsageConstants.EVENT_SEPARATOR);
            sb.append(MicroGatewayAPIUsageConstants.META_DATA).append(MicroGatewayAPIUsageConstants.KEY_VALUE_SEPARATOR).append(event.getMetaData() == null ? null : StringUtils.join(event.getMetaData(), MicroGatewayAPIUsageConstants.OBJECT_SEPARATOR)).append(MicroGatewayAPIUsageConstants.EVENT_SEPARATOR);
            sb.append(MicroGatewayAPIUsageConstants.CORRELATION_DATA).append(MicroGatewayAPIUsageConstants.KEY_VALUE_SEPARATOR).append(event.getCorrelationData() == null ? null : StringUtils.join(event.getCorrelationData(), MicroGatewayAPIUsageConstants.OBJECT_SEPARATOR)).append(MicroGatewayAPIUsageConstants.EVENT_SEPARATOR);
            sb.append(MicroGatewayAPIUsageConstants.PAYLOAD_DATA).append(MicroGatewayAPIUsageConstants.KEY_VALUE_SEPARATOR).append(event.getPayloadData() == null ? null : StringUtils.join(event.getPayloadData(), MicroGatewayAPIUsageConstants.OBJECT_SEPARATOR));
            try {
                UsageFileWriter.getInstance().writeToFile(sb.toString());
            } catch (UsagePublisherException e) {
                FileDataPublisher.log.warn("Error occurred while getting the Usage File Writer.", e);
            }
        }
    }

    public FileDataPublisher() throws UsagePublisherException {
        this.eventQueue = null;
        int i = 32768;
        try {
            i = AgentHolder.getInstance().getDefaultDataEndpointAgent().getAgentConfiguration().getQueueSize();
        } catch (DataEndpointAgentConfigurationException e) {
            log.warn("Error occurred while getting the Queue size from Agent Configuration. Hence default size (32768) will be used");
        }
        this.eventQueue = new EventQueue(i);
        UsageFileWriter.getInstance();
    }

    public boolean tryPublish(String str, long j, Object[] objArr, Object[] objArr2, Object[] objArr3) {
        return tryPublish(new Event(str, j, objArr, objArr2, objArr3));
    }

    private boolean tryPublish(Event event) {
        boolean z = true;
        try {
            if (this.eventQueue != null) {
                this.eventQueue.tryPut(event);
            }
        } catch (EventQueueFullException e) {
            onEventQueueFull(event);
            z = false;
        }
        return z;
    }

    private void onEventQueueFull(Event event) {
        this.failedEventCount++;
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.lastFailedEventTime > 10000) {
            log.warn("Event queue is full, unable to process the event, " + this.failedEventCount + " events dropped so far.");
            this.lastFailedEventTime = currentTimeMillis;
        }
        if (log.isDebugEnabled()) {
            log.debug("Dropped Event: " + event.toString());
        }
    }

    public void shutdown() throws UsagePublisherException {
        if (this.eventQueue != null) {
            this.eventQueue.shutdown();
        }
        UsageFileWriter.getInstance().closeFileResources();
    }
}
