package org.wso2.carbon.analytics.spark.event;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.analytics.spark.event.internal.ServiceHolder;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.ntask.core.Task;

/* loaded from: input_file:org/wso2/carbon/analytics/spark/event/EventingTask.class */
/**
 * {@link Task} implementation that drains pending event batches from
 * {@link EventStreamDataStore} and publishes every event through the event
 * stream service obtained from {@link ServiceHolder}.
 *
 * <p>Batches are keyed by an integer that is installed as the tenant id of the
 * thread-local Carbon context while that batch's events are published.
 */
public class EventingTask implements Task {
    private static final Log log = LogFactory.getLog(EventingTask.class);

    /**
     * No initialisation is required for this task.
     */
    public void init() {
    }

    /**
     * Accepts the task properties; this task does not use any of them.
     *
     * @param map task properties (ignored)
     */
    public void setProperties(Map<String, String> map) {
    }

    /**
     * Repeatedly pulls the next set of event batches from
     * {@link EventStreamDataStore} and dispatches them, stopping after the
     * first extraction that yields an empty map.
     */
    public void execute() {
        Map<Integer, List<Event>> eventBatches;
        do {
            eventBatches = EventStreamDataStore.extractNextEventBatch();
            processEventBatches(eventBatches);
        } while (!eventBatches.isEmpty());
    }

    /**
     * Publishes each tenant's events inside a tenant flow bound to that tenant id.
     *
     * <p>An exception raised while publishing propagates to the caller, so the
     * remaining entries of {@code map} are not processed in that case.
     *
     * @param map events to publish, keyed by tenant id
     */
    private void processEventBatches(Map<Integer, List<Event>> map) {
        for (Map.Entry<Integer, List<Event>> tenantBatch : map.entrySet()) {
            try {
                PrivilegedCarbonContext.startTenantFlow();
                PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantBatch.getKey().intValue());
                for (Event event : tenantBatch.getValue()) {
                    ServiceHolder.getEventStreamService().publish(event);
                }
                if (log.isDebugEnabled()) {
                    log.debug("Dispatched " + tenantBatch.getValue().size() + " events for tenant: " + tenantBatch.getKey());
                }
            } finally {
                // Single cleanup point: the tenant flow is ended exactly once,
                // whether publishing succeeded or threw.
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }
}
