package org.wso2.extension.siddhi.execution.ml.samoa.utils.clustering;

import java.util.Iterator;
import java.util.Queue;
import org.apache.samoa.learners.Learner;
import org.apache.samoa.learners.clusterers.simple.ClusteringDistributorProcessor;
import org.apache.samoa.moa.cluster.Clustering;
import org.apache.samoa.streams.InstanceStream;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.topology.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.extension.siddhi.execution.ml.samoa.utils.ProcessTask;
import org.wso2.extension.siddhi.execution.ml.samoa.utils.clustering.StreamingClusteringEvaluationProcessor;

/* loaded from: input_file:org/wso2/extension/siddhi/execution/ml/samoa/utils/clustering/StreamingClusteringTask.class */
public class StreamingClusteringTask extends ProcessTask {
    private static final long serialVersionUID = 33336;
    private static final int DISTRIBUTOR_PARALLELISM = 1;
    private static final Logger logger = LoggerFactory.getLogger(StreamingClusteringTask.class);
    private ClusteringDistributorProcessor distributor;
    public Queue<Clustering> samoaClusters;
    public int numberOfClusters;

    @Override // org.wso2.extension.siddhi.execution.ml.samoa.utils.ProcessTask, org.apache.samoa.tasks.Task
    public void init() {
        if (this.builder == null) {
            this.builder = new TopologyBuilder();
            this.builder.initTopology(this.evaluationNameOption.getValue(), this.sourceDelayOption.getValue());
            logger.debug("Successfully initialized SAMOA topology with name {}", this.evaluationNameOption.getValue());
        }
        this.source = new StreamingClusteringEntranceProcessor();
        this.inputStream = (InstanceStream) this.streamTrainOption.getValue();
        if (this.inputStream instanceof StreamingClusteringStream) {
            logger.info("DataStream is a StreamingClusteringStream");
            ((StreamingClusteringStream) this.inputStream).setCepEvents(this.cepEvents);
        } else {
            logger.info("Check DataStream: DataStream is not a StreamingClusteringStream");
        }
        this.source.setStreamSource(this.inputStream);
        this.builder.addEntranceProcessor(this.source);
        this.source.setMaxEvents(this.instanceLimitOption.getValue());
        this.distributor = new ClusteringDistributorProcessor();
        this.builder.addProcessor(this.distributor, 1);
        this.builder.connectInputShuffleStream(this.builder.createStream(this.source), this.distributor);
        this.sourcePiOutputStream = this.builder.createStream(this.distributor);
        this.distributor.setOutputStream(this.sourcePiOutputStream);
        Stream createStream = this.builder.createStream(this.distributor);
        this.distributor.setEvaluationStream(createStream);
        this.learner = (Learner) this.learnerOption.getValue();
        this.learner.init(this.builder, this.source.getDataset(), 1);
        this.builder.connectInputShuffleStream(this.sourcePiOutputStream, this.learner.getInputProcessor());
        StreamingClusteringEvaluationProcessor build = new StreamingClusteringEvaluationProcessor.Builder(this.sampleFrequencyOption.getValue()).dumpFile(this.dumpFileOption.getFile()).build();
        build.setSamoaClusters(this.samoaClusters);
        build.setNumberOfClusters(this.numberOfClusters);
        this.builder.addProcessor(build);
        Iterator<Stream> it = this.learner.getResultStreams().iterator();
        while (it.hasNext()) {
            this.builder.connectInputShuffleStream(it.next(), build);
        }
        this.builder.connectInputAllStream(createStream, build);
        this.topology = this.builder.build();
        logger.info("Successfully built the topology");
    }

    public void setSamoaClusters(Queue<Clustering> queue) {
        this.samoaClusters = queue;
    }

    public void setNumberOfClusters(int i) {
        this.numberOfClusters = i;
    }
}
