package org.ballerinalang.siddhi.core.util;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.ballerinalang.siddhi.core.config.SiddhiAppContext;
import org.ballerinalang.siddhi.core.event.ComplexEvent;
import org.ballerinalang.siddhi.core.event.ComplexEventChunk;
import org.ballerinalang.siddhi.core.event.stream.StreamEvent;
import org.ballerinalang.siddhi.core.event.stream.StreamEventPool;
import org.ballerinalang.siddhi.core.event.stream.converter.ConversionStreamEventChunk;
import org.ballerinalang.siddhi.core.event.stream.converter.StreamEventConverter;
import org.ballerinalang.siddhi.core.query.input.stream.single.EntryValveProcessor;
import org.ballerinalang.siddhi.core.util.lock.LockWrapper;
import org.ballerinalang.siddhi.core.util.snapshot.Snapshotable;
import org.ballerinalang.siddhi.core.util.statistics.LatencyTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ballerinalang/siddhi/core/util/Scheduler.class */
public abstract class Scheduler implements Snapshotable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) Scheduler.class);
    protected final BlockingQueue<Long> toNotifyQueue = new LinkedBlockingQueue();
    private final ThreadBarrier threadBarrier;
    private final Schedulable singleThreadEntryValve;
    protected SiddhiAppContext siddhiAppContext;
    protected String elementId;
    protected String queryName;
    private StreamEventPool streamEventPool;
    private ComplexEventChunk<StreamEvent> streamEventChunk;
    private LatencyTracker latencyTracker;
    private LockWrapper lockWrapper;

    public Scheduler(Schedulable schedulable, SiddhiAppContext siddhiAppContext) {
        this.threadBarrier = siddhiAppContext.getThreadBarrier();
        this.siddhiAppContext = siddhiAppContext;
        this.singleThreadEntryValve = schedulable;
    }

    public abstract void schedule(long j);

    public abstract Scheduler clone(String str, EntryValveProcessor entryValveProcessor);

    public void notifyAt(long j) {
        try {
            this.toNotifyQueue.put(Long.valueOf(j));
            schedule(j);
        } catch (InterruptedException e) {
            log.error("Error when adding time:" + j + " to toNotifyQueue at Scheduler", (Throwable) e);
        }
    }

    public void setStreamEventPool(StreamEventPool streamEventPool) {
        this.streamEventPool = streamEventPool;
        this.streamEventChunk = new ConversionStreamEventChunk((StreamEventConverter) null, streamEventPool);
    }

    public void init(LockWrapper lockWrapper, String str) {
        this.lockWrapper = lockWrapper;
        this.queryName = str;
        if (this.elementId == null) {
            this.elementId = "Scheduler-" + this.siddhiAppContext.getElementIdGenerator().createNewId();
        }
        this.siddhiAppContext.getSnapshotService().addSnapshotable(str, this);
    }

    @Override // org.ballerinalang.siddhi.core.util.snapshot.Snapshotable
    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("ToNotifyQueue", this.toNotifyQueue);
        return hashMap;
    }

    @Override // org.ballerinalang.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Map<String, Object> map) {
        Iterator it = ((BlockingQueue) map.get("ToNotifyQueue")).iterator();
        while (it.hasNext()) {
            notifyAt(((Long) it.next()).longValue());
        }
    }

    @Override // org.ballerinalang.siddhi.core.util.snapshot.Snapshotable
    public String getElementId() {
        return this.elementId;
    }

    public void setLatencyTracker(LatencyTracker latencyTracker) {
        this.latencyTracker = latencyTracker;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendTimerEvents() {
        Long peek = this.toNotifyQueue.peek();
        long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
        while (true) {
            long j = currentTime;
            if (peek == null || peek.longValue() - j > 0) {
                return;
            }
            this.toNotifyQueue.poll();
            StreamEvent borrowEvent = this.streamEventPool.borrowEvent();
            borrowEvent.setType(ComplexEvent.Type.TIMER);
            borrowEvent.setTimestamp(peek.longValue());
            this.streamEventChunk.add(borrowEvent);
            if (this.lockWrapper != null) {
                this.lockWrapper.lock();
            }
            this.threadBarrier.pass();
            try {
                if (!this.siddhiAppContext.isStatsEnabled() || this.latencyTracker == null) {
                    this.singleThreadEntryValve.process(this.streamEventChunk);
                } else {
                    try {
                        this.latencyTracker.markIn();
                        this.singleThreadEntryValve.process(this.streamEventChunk);
                        this.latencyTracker.markOut();
                    } catch (Throwable th) {
                        this.latencyTracker.markOut();
                        throw th;
                    }
                }
                this.streamEventChunk.clear();
                peek = this.toNotifyQueue.peek();
                currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
            } finally {
                if (this.lockWrapper != null) {
                    this.lockWrapper.unlock();
                }
            }
        }
    }
}
