package org.wso2.carbon.event.receiver.core.internal.management;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.event.processor.manager.core.EventManagementUtil;
import org.wso2.carbon.event.processor.manager.core.EventSync;
import org.wso2.carbon.event.processor.manager.core.Manager;
import org.wso2.carbon.event.receiver.core.internal.ds.EventReceiverServiceValueHolder;

/* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/management/QueueInputEventDispatcher.class */
/**
 * Input event dispatcher that buffers incoming events in a {@link BlockingEventQueue} and
 * hands them to the callback from a single dedicated worker thread.
 *
 * <p>Events enter through {@link #onEvent(Event)}. The worker takes them off the queue,
 * forwards them to {@code callBack} when {@link #isContinueProcess()} is {@code true}, and
 * additionally passes them to the event management service when {@code isSendToOther()}
 * (inherited) is {@code true}.
 *
 * <p>Threading: {@code onEvent} may be called from any thread; dispatching happens on the
 * single executor thread created in the constructor. {@link #shutdown()} interrupts that
 * thread so it terminates.
 */
public class QueueInputEventDispatcher extends AbstractInputEventDispatcher implements EventSync {
    // NOTE(review): the logger is registered under AbstractInputEventDispatcher rather than this
    // class; kept as-is so existing logging configuration keeps matching.
    private static final Log log = LogFactory.getLog(AbstractInputEventDispatcher.class);

    private final StreamDefinition streamDefinition;
    private final BlockingEventQueue eventQueue;
    /** Externally supplied lock; this class only acquires and immediately releases it as a gate. */
    private final Lock readLock;
    private final String syncId;
    private final int tenantId;
    private final ExecutorService executorService;
    private final String originalEventStreamId;
    /** Handed to the queue on every put and held while the queue is serialized in getState(). */
    private final ReentrantLock threadBarrier = new ReentrantLock();
    /** Written by callers of setContinueProcess and read by the worker thread, hence volatile. */
    private volatile boolean isContinue = false;

    /**
     * Worker that drains the event queue on the executor thread until it is interrupted.
     */
    class QueueInputEventDispatcherWorker implements Runnable {

        /**
         * Runs the dispatch loop inside a tenant flow for the dispatcher's tenant. The tenant
         * flow is always ended when the loop exits, whether normally or exceptionally.
         */
        @Override
        public void run() {
            try {
                PrivilegedCarbonContext.startTenantFlow();
                PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId);
                PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // Gate before and after the blocking take so that a holder of readLock
                        // can pause dispatching at either point.
                        passReadBarrier();
                        Event event = eventQueue.take();
                        passReadBarrier();
                        if (isContinueProcess()) {
                            callBack.sendEvent(event);
                        }
                        if (isSendToOther()) {
                            EventReceiverServiceValueHolder.getEventManagementService()
                                    .syncEvent(syncId, Manager.ManagerType.Receiver, event);
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag and stop: swallowing the interrupt here would keep
                        // this thread alive forever after shutdown.
                        Thread.currentThread().interrupt();
                        if (executorService.isShutdown()) {
                            // Expected interruption caused by shutdown(); not an error.
                            if (log.isDebugEnabled()) {
                                log.debug("Interrupted while waiting to get an event from queue.", e);
                            }
                        } else {
                            log.error("Interrupted while waiting to get an event from queue.", e);
                        }
                        return;
                    } catch (Throwable th) {
                        // A failure on one event must not kill the dispatch loop.
                        log.error("Error has occured while waiting to get an event from the queue which is belonging to tenentId:" + tenantId + " and Stream Definition: " + streamDefinition, th);
                    }
                }
            } catch (Exception e) {
                log.error("Error in dispatching events:" + e.getMessage(), e);
            } finally {
                PrivilegedCarbonContext.endTenantFlow();
            }
        }
    }

    /**
     * Creates the dispatcher, its queue and its single worker thread, and starts the worker.
     *
     * @param tenantId         tenant the worker thread runs as
     * @param syncId           identifier used when syncing events and when deriving the
     *                         databridge stream definition
     * @param readLock         lock used as a gate before events are dispatched
     * @param streamDefinition original stream definition; its stream id is retained and a
     *                         databridge definition is derived from it
     * @param i2               first argument passed to the {@link BlockingEventQueue} constructor
     *                         (meaning not visible in this file)
     * @param i3               second argument passed to the {@link BlockingEventQueue} constructor
     *                         (meaning not visible in this file)
     */
    public QueueInputEventDispatcher(int tenantId, String syncId, Lock readLock, StreamDefinition streamDefinition, int i2, int i3) {
        this.readLock = readLock;
        this.tenantId = tenantId;
        this.syncId = syncId;
        this.eventQueue = new BlockingEventQueue(i2, i3);
        this.originalEventStreamId = streamDefinition.getStreamId();
        this.streamDefinition = EventManagementUtil.constructDatabridgeStreamDefinition(syncId, streamDefinition);
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Thread pool- component - QueueInputEventDispatcher.executorService;tenant - " + this.tenantId + ";stream - " + streamDefinition.getStreamId() + ";receiver - " + this.streamDefinition.getName()).build());
        this.executorService.submit(new QueueInputEventDispatcherWorker());
    }

    /**
     * Enqueues an event for asynchronous dispatch. If the calling thread is interrupted while
     * waiting, the event is not enqueued, the error is logged and the thread's interrupt
     * status is restored.
     *
     * @param event event to enqueue
     */
    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public void onEvent(Event event) {
        try {
            this.eventQueue.put(event, this.threadBarrier);
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting to put the event to queue.", e);
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the worker thread. {@code shutdownNow()} is required because the worker blocks
     * indefinitely on the queue and only an interrupt can release it; events still queued at
     * this point are not dispatched.
     */
    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public void shutdown() {
        this.executorService.shutdownNow();
    }

    /**
     * Serializes the current event queue while holding {@code threadBarrier}.
     *
     * @return the serialized queue, or {@code null} if serialization failed
     */
    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public byte[] getState() {
        this.threadBarrier.lock();
        try {
            return objectToBytes(this.eventQueue);
        } finally {
            // Always release, otherwise a failure during serialization would block producers.
            this.threadBarrier.unlock();
        }
    }

    /**
     * Aligns the local queue with a serialized queue snapshot: events are removed from the head
     * of the snapshot one by one, and for as long as the removed event equals the head of the
     * local queue, that local head is removed as well. Stops at the first mismatch or when the
     * snapshot is exhausted.
     *
     * @param bArr serialized {@link BlockingEventQueue}; when {@code null} or undecodable the
     *             local queue is left untouched
     */
    @Override // org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher
    public void syncState(byte[] bArr) {
        BlockingEventQueue snapshot = (BlockingEventQueue) bytesToObject(bArr);
        if (snapshot == null) {
            log.warn("No queue state could be decoded for sync id " + this.syncId + "; skipping state synchronization.");
            return;
        }
        while (snapshot.peek() != null && snapshot.poll().equals(this.eventQueue.peek())) {
            this.eventQueue.poll();
        }
    }

    /**
     * Sends an event straight to the callback, bypassing the queue, when processing is enabled.
     *
     * @param event event to forward
     */
    public void process(Event event) {
        if (isContinueProcess()) {
            passReadBarrier();
            this.callBack.sendEvent(event);
        }
    }

    /**
     * @return the databridge stream definition derived in the constructor
     */
    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    /**
     * @return {@code true} if events are currently forwarded to the callback
     */
    public boolean isContinueProcess() {
        return this.isContinue;
    }

    /**
     * Enables or disables forwarding of events to the callback.
     *
     * @param z {@code true} to forward events, {@code false} to stop forwarding
     */
    public void setContinueProcess(boolean z) {
        this.isContinue = z;
    }

    /**
     * @return the stream id of the stream definition originally passed to the constructor
     */
    public String getOriginalEventStreamId() {
        return this.originalEventStreamId;
    }

    /**
     * Blocks until {@code readLock} can be acquired and releases it immediately, so the caller
     * only proceeds once no other party holds the lock.
     */
    private void passReadBarrier() {
        this.readLock.lock();
        this.readLock.unlock();
    }

    /**
     * Serializes an object with Java serialization.
     *
     * @param obj object to serialize, may be {@code null}
     * @return the serialized bytes; {@code null} if {@code obj} is {@code null} or an
     *         {@link IOException} occurred (the error is logged)
     */
    private byte[] objectToBytes(Object obj) {
        long startMillis = System.currentTimeMillis();
        byte[] bytes = null;
        if (obj != null) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // Closing the object stream flushes it, so the buffer is complete afterwards.
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(obj);
            } catch (IOException e) {
                log.error("Error when writing byte array. " + e.getMessage(), e);
                return null;
            }
            bytes = buffer.toByteArray();
        }
        long endMillis = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug("Encoded in :" + (endMillis - startMillis) + " msec");
        }
        return bytes;
    }

    /**
     * Deserializes an object with Java serialization.
     *
     * <p>NOTE(review): native Java deserialization is unsafe on untrusted data; confirm that
     * the bytes passed to {@link #syncState(byte[])} only ever originate from trusted peers.
     *
     * @param bArr serialized bytes, may be {@code null}
     * @return the deserialized object; {@code null} if {@code bArr} is {@code null} or decoding
     *         failed (the error is logged)
     */
    private Object bytesToObject(byte[] bArr) {
        long startMillis = System.currentTimeMillis();
        Object result = null;
        if (bArr != null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bArr))) {
                result = in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                log.error("Error when writing to object. " + e.getMessage(), e);
                return null;
            }
        }
        long endMillis = System.currentTimeMillis();
        if (log.isDebugEnabled()) {
            log.debug("Decoded in :" + (endMillis - startMillis) + " msec");
        }
        return result;
    }
}
