package org.wso2.siddhi.core.util;

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.event.stream.converter.ConversionStreamEventChunk;
import org.wso2.siddhi.core.event.stream.converter.StreamEventConverter;
import org.wso2.siddhi.core.query.input.stream.single.SingleThreadEntryValveProcessor;
import org.wso2.siddhi.core.util.snapshot.Snapshotable;

/* loaded from: input_file:org/wso2/siddhi/core/util/Scheduler.class */
public class Scheduler implements Snapshotable {
    private static final Logger log = Logger.getLogger(Scheduler.class);
    private ScheduledExecutorService scheduledExecutorService;
    private EventCaller eventCaller;
    private StreamEventPool streamEventPool;
    private ComplexEventChunk<StreamEvent> streamEventChunk;
    private ExecutionPlanContext executionPlanContext;
    private String elementId;
    private final BlockingQueue<Long> toNotifyQueue = new LinkedBlockingQueue();
    private volatile boolean running = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/siddhi/core/util/Scheduler$EventCaller.class */
    public class EventCaller implements Runnable {
        private Schedulable singleThreadEntryValve;

        public EventCaller(Schedulable schedulable) {
            this.singleThreadEntryValve = schedulable;
        }

        @Override // java.lang.Runnable
        public void run() {
            long j;
            Long l = (Long) Scheduler.this.toNotifyQueue.peek();
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                j = currentTimeMillis;
                if (l == null || l.longValue() - j > 0) {
                    break;
                }
                Scheduler.this.toNotifyQueue.poll();
                StreamEvent borrowEvent = Scheduler.this.streamEventPool.borrowEvent();
                borrowEvent.setType(ComplexEvent.Type.TIMER);
                borrowEvent.setTimestamp(j);
                Scheduler.this.streamEventChunk.add(borrowEvent);
                this.singleThreadEntryValve.process(Scheduler.this.streamEventChunk);
                Scheduler.this.streamEventChunk.clear();
                l = (Long) Scheduler.this.toNotifyQueue.peek();
                currentTimeMillis = System.currentTimeMillis();
            }
            if (l != null) {
                Scheduler.this.scheduledExecutorService.schedule(Scheduler.this.eventCaller, j - l.longValue(), TimeUnit.MILLISECONDS);
                return;
            }
            synchronized (Scheduler.this.toNotifyQueue) {
                Scheduler.this.running = false;
                if (Scheduler.this.toNotifyQueue.peek() != null) {
                    Scheduler.this.running = true;
                    Scheduler.this.scheduledExecutorService.schedule(Scheduler.this.eventCaller, 0L, TimeUnit.MILLISECONDS);
                }
            }
        }
    }

    public Scheduler(ScheduledExecutorService scheduledExecutorService, Schedulable schedulable) {
        this.scheduledExecutorService = scheduledExecutorService;
        this.eventCaller = new EventCaller(schedulable);
    }

    public void notifyAt(long j) {
        try {
            this.toNotifyQueue.put(Long.valueOf(j));
            if (!this.running && this.toNotifyQueue.size() == 1) {
                synchronized (this.toNotifyQueue) {
                    if (!this.running) {
                        this.running = true;
                        long currentTimeMillis = j - System.currentTimeMillis();
                        if (currentTimeMillis > 0) {
                            this.scheduledExecutorService.schedule(this.eventCaller, currentTimeMillis, TimeUnit.MILLISECONDS);
                        } else {
                            this.scheduledExecutorService.schedule(this.eventCaller, 0L, TimeUnit.MILLISECONDS);
                        }
                    }
                }
            }
        } catch (InterruptedException e) {
            log.error("Error when adding time:" + j + " to TimeNotifier ", e);
        }
    }

    public void setStreamEventPool(StreamEventPool streamEventPool) {
        this.streamEventPool = streamEventPool;
        this.streamEventChunk = new ConversionStreamEventChunk((StreamEventConverter) null, streamEventPool);
    }

    public void init(ExecutionPlanContext executionPlanContext) {
        this.executionPlanContext = executionPlanContext;
        if (this.elementId == null) {
            this.elementId = executionPlanContext.getElementIdGenerator().createNewId();
        }
        executionPlanContext.getSnapshotService().addSnapshotable(this);
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Object[] currentState() {
        return new Object[]{this.toNotifyQueue};
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Object[] objArr) {
        Iterator it = ((BlockingQueue) objArr[0]).iterator();
        while (it.hasNext()) {
            notifyAt(((Long) it.next()).longValue());
        }
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public String getElementId() {
        return this.elementId;
    }

    public Scheduler clone(String str, SingleThreadEntryValveProcessor singleThreadEntryValveProcessor) {
        Scheduler scheduler = new Scheduler(this.scheduledExecutorService, singleThreadEntryValveProcessor);
        scheduler.elementId = this.elementId + "-" + str;
        scheduler.init(this.executionPlanContext);
        return scheduler;
    }
}
