package org.wso2.carbon.event.receiver.core.internal.management;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.event.processor.manager.core.EventManagementUtil;
import org.wso2.carbon.event.processor.manager.core.EventSync;
import org.wso2.carbon.event.processor.manager.core.Manager;
import org.wso2.carbon.event.receiver.core.internal.ds.EventReceiverServiceValueHolder;
import org.wso2.siddhi.core.util.snapshot.ByteSerializer;

/* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/management/QueueInputEventDispatcher.class */
public class QueueInputEventDispatcher extends AbstractInputEventDispatcher implements EventSync {
    private final StreamDefinition streamDefinition;
    private final BlockingEventQueue eventQueue;
    private Lock readLock;
    private String syncId;
    private int tenantId;
    private Logger log = Logger.getLogger(AbstractInputEventDispatcher.class);
    private ReentrantLock threadBarrier = new ReentrantLock();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    /* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/management/QueueInputEventDispatcher$QueueInputEventDispatcherWorker.class */
    private class QueueInputEventDispatcherWorker implements Runnable {
        private QueueInputEventDispatcherWorker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    PrivilegedCarbonContext.startTenantFlow();
                    PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(QueueInputEventDispatcher.this.tenantId);
                    PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
                    while (true) {
                        try {
                            QueueInputEventDispatcher.this.readLock.lock();
                            QueueInputEventDispatcher.this.readLock.unlock();
                            Event take = QueueInputEventDispatcher.this.eventQueue.take();
                            QueueInputEventDispatcher.this.readLock.lock();
                            QueueInputEventDispatcher.this.readLock.unlock();
                            QueueInputEventDispatcher.this.callBack.sendEvent(take);
                            if (QueueInputEventDispatcher.this.isSendToOther()) {
                                EventReceiverServiceValueHolder.getEventManagementService().syncEvent(QueueInputEventDispatcher.this.syncId, Manager.ManagerType.Receiver, take);
                            }
                        } catch (InterruptedException e) {
                            QueueInputEventDispatcher.this.log.error("Interrupted while waiting to get an event from queue.", e);
                        }
                    }
                } catch (Exception e2) {
                    QueueInputEventDispatcher.this.log.error("Error in dispatching events:" + e2.getMessage(), e2);
                    PrivilegedCarbonContext.endTenantFlow();
                }
            } catch (Throwable th) {
                PrivilegedCarbonContext.endTenantFlow();
                throw th;
            }
        }
    }

    public QueueInputEventDispatcher(int i, String str, Lock lock, StreamDefinition streamDefinition, int i2, int i3) {
        this.readLock = lock;
        this.tenantId = i;
        this.syncId = str;
        this.eventQueue = new BlockingEventQueue(i2, i3);
        this.streamDefinition = EventManagementUtil.constructDatabridgeStreamDefinition(str, streamDefinition);
        this.executorService.submit(new QueueInputEventDispatcherWorker());
    }

    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public void onEvent(Event event) {
        try {
            this.threadBarrier.lock();
            this.eventQueue.put(event);
            this.threadBarrier.unlock();
        } catch (InterruptedException e) {
            this.log.error("Interrupted while waiting to put the event to queue.", e);
        }
    }

    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public void shutdown() {
        this.executorService.shutdown();
    }

    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public byte[] getState() {
        this.threadBarrier.lock();
        byte[] OToB = ByteSerializer.OToB(this.eventQueue);
        this.threadBarrier.unlock();
        return OToB;
    }

    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public void syncState(byte[] bArr) {
        BlockingEventQueue blockingEventQueue = (BlockingEventQueue) ByteSerializer.BToO(bArr);
        while (blockingEventQueue.peek() != null && blockingEventQueue.poll().equals(this.eventQueue.peek())) {
            this.eventQueue.poll();
        }
    }

    public void process(Event event) {
        this.readLock.lock();
        this.readLock.unlock();
        this.callBack.sendEvent(event);
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }
}
