package org.wso2.extension.siddhi.execution.approximate.distinctcount;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Extension(name = "distinctCount", namespace = "approximate", description = "This applies the 'HyperLogLog' algorithm to a Siddhi window. The algorithm is set with a relative error and a confidence value on the basis of which the number of distinct events with an accepted level of accuracy is calculated. Note that if this extension is used without a window, it may cause an 'out of memory' error. If you need to perform these calculations without windows, use the `approximate:distinctCountEver` extension.", parameters = {@Parameter(name = "value", description = "The value for which the `distinctCount` is calculated.", type = {DataType.INT, DataType.DOUBLE, DataType.FLOAT, DataType.LONG, DataType.STRING, DataType.BOOL, DataType.TIME, DataType.OBJECT}), @Parameter(name = "relative.error", description = "This is the relative error to be allowed for the distinct count generated, expressed as a value between 0 and 1. Lower the value specified, lower is the rate by which the distinct count can deviate from being perfectly correct. If 0.01 is specified, the distinct count generated must be almost perfectly accurate. If 0.99 is specified, a minimal level of accuracy is expected. Note that you cannot specify `1` or `0` as the value for this parameter.", type = {DataType.DOUBLE, DataType.FLOAT}, optional = true, defaultValue = "0.01"), @Parameter(name = "confidence", description = "The confidence value determines the degree of guarantee with which the relative error given can be treated. Higher the value specified, higher is the possibility of the amount of error in the distinct count being no greater than the relative error specified. If 0.99 is specified, it can be almost considered with certainty that the distinct count is generated with the specified rate of relative error. If 0.01 is specified, there can be minimal certainty as to whether the distinct count is generated with the specified rate of error. 
The possible values include `0.65`, `0.95`, `0.99`, etc..", type = {DataType.DOUBLE, DataType.FLOAT}, optional = true, defaultValue = "0.95")}, returnAttributes = {@ReturnAttribute(name = "distinctCount", description = "This represents the distinct count based on the last event.", type = {DataType.LONG}), @ReturnAttribute(name = "distinctCountLowerBound", description = "The lowest value in the range within which the most accurate distinct count for the attribute is included. This distinct count range is based on the latest event.", type = {DataType.LONG}), @ReturnAttribute(name = "distinctCountUpperBound", description = "The highest value in the range within which the most accurate distinct count for the attribute is included This distinct count range is based on the latest event.", type = {DataType.LONG})}, examples = {@Example(syntax = "define stream RequestStream (ip string);\nfrom RequestStream#window.time(1000)#approximate:distinctCount(ip)\nselect distinctCount, distinctCountLowerBound, distinctCountUpperBound\ninsert into OutputStream;\n", description = "This query calculates the distinct count of events for each IP address that has sent requests within the last 1000 milliseconds. The distinct count is 95% guaranteed to deviate no more than 1% from the actual distinct count per IP address.The output consists of the approximate distinct count,the lower bound, and the upper bound of the approximate answer."), @Example(syntax = "define stream SensorStream (sensorId int);\nfrom SensorStream#window.length(1000)\n#approximate:distinctCount(sensorId, 0.05, 0.65)\nselect distinctCount, distinctCountLowerBound, distinctCountUpperBound\ninsert into OutputStream;\n", description = "This query calculates the distinct count of events for each sensor that has sent data to the stream. This value is calculated based on the last 1000 events in a sliding manner. The calculated distinct count is 65% guaranteed to deviate no more than 5% from the actual distinct count. 
The output consists of the approximate distinct count, and the lower bound and upper bound of the approximate answer.")})
/* loaded from: input_file:org/wso2/extension/siddhi/execution/approximate/distinctcount/DistinctCountExtension.class */
/*
 * Stream processor behind the `approximate:distinctCount` extension.
 *
 * For every event whose `value` attribute is non-null it updates a HyperLogLog sketch
 * (add on CURRENT, remove on EXPIRED, clear on RESET) and appends three LONG attributes to
 * the event: the estimated distinct count and the lower/upper bound of its confidence interval.
 * Events whose value evaluates to null are removed from the chunk.
 *
 * Sketch updates and state snapshot/restore are all guarded by this instance's monitor.
 */
public class DistinctCountExtension extends StreamProcessor<ExtensionState> {
    /** Relative error used when only the `value` argument is supplied. */
    private static final double DEFAULT_RELATIVE_ERROR = 0.01d;
    /** Confidence used when only the `value` argument is supplied. */
    private static final double DEFAULT_CONFIDENCE = 0.95d;
    /** Tolerance used when matching the supplied confidence against the supported values. */
    private static final double CONFIDENCE_TOLERANCE = 1.0E-7d;
    /** Key under which the sketch is stored in a state snapshot. */
    private static final String SNAPSHOT_KEY = "hyperLogLog";

    private ExpressionExecutor valueExecutor;
    private final List<Attribute> attributeList = new ArrayList<>(3);

    /**
     * Snapshot-able state holding one HyperLogLog sketch.
     *
     * <p>Snapshot and restore synchronize on the enclosing extension instance, the same monitor
     * that {@code process} holds while mutating the sketch.
     */
    public class ExtensionState extends State {
        private HyperLogLog<Object> hyperLogLog;

        private ExtensionState(HyperLogLog<Object> hyperLogLog) {
            this.hyperLogLog = hyperLogLog;
        }

        /** Always reports that this state must be kept. */
        @Override
        public boolean canDestroy() {
            return false;
        }

        /**
         * Captures the current sketch.
         *
         * @return a new map containing the sketch under the key {@code "hyperLogLog"}
         */
        @Override
        public Map<String, Object> snapshot() {
            synchronized (DistinctCountExtension.this) {
                Map<String, Object> snapshot = new HashMap<>();
                snapshot.put(SNAPSHOT_KEY, this.hyperLogLog);
                return snapshot;
            }
        }

        /**
         * Replaces the current sketch with the one stored in the given snapshot.
         *
         * @param map a snapshot previously produced by {@link #snapshot()}
         */
        @Override
        @SuppressWarnings("unchecked") // the snapshot map is untyped; snapshot() only ever stores a HyperLogLog<Object>
        public void restore(Map<String, Object> map) {
            synchronized (DistinctCountExtension.this) {
                this.hyperLogLog = (HyperLogLog<Object>) map.get(SNAPSHOT_KEY);
            }
        }
    }

    /**
     * Validates the extension arguments and prepares the output attributes.
     *
     * <p>Accepted argument lists are {@code (value)} and {@code (value, relative.error, confidence)}.
     * {@code value} must be a variable; the other two must be DOUBLE or FLOAT constants, with
     * {@code relative.error} strictly between 0 and 1 and {@code confidence} one of 0.65, 0.95, 0.99.
     *
     * @param metaStreamEvent         not used by this extension
     * @param abstractDefinition      not used by this extension
     * @param expressionExecutorArr   executors for the arguments passed to the extension
     * @param configReader            not used by this extension
     * @param streamEventClonerHolder not used by this extension
     * @param z                       not used by this extension
     * @param z2                      not used by this extension
     * @param siddhiQueryContext      not used by this extension
     * @return a factory that creates a state with its own, freshly initialised sketch on each call
     * @throws SiddhiAppCreationException if the argument count, kinds, types or values are invalid
     */
    @Override
    protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        if (expressionExecutorArr.length != 1 && expressionExecutorArr.length != 3) {
            throw new SiddhiAppCreationException("1 or 3 attributes are expected but " + expressionExecutorArr.length + " attributes are found inside the distinctCount function");
        }
        if (!(expressionExecutorArr[0] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppCreationException("The 1st parameter inside distinctCount function - 'value' has to be a variable but found " + expressionExecutorArr[0].getClass().getCanonicalName());
        }
        this.valueExecutor = expressionExecutorArr[0];

        final double relativeError;
        if (expressionExecutorArr.length > 1) {
            relativeError = readFractionConstant(expressionExecutorArr[1], "2nd", "relative.error");
            if (relativeError <= 0.0d || relativeError >= 1.0d) {
                throw new SiddhiAppCreationException("The 2nd parameter inside distinctCount function - 'relative.error' must be in the range of (0, 1) but found " + relativeError);
            }
        } else {
            relativeError = DEFAULT_RELATIVE_ERROR;
        }

        final double confidence;
        if (expressionExecutorArr.length > 2) {
            confidence = readFractionConstant(expressionExecutorArr[2], "3rd", "confidence");
            if (!isSupportedConfidence(confidence)) {
                throw new SiddhiAppCreationException("The 3rd parameter inside distinctCount function - 'confidence' must be a value from 0.65, 0.95 and 0.99 but found " + confidence);
            }
        } else {
            confidence = DEFAULT_CONFIDENCE;
        }

        // Build one sketch up front so that anything the HyperLogLog constructor rejects still
        // surfaces here, at app-creation time, rather than when the first state is created.
        new HyperLogLog<Object>(relativeError, confidence, true);

        this.attributeList.add(new Attribute("distinctCount", Attribute.Type.LONG));
        this.attributeList.add(new Attribute("distinctCountLowerBound", Attribute.Type.LONG));
        this.attributeList.add(new Attribute("distinctCountUpperBound", Attribute.Type.LONG));

        // Each state gets its own sketch; sharing a single instance would make every state
        // created by this factory update and report the same counts.
        return () -> new ExtensionState(new HyperLogLog<Object>(relativeError, confidence, true));
    }

    /**
     * Reads an optional numeric argument that must be a DOUBLE or FLOAT constant.
     *
     * @param executor      executor of the argument being read
     * @param ordinal       position of the argument as used in error messages, e.g. {@code "2nd"}
     * @param parameterName name of the argument as used in error messages
     * @return the constant's value as a double
     * @throws SiddhiAppCreationException if the argument is not a constant or has another type
     */
    private static double readFractionConstant(ExpressionExecutor executor, String ordinal, String parameterName) {
        String prefix = "The " + ordinal + " parameter inside distinctCount function - '" + parameterName + "' ";
        if (!(executor instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppCreationException(prefix + "has to be a constant but found " + executor.getClass().getCanonicalName());
        }
        Attribute.Type type = executor.getReturnType();
        if (type != Attribute.Type.DOUBLE && type != Attribute.Type.FLOAT) {
            throw new SiddhiAppCreationException(prefix + "should be of type Double or Float but found " + type);
        }
        Object value = ((ConstantExpressionExecutor) executor).getValue();
        if (value instanceof Float) {
            // A FLOAT constant is allowed by the check above, so it must not be cast to Double.
            // Going through the decimal text keeps the value the user wrote (0.95f -> 0.95)
            // instead of the widened binary approximation (0.949999988...).
            return Double.parseDouble(value.toString());
        }
        return ((Number) value).doubleValue();
    }

    /** Returns whether the confidence matches 0.65, 0.95 or 0.99 within the tolerance. */
    private static boolean isSupportedConfidence(double confidence) {
        return Math.abs(confidence - 0.65d) <= CONFIDENCE_TOLERANCE
                || Math.abs(confidence - 0.95d) <= CONFIDENCE_TOLERANCE
                || Math.abs(confidence - 0.99d) <= CONFIDENCE_TOLERANCE;
    }

    /**
     * Updates the sketch for each event in the chunk, appends the current estimate and its
     * confidence interval to the event, then forwards the chunk to the next processor.
     *
     * @param complexEventChunk     events to process; events with a null value are removed from it
     * @param processor             next processor, invoked after the lock is released
     * @param streamEventCloner     not used by this extension
     * @param complexEventPopulater used to append the three output attributes to each event
     * @param extensionState        state holding the sketch to update
     */
    @Override
    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, ExtensionState extensionState) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent event = complexEventChunk.next();
                Object value = this.valueExecutor.execute(event);
                // NOTE(review): this check runs before the event type is inspected, so a RESET
                // event whose value evaluates to null is dropped without clearing the sketch.
                // Confirm that this is intended.
                if (value == null) {
                    complexEventChunk.remove();
                    continue;
                }
                HyperLogLog<Object> sketch = extensionState.hyperLogLog;
                switch (event.getType()) {
                    case CURRENT:
                        sketch.addItem(value);
                        break;
                    case EXPIRED:
                        sketch.removeItem(value);
                        break;
                    case RESET:
                        sketch.clear();
                        break;
                    default:
                        // Other event types leave the sketch untouched but still get the outputs.
                        break;
                }
                long cardinality = sketch.getCardinality();
                // Read the interval once instead of recomputing it for each bound.
                long[] interval = sketch.getConfidenceInterval();
                complexEventPopulater.populateComplexEvent(event, new Object[]{cardinality, interval[0], interval[1]});
            }
        }
        processor.process(complexEventChunk);
    }

    /** No resources to acquire. */
    @Override
    public void start() {
    }

    /** No resources to release. */
    @Override
    public void stop() {
    }

    /**
     * @return the attributes appended to each event: {@code distinctCount},
     *         {@code distinctCountLowerBound} and {@code distinctCountUpperBound}, all LONG
     */
    @Override
    public List<Attribute> getReturnAttributes() {
        return this.attributeList;
    }

    /** @return {@link ProcessingMode#BATCH} */
    @Override
    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    // The erased process(..., State) overload that the decompiler listed here is the bridge
    // method javac synthesizes for the typed override above, so it is not declared by hand.
}
