/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation;

import io.siddhi.core.aggregation.AggregationRuntime;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.state.MetaStateEvent;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.exception.DataPurgingException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.OnDemandQueryRuntime;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.parser.OnDemandQueryParser;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OrderByAttribute;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.Compare;
import io.siddhi.query.api.expression.constant.Constant;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class IncrementalDataPurger
implements Runnable {
    private static final Logger LOG = LogManager.getLogger(IncrementalDataPurger.class);
    private static final Long RETAIN_ALL = -1L;
    private static final String RETAIN_ALL_VALUES = "all";
    private static final String AGGREGATION_START_TIME = "aggregationStartTime";
    private static final String AGGREGATION_NEXT_EMIT_TIME = "nextEmitTime";
    private static final String IS_DATA_AVAILABLE_TO_PURGE = "isDataAvailableToPurge";
    private static final String IS_PARENT_TABLE_HAS_AGGREGATED_DATA = "isParentTableHasAggregatedData";
    private long purgeExecutionInterval = Expression.Time.minute((int)15).value();
    private boolean purgingEnabled = true;
    private Map<TimePeriod.Duration, Long> retentionPeriods = new EnumMap<TimePeriod.Duration, Long>(TimePeriod.Duration.class);
    private StreamEventFactory streamEventFactory;
    private Map<TimePeriod.Duration, Table> aggregationTables;
    private SiddhiQueryContext siddhiQueryContext;
    private ScheduledFuture scheduledPurgingTaskStatus;
    private String purgingTimestampField;
    private Map<TimePeriod.Duration, Long> minimumDurationMap = new EnumMap<TimePeriod.Duration, Long>(TimePeriod.Duration.class);
    private ComplexEventChunk<StateEvent> eventChunk = new ComplexEventChunk();
    private List<VariableExpressionExecutor> variableExpressionExecutorList = new ArrayList<VariableExpressionExecutor>();
    private Attribute aggregatedTimestampAttribute;
    private Map<TimePeriod.Duration, CompiledCondition> compiledConditionsHolder = new EnumMap<TimePeriod.Duration, CompiledCondition>(TimePeriod.Duration.class);
    private Map<String, Table> tableMap = new HashMap<String, Table>();
    private AggregationDefinition aggregationDefinition;
    private List<TimePeriod.Duration> activeIncrementalDurations;
    private String timeZone;
    private Map<String, Window> windowMap;
    private Map<String, AggregationRuntime> aggregationMap;
    private boolean purgingHalted = false;
    private String errorMessage;

    public void init(AggregationDefinition aggregationDefinition, StreamEventFactory streamEventFactory, Map<TimePeriod.Duration, Table> aggregationTables, Boolean isProcessingOnExternalTime, SiddhiQueryContext siddhiQueryContext, List<TimePeriod.Duration> activeIncrementalDurations, String timeZone, Map<String, Window> windowMap, Map<String, AggregationRuntime> aggregationMap) {
        this.siddhiQueryContext = siddhiQueryContext;
        this.aggregationDefinition = aggregationDefinition;
        List annotations = aggregationDefinition.getAnnotations();
        this.streamEventFactory = streamEventFactory;
        this.aggregationTables = aggregationTables;
        this.activeIncrementalDurations = activeIncrementalDurations;
        this.windowMap = windowMap;
        this.aggregationMap = aggregationMap;
        this.purgingTimestampField = isProcessingOnExternalTime != false ? "AGG_EVENT_TIMESTAMP" : "AGG_TIMESTAMP";
        this.aggregatedTimestampAttribute = new Attribute(this.purgingTimestampField, Attribute.Type.LONG);
        VariableExpressionExecutor variableExpressionExecutor = new VariableExpressionExecutor(this.aggregatedTimestampAttribute, 0, 1);
        this.variableExpressionExecutorList.add(variableExpressionExecutor);
        for (Map.Entry<TimePeriod.Duration, Table> entry : aggregationTables.entrySet()) {
            this.tableMap.put(entry.getValue().getTableDefinition().getId(), entry.getValue());
            switch (entry.getKey()) {
                case SECONDS: {
                    this.retentionPeriods.put(entry.getKey(), Expression.Time.sec((int)120).value());
                    this.minimumDurationMap.put(entry.getKey(), Expression.Time.sec((int)120).value());
                    break;
                }
                case MINUTES: {
                    this.retentionPeriods.put(entry.getKey(), Expression.Time.hour((int)24).value());
                    this.minimumDurationMap.put(entry.getKey(), Expression.Time.minute((int)120).value());
                    break;
                }
                case HOURS: {
                    this.retentionPeriods.put(entry.getKey(), Expression.Time.day((int)30).value());
                    this.minimumDurationMap.put(entry.getKey(), Expression.Time.hour((int)25).value());
                    break;
                }
                case DAYS: {
                    this.retentionPeriods.put(entry.getKey(), Expression.Time.year((int)1).value());
                    this.minimumDurationMap.put(entry.getKey(), Expression.Time.day((int)32).value());
                    break;
                }
                case MONTHS: {
                    this.retentionPeriods.put(entry.getKey(), RETAIN_ALL);
                    this.minimumDurationMap.put(entry.getKey(), Expression.Time.month((int)13).value());
                    break;
                }
                case YEARS: {
                    this.retentionPeriods.put(entry.getKey(), RETAIN_ALL);
                    this.minimumDurationMap.put(entry.getKey(), 0L);
                }
            }
        }
        this.timeZone = timeZone;
        HashMap<String, Annotation> annotationTypes = new HashMap<String, Annotation>();
        for (Annotation annotation : annotations) {
            annotationTypes.put(annotation.getName().toLowerCase(), annotation);
        }
        Annotation annotation = (Annotation)annotationTypes.get("purge");
        if (annotation != null) {
            if (annotation.getElement("enable") != null) {
                String purgeEnable = annotation.getElement("enable");
                if (!"true".equalsIgnoreCase(purgeEnable) && !"false".equalsIgnoreCase(purgeEnable)) {
                    throw new SiddhiAppCreationException("Invalid value for enable: " + purgeEnable + ". Please use true or false");
                }
                this.purgingEnabled = Boolean.parseBoolean(purgeEnable);
            }
            if (this.purgingEnabled) {
                List retentions;
                if (annotation.getElement("interval") != null) {
                    String interval = annotation.getElement("interval");
                    this.purgeExecutionInterval = Expression.Time.timeToLong((String)interval);
                }
                if ((retentions = annotation.getAnnotations("retentionPeriod")) != null && !retentions.isEmpty()) {
                    Annotation retention = (Annotation)retentions.get(0);
                    List elements = retention.getElements();
                    for (Element element : elements) {
                        TimePeriod.Duration duration = Expression.Time.normalizeDuration((String)element.getKey());
                        if (!activeIncrementalDurations.contains(duration)) {
                            throw new SiddhiAppCreationException(duration + " granularity cannot be purged since aggregation has not performed in " + duration + " granularity");
                        }
                        if (element.getValue().equalsIgnoreCase(RETAIN_ALL_VALUES)) {
                            this.retentionPeriods.put(duration, RETAIN_ALL);
                            continue;
                        }
                        if (Expression.Time.timeToLong((String)element.getValue()) >= this.minimumDurationMap.get(duration)) {
                            this.retentionPeriods.put(duration, Expression.Time.timeToLong((String)element.getValue()));
                            continue;
                        }
                        throw new SiddhiAppCreationException(duration + " granularity cannot be purge with a retention of '" + element.getValue() + "', minimum retention should be greater  than " + TimeUnit.MILLISECONDS.toMinutes(this.minimumDurationMap.get(duration)) + " minutes");
                    }
                }
            }
        }
        this.compiledConditionsHolder = this.createCompileConditions(aggregationTables, this.tableMap);
    }

    public boolean isPurgingEnabled() {
        return this.purgingEnabled;
    }

    public void setPurgingEnabled(boolean purgingEnabled) {
        this.purgingEnabled = purgingEnabled;
    }

    @Override
    public void run() {
        boolean isNeededToExecutePurgeTask = false;
        boolean isSafeToRunPurgingTask = false;
        long currentTime = System.currentTimeMillis();
        Object[] purgeTimeArray = new Object[1];
        int i = 1;
        if (this.purgingHalted) {
            LOG.error(this.errorMessage);
            return;
        }
        for (TimePeriod.Duration duration : this.activeIncrementalDurations) {
            if (!this.retentionPeriods.get(duration).equals(RETAIN_ALL)) {
                this.eventChunk.clear();
                long purgeTime = currentTime - this.retentionPeriods.get(duration);
                purgeTimeArray[0] = purgeTime;
                if (this.retentionPeriods.size() > i) {
                    Map<String, Boolean> purgingCheckState = this.isSafeToPurgeTheDuration(purgeTime, this.aggregationTables.get(this.activeIncrementalDurations.get(i)), this.aggregationTables.get(duration), duration, this.timeZone);
                    if (purgingCheckState.get(IS_DATA_AVAILABLE_TO_PURGE).booleanValue()) {
                        isNeededToExecutePurgeTask = true;
                        if (purgingCheckState.get(IS_PARENT_TABLE_HAS_AGGREGATED_DATA).booleanValue()) {
                            isSafeToRunPurgingTask = true;
                        } else {
                            isSafeToRunPurgingTask = false;
                            this.purgingHalted = true;
                        }
                    } else {
                        isNeededToExecutePurgeTask = false;
                    }
                }
                if (isNeededToExecutePurgeTask) {
                    if (isSafeToRunPurgingTask) {
                        StateEvent secEvent = this.createStreamEvent(purgeTimeArray, currentTime);
                        this.eventChunk.add(secEvent);
                        Table table = this.aggregationTables.get(duration);
                        try {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Purging data of table: " + table.getTableDefinition().getId() + " with a retention of timestamp : " + purgeTime);
                            }
                            table.deleteEvents(this.eventChunk, this.compiledConditionsHolder.get(duration), 1);
                        }
                        catch (RuntimeException e) {
                            LOG.error("Exception occurred while deleting events from " + table.getTableDefinition().getId() + " table", (Throwable)e);
                            throw new DataPurgingException("Exception occurred while deleting events from " + table.getTableDefinition().getId() + " table", e);
                        }
                    } else {
                        this.errorMessage = "Purging task halted!!!. Data purging for table: " + this.aggregationTables.get(duration).getTableDefinition().getId() + " with a retention of timestamp : " + purgeTime + " didn't executed since parent " + this.aggregationTables.get(this.activeIncrementalDurations.get(i)).getTableDefinition().getId() + " table does not contain values of above period. This has to be investigate since this may lead to an aggregation data mismatch";
                        LOG.info(this.errorMessage);
                        return;
                    }
                }
            }
            ++i;
        }
    }

    private MatchingMetaInfoHolder matchingMetaInfoHolder(Table table, Attribute attribute) {
        MetaStateEvent metaStateEvent = new MetaStateEvent(2);
        MetaStreamEvent metaStreamEventWithDeletePara = new MetaStreamEvent();
        MetaStreamEvent metaStreamEventForTable = new MetaStreamEvent();
        TableDefinition deleteTableDefinition = TableDefinition.id((String)"");
        deleteTableDefinition.attribute(attribute.getName(), attribute.getType());
        metaStreamEventWithDeletePara.setEventType(MetaStreamEvent.EventType.TABLE);
        metaStreamEventWithDeletePara.addOutputData(attribute);
        metaStreamEventWithDeletePara.addInputDefinition((AbstractDefinition)deleteTableDefinition);
        metaStreamEventForTable.setEventType(MetaStreamEvent.EventType.TABLE);
        for (Attribute attributes : table.getTableDefinition().getAttributeList()) {
            metaStreamEventForTable.addOutputData(attributes);
        }
        metaStreamEventForTable.addInputDefinition((AbstractDefinition)table.getTableDefinition());
        metaStateEvent.addEvent(metaStreamEventWithDeletePara);
        metaStateEvent.addEvent(metaStreamEventForTable);
        TableDefinition definition = table.getTableDefinition();
        return new MatchingMetaInfoHolder(metaStateEvent, 0, 1, (AbstractDefinition)deleteTableDefinition, (AbstractDefinition)definition, 0);
    }

    private Map<TimePeriod.Duration, CompiledCondition> createCompileConditions(Map<TimePeriod.Duration, Table> aggregationTables, Map<String, Table> tableMap) {
        EnumMap<TimePeriod.Duration, CompiledCondition> compiledConditionMap = new EnumMap<TimePeriod.Duration, CompiledCondition>(TimePeriod.Duration.class);
        for (Map.Entry<TimePeriod.Duration, Table> entry : aggregationTables.entrySet()) {
            if (this.retentionPeriods.get(entry.getKey()).equals(RETAIN_ALL)) continue;
            Table table = aggregationTables.get(entry.getKey());
            Variable leftVariable = new Variable(this.purgingTimestampField);
            leftVariable.setStreamId(entry.getValue().getTableDefinition().getId());
            Compare expression = new Compare((Expression)leftVariable, Compare.Operator.LESS_THAN, (Expression)new Variable(this.purgingTimestampField));
            CompiledCondition compiledCondition = table.compileCondition((Expression)expression, this.matchingMetaInfoHolder(table, this.aggregatedTimestampAttribute), this.variableExpressionExecutorList, tableMap, this.siddhiQueryContext);
            compiledConditionMap.put(entry.getKey(), compiledCondition);
        }
        return compiledConditionMap;
    }

    public void executeIncrementalDataPurging() {
        StringBuilder tableNames = new StringBuilder();
        if (this.isPurgingEnabled()) {
            if (this.scheduledPurgingTaskStatus != null) {
                this.scheduledPurgingTaskStatus.cancel(true);
                this.scheduledPurgingTaskStatus = this.siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService().scheduleWithFixedDelay(this, this.purgeExecutionInterval, this.purgeExecutionInterval, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledPurgingTaskStatus = this.siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService().scheduleWithFixedDelay(this, this.purgeExecutionInterval, this.purgeExecutionInterval, TimeUnit.MILLISECONDS);
            }
            for (Map.Entry<TimePeriod.Duration, Long> entry : this.retentionPeriods.entrySet()) {
                if (this.retentionPeriods.get(entry.getKey()).equals(RETAIN_ALL) || !this.activeIncrementalDurations.contains(entry.getKey())) continue;
                tableNames.append(entry.getKey()).append(",");
            }
            LOG.info("Data purging has enabled for tables: " + tableNames + " with an interval of " + this.purgeExecutionInterval / 1000L + " seconds in " + this.aggregationDefinition.getId() + " aggregation");
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Purging is disabled in siddhi app: " + this.siddhiQueryContext.getSiddhiAppContext().getName());
        }
    }

    private StateEvent createStreamEvent(Object[] values, Long timestamp) {
        StreamEvent streamEvent = this.streamEventFactory.newInstance();
        streamEvent.setTimestamp(timestamp);
        streamEvent.setOutputData(values);
        StateEvent stateEvent = new StateEvent(2, 1);
        stateEvent.addEvent(0, streamEvent);
        return stateEvent;
    }

    private Map<String, Boolean> isSafeToPurgeTheDuration(long purgeTime, Table parentTable, Table currentTable, TimePeriod.Duration duration, String timeZone) {
        Event[] dataInParentTable = null;
        HashMap<String, Boolean> purgingCheckState = new HashMap<String, Boolean>();
        try {
            Event[] dataToDelete = this.dataToDelete(purgeTime, currentTable);
            if (dataToDelete != null && dataToDelete.length != 0) {
                Map<String, Long> purgingValidationTimeDurations = this.getPurgingValidationTimeDurations(duration, (Long)dataToDelete[0].getData()[0], timeZone);
                OnDemandQuery onDemandQuery = this.getOnDemandQuery(parentTable, purgingValidationTimeDurations.get(AGGREGATION_START_TIME), purgingValidationTimeDurations.get(AGGREGATION_NEXT_EMIT_TIME));
                onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
                OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, this.siddhiQueryContext.getSiddhiAppContext(), this.tableMap, this.windowMap, this.aggregationMap);
                dataInParentTable = onDemandQueryRuntime.execute();
            }
            purgingCheckState.put(IS_DATA_AVAILABLE_TO_PURGE, dataToDelete != null && dataToDelete.length > 0);
            purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, dataInParentTable != null && dataInParentTable.length > 0);
        }
        catch (Exception e) {
            this.errorMessage = e.getMessage().contains("deadlocked") ? "Deadlock observed while checking whether the data is safe to purge from aggregation tables for the aggregation " + this.aggregationDefinition.getId() + ". If this occurred in an Active Active deployment, this error can be ignored if other node doesn't have this error" : "Error occurred while checking whether the data is safe to purge from aggregation tables for the aggregation " + this.aggregationDefinition.getId();
            LOG.error(this.errorMessage, (Throwable)e);
            purgingCheckState.put(IS_DATA_AVAILABLE_TO_PURGE, false);
            purgingCheckState.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, false);
            this.errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation tables for the aggregation " + this.aggregationDefinition.getId();
            this.purgingHalted = true;
        }
        return purgingCheckState;
    }

    private OnDemandQuery getOnDemandQuery(Table table, long timeFrom, long timeTo) {
        ArrayList<OutputAttribute> outputAttributes = new ArrayList<OutputAttribute>();
        outputAttributes.add(new OutputAttribute(new Variable(this.purgingTimestampField)));
        Selector selector = Selector.selector().addSelectionList(outputAttributes).groupBy(Expression.variable((String)this.purgingTimestampField)).orderBy(Expression.variable((String)this.purgingTimestampField), OrderByAttribute.Order.DESC).limit((Constant)Expression.value((int)1));
        InputStore inputStore = timeTo != 0L ? InputStore.store((String)table.getTableDefinition().getId()).on(Expression.and((Expression)Expression.compare((Expression)Expression.variable((String)this.purgingTimestampField), (Compare.Operator)Compare.Operator.GREATER_THAN_EQUAL, (Expression)Expression.value((long)timeFrom)), (Expression)Expression.compare((Expression)Expression.variable((String)this.purgingTimestampField), (Compare.Operator)Compare.Operator.LESS_THAN_EQUAL, (Expression)Expression.value((long)timeTo)))) : InputStore.store((String)table.getTableDefinition().getId()).on(Expression.compare((Expression)Expression.variable((String)this.purgingTimestampField), (Compare.Operator)Compare.Operator.LESS_THAN_EQUAL, (Expression)Expression.value((long)timeFrom)));
        return OnDemandQuery.query().from(inputStore).select(selector);
    }

    private Map<String, Long> getPurgingValidationTimeDurations(TimePeriod.Duration duration, long purgeTime, String timeZone) {
        HashMap<String, Long> purgingValidationTimeDuration = new HashMap<String, Long>();
        switch (duration) {
            case SECONDS: {
                long aggregtionStartTime = IncrementalTimeConverterUtil.getStartTimeOfAggregates(purgeTime, TimePeriod.Duration.MINUTES, timeZone);
                long nextEmmitTime = IncrementalTimeConverterUtil.getNextEmitTime(purgeTime, TimePeriod.Duration.MINUTES, timeZone);
                purgingValidationTimeDuration.put(AGGREGATION_START_TIME, aggregtionStartTime);
                purgingValidationTimeDuration.put(AGGREGATION_NEXT_EMIT_TIME, nextEmmitTime);
                return purgingValidationTimeDuration;
            }
            case MINUTES: {
                long aggregtionStartTime = IncrementalTimeConverterUtil.getStartTimeOfAggregates(purgeTime, TimePeriod.Duration.HOURS, timeZone);
                long nextEmmitTime = IncrementalTimeConverterUtil.getNextEmitTime(purgeTime, TimePeriod.Duration.HOURS, timeZone);
                purgingValidationTimeDuration.put(AGGREGATION_START_TIME, aggregtionStartTime);
                purgingValidationTimeDuration.put(AGGREGATION_NEXT_EMIT_TIME, nextEmmitTime);
                return purgingValidationTimeDuration;
            }
            case HOURS: {
                long aggregtionStartTime = IncrementalTimeConverterUtil.getStartTimeOfAggregates(purgeTime, TimePeriod.Duration.DAYS, timeZone);
                long nextEmmitTime = IncrementalTimeConverterUtil.getNextEmitTime(purgeTime, TimePeriod.Duration.DAYS, timeZone);
                purgingValidationTimeDuration.put(AGGREGATION_START_TIME, aggregtionStartTime);
                purgingValidationTimeDuration.put(AGGREGATION_NEXT_EMIT_TIME, nextEmmitTime);
                return purgingValidationTimeDuration;
            }
            case DAYS: {
                long aggregtionStartTime = IncrementalTimeConverterUtil.getStartTimeOfAggregates(purgeTime, TimePeriod.Duration.MONTHS, timeZone);
                long nextEmmitTime = IncrementalTimeConverterUtil.getNextEmitTime(purgeTime, TimePeriod.Duration.MONTHS, timeZone);
                purgingValidationTimeDuration.put(AGGREGATION_START_TIME, aggregtionStartTime);
                purgingValidationTimeDuration.put(AGGREGATION_NEXT_EMIT_TIME, nextEmmitTime);
                return purgingValidationTimeDuration;
            }
            case MONTHS: {
                long aggregtionStartTime = IncrementalTimeConverterUtil.getStartTimeOfAggregates(purgeTime, TimePeriod.Duration.YEARS, timeZone);
                long nextEmmitTime = IncrementalTimeConverterUtil.getNextEmitTime(purgeTime, TimePeriod.Duration.YEARS, timeZone);
                purgingValidationTimeDuration.put(AGGREGATION_START_TIME, aggregtionStartTime);
                purgingValidationTimeDuration.put(AGGREGATION_NEXT_EMIT_TIME, nextEmmitTime);
                return purgingValidationTimeDuration;
            }
        }
        return purgingValidationTimeDuration;
    }

    Event[] dataToDelete(long purgingTime, Table table) {
        OnDemandQuery onDemandQuery = this.getOnDemandQuery(table, purgingTime, 0L);
        onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
        OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, this.siddhiQueryContext.getSiddhiAppContext(), this.tableMap, this.windowMap, this.aggregationMap);
        return onDemandQueryRuntime.execute();
    }
}

