package org.apache.sysds.runtime.controlprogram.context;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.package$;
import org.apache.spark.storage.RDDInfo;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.SizeEstimator;
import org.apache.sysds.api.DMLException;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.api.mlcontext.MLContext;
import org.apache.sysds.api.mlcontext.MLContextUtil;
import org.apache.sysds.common.Types;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.Checkpoint;
import org.apache.sysds.parser.DataExpression;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.io.ReaderSparkCompressed;
import org.apache.sysds.runtime.controlprogram.Program;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.data.TensorIndexes;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.spark.DeCompressionSPInstruction;
import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
import org.apache.sysds.runtime.instructions.spark.data.LineageObject;
import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock;
import org.apache.sysds.runtime.instructions.spark.data.PartitionedBroadcast;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
import org.apache.sysds.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
import org.apache.sysds.runtime.instructions.spark.functions.CopyTextInputFunction;
import org.apache.sysds.runtime.instructions.spark.functions.CreateSparseBlockFunction;
import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.io.InputOutputInfo;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.TensorCharacteristics;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.MLContextProxy;
import org.apache.sysds.utils.Statistics;
import org.apache.sysds.utils.stats.SparkStatistics;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.class */
public class SparkExecutionContext extends ExecutionContext {
    // Lazy Spark context creation flag; not referenced in the code shown here
    // (isLazySparkContextCreation() returns the literal true).
    private static final boolean LAZY_SPARKCTX_CREATION = true;
    // NOTE(review): not referenced in the code shown here; presumably enables asynchronous
    // cleanup of variables - confirm against the rest of the file.
    private static final boolean ASYNCHRONOUS_VAR_DESTROY = true;
    // Fair scheduler flag; createSystemDSSparkConf() sets spark.scheduler.mode=FAIR unconditionally.
    public static final boolean FAIR_SCHEDULER_MODE = true;
    // Cached cluster configuration; not assigned in the code shown here.
    private static SparkClusterConfig _sconf = null;
    // Shared Spark context: created by initSparkContext(), set to null by close(),
    // resetSparkContextStatic(), or getSparkContextStatic() when the context was stopped.
    private static JavaSparkContext _spctx = null;
    // Driver-side reservations for parallelized RDDs, limited to 0.1 * local max memory.
    private static final MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1d);
    // One flag per InfrastructureAnalyzer.getLocalParallelism(); not referenced in the code shown
    // here (presumably a pool-slot buffer - confirm).
    private static boolean[] _poolBuff = new boolean[InfrastructureAnalyzer.getLocalParallelism()];
    // Set once the "falling back to local Spark context" warning was logged, so it is logged once.
    private static boolean localSparkWarning = false;

    /**
     * Task that copies the tuples in the index range [start, end) of a collected block list into
     * a shared output matrix block via {@code SparkExecutionContext.blockPartitionToMatrixBlock}.
     * (Decompiler note: this class was originally declared private.)
     */
    public static class BlockPartitionToMatrixBlockTask implements Callable<Object> {
        private final List<Tuple2<MatrixIndexes, MatrixBlock>> _tuples;
        private final int _start;
        private final int _end;
        private final MatrixBlock _out;
        private final LongAdder _aNnz;
        private final int _blen;
        private final boolean _sparseCopyShallow;

        protected BlockPartitionToMatrixBlockTask(List<Tuple2<MatrixIndexes, MatrixBlock>> tuples, int start, int end,
                MatrixBlock out, LongAdder aNnz, int blen, boolean sparseCopyShallow) {
            _tuples = tuples;
            _start = start;
            _end = end;
            _out = out;
            _aNnz = aNnz;
            _blen = blen;
            _sparseCopyShallow = sparseCopyShallow;
        }

        /** Copies every tuple of the assigned range; always returns {@code null}. */
        @Override
        public Object call() {
            int pos = _start;
            while (pos < _end) {
                SparkExecutionContext.blockPartitionToMatrixBlock(_tuples.get(pos), _out, _aNnz, _blen, _sparseCopyShallow);
                pos++;
            }
            return null;
        }
    }

    /**
     * Accounts for driver memory reserved by RDDs created from in-memory data, bounded by a
     * fraction of the local max memory. All operations are synchronized on the instance.
     * (Decompiler note: this class was originally declared private.)
     */
    public static class MemoryManagerParRDDs {
        private final long _limit;
        private long _size = 0;
        private final HashMap<Integer, Long> _rdds = new HashMap<>();

        /** @param fraction share of {@code InfrastructureAnalyzer.getLocalMaxMemory()} usable for reservations */
        public MemoryManagerParRDDs(double fraction) {
            _limit = (long) (fraction * InfrastructureAnalyzer.getLocalMaxMemory());
        }

        /**
         * Reserves {@code size} bytes if the new total stays strictly below the limit.
         *
         * @return true if the reservation was made
         */
        public synchronized boolean reserve(long size) {
            if (size + _size >= _limit) {
                return false;
            }
            _size += size;
            return true;
        }

        /**
         * Associates a previously reserved size with an RDD id.
         *
         * @throws RuntimeException if {@code reserved} is false
         */
        public synchronized void registerRDD(int rddId, long size, boolean reserved) {
            if (reserved) {
                _rdds.put(rddId, size);
                return;
            }
            throw new RuntimeException("Unsupported rdd registration without size reservation for " + size + " bytes.");
        }

        /** Releases the reservation of the given RDD id; unknown ids are ignored. */
        public synchronized void deregisterRDD(int rddId) {
            Long released = _rdds.remove(rddId);
            if (released != null) {
                _size -= released;
            }
        }

        /** Drops all reservations. */
        public synchronized void clear() {
            _size = 0L;
            _rdds.clear();
        }
    }

    /**
     * Snapshot of the Spark cluster configuration used for memory and parallelism estimates:
     * executor memory, data and broadcast memory fractions, number of executors, and default
     * parallelism. Values are derived from the SystemDS Spark configuration and, when that is
     * incomplete (or a refresh is requested), from the live Spark context.
     */
    public static class SparkClusterConfig {
        // share of the minimal data fraction usable for broadcasts (non-legacy memory model)
        private static final double BROADCAST_DATA_FRACTION = 0.7d;
        // share of the storage fraction usable for broadcasts (legacy memory model)
        private static final double BROADCAST_DATA_FRACTION_LEGACY = 0.35d;
        // 300 MiB subtracted from the executor memory in the non-legacy memory model
        private static final long RESERVED_SYSTEM_MEMORY_BYTES = 314572800;

        private boolean _legacyVersion;
        // stays true only while no value was obtained by querying the Spark context
        private boolean _confOnly;
        private long _memExecutor = -1;
        private double _memDataMinFrac = -1.0d;
        private double _memDataMaxFrac = -1.0d;
        private double _memBroadcastFrac = -1.0d;
        private int _numExecutors = -1;
        private int _defaultPar = -1;

        /**
         * Analyzes the SystemDS Spark configuration. The legacy memory analysis is used for Spark
         * versions older than 1.6.0 or when {@code spark.memory.useLegacyMode} is set.
         */
        public SparkClusterConfig() {
            _legacyVersion = false;
            _confOnly = false;
            SparkExecutionContext.handleIllegalReflectiveAccessSpark();
            final SparkConf conf = SparkExecutionContext.createSystemDSSparkConf();
            _confOnly = true;
            final boolean olderThan160 = UtilFunctions.compareVersion(getSparkVersionString(), "1.6.0") < 0;
            _legacyVersion = olderThan160 || conf.getBoolean("spark.memory.useLegacyMode", false);
            if (!_legacyVersion) {
                analyzeSparkConfiguation(conf);
            } else {
                analyzeSparkConfiguationLegacy(conf);
            }
            if (ExecutionContext.LOG.isDebugEnabled()) {
                ExecutionContext.LOG.debug(toString());
            }
        }

        /** @return executor memory multiplied by the broadcast fraction, in bytes */
        public long getBroadcastMemoryBudget() {
            final double budget = _memExecutor * _memBroadcastFrac;
            return (long) budget;
        }

        /**
         * Computes the aggregate data memory budget over all executors.
         *
         * @param min     use the minimal data fraction instead of the maximal one
         * @param refresh query the live executor count unless values are configuration-only
         * @return the data memory budget in bytes
         */
        public long getDataMemoryBudget(boolean min, boolean refresh) {
            final boolean queryContext = (refresh && !_confOnly) || SparkExecutionContext.isSparkContextCreated();
            final int numExec = queryContext
                ? Math.max(SparkExecutionContext.getSparkContextStatic().sc().getExecutorMemoryStatus().size() - 1, 1)
                : _numExecutors;
            final double fraction = min ? _memDataMinFrac : _memDataMaxFrac;
            return (long) (numExec * _memExecutor * fraction);
        }

        /** @return the number of executors, analyzing the parallelism configuration on first use */
        public int getNumExecutors() {
            if (_numExecutors >= 0) {
                return _numExecutors;
            }
            analyzeSparkParallelismConfiguation(null);
            return _numExecutors;
        }

        /**
         * @param refresh query the live Spark context unless values are configuration-only
         * @return the default parallelism, at least 1
         */
        public int getDefaultParallelism(boolean refresh) {
            if (!refresh && _defaultPar < 0) {
                analyzeSparkParallelismConfiguation(null);
            }
            final boolean queryContext = (refresh && !_confOnly) || SparkExecutionContext.isSparkContextCreated();
            final int par = queryContext
                ? SparkExecutionContext.getSparkContextStatic().defaultParallelism().intValue()
                : _defaultPar;
            return Math.max(par, 1);
        }

        /** Legacy memory model: a single storage fraction serves as both min and max data fraction. */
        public void analyzeSparkConfiguationLegacy(SparkConf sparkConf) {
            final SparkConf conf = (sparkConf != null) ? sparkConf : SparkExecutionContext.createSystemDSSparkConf();
            _memExecutor = UtilFunctions.parseMemorySize(conf.get("spark.executor.memory", "1g"));
            final double storageFraction = conf.getDouble("spark.storage.memoryFraction", 0.6d);
            _memDataMinFrac = storageFraction;
            _memDataMaxFrac = storageFraction;
            _memBroadcastFrac = storageFraction * BROADCAST_DATA_FRACTION_LEGACY;
            analyzeSparkParallelismConfiguation(conf);
        }

        /** Non-legacy memory model: executor memory minus reserved memory, storage fraction as minimum. */
        public void analyzeSparkConfiguation(SparkConf sparkConf) {
            final SparkConf conf = (sparkConf != null) ? sparkConf : SparkExecutionContext.createSystemDSSparkConf();
            _memExecutor = UtilFunctions.parseMemorySize(conf.get("spark.executor.memory", "1g")) - RESERVED_SYSTEM_MEMORY_BYTES;
            final double memoryFraction = conf.getDouble("spark.memory.fraction", 0.6d);
            _memDataMinFrac = memoryFraction * conf.getDouble("spark.memory.storageFraction", 0.5d);
            _memDataMaxFrac = memoryFraction;
            _memBroadcastFrac = _memDataMinFrac * BROADCAST_DATA_FRACTION;
            analyzeSparkParallelismConfiguation(conf);
        }

        /**
         * Determines executor count and default parallelism from the configuration, from local
         * defaults, or - as a last resort - from the live Spark context (clearing _confOnly).
         */
        private void analyzeSparkParallelismConfiguation(SparkConf sparkConf) {
            final SparkConf conf = (sparkConf != null) ? sparkConf : SparkExecutionContext.createSystemDSSparkConf();
            final int confExecutors = conf.getInt("spark.executor.instances", -1);
            final int confCores = conf.getInt("spark.executor.cores", -1);
            final int confDefaultPar = conf.getInt("spark.default.parallelism", -1);
            final boolean fullyConfigured = confExecutors > 1 && (confDefaultPar > 1 || confCores > 1);
            if (fullyConfigured) {
                // configuration-only values: _confOnly is left unchanged
                _numExecutors = confExecutors;
                _defaultPar = (confDefaultPar > 1) ? confDefaultPar : confExecutors * confCores;
            } else if (DMLScript.USE_LOCAL_SPARK_CONFIG) {
                // local defaults: _confOnly is left unchanged
                _numExecutors = 1;
                _defaultPar = 2;
            } else {
                final JavaSparkContext ctx = SparkExecutionContext.getSparkContextStatic();
                _numExecutors = Math.max(ctx.sc().getExecutorMemoryStatus().size() - 1, 1);
                _defaultPar = ctx.defaultParallelism().intValue();
                _confOnly = false;
            }
        }

        /** @return the version of the live context if created, otherwise the compile-time Spark version */
        private static String getSparkVersionString() {
            if (!SparkExecutionContext.isSparkContextCreated()) {
                return package$.MODULE$.SPARK_VERSION();
            }
            return SparkExecutionContext.getSparkContextStatic().version();
        }

        @Override
        public String toString() {
            final StringBuilder sb = new StringBuilder("SparkClusterConfig: \n");
            sb.append("-- legacyVersion    = ").append(_legacyVersion).append(" (").append(getSparkVersionString()).append(")\n");
            sb.append("-- confOnly         = ").append(_confOnly).append("\n");
            sb.append("-- numExecutors     = ").append(_numExecutors).append("\n");
            sb.append("-- defaultPar       = ").append(_defaultPar).append("\n");
            sb.append("-- memExecutor      = ").append(_memExecutor).append("\n");
            sb.append("-- memDataMinFrac   = ").append(_memDataMinFrac).append("\n");
            sb.append("-- memDataMaxFrac   = ").append(_memDataMaxFrac).append("\n");
            sb.append("-- memBroadcastFrac = ").append(_memBroadcastFrac).append("\n");
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SparkExecutionContext(boolean z, boolean z2, Program program) {
        super(z, z2, program);
        if (DMLScript.getGlobalExecMode() == Types.ExecMode.SPARK) {
            initSparkContext();
        }
    }

    /**
     * Returns the shared Spark context, creating it on demand.
     *
     * @return the shared {@link JavaSparkContext}
     */
    public JavaSparkContext getSparkContext() {
        initSparkContext(); // lazy creation
        final JavaSparkContext ctx = _spctx;
        return ctx;
    }

    /**
     * Returns the shared Spark context, creating it if absent and re-creating it if the existing
     * one has been stopped.
     *
     * @return a live {@link JavaSparkContext}
     */
    public static synchronized JavaSparkContext getSparkContextStatic() {
        initSparkContext();
        final boolean stopped = _spctx.sc().isStopped();
        if (stopped) {
            // never hand out a stopped context: drop it and build a fresh one
            _spctx = null;
            initSparkContext();
        }
        return _spctx;
    }

    /** @return whether a shared Spark context currently exists (without creating one) */
    public static synchronized boolean isSparkContextCreated() {
        final JavaSparkContext ctx = _spctx;
        return ctx != null;
    }

    /**
     * Forgets the shared Spark context without stopping it; the next access creates or obtains a
     * new one. Synchronized on the class like every other accessor of the non-volatile
     * {@code _spctx}, so the reset is not racing with, or invisible to, concurrent accessors.
     */
    public static synchronized void resetSparkContextStatic() {
        _spctx = null;
    }

    /**
     * Stops and discards the shared Spark context, if any. During shutdown, the Spark transport
     * response handler logger is raised to FATAL and {@code System.err} is temporarily redirected
     * to a discarding stream to suppress shutdown noise. Both are restored in a {@code finally}
     * block, so a failing {@code stop()} can no longer leave stderr permanently silenced.
     * The logger is always left at ERROR afterwards.
     */
    public void close() {
        synchronized (SparkExecutionContext.class) {
            if (_spctx == null) {
                return;
            }
            Logger logger = Logger.getLogger("org.apache.spark.network.client.TransportResponseHandler");
            logger.setLevel(Level.FATAL);
            PrintStream originalErr = System.err;
            System.setErr(new PrintStream(new OutputStream() {
                @Override
                public void write(int b) {
                    // intentionally discard all output
                }
            }));
            try {
                _spctx.stop();
            } finally {
                // make sure a (possibly partially) stopped context is never reused
                _spctx = null;
                System.setErr(originalErr);
                logger.setLevel(Level.ERROR);
            }
        }
    }

    /** @return always {@code true}: the Spark context is created on first use */
    public static boolean isLazySparkContextCreation() {
        final boolean lazy = true;
        return lazy;
    }

    /**
     * Opens JDK packages of the module containing {@link Target} (java.base) to the modules of
     * Spark's {@code Platform} and {@code SizeEstimator}. java.nio and java.io go to Platform's
     * module; java.util, java.lang, java.lang.ref and java.util.concurrent go to SizeEstimator's.
     * NOTE(review): per the {@code Module.addOpens} contract this throws IllegalCallerException
     * unless java.base already opens each package to this class's module (e.g. via
     * {@code --add-opens}) - confirm the launch flags.
     */
    public static void handleIllegalReflectiveAccessSpark() {
        final Module javaBase = Target.class.getModule();
        final Module platformModule = Platform.class.getModule();
        for (String pkg : new String[] {"java.nio", "java.io"}) {
            javaBase.addOpens(pkg, platformModule);
        }
        final Module sizeEstimatorModule = SizeEstimator.class.getModule();
        for (String pkg : new String[] {"java.util", "java.lang", "java.lang.ref", "java.util.concurrent"}) {
            javaBase.addOpens(pkg, sizeEstimatorModule);
        }
    }

    /**
     * Creates the shared Spark context if it does not exist yet.
     * <p>
     * With an active MLContext, that context's JavaSparkContext is reused. Otherwise a context is
     * built from createSystemDSSparkConf(); local settings and a checkpoint directory are applied
     * when DMLScript.USE_LOCAL_SPARK_CONFIG is set, and the parallelized-RDD reservations are
     * cleared. Afterwards:
     * <ul>
     * <li>a warning is logged if spark.driver.maxResultSize is limited below the local memory budget;</li>
     * <li>the binary-block serialization framework is registered with the Hadoop configuration;</li>
     * <li>the creation time is recorded when statistics are enabled.</li>
     * </ul>
     */
    private static synchronized void initSparkContext() {
        if (_spctx != null) {
            return;
        }
        handleIllegalReflectiveAccessSpark();
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        MLContext activeMLContext = MLContextProxy.getActiveMLContext();
        if (activeMLContext != null) {
            // reuse the Spark context owned by the MLContext (reservations are not cleared here)
            _spctx = MLContextUtil.getJavaSparkContext(activeMLContext);
        } else {
            SparkConf createSystemDSSparkConf = createSystemDSSparkConf();
            if (DMLScript.USE_LOCAL_SPARK_CONFIG) {
                setLocalConfSettings(createSystemDSSparkConf);
            }
            _spctx = createContext(createSystemDSSparkConf);
            if (DMLScript.USE_LOCAL_SPARK_CONFIG) {
                _spctx.setCheckpointDir("/tmp/systemds_spark_cache_" + DMLScript.getUUID());
            }
            // a fresh context holds none of the previously reserved parallelized RDDs
            _parRDDs.clear();
        }
        // createSystemDSSparkConf() forces maxResultSize to "0", so presumably this warning only
        // fires for externally supplied (MLContext) contexts - confirm
        long parseMemorySize = UtilFunctions.parseMemorySize(_spctx.getConf().get("spark.driver.maxResultSize", "1g"));
        if (parseMemorySize != 0 && parseMemorySize < OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG) {
            LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(parseMemorySize) + ". You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size " + UtilFunctions.formatMemorySize((long) OptimizerUtils.getLocalMemBudget()) + ".");
        }
        HDFSTool.addBinaryBlockSerializationFramework(_spctx.hadoopConfiguration());
        if (DMLScript.STATISTICS) {
            SparkStatistics.setCtxCreateTime(System.nanoTime() - nanoTime);
        }
    }

    /**
     * Creates a JavaSparkContext from the given configuration.
     * <p>
     * If construction fails only because no master URL is configured, the following happens:
     * <ul>
     * <li>a warning is logged, at most once per JVM;</li>
     * <li>the configuration is switched to a local master via setLocalConfSettings;</li>
     * <li>creation is retried.</li>
     * </ul>
     * Exceptions without a message are no longer dereferenced; they are treated as a regular
     * creation failure instead of surfacing as a NullPointerException.
     *
     * @param sparkConf the Spark configuration (mutated on fallback)
     * @return the created context
     * @throws DMLException if context creation fails for any other reason
     */
    private static JavaSparkContext createContext(SparkConf sparkConf) {
        try {
            return new JavaSparkContext(sparkConf);
        } catch (Exception e) {
            String message = e.getMessage();
            if (message == null || !message.contains("A master URL must be set in your configuration")) {
                throw new DMLException("Error while creating Spark context", e);
            }
            if (!localSparkWarning) {
                LOG.warn("Error constructing Spark Context, falling back to local Spark context creation");
                localSparkWarning = true;
            }
            // with a local master set, the retry no longer lacks a master URL
            setLocalConfSettings(sparkConf);
            return createContext(sparkConf);
        }
    }

    /**
     * Configures a local Spark master with the thread count from DMLConfig.LOCAL_SPARK_NUM_THREADS,
     * a fixed application name, and both the console progress bar and the web UI disabled.
     */
    private static void setLocalConfSettings(SparkConf sparkConf) {
        final String numThreads = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
        sparkConf
            .setMaster("local[" + numThreads + "]")
            .setAppName("LocalSparkContextApp")
            .set("spark.ui.showConsoleProgress", "false")
            .set("spark.ui.enabled", "false");
    }

    /**
     * Builds the SystemDS default Spark configuration. The following are always set:
     * <ul>
     * <li>spark.driver.maxResultSize = 0;</li>
     * <li>spark.scheduler.mode = FAIR.</li>
     * </ul>
     * The following are set only if not already configured:
     * <ul>
     * <li>spark.locality.wait = 5s;</li>
     * <li>the message size limit = 512 (spark.akka.frameSize before Spark 2.0.0,
     * spark.rpc.message.maxSize from 2.0.0 on).</li>
     * </ul>
     *
     * @return a new SparkConf
     */
    public static SparkConf createSystemDSSparkConf() {
        final SparkConf conf = new SparkConf()
            .set("spark.driver.maxResultSize", "0")
            .set("spark.scheduler.mode", "FAIR");
        if (!conf.contains("spark.locality.wait")) {
            conf.set("spark.locality.wait", "5s");
        }
        final boolean beforeSpark2 = UtilFunctions.compareVersion(package$.MODULE$.SPARK_VERSION(), "2.0.0") < 0;
        final String maxMessageSizeKey = beforeSpark2 ? "spark.akka.frameSize" : "spark.rpc.message.maxSize";
        if (!conf.contains(maxMessageSizeKey)) {
            conf.set(maxMessageSizeKey, "512");
        }
        return conf;
    }

    /** @return whether the shared Spark context (created on demand) runs with a local master */
    public static boolean isLocalMaster() {
        final JavaSparkContext ctx = getSparkContextStatic();
        return ctx.isLocal().booleanValue();
    }

    /**
     * Returns the binary-block RDD of the named matrix variable, delegating to
     * getRDDHandleForMatrixObject with {@code i = -1} and {@code z = true}. The wildcard result is
     * cast explicitly; without the cast the declared return type does not compile.
     */
    @SuppressWarnings("unchecked")
    public JavaPairRDD<MatrixIndexes, MatrixBlock> getBinaryMatrixBlockRDDHandleForVariable(String str) {
        return (JavaPairRDD<MatrixIndexes, MatrixBlock>) getRDDHandleForMatrixObject(getMatrixObject(str), Types.FileFormat.BINARY, -1, true);
    }

    /**
     * Returns the binary-block RDD of the named matrix variable; {@code i} and {@code z} are
     * forwarded to getRDDHandleForMatrixObject (used when the matrix is parallelized).
     */
    @SuppressWarnings("unchecked")
    public JavaPairRDD<MatrixIndexes, MatrixBlock> getBinaryMatrixBlockRDDHandleForVariable(String str, int i, boolean z) {
        return (JavaPairRDD<MatrixIndexes, MatrixBlock>) getRDDHandleForMatrixObject(getMatrixObject(str), Types.FileFormat.BINARY, i, z);
    }

    /**
     * Returns the binary-block RDD of the named tensor variable, delegating to
     * getRDDHandleForTensorObject with {@code i = -1} and {@code z = true}. The wildcard result is
     * cast explicitly; without the cast the declared return type does not compile.
     */
    @SuppressWarnings("unchecked")
    public JavaPairRDD<TensorIndexes, TensorBlock> getBinaryTensorBlockRDDHandleForVariable(String str) {
        return (JavaPairRDD<TensorIndexes, TensorBlock>) getRDDHandleForTensorObject(getTensorObject(str), Types.FileFormat.BINARY, -1, true);
    }

    /**
     * Returns the binary-block RDD of the named tensor variable; {@code i} and {@code z} are
     * forwarded to getRDDHandleForTensorObject.
     */
    @SuppressWarnings("unchecked")
    public JavaPairRDD<TensorIndexes, TensorBlock> getBinaryTensorBlockRDDHandleForVariable(String str, int i, boolean z) {
        return (JavaPairRDD<TensorIndexes, TensorBlock>) getRDDHandleForTensorObject(getTensorObject(str), Types.FileFormat.BINARY, i, z);
    }

    /**
     * Returns the binary-block RDD of the named frame variable. The wildcard result of
     * getRDDHandleForFrameObject is cast explicitly; without the cast the declared return type
     * does not compile.
     */
    @SuppressWarnings("unchecked")
    public JavaPairRDD<Long, FrameBlock> getFrameBinaryBlockRDDHandleForVariable(String str) {
        return (JavaPairRDD<Long, FrameBlock>) getRDDHandleForFrameObject(getFrameObject(str), Types.FileFormat.BINARY);
    }

    /**
     * Returns an RDD handle for a matrix or frame variable.
     *
     * @throws DMLRuntimeException if the variable is neither a matrix nor a frame
     */
    public JavaPairRDD<?, ?> getRDDHandleForVariable(String str, Types.FileFormat fileFormat, int i, boolean z) {
        final Data data = getVariable(str);
        final boolean isMatrix = data instanceof MatrixObject;
        final boolean isFrame = data instanceof FrameObject;
        if (!isMatrix && !isFrame) {
            throw new DMLRuntimeException("Failed to obtain RDD for data type other than matrix or frame.");
        }
        if (isMatrix) {
            return getRDDHandleForMatrixObject(getMatrixObject(str), fileFormat, i, z);
        }
        return getRDDHandleForFrameObject(getFrameObject(str), fileFormat);
    }

    /** Convenience overload: same as the four-argument variant with {@code i = -1} and {@code z = true}. */
    public JavaPairRDD<?, ?> getRDDHandleForMatrixObject(MatrixObject matrixObject, Types.FileFormat fileFormat) {
        final int defaultI = -1;
        final boolean defaultZ = true;
        return getRDDHandleForMatrixObject(matrixObject, fileFormat, defaultI, defaultZ);
    }

    /**
     * Returns an RDD handle for the given matrix, creating and caching one when needed.
     * <ol>
     * <li>An existing handle is reused if it is a checkpoint RDD or the matrix is not cached.</li>
     * <li>Dirty, cached, federated or future matrices are parallelized from the in-memory block.
     *     Federated and future matrices always take this path. Other matrices take it only if
     *     they pass the collect budget check and a reservation in {@code _parRDDs} succeeds.
     *     Otherwise the matrix is exported (if needed) and read back as a binary-block file.
     *     Only sizes that were actually reserved are registered, so a later deregisterRDD cannot
     *     subtract memory that was never added.</li>
     * <li>Otherwise the matrix is read from its file in the requested format.</li>
     * </ol>
     *
     * @param matrixObject the matrix
     * @param fileFormat   format of the backing file (BINARY, COMPRESSED or a text format)
     * @param i            forwarded to toMatrixJavaPairRDD when parallelizing
     * @param z            forwarded to toMatrixJavaPairRDD when parallelizing
     * @return the RDD handle
     * @throws DMLRuntimeException if an HDFS read is needed for an unsupported format
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public JavaPairRDD<?, ?> getRDDHandleForMatrixObject(MatrixObject matrixObject, Types.FileFormat fileFormat, int i, boolean z) {
        JavaPairRDD<?, ?> matrixJavaPairRDD;
        JavaSparkContext sparkContext = getSparkContext();
        InputOutputInfo inputOutputInfo = InputOutputInfo.get(Types.DataType.MATRIX, fileFormat);
        if (matrixObject.getRDDHandle() != null && (matrixObject.getRDDHandle().isCheckpointRDD() || !matrixObject.isCached(false))) {
            // reuse the existing handle
            matrixJavaPairRDD = matrixObject.getRDDHandle().getRDD();
        } else if (matrixObject.isDirty() || matrixObject.isCached(false) || matrixObject.isFederated() || (matrixObject instanceof MatrixObjectFuture)) {
            DataCharacteristics dataCharacteristics = matrixObject.getDataCharacteristics();
            // federated and future matrices are parallelized without budget check or reservation
            boolean forceParallelize = matrixObject.isFederated() || (matrixObject instanceof MatrixObjectFuture);
            boolean reserved = !forceParallelize
                && OptimizerUtils.checkSparkCollectMemoryBudget(dataCharacteristics, 0L)
                && _parRDDs.reserve(OptimizerUtils.estimatePartitionedSizeExactSparsity(dataCharacteristics));
            boolean fromFile = !forceParallelize && !reserved;
            if (fromFile) {
                if (matrixObject.isDirty() || !matrixObject.isHDFSFileExists()) {
                    matrixObject.exportData();
                }
                JavaPairRDD hadoopFile = sparkContext.hadoopFile(matrixObject.getFileName(), inputOutputInfo.inputFormatClass, inputOutputInfo.keyClass, inputOutputInfo.valueClass);
                matrixJavaPairRDD = SparkUtils.copyBinaryBlockMatrix(hadoopFile);
            } else {
                MatrixBlock matrixBlock = matrixObject.acquireRead();
                try {
                    matrixJavaPairRDD = toMatrixJavaPairRDD(sparkContext, matrixBlock, matrixObject.getBlocksize(), i, z);
                } finally {
                    // unpin even if parallelization fails
                    matrixObject.release();
                }
                if (reserved) {
                    _parRDDs.registerRDD(matrixJavaPairRDD.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(dataCharacteristics), true);
                }
            }
            RDDObject rDDObject = new RDDObject(matrixJavaPairRDD);
            rDDObject.setHDFSFile(fromFile);
            rDDObject.setParallelizedRDD(!fromFile);
            matrixObject.setRDDHandle(rDDObject);
        } else {
            // read from the backing file; the Hadoop RDD is only built for formats that use it
            String fileName = matrixObject.getFileName();
            if (fileFormat == Types.FileFormat.BINARY) {
                JavaPairRDD hadoopFile = sparkContext.hadoopFile(fileName, inputOutputInfo.inputFormatClass, inputOutputInfo.keyClass, inputOutputInfo.valueClass);
                matrixJavaPairRDD = SparkUtils.copyBinaryBlockMatrix(hadoopFile);
            } else if (fileFormat == Types.FileFormat.COMPRESSED) {
                matrixJavaPairRDD = ReaderSparkCompressed.getRDD(sparkContext, fileName);
            } else if (fileFormat.isTextFormat()) {
                matrixJavaPairRDD = sparkContext.hadoopFile(fileName, inputOutputInfo.inputFormatClass, LongWritable.class, Text.class).mapToPair(new CopyTextInputFunction());
            } else {
                throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
            }
            RDDObject rDDObject2 = new RDDObject(matrixJavaPairRDD);
            rDDObject2.setHDFSFile(true);
            matrixObject.setRDDHandle(rDDObject2);
        }
        return matrixJavaPairRDD;
    }

    /**
     * Returns an RDD handle for the given tensor. An existing handle is reused if it is a
     * checkpoint RDD or the tensor is not cached. Otherwise a dirty or cached tensor is
     * parallelized if it fits the collect budget and a reservation in {@code _parRDDs} succeeds.
     * Reading tensors from files is not supported.
     *
     * @throws DMLRuntimeException if the tensor is neither dirty nor cached, or does not fit
     *         (in the latter case it is exported first if needed)
     */
    public JavaPairRDD<?, ?> getRDDHandleForTensorObject(TensorObject tensorObject, Types.FileFormat fileFormat, int i, boolean z) {
        final JavaSparkContext sc = getSparkContext();
        if (tensorObject.getRDDHandle() != null
            && (tensorObject.getRDDHandle().isCheckpointRDD() || !tensorObject.isCached(false))) {
            return tensorObject.getRDDHandle().getRDD();
        }
        final boolean inMemory = tensorObject.isDirty() || tensorObject.isCached(false);
        if (!inMemory) {
            throw new DMLRuntimeException(fileFormat == Types.FileFormat.BINARY
                ? "Tensor can not yet be written or read to hadoopFile"
                : "Incorrect input format in getRDDHandleForVariable");
        }
        final DataCharacteristics dc = tensorObject.getDataCharacteristics();
        final boolean canParallelize = OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0L)
            && _parRDDs.reserve(OptimizerUtils.estimatePartitionedSizeExactSparsity(dc));
        if (!canParallelize) {
            if (tensorObject.isDirty() || !tensorObject.isHDFSFileExists()) {
                tensorObject.exportData();
            }
            throw new DMLRuntimeException("Tensor can not yet be written or read to hadoopFile");
        }
        final JavaPairRDD<?, ?> rdd = toTensorJavaPairRDD(sc, tensorObject.acquireRead(), dc.getBlocksize(), i, z);
        tensorObject.release();
        _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(dc), true);
        final RDDObject handle = new RDDObject(rdd);
        handle.setHDFSFile(false);
        handle.setParallelizedRDD(true);
        tensorObject.setRDDHandle(handle);
        return rdd;
    }

    /**
     * Returns an RDD handle for the given frame, creating and caching one when needed.
     * <ol>
     * <li>An existing handle is reused if it is a checkpoint RDD or the frame is not cached.</li>
     * <li>Dirty or cached frames are parallelized if they fit the collect budget and a reservation
     *     in {@code _parRDDs} succeeds; otherwise they are exported (if dirty) and read back.
     *     As in the matrix and tensor paths, the handle is marked as a parallelized RDD when the
     *     data was parallelized. NOTE(review): presumably the cleanup code keys on that flag to
     *     deregister the reservation - confirm.</li>
     * <li>Otherwise the frame is read from its file (binary or text format).</li>
     * </ol>
     *
     * @throws DMLRuntimeException if a file read is needed for an unsupported format
     */
    public JavaPairRDD<?, ?> getRDDHandleForFrameObject(FrameObject frameObject, Types.FileFormat fileFormat) {
        JavaPairRDD<?, ?> mapToPair;
        InputOutputInfo inputOutputInfo = InputOutputInfo.get(Types.DataType.FRAME, fileFormat);
        JavaSparkContext sparkContext = getSparkContext();
        if (frameObject.getRDDHandle() != null && (frameObject.getRDDHandle().isCheckpointRDD() || !frameObject.isCached(false))) {
            // reuse the existing handle
            mapToPair = frameObject.getRDDHandle().getRDD();
        } else if (frameObject.isDirty() || frameObject.isCached(false)) {
            DataCharacteristics dataCharacteristics = frameObject.getDataCharacteristics();
            boolean fromFile = false;
            if (OptimizerUtils.checkSparkCollectMemoryBudget(dataCharacteristics, 0L) && _parRDDs.reserve(OptimizerUtils.estimatePartitionedSizeExactSparsity(dataCharacteristics))) {
                FrameBlock frameBlock = frameObject.acquireRead();
                try {
                    mapToPair = toFrameJavaPairRDD(sparkContext, frameBlock);
                } finally {
                    // unpin even if parallelization fails
                    frameObject.release();
                }
                _parRDDs.registerRDD(mapToPair.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(dataCharacteristics), true);
            } else {
                if (frameObject.isDirty()) {
                    frameObject.exportData();
                }
                mapToPair = sparkContext.hadoopFile(frameObject.getFileName(), inputOutputInfo.inputFormatClass, inputOutputInfo.keyClass, inputOutputInfo.valueClass).mapToPair(new CopyFrameBlockPairFunction());
                fromFile = true;
            }
            RDDObject rDDObject = new RDDObject(mapToPair);
            rDDObject.setHDFSFile(fromFile);
            rDDObject.setParallelizedRDD(!fromFile);
            frameObject.setRDDHandle(rDDObject);
        } else {
            if (fileFormat == Types.FileFormat.BINARY) {
                mapToPair = sparkContext.hadoopFile(frameObject.getFileName(), inputOutputInfo.inputFormatClass, inputOutputInfo.keyClass, inputOutputInfo.valueClass).mapToPair(new CopyFrameBlockPairFunction());
            } else {
                if (!fileFormat.isTextFormat()) {
                    throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
                }
                mapToPair = sparkContext.hadoopFile(frameObject.getFileName(), inputOutputInfo.inputFormatClass, inputOutputInfo.keyClass, inputOutputInfo.valueClass).mapToPair(new CopyTextInputFunction());
            }
            RDDObject rDDObject2 = new RDDObject(mapToPair);
            rDDObject2.setHDFSFile(true);
            frameObject.setRDDHandle(rDDObject2);
        }
        return mapToPair;
    }

    /**
     * Returns a non-partitioned broadcast of the given data. A valid existing broadcast handle is
     * reused. Otherwise the previously accounted broadcast size is subtracted and a new broadcast
     * is created, provided the block's exact serialized size is positive and at most
     * {@code OptimizerUtils.MAX_NUMCELLS_CP_DENSE}. The serialized size is computed only once;
     * it was previously computed twice per call.
     *
     * @return the broadcast, or {@code null} if the block is empty or too large to broadcast
     */
    public Broadcast<CacheBlock<?>> broadcastVariable(CacheableData<CacheBlock<?>> cacheableData) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        Broadcast<CacheBlock<?>> broadcast = null;
        if (cacheableData.getBroadcastHandle() != null && cacheableData.getBroadcastHandle().isNonPartitionedBroadcastValid()) {
            broadcast = cacheableData.getBroadcastHandle().getNonPartitionedBroadcast();
        }
        if (broadcast == null) {
            // the old (invalid) broadcast no longer counts towards the broadcast size
            if (cacheableData.getBroadcastHandle() != null) {
                CacheableData.addBroadcastSize(-cacheableData.getBroadcastHandle().getSize());
            }
            CacheBlock<?> block = cacheableData.acquireRead();
            cacheableData.release();
            long serializedSize = block.getExactSerializedSize();
            if (serializedSize > 0 && serializedSize <= OptimizerUtils.MAX_NUMCELLS_CP_DENSE) {
                broadcast = getSparkContext().broadcast(block);
                if (cacheableData.getBroadcastHandle() == null) {
                    cacheableData.setBroadcastHandle(new BroadcastObject());
                }
                cacheableData.getBroadcastHandle().setNonPartitionedBroadcast(broadcast, OptimizerUtils.estimateSize(cacheableData.getDataCharacteristics()));
                CacheableData.addBroadcastSize(cacheableData.getBroadcastHandle().getSize());
                if (DMLScript.STATISTICS) {
                    SparkStatistics.accBroadCastTime(System.nanoTime() - nanoTime);
                    SparkStatistics.incBroadcastCount(1L);
                }
            }
        }
        return broadcast;
    }

    /**
     * Returns a partitioned broadcast of the given matrix. A valid existing handle is reused;
     * otherwise the matrix is split into a PartitionedBlock, broadcast as one or more partitions,
     * and the broadcast size accounting is updated. Callers for the same matrix object are
     * serialized by synchronizing on it.
     * <p>
     * The partition count is ceil(numBlocks / blocksPerPartition), computed in floating point.
     * The previous integer division dropped a trailing partial partition and produced a 0-length
     * array (and an ArrayIndexOutOfBoundsException) for small matrices. The count is at least 1.
     *
     * @param matrixObject the matrix to broadcast
     * @return the partitioned broadcast
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public PartitionedBroadcast<MatrixBlock> getBroadcastForMatrixObject(MatrixObject matrixObject) {
        PartitionedBroadcast<MatrixBlock> partitionedBroadcast = null;
        synchronized (matrixObject) {
            long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
            if (matrixObject.getBroadcastHandle() != null && matrixObject.getBroadcastHandle().isPartitionedBroadcastValid()) {
                partitionedBroadcast = matrixObject.getBroadcastHandle().getPartitionedBroadcast();
            }
            if (partitionedBroadcast == null) {
                // the old (invalid) broadcast no longer counts towards the broadcast size
                if (matrixObject.getBroadcastHandle() != null) {
                    CacheableData.addBroadcastSize(-matrixObject.getBroadcastHandle().getSize());
                }
                int blocksize = matrixObject.getBlocksize();
                PartitionedBlock partitionedBlock = new PartitionedBlock(matrixObject.acquireRead(), blocksize);
                matrixObject.release();
                int blocksPerPartition = PartitionedBroadcast.computeBlocksPerPartition(matrixObject.getNumRows(), matrixObject.getNumColumns(), blocksize);
                long numBlocks = (long) partitionedBlock.getNumRowBlocks() * partitionedBlock.getNumColumnBlocks();
                int numParts = Math.max((int) Math.ceil((double) numBlocks / blocksPerPartition), 1);
                Broadcast[] broadcastArr = new Broadcast[numParts];
                if (numParts > 1) {
                    Arrays.parallelSetAll(broadcastArr, i -> {
                        return createPartitionedBroadcast(partitionedBlock, blocksPerPartition, i);
                    });
                } else {
                    broadcastArr[0] = getSparkContext().broadcast(partitionedBlock);
                    if (!isLocalMaster()) {
                        partitionedBlock.clearBlocks();
                    }
                }
                partitionedBroadcast = new PartitionedBroadcast<>(broadcastArr, matrixObject.getDataCharacteristics());
                if (matrixObject.getBroadcastHandle() == null) {
                    matrixObject.setBroadcastHandle(new BroadcastObject());
                }
                matrixObject.getBroadcastHandle().setPartitionedBroadcast(partitionedBroadcast, OptimizerUtils.estimatePartitionedSizeExactSparsity(matrixObject.getDataCharacteristics()));
                CacheableData.addBroadcastSize(matrixObject.getBroadcastHandle().getSize());
            }
            if (DMLScript.STATISTICS) {
                SparkStatistics.accBroadCastTime(System.nanoTime() - nanoTime);
                SparkStatistics.incBroadcastCount(1L);
            }
        }
        return partitionedBroadcast;
    }

    /**
     * Ensures the given matrix object carries a valid partitioned broadcast handle, creating one if
     * necessary. The broadcast returned by the lookup is intentionally discarded.
     *
     * @param matrixObject matrix object whose broadcast handle should be populated
     */
    public void setBroadcastHandle(MatrixObject matrixObject) {
        this.getBroadcastForMatrixObject(matrixObject);
    }

    /**
     * Returns a partitioned Spark broadcast of the given tensor object, reusing the object's existing
     * broadcast handle when it is still valid and otherwise creating (and registering) a new one.
     *
     * @param tensorObject tensor object to broadcast
     * @return the partitioned broadcast of the tensor
     */
    public PartitionedBroadcast<TensorBlock> getBroadcastForTensorObject(TensorObject tensorObject) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        PartitionedBroadcast<TensorBlock> partitionedBroadcast = null;
        // reuse an existing, still valid broadcast
        if (tensorObject.getBroadcastHandle() != null && tensorObject.getBroadcastHandle().isPartitionedBroadcastValid()) {
            partitionedBroadcast = tensorObject.getBroadcastHandle().getPartitionedBroadcast();
        }
        if (partitionedBroadcast == null) {
            // remove the stale handle's size from the global broadcast size accounting
            if (tensorObject.getBroadcastHandle() != null) {
                CacheableData.addBroadcastSize(-tensorObject.getBroadcastHandle().getSize());
            }
            DataCharacteristics dataCharacteristics = tensorObject.getDataCharacteristics();
            long[] dims = dataCharacteristics.getDims();
            int blocksize = dataCharacteristics.getBlocksize();
            PartitionedBlock partitionedBlock = new PartitionedBlock(tensorObject.acquireReadAndRelease(), dims, blocksize);
            int blocksPerPart = PartitionedBroadcast.computeBlocksPerPartition(dims, blocksize);
            // floating-point division: integer division would drop the trailing partial partition
            // and yield zero partitions when all blocks fit into a single partition
            int numParts = Math.max(1, (int) Math.ceil((double) partitionedBlock.getNumRowBlocks() * partitionedBlock.getNumColumnBlocks() / blocksPerPart));
            Broadcast[] broadcastArr = new Broadcast[numParts];
            if (numParts > 1) {
                Arrays.parallelSetAll(broadcastArr, part -> createPartitionedBroadcast(partitionedBlock, blocksPerPart, part));
            } else {
                broadcastArr[0] = getSparkContext().broadcast(partitionedBlock);
                // outside local mode the driver-side blocks are no longer needed once broadcast
                if (!isLocalMaster()) {
                    partitionedBlock.clearBlocks();
                }
            }
            partitionedBroadcast = new PartitionedBroadcast<>(broadcastArr, tensorObject.getDataCharacteristics());
            if (tensorObject.getBroadcastHandle() == null) {
                tensorObject.setBroadcastHandle(new BroadcastObject());
            }
            tensorObject.getBroadcastHandle().setPartitionedBroadcast(partitionedBroadcast, OptimizerUtils.estimatePartitionedSizeExactSparsity(tensorObject.getDataCharacteristics()));
            CacheableData.addBroadcastSize(tensorObject.getBroadcastHandle().getSize());
            if (DMLScript.STATISTICS) {
                SparkStatistics.accBroadCastTime(System.nanoTime() - nanoTime);
                SparkStatistics.incBroadcastCount(1L);
            }
        }
        return partitionedBroadcast;
    }

    /**
     * Looks up the matrix variable with the given name and returns its partitioned broadcast.
     *
     * @param str variable name
     * @return the partitioned broadcast of the variable's matrix
     */
    public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String str) {
        MatrixObject matrix = getMatrixObject(str);
        return getBroadcastForMatrixObject(matrix);
    }

    /**
     * Looks up the tensor variable with the given name and returns its partitioned broadcast.
     *
     * @param str variable name
     * @return the partitioned broadcast of the variable's tensor
     */
    public PartitionedBroadcast<TensorBlock> getBroadcastForTensorVariable(String str) {
        TensorObject tensor = getTensorObject(str);
        return getBroadcastForTensorObject(tensor);
    }

    /**
     * Returns a partitioned Spark broadcast of the frame variable with the given name, reusing the
     * frame's existing broadcast handle when it is still valid and otherwise creating a new one.
     *
     * @param str variable name
     * @return the partitioned broadcast of the frame
     */
    public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable(String str) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        FrameObject frameObject = getFrameObject(str);
        PartitionedBroadcast<FrameBlock> partitionedBroadcast = null;
        // reuse an existing, still valid broadcast
        if (frameObject.getBroadcastHandle() != null && frameObject.getBroadcastHandle().isPartitionedBroadcastValid()) {
            partitionedBroadcast = frameObject.getBroadcastHandle().getPartitionedBroadcast();
        }
        if (partitionedBroadcast == null) {
            // remove the stale handle's size from the global broadcast size accounting
            if (frameObject.getBroadcastHandle() != null) {
                CacheableData.addBroadcastSize(-frameObject.getBroadcastHandle().getSize());
            }
            int defaultFrameSize = OptimizerUtils.getDefaultFrameSize();
            FrameBlock frameBlock = frameObject.acquireRead();
            PartitionedBlock partitionedBlock;
            try {
                partitionedBlock = new PartitionedBlock(frameBlock, defaultFrameSize);
            } finally {
                // release the read lock even if blockifying fails
                frameObject.release();
            }
            int blocksPerPart = PartitionedBroadcast.computeBlocksPerPartition(frameObject.getNumRows(), frameObject.getNumColumns(), defaultFrameSize);
            // floating-point division: integer division would drop the trailing partial partition
            // and yield zero partitions when all blocks fit into a single partition
            int numParts = Math.max(1, (int) Math.ceil((double) partitionedBlock.getNumRowBlocks() * partitionedBlock.getNumColumnBlocks() / blocksPerPart));
            Broadcast[] broadcastArr = new Broadcast[numParts];
            if (numParts > 1) {
                Arrays.parallelSetAll(broadcastArr, part -> createPartitionedBroadcast(partitionedBlock, blocksPerPart, part));
            } else {
                broadcastArr[0] = getSparkContext().broadcast(partitionedBlock);
                // outside local mode the driver-side blocks are no longer needed once broadcast
                if (!isLocalMaster()) {
                    partitionedBlock.clearBlocks();
                }
            }
            partitionedBroadcast = new PartitionedBroadcast<>(broadcastArr, new MatrixCharacteristics(frameObject.getDataCharacteristics()).setBlocksize(defaultFrameSize));
            if (frameObject.getBroadcastHandle() == null) {
                frameObject.setBroadcastHandle(new BroadcastObject());
            }
            frameObject.getBroadcastHandle().setPartitionedBroadcast(partitionedBroadcast, OptimizerUtils.estimatePartitionedSizeExactSparsity(frameObject.getDataCharacteristics()));
            CacheableData.addBroadcastSize(frameObject.getBroadcastHandle().getSize());
            if (DMLScript.STATISTICS) {
                SparkStatistics.accBroadCastTime(System.nanoTime() - nanoTime);
                SparkStatistics.incBroadcastCount(1L);
            }
        }
        return partitionedBroadcast;
    }

    /**
     * Broadcasts one partition of a partitioned block.
     *
     * @param partitionedBlock source blocks
     * @param blocksPerPart    number of blocks per partition
     * @param partIndex        zero-based partition index
     * @return the broadcast of the requested partition (the last partition may hold fewer blocks)
     */
    private Broadcast<PartitionedBlock<? extends CacheBlock<?>>> createPartitionedBroadcast(PartitionedBlock<? extends CacheBlock<?>> partitionedBlock, int blocksPerPart, int partIndex) {
        final int firstBlock = partIndex * blocksPerPart;
        final int totalBlocks = partitionedBlock.getNumRowBlocks() * partitionedBlock.getNumColumnBlocks();
        final int numBlocks = Math.min(blocksPerPart, totalBlocks - firstBlock);
        PartitionedBlock<? extends CacheBlock<?>> partition = partitionedBlock.createPartition(firstBlock, numBlocks);
        Broadcast<PartitionedBlock<? extends CacheBlock<?>>> result = getSparkContext().broadcast(partition);
        // outside local mode the driver-side copy of this partition is no longer needed
        if (!isLocalMaster()) {
            partition.clearBlocks();
        }
        return result;
    }

    /**
     * Wraps the given RDD into a new RDD handle and attaches it to the named variable.
     *
     * @param str         variable name
     * @param javaPairRDD RDD to attach
     */
    public void setRDDHandleForVariable(String str, JavaPairRDD<?, ?> javaPairRDD) {
        CacheableData<?> target = getCacheableData(str);
        target.setRDDHandle(new RDDObject(javaPairRDD));
    }

    /**
     * Attaches an existing RDD handle to the named variable.
     *
     * @param str       variable name
     * @param rDDObject RDD handle to attach
     */
    public void setRDDHandleForVariable(String str, RDDObject rDDObject) {
        CacheableData<?> target = getCacheableData(str);
        target.setRDDHandle(rDDObject);
    }

    /**
     * Parallelizes a matrix block into a blocked pair RDD using Spark's default number of
     * partitions and keeping empty blocks.
     *
     * @param javaSparkContext Spark context
     * @param matrixBlock      matrix to parallelize
     * @param i                block size
     * @return the blocked pair RDD
     */
    public static JavaPairRDD<MatrixIndexes, MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext javaSparkContext, MatrixBlock matrixBlock, int i) {
        final int defaultNumPartitions = -1;
        final boolean keepEmptyBlocks = true;
        return toMatrixJavaPairRDD(javaSparkContext, matrixBlock, i, defaultNumPartitions, keepEmptyBlocks);
    }

    /**
     * Parallelizes a matrix block into a pair RDD of (1-based block index, block).
     *
     * @param javaSparkContext Spark context
     * @param matrixBlock      matrix to parallelize
     * @param i                block size
     * @param i2               number of partitions; values {@code <= 1} use Spark's default
     * @param z                whether empty blocks are kept (only applies when the matrix spans several blocks)
     * @return the blocked pair RDD
     */
    public static JavaPairRDD<MatrixIndexes, MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext javaSparkContext, MatrixBlock matrixBlock, int i, int i2, boolean z) {
        final long startTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        final boolean fitsOneBlock = matrixBlock.getNumRows() <= i && matrixBlock.getNumColumns() <= i;
        List<Tuple2<MatrixIndexes, MatrixBlock>> blocks;
        if (fitsOneBlock) {
            blocks = Arrays.asList(new Tuple2<MatrixIndexes, MatrixBlock>(new MatrixIndexes(1L, 1L), matrixBlock));
        } else {
            MatrixCharacteristics mc = new MatrixCharacteristics(matrixBlock.getNumRows(), matrixBlock.getNumColumns(), i, matrixBlock.getNonZeros());
            blocks = LongStream.range(0L, mc.getNumBlocks())
                .parallel()
                .mapToObj(blockId -> createIndexedMatrixBlock(matrixBlock, mc, blockId))
                .filter(kv -> z || !kv._2().isEmptyBlock(false))
                .collect(Collectors.toList());
        }
        JavaPairRDD<MatrixIndexes, MatrixBlock> rdd;
        if (i2 > 1) {
            rdd = javaSparkContext.parallelizePairs(blocks, i2);
        } else {
            rdd = javaSparkContext.parallelizePairs(blocks);
        }
        if (DMLScript.STATISTICS) {
            SparkStatistics.accParallelizeTime(System.nanoTime() - startTime);
            SparkStatistics.incParallelizeCount(1L);
        }
        return rdd;
    }

    /**
     * Parallelizes a tensor block into a blocked pair RDD using Spark's default number of
     * partitions and keeping empty blocks.
     *
     * @param javaSparkContext Spark context
     * @param tensorBlock      tensor to parallelize
     * @param i                block size
     * @return the blocked pair RDD
     */
    public static JavaPairRDD<TensorIndexes, TensorBlock> toTensorJavaPairRDD(JavaSparkContext javaSparkContext, TensorBlock tensorBlock, int i) {
        final int defaultNumPartitions = -1;
        final boolean keepEmptyBlocks = true;
        return toTensorJavaPairRDD(javaSparkContext, tensorBlock, i, defaultNumPartitions, keepEmptyBlocks);
    }

    /**
     * Parallelizes a tensor block into a pair RDD of (1-based block index, block).
     *
     * @param javaSparkContext Spark context
     * @param tensorBlock      tensor to parallelize
     * @param i                block size
     * @param i2               number of partitions; values {@code <= 1} use Spark's default
     * @param z                whether empty blocks are kept (only applies on the multi-block path)
     * @return the blocked pair RDD
     */
    public static JavaPairRDD<TensorIndexes, TensorBlock> toTensorJavaPairRDD(JavaSparkContext javaSparkContext, TensorBlock tensorBlock, int i, int i2, boolean z) {
        final long startTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        final int numDims = tensorBlock.getNumDims();
        // A single block is emitted unless the block size i exceeds some dimension.
        // NOTE(review): this is the opposite of toMatrixJavaPairRDD, which emits a single block only when
        // every dimension fits within the block size. The multi-block path below also does not use i.
        // Confirm intent.
        boolean singleBlock = true;
        for (int d = 0; singleBlock && d < numDims; d++) {
            singleBlock = i <= tensorBlock.getDim(d);
        }
        List<Tuple2<TensorIndexes, TensorBlock>> blocks;
        if (singleBlock) {
            long[] firstIndex = new long[numDims];
            Arrays.fill(firstIndex, 1L);
            blocks = Arrays.asList(new Tuple2<TensorIndexes, TensorBlock>(new TensorIndexes(firstIndex), tensorBlock));
        } else {
            TensorCharacteristics tc = new TensorCharacteristics(tensorBlock.getLongDims(), tensorBlock.getNonZeros());
            blocks = LongStream.range(0L, tc.getNumBlocks())
                .parallel()
                .mapToObj(blockId -> createIndexedTensorBlock(tensorBlock, tc, blockId))
                .filter(kv -> z || !kv._2().isEmpty(false))
                .collect(Collectors.toList());
        }
        JavaPairRDD<TensorIndexes, TensorBlock> rdd;
        if (i2 > 1) {
            rdd = javaSparkContext.parallelizePairs(blocks, i2);
        } else {
            rdd = javaSparkContext.parallelizePairs(blocks);
        }
        if (DMLScript.STATISTICS) {
            SparkStatistics.accParallelizeTime(System.nanoTime() - startTime);
            SparkStatistics.incParallelizeCount(1L);
        }
        return rdd;
    }

    /**
     * Slices the block with linear (row-major, zero-based) block id {@code j} out of a matrix.
     * Kept public because it is referenced from lambdas in this class.
     *
     * @param matrixBlock            source matrix
     * @param matrixCharacteristics  characteristics describing the block grid
     * @param j                      zero-based linear block id
     * @return (1-based block index, block) pair
     */
    public static Tuple2<MatrixIndexes, MatrixBlock> createIndexedMatrixBlock(MatrixBlock matrixBlock, MatrixCharacteristics matrixCharacteristics, long j) {
        try {
            final long rowBlk = j / matrixCharacteristics.getNumColBlocks();
            final long colBlk = j % matrixCharacteristics.getNumColBlocks();
            final int nRows = UtilFunctions.computeBlockSize(matrixCharacteristics.getRows(), rowBlk + 1, matrixCharacteristics.getBlocksize());
            final int nCols = UtilFunctions.computeBlockSize(matrixCharacteristics.getCols(), colBlk + 1, matrixCharacteristics.getBlocksize());
            MatrixBlock target = new MatrixBlock(nRows, nCols, matrixBlock.isInSparseFormat());
            final int rowOffset = (int) rowBlk * matrixCharacteristics.getBlocksize();
            final int colOffset = (int) colBlk * matrixCharacteristics.getBlocksize();
            MatrixIndexes index = new MatrixIndexes(rowBlk + 1, colBlk + 1);
            return new Tuple2<>(index, matrixBlock.slice(rowOffset, rowOffset + nRows - 1, colOffset, colOffset + nCols - 1, false, target));
        } catch (DMLRuntimeException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Slices the block with linear block id {@code j} out of a tensor.
     * Kept public because it is referenced from lambdas in this class.
     *
     * @param tensorBlock           source tensor
     * @param tensorCharacteristics characteristics describing the block grid
     * @param j                     linear block id
     * @return (block index, block) pair
     */
    public static Tuple2<TensorIndexes, TensorBlock> createIndexedTensorBlock(TensorBlock tensorBlock, TensorCharacteristics tensorCharacteristics, long j) {
        try {
            long[] blockIndex = UtilFunctions.computeTensorIndexes(tensorCharacteristics, j);
            // filled by computeSliceInfo: the first array sizes the target block, the second holds the slice offsets
            int[] blockDims = new int[tensorCharacteristics.getNumDims()];
            int[] sliceOffsets = new int[tensorCharacteristics.getNumDims()];
            UtilFunctions.computeSliceInfo(tensorCharacteristics, blockIndex, blockDims, sliceOffsets);
            TensorIndexes index = new TensorIndexes(blockIndex);
            return new Tuple2<>(index, tensorBlock.slice(sliceOffsets, new TensorBlock(tensorBlock.getValueType(), blockDims)));
        } catch (DMLRuntimeException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Splits a frame into row blocks of the configured block size and parallelizes them as a pair RDD
     * keyed by the 1-based row offset of each block. Column metadata is attached to the first block only.
     *
     * @param javaSparkContext Spark context
     * @param frameBlock       frame to parallelize
     * @return pair RDD of (1-based row offset, frame block)
     */
    public static JavaPairRDD<Long, FrameBlock> toFrameJavaPairRDD(JavaSparkContext javaSparkContext, FrameBlock frameBlock) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        List<Tuple2<Long, FrameBlock>> blocks = new ArrayList<>();
        int blocksize = ConfigurationManager.getBlocksize();
        int numRows = frameBlock.getNumRows();
        // floating-point division: integer division would drop the trailing partial block
        int numBlocks = (int) Math.ceil(numRows / (double) blocksize);
        for (int blk = 0; blk < numBlocks; blk++) {
            int rowOffset = blk * blocksize;
            int blockRows = Math.min(blocksize, numRows - rowOffset);
            FrameBlock block = new FrameBlock(frameBlock.getSchema());
            frameBlock.slice(rowOffset, rowOffset + blockRows - 1, 0, frameBlock.getNumColumns() - 1, block);
            if (rowOffset == 0) {
                block.setColumnMetadata(frameBlock.getColumnMetadata());
            }
            blocks.add(new Tuple2<Long, FrameBlock>(Long.valueOf(rowOffset + 1L), block));
        }
        JavaPairRDD<Long, FrameBlock> parallelizePairs = javaSparkContext.parallelizePairs(blocks);
        if (DMLScript.STATISTICS) {
            SparkStatistics.accParallelizeTime(System.nanoTime() - nanoTime);
            SparkStatistics.incParallelizeCount(1L);
        }
        return parallelizePairs;
    }

    /**
     * Collects the blocked matrix RDD behind the given handle into a single matrix block.
     *
     * @param rDDObject handle wrapping a {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
     * @param i         number of rows
     * @param i2        number of columns
     * @param i3        block size
     * @param j         number of non-zeros, or negative if unknown
     * @return the collected matrix block
     */
    public static MatrixBlock toMatrixBlock(RDDObject rDDObject, int i, int i2, int i3, long j) {
        @SuppressWarnings("unchecked")
        JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rDDObject.getRDD();
        return toMatrixBlock(blocks, i, i2, i3, j);
    }

    /**
     * Collects a blocked matrix RDD into a single matrix block.
     *
     * @param javaPairRDD blocked matrix RDD
     * @param i           number of rows
     * @param i2          number of columns
     * @param i3          block size
     * @param j           number of non-zeros, or negative if unknown
     * @return the collected matrix block
     */
    public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD, int i, int i2, int i3, long j) {
        final long startTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        final MatrixBlock result;
        // a matrix whose rows and columns both fit into one block is collected as a single block
        if (i <= i3 && i2 <= i3) {
            result = toMatrixBlockSingleBlock(javaPairRDD, i, i2);
        } else {
            result = toMatrixBlockMultiBlock(javaPairRDD, i, i2, i3, j);
        }
        if (DMLScript.STATISTICS) {
            SparkStatistics.accCollectTime(System.nanoTime() - startTime);
        }
        return result;
    }

    /**
     * Collects an RDD expected to hold at most one block. An empty RDD yields an empty sparse
     * {@code i x i2} block.
     *
     * @param javaPairRDD blocked matrix RDD
     * @param i           number of rows
     * @param i2          number of columns
     * @return the collected block
     * @throws DMLRuntimeException if more than one block is returned
     */
    private static MatrixBlock toMatrixBlockSingleBlock(JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD, int i, int i2) {
        List<Tuple2<MatrixIndexes, MatrixBlock>> blocks = javaPairRDD.collect();
        final int count = blocks.size();
        if (count > 1) {
            throw new DMLRuntimeException("Expecting no more than one result block but got: " + count);
        }
        MatrixBlock result;
        if (count == 0) {
            result = new MatrixBlock(i, i2, true);
        } else {
            result = blocks.get(0)._2();
        }
        result.examSparsity();
        return result;
    }

    /**
     * Collects a multi-block matrix RDD into a single output block.
     *
     * @param javaPairRDD blocked matrix RDD
     * @param i           number of rows
     * @param i2          number of columns
     * @param i3          block size
     * @param j           number of non-zeros, or negative if unknown (then rows * cols is assumed)
     * @return the assembled matrix block
     */
    private static MatrixBlock toMatrixBlockMultiBlock(JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD, int i, int i2, int i3, long j) {
        // widen before multiplying: rows * cols overflows int for large matrices
        long nnzEstimate = j >= 0 ? j : (long) i * i2;
        boolean sparse = MatrixBlock.evalSparseFormatInMemory(i, i2, nnzEstimate);
        // start allocating the output before the (blocking) collect and wait for it afterwards
        Future<MatrixBlock> allocation = new MatrixBlock(i, i2, sparse, nnzEstimate).allocateBlockAsync();
        List<Tuple2<MatrixIndexes, MatrixBlock>> blocks = javaPairRDD.collect();
        MatrixBlock out = (MatrixBlock) IOUtilFunctions.get(allocation);
        LongAdder nnz = new LongAdder();
        blockPartitionsToMatrixBlock(blocks, out, nnz, i3);
        if (sparse) {
            out.sortSparseRows();
        }
        // with compressed input blocks the accumulated counts are not used; nnz is recomputed instead
        if (containsCompressedMatrixBlock(blocks)) {
            out.recomputeNonZeros();
        } else {
            out.setNonZeros(nnz.longValue());
        }
        out.examSparsity();
        return out;
    }

    /**
     * Checks whether any collected block is a {@link CompressedMatrixBlock}.
     *
     * @param list collected (index, block) pairs
     * @return true if at least one block is compressed
     */
    private static boolean containsCompressedMatrixBlock(List<Tuple2<MatrixIndexes, MatrixBlock>> list) {
        return list.stream().anyMatch(kv -> kv._2() instanceof CompressedMatrixBlock);
    }

    /**
     * Copies all collected blocks into the output: sequentially for a sparse output, in parallel
     * for a dense output.
     *
     * @param list        collected (index, block) pairs
     * @param matrixBlock output block
     * @param longAdder   accumulator for the copied non-zeros
     * @param i           block size
     */
    private static void blockPartitionsToMatrixBlock(List<Tuple2<MatrixIndexes, MatrixBlock>> list, MatrixBlock matrixBlock, LongAdder longAdder, int i) {
        if (!matrixBlock.isInSparseFormat()) {
            blockPartitionsToMatrixBlockMultiThreaded(list, matrixBlock, longAdder, i);
            return;
        }
        blockPartitionsToMatrixBlockSingleThread(list, matrixBlock, longAdder, i);
    }

    /**
     * Copies all collected blocks into the output on the calling thread.
     *
     * @param list        collected (index, block) pairs
     * @param matrixBlock output block
     * @param longAdder   accumulator for the copied non-zeros
     * @param i           block size
     */
    private static void blockPartitionsToMatrixBlockSingleThread(List<Tuple2<MatrixIndexes, MatrixBlock>> list, MatrixBlock matrixBlock, LongAdder longAdder, int i) {
        // flag forwarded to putInto: sparse output whose columns fit into a single column block
        final boolean sparseSingleColBlock = matrixBlock.isInSparseFormat() && matrixBlock.getNumColumns() <= i;
        for (Tuple2<MatrixIndexes, MatrixBlock> kv : list) {
            blockPartitionToMatrixBlock(kv, matrixBlock, longAdder, i, sparseSingleColBlock);
        }
    }

    /**
     * Copies all collected blocks into the output using a local thread pool.
     *
     * @param list        collected (index, block) pairs
     * @param matrixBlock output block
     * @param longAdder   accumulator for the copied non-zeros
     * @param i           block size
     * @throws DMLRuntimeException if a copy task fails or the calling thread is interrupted
     */
    private static void blockPartitionsToMatrixBlockMultiThreaded(List<Tuple2<MatrixIndexes, MatrixBlock>> list, MatrixBlock matrixBlock, LongAdder longAdder, int i) {
        int localParallelism = InfrastructureAnalyzer.getLocalParallelism();
        ExecutorService executorService = CommonThreadPool.get(localParallelism);
        try {
            int size = list.size();
            // about two tasks per thread, and at least one block per task
            int blocksPerTask = Math.max((size / localParallelism) / 2, 1);
            List<BlockPartitionToMatrixBlockTask> tasks = new ArrayList<>();
            for (int start = 0; start < size; start += blocksPerTask) {
                tasks.add(new BlockPartitionToMatrixBlockTask(list, start, Math.min(start + blocksPerTask, size), matrixBlock, longAdder, i, false));
            }
            for (Object future : executorService.invokeAll(tasks)) {
                // surface any task failure
                ((Future<?>) future).get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new DMLRuntimeException("Parallel block partitions to matrix block failed", e);
        } catch (ExecutionException e) {
            throw new DMLRuntimeException("Parallel block partitions to matrix block failed", e);
        } finally {
            // shut the pool down on failure too, not only on success
            executorService.shutdown();
        }
    }

    /**
     * Copies one collected block into the output at the offset given by its 1-based block index,
     * and adds its non-zero count to the accumulator.
     *
     * @param tuple2      (index, block) pair
     * @param matrixBlock output block
     * @param longAdder   accumulator for the copied non-zeros
     * @param i           block size
     * @param z           flag forwarded to {@code putInto}
     */
    private static void blockPartitionToMatrixBlock(Tuple2<MatrixIndexes, MatrixBlock> tuple2, MatrixBlock matrixBlock, LongAdder longAdder, int i, boolean z) {
        MatrixIndexes index = tuple2._1();
        MatrixBlock block = tuple2._2();
        final int rowOffset = (int) (index.getRowIndex() - 1) * i;
        final int colOffset = (int) (index.getColumnIndex() - 1) * i;
        block.putInto(matrixBlock, rowOffset, colOffset, z);
        longAdder.add(block.getNonZeros());
    }

    /**
     * Collects the cell-based matrix RDD behind the given handle into a single matrix block.
     *
     * @param rDDObject handle wrapping a {@code JavaPairRDD<MatrixIndexes, MatrixCell>}
     * @param i         number of rows
     * @param i2        number of columns
     * @param j         number of non-zeros, or negative if unknown
     * @return the collected matrix block
     */
    public static MatrixBlock toMatrixBlock(RDDObject rDDObject, int i, int i2, long j) {
        @SuppressWarnings("unchecked")
        JavaPairRDD<MatrixIndexes, MatrixCell> cells = (JavaPairRDD<MatrixIndexes, MatrixCell>) rDDObject.getRDD();
        return toMatrixBlock(cells, i, i2, j);
    }

    /**
     * Collects a cell-based matrix RDD (1-based cell indexes) into a single matrix block.
     *
     * @param javaPairRDD cell RDD
     * @param i           number of rows
     * @param i2          number of columns
     * @param j           number of non-zeros, or negative if unknown (then rows * cols is assumed)
     * @return the collected matrix block
     */
    public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> javaPairRDD, int i, int i2, long j) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        // widen before multiplying: rows * cols overflows int for large matrices
        long nnzEstimate = j >= 0 ? j : (long) i * i2;
        boolean sparse = MatrixBlock.evalSparseFormatInMemory(i, i2, nnzEstimate);
        MatrixBlock out = new MatrixBlock(i, i2, sparse);
        for (Tuple2<MatrixIndexes, MatrixCell> cell : javaPairRDD.collect()) {
            MatrixIndexes index = cell._1();
            out.appendValue(((int) index.getRowIndex()) - 1, ((int) index.getColumnIndex()) - 1, cell._2().getValue());
        }
        // cells arrive in arbitrary order, so sparse rows need sorting after appending
        if (sparse) {
            out.sortSparseRows();
        }
        out.recomputeNonZeros();
        out.examSparsity();
        if (DMLScript.STATISTICS) {
            SparkStatistics.accCollectTime(System.nanoTime() - nanoTime);
        }
        return out;
    }

    /**
     * Collects a blocked tensor RDD into a single tensor block of the dimensions given by
     * {@code dataCharacteristics}, copying the blocks one at a time.
     *
     * NOTE(review): the value type is taken from the first collected block, so an empty RDD makes
     * {@code collect.get(0)} fail. Confirm callers never pass an empty RDD.
     *
     * @param javaPairRDD         blocked tensor RDD (1-based block indexes)
     * @param dataCharacteristics output dimensions and block size
     * @return the assembled tensor block
     */
    public static TensorBlock toTensorBlock(JavaPairRDD<TensorIndexes, TensorBlock> javaPairRDD, DataCharacteristics dataCharacteristics) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        int[] intDims = dataCharacteristics.getIntDims();
        List<Tuple2> collect = javaPairRDD.collect();
        TensorBlock allocateBlock = new TensorBlock(((TensorBlock) ((Tuple2) collect.get(0))._2).getValueType(), intDims).allocateBlock();
        for (Tuple2 tuple2 : collect) {
            TensorIndexes tensorIndexes = (TensorIndexes) tuple2._1();
            TensorBlock tensorBlock = (TensorBlock) tuple2._2();
            // iArr: zero-based lower corner of this block in the output; iArr2: inclusive upper corner
            int[] iArr = new int[tensorIndexes.getNumDims()];
            int[] iArr2 = new int[tensorIndexes.getNumDims()];
            for (int i = 0; i < iArr.length; i++) {
                iArr[i] = (int) ((tensorIndexes.getIndex(i) - 1) * dataCharacteristics.getBlocksize());
                iArr2[i] = (iArr[i] + tensorBlock.getDim(i)) - 1;
            }
            // Increment the last coordinate of the upper corner, then propagate a carry toward dimension 0
            // whenever a coordinate equals the block's extent in that dimension. Presumably this
            // produces the exclusive upper bound TensorBlock.copy expects; confirm against copy().
            // NOTE(review): the carry compares the absolute coordinate with the block-local dimension,
            // which only coincides for blocks at offset 0 in that dimension. Verify for multi-block inputs.
            int length = iArr2.length - 1;
            iArr2[length] = iArr2[length] + 1;
            for (int length2 = iArr2.length - 1; length2 > 0; length2--) {
                if (iArr2[length2] == tensorBlock.getDim(length2)) {
                    iArr2[length2] = 0;
                    int i2 = length2 - 1;
                    iArr2[i2] = iArr2[i2] + 1;
                }
            }
            allocateBlock.copy(iArr, iArr2, tensorBlock);
        }
        if (DMLScript.STATISTICS) {
            SparkStatistics.accCollectTime(System.nanoTime() - nanoTime);
        }
        return allocateBlock;
    }

    /**
     * Collects a blocked matrix RDD into a partitioned block, placing each block at its 1-based index.
     *
     * @param javaPairRDD blocked matrix RDD
     * @param i           number of rows
     * @param i2          number of columns
     * @param i3          block size
     * @param j           number of non-zeros (not used here)
     * @return the partitioned block
     */
    public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD, int i, int i2, int i3, long j) {
        final long startTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        PartitionedBlock<MatrixBlock> out = new PartitionedBlock<>(i, i2, i3);
        List<Tuple2<MatrixIndexes, MatrixBlock>> blocks = javaPairRDD.collect();
        for (Tuple2<MatrixIndexes, MatrixBlock> kv : blocks) {
            MatrixIndexes index = kv._1();
            out.setBlock((int) index.getRowIndex(), (int) index.getColumnIndex(), kv._2());
        }
        if (DMLScript.STATISTICS) {
            SparkStatistics.accCollectTime(System.nanoTime() - startTime);
        }
        return out;
    }

    /**
     * Collects the frame RDD behind the given handle into a single frame block.
     *
     * @param rDDObject    handle wrapping a {@code JavaPairRDD<Long, FrameBlock>}
     * @param valueTypeArr schema, or null for an all-STRING schema
     * @param i            number of rows
     * @param i2           number of columns
     * @return the collected frame block
     */
    public static FrameBlock toFrameBlock(RDDObject rDDObject, Types.ValueType[] valueTypeArr, int i, int i2) {
        @SuppressWarnings("unchecked")
        JavaPairRDD<Long, FrameBlock> blocks = (JavaPairRDD<Long, FrameBlock>) rDDObject.getRDD();
        return toFrameBlock(blocks, valueTypeArr, i, i2);
    }

    /**
     * Collects a frame RDD keyed by 1-based row offset into a single frame block. Column names and
     * metadata are taken from the block at row offset 1.
     *
     * @param javaPairRDD  frame RDD of (1-based row offset, block)
     * @param valueTypeArr schema, or null for an all-STRING schema of {@code i2} columns
     * @param i            number of rows
     * @param i2           number of columns
     * @return the collected frame block
     */
    public static FrameBlock toFrameBlock(JavaPairRDD<Long, FrameBlock> javaPairRDD, Types.ValueType[] valueTypeArr, int i, int i2) {
        final long startTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        Types.ValueType[] schema = valueTypeArr;
        if (schema == null) {
            schema = UtilFunctions.nCopies(i2, Types.ValueType.STRING);
        }
        FrameBlock out = new FrameBlock(schema, i);
        for (Tuple2<Long, FrameBlock> kv : javaPairRDD.collect()) {
            final int rowOffset = (int) (kv._1() - 1);
            FrameBlock part = kv._2();
            out.copy(rowOffset, rowOffset + part.getNumRows() - 1, 0, part.getNumColumns() - 1, part);
            if (rowOffset != 0) {
                continue;
            }
            out.setColumnNames(part.getColumnNames());
            out.setColumnMetadata(part.getColumnMetadata());
        }
        if (DMLScript.STATISTICS) {
            SparkStatistics.accCollectTime(System.nanoTime() - startTime);
        }
        return out;
    }

    /**
     * Writes a matrix RDD to the given path in the given format, decompressing blocks first when
     * compression is enabled.
     *
     * @param rDDObject  handle of the RDD to write
     * @param str        output path
     * @param fileFormat output format
     * @return the value of the accumulator fed by {@code ComputeBinaryBlockNnzFunction} during the write
     *         (presumably the number of non-zeros written)
     */
    public static long writeMatrixRDDtoHDFS(RDDObject rDDObject, String str, Types.FileFormat fileFormat) {
        JavaPairRDD<?, ?> output = rDDObject.getRDD();
        InputOutputInfo ioInfo = InputOutputInfo.get(Types.DataType.MATRIX, fileFormat);
        if (ConfigurationManager.isCompressionEnabled()) {
            output = output.mapValues(new DeCompressionSPInstruction.DeCompressionFunction());
        }
        LongAccumulator nnzCounter = getSparkContextStatic().sc().longAccumulator(DataExpression.READNNZPARAM);
        JavaPairRDD<?, ?> counted = output.mapValues(new ComputeBinaryBlockNnzFunction(nnzCounter));
        counted.saveAsHadoopFile(str, ioInfo.keyClass, ioInfo.valueClass, ioInfo.outputFormatClass);
        return nnzCounter.value().longValue();
    }

    /**
     * Writes a frame RDD to the given path in the given format.
     *
     * @param rDDObject  handle of the RDD to write
     * @param str        output path
     * @param fileFormat output format
     */
    public static void writeFrameRDDtoHDFS(RDDObject rDDObject, String str, Types.FileFormat fileFormat) {
        JavaPairRDD<?, ?> output = rDDObject.getRDD();
        InputOutputInfo ioInfo = InputOutputInfo.get(Types.DataType.FRAME, fileFormat);
        // binary output converts the keys first (presumably Long -> LongWritable, per the function name)
        if (Types.FileFormat.BINARY == fileFormat) {
            output = output.mapToPair(new FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction());
        }
        output.saveAsHadoopFile(str, ioInfo.keyClass, ioInfo.valueClass, ioInfo.outputFormatClass);
    }

    /**
     * Registers the RDD handle of variable {@code str2} as a lineage child of the RDD handle of {@code str}.
     *
     * @param str  parent variable name
     * @param str2 child variable name
     */
    public void addLineageRDD(String str, String str2) {
        RDDObject parent = getCacheableData(str).getRDDHandle();
        parent.addLineageChild(getCacheableData(str2).getRDDHandle());
    }

    /**
     * Registers the broadcast handle of variable {@code str2} as a lineage child of the RDD handle of {@code str}.
     *
     * @param str  parent variable name
     * @param str2 child variable name
     */
    public void addLineageBroadcast(String str, String str2) {
        RDDObject parent = getCacheableData(str).getRDDHandle();
        parent.addLineageChild(getCacheableData(str2).getBroadcastHandle());
    }

    /**
     * Registers {@code str2} as a lineage child of {@code str}, via its broadcast handle if {@code z}
     * is true and via its RDD handle otherwise.
     *
     * @param str  parent variable name
     * @param str2 child variable name
     * @param z    whether the child is a broadcast
     */
    public void addLineage(String str, String str2, boolean z) {
        if (!z) {
            addLineageRDD(str, str2);
            return;
        }
        addLineageBroadcast(str, str2);
    }

    /**
     * Releases the data of an unreferenced cacheable object, including its HDFS file and its RDD and
     * broadcast lineage. Objects with cleanup disabled or still referenced by other variables are
     * left untouched.
     *
     * @param cacheableData object to clean up
     * @throws DMLRuntimeException wrapping any failure during cleanup
     */
    @Override // org.apache.sysds.runtime.controlprogram.context.ExecutionContext
    public void cleanupCacheableData(CacheableData<?> cacheableData) {
        if (DMLScript.JMLC_MEM_STATISTICS) {
            Statistics.removeCPMemObject(System.identityHashCode(cacheableData));
        }
        if (!cacheableData.isCleanupEnabled()) {
            return;
        }
        try {
            if (getVariables().hasReferences(cacheableData)) {
                return;
            }
            cacheableData.clearData(getTID());
            if (cacheableData.isHDFSFileExists() && cacheableData.getFileName() != null) {
                if (cacheableData.getRDDHandle() != null) {
                    // hand the file over to the RDD handle; it is deleted when that lineage object is cleaned up
                    cacheableData.getRDDHandle().setHDFSFilename(cacheableData.getFileName());
                } else {
                    HDFSTool.deleteFileWithMTDIfExistOnHDFS(cacheableData.getFileName());
                }
            }
            if (cacheableData.getRDDHandle() != null) {
                rCleanupLineageObject(cacheableData.getRDDHandle());
            }
            if (cacheableData.getBroadcastHandle() != null) {
                rCleanupLineageObject(cacheableData.getBroadcastHandle());
            }
        } catch (Exception ex) {
            throw new DMLRuntimeException(ex);
        }
    }

    /**
     * Recursively cleans up a lineage object and, after decrementing their reference counts, its
     * children. Objects that are still referenced, back-referenced, or held in the lineage cache
     * are skipped, together with their subtree.
     *
     * @param lineageObject root of the lineage subtree to clean up
     * @throws IOException declared for file cleanup failures
     */
    private void rCleanupLineageObject(LineageObject lineageObject) throws IOException {
        boolean stillInUse = lineageObject.getNumReferences() > 0
            || lineageObject.hasBackReference()
            || lineageObject.isInLineageCache();
        if (!stillInUse) {
            cleanupSingleLineageObject(lineageObject);
            for (LineageObject child : lineageObject.getLineageChilds()) {
                child.decrementNumReferences();
                rCleanupLineageObject(child);
            }
        }
    }

    /**
     * Releases the Spark resources held by a single lineage object (no recursion).
     * <ul>
     * <li>RDD objects are unpersisted, their attached HDFS file (if any) is deleted, and parallelized
     * RDDs are deregistered.</li>
     * <li>Broadcast objects have their valid broadcasts destroyed, and their size is removed from the
     * broadcast size accounting.</li>
     * </ul>
     *
     * @param lineageObject object to clean up
     * @throws DMLRuntimeException if deleting the HDFS file fails
     */
    public static void cleanupSingleLineageObject(LineageObject lineageObject) {
        if (lineageObject instanceof RDDObject) {
            RDDObject rddObj = (RDDObject) lineageObject;
            // capture the id before unpersisting, for deregistration below
            final int rddId = rddObj.getRDD().id();
            cleanupRDDVariable(rddObj.getRDD());
            if (rddObj.getHDFSFilename() != null) {
                try {
                    HDFSTool.deleteFileWithMTDIfExistOnHDFS(rddObj.getHDFSFilename());
                } catch (IOException ex) {
                    throw new DMLRuntimeException(ex);
                }
            }
            if (rddObj.isParallelizedRDD()) {
                _parRDDs.deregisterRDD(rddId);
            }
        } else if (lineageObject instanceof BroadcastObject) {
            BroadcastObject bcObj = (BroadcastObject) lineageObject;
            if (bcObj.isPartitionedBroadcastValid()) {
                PartitionedBroadcast partitioned = bcObj.getPartitionedBroadcast();
                if (partitioned != null) {
                    partitioned.destroy();
                }
            }
            if (bcObj.isNonPartitionedBroadcastValid()) {
                Broadcast single = bcObj.getNonPartitionedBroadcast();
                if (single != null) {
                    cleanupBroadcastVariable(single);
                }
            }
            CacheableData.addBroadcastSize(-bcObj.getSize());
        }
    }

    /**
     * Destroys a broadcast (non-blocking) if it is still valid; otherwise does nothing.
     *
     * @param broadcast broadcast to destroy
     */
    public static void cleanupBroadcastVariable(Broadcast<?> broadcast) {
        if (!broadcast.isValid()) {
            return;
        }
        broadcast.destroy(false);
    }

    /**
     * Unpersists the given RDD (non-blocking) if its storage level is anything other than NONE.
     *
     * @param javaPairRDD RDD to unpersist
     */
    public static void cleanupRDDVariable(JavaPairRDD<?, ?> javaPairRDD) {
        // compare by value: StorageLevel defines equals, and a reference check relies on instance caching
        if (!StorageLevel.NONE().equals(javaPairRDD.getStorageLevel())) {
            javaPairRDD.unpersist(false);
        }
    }

    /**
     * Merges the blocks of the named matrix variable's RDD by key, persists the result, and installs
     * it as the variable's new RDD handle, with the previous handle kept as a lineage child.
     * Nothing happens if the matrix does not exceed the caching threshold or if its RDD is already
     * hash-partitioned.
     *
     * @param str matrix variable name
     */
    public void repartitionAndCacheMatrixObject(String str) {
        MatrixObject matrixObject = getMatrixObject(str);
        DataCharacteristics dataCharacteristics = matrixObject.getDataCharacteristics();
        if (OptimizerUtils.exceedsCachingThreshold(matrixObject.getNumColumns(), OptimizerUtils.estimateSizeExactSparsity(dataCharacteristics))) {
            JavaPairRDD<?, ?> rDDHandleForMatrixObject = getRDDHandleForMatrixObject(matrixObject, Types.FileFormat.BINARY);
            // already hash-partitioned: nothing to repartition
            if (SparkUtils.isHashPartitioned(rDDHandleForMatrixObject)) {
                return;
            }
            // If the current handle allows a short-circuit read and is marked for caching but not yet
            // cached, read from its first lineage child instead (presumably the un-checkpointed input).
            // Coalesce that input down to the preferred number of partitions if it has more.
            if (matrixObject.getRDDHandle().allowsShortCircuitRead() && isRDDMarkedForCaching(rDDHandleForMatrixObject.id()) && !isRDDCached(rDDHandleForMatrixObject.id())) {
                rDDHandleForMatrixObject = ((RDDObject) matrixObject.getRDDHandle().getLineageChilds().get(0)).getRDD();
                int numPreferredPartitions = SparkUtils.getNumPreferredPartitions(dataCharacteristics, rDDHandleForMatrixObject);
                if (numPreferredPartitions < rDDHandleForMatrixObject.getNumPartitions()) {
                    rDDHandleForMatrixObject = rDDHandleForMatrixObject.coalesce(numPreferredPartitions);
                }
            }
            JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey = RDDAggregateUtils.mergeByKey(rDDHandleForMatrixObject, false);
            // optionally convert sparse blocks to CSR before persisting
            if (OptimizerUtils.checkSparseBlockCSRConversion(dataCharacteristics)) {
                mergeByKey = mergeByKey.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
            }
            // persist and trigger an action (count) so the persisted RDD is materialized eagerly
            mergeByKey.persist(Checkpoint.DEFAULT_STORAGE_LEVEL).count();
            // swap in the new checkpoint handle, keeping the old handle as its lineage child
            RDDObject rDDHandle = matrixObject.getRDDHandle();
            RDDObject rDDObject = new RDDObject(mergeByKey);
            rDDObject.setCheckpointRDD(true);
            rDDObject.addLineageChild(rDDHandle);
            matrixObject.setRDDHandle(rDDObject);
        }
    }

    /**
     * Forces materialization of the RDD backing the named matrix, unless it is already cached.
     *
     * <p>Nothing happens when {@code OptimizerUtils.exceedsCachingThreshold} does not report the
     * estimated size as exceeding the threshold. Otherwise the RDD handle is obtained in
     * {@code BINARY} format, and {@code count()} is triggered if {@link #isRDDCached(int)} reports
     * it as not cached.
     *
     * @param str name of the matrix variable
     */
    public void cacheMatrixObject(String str) {
        final MatrixObject mo = getMatrixObject(str);
        if (!OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), OptimizerUtils.estimateSizeExactSparsity(mo.getDataCharacteristics()))) {
            return;
        }
        final JavaPairRDD<?, ?> rdd = getRDDHandleForMatrixObject(mo, Types.FileFormat.BINARY);
        if (!isRDDCached(rdd.id())) {
            rdd.count(); // action that triggers computation / caching
        }
    }

    /**
     * Reserves a scheduler-pool slot and assigns the calling thread's Spark jobs to the pool named
     * {@code "parforPool" + slot}, using the thread-local {@code spark.scheduler.pool} property.
     *
     * @return the reserved slot index, to be passed to {@link #cleanupThreadLocalSchedulerPool(int)}
     */
    public int setThreadLocalSchedulerPool() {
        final int poolId = allocSchedulerPoolName();
        final String poolName = "parforPool" + poolId;
        getSparkContext().sc().setLocalProperty("spark.scheduler.pool", poolName);
        return poolId;
    }

    /**
     * Releases a scheduler-pool slot and clears the calling thread's {@code spark.scheduler.pool}
     * property by setting it to {@code null}.
     *
     * @param i slot index previously returned by {@link #setThreadLocalSchedulerPool()}
     */
    public void cleanupThreadLocalSchedulerPool(int i) {
        freeSchedulerPoolName(i);
        final String noPool = null;
        getSparkContext().sc().setLocalProperty("spark.scheduler.pool", noPool);
    }

    /**
     * Reserves the lowest free slot in {@code _poolBuff}, growing the buffer when every slot is
     * taken.
     *
     * <p>The growth size is computed in {@code long} arithmetic so that doubling a large buffer
     * cannot overflow to a negative array length. The buffer always grows by at least one slot, so
     * an empty (or {@code null}) buffer still yields a writable index. The new length is capped at
     * {@code OptimizerUtils.MAX_NUMCELLS_CP_DENSE} and at the {@code int} range.
     *
     * @return the reserved slot index
     * @throws IllegalStateException if the buffer cannot grow any further
     */
    private static synchronized int allocSchedulerPoolName() {
        // ArrayUtils.indexOf returns a negative index for a null array or when no slot is free.
        int slot = ArrayUtils.indexOf(_poolBuff, false);
        if (slot < 0) {
            // No free slot: the first new slot sits at the old length.
            slot = (_poolBuff == null) ? 0 : _poolBuff.length;
            long grown = Math.max(slot + 1L, 2L * slot);
            long cap = Math.min(OptimizerUtils.MAX_NUMCELLS_CP_DENSE, Integer.MAX_VALUE);
            int newLength = (int) Math.min(grown, cap);
            if (newLength <= slot) {
                throw new IllegalStateException(
                    "Cannot allocate scheduler pool: pool buffer cannot grow beyond " + slot + " slots");
            }
            _poolBuff = (_poolBuff == null) ? new boolean[newLength] : Arrays.copyOf(_poolBuff, newLength);
        }
        _poolBuff[slot] = true;
        return slot;
    }

    /**
     * Marks the given scheduler-pool slot as free again, so that
     * {@code allocSchedulerPoolName} may hand it out once more.
     *
     * @param pool slot index to release
     */
    private static synchronized void freeSchedulerPoolName(int pool) {
        final boolean[] slots = _poolBuff;
        slots[pool] = false;
    }

    /**
     * Checks whether the RDD with the given id appears in the Spark context's persistent-RDD
     * registry, i.e. it has been marked for persistence (whether or not it is materialized yet).
     *
     * @param rddId Spark RDD id
     * @return {@code true} if the id is a key of {@code getPersistentRDDs()}
     */
    private boolean isRDDMarkedForCaching(int rddId) {
        final Integer key = rddId; // the registry is looked up with the boxed id
        return getSparkContext().sc().getPersistentRDDs().contains(key);
    }

    /**
     * Reports whether the RDD with the given id is currently cached.
     *
     * <p>Returns {@code false} if no Spark context exists, or if the RDD is not in the persistent
     * registry. Otherwise the result is {@code isCached()} of the first {@link RDDInfo} with a
     * matching id, or {@code false} when none matches.
     *
     * @param i Spark RDD id
     * @return whether the RDD is cached
     */
    public static boolean isRDDCached(int i) {
        if (!isSparkContextCreated()) {
            return false;
        }
        final JavaSparkContext ctx = _spctx;
        final boolean registered = ctx.sc().getPersistentRDDs().contains(Integer.valueOf(i));
        if (!registered) {
            return false;
        }
        return Arrays.stream(ctx.sc().getRDDStorageInfo())
            .filter(info -> info.id() == i)
            .findFirst()
            .map(RDDInfo::isCached)
            .orElse(false);
    }

    /**
     * Returns the in-memory size reported by Spark for the given cached RDD.
     *
     * <p>Returns {@code 0} if no Spark context exists, or if the RDD is not in the persistent
     * registry. Otherwise the result is {@code memSize()} of the first {@link RDDInfo} that matches
     * the id and reports {@code isCached()}, or {@code 0} when there is none.
     *
     * @param i Spark RDD id
     * @return the reported in-memory size, or {@code 0}
     */
    public static long getMemCachedRDDSize(int i) {
        if (!isSparkContextCreated()) {
            return 0L;
        }
        final JavaSparkContext ctx = _spctx;
        final boolean registered = ctx.sc().getPersistentRDDs().contains(Integer.valueOf(i));
        if (!registered) {
            return 0L;
        }
        return Arrays.stream(ctx.sc().getRDDStorageInfo())
            .filter(info -> info.id() == i && info.isCached())
            .mapToLong(RDDInfo::memSize)
            .findFirst()
            .orElse(0L);
    }

    /**
     * Sums {@code memSize()} over all RDD storage infos reported by the Spark context.
     *
     * @return the total reported in-memory size, or {@code 0} if no Spark context exists
     */
    public static long getStorageSpaceUsed() {
        if (!isSparkContextCreated()) {
            return 0L;
        }
        long total = 0L;
        for (RDDInfo info : _spctx.sc().getRDDStorageInfo()) {
            total += info.memSize();
        }
        return total;
    }

    /**
     * Returns the static {@link SparkClusterConfig}, creating it on the first call.
     * The method is {@code synchronized}, so concurrent first calls create only one instance.
     *
     * @return the cluster configuration
     */
    public static synchronized SparkClusterConfig getSparkClusterConfig() {
        if (_sconf != null) {
            return _sconf;
        }
        _sconf = new SparkClusterConfig();
        return _sconf;
    }

    /**
     * Delegates to {@link SparkClusterConfig#getBroadcastMemoryBudget()} on the shared cluster
     * configuration.
     *
     * @return the broadcast memory budget as reported by the cluster configuration
     */
    public static double getBroadcastMemoryBudget() {
        final SparkClusterConfig config = getSparkClusterConfig();
        return config.getBroadcastMemoryBudget();
    }

    /**
     * Delegates to {@code SparkClusterConfig#getDataMemoryBudget(boolean, boolean)} on the shared
     * cluster configuration.
     *
     * @param z first flag, forwarded unchanged (meaning defined by SparkClusterConfig — TODO confirm)
     * @param z2 second flag, forwarded unchanged (meaning defined by SparkClusterConfig — TODO confirm)
     * @return the data memory budget as reported by the cluster configuration
     */
    public static double getDataMemoryBudget(boolean z, boolean z2) {
        final SparkClusterConfig config = getSparkClusterConfig();
        return config.getDataMemoryBudget(z, z2);
    }

    /**
     * Delegates to {@link SparkClusterConfig#getNumExecutors()} on the shared cluster
     * configuration.
     *
     * @return the number of executors as reported by the cluster configuration
     */
    public static int getNumExecutors() {
        final SparkClusterConfig config = getSparkClusterConfig();
        return config.getNumExecutors();
    }

    /**
     * Delegates to {@code SparkClusterConfig#getDefaultParallelism(boolean)} on the shared cluster
     * configuration.
     *
     * @param z flag forwarded unchanged (meaning defined by SparkClusterConfig — TODO confirm)
     * @return the default parallelism as reported by the cluster configuration
     */
    public static int getDefaultParallelism(boolean z) {
        final SparkClusterConfig config = getSparkClusterConfig();
        return config.getDefaultParallelism(z);
    }
}
