package org.wso2.carbon.analytics.eventsink.internal.queue;

import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.eventsink.internal.queue.WrappedEventFactory;
import org.wso2.carbon.analytics.eventsink.internal.util.ServiceHolder;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;

/* loaded from: input_file:org/wso2/carbon/analytics/eventsink/internal/queue/AnalyticsEventQueueWorker.class */
public class AnalyticsEventQueueWorker implements EventHandler<WrappedEventFactory.WrappedEvent> {
    private static final Log log = LogFactory.getLog(AnalyticsEventQueueWorker.class);
    private AnalyticsEventQueue queue;
    private int tenantId;
    private List<Event> events = new ArrayList();
    private AnalyticsBlockingExecutor threadPoolExecutor = new AnalyticsBlockingExecutor(ServiceHolder.getAnalyticsEventSinkConfiguration().getWorkerPoolSize());
    private int totalSize = 0;

    /* loaded from: input_file:org/wso2/carbon/analytics/eventsink/internal/queue/AnalyticsEventQueueWorker$AnalyticsEventProcessor.class */
    public class AnalyticsEventProcessor extends Thread {
        private List<Event> events;
        private int size;

        private AnalyticsEventProcessor(List<Event> list, int i) {
            this.events = list;
            this.size = i;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                PrivilegedCarbonContext.startTenantFlow();
                PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(AnalyticsEventQueueWorker.this.tenantId, true);
                if (AnalyticsEventQueueWorker.log.isDebugEnabled()) {
                    AnalyticsEventQueueWorker.log.debug("Batch size of : " + this.size + " is going to be inserted in DAL");
                }
                ServiceHolder.getAnalyticsDSConnector().insertEvents(AnalyticsEventQueueWorker.this.tenantId, this.events);
            } catch (Exception e) {
                AnalyticsEventQueueWorker.log.error("Error processing event. ", e);
            } finally {
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }

    public AnalyticsEventQueueWorker(int i, AnalyticsEventQueue analyticsEventQueue) {
        this.tenantId = i;
        this.queue = analyticsEventQueue;
    }

    public void onEvent(WrappedEventFactory.WrappedEvent wrappedEvent, long j, boolean z) throws Exception {
        if (this.totalSize + wrappedEvent.getSize() <= ServiceHolder.getAnalyticsEventSinkConfiguration().getBatchSize()) {
            this.events.add(wrappedEvent.getEvent());
            this.totalSize += wrappedEvent.getSize();
            if (log.isDebugEnabled()) {
                log.debug("Collecting events, current totalSize : " + this.totalSize);
            }
        } else if (this.events.isEmpty()) {
            this.events.add(wrappedEvent.getEvent());
            this.totalSize += wrappedEvent.getSize();
            pushEvents();
        } else {
            pushEvents();
            this.events.add(wrappedEvent.getEvent());
            this.totalSize += wrappedEvent.getSize();
        }
        if (z && !this.events.isEmpty()) {
            pushEvents();
        }
        this.queue.notifyReleasedEvent(wrappedEvent, z);
    }

    private void pushEvents() {
        List<Event> list = this.events;
        this.events = new ArrayList();
        submitJob(list);
        this.totalSize = 0;
    }

    private void submitJob(List<Event> list) {
        this.threadPoolExecutor.submit(new AnalyticsEventProcessor(list, this.totalSize));
    }
}
