package org.wso2.siddhi.core.aggregation;

import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.MetaStreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventPool;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.selector.GroupByKeyGenerator;
import org.wso2.siddhi.core.query.selector.attribute.processor.executor.GroupByAggregationAttributeExecutor;
import org.wso2.siddhi.core.table.Table;
import org.wso2.siddhi.core.util.IncrementalTimeConverterUtil;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.snapshot.Snapshotable;
import org.wso2.siddhi.query.api.aggregation.TimePeriod;

/* loaded from: input_file:org/wso2/siddhi/core/aggregation/IncrementalExecutor.class */
public class IncrementalExecutor implements Executor, Snapshotable {
    private static final Logger LOG = Logger.getLogger(IncrementalExecutor.class);
    private final StreamEvent resetEvent;
    private final ExpressionExecutor timestampExpressionExecutor;
    private final ExpressionExecutor timeZoneExpressionExecutor;
    private TimePeriod.Duration duration;
    private Table table;
    private GroupByKeyGenerator groupByKeyGenerator;
    private int bufferSize;
    private boolean ignoreEventsOlderThanBuffer;
    private StreamEventPool streamEventPool;
    private boolean isProcessingOnExternalTime;
    private boolean isGroupBy;
    private Executor next;
    private Scheduler scheduler;
    private boolean isRoot;
    private long millisecondsPerDuration;
    private boolean eventOlderThanBuffer;
    private int maxTimestampPosition;
    private long maxTimestampInBuffer;
    private Semaphore mutex;
    private String elementId;
    private BaseIncrementalValueStore baseIncrementalValueStore;
    private Map<String, BaseIncrementalValueStore> baseIncrementalValueStoreMap;
    private ArrayList<BaseIncrementalValueStore> baseIncrementalValueStoreList;
    private ArrayList<HashMap<String, BaseIncrementalValueStore>> baseIncrementalValueGroupByStoreList;
    private long nextEmitTime = -1;
    private int currentBufferIndex = -1;
    private long startTimeOfAggregates = -1;
    private boolean timerStarted = false;
    private int countEventsGreaterThanCurrentMax = 0;
    private boolean isRootAndLoadedFromTable = false;

    public IncrementalExecutor(TimePeriod.Duration duration, List<ExpressionExecutor> list, GroupByKeyGenerator groupByKeyGenerator, MetaStreamEvent metaStreamEvent, int i, boolean z, IncrementalExecutor incrementalExecutor, boolean z2, Table table, boolean z3, SiddhiAppContext siddhiAppContext, String str) {
        this.baseIncrementalValueStore = null;
        this.baseIncrementalValueStoreMap = null;
        this.baseIncrementalValueStoreList = null;
        this.baseIncrementalValueGroupByStoreList = null;
        this.duration = duration;
        this.next = incrementalExecutor;
        this.isRoot = z2;
        this.table = table;
        this.bufferSize = i;
        this.ignoreEventsOlderThanBuffer = z;
        this.streamEventPool = new StreamEventPool(metaStreamEvent, 10);
        this.isProcessingOnExternalTime = z3;
        this.timestampExpressionExecutor = list.remove(0);
        this.timeZoneExpressionExecutor = list.get(0);
        this.baseIncrementalValueStore = new BaseIncrementalValueStore(-1L, list, this.streamEventPool, siddhiAppContext, str);
        if (groupByKeyGenerator != null) {
            this.groupByKeyGenerator = groupByKeyGenerator;
            this.isGroupBy = true;
            if (i <= 0 || !z2) {
                this.baseIncrementalValueStoreMap = new HashMap();
            } else {
                this.baseIncrementalValueGroupByStoreList = new ArrayList<>(i + 1);
                for (int i2 = 0; i2 < i + 1; i2++) {
                    this.baseIncrementalValueGroupByStoreList.add(new HashMap<>());
                }
                this.millisecondsPerDuration = IncrementalTimeConverterUtil.getMillisecondsPerDuration(duration);
            }
        } else {
            this.isGroupBy = false;
            if (i > 0 && z2) {
                this.baseIncrementalValueStoreList = new ArrayList<>(i + 1);
                for (int i3 = 0; i3 < i + 1; i3++) {
                    this.baseIncrementalValueStoreList.add(this.baseIncrementalValueStore.cloneStore(null, -1L));
                }
                this.millisecondsPerDuration = IncrementalTimeConverterUtil.getMillisecondsPerDuration(duration);
            }
        }
        this.resetEvent = this.streamEventPool.borrowEvent();
        this.resetEvent.setType(ComplexEvent.Type.RESET);
        setNextExecutor(incrementalExecutor);
        this.mutex = new Semaphore(1);
        if (this.elementId == null) {
            this.elementId = "IncrementalExecutor-" + siddhiAppContext.getElementIdGenerator().createNewId();
        }
        siddhiAppContext.getSnapshotService().addSnapshotable(str, this);
    }

    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override // org.wso2.siddhi.core.aggregation.Executor
    public void execute(ComplexEventChunk complexEventChunk) {
        LOG.debug("Event Chunk received by " + this.duration + " incremental executor: " + complexEventChunk.toString());
        complexEventChunk.reset();
        while (complexEventChunk.hasNext()) {
            StreamEvent streamEvent = (StreamEvent) complexEventChunk.next();
            complexEventChunk.remove();
            String timeZone = getTimeZone(streamEvent);
            long timestamp = getTimestamp(streamEvent, timeZone);
            this.startTimeOfAggregates = IncrementalTimeConverterUtil.getStartTimeOfAggregates(timestamp, this.duration, timeZone);
            if (this.isRootAndLoadedFromTable) {
                if (timestamp < this.nextEmitTime) {
                    continue;
                } else {
                    this.isRootAndLoadedFromTable = false;
                }
            }
            if (this.bufferSize <= 0 || !this.isRoot) {
                if (timestamp >= this.nextEmitTime) {
                    this.nextEmitTime = IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone);
                    dispatchAggregateEvents(this.startTimeOfAggregates);
                    if (!this.isProcessingOnExternalTime) {
                        sendTimerEvent(timeZone);
                    }
                }
                if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                    if (this.nextEmitTime == IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, timeZone)) {
                        processAggregates(streamEvent);
                    } else if (!this.ignoreEventsOlderThanBuffer) {
                        processAggregates(streamEvent);
                    }
                }
            } else {
                try {
                    try {
                        this.mutex.acquire();
                        dispatchBufferedAggregateEvents(this.startTimeOfAggregates);
                        this.mutex.release();
                        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
                            if (!this.eventOlderThanBuffer) {
                                processAggregates(streamEvent);
                            } else if (!this.ignoreEventsOlderThanBuffer) {
                                processAggregates(streamEvent);
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new SiddhiAppRuntimeException("Error when dispatching events from buffer", e);
                    }
                } catch (Throwable th) {
                    this.mutex.release();
                    throw th;
                }
            }
        }
    }

    private void sendTimerEvent(String str) {
        if (getNextExecutor() != null) {
            StreamEvent borrowEvent = this.streamEventPool.borrowEvent();
            borrowEvent.setType(ComplexEvent.Type.TIMER);
            borrowEvent.setTimestamp(IncrementalTimeConverterUtil.getPreviousStartTime(this.startTimeOfAggregates, this.duration, str));
            ComplexEventChunk complexEventChunk = new ComplexEventChunk(true);
            complexEventChunk.add(borrowEvent);
            this.next.execute(complexEventChunk);
        }
    }

    private long getTimestamp(StreamEvent streamEvent, String str) {
        long timestamp;
        if (streamEvent.getType() == ComplexEvent.Type.CURRENT) {
            timestamp = ((Long) this.timestampExpressionExecutor.execute(streamEvent)).longValue();
            if (this.isRoot && !this.isProcessingOnExternalTime && !this.timerStarted) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, str));
                this.timerStarted = true;
            }
        } else {
            timestamp = streamEvent.getTimestamp();
            if (this.isRoot) {
                this.scheduler.notifyAt(IncrementalTimeConverterUtil.getNextEmitTime(timestamp, this.duration, str));
            }
        }
        return timestamp;
    }

    private String getTimeZone(StreamEvent streamEvent) {
        return streamEvent.getType() == ComplexEvent.Type.CURRENT ? this.timeZoneExpressionExecutor.execute(streamEvent).toString() : ZoneOffset.systemDefault().getRules().getOffset(Instant.now()).getId();
    }

    @Override // org.wso2.siddhi.core.aggregation.Executor
    public Executor getNextExecutor() {
        return this.next;
    }

    @Override // org.wso2.siddhi.core.aggregation.Executor
    public void setNextExecutor(Executor executor) {
        this.next = executor;
    }

    private void processAggregates(StreamEvent streamEvent) {
        synchronized (this) {
            if (this.isGroupBy) {
                try {
                    String constructEventKey = this.groupByKeyGenerator.constructEventKey(streamEvent);
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().set(constructEventKey);
                    if (this.baseIncrementalValueGroupByStoreList != null) {
                        process(streamEvent, this.baseIncrementalValueGroupByStoreList.get(this.currentBufferIndex).computeIfAbsent(constructEventKey, str -> {
                            return this.baseIncrementalValueStore.cloneStore(str, this.startTimeOfAggregates);
                        }));
                    } else {
                        process(streamEvent, this.baseIncrementalValueStoreMap.computeIfAbsent(constructEventKey, str2 -> {
                            return this.baseIncrementalValueStore.cloneStore(str2, this.startTimeOfAggregates);
                        }));
                    }
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().remove();
                } catch (Throwable th) {
                    GroupByAggregationAttributeExecutor.getKeyThreadLocal().remove();
                    throw th;
                }
            } else if (this.baseIncrementalValueStoreList != null) {
                BaseIncrementalValueStore baseIncrementalValueStore = this.baseIncrementalValueStoreList.get(this.currentBufferIndex);
                if (!baseIncrementalValueStore.isProcessed()) {
                    baseIncrementalValueStore.setTimestamp(this.startTimeOfAggregates);
                }
                process(streamEvent, baseIncrementalValueStore);
            } else {
                process(streamEvent, this.baseIncrementalValueStore);
            }
        }
    }

    private void process(StreamEvent streamEvent, BaseIncrementalValueStore baseIncrementalValueStore) {
        List<ExpressionExecutor> expressionExecutors = baseIncrementalValueStore.getExpressionExecutors();
        for (int i = 0; i < expressionExecutors.size(); i++) {
            baseIncrementalValueStore.setValue(expressionExecutors.get(i).execute(streamEvent), i + 1);
        }
        baseIncrementalValueStore.setProcessed(true);
    }

    private void dispatchAggregateEvents(long j) {
        if (this.isGroupBy) {
            dispatchEvents(this.baseIncrementalValueStoreMap);
        } else {
            dispatchEvent(j, this.baseIncrementalValueStore);
        }
    }

    private void dispatchBufferedAggregateEvents(long j) {
        if (this.currentBufferIndex == -1) {
            this.countEventsGreaterThanCurrentMax++;
            this.maxTimestampPosition = 0;
            this.maxTimestampInBuffer = j;
            this.currentBufferIndex = 0;
            this.eventOlderThanBuffer = false;
            return;
        }
        if (j <= this.maxTimestampInBuffer) {
            if (((int) ((this.maxTimestampInBuffer - j) / this.millisecondsPerDuration)) > this.bufferSize) {
                if (this.countEventsGreaterThanCurrentMax <= this.bufferSize + 1) {
                    this.currentBufferIndex = 0;
                } else {
                    this.currentBufferIndex = (this.maxTimestampPosition + 1) % (this.bufferSize + 1);
                }
                this.eventOlderThanBuffer = true;
                return;
            }
            int i = this.maxTimestampPosition - ((int) ((this.maxTimestampInBuffer - j) / this.millisecondsPerDuration));
            if (i >= 0) {
                this.currentBufferIndex = i;
            } else {
                this.currentBufferIndex = this.bufferSize + 1 + i;
            }
            this.eventOlderThanBuffer = false;
            return;
        }
        this.countEventsGreaterThanCurrentMax++;
        if ((j - this.maxTimestampInBuffer) / this.millisecondsPerDuration >= this.bufferSize + 1) {
            if (this.isGroupBy) {
                for (int i2 = 0; i2 <= this.bufferSize; i2++) {
                    dispatchEvents(this.baseIncrementalValueGroupByStoreList.get(i2));
                }
            } else {
                for (int i3 = 0; i3 <= this.bufferSize; i3++) {
                    dispatchEvent(j, this.baseIncrementalValueStoreList.get(i3));
                }
            }
            this.currentBufferIndex = (int) (((j - this.maxTimestampInBuffer) / this.millisecondsPerDuration) % (this.bufferSize + 1));
            this.maxTimestampPosition = this.currentBufferIndex;
        } else {
            int i4 = (this.maxTimestampPosition + ((int) ((j - this.maxTimestampInBuffer) / this.millisecondsPerDuration))) % (this.bufferSize + 1);
            if (!this.isGroupBy) {
                for (int i5 = 0; i5 <= i4; i5++) {
                    BaseIncrementalValueStore baseIncrementalValueStore = this.baseIncrementalValueStoreList.get(i5);
                    if (baseIncrementalValueStore.isProcessed() && baseIncrementalValueStore.getTimestamp() <= this.baseIncrementalValueStoreList.get(i4).getTimestamp()) {
                        dispatchEvent(j, this.baseIncrementalValueStoreList.get(i5));
                    }
                }
            } else if (this.baseIncrementalValueGroupByStoreList.get(i4).size() > 0) {
                for (int i6 = 0; i6 <= i4; i6++) {
                    dispatchEvents(this.baseIncrementalValueGroupByStoreList.get(i6));
                }
            }
            this.currentBufferIndex = i4;
            this.maxTimestampPosition = i4;
        }
        this.maxTimestampInBuffer = j;
        this.eventOlderThanBuffer = false;
    }

    private void dispatchEvent(long j, BaseIncrementalValueStore baseIncrementalValueStore) {
        if (baseIncrementalValueStore.isProcessed()) {
            StreamEvent createStreamEvent = baseIncrementalValueStore.createStreamEvent();
            ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>(true);
            complexEventChunk.add(createStreamEvent);
            LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + complexEventChunk.toString());
            this.table.addEvents(complexEventChunk, 1);
            this.next.execute(complexEventChunk);
        }
        cleanBaseIncrementalValueStore(j, baseIncrementalValueStore);
    }

    private void dispatchEvents(Map<String, BaseIncrementalValueStore> map) {
        int size = map.size();
        if (size > 0) {
            ComplexEventChunk<StreamEvent> complexEventChunk = new ComplexEventChunk<>(true);
            Iterator<BaseIncrementalValueStore> it = map.values().iterator();
            while (it.hasNext()) {
                complexEventChunk.add(it.next().createStreamEvent());
            }
            LOG.debug("Event dispatched by " + this.duration + " incremental executor: " + complexEventChunk.toString());
            this.table.addEvents(complexEventChunk, size);
            this.next.execute(complexEventChunk);
        }
        map.clear();
    }

    private void cleanBaseIncrementalValueStore(long j, BaseIncrementalValueStore baseIncrementalValueStore) {
        baseIncrementalValueStore.clearValues();
        baseIncrementalValueStore.setTimestamp(j);
        baseIncrementalValueStore.setProcessed(false);
        Iterator<ExpressionExecutor> it = baseIncrementalValueStore.getExpressionExecutors().iterator();
        while (it.hasNext()) {
            it.next().execute(this.resetEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ArrayList<HashMap<String, BaseIncrementalValueStore>> getBaseIncrementalValueGroupByStoreList() {
        return this.baseIncrementalValueGroupByStoreList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, BaseIncrementalValueStore> getBaseIncrementalValueStoreMap() {
        return this.baseIncrementalValueStoreMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ArrayList<BaseIncrementalValueStore> getBaseIncrementalValueStoreList() {
        return this.baseIncrementalValueStoreList;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BaseIncrementalValueStore getBaseIncrementalValueStore() {
        return this.baseIncrementalValueStore;
    }

    public long getOldestEventTimestamp() {
        if (this.bufferSize <= 0 || !this.isRoot) {
            if (this.isGroupBy) {
                if (this.baseIncrementalValueStoreMap.size() != 0) {
                    return ((BaseIncrementalValueStore) this.baseIncrementalValueStoreMap.values().toArray()[0]).getTimestamp();
                }
                return -1L;
            }
            if (this.baseIncrementalValueStore.isProcessed()) {
                return this.baseIncrementalValueStore.getTimestamp();
            }
            return -1L;
        }
        try {
            try {
                this.mutex.acquire();
                if (this.currentBufferIndex == -1) {
                    return -1L;
                }
                long j = this.maxTimestampInBuffer - (this.bufferSize * this.millisecondsPerDuration);
                this.mutex.release();
                return j;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new SiddhiAppRuntimeException("Error when getting the oldest in-memory event timestamp", e);
            }
        } finally {
            this.mutex.release();
        }
    }

    public long getNewestEventTimestamp() {
        if (this.bufferSize > 0) {
            try {
                if (this.isRoot) {
                    try {
                        this.mutex.acquire();
                        if (this.currentBufferIndex == -1) {
                            return -1L;
                        }
                        long j = this.maxTimestampInBuffer;
                        this.mutex.release();
                        return j;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new SiddhiAppRuntimeException("Error when getting the newest in-memory event timestamp", e);
                    }
                }
            } finally {
                this.mutex.release();
            }
        }
        if (this.isGroupBy) {
            if (this.baseIncrementalValueStoreMap.size() != 0) {
                return ((BaseIncrementalValueStore) this.baseIncrementalValueStoreMap.values().toArray()[0]).getTimestamp();
            }
            return -1L;
        }
        if (this.baseIncrementalValueStore.isProcessed()) {
            return this.baseIncrementalValueStore.getTimestamp();
        }
        return -1L;
    }

    public long getNextEmitTime() {
        return this.nextEmitTime;
    }

    public void setValuesForInMemoryRecreateFromTable(boolean z, long j) {
        this.isRootAndLoadedFromTable = z;
        this.nextEmitTime = j;
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("NextEmitTime", Long.valueOf(this.nextEmitTime));
        hashMap.put("CurrentBufferIndex", Integer.valueOf(this.currentBufferIndex));
        hashMap.put("StartTimeOfAggregates", Long.valueOf(this.startTimeOfAggregates));
        hashMap.put("TimerStarted", Boolean.valueOf(this.timerStarted));
        hashMap.put("EventOlderThanBuffer", Boolean.valueOf(this.eventOlderThanBuffer));
        hashMap.put("CountEventsGreaterThanCurrentMax", Integer.valueOf(this.countEventsGreaterThanCurrentMax));
        hashMap.put("MaxTimestampPosition", Integer.valueOf(this.maxTimestampPosition));
        hashMap.put("MaxTimestampInBuffer", Long.valueOf(this.maxTimestampInBuffer));
        hashMap.put("IsRootAndLoadedFromTable", Boolean.valueOf(this.isRootAndLoadedFromTable));
        hashMap.put("Mutex", this.mutex);
        return hashMap;
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public void restoreState(Map<String, Object> map) {
        this.nextEmitTime = ((Long) map.get("NextEmitTime")).longValue();
        this.currentBufferIndex = ((Integer) map.get("CurrentBufferIndex")).intValue();
        this.startTimeOfAggregates = ((Long) map.get("StartTimeOfAggregates")).longValue();
        this.timerStarted = ((Boolean) map.get("TimerStarted")).booleanValue();
        this.eventOlderThanBuffer = ((Boolean) map.get("EventOlderThanBuffer")).booleanValue();
        this.countEventsGreaterThanCurrentMax = ((Integer) map.get("CountEventsGreaterThanCurrentMax")).intValue();
        this.maxTimestampPosition = ((Integer) map.get("MaxTimestampPosition")).intValue();
        this.maxTimestampInBuffer = ((Long) map.get("MaxTimestampInBuffer")).longValue();
        this.isRootAndLoadedFromTable = ((Boolean) map.get("IsRootAndLoadedFromTable")).booleanValue();
        this.mutex = (Semaphore) map.get("Mutex");
    }

    @Override // org.wso2.siddhi.core.util.snapshot.Snapshotable
    public String getElementId() {
        return this.elementId;
    }
}
