package io.siddhi.extension.execution.streamingml.bayesian.regression;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.ParameterOverload;
import io.siddhi.annotation.ReturnAttribute;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.event.ComplexEventChunk;
import io.siddhi.core.event.stream.MetaStreamEvent;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventCloner;
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
import io.siddhi.core.query.processor.ProcessingMode;
import io.siddhi.core.query.processor.Processor;
import io.siddhi.core.query.processor.stream.StreamProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.execution.streamingml.bayesian.regression.util.LinearRegressionModelHolder;
import io.siddhi.extension.execution.streamingml.bayesian.util.BayesianModel;
import io.siddhi.extension.execution.streamingml.bayesian.util.LinearRegression;
import io.siddhi.extension.execution.streamingml.util.CoreUtils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.log4j.Logger;

@Extension(name = "updateBayesianRegression", namespace = "streamingml", description = "This extension builds/updates a linear Bayesian regression model. This extension uses an improved version of stochastic variational inference.", parameters = {@Parameter(name = "model.name", description = "The name of the model to be built.", type = {DataType.STRING}), @Parameter(name = "model.target", description = "The target attribute (dependant variable) of the input stream.", type = {DataType.INT, DataType.DOUBLE, DataType.LONG, DataType.FLOAT}, dynamic = true), @Parameter(name = "model.samples", description = "Number of samples used to construct the gradients.", type = {DataType.INT}, optional = true, defaultValue = "1"), @Parameter(name = "model.optimizer", description = "The type of optimization used", type = {DataType.STRING}, optional = true, defaultValue = "ADAM"), @Parameter(name = "learning.rate", description = "The learning rate of the updater", type = {DataType.DOUBLE}, optional = true, defaultValue = "0.05"), @Parameter(name = "model.feature", description = "Features of the model that need to be attributes of the stream.", type = {DataType.DOUBLE, DataType.FLOAT, DataType.INT, DataType.LONG}, dynamic = true)}, parameterOverloads = {@ParameterOverload(parameterNames = {"model.name", "model.target", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "model.samples", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "model.optimizer", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "learning.rate", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "model.samples", "model.optimizer", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "model.samples", "learning.rate", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "model.optimizer", "learning.rate", "model.feature", "..."}), @ParameterOverload(parameterNames = {"model.name", "model.target", "model.samples", "model.optimizer", "learning.rate", "model.feature", "..."})}, returnAttributes = {@ReturnAttribute(name = "loss", description = " loss of the model.", type = {DataType.DOUBLE})}, examples = {@Example(syntax = "define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, attribute_4 double );\n\nfrom StreamA#streamingml:updateBayesianRegression('model1', attribute_4, attribute_0, attribute_1, attribute_2, attribute_3) \ninsert all events into outputStream;", description = "This query builds/updates a Bayesian Linear regression model named `model1` using `attribute_0`, `attribute_1`, `attribute_2`, and `attribute_3` as features, and `attribute_4` as the label. Updated weights of the model are emitted to the OutputStream stream."), @Example(syntax = "define stream StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, attribute_4 double );\n\nfrom StreamA#streamingml:updateBayesianRegression('model1', attribute_4, 2, 'NADAM', 0.01, attribute_0, attribute_1, attribute_2, attribute_3) \ninsert all events into outputStream;", description = "This query builds/updates a Bayesian Linear regression model named `model1` with a `0.01` learning rate using `attribute_0`, `attribute_1`, `attribute_2`, and `attribute_3` as features, and `attribute_4` as the label. Updated weights of the model are emitted to the OutputStream stream. This model draws two samples during monte-carlo integration and uses NADAM optimizer.")})
/* loaded from: input_file:io/siddhi/extension/execution/streamingml/bayesian/regression/BayesianRegressionUpdaterStreamProcessorExtension.class */
public class BayesianRegressionUpdaterStreamProcessorExtension extends StreamProcessor<ExtensionState> {
    private static Logger logger = Logger.getLogger(BayesianRegressionUpdaterStreamProcessorExtension.class);
    private String modelName;
    private int numberOfFeatures;
    private VariableExpressionExecutor targetVariableExpressionExecutor;
    private List<VariableExpressionExecutor> featureVariableExpressionExecutors = new ArrayList();
    private ArrayList<Attribute> attributes;

    /* loaded from: input_file:io/siddhi/extension/execution/streamingml/bayesian/regression/BayesianRegressionUpdaterStreamProcessorExtension$ExtensionState.class */
    static class ExtensionState extends State {
        private static final String KEY_BAYSEIAN_REGRESSION_MODEL = "BayesianRegressionModel";
        private final Map<String, Object> state;
        private final String modelName;

        private ExtensionState(String str) {
            this.state = new HashMap();
            this.modelName = str;
        }

        public boolean canDestroy() {
            return false;
        }

        public Map<String, Object> snapshot() {
            this.state.put(KEY_BAYSEIAN_REGRESSION_MODEL, LinearRegressionModelHolder.getInstance().getClonedLinearRegressionModel(this.modelName));
            return this.state;
        }

        public void restore(Map<String, Object> map) {
            LinearRegression linearRegression = (LinearRegression) this.state.get(KEY_BAYSEIAN_REGRESSION_MODEL);
            linearRegression.initiateModel();
            LinearRegressionModelHolder.getInstance().addLinearRegressionModel(this.modelName, linearRegression);
        }
    }

    protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent, AbstractDefinition abstractDefinition, ExpressionExecutor[] expressionExecutorArr, ConfigReader configReader, StreamEventClonerHolder streamEventClonerHolder, boolean z, boolean z2, SiddhiQueryContext siddhiQueryContext) {
        String name = siddhiQueryContext.getSiddhiAppContext().getName();
        double d = -1.0d;
        int i = -1;
        BayesianModel.OptimizerType optimizerType = null;
        int size = this.inputDefinition.getAttributeList().size() - 1;
        if (this.attributeExpressionLength < 3) {
            throw new SiddhiAppCreationException(String.format("Invalid number of parameters [%s] for streamingml:updateBayesianRegression. Expect at least %s parameters", Integer.valueOf(this.attributeExpressionLength), 3));
        }
        if (this.attributeExpressionLength > 5 + size) {
            throw new SiddhiAppCreationException(String.format("Invalid number of parameters for streamingml:updateBayesianRegression. This Stream Processor requires at most %s parameters, namely, model.name, model.target, model.samples[optional], model.optimizer[optional], learning.rate[optional], model.features. but found %s parameters", Integer.valueOf(5 + size), Integer.valueOf(this.attributeExpressionLength)));
        }
        if (!(expressionExecutorArr[0] instanceof ConstantExpressionExecutor)) {
            throw new SiddhiAppCreationException("Parameter model.name must be a constant but found " + expressionExecutorArr[0].getClass().getCanonicalName());
        }
        if (expressionExecutorArr[0].getReturnType() != Attribute.Type.STRING) {
            throw new SiddhiAppCreationException("Invalid parameter type found for the model.name argument, required " + Attribute.Type.STRING + " but found " + expressionExecutorArr[0].getReturnType().toString());
        }
        String str = (String) ((ConstantExpressionExecutor) expressionExecutorArr[0]).getValue();
        this.modelName = str + "." + name;
        if (LinearRegressionModelHolder.getInstance().getLinearRegressionMap().containsKey(this.modelName)) {
            throw new SiddhiAppCreationException("A model already exists with name the " + str + ". Use a different value for model.name argument.");
        }
        if (!(this.attributeExpressionExecutors[1] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppCreationException("model.target attribute in updateBayesianRegression should be a variable, but found a " + this.attributeExpressionExecutors[1].getClass().getCanonicalName());
        }
        this.targetVariableExpressionExecutor = this.attributeExpressionExecutors[1];
        Attribute.Type attributeType = this.inputDefinition.getAttributeType(this.targetVariableExpressionExecutor.getAttribute().getName());
        if (!CoreUtils.isNumeric(attributeType)) {
            throw new SiddhiAppCreationException(String.format("[model.target] %s in updateBayesianRegression should be a numeric. But found %s", this.targetVariableExpressionExecutor.getAttribute().getName(), attributeType.name()));
        }
        int i2 = 2;
        while (true) {
            if (!(expressionExecutorArr[i2] instanceof ConstantExpressionExecutor)) {
                break;
            }
            if (expressionExecutorArr[i2].getReturnType() == Attribute.Type.INT) {
                if (i2 != 2) {
                    throw new SiddhiAppCreationException(String.format("%dth parameter cannot be type of %s. Only model.sample can be %s, which can be set as the %dth parameter.", Integer.valueOf(i2), Attribute.Type.INT, Attribute.Type.INT, 2));
                }
                int intValue = ((Integer) ((ConstantExpressionExecutor) expressionExecutorArr[i2]).getValue()).intValue();
                if (intValue <= 0) {
                    throw new SiddhiAppCreationException(String.format("model.sample should be greater than zero.But found %d", Integer.valueOf(intValue)));
                }
                i = intValue;
                i2++;
            } else if (expressionExecutorArr[i2].getReturnType() == Attribute.Type.STRING) {
                if (i2 > 2 + 1) {
                    throw new SiddhiAppCreationException(String.format("%dth parameter cannot be type of %s. Only model.optimizer can be %s.", Integer.valueOf(i2), Attribute.Type.STRING, Attribute.Type.STRING));
                }
                if (optimizerType != null) {
                    throw new SiddhiAppCreationException(String.format("%dth parameter cannot be type of %s. Only model.optimizer can be %s, which is already set to %s.", Integer.valueOf(i2), Attribute.Type.STRING, Attribute.Type.STRING, optimizerType));
                }
                String str2 = (String) ((ConstantExpressionExecutor) expressionExecutorArr[i2]).getValue();
                try {
                    optimizerType = BayesianModel.OptimizerType.valueOf(str2.toUpperCase(Locale.ENGLISH));
                    i2++;
                } catch (Exception e) {
                    throw new SiddhiAppCreationException(String.format("model.optimizer should be one of %s. But found %s", Arrays.toString(BayesianModel.OptimizerType.values()), str2));
                }
            } else {
                if (expressionExecutorArr[i2].getReturnType() != Attribute.Type.DOUBLE) {
                    throw new SiddhiAppCreationException(String.format("Invalid parameter type found. Expected: %s or %s or %s. But found %s", Attribute.Type.INT, Attribute.Type.STRING, Attribute.Type.DOUBLE, expressionExecutorArr[2].getReturnType().toString()));
                }
                double doubleValue = ((Double) ((ConstantExpressionExecutor) expressionExecutorArr[i2]).getValue()).doubleValue();
                if (doubleValue <= 0.0d) {
                    throw new SiddhiAppCreationException(String.format("learning.rate should be greater than zero. But found %f", Double.valueOf(doubleValue)));
                }
                d = doubleValue;
                i2++;
            }
        }
        if (!(expressionExecutorArr[i2] instanceof VariableExpressionExecutor)) {
            throw new SiddhiAppCreationException("Parameter " + i2 + " must either be a constant (hyperparameter) or an attribute of the stream (model.features), but found a " + expressionExecutorArr[i2].getClass().getCanonicalName());
        }
        this.numberOfFeatures = this.attributeExpressionLength - i2;
        this.featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(this.inputDefinition, expressionExecutorArr, i2, this.numberOfFeatures);
        LinearRegression linearRegression = new LinearRegression();
        LinearRegressionModelHolder.getInstance().addLinearRegressionModel(this.modelName, linearRegression);
        if (d != -1.0d) {
            logger.debug("set learning rate to : " + d);
            linearRegression.setLearningRate(d);
        }
        if (i != -1) {
            logger.debug("set number of samples to : " + i);
            linearRegression.setNumSamples(i);
        }
        if (optimizerType != null) {
            logger.debug("set optimizer to : " + optimizerType);
            linearRegression.setOptimizerType(optimizerType);
        }
        if (linearRegression.getNumFeatures() == -1) {
            linearRegression.setNumFeatures(this.numberOfFeatures);
            linearRegression.initiateModel();
        } else if (this.numberOfFeatures != linearRegression.getNumFeatures()) {
            throw new SiddhiAppCreationException(String.format("Model [%s] expects %s features, but the streamingml:updateBayesianRegression specifies %s features", str, Integer.valueOf(linearRegression.getNumFeatures()), Integer.valueOf(this.numberOfFeatures)));
        }
        this.attributes = new ArrayList<>();
        this.attributes.add(new Attribute("loss", Attribute.Type.DOUBLE));
        return () -> {
            return new ExtensionState(this.modelName);
        };
    }

    protected void process(ComplexEventChunk<StreamEvent> complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, ExtensionState extensionState) {
        synchronized (this) {
            while (complexEventChunk.hasNext()) {
                StreamEvent next = complexEventChunk.next();
                if (logger.isDebugEnabled()) {
                    logger.debug(String.format("Event received; Model name: %s Event:%s", this.modelName, next));
                }
                double[] dArr = {((Number) this.targetVariableExpressionExecutor.execute(next)).doubleValue()};
                double[] dArr2 = new double[this.numberOfFeatures];
                for (int i = 0; i < this.numberOfFeatures; i++) {
                    dArr2[i] = ((Number) this.featureVariableExpressionExecutors.get(i).execute(next)).doubleValue();
                }
                complexEventPopulater.populateComplexEvent(next, new Object[]{Double.valueOf(LinearRegressionModelHolder.getInstance().getLinearRegressionModel(this.modelName).update(dArr2, dArr)[0])});
            }
        }
        this.nextProcessor.process(complexEventChunk);
    }

    public List<Attribute> getReturnAttributes() {
        return this.attributes;
    }

    public void start() {
    }

    public void stop() {
        LinearRegressionModelHolder.getInstance().deleteLinearRegressionModel(this.modelName);
    }

    public ProcessingMode getProcessingMode() {
        return ProcessingMode.BATCH;
    }

    protected /* bridge */ /* synthetic */ void process(ComplexEventChunk complexEventChunk, Processor processor, StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater, State state) {
        process((ComplexEventChunk<StreamEvent>) complexEventChunk, processor, streamEventCloner, complexEventPopulater, (ExtensionState) state);
    }
}
