package org.apache.sysds.runtime.instructions.spark;

import org.apache.commons.lang.NotImplementedException;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysds.api.DMLException;
import org.apache.sysds.common.Types;
import org.apache.sysds.hops.AggBinaryOp;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.functionobjects.ReduceAll;
import org.apache.sysds.runtime.functionobjects.ReduceCol;
import org.apache.sysds.runtime.functionobjects.ReduceRow;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
import org.apache.sysds.runtime.instructions.spark.SPInstruction;
import org.apache.sysds.runtime.instructions.spark.data.CorrMatrixBlock;
import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperatorTypes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.utils.Hash;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction.class */
public class AggregateUnarySketchSPInstruction extends UnarySPInstruction {
    private AggBinaryOp.SparkAggType aggtype;
    private CountDistinctOperator op;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$AggregateUnarySketchCreateCombinerFunction.class */
    public static class AggregateUnarySketchCreateCombinerFunction implements Function<MatrixBlock, CorrMatrixBlock> {
        private static final long serialVersionUID = 8997980606986435297L;
        private final CountDistinctOperator op;

        private AggregateUnarySketchCreateCombinerFunction(CountDistinctOperator countDistinctOperator) {
            this.op = countDistinctOperator;
        }

        public CorrMatrixBlock call(MatrixBlock matrixBlock) throws Exception {
            return LibMatrixCountDistinct.createSketch(matrixBlock, this.op);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$AggregateUnarySketchCreateFunction.class */
    public static class AggregateUnarySketchCreateFunction implements Function<Tuple2<MatrixIndexes, MatrixBlock>, CorrMatrixBlock> {
        private static final long serialVersionUID = 7295176181965491548L;
        private CountDistinctOperator op;

        public AggregateUnarySketchCreateFunction(CountDistinctOperator countDistinctOperator) {
            this.op = countDistinctOperator;
        }

        public CorrMatrixBlock call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            MatrixIndexes matrixIndexes = (MatrixIndexes) tuple2._1();
            MatrixBlock matrixBlock = (MatrixBlock) tuple2._2();
            this.op.getIndexFunction().execute(matrixIndexes, new MatrixIndexes());
            return LibMatrixCountDistinct.createSketch(matrixBlock, this.op);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$AggregateUnarySketchMergeCombinerFunction.class */
    public static class AggregateUnarySketchMergeCombinerFunction implements Function2<CorrMatrixBlock, CorrMatrixBlock, CorrMatrixBlock> {
        private static final long serialVersionUID = 172215143740379070L;
        private CountDistinctOperator op;

        public AggregateUnarySketchMergeCombinerFunction(CountDistinctOperator countDistinctOperator) {
            this.op = countDistinctOperator;
        }

        public CorrMatrixBlock call(CorrMatrixBlock corrMatrixBlock, CorrMatrixBlock corrMatrixBlock2) throws Exception {
            return LibMatrixCountDistinct.unionSketch(corrMatrixBlock, corrMatrixBlock2, this.op);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$AggregateUnarySketchMergeValueFunction.class */
    public static class AggregateUnarySketchMergeValueFunction implements Function2<CorrMatrixBlock, MatrixBlock, CorrMatrixBlock> {
        private static final long serialVersionUID = -7006864809860460549L;
        private CountDistinctOperator op;

        public AggregateUnarySketchMergeValueFunction(CountDistinctOperator countDistinctOperator) {
            this.op = countDistinctOperator;
        }

        public CorrMatrixBlock call(CorrMatrixBlock corrMatrixBlock, MatrixBlock matrixBlock) throws Exception {
            return LibMatrixCountDistinct.unionSketch(corrMatrixBlock, LibMatrixCountDistinct.createSketch(matrixBlock, this.op), this.op);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$AggregateUnarySketchUnionAllFunction.class */
    public static class AggregateUnarySketchUnionAllFunction implements Function2<CorrMatrixBlock, CorrMatrixBlock, CorrMatrixBlock> {
        private static final long serialVersionUID = -3799519241499062936L;
        private CountDistinctOperator op;

        public AggregateUnarySketchUnionAllFunction(CountDistinctOperator countDistinctOperator) {
            this.op = countDistinctOperator;
        }

        public CorrMatrixBlock call(CorrMatrixBlock corrMatrixBlock, CorrMatrixBlock corrMatrixBlock2) throws Exception {
            if (corrMatrixBlock.getCorrection() == null && corrMatrixBlock2.getCorrection() == null) {
                throw new DMLRuntimeException("Corrupt sketch: metadata is missing");
            }
            if ((corrMatrixBlock.getValue().getNumRows() != 0 || corrMatrixBlock.getValue().getNumColumns() != 0) && corrMatrixBlock.getCorrection() != null) {
                return ((corrMatrixBlock2.getValue().getNumRows() == 0 && corrMatrixBlock2.getValue().getNumColumns() == 0) || corrMatrixBlock2.getCorrection() == null) ? corrMatrixBlock : LibMatrixCountDistinct.unionSketch(corrMatrixBlock, corrMatrixBlock2, this.op);
            }
            corrMatrixBlock.set(corrMatrixBlock2.getValue(), corrMatrixBlock2.getCorrection());
            return corrMatrixBlock;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$CalculateAggregateSketchFunction.class */
    public static class CalculateAggregateSketchFunction implements Function<CorrMatrixBlock, MatrixBlock> {
        private static final long serialVersionUID = 7504873483231717138L;
        private CountDistinctOperator op;

        public CalculateAggregateSketchFunction(CountDistinctOperator countDistinctOperator) {
            this.op = countDistinctOperator;
        }

        public MatrixBlock call(CorrMatrixBlock corrMatrixBlock) throws Exception {
            return LibMatrixCountDistinct.countDistinctValuesFromSketch(corrMatrixBlock, this.op);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/instructions/spark/AggregateUnarySketchSPInstruction$RowColGroupingFunction.class */
    public static class RowColGroupingFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {
        private static final long serialVersionUID = -3456633769452405482L;
        private CountDistinctOperator _op;

        public RowColGroupingFunction(CountDistinctOperator countDistinctOperator) {
            this._op = countDistinctOperator;
        }

        public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            MatrixIndexes matrixIndexes = (MatrixIndexes) tuple2._1();
            MatrixBlock matrixBlock = (MatrixBlock) tuple2._2();
            MatrixIndexes matrixIndexes2 = new MatrixIndexes();
            this._op.getIndexFunction().execute(matrixIndexes, matrixIndexes2);
            return new Tuple2<>(matrixIndexes2, matrixBlock);
        }
    }

    protected AggregateUnarySketchSPInstruction(Operator operator, CPOperand cPOperand, CPOperand cPOperand2, AggBinaryOp.SparkAggType sparkAggType, String str, String str2) {
        super(SPInstruction.SPType.AggregateUnarySketch, operator, cPOperand, cPOperand2, str, str2);
        this.op = (CountDistinctOperator) super.getOperator();
        if (str.equals("uacdap")) {
            this.op.setDirection(Types.Direction.RowCol).setIndexFunction(ReduceAll.getReduceAllFnObject());
        } else if (str.equals("uacdapr")) {
            this.op.setDirection(Types.Direction.Row).setIndexFunction(ReduceCol.getReduceColFnObject());
        } else {
            if (!str.equals("uacdapc")) {
                throw new DMLException("Unrecognized opcode " + str);
            }
            this.op.setDirection(Types.Direction.Col).setIndexFunction(ReduceRow.getReduceRowFnObject());
        }
        this.aggtype = sparkAggType;
    }

    public static AggregateUnarySketchSPInstruction parseInstruction(String str) {
        String[] instructionPartsWithValueType = InstructionUtils.getInstructionPartsWithValueType(str);
        InstructionUtils.checkNumFields(instructionPartsWithValueType, 3);
        String str2 = instructionPartsWithValueType[0];
        return new AggregateUnarySketchSPInstruction(new CountDistinctOperator(CountDistinctOperatorTypes.KMV, Hash.HashType.LinearHash), new CPOperand(instructionPartsWithValueType[1]), new CPOperand(instructionPartsWithValueType[2]), AggBinaryOp.SparkAggType.valueOf(instructionPartsWithValueType[3]), str2, str);
    }

    @Override // org.apache.sysds.runtime.instructions.spark.SPInstruction, org.apache.sysds.runtime.instructions.Instruction
    public void processInstruction(ExecutionContext executionContext) {
        if (this.input1.getDataType() == Types.DataType.MATRIX) {
            processMatrixSketch(executionContext);
        } else {
            processTensorSketch(executionContext);
        }
    }

    private void processMatrixSketch(ExecutionContext executionContext) {
        SparkExecutionContext sparkExecutionContext = (SparkExecutionContext) executionContext;
        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryMatrixBlockRDDHandleForVariable = sparkExecutionContext.getBinaryMatrixBlockRDDHandleForVariable(this.input1.getName());
        if (this.aggtype == AggBinaryOp.SparkAggType.SINGLE_BLOCK) {
            sparkExecutionContext.setMatrixOutput(this.output.getName(), LibMatrixCountDistinct.countDistinctValuesFromSketch((CorrMatrixBlock) binaryMatrixBlockRDDHandleForVariable.map(new AggregateUnarySketchCreateFunction(this.op)).fold(new CorrMatrixBlock(new MatrixBlock()), new AggregateUnarySketchUnionAllFunction(this.op)), this.op));
            return;
        }
        if (this.aggtype != AggBinaryOp.SparkAggType.NONE && this.aggtype != AggBinaryOp.SparkAggType.MULTI_BLOCK) {
            throw new DMLRuntimeException(String.format("Unsupported aggregation type: %s", this.aggtype));
        }
        JavaPairRDD<?, ?> mapValues = (this.aggtype == AggBinaryOp.SparkAggType.NONE ? binaryMatrixBlockRDDHandleForVariable.mapValues(new AggregateUnarySketchCreateCombinerFunction(this.op)) : binaryMatrixBlockRDDHandleForVariable.mapToPair(new RowColGroupingFunction(this.op)).combineByKey(new AggregateUnarySketchCreateCombinerFunction(this.op), new AggregateUnarySketchMergeValueFunction(this.op), new AggregateUnarySketchMergeCombinerFunction(this.op))).mapValues(new CalculateAggregateSketchFunction(this.op));
        updateUnaryAggOutputDataCharacteristics(sparkExecutionContext, this.op.getIndexFunction());
        sparkExecutionContext.setRDDHandleForVariable(this.output.getName(), mapValues);
        sparkExecutionContext.addLineageRDD(this.output.getName(), this.input1.getName());
    }

    private void processTensorSketch(ExecutionContext executionContext) {
        throw new NotImplementedException("Aggregate sketch instruction for tensors has not been implemented yet.");
    }
}
