package _ss_com.streamsets.datacollector.runner.preview;

import _ss_com.streamsets.datacollector.config.StageType;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.metrics.MetricsConfigurator;
import _ss_com.streamsets.datacollector.restapi.bean.MetricRegistryJson;
import _ss_com.streamsets.datacollector.runner.BatchListener;
import _ss_com.streamsets.datacollector.runner.FullPipeBatch;
import _ss_com.streamsets.datacollector.runner.MultiplexerPipe;
import _ss_com.streamsets.datacollector.runner.Observer;
import _ss_com.streamsets.datacollector.runner.ObserverPipe;
import _ss_com.streamsets.datacollector.runner.Pipe;
import _ss_com.streamsets.datacollector.runner.PipelineRunner;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.runner.SourceOffsetTracker;
import _ss_com.streamsets.datacollector.runner.StageOutput;
import _ss_com.streamsets.datacollector.runner.StagePipe;
import _ss_com.streamsets.datacollector.runner.production.BadRecordsHandler;
import _ss_com.streamsets.datacollector.runner.production.StatsAggregationHandler;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.streamsets.pipeline.api.StageException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:_ss_com/streamsets/datacollector/runner/preview/PreviewPipelineRunner.class */
public class PreviewPipelineRunner implements PipelineRunner {
    private final RuntimeInfo runtimeInfo;
    private final SourceOffsetTracker offsetTracker;
    private final int batchSize;
    private final int batches;
    private final boolean skipTargets;
    private final MetricRegistry metrics = new MetricRegistry();
    private final List<List<StageOutput>> batchesOutput = new ArrayList();
    private final String name;
    private final String rev;
    private String sourceOffset;
    private String newSourceOffset;
    private final Timer processingTimer;

    public PreviewPipelineRunner(String str, String str2, RuntimeInfo runtimeInfo, SourceOffsetTracker sourceOffsetTracker, int i, int i2, boolean z) {
        this.name = str;
        this.rev = str2;
        this.runtimeInfo = runtimeInfo;
        this.offsetTracker = sourceOffsetTracker;
        this.batchSize = i;
        this.batches = i2;
        this.skipTargets = z;
        this.processingTimer = MetricsConfigurator.createTimer(this.metrics, "pipeline.batchProcessing", str, str2);
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public MetricRegistryJson getMetricRegistryJson() {
        return null;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void errorNotification(Pipe[] pipeArr, Throwable th) {
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public RuntimeInfo getRuntimeInfo() {
        return this.runtimeInfo;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public boolean isPreview() {
        return true;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public MetricRegistry getMetrics() {
        return this.metrics;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void run(Pipe[] pipeArr, BadRecordsHandler badRecordsHandler, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        run(pipeArr, badRecordsHandler, Collections.EMPTY_LIST, statsAggregationHandler);
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void run(Pipe[] pipeArr, BadRecordsHandler badRecordsHandler, List<StageOutput> list, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        HashMap hashMap = new HashMap();
        for (StageOutput stageOutput : list) {
            hashMap.put(stageOutput.getInstanceName(), stageOutput);
        }
        for (int i = 0; i < this.batches; i++) {
            FullPipeBatch fullPipeBatch = new FullPipeBatch(this.offsetTracker, this.batchSize, true);
            long currentTimeMillis = System.currentTimeMillis();
            this.sourceOffset = fullPipeBatch.getPreviousOffset();
            for (Pipe pipe : pipeArr) {
                StageOutput stageOutput2 = (StageOutput) hashMap.get(pipe.getStage().getInfo().getInstanceName());
                if (stageOutput2 == null || (pipe instanceof ObserverPipe) || (pipe instanceof MultiplexerPipe)) {
                    if (!this.skipTargets || pipe.getStage().getDefinition().getType() != StageType.TARGET) {
                        pipe.process(fullPipeBatch);
                    }
                } else if (pipe instanceof StagePipe) {
                    fullPipeBatch.overrideStageOutput((StagePipe) pipe, stageOutput2);
                }
            }
            this.offsetTracker.commitOffset();
            this.processingTimer.update(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
            this.newSourceOffset = this.offsetTracker.getOffset();
            this.batchesOutput.add(fullPipeBatch.getSnapshotsOfAllStagesOutput());
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void destroy(Pipe[] pipeArr, BadRecordsHandler badRecordsHandler, StatsAggregationHandler statsAggregationHandler) throws StageException, PipelineRuntimeException {
        FullPipeBatch fullPipeBatch = new FullPipeBatch(this.offsetTracker, this.batchSize, true);
        for (Pipe pipe : pipeArr) {
            pipe.destroy(fullPipeBatch);
        }
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public List<List<StageOutput>> getBatchesOutput() {
        return this.batchesOutput;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public String getSourceOffset() {
        return this.sourceOffset;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public String getNewSourceOffset() {
        return this.newSourceOffset;
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void setObserver(Observer observer) {
    }

    @Override // _ss_com.streamsets.datacollector.runner.PipelineRunner
    public void registerListener(BatchListener batchListener) {
    }
}
