/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.query.processor.stream.window;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.window.BatchingWindowProcessor;
import io.siddhi.core.util.ExceptionUtil;
import io.siddhi.core.util.config.ConfigReader;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.ScheduleBuilder;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

@Extension(name="cron", namespace="", description="This window outputs the arriving events as and when they arrive, and resets (expires) the window periodically based on the given cron expression.", parameters={@Parameter(name="cron.expression", description="The cron expression that resets the window.", type={DataType.STRING})}, examples={@Example(syntax="define stream InputEventStream (symbol string, price float, volume int);\n\n@info(name = 'query1')\nfrom InputEventStream#cron('*/5 * * * * ?')\nselect symbol, sum(price) as totalPrice \ninsert into OutputStream;", description="This let the totalPrice to gradually increase and resets to zero as a batch every 5 seconds."), @Example(syntax="define stream StockEventStream (symbol string, price float, volume int)\ndefine window StockEventWindow (symbol string, price float, volume int) cron('*/5 * * * * ?');\n\n@info(name = 'query0')\nfrom StockEventStream\ninsert into StockEventWindow;\n\n@info(name = 'query1')\nfrom StockEventWindow \nselect symbol, sum(price) as totalPrice\ninsert into OutputStream ;", description="The defined window will let the totalPrice to gradually increase and resets to zero as a batch every 5 seconds.")})
public class CronWindowProcessor
extends BatchingWindowProcessor
implements Job {
    private static final Logger log = Logger.getLogger(CronWindowProcessor.class);
    private final String jobGroup = "CronWindowGroup";
    private ComplexEventChunk<StreamEvent> currentEventChunk = new ComplexEventChunk(false);
    private ComplexEventChunk<StreamEvent> expiredEventChunk = new ComplexEventChunk(false);
    private SiddhiQueryContext siddhiQueryContext;
    private Scheduler scheduler;
    private String jobName;
    private String cronString;

    @Override
    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ConfigReader configReader, boolean outputExpectsExpiredEvents, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiQueryContext = siddhiQueryContext;
        if (attributeExpressionExecutors != null) {
            this.cronString = (String)((ConstantExpressionExecutor)attributeExpressionExecutors[0]).getValue();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor, StreamEventCloner streamEventCloner) {
        CronWindowProcessor cronWindowProcessor = this;
        synchronized (cronWindowProcessor) {
            while (streamEventChunk.hasNext()) {
                StreamEvent streamEvent = (StreamEvent)streamEventChunk.next();
                StreamEvent clonedStreamEvent = streamEventCloner.copyStreamEvent(streamEvent);
                this.currentEventChunk.add(clonedStreamEvent);
                streamEventChunk.remove();
            }
        }
    }

    @Override
    public void start() {
        this.scheduleCronJob(this.cronString, this.elementId);
    }

    @Override
    public void stop() {
        try {
            if (this.scheduler != null) {
                this.scheduler.deleteJob(new JobKey(this.jobName, "CronWindowGroup"));
            }
        }
        catch (SchedulerException e) {
            log.error((Object)(ExceptionUtil.getMessageWithContext(e, this.siddhiQueryContext.getSiddhiAppContext()) + " Error while removing the cron job '" + "CronWindowGroup" + ":'" + this.jobName + "'."), (Throwable)e);
        }
    }

    @Override
    public Map<String, Object> currentState() {
        HashMap<String, Object> state = new HashMap<String, Object>();
        state.put("CurrentEventChunk", this.currentEventChunk.getFirst());
        state.put("ExpiredEventChunk", this.expiredEventChunk.getFirst());
        return state;
    }

    @Override
    public void restoreState(Map<String, Object> state) {
        this.currentEventChunk.clear();
        this.currentEventChunk.add((StreamEvent)state.get("CurrentEventChunk"));
        this.expiredEventChunk.clear();
        this.expiredEventChunk.add((StreamEvent)state.get("ExpiredEventChunk"));
    }

    private void scheduleCronJob(String cronString, String elementId) {
        try {
            StdSchedulerFactory schedFact = new StdSchedulerFactory();
            this.scheduler = schedFact.getScheduler();
            this.jobName = "EventRemoverJob_" + elementId;
            JobKey jobKey = new JobKey(this.jobName, "CronWindowGroup");
            if (this.scheduler.checkExists(jobKey)) {
                this.scheduler.deleteJob(jobKey);
            }
            this.scheduler.start();
            JobDataMap dataMap = new JobDataMap();
            dataMap.put("windowProcessor", (Object)this);
            JobDetail job = JobBuilder.newJob(CronWindowProcessor.class).withIdentity(this.jobName, "CronWindowGroup").usingJobData(dataMap).build();
            Trigger trigger = TriggerBuilder.newTrigger().withIdentity("EventRemoverTrigger_" + elementId, "CronWindowGroup").withSchedule((ScheduleBuilder)CronScheduleBuilder.cronSchedule((String)cronString)).build();
            this.scheduler.scheduleJob(job, trigger);
        }
        catch (SchedulerException e) {
            log.error((Object)"Error while instantiating quartz scheduler", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void dispatchEvents() {
        ComplexEventChunk<StreamEvent> streamEventChunk = new ComplexEventChunk<StreamEvent>(false);
        CronWindowProcessor cronWindowProcessor = this;
        synchronized (cronWindowProcessor) {
            if (this.currentEventChunk.getFirst() != null) {
                long currentTime = this.siddhiQueryContext.getSiddhiAppContext().getTimestampGenerator().currentTime();
                while (this.expiredEventChunk.hasNext()) {
                    StreamEvent expiredEvent = (StreamEvent)this.expiredEventChunk.next();
                    expiredEvent.setTimestamp(currentTime);
                }
                if (this.expiredEventChunk.getFirst() != null) {
                    streamEventChunk.add(this.expiredEventChunk.getFirst());
                }
                this.expiredEventChunk.clear();
                while (this.currentEventChunk.hasNext()) {
                    StreamEvent currentEvent = (StreamEvent)this.currentEventChunk.next();
                    StreamEvent toExpireEvent = this.streamEventCloner.copyStreamEvent(currentEvent);
                    toExpireEvent.setType(ComplexEvent.Type.EXPIRED);
                    this.expiredEventChunk.add(toExpireEvent);
                }
                streamEventChunk.add(this.currentEventChunk.getFirst());
                this.currentEventChunk.clear();
            }
        }
        if (streamEventChunk.getFirst() != null) {
            this.nextProcessor.process(streamEventChunk);
        }
    }

    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Running Event Remover Job");
        }
        JobDataMap dataMap = jobExecutionContext.getJobDetail().getJobDataMap();
        CronWindowProcessor windowProcessor = (CronWindowProcessor)dataMap.get((Object)"windowProcessor");
        windowProcessor.dispatchEvents();
    }
}

