package io.siddhi.extension.execution.streamingml.clustering.kmeans;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.execution.streamingml.clustering.kmeans.util.DataPoint;
import io.siddhi.extension.execution.streamingml.clustering.kmeans.util.KMeansClusterer;
import io.siddhi.extension.execution.streamingml.clustering.kmeans.util.KMeansModel;
import io.siddhi.extension.execution.streamingml.util.CoreUtils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;

@Extension(name = "kMeansIncremental", namespace = "streamingml", description = "Performs K-Means clustering on a streaming data set. Data points can be of any dimension and the dimensionality is calculated from number of parameters. All data points to be processed by a query should be of the same dimensionality. The Euclidean distance is taken as the distance metric. The algorithm resembles Sequential K-Means Clustering at https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm ", parameters = {@Parameter(name = "no.of.clusters", description = "The assumed number of natural clusters in the data set.", type = {DataType.INT}), @Parameter(name = "decay.rate", description = "this is the decay rate of old data compared to new data. Value of this will be in [0,1]. 0 means only old data used and1 will mean that only new data is used", optional = true, type = {DataType.DOUBLE}, defaultValue = "0.01"), @Parameter(name = "model.feature", description = "This is a variable length argument. Depending on the dimensionality of data points we will receive coordinates as features along each axis.", type = {DataType.DOUBLE, DataType.FLOAT, DataType.INT, DataType.LONG}, dynamic = true)}, parameterOverloads = {@ParameterOverload(parameterNames = {"no.of.clusters", "model.feature", "..."}), @ParameterOverload(parameterNames = {"no.of.clusters", "decay.rate", "model.feature", "..."})}, returnAttributes = {@ReturnAttribute(name = "euclideanDistanceToClosestCentroid", description = "Represents the Euclidean distance between the current data point and the closest centroid.", type = {DataType.DOUBLE}), @ReturnAttribute(name = "closestCentroidCoordinate", description = "This is a variable length attribute. Depending on the dimensionality(D) we will return closestCentroidCoordinate1, closestCentroidCoordinate2,... closestCentroidCoordinateD which are the d dimensional coordinates of the closest centroid from the model to the current event. This is the prediction result and this represents the cluster to which the current event belongs to.", type = {DataType.DOUBLE})}, examples = {@Example(syntax = "define stream InputStream (x double, y double);\n@info(name = 'query1')\nfrom InputStream#streamingml:kMeansIncremental(2, 0.2, x, y)\nselect closestCentroidCoordinate1, closestCentroidCoordinate2, x, y\ninsert into OutputStream;", description = "This is an example where user provides the decay rate. First two events will be used to initiate the model since the required number of clusters is specified as 2. After the first event itself prediction would start."), @Example(syntax = "define stream InputStream (x double, y double);\n@info(name = 'query1')\nfrom InputStream#streamingml:kMeansIncremental(2, x, y)\nselect closestCentroidCoordinate1, closestCentroidCoordinate2, x, y\ninsert into OutputStream;", description = "This is an example where user doesnt give the decay rate so the default value will be used")})
/* loaded from: input_file:io/siddhi/extension/execution/streamingml/clustering/kmeans/KMeansIncrementalSPExtension.class */
public class KMeansIncrementalSPExtension extends StreamProcessor<ExtensionState> {
    private double[] coordinateValuesOfCurrentDataPoint;
    private int numberOfClusters;
    private int dimensionality;
    private static final Logger logger = Logger.getLogger(KMeansIncrementalSPExtension.class.getName());
    private ArrayList<Attribute> attributes;
    private double decayRate = 0.01d;
    private List<VariableExpressionExecutor> featureVariableExpressionExecutors = new LinkedList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/siddhi/extension/execution/streamingml/clustering/kmeans/KMeansIncrementalSPExtension$ExtensionState.class */
    public static class ExtensionState extends State {
        private static final String KEY_UNTRAINED_DATA = "untrainedData";
        private static final String KEY_K_MEANS_MODEL = "kMeansModel";
        private KMeansModel kMeansModel;
        private LinkedList<DataPoint> dataPoints;

        private ExtensionState(KMeansModel kMeansModel, LinkedList<DataPoint> linkedList) {
            this.dataPoints = linkedList;
            this.kMeansModel = kMeansModel;
        }

        public boolean canDestroy() {
            return false;
        }

        public synchronized Map<String, Object> snapshot() {
            HashMap hashMap = new HashMap();
            hashMap.put(KEY_UNTRAINED_DATA, this.dataPoints);
            hashMap.put(KEY_K_MEANS_MODEL, this.kMeansModel);
            KMeansIncrementalSPExtension.logger.debug("storing kmeans model " + hashMap.get(KEY_K_MEANS_MODEL));
            return hashMap;
        }

        public synchronized void restore(Map<String, Object> map) {
            this.dataPoints = (LinkedList) map.get(KEY_UNTRAINED_DATA);
            this.kMeansModel = (KMeansModel) map.get(KEY_K_MEANS_MODEL);
        }
    }

    protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        int i;
        int size = this.inputDefinition.getAttributeList().size();
        LinkedList linkedList = new LinkedList();
        if (this.attributeExpressionLength < 2 || this.attributeExpressionLength > 2 + size) {
            throw new SiddhiAppCreationException("Invalid number of parameters. Please check the query");
        }
        if (!(expressionExecutorArr[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppCreationException("1st query parameter is numberOfClusters which has to be constantbut found " + this.attributeExpressionExecutors[0].getClass().getCanonicalName());
        }
        if (expressionExecutorArr[0].getReturnType() != Attribute.Type.INT) {
            throw new SiddhiAppCreationException("The first query parameter should numberOfClusters which should be of type int but found " + expressionExecutorArr[0].getReturnType());
        }
        this.numberOfClusters = ((Integer) ((ConstantExpressionExecutor) expressionExecutorArr[0]).getValue()).intValue();
        if (expressionExecutorArr[1] instanceof VariableExpressionExecutor) {
            i = 1;
        } else {
            i = 2;
            if (!(expressionExecutorArr[1] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("Decay rate has to be a constant but found " + this.attributeExpressionExecutors[1].getClass().getCanonicalName());
            }
            if (expressionExecutorArr[1].getReturnType() != Attribute.Type.DOUBLE) {
                throw new SiddhiAppCreationException("Decay rate should be of type int but found " + expressionExecutorArr[1].getReturnType());
            }
            this.decayRate = ((Double) ((ConstantExpressionExecutor) expressionExecutorArr[1]).getValue()).doubleValue();
            if (this.decayRate < 0.0d || this.decayRate > 1.0d) {
                throw new SiddhiAppCreationException("Decay rate should be in [0,1] but given as " + this.decayRate);
            }
        }
        this.dimensionality = this.attributeExpressionLength - i;
        this.coordinateValuesOfCurrentDataPoint = new double[this.dimensionality];
        this.featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(this.inputDefinition, expressionExecutorArr, i, this.dimensionality);
        KMeansModel kMeansModel = new KMeansModel();
        this.attributes = new ArrayList<>(1 + this.dimensionality);
        this.attributes.add(new Attribute("euclideanDistanceToClosestCentroid", Attribute.Type.DOUBLE));
        for (int i2 = 1; i2 <= this.dimensionality; i2++) {
            this.attributes.add(new Attribute("closestCentroidCoordinate" + i2, Attribute.Type.DOUBLE));
        }
        return () -> {
            return new ExtensionState(kMeansModel, linkedList);
        };
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, ExtensionState extensionState) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                for (int i = 0; i < this.dimensionality; i++) {
                    try {
                        this.coordinateValuesOfCurrentDataPoint[i] = ((Number) this.featureVariableExpressionExecutors.get(i).execute(next)).doubleValue();
                    } catch (ClassCastException e) {
                        throw new SiddhiAppCreationException("coordinate values should be int/float/double/long but found " + this.attributeExpressionExecutors[i].execute(next).getClass());
                    }
                }
                DataPoint dataPoint = new DataPoint();
                dataPoint.setCoordinates(this.coordinateValuesOfCurrentDataPoint);
                extensionState.dataPoints.add(dataPoint);
                if (extensionState.kMeansModel.isTrained()) {
                    complexEventPopulater.populateComplexEvent(next, KMeansClusterer.getAssociatedCentroidInfo(dataPoint, extensionState.kMeansModel));
                }
                KMeansClusterer.train(extensionState.dataPoints, 1, this.decayRate, null, extensionState.kMeansModel, this.numberOfClusters, 2, this.dimensionality);
                extensionState.dataPoints.clear();
            }
        }
        this.nextProcessor.process(complexEventChunk);
    }

    public List<Attribute> getReturnAttributes() {
        return this.attributes;
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    public void start() {
    }

    public void stop() {
    }

    protected /* bridge */ /* synthetic */ void process(ComplexEventChunk complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        process((ComplexEventChunk<StreamEvent>) complexEventChunk, processor, streamEventCloner, complexEventPopulater, (ExtensionState) state);
    }
}
