/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.aggregation;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.state.MetaStateEvent;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventPool;
import io.siddhi.core.exception.DataPurgingException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.Compare;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * Scheduled task that deletes expired rows from the tables backing an incremental aggregation.
 *
 * <p>{@link #init} registers a default retention period and a minimum allowed retention for every
 * granularity that has a table, applies any overrides found in the aggregation's {@code purge}
 * annotation, and pre-compiles one delete condition per purgeable table.
 * {@link #executeIncrementalDataPurging()} then schedules this object on the app's scheduled
 * executor, and each {@link #run()} deletes rows whose aggregation timestamp is older than
 * {@code now - retention}.
 */
public class IncrementalDataPurging implements Runnable {
    private static final Logger LOG = Logger.getLogger(IncrementalDataPurging.class);
    private static final String INTERNAL_AGG_TIMESTAMP_FIELD = "AGG_TIMESTAMP";
    private static final String EXTERNAL_AGG_TIMESTAMP_FIELD = "AGG_EVENT_TIMESTAMP";
    /** Sentinel retention value meaning "never purge this granularity". */
    private static final Long RETAIN_ALL = -1L;
    /** Annotation value that maps to {@link #RETAIN_ALL}. */
    private static final String RETAIN_ALL_VALUES = "all";

    /** Delay, in milliseconds, between purge runs; 15 minutes unless overridden by the annotation. */
    private long purgeExecutionInterval = Expression.Time.minute(15).value();
    private boolean purgingEnabled = true;
    /** Effective retention (ms) per granularity, or {@link #RETAIN_ALL}. */
    private final Map<TimePeriod.Duration, Long> retentionPeriods = new EnumMap<TimePeriod.Duration, Long>(TimePeriod.Duration.class);
    private StreamEventPool streamEventPool;
    private Map<TimePeriod.Duration, Table> aggregationTables;
    private SiddhiQueryContext siddhiQueryContext;
    /** Handle of the currently scheduled purge task, or {@code null} when nothing was scheduled yet. */
    private ScheduledFuture<?> scheduledPurgingTaskStatus;
    private String purgingTimestampField;
    /** Smallest retention (ms) a user may configure per granularity. */
    private final Map<TimePeriod.Duration, Long> minimumDurationMap = new EnumMap<TimePeriod.Duration, Long>(TimePeriod.Duration.class);
    /** Reused on every run; cleared before each table is purged. */
    private final ComplexEventChunk<StateEvent> eventChunk = new ComplexEventChunk<StateEvent>(true);
    private final List<VariableExpressionExecutor> variableExpressionExecutorList = new ArrayList<VariableExpressionExecutor>();
    private Attribute aggregatedTimestampAttribute;
    private Map<TimePeriod.Duration, CompiledCondition> compiledConditionsHolder = new EnumMap<TimePeriod.Duration, CompiledCondition>(TimePeriod.Duration.class);
    /** Aggregation tables keyed by table definition id; handed to {@code Table.compileCondition}. */
    private final Map<String, Table> tableMap = new HashMap<String, Table>();
    private AggregationDefinition aggregationDefinition;

    /**
     * Configures retention periods and compiles the delete conditions.
     *
     * @param aggregationDefinition      definition whose annotations are searched for {@code purge}
     * @param streamEventPool            pool from which the per-run delete events are borrowed
     * @param aggregationTables          table per aggregated granularity
     * @param isProcessingOnExternalTime selects {@code AGG_EVENT_TIMESTAMP} (true) or
     *                                   {@code AGG_TIMESTAMP} (false) as the purge column; must not
     *                                   be {@code null} (it is unboxed)
     * @param siddhiQueryContext         context used for condition compilation and scheduling
     * @throws SiddhiAppCreationException if the {@code purge} annotation has an invalid
     *                                    {@code enable} value, names a granularity without a table,
     *                                    or sets a retention below the minimum for its granularity
     */
    public void init(AggregationDefinition aggregationDefinition, StreamEventPool streamEventPool, Map<TimePeriod.Duration, Table> aggregationTables, Boolean isProcessingOnExternalTime, SiddhiQueryContext siddhiQueryContext) {
        this.siddhiQueryContext = siddhiQueryContext;
        this.aggregationDefinition = aggregationDefinition;
        List<Annotation> annotations = aggregationDefinition.getAnnotations();
        this.streamEventPool = streamEventPool;
        this.aggregationTables = aggregationTables;
        this.purgingTimestampField = isProcessingOnExternalTime ? EXTERNAL_AGG_TIMESTAMP_FIELD : INTERNAL_AGG_TIMESTAMP_FIELD;
        this.aggregatedTimestampAttribute = new Attribute(this.purgingTimestampField, Attribute.Type.LONG);
        this.variableExpressionExecutorList.add(new VariableExpressionExecutor(this.aggregatedTimestampAttribute, 0, 1));

        for (Map.Entry<TimePeriod.Duration, Table> entry : aggregationTables.entrySet()) {
            Table table = entry.getValue();
            this.tableMap.put(table.getTableDefinition().getId(), table);
            this.applyDefaultRetention(entry.getKey());
        }

        Annotation purgeAnnotation = findPurgeAnnotation(annotations);
        if (purgeAnnotation != null) {
            this.applyPurgeAnnotation(purgeAnnotation, aggregationTables);
        }
        // Conditions are compiled last so that tables configured as "retain all" are skipped.
        this.compiledConditionsHolder = this.createCompileConditions(aggregationTables, this.tableMap);
    }

    /** Registers the built-in retention period and minimum allowed retention for one granularity. */
    private void applyDefaultRetention(TimePeriod.Duration duration) {
        switch (duration) {
            case SECONDS:
                this.retentionPeriods.put(duration, Expression.Time.sec(120).value());
                this.minimumDurationMap.put(duration, Expression.Time.sec(120).value());
                break;
            case MINUTES:
                this.retentionPeriods.put(duration, Expression.Time.hour(24).value());
                this.minimumDurationMap.put(duration, Expression.Time.minute(120).value());
                break;
            case HOURS:
                this.retentionPeriods.put(duration, Expression.Time.day(30).value());
                this.minimumDurationMap.put(duration, Expression.Time.hour(25).value());
                break;
            case DAYS:
                this.retentionPeriods.put(duration, Expression.Time.year(1).value());
                this.minimumDurationMap.put(duration, Expression.Time.day(32).value());
                break;
            case MONTHS:
                this.retentionPeriods.put(duration, RETAIN_ALL);
                this.minimumDurationMap.put(duration, Expression.Time.month(13).value());
                break;
            case YEARS:
                this.retentionPeriods.put(duration, RETAIN_ALL);
                this.minimumDurationMap.put(duration, 0L);
                break;
            default:
                // No defaults are registered for any other granularity (unchanged behaviour).
                break;
        }
    }

    /**
     * Returns the annotation whose lower-cased name is {@code purge}, or {@code null} if none.
     * When several match, the last one in the list wins.
     */
    private static Annotation findPurgeAnnotation(List<Annotation> annotations) {
        Map<String, Annotation> annotationTypes = new HashMap<String, Annotation>();
        for (Annotation annotation : annotations) {
            annotationTypes.put(annotation.getName().toLowerCase(), annotation);
        }
        return annotationTypes.get("purge");
    }

    /**
     * Applies the {@code enable}, {@code interval} and {@code retentionPeriod} settings of the
     * {@code purge} annotation on top of the defaults. Interval and retention settings are only
     * read while purging is enabled.
     */
    private void applyPurgeAnnotation(Annotation purgeAnnotation, Map<TimePeriod.Duration, Table> aggregationTables) {
        String purgeEnable = purgeAnnotation.getElement("enable");
        if (purgeEnable != null) {
            if (!"true".equalsIgnoreCase(purgeEnable) && !"false".equalsIgnoreCase(purgeEnable)) {
                throw new SiddhiAppCreationException("Invalid value for enable: " + purgeEnable + ". Please use true or false");
            }
            this.purgingEnabled = Boolean.parseBoolean(purgeEnable);
        }
        if (!this.purgingEnabled) {
            return;
        }

        String interval = purgeAnnotation.getElement("interval");
        if (interval != null) {
            this.purgeExecutionInterval = Expression.Time.timeToLong(interval);
        }

        List<Annotation> retentions = purgeAnnotation.getAnnotations("retentionPeriod");
        if (retentions == null || retentions.isEmpty()) {
            return;
        }
        // Only the first retentionPeriod annotation is honoured.
        List<Element> elements = retentions.get(0).getElements();
        for (Element element : elements) {
            TimePeriod.Duration duration = Expression.Time.normalizeDuration(element.getKey());
            if (!aggregationTables.containsKey(duration)) {
                throw new SiddhiAppCreationException(duration + " granularity cannot be purged since aggregation has not performed in " + duration + " granularity");
            }
            String retentionValue = element.getValue();
            if (retentionValue.equalsIgnoreCase(RETAIN_ALL_VALUES)) {
                this.retentionPeriods.put(duration, RETAIN_ALL);
                continue;
            }
            // Parse once and reuse for both the validation and the stored value.
            long retention = Expression.Time.timeToLong(retentionValue);
            long minimumRetention = this.minimumDurationMap.get(duration);
            if (retention < minimumRetention) {
                throw new SiddhiAppCreationException(duration + " granularity cannot be purge with a retention of '" + retentionValue + "', minimum retention should be greater  than " + TimeUnit.MILLISECONDS.toMinutes(minimumRetention) + " minutes");
            }
            this.retentionPeriods.put(duration, retention);
        }
    }

    /**
     * @return whether purging is currently enabled
     */
    public boolean isPurgingEnabled() {
        return this.purgingEnabled;
    }

    /**
     * @param purgingEnabled new enabled state; consulted by {@link #executeIncrementalDataPurging()}
     */
    public void setPurgingEnabled(boolean purgingEnabled) {
        this.purgingEnabled = purgingEnabled;
    }

    /**
     * Performs one purge pass: for every table whose retention is not "retain all", deletes the
     * rows matching the pre-compiled condition using {@code now - retention} as the cut-off.
     *
     * <p>NOTE: this task is scheduled with {@code scheduleWithFixedDelay}; per the
     * {@code ScheduledExecutorService} contract an exception escaping this method suppresses all
     * subsequent executions. The first failing table also aborts the remaining tables of the pass.
     *
     * @throws DataPurgingException if deleting from any table fails with a runtime exception
     */
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (Map.Entry<TimePeriod.Duration, Table> entry : this.aggregationTables.entrySet()) {
            TimePeriod.Duration duration = entry.getKey();
            Long retention = this.retentionPeriods.get(duration);
            if (retention.equals(RETAIN_ALL)) {
                continue;
            }
            long purgeTime = currentTime - retention;
            this.eventChunk.clear();
            this.eventChunk.add(this.createStreamEvent(new Object[]{purgeTime}, currentTime));

            Table table = entry.getValue();
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Purging data of table: " + table.getTableDefinition().getId() + " with a retention of timestamp : " + purgeTime);
                }
                table.deleteEvents(this.eventChunk, this.compiledConditionsHolder.get(duration), 1);
            } catch (RuntimeException e) {
                String message = "Exception occurred while deleting events from " + table.getTableDefinition().getId() + " table";
                LOG.error(message, e);
                throw new DataPurgingException(message, e);
            }
        }
    }

    /**
     * Builds the meta information used to compile a delete condition against {@code table}: a
     * two-slot state event whose first slot describes a synthetic, unnamed table definition
     * carrying only {@code attribute}, and whose second slot mirrors the table's own attributes.
     */
    private MatchingMetaInfoHolder matchingMetaInfoHolder(Table table, Attribute attribute) {
        TableDefinition storeDefinition = table.getTableDefinition();

        // Slot 0: the event carrying the purge cut-off value.
        TableDefinition deleteTableDefinition = TableDefinition.id("");
        deleteTableDefinition.attribute(attribute.getName(), attribute.getType());
        MetaStreamEvent metaStreamEventWithDeletePara = new MetaStreamEvent();
        metaStreamEventWithDeletePara.setEventType(MetaStreamEvent.EventType.TABLE);
        metaStreamEventWithDeletePara.addOutputData(attribute);
        metaStreamEventWithDeletePara.addInputDefinition(deleteTableDefinition);

        // Slot 1: the rows of the table being purged.
        MetaStreamEvent metaStreamEventForTable = new MetaStreamEvent();
        metaStreamEventForTable.setEventType(MetaStreamEvent.EventType.TABLE);
        for (Attribute tableAttribute : storeDefinition.getAttributeList()) {
            metaStreamEventForTable.addOutputData(tableAttribute);
        }
        metaStreamEventForTable.addInputDefinition(storeDefinition);

        MetaStateEvent metaStateEvent = new MetaStateEvent(2);
        metaStateEvent.addEvent(metaStreamEventWithDeletePara);
        metaStateEvent.addEvent(metaStreamEventForTable);
        return new MatchingMetaInfoHolder(metaStateEvent, 0, 1, deleteTableDefinition, table.getTableDefinition(), 0);
    }

    /**
     * Compiles, for every table that is not configured as "retain all", the condition
     * {@code <table>.<timestampField> < <timestampField>} used by {@link #run()} to delete rows.
     *
     * @param aggregationTables table per granularity
     * @param tableMap          tables keyed by definition id, passed through to the compiler
     * @return compiled condition per purgeable granularity
     */
    private Map<TimePeriod.Duration, CompiledCondition> createCompileConditions(Map<TimePeriod.Duration, Table> aggregationTables, Map<String, Table> tableMap) {
        Map<TimePeriod.Duration, CompiledCondition> compiledConditionMap = new EnumMap<TimePeriod.Duration, CompiledCondition>(TimePeriod.Duration.class);
        for (Map.Entry<TimePeriod.Duration, Table> entry : aggregationTables.entrySet()) {
            TimePeriod.Duration duration = entry.getKey();
            if (this.retentionPeriods.get(duration).equals(RETAIN_ALL)) {
                continue;
            }
            Table table = entry.getValue();
            // Left side is qualified with the table id; right side is the unqualified variable.
            Variable leftVariable = new Variable(this.purgingTimestampField);
            leftVariable.setStreamId(table.getTableDefinition().getId());
            Compare expression = new Compare(leftVariable, Compare.Operator.LESS_THAN, new Variable(this.purgingTimestampField));
            CompiledCondition compiledCondition = table.compileCondition(expression, this.matchingMetaInfoHolder(table, this.aggregatedTimestampAttribute), this.variableExpressionExecutorList, tableMap, this.siddhiQueryContext);
            compiledConditionMap.put(duration, compiledCondition);
        }
        return compiledConditionMap;
    }

    /**
     * Schedules this purger with a fixed delay of the configured purge interval (also used as the
     * initial delay). Any previously scheduled task is cancelled first. Does nothing except emit a
     * debug log when purging is disabled.
     */
    public void executeIncrementalDataPurging() {
        if (!this.isPurgingEnabled()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Purging is disabled in siddhi app: " + this.siddhiQueryContext.getSiddhiAppContext().getName());
            }
            return;
        }

        if (this.scheduledPurgingTaskStatus != null) {
            this.scheduledPurgingTaskStatus.cancel(true);
        }
        this.scheduledPurgingTaskStatus = this.siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService().scheduleWithFixedDelay(this, this.purgeExecutionInterval, this.purgeExecutionInterval, TimeUnit.MILLISECONDS);

        // Comma-separated list of purgeable granularities (no trailing separator).
        StringBuilder tableNames = new StringBuilder();
        for (Map.Entry<TimePeriod.Duration, Long> entry : this.retentionPeriods.entrySet()) {
            if (entry.getValue().equals(RETAIN_ALL)) {
                continue;
            }
            if (tableNames.length() > 0) {
                tableNames.append(",");
            }
            tableNames.append(entry.getKey());
        }
        LOG.info("Data purging has enabled for tables: " + tableNames + " with an interval of " + this.purgeExecutionInterval / 1000L + " seconds in " + this.aggregationDefinition.getId() + " aggregation");
    }

    /**
     * Borrows a stream event from the pool, fills it with {@code values} and {@code timestamp},
     * and wraps it at position 0 of a new {@link StateEvent}.
     */
    private StateEvent createStreamEvent(Object[] values, long timestamp) {
        StreamEvent streamEvent = this.streamEventPool.borrowEvent();
        streamEvent.setTimestamp(timestamp);
        streamEvent.setOutputData(values);
        StateEvent stateEvent = new StateEvent(2, 1);
        stateEvent.addEvent(0, streamEvent);
        return stateEvent;
    }
}

