package org.apache.crunch.impl.spark;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.spark.collect.EmptyPCollection;
import org.apache.crunch.impl.spark.collect.EmptyPTable;
import org.apache.crunch.impl.spark.collect.SparkCollectFactory;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

/* loaded from: input_file:org/apache/crunch/impl/spark/SparkPipeline.class */
/**
 * A {@link DistributedPipeline} that executes Crunch plans on Apache Spark.
 *
 * <p>The pipeline either creates its own {@link JavaSparkContext} lazily on the first call to
 * {@link #runAsync()} (using the connect string given to the constructor), or wraps a context
 * supplied by the caller. In both cases {@link #done()} stops whatever context is active.
 */
public class SparkPipeline extends DistributedPipeline {
    private static final Log LOG = LogFactory.getLog(SparkPipeline.class);

    /** Connect string passed to {@link JavaSparkContext} when this pipeline creates the context. */
    private final String sparkConnect;
    /** Active Spark context; {@code null} before the first run and again after {@link #done()}. */
    private JavaSparkContext sparkContext;
    /** Class whose jar(s) are added to a newly created context; may be {@code null}. */
    private final Class<?> jarClass;
    /** Collections registered via {@link #cache}, mapped to the Spark storage level to use. */
    private final Map<PCollection<?>, StorageLevel> cachedCollections;

    /**
     * Creates a pipeline that will build its own Spark context with a fresh Hadoop configuration.
     *
     * @param sparkConnect connect string handed to {@link JavaSparkContext}; must not be null
     * @param appName the pipeline name, also used as the Spark application name
     */
    public SparkPipeline(String sparkConnect, String appName) {
        this(sparkConnect, appName, null);
    }

    /**
     * Creates a pipeline that will build its own Spark context with a fresh Hadoop configuration.
     *
     * @param sparkConnect connect string handed to {@link JavaSparkContext}; must not be null
     * @param appName the pipeline name, also used as the Spark application name
     * @param jarClass class whose jar(s) are shipped via {@code addJar}; may be null
     */
    public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass) {
        this(sparkConnect, appName, jarClass, new Configuration());
    }

    /**
     * Creates a pipeline that will build its own Spark context from the given configuration.
     *
     * @param sparkConnect connect string handed to {@link JavaSparkContext}; must not be null
     * @param appName the pipeline name, also used as the Spark application name
     * @param jarClass class whose jar(s) are shipped via {@code addJar}; may be null
     * @param conf Hadoop configuration; its {@code spark.*} entries seed the {@link SparkConf}
     * @throws NullPointerException if {@code sparkConnect} is null
     */
    public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass, Configuration conf) {
        super(appName, conf, new SparkCollectFactory());
        this.cachedCollections = Maps.newHashMap();
        this.sparkConnect = Preconditions.checkNotNull(sparkConnect, "sparkConnect");
        this.jarClass = jarClass;
    }

    /**
     * Creates a pipeline on top of an existing Spark context, sharing its Hadoop configuration.
     *
     * <p>NOTE(review): {@link #done()} stops and discards this context. A later run would then
     * create a new context using the original context's Spark home as the connect string. That is
     * presumably not a valid master, so confirm whether reuse after {@code done()} is intended.
     *
     * @param sparkContext the context to run on; must not be null
     * @param appName the pipeline name
     * @throws NullPointerException if {@code sparkContext} is null
     */
    public SparkPipeline(JavaSparkContext sparkContext, String appName) {
        // Check inside the super(...) call: hadoopConfiguration() would otherwise NPE first,
        // making a later null check unreachable.
        super(appName,
            Preconditions.checkNotNull(sparkContext, "sparkContext").hadoopConfiguration(),
            new SparkCollectFactory());
        this.cachedCollections = Maps.newHashMap();
        this.sparkContext = sparkContext;
        this.sparkConnect = sparkContext.getSparkHome().orNull();
        this.jarClass = null;
    }

    /**
     * Returns an iterable over the contents of {@code pCollection} after the pipeline has run.
     *
     * <p>Only the first iterable requested for a given collection is registered as pending. It is
     * handed to the runtime when that collection is written by {@link #runAsync()}.
     */
    public <T> Iterable<T> materialize(PCollection<T> pCollection) {
        MaterializableIterable<T> iterable =
            new MaterializableIterable<T>(this, getMaterializeSourceTarget(pCollection));
        if (!this.outputTargetsToMaterialize.containsKey(pCollection)) {
            this.outputTargetsToMaterialize.put((PCollectionImpl<?>) pCollection, iterable);
        }
        return iterable;
    }

    /** Returns an empty collection of the given type bound to this pipeline. */
    public <S> PCollection<S> emptyPCollection(PType<S> pType) {
        return new EmptyPCollection<S>(this, pType);
    }

    /** Returns an empty table of the given type bound to this pipeline. */
    public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> pTableType) {
        return new EmptyPTable<K, V>(this, pTableType);
    }

    /**
     * Records that {@code pCollection} should be cached at the storage level derived from
     * {@code cachingOptions}. The mapping is passed to the runtime on every run.
     */
    public <T> void cache(PCollection<T> pCollection, CachingOptions cachingOptions) {
        this.cachedCollections.put(pCollection, toStorageLevel(cachingOptions));
    }

    /** Translates Crunch caching options into the equivalent Spark {@link StorageLevel}. */
    private StorageLevel toStorageLevel(CachingOptions cachingOptions) {
        return StorageLevel.apply(
            cachingOptions.useDisk(),
            cachingOptions.useMemory(),
            cachingOptions.deserialized(),
            cachingOptions.replicas());
    }

    /**
     * Runs the pipeline and waits for it to finish.
     *
     * <p>Any exception is logged and {@link PipelineResult#EMPTY} is returned instead of being
     * propagated. If the wait is interrupted, the thread's interrupt status is restored first.
     */
    public PipelineResult run() {
        try {
            PipelineExecution execution = runAsync();
            execution.waitUntilDone();
            return execution.getResult();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Don't swallow the interruption: callers up the stack must still be able to see it.
                Thread.currentThread().interrupt();
            }
            LOG.error("Exception running pipeline", e);
            return PipelineResult.EMPTY;
        }
    }

    /**
     * Executes all pending outputs on Spark.
     *
     * <p>Moves materialization requests for collections written in this run out of the pending
     * map. Creates the Spark context if there is none and copies the pipeline configuration into
     * the context's Hadoop configuration. Then calls {@code SparkRuntime.execute()} and clears the
     * pending output targets.
     *
     * @return the {@code SparkRuntime} that executed the plan
     */
    public PipelineExecution runAsync() {
        // Raw value type: the generic shape is dictated by SparkRuntime's constructor.
        Map toMaterialize = Maps.newHashMap();
        for (PCollectionImpl<?> collection : this.outputTargets.keySet()) {
            MaterializableIterable pending = this.outputTargetsToMaterialize.remove(collection);
            if (pending != null) {
                toMaterialize.put(collection, pending);
            }
        }

        Configuration configuration = getConfiguration();
        ensureSparkContext(configuration);
        copyConfiguration(configuration, this.sparkContext.hadoopConfiguration());

        SparkRuntime sparkRuntime = new SparkRuntime(this, this.sparkContext, configuration,
            this.outputTargets, toMaterialize, this.cachedCollections, this.allPipelineCallables);
        sparkRuntime.execute();
        this.outputTargets.clear();
        return sparkRuntime;
    }

    /**
     * Creates {@link #sparkContext} if it is not set.
     *
     * <p>The new context gets every {@code spark.*} entry of {@code configuration}. If
     * {@link #jarClass} is set, the jar(s) reported by {@link JavaSparkContext#jarOfClass} are
     * added to it.
     */
    private void ensureSparkContext(Configuration configuration) {
        if (this.sparkContext != null) {
            return;
        }
        SparkConf sparkConf = new SparkConf();
        for (Map.Entry<String, String> entry : configuration) {
            if (entry.getKey().startsWith("spark.")) {
                sparkConf.set(entry.getKey(), entry.getValue());
            }
        }
        this.sparkContext = new JavaSparkContext(this.sparkConnect, getName(), sparkConf);
        if (this.jarClass != null) {
            String[] jars = JavaSparkContext.jarOfClass(this.jarClass);
            if (jars != null) {
                for (String jar : jars) {
                    this.sparkContext.addJar(jar);
                }
            }
        }
    }

    /**
     * Finishes the pipeline and stops the active Spark context, if any.
     *
     * <p>The context is stopped even if {@code super.done()} throws, so it is never leaked.
     */
    public PipelineResult done() {
        try {
            return super.done();
        } finally {
            if (this.sparkContext != null) {
                this.sparkContext.stop();
                this.sparkContext = null;
            }
        }
    }

    /** Copies every entry of {@code from} into {@code to}, overwriting existing keys. */
    private static void copyConfiguration(Configuration from, Configuration to) {
        for (Map.Entry<String, String> entry : from) {
            to.set(entry.getKey(), entry.getValue());
        }
    }
}
