package org.wso2.extension.siddhi.execution.approximate.distinctcount;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.ReturnAttribute;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

@Extension(name = "distinctCount", namespace = "approximate", description = "Performs HyperLogLog algorithm on a window of streaming data set based on a specific relative error and a confidence value to calculate the number of distinct events. If used without a window, the out of memory errors will occur. For usage without the window, use the approximate:distinctCountEver extension.", parameters = {@Parameter(name = "value", description = "The value used to find distinctCount", type = {DataType.INT, DataType.DOUBLE, DataType.FLOAT, DataType.LONG, DataType.STRING, DataType.BOOL, DataType.TIME, DataType.OBJECT}), @Parameter(name = "relative.error", description = "This is the relative error for which the distinct count is obtained. The values must be in the range of (0, 1).", type = {DataType.DOUBLE, DataType.FLOAT}, optional = true, defaultValue = "0.01"), @Parameter(name = "confidence", description = "This is the confidence for which the relative error is true. The value must be one out of 0.65, 0.95, 0.99.", type = {DataType.DOUBLE, DataType.FLOAT}, optional = true, defaultValue = "0.95")}, returnAttributes = {@ReturnAttribute(name = "distinctCount", description = "Represents the distinct count considering the last event ", type = {DataType.LONG}), @ReturnAttribute(name = "distinctCountLowerBound", description = "Represents the lower bound of the distinct count considering the last event", type = {DataType.LONG}), @ReturnAttribute(name = "distinctCountUpperBound", description = "Represents the upper bound of the distinct count considering the last event", type = {DataType.LONG})}, examples = {@Example(syntax = "define stream requestStream (ip string);\nfrom requestStream#window.time(1000)#approximate:distinctCount(ip)\nselect distinctCount, distinctCountLowerBound, distinctCountUpperBound\ninsert into OutputStream;\n", description = "Distinct count of ip addresses which has sent requests within the last 1000ms is calculated for a default relative error of 0.01 and a default confidence of 0.95. Here the distinct count is the number of different values received for ip attribute considering the events received within last 1000ms time period. The answers are 95% guaranteed to have a +-1% error relative to the distinct count. The output will consist of the approximate distinct count, lower bound and upper bound of the approximate answer."), @Example(syntax = "define stream sensorStream (sensorId int);\nfrom sensorStream#window.length(1000)\n#approximate:distinctCount(sensorId, 0.05, 0.65)\nselect distinctCount, distinctCountLowerBound, distinctCountUpperBound\ninsert into OutputStream;\n", description = "Distinct count of sensors which has sent data to the stream out of last 1000 events is calculated for a relative error of 0.05 and a confidence of 0.65. Here the distinct count is the number of different values values received for sensorId attribute in the last 1000 events. The answers are 65% guaranteed to have a +-5% error relative to the distinct count. The output will consist of the approximate distinct count, lower bound and upper bound of the approximate answer.")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/approximate/distinctcount/DistinctCountExtension.class */
public class DistinctCountExtension extends StreamProcessor {
    private HyperLogLog<Object> hyperLogLog;
    private ExpressionExecutor valueExecutor;

    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        double d = 0.01d;
        double d2 = 0.95d;
        if (expressionExecutorArr.length != 1 && expressionExecutorArr.length != 3) {
            throw new SiddhiAppCreationException("1 or 3 attributes are expected but " + expressionExecutorArr.length + " attributes are found inside the distinctCount function");
        }
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppCreationException("The 1st parameter inside distinctCount function - 'value' has to be a variable but found " + this.attributeExpressionExecutors[0].getClass().getCanonicalName());
        }
        this.valueExecutor = expressionExecutorArr[0];
        if (expressionExecutorArr.length > 1) {
            if (!(expressionExecutorArr[1] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("The 2nd parameter inside distinctCount function - 'relative.error' has to be a constant but found " + this.attributeExpressionExecutors[1].getClass().getCanonicalName());
            }
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.DOUBLE && expressionExecutorArr[1].getReturnType() != Attribute.Type.FLOAT) {
                throw new SiddhiAppCreationException("The 2nd parameter inside distinctCount function - 'relative.error' should be of type Double or Float but found " + expressionExecutorArr[1].getReturnType());
            }
            d = ((Double) ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()).doubleValue();
            if (d <= 0.0d || d >= 1.0d) {
                throw new SiddhiAppCreationException("The 2nd parameter inside distinctCount function - 'relative.error' must be in the range of (0, 1) but found " + d);
            }
        }
        if (expressionExecutorArr.length > 2) {
            if (!(expressionExecutorArr[2] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("The 3rd parameter inside distinctCount function - 'confidence' has to be a constant but found " + this.attributeExpressionExecutors[2].getClass().getCanonicalName());
            }
            if (expressionExecutorArr[2].getReturnType() != Attribute.Type.DOUBLE && expressionExecutorArr[2].getReturnType() != Attribute.Type.FLOAT) {
                throw new SiddhiAppCreationException("The 3rd parameter inside distinctCount function - 'confidence' should be of type Double or Float but found " + expressionExecutorArr[2].getReturnType());
            }
            d2 = ((Double) ((ConstantExpressionExecutor) expressionExecutorArr[2]).getValue()).doubleValue();
            if (Math.abs(d2 - 0.65d) > 1.0E-7d && Math.abs(d2 - 0.95d) > 1.0E-7d && Math.abs(d2 - 0.99d) > 1.0E-7d) {
                throw new SiddhiAppCreationException("The 3rd parameter inside distinctCount function - 'confidence' must be a value from 0.65, 0.95 and 0.99 but found " + d2);
            }
        }
        this.hyperLogLog = new HyperLogLog<>(d, d2, true);
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(new Attribute("distinctCount", Attribute.Type.LONG));
        arrayList.add(new Attribute("distinctCountLowerBound", Attribute.Type.LONG));
        arrayList.add(new Attribute("distinctCountUpperBound", Attribute.Type.LONG));
        return arrayList;
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                Object execute = this.valueExecutor.execute(next);
                if (execute == null) {
                    complexEventChunk.remove();
                } else {
                    if (next.getType().equals(ComplexEvent.Type.CURRENT)) {
                        this.hyperLogLog.addItem(execute);
                    } else if (next.getType().equals(ComplexEvent.Type.EXPIRED)) {
                        this.hyperLogLog.removeItem(execute);
                    } else if (next.getType().equals(ComplexEvent.Type.RESET)) {
                        this.hyperLogLog.clear();
                    }
                    complexEventPopulater.populateComplexEvent(next, new Object[]{Long.valueOf(this.hyperLogLog.getCardinality()), Long.valueOf(this.hyperLogLog.getConfidenceInterval()[0]), Long.valueOf(this.hyperLogLog.getConfidenceInterval()[1])});
                }
            }
        }
        this.nextProcessor.process(complexEventChunk);
    }

    public void start() {
    }

    public void stop() {
    }

    public Map<String, Object> currentState() {
        HashMap hashMap;
        synchronized (this) {
            hashMap = new HashMap();
            hashMap.put("hyperLogLog", this.hyperLogLog);
        }
        return hashMap;
    }

    public void restoreState(Map<String, Object> map) {
        synchronized (this) {
            this.hyperLogLog = (HyperLogLog) map.get("hyperLogLog");
        }
    }
}
