package io.siddhi.extension.execution.streamingml.bayesian.classification;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.execution.streamingml.bayesian.classification.util.SoftmaxRegressionModelHolder;
import io.siddhi.extension.execution.streamingml.bayesian.util.SoftmaxRegression;
import io.siddhi.extension.execution.streamingml.util.CoreUtils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;

@Extension(name = "bayesianClassification", namespace = "streamingml", description = "This extension predicts using a Bayesian multivariate logistic regression model. This Bayesian model allows determining the uncertainty of each prediction by estimating the full-predictive distribution", parameters = {@Parameter(name = "model.name", description = "The name of the model to be used.", type = {DataType.STRING}), @Parameter(name = "prediction.samples", description = "The number of samples to be drawn from the predictive distribution. Drawing more samples will improve the accuracy of the predictions", type = {DataType.INT}, optional = true, defaultValue = "1000"), @Parameter(name = "model.features", description = "The features of the model that need to be attributes of the stream.", type = {DataType.DOUBLE})}, returnAttributes = {@ReturnAttribute(name = "prediction", description = "The predicted label (string)", type = {DataType.DOUBLE}), @ReturnAttribute(name = "confidence", description = "Mean probability of the predictive distribution.", type = {DataType.DOUBLE})}, examples = {@Example(syntax = "define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);\n\nfrom StreamA#streamingml:bayesianRegression('model1', attribute_0, attribute_1, attribute_2, attribute_3) \ninsert all events into OutputStream;", description = "This query uses a Bayesian Softmax regression model named `model1` to predict the label of the feature vector represented by `attribute_0`, `attribute_1`, `attribute_2`, and `attribute_3`. The predicted label is emitted to the `OutputStream` streamalong with the prediction confidence (std of predictive distribution) and the feature vector. As a result, the OutputStream stream is defined as follows: (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction string, confidence double)."), @Example(syntax = "define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);\n\nfrom StreamA#streamingml:bayesianRegression('model1', 5000, attribute_0, attribute_1, attribute_2, attribute_3) \ninsert all events into OutputStream;", description = "This query uses a Bayesian Softmax regression model named `model1` to predict the label of the feature vector represented by `attribute_0`, `attribute_1`, `attribute_2`, and `attribute_3`. The label is estimated based on 5000 samples from the predictive distribution. The predicted label is emitted to the `OutputStream` streamalong with the confidence of the prediction (mean of predictive distribution) and the feature vector. As a result, the OutputStream stream is defined as follows: (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction string, confidence double).")})
/* loaded from: input_file:io/siddhi/extension/execution/streamingml/bayesian/classification/BayesianClassificationStreamProcessorExtension.class */
public class BayesianClassificationStreamProcessorExtension extends StreamProcessor<State> {
    private static Logger logger = Logger.getLogger(BayesianClassificationStreamProcessorExtension.class);
    private String modelName;
    private int numberOfFeatures;
    private List<VariableExpressionExecutor> featureVariableExpressionExecutors = new ArrayList();
    private SoftmaxRegression model;
    private List<Attribute> attributes;

    protected StateFactory<State> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        String name = siddhiQueryContext.getSiddhiAppContext().getName();
        int i = -1;
        int size = this.inputDefinition.getAttributeList().size();
        if (this.attributeExpressionLength < 2) {
            throw new SiddhiAppCreationException(String.format("Invalid number of parameters [%s] for streamingml:bayesianClassification. Expect at least %s parameters", Integer.valueOf(this.attributeExpressionLength), 2));
        }
        if (this.attributeExpressionLength > 2 + size) {
            throw new SiddhiAppCreationException(String.format("Invalid number of parameters for streamingml:bayesianClassification. This Stream Processor requires at most %s parameters, namely, model.name, prediction.samples[optional], model.features but found %s parameters", Integer.valueOf(2 + size), Integer.valueOf(this.attributeExpressionLength)));
        }
        if (!(this.attributeExpressionExecutors[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppCreationException("Parameter model.name must be a constant but found " + this.attributeExpressionExecutors[0].getClass().getCanonicalName());
        }
        if (this.attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppCreationException("Invalid parameter type found for the model.name argument, required " + Attribute.Type.STRING + " but found " + this.attributeExpressionExecutors[0].getReturnType().toString());
        }
        String str = (String) this.attributeExpressionExecutors[0].getValue();
        this.modelName = str + "." + name;
        if (this.attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor) {
            if (this.attributeExpressionExecutors[1].getReturnType() != Attribute.Type.INT) {
                throw new SiddhiAppCreationException("Invalid parameter type found for the prediction.samples argument. Expected: " + Attribute.Type.INT + " but found: " + this.attributeExpressionExecutors[1].getReturnType().toString());
            }
            int intValue = ((Integer) this.attributeExpressionExecutors[1].getValue()).intValue();
            if (intValue <= 0) {
                throw new SiddhiAppCreationException(String.format("Invalid parameter value found for the prediction.samples argument. Expected a value greater than zero, but found: %d", Integer.valueOf(intValue)));
            }
            i = intValue;
            if (!(this.attributeExpressionExecutors[2] instanceof VariableExpressionExecutor)) {
                throw new SiddhiAppCreationException("3rd Parameter must be an attribute of the stream (model.features), but found a " + this.attributeExpressionExecutors[2].getClass().getCanonicalName());
            }
            this.numberOfFeatures = this.attributeExpressionLength - 2;
            this.featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(this.inputDefinition, this.attributeExpressionExecutors, 2, this.numberOfFeatures);
        } else {
            if (!(this.attributeExpressionExecutors[1] instanceof VariableExpressionExecutor)) {
                throw new SiddhiAppCreationException("2nd Parameter must either be a constant (prediction.samples) or an attribute of the stream (model.features), but found a " + this.attributeExpressionExecutors[1].getClass().getCanonicalName());
            }
            this.numberOfFeatures = this.attributeExpressionLength - 1;
            this.featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(this.inputDefinition, this.attributeExpressionExecutors, 1, this.numberOfFeatures);
        }
        this.model = SoftmaxRegressionModelHolder.getInstance().getSoftmaxRegressionModel(this.modelName);
        if (this.model == null) {
            throw new SiddhiAppCreationException(String.format("Model [%s] needs to initialized prior to be used with streamingml:bayesianClassification. Perform streamingml:updateBayesianClassification process first.", this.modelName));
        }
        if (i != -1) {
            this.model.setPredictionSamples(i);
        }
        if (this.model.getNumFeatures() != -1 && this.numberOfFeatures != this.model.getNumFeatures()) {
            throw new SiddhiAppCreationException(String.format("Model [%s] expects %s features, but the streamingml:bayesianClassification specifies %s features", str, Integer.valueOf(this.model.getNumFeatures()), Integer.valueOf(this.numberOfFeatures)));
        }
        this.attributes = new ArrayList();
        this.attributes.add(new Attribute("prediction", Attribute.Type.DOUBLE));
        this.attributes.add(new Attribute("confidence", Attribute.Type.DOUBLE));
        return null;
    }

    public void start() {
    }

    public void stop() {
    }

    public List<Attribute> getReturnAttributes() {
        return this.attributes;
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Event received; Model name: %s Event:%s", this.modelName, next));
                }
                double[] dArr = new double[this.numberOfFeatures];
                for (int i = 0; i < this.numberOfFeatures; i++) {
                    dArr[i] = ((Number) this.featureVariableExpressionExecutors.get(i).execute(next)).doubleValue();
                }
                Double[] predictWithStd = this.model.predictWithStd(dArr);
                complexEventPopulater.populateComplexEvent(next, new Object[]{this.model.getClassLabel(Integer.valueOf(predictWithStd[0].intValue())), predictWithStd[1]});
            }
        }
        this.nextProcessor.process(complexEventChunk);
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }
}
