package org.apache.sysml.runtime.instructions.spark;

import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.StorageLevel;
import org.apache.sysml.lops.PMapMult;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.Multiply;
import org.apache.sysml.runtime.functionobjects.Plus;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.SPInstruction;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
import org.apache.sysml.runtime.instructions.spark.functions.IsBlockInRange;
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.class */
public class PMapmmSPInstruction extends BinarySPInstruction {
    private static final int NUM_ROWBLOCKS = 4;

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction$PMapMMFunction.class */
    private static class PMapMMFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {
        private static final long serialVersionUID = -4520080421816885321L;
        private AggregateBinaryOperator _op;
        private Broadcast<PartitionedBlock<MatrixBlock>> _pbc;
        private long _offset;

        public PMapMMFunction(Broadcast<PartitionedBlock<MatrixBlock>> broadcast, long j) {
            this._op = null;
            this._pbc = null;
            this._offset = -1L;
            this._pbc = broadcast;
            this._offset = j;
            this._op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), new AggregateOperator(0.0d, Plus.getPlusFnObject()));
        }

        public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            PartitionedBlock partitionedBlock = (PartitionedBlock) this._pbc.value();
            MatrixIndexes matrixIndexes = (MatrixIndexes) tuple2._1();
            MatrixBlock matrixBlock = (MatrixBlock) tuple2._2();
            MatrixIndexes matrixIndexes2 = new MatrixIndexes();
            MatrixBlock matrixBlock2 = new MatrixBlock();
            ArrayList arrayList = new ArrayList();
            for (int i = 1; i <= partitionedBlock.getNumRowBlocks(); i++) {
                OperationsOnMatrixValues.matMult(new MatrixIndexes(i, matrixIndexes.getRowIndex()), (MatrixBlock) partitionedBlock.getBlock(i, (int) matrixIndexes.getRowIndex()), matrixIndexes, matrixBlock, matrixIndexes2, matrixBlock2, this._op);
                matrixIndexes2.setIndexes(this._offset + i, matrixIndexes2.getColumnIndex());
                arrayList.add(new Tuple2(matrixIndexes2, matrixBlock2));
            }
            return arrayList.iterator();
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction$PMapMMRebaseBlocksFunction.class */
    private static class PMapMMRebaseBlocksFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> {
        private static final long serialVersionUID = 98051757210704132L;
        private int _offset;

        public PMapMMRebaseBlocksFunction(int i) {
            this._offset = -1;
            this._offset = i;
        }

        public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            return new Tuple2<>(new MatrixIndexes(((MatrixIndexes) tuple2._1()).getRowIndex() - this._offset, ((MatrixIndexes) tuple2._1()).getColumnIndex()), tuple2._2());
        }
    }

    private PMapmmSPInstruction(Operator operator, CPOperand cPOperand, CPOperand cPOperand2, CPOperand cPOperand3, String str, String str2) {
        super(SPInstruction.SPType.PMAPMM, operator, cPOperand, cPOperand2, cPOperand3, str, str2);
    }

    public static PMapmmSPInstruction parseInstruction(String str) {
        String[] instructionPartsWithValueType = InstructionUtils.getInstructionPartsWithValueType(str);
        String str2 = instructionPartsWithValueType[0];
        if (!str2.equalsIgnoreCase(PMapMult.OPCODE)) {
            throw new DMLRuntimeException("PMapmmSPInstruction.parseInstruction():: Unknown opcode " + str2);
        }
        return new PMapmmSPInstruction(new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), new AggregateOperator(0.0d, Plus.getPlusFnObject())), new CPOperand(instructionPartsWithValueType[1]), new CPOperand(instructionPartsWithValueType[2]), new CPOperand(instructionPartsWithValueType[3]), str2, str);
    }

    @Override // org.apache.sysml.runtime.instructions.spark.SPInstruction, org.apache.sysml.runtime.instructions.Instruction
    public void processInstruction(ExecutionContext executionContext) {
        SparkExecutionContext sparkExecutionContext = (SparkExecutionContext) executionContext;
        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockRDDHandleForVariable = sparkExecutionContext.getBinaryBlockRDDHandleForVariable(this.input1.getName());
        JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockRDDHandleForVariable2 = sparkExecutionContext.getBinaryBlockRDDHandleForVariable(this.input2.getName());
        MatrixCharacteristics matrixCharacteristics = sparkExecutionContext.getMatrixCharacteristics(this.input1.getName());
        StorageLevel MEMORY_AND_DISK = StorageLevel.MEMORY_AND_DISK();
        JavaPairRDD persist = binaryBlockRDDHandleForVariable2.repartition(sparkExecutionContext.getSparkContext().defaultParallelism().intValue()).persist(MEMORY_AND_DISK);
        JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD = null;
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= matrixCharacteristics.getRows()) {
                JavaPairRDD<?, ?> persist2 = javaPairRDD.persist(MEMORY_AND_DISK);
                persist2.count();
                sparkExecutionContext.setRDDHandleForVariable(this.output.getName(), persist2);
                sparkExecutionContext.addLineageRDD(this.output.getName(), this.input1.getName());
                sparkExecutionContext.addLineageRDD(this.output.getName(), this.input2.getName());
                updateBinaryMMOutputMatrixCharacteristics(sparkExecutionContext, true);
                return;
            }
            Broadcast broadcast = sparkExecutionContext.getSparkContext().broadcast(SparkExecutionContext.toPartitionedMatrixBlock(binaryBlockRDDHandleForVariable.filter(new IsBlockInRange(i2 + 1, i2 + (4 * matrixCharacteristics.getRowsPerBlock()), 1L, matrixCharacteristics.getCols(), matrixCharacteristics)).mapToPair(new PMapMMRebaseBlocksFunction(i2 / matrixCharacteristics.getRowsPerBlock())), (int) Math.min(matrixCharacteristics.getRows() - i2, 4 * matrixCharacteristics.getRowsPerBlock()), (int) matrixCharacteristics.getCols(), matrixCharacteristics.getRowsPerBlock(), matrixCharacteristics.getColsPerBlock(), -1L));
            JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable = RDDAggregateUtils.sumByKeyStable(persist.flatMapToPair(new PMapMMFunction(broadcast, i2 / matrixCharacteristics.getRowsPerBlock())), false);
            sumByKeyStable.persist(MEMORY_AND_DISK).count();
            broadcast.unpersist(false);
            javaPairRDD = javaPairRDD == null ? sumByKeyStable : javaPairRDD.union(sumByKeyStable);
            i = i2 + (4 * matrixCharacteristics.getRowsPerBlock());
        }
    }
}
