package org.wso2.extension.siddhi.gpl.execution.streamingml.clustering.clustree;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import moa.gui.visualization.RunVisualizer;
import org.apache.log4j.Logger;
import org.wso2.extension.siddhi.gpl.execution.streamingml.clustering.clustree.util.ClusTreeModel;
import org.wso2.extension.siddhi.gpl.execution.streamingml.clustering.clustree.util.DataPoint;
import org.wso2.extension.siddhi.gpl.execution.streamingml.clustering.clustree.util.KMeansModel;
import org.wso2.extension.siddhi.gpl.execution.streamingml.clustering.clustree.util.Trainer;
import org.wso2.extension.siddhi.gpl.execution.streamingml.util.CoreUtils;
import org.wso2.extension.siddhi.gpl.execution.streamingml.util.MathUtil;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.ReturnAttribute;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;

@Extension(name = "clusTree", namespace = "streamingml", description = "This extension performs clustering on a streaming data set. Initially a micro cluster model is generated using the ClusTree algorithm, and weighted k-means is periodically applied to micro clusters to generate a macro cluster model with the required number of clusters. Data points can be of any dimensionality, but the dimensionality should be constant throughout the stream. Euclidean distance is taken as the distance metric.", parameters = {@Parameter(name = "no.of.clusters", description = "The assumed number of natural clusters (`numberOfClusters`) in the data set.", type = {DataType.INT}), @Parameter(name = "max.iterations", description = "The number of times the process should be iterated. The process iterates until the number specified for this parameter is reached, or until iterating the process does not result in a change in the centroids.", type = {DataType.INT}, optional = true, defaultValue = "40"), @Parameter(name = "no.of.events.to.refresh.macro.model", description = "The number of new events that should arrive in order to recalculate the k-means macro cluster centers.", type = {DataType.INT}, optional = true, defaultValue = "100"), @Parameter(name = "max.height.of.tree", description = "This defines the maximum number of levels that should exist in the ClusTree. The maximum number of levels is calculated as `3^<VALUE_SPECIFIED>` (e.g., If 10 is specified, there can be a maximum of 3^10 micro clusters in the micro cluster). It is recommended to set the value within the 5-8 range because a lot of micro-clusters can consume a lot of memory, and as a result, creating the macro cluster model will take longer.", type = {DataType.INT}, optional = true, defaultValue = "8"), @Parameter(name = "horizon", description = "This controls the decay of weights of old micro-clusters to manage the concept drift. If horizon is set as `1000`, then a micro cluster that has not been recently updated loses its weight by half after 1000 events.", type = {DataType.INT}, optional = true, defaultValue = "1000"), @Parameter(name = "model.features", description = "This is a variable length argument. Depending on the dimensionality of data points, you receive coordinates as features along each axis.", type = {DataType.DOUBLE, DataType.FLOAT, DataType.INT, DataType.LONG})}, returnAttributes = {@ReturnAttribute(name = "euclideanDistanceToClosestCentroid", description = "This represents the Euclidean distance between the current data point and the closest centroid.", type = {DataType.DOUBLE}), @ReturnAttribute(name = "closestCentroidCoordinate", description = "This is a variable length attribute. Depending on the dimensionality(`d`) `closestCentroidCoordinate1` is returned to `closestCentroidCoordinated that are the `d `dimensional coordinates of the closest centroid from the model to the current event. This is the prediction result, and this represents the cluster towhich the current event belongs.", type = {DataType.DOUBLE})}, examples = {@Example(syntax = "@App:name('ClusTreeTestSiddhiApp') \ndefine stream InputStream (x double, y double);\n@info(name = 'query1') \nfrom InputStream#streamingml:clusTree(2, 10, 20, 5, 50, x, y) \nselect closestCentroidCoordinate1, closestCentroidCoordinate2, x, y \ninsert into OutputStream;", description = "This query creates a Siddhi application named `ClusTreeTestSiddhiApp`, and it accepts 2D inputs of doubles. The query named `query1` creates a ClusTree model. It also creates a k-means model after the first 20 events, and refreshes it after every 20 events. Two macro clusters are created, and the process is not iterated more than 10 times. The maximum height of tree is set to 5, and therefore, a maximum of 3^5 micro clusters are generated from the Clus Tree. The horizon is set to 50, and therefore, the weight of each micro cluster that is not updated reduces by half after every 50 events."), @Example(syntax = "@App:name('ClusTreeTestSiddhiApp') \ndefine stream InputStream (x double, y double);\n@info(name = 'query1') \nfrom InputStream#streamingml:ClusTree(2, x, y) \nselect closestCentroidCoordinate1, closestCentroidCoordinate2, x, y \ninsert into OutputStream;", description = "This query does not include hyper parameters. Therefore, the default values mentioned above are applied. This mode of querying is recommended if you are not familier with ClusTree/KMeans algorithms.")})
/* loaded from: input_file:org/wso2/extension/siddhi/gpl/execution/streamingml/clustering/clustree/ClusTreeStreamProcessorExtension.class */
public class ClusTreeStreamProcessorExtension extends StreamProcessor {
    private int noOfClusters;
    private int noOfDimensions;
    private double[] coordinateValuesOfCurrentDataPoint;
    private int noOfEventsReceived;
    private ExecutorService executorService;
    private ClusTreeModel clusTreeModel;
    private KMeansModel kMeansModel;
    private static final Logger logger = Logger.getLogger(ClusTreeStreamProcessorExtension.class.getName());
    private final int separateThreadThreshold = RunVisualizer.initialPauseInterval;
    private int noOfEventsToRefreshMacroModel = 500;
    private int maxIterations = 40;
    private List<VariableExpressionExecutor> featureVariableExpressionExecutors = new LinkedList();

    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        int i;
        int size = this.inputDefinition.getAttributeList().size();
        int i2 = 8;
        int i3 = 1000;
        if (this.attributeExpressionLength < 2 || this.attributeExpressionLength > 5 + size) {
            throw new SiddhiAppCreationException("Invalid number of parameters. User can either choose to give all 4 hyper parameters or none at all. So query can have between 2 or " + (5 + size) + " but found " + this.attributeExpressionLength + " parameters.");
        }
        if (!(this.attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppCreationException("noOfClusters has to be a constant but found " + this.attributeExpressionExecutors[0].getClass().getCanonicalName());
        }
        if (this.attributeExpressionExecutors[0].getReturnType() != Attribute.Type.INT) {
            throw new SiddhiAppCreationException("noOfClusters should be of type int but found " + this.attributeExpressionExecutors[0].getReturnType());
        }
        this.noOfClusters = ((Integer) this.attributeExpressionExecutors[0].getValue()).intValue();
        if (this.noOfClusters <= 0) {
            throw new SiddhiAppCreationException("noOfClusters should be a positive integer but found " + this.noOfClusters);
        }
        if ((this.attributeExpressionExecutors[1] instanceof VariableExpressionExecutor) && this.attributeExpressionLength == 1 + size) {
            i = 1;
        } else {
            i = 5;
            if (!(this.attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("Maximum iterations has to be a constant but found " + this.attributeExpressionExecutors[1].getClass().getCanonicalName());
            }
            if (this.attributeExpressionExecutors[1].getReturnType() != Attribute.Type.INT) {
                throw new SiddhiAppCreationException("Maximum iterations should be of type int but found " + this.attributeExpressionExecutors[1].getReturnType());
            }
            this.maxIterations = ((Integer) this.attributeExpressionExecutors[1].getValue()).intValue();
            if (this.maxIterations <= 0) {
                throw new SiddhiAppCreationException("maxIterations should be a positive integer but found " + this.maxIterations);
            }
            if (!(this.attributeExpressionExecutors[2] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("noOfEventsToRefreshMacroModel has to be a constant but found " + this.attributeExpressionExecutors[2].getClass().getCanonicalName());
            }
            if (this.attributeExpressionExecutors[2].getReturnType() != Attribute.Type.INT) {
                throw new SiddhiAppCreationException("noOfEventsToRefreshMacroModel should be of type int but found " + this.attributeExpressionExecutors[2].getReturnType());
            }
            this.noOfEventsToRefreshMacroModel = ((Integer) this.attributeExpressionExecutors[2].getValue()).intValue();
            if (this.noOfEventsToRefreshMacroModel <= 0) {
                throw new SiddhiAppCreationException("noOfEventsToRefreshMacroModel should be a positive integer but found " + this.noOfEventsToRefreshMacroModel);
            }
            if (!(this.attributeExpressionExecutors[3] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("maxHeightOfTree has to be a constant but found " + this.attributeExpressionExecutors[3].getClass().getCanonicalName());
            }
            if (this.attributeExpressionExecutors[3].getReturnType() != Attribute.Type.INT) {
                throw new SiddhiAppCreationException("maxHeightOfTree should be of type int but found " + this.attributeExpressionExecutors[3].getReturnType());
            }
            int intValue = ((Integer) this.attributeExpressionExecutors[3].getValue()).intValue();
            double roundOff = MathUtil.roundOff(Math.log(this.noOfClusters) / Math.log(3.0d), 4);
            if (intValue < roundOff) {
                throw new SiddhiAppCreationException("maxHeightOfTree should be an int greater than " + roundOff + " but found " + intValue);
            }
            i2 = intValue - 1;
            if (!(this.attributeExpressionExecutors[4] instanceof ConstantExpressionExecutor)) {
                throw new SiddhiAppCreationException("horizon has to be a constant but found " + this.attributeExpressionExecutors[4].getClass().getCanonicalName());
            }
            if (this.attributeExpressionExecutors[4].getReturnType() != Attribute.Type.INT) {
                throw new SiddhiAppCreationException("horizon should be of type int but found " + this.attributeExpressionExecutors[4].getReturnType());
            }
            i3 = ((Integer) this.attributeExpressionExecutors[4].getValue()).intValue();
            if (i3 <= 0) {
                throw new SiddhiAppCreationException("horizon should be a positive integer but found " + i3);
            }
        }
        this.noOfDimensions = this.attributeExpressionExecutors.length - i;
        this.coordinateValuesOfCurrentDataPoint = new double[this.noOfDimensions];
        this.featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(this.inputDefinition, this.attributeExpressionExecutors, i, this.noOfDimensions);
        this.clusTreeModel = new ClusTreeModel();
        this.clusTreeModel.init(i2, i3);
        this.kMeansModel = new KMeansModel();
        this.executorService = siddhiAppContext.getExecutorService();
        ArrayList arrayList = new ArrayList(1 + this.noOfDimensions);
        arrayList.add(new Attribute("euclideanDistanceToClosestCentroid", Attribute.Type.DOUBLE));
        for (int i4 = 1; i4 <= this.noOfDimensions; i4++) {
            arrayList.add(new Attribute("closestCentroidCoordinate" + i4, Attribute.Type.DOUBLE));
        }
        return arrayList;
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                this.noOfEventsReceived++;
                for (int i = 0; i < this.noOfDimensions; i++) {
                    try {
                        this.coordinateValuesOfCurrentDataPoint[i] = ((Number) this.featureVariableExpressionExecutors.get(i).execute(next)).doubleValue();
                    } catch (ClassCastException e) {
                        throw new SiddhiAppRuntimeException("coordinate values should be int/float/double/long but found " + this.featureVariableExpressionExecutors.get(i).execute(next).getClass());
                    }
                }
                this.clusTreeModel.trainOnEvent(this.coordinateValuesOfCurrentDataPoint);
                if (this.noOfEventsReceived % this.noOfEventsToRefreshMacroModel == 0) {
                    List<DataPoint> microClusteringAsDPArray = this.clusTreeModel.getMicroClusteringAsDPArray();
                    if (this.noOfEventsToRefreshMacroModel < 5000) {
                        this.kMeansModel.refresh(microClusteringAsDPArray, this.noOfClusters, this.maxIterations, this.noOfDimensions);
                    } else {
                        this.executorService.submit(new Trainer(this.kMeansModel, microClusteringAsDPArray, this.noOfClusters, this.maxIterations, this.noOfDimensions));
                    }
                }
                if (this.kMeansModel.isTrained()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Populating the event with the prediction");
                    }
                    complexEventPopulater.populateComplexEvent(next, this.kMeansModel.getPrediction(this.coordinateValuesOfCurrentDataPoint));
                }
            }
        }
        this.nextProcessor.process(complexEventChunk);
    }

    public void start() {
    }

    public void stop() {
    }

    public Map<String, Object> currentState() {
        HashMap hashMap;
        synchronized (this) {
            hashMap = new HashMap();
            hashMap.put("noOfEventsReceived", Integer.valueOf(this.noOfEventsReceived));
            hashMap.put("clusTreeModel", this.clusTreeModel);
            hashMap.put("kMeansModel", this.kMeansModel);
        }
        return hashMap;
    }

    public void restoreState(Map<String, Object> map) {
        synchronized (this) {
            this.noOfEventsReceived = ((Integer) map.get("noOfEventsReceived")).intValue();
            this.clusTreeModel = (ClusTreeModel) map.get("clusTreeModel");
            this.kMeansModel = (KMeansModel) map.get("kMeansModel");
        }
    }
}
