package io.siddhi.core.util;

import com.google.common.collect.TreeMultimap;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.util.extension.holder.ExternalReferencedHolder;
import io.siddhi.core.util.lock.LockWrapper;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateHolder;
import io.siddhi.core.util.statistics.LatencyTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.core.util.timestamp.TimestampGeneratorImpl;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-5.1.17.jar:io/siddhi/core/util/Scheduler.class
 */
/* loaded from: input_file:io/siddhi/core/util/Scheduler.class */
/**
 * Schedules TIMER events for a {@link Schedulable} target.
 *
 * <p>Requested notification times are kept per {@link SchedulerState} in a queue. When the
 * Siddhi app is not in playback mode, an {@link EventCaller} is submitted to the app's
 * {@link ScheduledExecutorService} to emit the timer events when they fall due. In playback
 * mode, timer events are emitted from the time-change listener registered in the constructor.
 */
public class Scheduler implements ExternalReferencedHolder {
    private static final Logger log = Logger.getLogger(Scheduler.class);
    private final ThreadBarrier threadBarrier;
    private final Schedulable singleThreadEntryValve;
    /** Guards the transitions of {@link SchedulerState#running} and the matching (re)scheduling. */
    private final Semaphore mutex = new Semaphore(1);
    protected String queryName;
    private final SiddhiQueryContext siddhiQueryContext;
    private LockWrapper lockWrapper;
    private final ScheduledExecutorService scheduledExecutorService;
    private StreamEventFactory streamEventFactory;
    private LatencyTracker latencyTracker;
    private StateHolder<SchedulerState> stateHolder;
    /** Written by {@link #start()}/{@link #stop()} and read on executor threads, hence volatile. */
    private volatile boolean stop;

    /**
     * Runnable submitted to the executor service; emits the due timer events of one
     * {@link SchedulerState} and re-schedules itself while notification times remain queued.
     */
    public class EventCaller implements Runnable {
        private final SchedulerState state;
        private final String key;

        public EventCaller(SchedulerState schedulerState, String str) {
            this.state = schedulerState;
            this.key = str;
        }

        /**
         * Sends the due timer events and schedules the next run. Does nothing once the
         * scheduler has been stopped. Any throwable is logged rather than propagated.
         */
        @Override
        public synchronized void run() {
            if (stop) {
                return;
            }
            SiddhiAppContext.startPartitionFlow(key);
            try {
                if (siddhiQueryContext.getSiddhiAppContext().isPlayback()) {
                    state.running = false;
                } else {
                    sendTimerEvents(state);
                    Long nextTime = state.toNotifyQueue.peek();
                    long currentTime = siddhiQueryContext.getSiddhiAppContext()
                            .getTimestampGenerator().currentTime();
                    if (nextTime != null) {
                        state.scheduledFuture = scheduledExecutorService.schedule(
                                this, nextTime - currentTime, TimeUnit.MILLISECONDS);
                    } else {
                        stopRunningUnlessNewTimeQueued();
                    }
                }
            } catch (Throwable t) {
                log.error("Error while executing Scheduled Timer Event Caller, " + t.getMessage(), t);
            } finally {
                SiddhiAppContext.stopPartitionFlow();
            }
        }

        /**
         * Marks the state as not running under the mutex, then re-checks the queue:
         * {@code schedule} returns early while {@code running} is true, so a time queued
         * in the meantime has to be picked up here.
         */
        private void stopRunningUnlessNewTimeQueued() {
            boolean acquired = false;
            try {
                mutex.acquire();
                acquired = true;
                state.running = false;
                if (state.toNotifyQueue.peek() != null) {
                    state.running = true;
                    state.scheduledFuture = scheduledExecutorService.schedule(
                            this, 0L, TimeUnit.MILLISECONDS);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("Error when scheduling System Time Based Scheduler", e);
            } finally {
                // Release only a permit that was actually taken; releasing after an
                // interrupted acquire() would add a spare permit to the semaphore.
                if (acquired) {
                    mutex.release();
                }
            }
        }
    }

    /**
     * Scheduler state: the queue of pending notification times, the future of the
     * currently scheduled {@link EventCaller}, and whether that caller is considered running.
     *
     * <p>NOTE(review): {@link #compareTo(Object)} always returns 0. The time-change listener
     * stores states as values of a {@code TreeMultimap}, which orders values by natural
     * ordering, so two states due at the same timestamp look like duplicates and only the
     * first is kept for that pass. Confirm whether this is intended.
     */
    public class SchedulerState extends State implements Comparable {
        private final EventCaller eventCaller;
        private ScheduledFuture<?> scheduledFuture;
        private final BlockingQueue<Long> toNotifyQueue = new LinkedBlockingQueue<>();
        private volatile boolean running = false;
        /** Partition flow id that was current when this state was created. */
        private final String key = SiddhiAppContext.getPartitionFlowId();

        public SchedulerState() {
            this.eventCaller = new EventCaller(this, this.key);
        }

        /** Returns true when no times are queued and no scheduled run is still pending. */
        @Override
        public boolean canDestroy() {
            return toNotifyQueue.isEmpty() && (scheduledFuture == null || scheduledFuture.isDone());
        }

        /** Returns a map holding the pending-times queue under the key "ToNotifyQueue". */
        @Override
        public Map<String, Object> snapshot() {
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put("ToNotifyQueue", toNotifyQueue);
            return snapshot;
        }

        /** Replays every time stored under "ToNotifyQueue" through {@link Scheduler#notifyAt(long)}. */
        @Override
        public void restore(Map<String, Object> map) {
            for (Object time : (BlockingQueue<?>) map.get("ToNotifyQueue")) {
                notifyAt((Long) time);
            }
        }

        @Override
        public int compareTo(Object obj) {
            return 0;
        }
    }

    /**
     * Creates a scheduler for the given target and registers a time-change listener on the
     * app's timestamp generator. On each time change the listener sends timer events for the
     * states whose earliest queued time is at or before the new time, in timestamp order.
     *
     * @param schedulable target that receives the TIMER event chunks
     * @param siddhiQueryContext query context providing the app context and state holders
     */
    public Scheduler(Schedulable schedulable, SiddhiQueryContext siddhiQueryContext) {
        this.threadBarrier = siddhiQueryContext.getSiddhiAppContext().getThreadBarrier();
        this.siddhiQueryContext = siddhiQueryContext;
        this.singleThreadEntryValve = schedulable;
        this.scheduledExecutorService = siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService();
        siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().addTimeChangeListener(
                new TimestampGeneratorImpl.TimeChangeListener() {
                    @Override
                    public synchronized void onTimeChange(long currentTimestamp) {
                        Map<String, Map<String, SchedulerState>> allStates = stateHolder.getAllStates();
                        try {
                            TreeMultimap<Long, SchedulerState> dueStates = TreeMultimap.create();
                            for (Map<String, SchedulerState> groupStates : allStates.values()) {
                                for (SchedulerState state : groupStates.values()) {
                                    Long earliest = state.toNotifyQueue.peek();
                                    if (earliest != null && earliest <= currentTimestamp) {
                                        dueStates.put(earliest, state);
                                    }
                                }
                            }
                            for (Map.Entry<Long, SchedulerState> due : dueStates.entries()) {
                                SchedulerState state = due.getValue();
                                try {
                                    SiddhiAppContext.startPartitionFlow(state.key);
                                    sendTimerEvents(state);
                                } finally {
                                    // Always clear the partition flow, even if sending fails.
                                    SiddhiAppContext.stopPartitionFlow();
                                }
                            }
                        } finally {
                            stateHolder.returnAllStates(allStates);
                        }
                    }
                });
    }

    /**
     * Stores the lock wrapper and query name and creates the state holder for this scheduler.
     *
     * @param lockWrapper lock taken around each timer event dispatch; may be null
     * @param queryName name of the owning query, used in the state holder's name
     */
    public void init(LockWrapper lockWrapper, String queryName) {
        this.lockWrapper = lockWrapper;
        this.queryName = queryName;
        String stateName = "Scheduler_" + queryName + "_" + this.siddhiQueryContext.generateNewId();
        this.stateHolder = this.siddhiQueryContext.generateStateHolder(stateName, false,
                () -> new SchedulerState());
    }

    /**
     * Queues a notification time on the current state and schedules an event caller if needed.
     *
     * @param time timestamp at which a TIMER event should be emitted
     */
    public void notifyAt(long time) {
        SchedulerState state = this.stateHolder.getState();
        try {
            state.toNotifyQueue.put(time);
            schedule(time, state, false);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            // The interruption is not reported when the executor has already been shut down.
            if (!this.scheduledExecutorService.isShutdown()) {
                log.error("Error when adding time:" + time + " to toNotifyQueue at Scheduler", e);
            }
        } finally {
            this.stateHolder.returnState(state);
        }
    }

    /**
     * Submits the state's event caller unless the app is in playback mode or the state is
     * already running. Scheduling happens only when the queue holds exactly one time or
     * {@code force} is set.
     */
    private void schedule(long time, SchedulerState schedulerState, boolean force) {
        if (this.siddhiQueryContext.getSiddhiAppContext().isPlayback() || schedulerState.running) {
            return;
        }
        if (schedulerState.toNotifyQueue.size() != 1 && !force) {
            return;
        }
        boolean acquired = false;
        try {
            this.mutex.acquire();
            acquired = true;
            // Re-check under the mutex; another thread may have started the caller meanwhile.
            if (!schedulerState.running) {
                schedulerState.running = true;
                long delay = time - this.siddhiQueryContext.getSiddhiAppContext()
                        .getTimestampGenerator().currentTime();
                // Times that are already due are scheduled with zero delay.
                schedulerState.scheduledFuture = this.scheduledExecutorService.schedule(
                        schedulerState.eventCaller, Math.max(delay, 0L), TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Error when scheduling System Time Based Scheduler", e);
        } finally {
            // Release only a permit that was actually taken; releasing after an
            // interrupted acquire() would add a spare permit to the semaphore.
            if (acquired) {
                this.mutex.release();
            }
        }
    }

    public void setStreamEventFactory(StreamEventFactory streamEventFactory) {
        this.streamEventFactory = streamEventFactory;
    }

    public void setLatencyTracker(LatencyTracker latencyTracker) {
        this.latencyTracker = latencyTracker;
    }

    /**
     * Drains every queued time that is not later than the current time, sending one TIMER
     * event per drained time to the target. Each dispatch happens inside the lock wrapper
     * (when set) and the thread barrier; a failure is logged and draining continues.
     *
     * @param schedulerState state whose queue should be drained
     */
    public void sendTimerEvents(SchedulerState schedulerState) {
        Long toNotifyTime = schedulerState.toNotifyQueue.peek();
        long currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
        while (toNotifyTime != null && toNotifyTime - currentTime <= 0) {
            schedulerState.toNotifyQueue.poll();
            StreamEvent timerEvent = this.streamEventFactory.newInstance();
            timerEvent.setType(ComplexEvent.Type.TIMER);
            timerEvent.setTimestamp(toNotifyTime);
            if (this.lockWrapper != null) {
                this.lockWrapper.lock();
            }
            this.threadBarrier.enter();
            try {
                ComplexEventChunk timerChunk = new ComplexEventChunk();
                timerChunk.add(timerEvent);
                boolean trackLatency = Level.BASIC.compareTo(
                        this.siddhiQueryContext.getSiddhiAppContext().getRootMetricsLevel()) <= 0
                        && this.latencyTracker != null;
                if (trackLatency) {
                    try {
                        this.latencyTracker.markIn();
                        this.singleThreadEntryValve.process(timerChunk);
                    } finally {
                        this.latencyTracker.markOut();
                    }
                } else {
                    this.singleThreadEntryValve.process(timerChunk);
                }
            } catch (Throwable t) {
                log.error("Error while sending timer events, " + t.getMessage(), t);
            } finally {
                if (this.lockWrapper != null) {
                    this.lockWrapper.unlock();
                }
                this.threadBarrier.exit();
            }
            toNotifyTime = schedulerState.toNotifyQueue.peek();
            currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
        }
    }

    /**
     * Forces scheduling of every state that has a pending time, setting the partition and
     * group-by flow ids from the state map keys around each call.
     */
    public void switchToLiveMode() {
        Map<String, Map<String, SchedulerState>> allStates = this.stateHolder.getAllStates();
        try {
            for (Map.Entry<String, Map<String, SchedulerState>> partitionEntry : allStates.entrySet()) {
                for (Map.Entry<String, SchedulerState> groupEntry : partitionEntry.getValue().entrySet()) {
                    SchedulerState state = groupEntry.getValue();
                    Long toNotifyTime = state.toNotifyQueue.peek();
                    if (toNotifyTime == null) {
                        continue;
                    }
                    SiddhiAppContext.startPartitionFlow(partitionEntry.getKey());
                    SiddhiAppContext.startGroupByFlow(groupEntry.getKey());
                    try {
                        schedule(toNotifyTime, state, true);
                    } finally {
                        // Always clear both flow ids, even if scheduling fails.
                        SiddhiAppContext.stopGroupByFlow();
                        SiddhiAppContext.stopPartitionFlow();
                    }
                }
            }
        } finally {
            this.stateHolder.returnAllStates(allStates);
        }
    }

    /** Cancels every state's scheduled future (interrupting if running) and marks it not running. */
    public void switchToPlayBackMode() {
        Map<String, Map<String, SchedulerState>> allStates = this.stateHolder.getAllStates();
        try {
            for (Map<String, SchedulerState> groupStates : allStates.values()) {
                for (SchedulerState state : groupStates.values()) {
                    if (state.scheduledFuture != null) {
                        state.scheduledFuture.cancel(true);
                    }
                    state.running = false;
                }
            }
        } finally {
            this.stateHolder.returnAllStates(allStates);
        }
    }

    /** Sets the stop flag; {@link EventCaller#run()} returns immediately while it is set. */
    @Override
    public void stop() {
        this.stop = true;
    }

    /** Clears the stop flag so that event callers process timer events again. */
    @Override
    public void start() {
        this.stop = false;
    }
}
