package org.apache.sysds.runtime.controlprogram.parfor;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.parser.ParForStatementBlock;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.utils.Statistics;

/* loaded from: input_file:org/apache/sysds/runtime/controlprogram/parfor/RemoteParForSpark.class */
/**
 * Driver-side launcher that runs a list of parfor {@link Task}s as a single Spark job.
 *
 * <p>The tasks are turned into an RDD, processed by a {@link RemoteParForSparkWorker}
 * and the collected output is converted back into {@link LocalVariableMap}s.
 */
public class RemoteParForSpark {
    protected static final Log LOG = LogFactory.getLog(RemoteParForSpark.class.getName());

    /** Generator of the job identifier passed to each worker and to the cache cleanup. */
    private static final IDSequence _jobID = new IDSequence();

    /**
     * Executes the given tasks as one Spark job and gathers their results.
     *
     * @param j not read by this method
     * @param str forwarded unchanged to the {@link RemoteParForSparkWorker} constructor
     *        (presumably the serialized parfor program; confirm against the worker)
     * @param hashMap forwarded unchanged to the worker constructor
     * @param list tasks to distribute; one RDD slice is requested per task
     * @param executionContext current execution context; cast to {@link SparkExecutionContext}
     * @param set names of the variables considered for broadcasting when
     *        {@code ParForProgramBlock.ALLOW_BROADCAST_INPUTS} is set
     * @param list2 not read by this method
     * @param z forwarded unchanged to the worker constructor
     * @param i not read by this method
     * @param z2 forwarded unchanged to the worker constructor
     * @return job return object built from the two accumulator values (narrowed to
     *         {@code int}), the collected results and, if {@code DMLScript.LINEAGE} is set,
     *         the lineages extracted from those results (otherwise {@code null})
     * @throws ClassCastException if {@code executionContext} is not a {@link SparkExecutionContext}
     */
    public static RemoteParForJobReturn runJob(long j, String str, HashMap<String, byte[]> hashMap, List<Task> list, ExecutionContext executionContext, Set<String> set, List<ParForStatementBlock.ResultVar> list2, boolean z, int i, boolean z2) {
        final String jobName = "ParFor-ESP";
        final long startNanos = DMLScript.STATISTICS ? System.nanoTime() : 0L;

        final SparkExecutionContext sec = (SparkExecutionContext) executionContext;
        final JavaSparkContext sc = sec.getSparkContext();
        final boolean localMode = sc.isLocal();

        // Accumulators the workers use to report processed tasks and iterations.
        final LongAccumulator taskCounter = sc.sc().longAccumulator("tasks");
        final LongAccumulator iterationCounter = sc.sc().longAccumulator("iterations");

        // The worker-side variable cache is reset only when Spark runs in local mode.
        final long jobId = _jobID.getNextID();
        if (localMode) {
            RemoteParForSparkWorker.cleanupCachedVariables(jobId);
        }

        // Stays null when broadcasting of inputs is disabled.
        final Map<String, Broadcast<CacheBlock<?>>> broadcasts =
            ParForProgramBlock.ALLOW_BROADCAST_INPUTS ? broadcastInputs(sec, set) : null;

        // Build the task RDD, run the worker over it, and pull the output back eagerly
        // via collect() so the results can be deserialized on the driver.
        final LocalVariableMap[] results = RemoteParForUtils.getResults(
            sc.parallelize(list, list.size())
                .flatMapToPair(new RemoteParForSparkWorker(
                    jobId,
                    str,
                    localMode,
                    hashMap,
                    z,
                    taskCounter,
                    iterationCounter,
                    broadcasts,
                    z2,
                    DMLScript.LINEAGE ? executionContext.getLineage().serialize() : null))
                .collect(),
            LOG);

        final int numTasks = taskCounter.value().intValue();
        final int numIterations = iterationCounter.value().intValue();
        final RemoteParForJobReturn jobReturn = new RemoteParForJobReturn(
            true,
            numTasks,
            numIterations,
            results,
            DMLScript.LINEAGE ? RemoteParForUtils.getLineages(results) : null);

        // Instruction counters are always updated; timing only when statistics are on.
        Statistics.incrementNoOfCompiledSPInst();
        Statistics.incrementNoOfExecutedSPInst();
        if (DMLScript.STATISTICS) {
            Statistics.maintainCPHeavyHitters(jobName, System.nanoTime() - startNanos);
        }
        return jobReturn;
    }

    /**
     * Creates a broadcast for every named variable that is neither a scalar nor a
     * partitioned matrix.
     *
     * @param sparkExecutionContext context used to look up and broadcast the variables
     * @param set names of the candidate variables
     * @return map from variable name to its broadcast handle; skipped variables have no entry
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Map<String, Broadcast<CacheBlock<?>>> broadcastInputs(SparkExecutionContext sparkExecutionContext, Set<String> set) {
        final Map<String, Broadcast<CacheBlock<?>>> broadcasts = new HashMap<>();
        for (String name : set) {
            final Data value = sparkExecutionContext.getVariable(name);
            if (value instanceof ScalarObject) {
                continue;
            }
            if (value instanceof MatrixObject && ((MatrixObject) value).isPartitioned()) {
                continue;
            }
            // Raw cast retained: the generic signature of broadcastVariable is not visible here.
            broadcasts.put(name, sparkExecutionContext.broadcastVariable((CacheableData) value));
        }
        return broadcasts;
    }
}
