package io.siddhi.core.aggregation;

import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.state.MetaStateEvent;
import io.siddhi.core.event.state.StateEvent;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.exception.DataPurgingException;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.table.Table;
import io.siddhi.core.util.IncrementalTimeConverterUtil;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.core.util.collection.operator.CompiledCondition;
import io.siddhi.core.util.collection.operator.MatchingMetaInfoHolder;
import io.siddhi.core.util.parser.OnDemandQueryParser;
import io.siddhi.core.window.Window;
import io.siddhi.query.api.aggregation.TimePeriod;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.annotation.Element;
import io.siddhi.query.api.definition.AggregationDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Expression;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.api.expression.condition.Compare;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.carbon.databridge.agent.util.DataEndpointConstants;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-5.1.19.jar:io/siddhi/core/aggregation/IncrementalDataPurger.class
 */
/* loaded from: input_file:io/siddhi/core/aggregation/IncrementalDataPurger.class */
public class IncrementalDataPurger implements Runnable {
    private static final Logger LOG = Logger.getLogger(IncrementalDataPurger.class);
    private static final Long RETAIN_ALL = -1L;
    private static final String RETAIN_ALL_VALUES = "all";
    private static final String AGGREGATION_START_TIME = "aggregationStartTime";
    private static final String AGGREGATION_NEXT_EMIT_TIME = "nextEmitTime";
    private static final String IS_DATA_AVAILABLE_TO_PURGE = "isDataAvailableToPurge";
    private static final String IS_PARENT_TABLE_HAS_AGGREGATED_DATA = "isParentTableHasAggregatedData";
    private StreamEventFactory streamEventFactory;
    private Map<TimePeriod.Duration, Table> aggregationTables;
    private SiddhiQueryContext siddhiQueryContext;
    private ScheduledFuture scheduledPurgingTaskStatus;
    private String purgingTimestampField;
    private Attribute aggregatedTimestampAttribute;
    private AggregationDefinition aggregationDefinition;
    private List<TimePeriod.Duration> activeIncrementalDurations;
    private String timeZone;
    private Map<String, Window> windowMap;
    private Map<String, AggregationRuntime> aggregationMap;
    private String errorMessage;
    private long purgeExecutionInterval = Expression.Time.minute(15).value();
    private boolean purgingEnabled = true;
    private Map<TimePeriod.Duration, Long> retentionPeriods = new EnumMap(TimePeriod.Duration.class);
    private Map<TimePeriod.Duration, Long> minimumDurationMap = new EnumMap(TimePeriod.Duration.class);
    private ComplexEventChunk<StateEvent> eventChunk = new ComplexEventChunk<>();
    private List<VariableExpressionExecutor> variableExpressionExecutorList = new ArrayList();
    private Map<TimePeriod.Duration, CompiledCondition> compiledConditionsHolder = new EnumMap(TimePeriod.Duration.class);
    private Map<String, Table> tableMap = new HashMap();
    private boolean purgingHalted = false;

    public void init(AggregationDefinition aggregationDefinition, StreamEventFactory streamEventFactory, Map<TimePeriod.Duration, Table> map, Boolean bool, SiddhiQueryContext siddhiQueryContext, List<TimePeriod.Duration> list, String str, Map<String, Window> map2, Map<String, AggregationRuntime> map3) {
        this.siddhiQueryContext = siddhiQueryContext;
        this.aggregationDefinition = aggregationDefinition;
        List<Annotation> annotations = aggregationDefinition.getAnnotations();
        this.streamEventFactory = streamEventFactory;
        this.aggregationTables = map;
        this.activeIncrementalDurations = list;
        this.windowMap = map2;
        this.aggregationMap = map3;
        if (bool.booleanValue()) {
            this.purgingTimestampField = SiddhiConstants.AGG_EXTERNAL_TIMESTAMP_COL;
        } else {
            this.purgingTimestampField = SiddhiConstants.AGG_START_TIMESTAMP_COL;
        }
        this.aggregatedTimestampAttribute = new Attribute(this.purgingTimestampField, Attribute.Type.LONG);
        this.variableExpressionExecutorList.add(new VariableExpressionExecutor(this.aggregatedTimestampAttribute, 0, 1));
        for (Map.Entry<TimePeriod.Duration, Table> entry : map.entrySet()) {
            this.tableMap.put(entry.getValue().getTableDefinition().getId(), entry.getValue());
            switch (entry.getKey()) {
                case SECONDS:
                    this.retentionPeriods.put(entry.getKey(), Long.valueOf(Expression.Time.sec(120).value()));
                    this.minimumDurationMap.put(entry.getKey(), Long.valueOf(Expression.Time.sec(120).value()));
                    break;
                case MINUTES:
                    this.retentionPeriods.put(entry.getKey(), Long.valueOf(Expression.Time.hour(24).value()));
                    this.minimumDurationMap.put(entry.getKey(), Long.valueOf(Expression.Time.minute(120).value()));
                    break;
                case HOURS:
                    this.retentionPeriods.put(entry.getKey(), Long.valueOf(Expression.Time.day(30).value()));
                    this.minimumDurationMap.put(entry.getKey(), Long.valueOf(Expression.Time.hour(25).value()));
                    break;
                case DAYS:
                    this.retentionPeriods.put(entry.getKey(), Long.valueOf(Expression.Time.year(1).value()));
                    this.minimumDurationMap.put(entry.getKey(), Long.valueOf(Expression.Time.day(32).value()));
                    break;
                case MONTHS:
                    this.retentionPeriods.put(entry.getKey(), RETAIN_ALL);
                    this.minimumDurationMap.put(entry.getKey(), Long.valueOf(Expression.Time.month(13).value()));
                    break;
                case YEARS:
                    this.retentionPeriods.put(entry.getKey(), RETAIN_ALL);
                    this.minimumDurationMap.put(entry.getKey(), 0L);
                    break;
            }
        }
        this.timeZone = str;
        HashMap hashMap = new HashMap();
        for (Annotation annotation : annotations) {
            hashMap.put(annotation.getName().toLowerCase(), annotation);
        }
        Annotation annotation2 = (Annotation) hashMap.get(SiddhiConstants.NAMESPACE_PURGE);
        if (annotation2 != null) {
            if (annotation2.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ENABLE) != null) {
                String element = annotation2.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ENABLE);
                if (!"true".equalsIgnoreCase(element) && !"false".equalsIgnoreCase(element)) {
                    throw new SiddhiAppCreationException("Invalid value for enable: " + element + ". Please use true or false");
                }
                this.purgingEnabled = Boolean.parseBoolean(element);
            }
            if (this.purgingEnabled) {
                if (annotation2.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL) != null) {
                    this.purgeExecutionInterval = Expression.Time.timeToLong(annotation2.getElement(SiddhiConstants.ANNOTATION_ELEMENT_INTERVAL)).longValue();
                }
                List<Annotation> annotations2 = annotation2.getAnnotations(SiddhiConstants.NAMESPACE_RETENTION_PERIOD);
                if (annotations2 != null && !annotations2.isEmpty()) {
                    for (Element element2 : annotations2.get(0).getElements()) {
                        TimePeriod.Duration normalizeDuration = Expression.Time.normalizeDuration(element2.getKey());
                        if (!list.contains(normalizeDuration)) {
                            throw new SiddhiAppCreationException(normalizeDuration + " granularity cannot be purged since aggregation has not performed in " + normalizeDuration + " granularity");
                        }
                        if (element2.getValue().equalsIgnoreCase(RETAIN_ALL_VALUES)) {
                            this.retentionPeriods.put(normalizeDuration, RETAIN_ALL);
                        } else {
                            if (Expression.Time.timeToLong(element2.getValue()).longValue() < this.minimumDurationMap.get(normalizeDuration).longValue()) {
                                throw new SiddhiAppCreationException(normalizeDuration + " granularity cannot be purge with a retention of '" + element2.getValue() + "', minimum retention should be greater  than " + TimeUnit.MILLISECONDS.toMinutes(this.minimumDurationMap.get(normalizeDuration).longValue()) + " minutes");
                            }
                            this.retentionPeriods.put(normalizeDuration, Expression.Time.timeToLong(element2.getValue()));
                        }
                    }
                }
            }
        }
        this.compiledConditionsHolder = createCompileConditions(map, this.tableMap);
    }

    public boolean isPurgingEnabled() {
        return this.purgingEnabled;
    }

    public void setPurgingEnabled(boolean z) {
        this.purgingEnabled = z;
    }

    @Override // java.lang.Runnable
    public void run() {
        boolean z = false;
        boolean z2 = false;
        long currentTimeMillis = System.currentTimeMillis();
        Object[] objArr = new Object[1];
        int i = 1;
        if (this.purgingHalted) {
            LOG.error(this.errorMessage);
            return;
        }
        for (TimePeriod.Duration duration : this.activeIncrementalDurations) {
            if (!this.retentionPeriods.get(duration).equals(RETAIN_ALL)) {
                this.eventChunk.clear();
                long longValue = currentTimeMillis - this.retentionPeriods.get(duration).longValue();
                objArr[0] = Long.valueOf(longValue);
                if (this.retentionPeriods.size() > i) {
                    Map<String, Boolean> isSafeToPurgeTheDuration = isSafeToPurgeTheDuration(longValue, this.aggregationTables.get(this.activeIncrementalDurations.get(i)), this.aggregationTables.get(duration), duration, this.timeZone);
                    if (isSafeToPurgeTheDuration.get(IS_DATA_AVAILABLE_TO_PURGE).booleanValue()) {
                        z = true;
                        if (isSafeToPurgeTheDuration.get(IS_PARENT_TABLE_HAS_AGGREGATED_DATA).booleanValue()) {
                            z2 = true;
                        } else {
                            z2 = false;
                            this.purgingHalted = true;
                        }
                    } else {
                        z = false;
                    }
                }
                if (!z) {
                    continue;
                } else {
                    if (!z2) {
                        this.errorMessage = "Purging task halted!!!. Data purging for table: " + this.aggregationTables.get(duration).getTableDefinition().getId() + " with a retention of timestamp : " + longValue + " didn't executed since parent " + this.aggregationTables.get(this.activeIncrementalDurations.get(i)).getTableDefinition().getId() + " table does not contain values of above period. This has to be investigate since this may lead to an aggregation data mismatch";
                        LOG.info(this.errorMessage);
                        return;
                    }
                    this.eventChunk.add(createStreamEvent(objArr, Long.valueOf(currentTimeMillis)));
                    Table table = this.aggregationTables.get(duration);
                    try {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Purging data of table: " + table.getTableDefinition().getId() + " with a retention of timestamp : " + longValue);
                        }
                        table.deleteEvents(this.eventChunk, this.compiledConditionsHolder.get(duration), 1);
                    } catch (RuntimeException e) {
                        LOG.error("Exception occurred while deleting events from " + table.getTableDefinition().getId() + " table", e);
                        throw new DataPurgingException("Exception occurred while deleting events from " + table.getTableDefinition().getId() + " table", e);
                    }
                }
            }
            i++;
        }
    }

    private MatchingMetaInfoHolder matchingMetaInfoHolder(Table table, Attribute attribute) {
        MetaStateEvent metaStateEvent = new MetaStateEvent(2);
        MetaStreamEvent metaStreamEvent = new MetaStreamEvent();
        MetaStreamEvent metaStreamEvent2 = new MetaStreamEvent();
        TableDefinition id = TableDefinition.id("");
        id.attribute(attribute.getName(), attribute.getType());
        metaStreamEvent.setEventType(MetaStreamEvent.EventType.TABLE);
        metaStreamEvent.addOutputData(attribute);
        metaStreamEvent.addInputDefinition(id);
        metaStreamEvent2.setEventType(MetaStreamEvent.EventType.TABLE);
        Iterator<Attribute> it = table.getTableDefinition().getAttributeList().iterator();
        while (it.hasNext()) {
            metaStreamEvent2.addOutputData(it.next());
        }
        metaStreamEvent2.addInputDefinition(table.getTableDefinition());
        metaStateEvent.addEvent(metaStreamEvent);
        metaStateEvent.addEvent(metaStreamEvent2);
        return new MatchingMetaInfoHolder(metaStateEvent, 0, 1, id, table.getTableDefinition(), 0);
    }

    private Map<TimePeriod.Duration, CompiledCondition> createCompileConditions(Map<TimePeriod.Duration, Table> map, Map<String, Table> map2) {
        EnumMap enumMap = new EnumMap(TimePeriod.Duration.class);
        for (Map.Entry<TimePeriod.Duration, Table> entry : map.entrySet()) {
            if (!this.retentionPeriods.get(entry.getKey()).equals(RETAIN_ALL)) {
                Table table = map.get(entry.getKey());
                Variable variable = new Variable(this.purgingTimestampField);
                variable.setStreamId(entry.getValue().getTableDefinition().getId());
                enumMap.put((EnumMap) entry.getKey(), (TimePeriod.Duration) table.compileCondition(new Compare(variable, Compare.Operator.LESS_THAN, new Variable(this.purgingTimestampField)), matchingMetaInfoHolder(table, this.aggregatedTimestampAttribute), this.variableExpressionExecutorList, map2, this.siddhiQueryContext));
            }
        }
        return enumMap;
    }

    public void executeIncrementalDataPurging() {
        StringBuilder sb = new StringBuilder();
        if (!isPurgingEnabled()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Purging is disabled in siddhi app: " + this.siddhiQueryContext.getSiddhiAppContext().getName());
                return;
            }
            return;
        }
        if (this.scheduledPurgingTaskStatus != null) {
            this.scheduledPurgingTaskStatus.cancel(true);
            this.scheduledPurgingTaskStatus = this.siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService().scheduleWithFixedDelay(this, this.purgeExecutionInterval, this.purgeExecutionInterval, TimeUnit.MILLISECONDS);
        } else {
            this.scheduledPurgingTaskStatus = this.siddhiQueryContext.getSiddhiAppContext().getScheduledExecutorService().scheduleWithFixedDelay(this, this.purgeExecutionInterval, this.purgeExecutionInterval, TimeUnit.MILLISECONDS);
        }
        for (Map.Entry<TimePeriod.Duration, Long> entry : this.retentionPeriods.entrySet()) {
            if (!this.retentionPeriods.get(entry.getKey()).equals(RETAIN_ALL) && this.activeIncrementalDurations.contains(entry.getKey())) {
                sb.append(entry.getKey()).append(DataEndpointConstants.LB_URL_GROUP_SEPARATOR);
            }
        }
        LOG.info("Data purging has enabled for tables: " + ((Object) sb) + " with an interval of " + (this.purgeExecutionInterval / 1000) + " seconds in " + this.aggregationDefinition.getId() + " aggregation");
    }

    private StateEvent createStreamEvent(Object[] objArr, Long l) {
        StreamEvent newInstance = this.streamEventFactory.newInstance();
        newInstance.setTimestamp(l.longValue());
        newInstance.setOutputData(objArr);
        StateEvent stateEvent = new StateEvent(2, 1);
        stateEvent.addEvent(0, newInstance);
        return stateEvent;
    }

    private Map<String, Boolean> isSafeToPurgeTheDuration(long j, Table table, Table table2, TimePeriod.Duration duration, String str) {
        Event[] eventArr = null;
        HashMap hashMap = new HashMap();
        try {
            Event[] dataToDelete = dataToDelete(j, table2);
            if (dataToDelete != null && dataToDelete.length != 0) {
                Map<String, Long> purgingValidationTimeDurations = getPurgingValidationTimeDurations(duration, ((Long) dataToDelete[0].getData()[0]).longValue(), str);
                OnDemandQuery onDemandQuery = getOnDemandQuery(table, purgingValidationTimeDurations.get(AGGREGATION_START_TIME).longValue(), purgingValidationTimeDurations.get(AGGREGATION_NEXT_EMIT_TIME).longValue());
                onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
                eventArr = OnDemandQueryParser.parse(onDemandQuery, null, this.siddhiQueryContext.getSiddhiAppContext(), this.tableMap, this.windowMap, this.aggregationMap).execute();
            }
            hashMap.put(IS_DATA_AVAILABLE_TO_PURGE, Boolean.valueOf(dataToDelete != null && dataToDelete.length > 0));
            hashMap.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, Boolean.valueOf(eventArr != null && eventArr.length > 0));
        } catch (Exception e) {
            LOG.error("Error occurred while checking whether the data is safe to purge from aggregation tables for the aggregation " + this.aggregationDefinition.getId(), e);
            hashMap.put(IS_DATA_AVAILABLE_TO_PURGE, false);
            hashMap.put(IS_PARENT_TABLE_HAS_AGGREGATED_DATA, false);
            this.errorMessage = "Error occurred while checking whether the data is safe to purge from aggregation tables for the aggregation " + this.aggregationDefinition.getId();
            this.purgingHalted = true;
        }
        return hashMap;
    }

    private OnDemandQuery getOnDemandQuery(Table table, long j, long j2) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new OutputAttribute(new Variable(SiddhiConstants.AGG_START_TIMESTAMP_COL)));
        return OnDemandQuery.query().from(j2 != 0 ? InputStore.store(table.getTableDefinition().getId()).on(Expression.and(Expression.compare(Expression.variable(SiddhiConstants.AGG_START_TIMESTAMP_COL), Compare.Operator.GREATER_THAN_EQUAL, Expression.value(j)), Expression.compare(Expression.variable(SiddhiConstants.AGG_START_TIMESTAMP_COL), Compare.Operator.LESS_THAN_EQUAL, Expression.value(j2)))) : InputStore.store(table.getTableDefinition().getId()).on(Expression.compare(Expression.variable(SiddhiConstants.AGG_START_TIMESTAMP_COL), Compare.Operator.LESS_THAN_EQUAL, Expression.value(j)))).select(Selector.selector().addSelectionList(arrayList).groupBy(Expression.variable(SiddhiConstants.AGG_START_TIMESTAMP_COL)).limit(Expression.value(1)));
    }

    private Map<String, Long> getPurgingValidationTimeDurations(TimePeriod.Duration duration, long j, String str) {
        HashMap hashMap = new HashMap();
        switch (duration) {
            case SECONDS:
                long startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(j, TimePeriod.Duration.MINUTES, str);
                long nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(j, TimePeriod.Duration.MINUTES, str);
                hashMap.put(AGGREGATION_START_TIME, Long.valueOf(startTimeOfAggregates));
                hashMap.put(AGGREGATION_NEXT_EMIT_TIME, Long.valueOf(nextEmitTime));
                return hashMap;
            case MINUTES:
                long startTimeOfAggregates2 = IncrementalTimeConverterUtil.getStartTimeOfAggregates(j, TimePeriod.Duration.HOURS, str);
                long nextEmitTime2 = IncrementalTimeConverterUtil.getNextEmitTime(j, TimePeriod.Duration.HOURS, str);
                hashMap.put(AGGREGATION_START_TIME, Long.valueOf(startTimeOfAggregates2));
                hashMap.put(AGGREGATION_NEXT_EMIT_TIME, Long.valueOf(nextEmitTime2));
                return hashMap;
            case HOURS:
                long startTimeOfAggregates3 = IncrementalTimeConverterUtil.getStartTimeOfAggregates(j, TimePeriod.Duration.DAYS, str);
                long nextEmitTime3 = IncrementalTimeConverterUtil.getNextEmitTime(j, TimePeriod.Duration.DAYS, str);
                hashMap.put(AGGREGATION_START_TIME, Long.valueOf(startTimeOfAggregates3));
                hashMap.put(AGGREGATION_NEXT_EMIT_TIME, Long.valueOf(nextEmitTime3));
                return hashMap;
            case DAYS:
                long startTimeOfAggregates4 = IncrementalTimeConverterUtil.getStartTimeOfAggregates(j, TimePeriod.Duration.MONTHS, str);
                long nextEmitTime4 = IncrementalTimeConverterUtil.getNextEmitTime(j, TimePeriod.Duration.MONTHS, str);
                hashMap.put(AGGREGATION_START_TIME, Long.valueOf(startTimeOfAggregates4));
                hashMap.put(AGGREGATION_NEXT_EMIT_TIME, Long.valueOf(nextEmitTime4));
                return hashMap;
            case MONTHS:
                long startTimeOfAggregates5 = IncrementalTimeConverterUtil.getStartTimeOfAggregates(j, TimePeriod.Duration.YEARS, str);
                long nextEmitTime5 = IncrementalTimeConverterUtil.getNextEmitTime(j, TimePeriod.Duration.YEARS, str);
                hashMap.put(AGGREGATION_START_TIME, Long.valueOf(startTimeOfAggregates5));
                hashMap.put(AGGREGATION_NEXT_EMIT_TIME, Long.valueOf(nextEmitTime5));
                return hashMap;
            default:
                return hashMap;
        }
    }

    Event[] dataToDelete(long j, Table table) {
        OnDemandQuery onDemandQuery = getOnDemandQuery(table, j, 0L);
        onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
        return OnDemandQueryParser.parse(onDemandQuery, null, this.siddhiQueryContext.getSiddhiAppContext(), this.tableMap, this.windowMap, this.aggregationMap).execute();
    }
}
