package org.wso2.extension.siddhi.execution.ml.samoa.utils;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.EntranceProcessor;
import org.apache.samoa.core.Processor;
import org.apache.samoa.instances.Instance;
import org.apache.samoa.instances.Instances;
import org.apache.samoa.moa.options.AbstractOptionHandler;
import org.apache.samoa.streams.InstanceStream;
import org.apache.samoa.streams.StreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/wso2/extension/siddhi/execution/ml/samoa/utils/SourceProcessor.class */
/**
 * Base {@link EntranceProcessor} that reads {@link Instance}s from a {@link StreamSource}.
 *
 * <p>This class owns the stream, the pre-fetched first instance and the end-of-stream /
 * throttling checks. Subclasses supply {@link #nextEvent()} and {@link #newProcessor(Processor)}.
 * Nothing in this class increments {@link #numberOfInstancesSent} or uses {@link #timer} to
 * schedule work; presumably subclasses do both (verify against the concrete processors).
 */
public abstract class SourceProcessor implements EntranceProcessor {
    private static final long serialVersionUID = 4169053337917578558L;
    private static final Logger logger = LoggerFactory.getLogger(SourceProcessor.class);

    /** Single-threaded scheduler created in {@link #onCreate(int)}; not serialized. */
    // NOTE(review): this class never shuts the executor down - confirm a subclass or the
    // topology teardown does so.
    protected transient ScheduledExecutorService timer;

    /** Source delay; {@link #hasNext()} only throttles when this is greater than zero. */
    protected int delay;

    private StreamSource streamSource;

    /** Instance pre-fetched by {@link #setStreamSource(InstanceStream)}. */
    protected Instance firstInstance;

    /** Upper bound on {@link #numberOfInstancesSent}; a negative value disables the bound. */
    protected int maxEvents;

    /** Amount added to {@code readyEventIndex} by {@link #increaseReadyEventIndex()}. */
    private int interval;

    /** Handle cancelled by {@link #increaseReadyEventIndex()} once finished; not serialized. */
    protected transient ScheduledFuture<?> schedule = null;

    /** While throttling is active, events may be emitted until the sent count reaches this. */
    private int readyEventIndex = 1;

    /** Count of instances sent so far; compared against the limits, never modified here. */
    protected int numberOfInstancesSent = 0;

    /** Set to {@code true} whenever {@link #hasReachedEndOfStream()} reports the end. */
    protected boolean finished = false;

    /** {@code true} once {@link #nextInstance()} has handed out {@link #firstInstance}. */
    private boolean isInitialized = false;

    /**
     * Produces the next event of this entrance processor.
     *
     * @return the next content event
     */
    @Override
    public abstract ContentEvent nextEvent();

    /**
     * Creates a new processor based on the given one.
     *
     * @param processor the processor to copy from
     * @return the new processor
     */
    @Override
    public abstract Processor newProcessor(Processor processor);

    /**
     * This processor does not handle incoming events.
     *
     * @param contentEvent ignored
     * @return always {@code false}
     */
    @Override
    public boolean process(ContentEvent contentEvent) {
        return false;
    }

    /**
     * Creates the single-threaded scheduler and logs the creation.
     *
     * @param i the process id, used only for logging
     */
    @Override
    public void onCreate(int i) {
        timer = Executors.newScheduledThreadPool(1);
        // Log the concrete class; the previous message named a class that is not this one.
        logger.info("Creating {} with processId {}", getClass().getName(), i);
    }

    /**
     * Tells whether an event may be emitted now.
     *
     * @return {@code true} when the stream is not finished and either no delay is configured
     *     ({@code delay <= 0}) or fewer instances were sent than the current ready index
     */
    @Override
    public boolean hasNext() {
        if (isFinished()) {
            return false;
        }
        boolean throttlingDisabled = delay <= 0;
        return throttlingDisabled || numberOfInstancesSent < readyEventIndex;
    }

    /**
     * Delegates to {@link #hasReachedEndOfStream()}.
     *
     * @return {@code true} when no further instance should be emitted
     */
    @Override
    public boolean isFinished() {
        return hasReachedEndOfStream();
    }

    /**
     * Checks whether the source is exhausted or the event limit has been reached, and records
     * a positive result in {@link #finished}.
     *
     * @return {@code false} while an instance is still available and the limit is not reached;
     *     {@code true} otherwise
     */
    public boolean hasReachedEndOfStream() {
        // setStreamSource() already pulled the first instance out of the stream. Until
        // nextInstance() hands it out it is still pending, even if the stream itself is now
        // empty; otherwise a one-element stream would finish without emitting anything.
        boolean prefetchedPending = !isInitialized && firstInstance != null;
        boolean instanceAvailable = prefetchedPending || streamSource.hasMoreInstances();
        boolean underLimit = maxEvents < 0 || numberOfInstancesSent < maxEvents;
        if (instanceAvailable && underLimit) {
            return false;
        }
        finished = true;
        return true;
    }

    /**
     * Wraps the given stream in a {@link StreamSource} and pre-fetches its first instance.
     * Streams that are {@link AbstractOptionHandler}s are prepared for use first.
     *
     * @param instanceStream the stream to read instances from
     */
    public void setStreamSource(InstanceStream instanceStream) {
        if (instanceStream instanceof AbstractOptionHandler) {
            ((AbstractOptionHandler) instanceStream).prepareForUse();
        }
        streamSource = new StreamSource(instanceStream);
        firstInstance = streamSource.nextInstance().getData();
    }

    /**
     * Returns the next instance to emit: the pre-fetched first instance on the first call,
     * then whatever the stream source yields.
     *
     * @return the next instance
     */
    public Instance nextInstance() {
        if (!isInitialized) {
            isInitialized = true;
            return firstInstance;
        }
        return streamSource.nextInstance().getData();
    }

    /**
     * Advances the ready index by the configured batch size and cancels {@link #schedule}
     * (without interrupting) once the processor is finished.
     */
    public void increaseReadyEventIndex() {
        readyEventIndex += interval;
        if (schedule != null && isFinished()) {
            schedule.cancel(false);
        }
    }

    /**
     * Returns the dataset of the pre-fetched first instance.
     *
     * @return the dataset the first instance belongs to
     */
    public Instances getDataset() {
        return firstInstance.dataset();
    }

    /**
     * Sets the event limit.
     *
     * @param i maximum number of events; a negative value means no limit
     */
    public void setMaxEvents(int i) {
        maxEvents = i;
    }

    /**
     * Sets the source delay.
     *
     * @param i the delay; values of zero or less disable throttling in {@link #hasNext()}
     */
    public void setSourceDelay(int i) {
        delay = i;
    }

    /**
     * Sets the batch size by which the ready index grows on each
     * {@link #increaseReadyEventIndex()} call.
     *
     * @param i the batch size
     */
    public void setDelayBatchSize(int i) {
        interval = i;
    }

    /**
     * Returns the wrapped stream source.
     *
     * @return the stream source, or {@code null} if none has been set
     */
    public StreamSource getStreamSource() {
        return streamSource;
    }
}
