package org.wso2.extension.siddhi.execution.reorder;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.wso2.extension.siddhi.execution.reorder.utils.WindowCoverage;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.ReturnAttribute;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.SchedulingProcessor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.Scheduler;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

@Extension(name = "akslack", namespace = "reorder", description = "This stream processor extension performs reordering of an out-of-order event stream.\n It implements the AQ-K-Slack based out-of-order handling algorithm (originally described in \nhttp://dl.acm.org/citation.cfm?doid=2675743.2771828)", parameters = {@Parameter(name = "timestamp", description = "Attribute used for ordering the events", type = {DataType.LONG}), @Parameter(name = "correlation.field", description = "Corresponds to the data field of which the accuracy directly gets affected by the adaptive operation of the Alpha K-Slack extension. This field is used by the Alpha K-Slack to calculate the runtime window coverage threshold which is an upper limit set for the unsuccessfully handled late arrivals", type = {DataType.DOUBLE}), @Parameter(name = "batch.size", description = "The parameter batch.size denotes the number of events that should be considered in the calculation of an alpha value. batch.size should be a value which should be greater than or equals to 15", defaultValue = "10,000", type = {DataType.LONG}, optional = true), @Parameter(name = "timer.timeout", description = "Corresponds to a fixed time-out value in milliseconds, which is set at the beginning of the process. Once the time-out value expires, the extension drains all the events that are buffered within the reorder extension to outside. The time out has been implemented internally using a timer. 
The events buffered within the extension are released each time the timer ticks.", defaultValue = "-1 (timeout is infinite)", type = {DataType.LONG}, optional = true), @Parameter(name = "max.k", description = "The maximum threshold value for K parameter in the Alpha K-Slack algorithm", defaultValue = "9,223,372,036,854,775,807 (The maximum Long value)", type = {DataType.LONG}, optional = true), @Parameter(name = "discard.flag", description = "Indicates whether the out-of-order events which appear after the expiration of the Alpha K-slack window should get discarded or not. When this value is set to true, the events would get discarded", defaultValue = "false", type = {DataType.BOOL}, optional = true), @Parameter(name = "error.threshold", description = "Error threshold to be applied in Alpha K-Slack algorithm. This parameter must be defined simultaneously with confidenceLevel", defaultValue = "0.03 (3%)", type = {DataType.DOUBLE}, optional = true), @Parameter(name = "confidence.level", description = "Confidence level to be applied in Alpha K-Slack algorithm. 
This parameter must be defined simultaneously with errorThreshold", defaultValue = "0.95 (95%)", type = {DataType.DOUBLE}, optional = true)}, returnAttributes = {@ReturnAttribute(name = "beta0", description = "Timestamp based on which the reordering is performed", type = {DataType.LONG}), @ReturnAttribute(name = "beta1", description = "An upper limit value assigned for the unsuccessfully handled late arrivals", type = {DataType.DOUBLE}), @ReturnAttribute(name = "beta2", description = "The number of events that should be considered in the calculation of an alpha value", type = {DataType.LONG}), @ReturnAttribute(name = "beta3", description = "Fixed time-out value (in milliseconds) assigned for flushing all the events buffered inside the extension.", type = {DataType.LONG}), @ReturnAttribute(name = "beta4", description = "Maximum threshold value assigned for K parameter.", type = {DataType.LONG}), @ReturnAttribute(name = "beta5", description = "Flag set to indicate whether out-of-order events which arrive after buffer eviction to be discarded or not", type = {DataType.BOOL}), @ReturnAttribute(name = "beta6", description = "Error threshold value set for Alpha K-Slack algorithm", type = {DataType.DOUBLE}), @ReturnAttribute(name = "beta7", description = "Confidence level set for the Alpha K-Slack algorithm", type = {DataType.DOUBLE})}, examples = {@Example(syntax = "define stream inputStream (eventtt long,data double);\n@info(name = 'query1')\nfrom inputStream#reorder:akslack(eventtt, data, 20)\nselect eventtt, data\ninsert into outputStream;", description = "This query performs reordering based on the 'eventtt' attribute values. In this example, 20 represents the batch size")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/reorder/AlphaKSlackExtension.class */
/*
 * Reorders an out-of-order event stream with an adaptive slack window.
 *
 * Incoming events are buffered in timestamp-sorted maps. Each time an event with a new largest
 * timestamp arrives, the slack K is re-evaluated (scaled by the adaptive factor alpha and capped at
 * maxK) and every buffered event whose timestamp is at least K behind the largest timestamp is
 * emitted in ascending timestamp order. Alpha is recomputed once more than batchSize events have
 * been buffered since the previous recomputation. When a timeout is configured, a TIMER event
 * flushes everything that is still buffered.
 *
 * process() holds a ReentrantLock for its whole duration; the state snapshot methods do not.
 */
public class AlphaKSlackExtension extends StreamProcessor implements SchedulingProcessor {
    /** Value of timerDuration meaning that no timeout based flush is performed. */
    private static final long NO_TIMEOUT = -1L;

    /** Events received since the last time a new largest timestamp was seen, keyed by timestamp. */
    private TreeMap<Long, List<StreamEvent>> primaryTreeMap;
    /** Events waiting for the slack window to pass, keyed by timestamp. */
    private TreeMap<Long, List<StreamEvent>> secondaryTreeMap;
    private ExpressionExecutor timestampExecutor;
    private ExpressionExecutor correlationFieldExecutor;
    private Scheduler scheduler;
    private SiddhiAppContext siddhiAppContext;
    /** Current slack: events are released once they are at least this far behind largestTimestamp. */
    private long k = 0L;
    private long largestTimestamp = 0L;
    private long maxK = Long.MAX_VALUE;
    private long timerDuration = NO_TIMEOUT;
    private boolean discardFlag = false;
    /** Timestamp key of the most recently released bucket; used by the discard check. */
    private long lastSentTimestamp = -1L;
    private long lastScheduledTimestamp = -1L;
    private final ReentrantLock lock = new ReentrantLock();
    private double previousAlpha = 0.0d;
    /** Number of events buffered since alpha was last recomputed. */
    private int counter = 0;
    private long batchSize = 10000L;
    private double previousError = 0.0d;
    private List<Double> dataItemList = new ArrayList<>();
    private List<Long> timestampList = new ArrayList<>();
    private double kp = 0.5d;
    private double kd = 0.8d;
    /** Set by init(): true only when four or more arguments (i.e. a timeout argument) were supplied. */
    private boolean flag = true;
    /** True when the next buffered event should schedule a new timer notification. */
    private boolean timerFlag = true;
    private double errorThreshold = 0.03d;
    private double confidenceLevel = 0.95d;
    private double alpha = 1.0d;
    /** Lazily computed by confidenceCriticalValue(); NaN means "not computed yet". */
    private double cachedCriticalValue = Double.NaN;

    /** No start-up work is required. */
    public void start() {
    }

    /** No shut-down work is required. */
    public void stop() {
    }

    /**
     * Captures the mutable reordering state.
     *
     * @return a map holding the slack, timestamps, alpha controller values, both event buffers and
     *         the sample lists collected for the next alpha recomputation
     */
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put("k", this.k);
        state.put("largestTimestamp", this.largestTimestamp);
        state.put("lastSentTimestamp", this.lastSentTimestamp);
        state.put("lastScheduledTimestamp", this.lastScheduledTimestamp);
        state.put("previousAlpha", this.previousAlpha);
        // alpha is what actually scales K, so it has to survive a restore as well.
        state.put("alpha", this.alpha);
        state.put("counter", this.counter);
        state.put("previousError", this.previousError);
        state.put("kp", this.kp);
        state.put("kd", this.kd);
        state.put("primaryTreeMap", this.primaryTreeMap);
        state.put("secondaryTreeMap", this.secondaryTreeMap);
        state.put("dataItemList", this.dataItemList);
        state.put("timestampList", this.timestampList);
        return state;
    }

    /**
     * Restores the state previously produced by {@link #currentState()}.
     *
     * @param map the saved state; the "alpha" entry is optional so that snapshots written without
     *            it can still be restored (alpha then keeps its current value)
     */
    @SuppressWarnings("unchecked") // the values were stored with exactly these types by currentState()
    public void restoreState(Map<String, Object> map) {
        this.k = (Long) map.get("k");
        this.largestTimestamp = (Long) map.get("largestTimestamp");
        this.lastSentTimestamp = (Long) map.get("lastSentTimestamp");
        this.lastScheduledTimestamp = (Long) map.get("lastScheduledTimestamp");
        this.previousAlpha = (Double) map.get("previousAlpha");
        Object savedAlpha = map.get("alpha");
        if (savedAlpha != null) {
            this.alpha = (Double) savedAlpha;
        }
        this.counter = (Integer) map.get("counter");
        this.previousError = (Double) map.get("previousError");
        this.kp = (Double) map.get("kp");
        this.kd = (Double) map.get("kd");
        this.primaryTreeMap = (TreeMap<Long, List<StreamEvent>>) map.get("primaryTreeMap");
        this.secondaryTreeMap = (TreeMap<Long, List<StreamEvent>>) map.get("secondaryTreeMap");
        this.dataItemList = (List<Double>) map.get("dataItemList");
        this.timestampList = (List<Long>) map.get("timestampList");
    }

    /**
     * Buffers the non-timer events of the incoming chunk, flushes the buffers on timer events (when
     * a timeout is configured) and forwards everything that became ready to the next processor.
     *
     * @param complexEventChunk     incoming events; non-timer events are removed from it
     * @param processor             next processor, invoked once with the released events
     * @param streamEventCloner     not used
     * @param complexEventPopulater not used
     * @throws SiddhiAppCreationException if an ArrayIndexOutOfBoundsException occurs while processing
     */
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        ComplexEventChunk<StreamEvent> released = new ComplexEventChunk<StreamEvent>(false);
        // Acquire outside the try block so that the finally clause never unlocks a lock we do not hold.
        this.lock.lock();
        try {
            double criticalValue = confidenceCriticalValue();
            WindowCoverage windowCoverage = new WindowCoverage(this.errorThreshold);
            while (complexEventChunk.hasNext()) {
                StreamEvent event = complexEventChunk.next();
                if (event.getType() != ComplexEvent.Type.TIMER) {
                    complexEventChunk.remove();
                    bufferEvent(event, criticalValue, windowCoverage, released);
                } else if (this.timerDuration != NO_TIMEOUT) {
                    flushBuffers(released);
                }
            }
            processor.process(released);
        } catch (ArrayIndexOutOfBoundsException e) {
            throw new SiddhiAppCreationException("The very first parameter must be an Integer with a valid  field index (0 to (fieldsLength-1)).", e);
        } finally {
            this.lock.unlock();
        }
    }

    /** Returns |quantile((1 - confidenceLevel) / 2)| of the standard normal distribution, computed once. */
    private double confidenceCriticalValue() {
        if (Double.isNaN(this.cachedCriticalValue)) {
            this.cachedCriticalValue = Math.abs(new NormalDistribution().inverseCumulativeProbability((1.0d - this.confidenceLevel) / 2.0d));
        }
        return this.cachedCriticalValue;
    }

    /** Records one data event, updates alpha and K as needed and releases events that left the slack window. */
    private void bufferEvent(StreamEvent event, double criticalValue, WindowCoverage windowCoverage, ComplexEventChunk<StreamEvent> released) {
        long timestamp = (Long) this.timestampExecutor.execute(event);
        double correlationValue = (Double) this.correlationFieldExecutor.execute(event);
        // Samples are collected even for events that end up being discarded below.
        this.timestampList.add(timestamp);
        this.dataItemList.add(correlationValue);
        if (this.discardFlag && timestamp < this.lastSentTimestamp) {
            return;
        }

        // Only schedule when a timeout is configured; otherwise a timer at a negative time would be requested.
        if (this.timerFlag && this.timerDuration != NO_TIMEOUT) {
            this.timerFlag = false;
            this.lastScheduledTimestamp += this.timerDuration;
            this.scheduler.notifyAt(this.lastScheduledTimestamp);
        }

        List<StreamEvent> bucket = this.primaryTreeMap.get(timestamp);
        if (bucket == null) {
            bucket = new ArrayList<>();
            this.primaryTreeMap.put(timestamp, bucket);
        }
        bucket.add(event);

        this.counter++;
        if (this.counter > this.batchSize) {
            double coverageThreshold = windowCoverage.calculateWindowCoverageThreshold(criticalValue, this.dataItemList);
            double runtimeCoverage = windowCoverage.calculateRuntimeWindowCoverage(this.timestampList, Math.round(this.batchSize * 0.75d));
            this.alpha = calculateAlpha(coverageThreshold, runtimeCoverage);
            this.counter = 0;
            this.timestampList = new ArrayList<>();
            this.dataItemList = new ArrayList<>();
        }

        if (timestamp > this.largestTimestamp) {
            this.largestTimestamp = timestamp;
            updateSlack(timestamp - this.primaryTreeMap.firstKey());
            mergePrimaryIntoSecondary();
            releaseExpired(released);
        }
    }

    /** Grows K from the observed timestamp spread, scaled by alpha and never exceeding maxK. */
    private void updateSlack(long spread) {
        if (spread <= this.k) {
            return;
        }
        if (spread < this.maxK) {
            // alpha may be larger than 1, so the scaled value still has to be capped at maxK.
            this.k = Math.min(Math.round(spread * this.alpha), this.maxK);
        } else {
            this.k = this.maxK;
        }
    }

    /** Moves every bucket of the primary buffer into the secondary buffer and starts a fresh primary buffer. */
    private void mergePrimaryIntoSecondary() {
        for (Map.Entry<Long, List<StreamEvent>> pending : this.primaryTreeMap.entrySet()) {
            List<StreamEvent> existing = this.secondaryTreeMap.get(pending.getKey());
            if (existing == null) {
                this.secondaryTreeMap.put(pending.getKey(), new ArrayList<>(pending.getValue()));
            } else {
                existing.addAll(pending.getValue());
            }
        }
        this.primaryTreeMap = new TreeMap<>();
    }

    /** Emits, in ascending timestamp order, every secondary bucket that is at least K behind largestTimestamp. */
    private void releaseExpired(ComplexEventChunk<StreamEvent> released) {
        // "key <= largest - k" is the overflow-safe form of "key + k <= largest"; the head view is
        // backed by secondaryTreeMap, so clearing it removes the released buckets from the buffer.
        Map<Long, List<StreamEvent>> expired = this.secondaryTreeMap.headMap(this.largestTimestamp - this.k, true);
        for (Map.Entry<Long, List<StreamEvent>> expiredEntry : expired.entrySet()) {
            this.lastSentTimestamp = expiredEntry.getKey();
            for (StreamEvent event : expiredEntry.getValue()) {
                released.add(event);
            }
        }
        expired.clear();
    }

    /** Emits everything buffered (secondary first, then primary) and re-arms timer scheduling. */
    private void flushBuffers(ComplexEventChunk<StreamEvent> released) {
        if (!this.secondaryTreeMap.isEmpty()) {
            appendAll(this.secondaryTreeMap, released);
            this.secondaryTreeMap = new TreeMap<>();
        }
        if (!this.primaryTreeMap.isEmpty()) {
            appendAll(this.primaryTreeMap, released);
            this.primaryTreeMap = new TreeMap<>();
        }
        this.timerFlag = true;
    }

    /** Appends every event of the given buffer to the chunk in the buffer's key order. */
    private static void appendAll(Map<Long, List<StreamEvent>> buffer, ComplexEventChunk<StreamEvent> released) {
        for (List<StreamEvent> bucket : buffer.values()) {
            for (StreamEvent event : bucket) {
                released.add(event);
            }
        }
    }

    /**
     * Validates the extension arguments and reads the constant ones.
     *
     * Accepted argument counts are 2 to 6 and 8 (errorThreshold and confidenceLevel must be given
     * together). One return attribute (beta0 ... beta7) is declared per supplied argument.
     *
     * @param abstractDefinition    not used
     * @param expressionExecutorArr not used; the inherited attributeExpressionExecutors are read instead
     * @param configReader          not used
     * @param siddhiAppContext      kept for reading the current time when scheduling
     * @return the attributes appended by this processor
     * @throws SiddhiAppCreationException on a wrong argument count, a non-constant configuration
     *                                    argument or an argument of the wrong type
     */
    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        List<Attribute> attributes = new ArrayList<>();
        this.siddhiAppContext = siddhiAppContext;
        if (this.attributeExpressionLength > 8 || this.attributeExpressionLength < 2 || this.attributeExpressionLength == 7) {
            throw new SiddhiAppCreationException("Maximum eight input parameters and minimum two input parameters can be specified for AK-Slack.  Timestamp (long), correlation field (double), batchSize (long), timerTimeout (long), maxK (long), discardFlag (boolean), errorThreshold (double) and confidenceLevel (double)  fields; errorThreshold and confidenceLevel must be specified together. But found " + this.attributeExpressionLength + " attributes.");
        }
        ExpressionExecutor[] executors = this.attributeExpressionExecutors;
        int argumentCount = executors.length;
        if (argumentCount >= 2) {
            this.flag = false;
            requireType(0, Attribute.Type.LONG, "first");
            this.timestampExecutor = executors[0];
            attributes.add(new Attribute("beta0", Attribute.Type.LONG));
            requireType(1, Attribute.Type.DOUBLE, "second");
            this.correlationFieldExecutor = executors[1];
            attributes.add(new Attribute("beta1", Attribute.Type.DOUBLE));
        }
        if (argumentCount >= 3) {
            Object batchSizeValue = constantValue(2, Attribute.Type.LONG, "third", "Batch size parameter must be a constant.");
            attributes.add(new Attribute("beta2", Attribute.Type.LONG));
            this.batchSize = (Long) batchSizeValue;
        }
        if (argumentCount >= 4) {
            this.flag = true;
            this.timerDuration = (Long) constantValue(3, Attribute.Type.LONG, "fourth", "timerDuration must be a constant");
            attributes.add(new Attribute("beta3", Attribute.Type.LONG));
        }
        if (argumentCount >= 5) {
            this.maxK = (Long) constantValue(4, Attribute.Type.LONG, "fifth", "maxK must be a constant");
            // -1 is accepted as "no upper bound".
            if (this.maxK == -1) {
                this.maxK = Long.MAX_VALUE;
            }
            attributes.add(new Attribute("beta4", Attribute.Type.LONG));
        }
        if (argumentCount >= 6) {
            this.discardFlag = (Boolean) constantValue(5, Attribute.Type.BOOL, "sixth", "discardFlag must be a constant");
            attributes.add(new Attribute("beta5", Attribute.Type.BOOL));
        }
        if (argumentCount == 8) {
            if (!(executors[6] instanceof ConstantExpressionExecutor) || !(executors[7] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("errorThreshold and confidenceLevel must be constants");
            }
            requireType(6, Attribute.Type.DOUBLE, "seventh");
            this.errorThreshold = (Double) ((ConstantExpressionExecutor) executors[6]).getValue();
            attributes.add(new Attribute("beta6", Attribute.Type.DOUBLE));
            requireType(7, Attribute.Type.DOUBLE, "eighth");
            this.confidenceLevel = (Double) ((ConstantExpressionExecutor) executors[7]).getValue();
            attributes.add(new Attribute("beta7", Attribute.Type.DOUBLE));
        }
        this.primaryTreeMap = new TreeMap<>();
        this.secondaryTreeMap = new TreeMap<>();
        return attributes;
    }

    /** Throws SiddhiAppCreationException unless the argument at the given index has the expected return type. */
    private void requireType(int index, Attribute.Type expected, String ordinal) {
        Attribute.Type actual = this.attributeExpressionExecutors[index].getReturnType();
        if (actual != expected) {
            throw new SiddhiAppCreationException("Invalid parameter type found for the " + ordinal + " argument of  reorder:akslack() function. Required " + expected + ", but found " + actual);
        }
    }

    /** Returns the value of the constant argument at the given index after checking constness and type. */
    private Object constantValue(int index, Attribute.Type expected, String ordinal, String notConstantMessage) {
        ExpressionExecutor executor = this.attributeExpressionExecutors[index];
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppCreationException(notConstantMessage);
        }
        requireType(index, expected, ordinal);
        return ((ConstantExpressionExecutor) executor).getValue();
    }

    /** @return the scheduler used for timeout notifications */
    public Scheduler getScheduler() {
        return this.scheduler;
    }

    /**
     * Stores the scheduler and, when a timeout was configured and nothing has been scheduled yet,
     * requests the first notification at "current time + timeout".
     *
     * @param scheduler scheduler used for timeout notifications
     */
    public void setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        boolean alreadyScheduled = this.lastScheduledTimestamp >= 0;
        if (alreadyScheduled || !this.flag || this.timerDuration == NO_TIMEOUT) {
            return;
        }
        this.lastScheduledTimestamp = this.siddhiAppContext.getTimestampGenerator().currentTime() + this.timerDuration;
        scheduler.notifyAt(this.lastScheduledTimestamp);
    }

    /**
     * Computes the next alpha as |previousAlpha + kp * error + kd * (error - previousError)|, where
     * error = d - d2, and remembers both the error and the result for the next call.
     *
     * @param d  first coverage figure (the caller passes the window coverage threshold)
     * @param d2 second coverage figure (the caller passes the runtime window coverage)
     * @return the new, non-negative alpha
     */
    private double calculateAlpha(double d, double d2) {
        double error = d - d2;
        double nextAlpha = Math.abs(this.previousAlpha + (this.kp * error) + (this.kd * (error - this.previousError)));
        this.previousError = error;
        this.previousAlpha = nextAlpha;
        return nextAlpha;
    }
}
