package org.apache.sysds.runtime.instructions.spark;

import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.RDDInfo;
import org.apache.spark.storage.StorageLevel;
import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
import org.apache.sysds.runtime.compress.SingletonLookupHashMap;
import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
import org.apache.sysds.runtime.compress.workload.WTreeRoot;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.SPInstruction;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.class */
public class CompressionSPInstruction extends UnarySPInstruction {
    private static final Log LOG = LogFactory.getLog(CompressionSPInstruction.class.getName());
    private final int _singletonLookupID;

    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction$CompressionFunction.class */
    public static class CompressionFunction implements Function<MatrixBlock, MatrixBlock> {
        private static final long serialVersionUID = -6528833083609423922L;

        public MatrixBlock call(MatrixBlock matrixBlock) throws Exception {
            return (MatrixBlock) CompressedMatrixBlockFactory.compress(matrixBlock, new CompressionSettingsBuilder().setIsInSparkInstruction().setCostType(CostEstimatorFactory.CostType.MEMORY)).getLeft();
        }
    }

    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction$CompressionWorkloadFunction.class */
    public static class CompressionWorkloadFunction implements Function<MatrixBlock, MatrixBlock> {
        private static final long serialVersionUID = -65288330833922L;
        final CostEstimatorBuilder costBuilder;

        public CompressionWorkloadFunction(CostEstimatorBuilder costEstimatorBuilder) {
            this.costBuilder = costEstimatorBuilder;
        }

        public MatrixBlock call(MatrixBlock matrixBlock) throws Exception {
            return (MatrixBlock) CompressedMatrixBlockFactory.compress(matrixBlock, InfrastructureAnalyzer.getLocalParallelism(), new CompressionSettingsBuilder().setIsInSparkInstruction(), this.costBuilder).getLeft();
        }
    }

    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction$SizeFunction.class */
    public static class SizeFunction implements Function<MatrixBlock, Long> {
        private static final long serialVersionUID = 1;

        public Long call(MatrixBlock matrixBlock) throws Exception {
            return Long.valueOf(matrixBlock.getInMemorySize());
        }
    }

    private CompressionSPInstruction(Operator operator, CPOperand cPOperand, CPOperand cPOperand2, String str, String str2, int i) {
        super(SPInstruction.SPType.Compression, operator, cPOperand, cPOperand2, str, str2);
        this._singletonLookupID = i;
    }

    public static CompressionSPInstruction parseInstruction(String str) {
        InstructionUtils.checkNumFields(str, 2, 3);
        String[] instructionPartsWithValueType = InstructionUtils.getInstructionPartsWithValueType(str);
        String str2 = instructionPartsWithValueType[0];
        CPOperand cPOperand = new CPOperand(instructionPartsWithValueType[1]);
        CPOperand cPOperand2 = new CPOperand(instructionPartsWithValueType[2]);
        return instructionPartsWithValueType.length == 4 ? new CompressionSPInstruction(null, cPOperand, cPOperand2, str2, str, Integer.parseInt(instructionPartsWithValueType[3])) : new CompressionSPInstruction(null, cPOperand, cPOperand2, str2, str, 0);
    }

    @Override // org.apache.sysds.runtime.instructions.spark.SPInstruction, org.apache.sysds.runtime.instructions.Instruction
    public void processInstruction(ExecutionContext executionContext) {
        SparkExecutionContext sparkExecutionContext = (SparkExecutionContext) executionContext;
        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryMatrixBlockRDDHandleForVariable = sparkExecutionContext.getBinaryMatrixBlockRDDHandleForVariable(this.input1.getName());
        JavaPairRDD<?, ?> mapValues = binaryMatrixBlockRDDHandleForVariable.mapValues(this._singletonLookupID == 0 ? new CompressionFunction() : new CompressionWorkloadFunction(new CostEstimatorBuilder((WTreeRoot) SingletonLookupHashMap.getMap().get(this._singletonLookupID))));
        if (LOG.isTraceEnabled()) {
            binaryMatrixBlockRDDHandleForVariable.persist(StorageLevel.MEMORY_AND_DISK());
            mapValues.persist(StorageLevel.MEMORY_AND_DISK());
            long j = 0;
            long j2 = 0;
            long longValue = reduceSizes(binaryMatrixBlockRDDHandleForVariable.mapValues(new SizeFunction()).collect()).longValue();
            long longValue2 = reduceSizes(mapValues.mapValues(new SizeFunction()).collect()).longValue();
            for (RDDInfo rDDInfo : sparkExecutionContext.getSparkContext().sc().getRDDStorageInfo()) {
                if (rDDInfo.id() == mapValues.id()) {
                    j2 = rDDInfo.memSize();
                } else if (rDDInfo.id() == binaryMatrixBlockRDDHandleForVariable.id()) {
                    j = rDDInfo.memSize();
                }
            }
            LOG.trace("Spark Compression Instruction sizes:" + String.format("\nSBCompress: InSize:       %16d", Long.valueOf(j)) + String.format("\nSBCompress: InBlockSize:  %16d", Long.valueOf(longValue)) + String.format("\nSBCompress: OutSize:      %16d", Long.valueOf(j2)) + String.format("\nSBCompress: OutBlockSize: %16d", Long.valueOf(longValue2)));
        }
        sparkExecutionContext.setRDDHandleForVariable(this.output.getName(), mapValues);
        sparkExecutionContext.addLineageRDD(this.input1.getName(), this.output.getName());
    }

    public static Long reduceSizes(List<Tuple2<MatrixIndexes, Long>> list) {
        long j = 0;
        Iterator<Tuple2<MatrixIndexes, Long>> it = list.iterator();
        while (it.hasNext()) {
            j += ((Long) it.next()._2()).longValue();
        }
        return Long.valueOf(j);
    }
}
