package org.wso2.extension.siddhi.execution.ml.samoa.utils.regression;

import java.util.Iterator;
import java.util.Queue;
import java.util.Vector;
import org.apache.samoa.evaluation.PerformanceEvaluator;
import org.apache.samoa.learners.Learner;
import org.apache.samoa.streams.InstanceStream;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.extension.siddhi.execution.ml.samoa.utils.ProcessTask;
import org.wso2.extension.siddhi.execution.ml.samoa.utils.regression.StreamingRegressionEvaluationProcessor;
import org.wso2.siddhi.core.exception.SiddhiAppRuntimeException;

/* loaded from: input_file:org/wso2/extension/siddhi/execution/ml/samoa/utils/regression/StreamingRegressionTask.class */
/**
 * SAMOA task that assembles the streaming-regression topology: an entrance processor fed by a
 * {@link StreamingRegressionStream}, the configured learner, and a
 * {@link StreamingRegressionEvaluationProcessor} that receives the learner's result streams.
 *
 * <p>The queue supplied through {@link #setSamoaData(Queue)} is handed to the evaluation
 * processor during {@link #init()}.
 */
public class StreamingRegressionTask extends ProcessTask {
    // Bound to this class so log output is attributed to the task itself.
    private static final Logger logger = LoggerFactory.getLogger(StreamingRegressionTask.class);
    private static final long serialVersionUID = 11114L;

    /** Queue passed to the evaluation processor in {@link #init()}; set via {@link #setSamoaData(Queue)}. */
    public Queue<Vector> samoaPredictions;

    /**
     * Builds the SAMOA topology for this task.
     *
     * <p>Steps, in order: resolve and validate the input stream, attach the CEP events to it,
     * create the topology builder if none is present, register the entrance processor, initialise
     * the learner and connect it to the source stream, create the evaluation processor, connect
     * every learner result stream to it, and finally build the topology.
     *
     * @throws SiddhiAppRuntimeException if the configured training stream is not a
     *     {@link StreamingRegressionStream}
     */
    @Override // org.wso2.extension.siddhi.execution.ml.samoa.utils.ProcessTask, org.apache.samoa.tasks.Task
    public void init() {
        this.inputStream = (InstanceStream) this.streamTrainOption.getValue();
        if (!(this.inputStream instanceof StreamingRegressionStream)) {
            throw new SiddhiAppRuntimeException("Check stream: DataStream is not a StreamingRegressionStream");
        }
        ((StreamingRegressionStream) this.inputStream).setCepEvents(this.cepEvents);

        // A builder that is already present is reused; otherwise a fresh topology is started.
        if (this.builder == null) {
            this.builder = new TopologyBuilder();
            this.builder.initTopology(this.evaluationNameOption.getValue());
            logger.debug("Successfully initialized SAMOA topology with name {}", this.evaluationNameOption.getValue());
        }

        // Entrance processor: reads from the input stream and is configured from the task options.
        this.source = new StreamingRegressionEntranceProcessor();
        this.source.setStreamSource(this.inputStream);
        this.builder.addEntranceProcessor(this.source);
        this.source.setMaxEvents(this.instanceLimitOption.getValue());
        this.source.setSourceDelay(this.sourceDelayOption.getValue());
        this.source.setDelayBatchSize(this.batchDelayOption.getValue());
        this.sourcePiOutputStream = this.builder.createStream(this.source);

        // Learner: initialised against the source dataset and connected to the source output stream.
        this.learner = (Learner) this.learnerOption.getValue();
        // NOTE(review): the literal 1 is presumably the learner parallelism hint - confirm against Learner.init.
        this.learner.init(this.builder, this.source.getDataset(), 1);
        this.builder.connectInputShuffleStream(this.sourcePiOutputStream, this.learner.getInputProcessor());

        // Use the configured evaluator unless it is incompatible with the learner.
        PerformanceEvaluator performanceEvaluator = (PerformanceEvaluator) this.evaluatorOption.getValue();
        if (!isLearnerAndEvaluatorCompatible(this.learner, performanceEvaluator)) {
            performanceEvaluator = getDefaultPerformanceEvaluatorForLearner(this.learner);
        }

        StreamingRegressionEvaluationProcessor evaluationProcessor =
                new StreamingRegressionEvaluationProcessor.Builder(performanceEvaluator)
                        .samplingFrequency(this.sampleFrequencyOption.getValue())
                        .dumpFile(this.dumpFileOption.getFile())
                        .build();
        evaluationProcessor.setSamoaPredictions(this.samoaPredictions);
        this.builder.addProcessor(evaluationProcessor);

        // Every result stream produced by the learner feeds the evaluation processor.
        for (Stream resultStream : this.learner.getResultStreams()) {
            this.builder.connectInputShuffleStream(resultStream, evaluationProcessor);
        }

        this.topology = this.builder.build();
        logger.info("Successfully built the topology");
    }

    /**
     * Sets the queue that {@link #init()} passes to the evaluation processor. {@code init()} uses
     * whatever value is stored at the time it runs, so call this beforehand.
     *
     * @param queue the queue to hand to the evaluation processor
     */
    public void setSamoaData(Queue<Vector> queue) {
        this.samoaPredictions = queue;
    }
}
