package org.wso2.extension.siddhi.gpl.execution.streamingml.classification.hoeffdingtree;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.wso2.extension.siddhi.gpl.execution.streamingml.classification.hoeffdingtree.util.AdaptiveHoeffdingModelsHolder;
import org.wso2.extension.siddhi.gpl.execution.streamingml.classification.hoeffdingtree.util.AdaptiveHoeffdingTreeModel;
import org.wso2.extension.siddhi.gpl.execution.streamingml.util.CoreUtils;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.annotation.Parameter;
import org.wso2.siddhi.annotation.ReturnAttribute;
import org.wso2.siddhi.annotation.util.DataType;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.ComplexEventChunk;
import org.wso2.siddhi.core.event.stream.StreamEvent;
import org.wso2.siddhi.core.event.stream.StreamEventCloner;
import org.wso2.siddhi.core.event.stream.populater.ComplexEventPopulater;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;
import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.VariableExpressionExecutor;
import org.wso2.siddhi.core.query.processor.Processor;
import org.wso2.siddhi.core.query.processor.stream.StreamProcessor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.exception.SiddhiAppValidationException;

@Extension(name = "hoeffdingTreeClassifier", namespace = "streamingml", description = "This extension performs classification using the Hoeffding Adaptive Tree algorithm for evolving data streams that use `ADWIN` to replace branches with new ones.", parameters = {@Parameter(name = "model.name", description = "The name of the model to be used for prediction.", type = {DataType.STRING})}, returnAttributes = {@ReturnAttribute(name = "prediction", description = "The predicted class label.", type = {DataType.STRING}), @ReturnAttribute(name = "confidenceLevel", description = "The probability of the prediction.", type = {DataType.DOUBLE})}, examples = {@Example(syntax = "define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);\n\nfrom StreamA#streamingml:hoeffdingTreeClassifier('model1',  attribute_0, attribute_1, attribute_2, attribute_3) \nselect attribute_0, attribute_1, attribute_2, attribute_3, prediction, predictionConfidence insert into OutputStream;", description = "This query uses a Hoeffding Tree model named `model1` to predict the label of the feature vector represented by `attribute_0`, `attribute_1`, `attribute_2`, and attribute_3 attributes. The predicted label (`String/Bool`) along with the prediction confidence and the feature vector are output to the `OutputStream` stream. The expected definition of the `OutputStream` is as follows:(attribute_0 double, attribute_1 double, attribute_2\n double, attribute_3 double, prediction string, \nconfidenceLevel double).")})
/* loaded from: input_file:org/wso2/extension/siddhi/gpl/execution/streamingml/classification/hoeffdingtree/HoeffdingClassifierStreamProcessorExtension.class */
public class HoeffdingClassifierStreamProcessorExtension extends StreamProcessor {
    private static final int minNoOfFeatures = 2;
    private static final int minNoOfParameters = 1;
    private String modelName;
    private int noOfFeatures;
    private List<VariableExpressionExecutor> featureVariableExpressionExecutors = new ArrayList();
    private double[] cepEvent;

    protected List<Attribute> init(AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        String name = siddhiAppContext.getName();
        this.noOfFeatures = this.inputDefinition.getAttributeList().size();
        if (this.attributeExpressionExecutors.length < 3) {
            throw new SiddhiAppValidationException(String.format("Invalid number of parameters for streamingml:hoeffdingTreeClassifier. This Stream Processor requires at least %s parameters, namely, model.name and at least %s feature_attributes, but found %s parameters", 3, 2, Integer.valueOf(this.attributeExpressionExecutors.length)));
        }
        if (this.noOfFeatures < 2) {
            throw new SiddhiAppValidationException(String.format("Invalid number of feature attributes for streamingml:hoeffdingTreeClassifier. This Stream Processor requires at least %s feature attributes, but found %s feature attributes", 2, Integer.valueOf(this.noOfFeatures)));
        }
        if (this.noOfFeatures != this.attributeExpressionLength - 1) {
            throw new SiddhiAppValidationException(String.format("Invalid number of feature attributes for streamingml:hoeffdingTreeClassifier. This Stream Processor is defined with %s features, but found %s feature attributes", Integer.valueOf(this.noOfFeatures), Integer.valueOf(this.attributeExpressionLength - 1)));
        }
        if (!(this.attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppValidationException("Parameter model.name must be a constant but found " + this.attributeExpressionExecutors[0].getClass().getCanonicalName());
        }
        if (this.attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppValidationException("Invalid parameter type found for the model.name argument, required " + Attribute.Type.STRING + " but found " + this.attributeExpressionExecutors[0].getReturnType().toString());
        }
        this.modelName = name + "." + ((String) this.attributeExpressionExecutors[0].getValue());
        this.featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(this.inputDefinition, this.attributeExpressionExecutors, this.attributeExpressionLength - this.noOfFeatures, this.noOfFeatures);
        if (!CoreUtils.isInitialized(AdaptiveHoeffdingModelsHolder.getInstance().getHoeffdingModel(this.modelName), this.noOfFeatures + 1)) {
            throw new SiddhiAppValidationException(String.format("Model [%s] needs to initialized prior to be used with streamingml:hoeffdingTreeClassifier. Perform streamingml:updateHoeffdingTree process first.", this.modelName));
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Attribute("prediction", Attribute.Type.STRING));
        arrayList.add(new Attribute("confidenceLevel", Attribute.Type.DOUBLE));
        return arrayList;
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                ComplexEvent next = complexEventChunk.next();
                this.cepEvent = new double[this.noOfFeatures];
                for (int i = 0; i < this.noOfFeatures; i++) {
                    try {
                        this.cepEvent[i] = ((Number) this.featureVariableExpressionExecutors.get(i).execute(next)).doubleValue();
                    } catch (ClassCastException e) {
                        throw new SiddhiAppRuntimeException(String.format("Incompatible attribute feature type at position %s. Not of any numeric type. Please refer the stream definition for Model[%s]", Integer.valueOf(i + 1), this.modelName));
                    }
                }
                AdaptiveHoeffdingTreeModel hoeffdingModel = AdaptiveHoeffdingModelsHolder.getInstance().getHoeffdingModel(this.modelName);
                Object[] prediction = hoeffdingModel.getPrediction(this.cepEvent);
                prediction[0] = hoeffdingModel.getClasses().get(((Integer) prediction[0]).intValue());
                complexEventPopulater.populateComplexEvent(next, prediction);
            }
            this.nextProcessor.process(complexEventChunk);
        }
    }

    public void start() {
    }

    public void stop() {
        AdaptiveHoeffdingModelsHolder.getInstance().deleteHoeffdingModel(this.modelName);
    }

    public Map<String, Object> currentState() {
        HashMap hashMap = new HashMap();
        hashMap.put("AdaptiveHoeffdingModelsMap", AdaptiveHoeffdingModelsHolder.getInstance().getClonedHoeffdingModelMap());
        return hashMap;
    }

    public void restoreState(Map<String, Object> map) {
        AdaptiveHoeffdingModelsHolder.getInstance().setHoeffdingModelMap((Map) map.get("AdaptiveHoeffdingModelsMap"));
    }
}
