package org.apache.beam.runners.spark;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.spark.SparkPipelineResult;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator;
import org.apache.beam.runners.spark.translation.SparkContextFactory;
import org.apache.beam.runners.spark.translation.SparkTranslationContext;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineRunner.class */
public class SparkPipelineRunner implements PortablePipelineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(SparkPipelineRunner.class);
    private final SparkPipelineOptions pipelineOptions;

    public SparkPipelineRunner(SparkPipelineOptions sparkPipelineOptions) {
        this.pipelineOptions = sparkPipelineOptions;
    }

    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
        SparkBatchPortablePipelineTranslator sparkBatchPortablePipelineTranslator = new SparkBatchPortablePipelineTranslator();
        RunnerApi.Pipeline trim = PipelineTrimmer.trim(pipeline, sparkBatchPortablePipelineTranslator.knownUrns());
        RunnerApi.Pipeline pipeline2 = trim.getComponents().getTransformsMap().values().stream().anyMatch(pTransform -> {
            return "beam:runner:executable_stage:v1".equals(pTransform.getSpec().getUrn());
        }) ? trim : GreedyPipelineFuser.fuse(trim).toPipeline();
        if (this.pipelineOptions.getFilesToStage() == null) {
            this.pipelineOptions.setFilesToStage(PipelineResources.detectClassPathResourcesToStage(SparkPipelineRunner.class.getClassLoader()));
            LOG.info("PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath");
        }
        SparkPipelineOptions.prepareFilesToStage(this.pipelineOptions);
        LOG.info("Will stage {} files. (Enable logging at DEBUG level to see which files will be staged.)", Integer.valueOf(this.pipelineOptions.getFilesToStage().size()));
        LOG.debug("Staging files: {}", this.pipelineOptions.getFilesToStage());
        JavaSparkContext sparkContext = SparkContextFactory.getSparkContext(this.pipelineOptions);
        LOG.info(String.format("Running job %s on Spark master %s", jobInfo.jobId(), sparkContext.master()));
        AggregatorsAccumulator.init(this.pipelineOptions, sparkContext);
        MetricsEnvironment.setMetricsSupported(false);
        MetricsAccumulator.init(this.pipelineOptions, sparkContext);
        SparkTranslationContext sparkTranslationContext = new SparkTranslationContext(sparkContext, this.pipelineOptions, jobInfo);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        SparkPipelineResult.PortableBatchMode portableBatchMode = new SparkPipelineResult.PortableBatchMode(newSingleThreadExecutor.submit(() -> {
            sparkBatchPortablePipelineTranslator.translate(pipeline2, sparkTranslationContext);
            LOG.info(String.format("Job %s: Pipeline translated successfully. Computing outputs", jobInfo.jobId()));
            sparkTranslationContext.computeOutputs();
            LOG.info(String.format("Job %s finished.", jobInfo.jobId()));
        }), sparkContext);
        new MetricsPusher(MetricsAccumulator.getInstance().m28value(), this.pipelineOptions.as(MetricsOptions.class), portableBatchMode).start();
        portableBatchMode.waitUntilFinish();
        newSingleThreadExecutor.shutdown();
        return portableBatchMode;
    }
}
