package org.wso2.siddhi.core.query.processor.stream.window;

import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.util.ExceptionUtil;
import org.wso2.siddhi.core.util.config.ConfigReader;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-4.5.3.jar:org/wso2/siddhi/core/query/processor/stream/window/CronWindowProcessor.class
 */
@Extension(name = "cron", namespace = "", description = "This window returns events processed periodically as the output in time-repeating patterns, triggered based on time passing.", parameters = {@Parameter(name = "cron.expression", description = "The cron expression that represents a time schedule.", type = {DataType.STRING})}, examples = {@Example(syntax = "define window cseEventWindow (symbol string, price float, volume int)cron('*/5 * * * * ?');\n@info(name = 'query0')\nfrom cseEventStream\ninsert into cseEventWindow;\n@info(name = 'query1')\nfrom cseEventWindow \nselect symbol,price,volume\ninsert into outputStream ;", description = "This will processed events as the output every 5 seconds.")})
/* loaded from: input_file:org/wso2/siddhi/core/query/processor/stream/window/CronWindowProcessor.class */
public class CronWindowProcessor extends WindowProcessor implements Job {
    private static final Logger log = Logger.getLogger(CronWindowProcessor.class);
    private final String jobGroup = "CronWindowGroup";
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk<>(false);
    private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk<>(false);
    private SiddhiAppContext siddhiAppContext;
    private Scheduler scheduler;
    private String jobName;
    private String cronString;

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void init(ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, boolean z, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        if (expressionExecutorArr != null) {
            this.cronString = (String) ((ConstantExpressionExecutor) expressionExecutorArr[0]).getValue();
        }
    }

    @Override // org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                this.currentEventChunk.add(streamEventCloner.copyStreamEvent(complexEventChunk.next()));
                complexEventChunk.remove();
            }
        }
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void start() {
        scheduleCronJob(this.cronString, this.elementId);
    }

    @Override // org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder
    public void stop() {
        try {
            if (this.scheduler != null) {
                this.scheduler.deleteJob(new JobKey(this.jobName, "CronWindowGroup"));
            }
        } catch (SchedulerException e) {
            log.error(ExceptionUtil.getMessageWithContext(e, this.siddhiAppContext) + " Error while removing the cron job 'CronWindowGroup:'" + this.jobName + "'.", e);
        }
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("CurrentEventChunk", this.currentEventChunk.getFirst());
        hashMap.put("ExpiredEventChunk", this.expiredEventChunk.getFirst());
        return hashMap;
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Map<String, Object> map) {
        this.currentEventChunk.clear();
        this.currentEventChunk.add((StreamEvent) map.get("CurrentEventChunk"));
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent) map.get("ExpiredEventChunk"));
    }

    private void scheduleCronJob(String str, String str2) {
        try {
            this.scheduler = new StdSchedulerFactory().getScheduler();
            this.jobName = "EventRemoverJob_" + str2;
            JobKey jobKey = new JobKey(this.jobName, "CronWindowGroup");
            if (this.scheduler.checkExists(jobKey)) {
                this.scheduler.deleteJob(jobKey);
            }
            this.scheduler.start();
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put("windowProcessor", (Object) this);
            this.scheduler.scheduleJob(JobBuilder.newJob(CronWindowProcessor.class).withIdentity(this.jobName, "CronWindowGroup").usingJobData(jobDataMap).build(), TriggerBuilder.newTrigger().withIdentity("EventRemoverTrigger_" + str2, "CronWindowGroup").withSchedule(CronScheduleBuilder.cronSchedule(str)).build());
        } catch (SchedulerException e) {
            log.error("Error while instantiating quartz scheduler", e);
        }
    }

    public void dispatchEvents() {
        ComplexEventChunk complexEventChunk = new ComplexEventChunk(false);
        synchronized (this) {
            if (this.currentEventChunk.getFirst() != null) {
                long currentTime = this.siddhiAppContext.getTimestampGenerator().currentTime();
                while (this.expiredEventChunk.hasNext()) {
                    this.expiredEventChunk.next().setTimestamp(currentTime);
                }
                if (this.expiredEventChunk.getFirst() != null) {
                    complexEventChunk.add(this.expiredEventChunk.getFirst());
                }
                this.expiredEventChunk.clear();
                while (this.currentEventChunk.hasNext()) {
                    StreamEvent copyStreamEvent = this.streamEventCloner.copyStreamEvent(this.currentEventChunk.next());
                    copyStreamEvent.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventChunk.add(copyStreamEvent);
                }
                complexEventChunk.add(this.currentEventChunk.getFirst());
                this.currentEventChunk.clear();
            }
        }
        if (complexEventChunk.getFirst() != null) {
            this.nextProcessor.process(complexEventChunk);
        }
    }

    @Override // org.quartz.Job
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        if (log.isDebugEnabled()) {
            log.debug("Running Event Remover Job");
        }
        ((CronWindowProcessor) jobExecutionContext.getJobDetail().getJobDataMap().get("windowProcessor")).dispatchEvents();
    }
}
