package org.apache.sysml.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.matrix.data.DenseBlock;
import org.apache.sysml.runtime.matrix.data.IJV;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.SparseBlock;
import org.apache.sysml.runtime.util.CommonThreadPool;
import org.apache.sysml.runtime.util.FastStringTokenizer;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.runtime.util.UtilFunctions;

/* loaded from: input_file:org/apache/sysml/runtime/io/ReaderTextCellParallel.class */
public class ReaderTextCellParallel extends ReaderTextCell {
    // Matrix market inputs whose size on HDFS is below this threshold are read with a
    // parallelism of 1 (see readTextCellMatrixFromHDFS).
    // NOTE(review): presumably a byte count — confirm against MapReduceTool.getFilesizeOnHDFS.
    private static final long MIN_FILESIZE_MM = 8192;
    // Requested degree of parallelism; assigned in the constructor from
    // OptimizerUtils.getParallelTextReadParallelism().
    private int _numThreads;

    /* loaded from: input_file:org/apache/sysml/runtime/io/ReaderTextCellParallel$CellBuffer.class */
    /**
     * Fixed-size staging area for (row, column, value) triples that are later
     * appended to a {@link MatrixBlock} in one pass.
     *
     * <p>Cells whose value equals zero are never stored. The buffer performs no
     * bounds checking and no synchronization: adding more than {@link #CAPACITY}
     * non-zero cells without flushing or resetting in between raises an
     * {@link ArrayIndexOutOfBoundsException}.
     */
    public static class CellBuffer {
        /** Maximum number of cells the buffer can hold between two flushes. */
        public static final int CAPACITY = 102400;

        private int[] _rowIx = new int[CAPACITY];
        private int[] _colIx = new int[CAPACITY];
        private double[] _cellVals = new double[CAPACITY];
        /** Index of the most recently written slot; -1 while the buffer is empty. */
        private int _last = -1;

        /**
         * Stores one cell in the next free slot; zero values are ignored.
         *
         * @param i  row index of the cell
         * @param i2 column index of the cell
         * @param d  cell value
         */
        public void addCell(int i, int i2, double d) {
            if (d != 0.0d) {
                final int slot = ++this._last;
                this._rowIx[slot] = i;
                this._colIx[slot] = i2;
                this._cellVals[slot] = d;
            }
        }

        /**
         * Appends every buffered cell to the given block, in insertion order,
         * and then empties the buffer.
         *
         * @param matrixBlock target block that receives the cells via {@code appendValue}
         */
        public void flushCellBufferToMatrixBlock(MatrixBlock matrixBlock) {
            final int last = this._last;
            int k = 0;
            while (k <= last) {
                matrixBlock.appendValue(this._rowIx[k], this._colIx[k], this._cellVals[k]);
                k++;
            }
            reset();
        }

        /**
         * @return number of cells currently buffered
         */
        public int size() {
            return 1 + this._last;
        }

        /** Marks the buffer as empty; the backing arrays are reused, not cleared. */
        public void reset() {
            this._last = -1;
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/io/ReaderTextCellParallel$CountNnzTask.class */
    /**
     * Task that scans one input split and increments a shared per-row counter
     * for every non-comment record it sees.
     *
     * <p>The counter array is shared between all tasks and updated without any
     * synchronization, so concurrent increments of the same entry may be lost.
     * NOTE(review): in this file the counts are only used to pre-size sparse
     * rows in readTextCellMatrixFromHDFS — confirm that under-counting is
     * acceptable there.
     */
    public static class CountNnzTask implements Callable<Void> {
        private final InputSplit _split;
        private final TextInputFormat _informat;
        private final JobConf _job;
        private final int[] _rNnz;
        private final boolean _isSymmetric;

        /**
         * @param inputSplit      split to scan
         * @param textInputFormat input format used to open a record reader on the split
         * @param jobConf         job configuration passed to the record reader
         * @param iArr            shared counter array, indexed by 0-based row
         * @param z               if true, the second index of every record is counted as well
         */
        public CountNnzTask(InputSplit inputSplit, TextInputFormat textInputFormat, JobConf jobConf, int[] iArr, boolean z) {
            this._split = inputSplit;
            this._informat = textInputFormat;
            this._job = jobConf;
            this._rNnz = iArr;
            this._isSymmetric = z;
        }

        /**
         * Reads all records of the split and updates the shared counters.
         * Lines starting with '%' are skipped.
         *
         * @return always {@code null}
         * @throws Exception if opening or reading the split fails, or a record
         *                   cannot be tokenized / indexes outside the counter array
         */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            LongWritable key = new LongWritable();
            Text value = new Text();
            FastStringTokenizer tokenizer = new FastStringTokenizer(' ');
            RecordReader<LongWritable, Text> reader = this._informat.getRecordReader(this._split, this._job, Reporter.NULL);
            // The reader must stay open for the whole scan and be closed exactly
            // once afterwards, so the try/finally wraps the loop rather than
            // each individual record.
            try {
                while (reader.next(key, value)) {
                    String line = value.toString();
                    if (line.charAt(0) == '%') {
                        continue;
                    }
                    tokenizer.reset(line);
                    // First token is the 1-based row index.
                    this._rNnz[((int) tokenizer.nextLong()) - 1]++;
                    if (this._isSymmetric) {
                        // Second token is counted too: the mirrored cell lands in that row.
                        this._rNnz[((int) tokenizer.nextLong()) - 1]++;
                    }
                }
            } finally {
                IOUtilFunctions.closeSilently(reader);
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/io/ReaderTextCellParallel$ReadTask.class */
    /**
     * Task that parses all records of one input split and writes the resulting
     * cells into a destination block shared by all read tasks.
     *
     * <p>Writes to a sparse destination are staged in a task-local
     * {@link CellBuffer} and flushed while holding the destination's monitor;
     * writes to a dense destination go to its dense block without locking.
     */
    public static class ReadTask implements Callable<Long> {
        private final InputSplit _split;
        private final boolean _sparse;
        private final TextInputFormat _informat;
        private final JobConf _job;
        private final MatrixBlock _dest;
        private final long _rlen;
        private final long _clen;
        private final boolean _matrixMarket;
        private final FileFormatPropertiesMM _mmProps;

        /**
         * @param inputSplit             split to read
         * @param textInputFormat        input format used to open a record reader on the split
         * @param jobConf                job configuration passed to the record reader
         * @param matrixBlock            shared destination block; its sparse/dense format is sampled here
         * @param j                      number of rows of the overall matrix (used for error reporting)
         * @param j2                     number of columns of the overall matrix (used for error reporting)
         * @param z                      true if the input is a matrix market file
         * @param fileFormatPropertiesMM matrix market properties handed to the cell parser; may be null
         */
        public ReadTask(InputSplit inputSplit, TextInputFormat textInputFormat, JobConf jobConf, MatrixBlock matrixBlock, long j, long j2, boolean z, FileFormatPropertiesMM fileFormatPropertiesMM) {
            this._split = inputSplit;
            this._sparse = matrixBlock.isInSparseFormat();
            this._informat = textInputFormat;
            this._job = jobConf;
            this._dest = matrixBlock;
            this._rlen = j;
            this._clen = j2;
            this._matrixMarket = z;
            this._mmProps = fileFormatPropertiesMM;
        }

        /**
         * Reads the split and appends its cells to the destination.
         *
         * @return the accumulated cell count of this split: buffered cell counts
         *         on the sparse path, the sum of {@code appendCell} results otherwise
         * @throws RuntimeException wrapping any failure; the message reports the
         *         offending cell when its indexes lie outside the matrix dimensions
         */
        @Override // java.util.concurrent.Callable
        public Long call() throws Exception {
            long nnz = 0;
            LongWritable key = new LongWritable();
            Text value = new Text();
            IJV cell = new IJV();
            FastStringTokenizer tokenizer = new FastStringTokenizer(' ');
            RecordReader<LongWritable, Text> reader = this._informat.getRecordReader(this._split, this._job, Reporter.NULL);
            try {
                if (this._matrixMarket) {
                    // Skip leading '%' lines. If any were seen, the first
                    // non-comment record is discarded as well.
                    // NOTE(review): presumably that record is the matrix market
                    // size line — confirm.
                    boolean hasRecord = reader.next(key, value);
                    boolean foundComment = false;
                    while (hasRecord && value.toString().charAt(0) == '%') {
                        foundComment = true;
                        hasRecord = reader.next(key, value);
                    }
                    // Only parse when a record was actually read; an empty split
                    // must not trigger parsing of a never-filled Text.
                    if (hasRecord && !foundComment) {
                        cell = ReaderTextCell.parseCell(value.toString(), tokenizer, cell, this._mmProps);
                        synchronized (this._dest) {
                            nnz += ReaderTextCell.appendCell(cell, this._dest, this._mmProps);
                        }
                    }
                }
                if (this._sparse) {
                    CellBuffer buffer = new CellBuffer();
                    while (reader.next(key, value)) {
                        cell = ReaderTextCell.parseCell(value.toString(), tokenizer, cell, this._mmProps);
                        buffer.addCell(cell.getI(), cell.getJ(), cell.getV());
                        if (this._mmProps != null && this._mmProps.isSymmetric() && !cell.onDiag()) {
                            // Symmetric input: also store the mirrored off-diagonal cell.
                            buffer.addCell(cell.getJ(), cell.getI(), cell.getV());
                        }
                        // One record can add up to two cells, so flush as soon as
                        // fewer than two free slots remain; waiting for a full
                        // buffer could overflow it on the next symmetric record.
                        if (buffer.size() >= CellBuffer.CAPACITY - 1) {
                            synchronized (this._dest) {
                                nnz += buffer.size();
                                buffer.flushCellBufferToMatrixBlock(this._dest);
                            }
                        }
                    }
                    // Flush whatever is left after the last record.
                    synchronized (this._dest) {
                        nnz += buffer.size();
                        buffer.flushCellBufferToMatrixBlock(this._dest);
                    }
                } else {
                    DenseBlock denseBlock = this._dest.getDenseBlock();
                    while (reader.next(key, value)) {
                        cell = ReaderTextCell.parseCell(value.toString(), tokenizer, cell, this._mmProps);
                        nnz += ReaderTextCell.appendCell(cell, denseBlock, this._mmProps);
                    }
                }
                return Long.valueOf(nnz);
            } catch (Exception e) {
                // Give a targeted message when the last parsed cell is out of range.
                if (cell.getI() < 0 || cell.getI() + 1 > this._rlen || cell.getJ() < 0 || cell.getJ() + 1 > this._clen) {
                    throw new RuntimeException("Matrix cell [" + (cell.getI() + 1) + "," + (cell.getJ() + 1) + "] out of overall matrix range [1:" + this._rlen + ",1:" + this._clen + "]. ", e);
                }
                throw new RuntimeException("Unable to read matrix in text cell format. ", e);
            } finally {
                IOUtilFunctions.closeSilently(reader);
            }
        }
    }

    /**
     * Creates a parallel text cell reader for the given input info.
     * The degree of parallelism is taken from
     * {@code OptimizerUtils.getParallelTextReadParallelism()}.
     *
     * @param inputInfo input info forwarded to the {@code ReaderTextCell} constructor
     */
    public ReaderTextCellParallel(InputInfo inputInfo) {
        super(inputInfo, false);
        _numThreads = OptimizerUtils.getParallelTextReadParallelism();
    }

    /**
     * Reads a text cell (or matrix market) file into the given block using one
     * {@link ReadTask} per input split.
     *
     * <p>For a sparse destination, a first pass of {@link CountNnzTask}s counts
     * the cells per row so that the sparse rows can be pre-allocated; after the
     * read pass the sparse rows are sorted via {@code sortSparseRowsParallel}.
     * Matrix market files smaller than {@code MIN_FILESIZE_MM} are read with a
     * parallelism of 1.
     *
     * @param path        input file or directory
     * @param jobConf     job configuration; the input path is added to it
     * @param matrixBlock destination block
     * @param j           number of rows
     * @param j2          number of columns
     * @param i           not used by this implementation
     * @param i2          not used by this implementation
     * @throws IOException if determining the file size fails, or wrapping any
     *                     failure of split computation or task execution
     */
    @Override // org.apache.sysml.runtime.io.ReaderTextCell
    protected void readTextCellMatrixFromHDFS(Path path, JobConf jobConf, MatrixBlock matrixBlock, long j, long j2, int i, int i2) throws IOException {
        int par = this._numThreads;
        FileInputFormat.addInputPath(jobConf, path);
        TextInputFormat informat = new TextInputFormat();
        informat.configure(jobConf);
        if (this._isMMFile && MapReduceTool.getFilesizeOnHDFS(path) < MIN_FILESIZE_MM) {
            par = 1;
        }
        ExecutorService pool = null;
        try {
            pool = CommonThreadPool.get(par);
            InputSplit[] splits = informat.getSplits(jobConf, par);
            if (matrixBlock.isInSparseFormat()) {
                // Pass 1: count cells per row to pre-size the sparse rows.
                final int[] rowNnz = new int[(int) j];
                final boolean symmetric = this._isMMFile && this._mmProps.isSymmetric();
                List<CountNnzTask> countTasks = Arrays.stream(splits)
                        .map(split -> new CountNnzTask(split, informat, jobConf, rowNnz, symmetric))
                        .collect(Collectors.toList());
                for (Future<Void> pending : pool.invokeAll(countTasks)) {
                    // get() surfaces any exception thrown inside the task.
                    pending.get();
                }
                SparseBlock sparseBlock = matrixBlock.allocateBlock().getSparseBlock();
                for (int r = 0; r < j; r++) {
                    if (rowNnz[r] > 0) {
                        sparseBlock.allocate(r, UtilFunctions.roundToNext(rowNnz[r], 4));
                    }
                }
            }
            // Pass 2: parse the cells and accumulate the per-split counts.
            List<ReadTask> readTasks = Arrays.stream(splits)
                    .map(split -> new ReadTask(split, informat, jobConf, matrixBlock, j, j2, this._isMMFile, this._mmProps))
                    .collect(Collectors.toList());
            long nnz = 0;
            for (Future<Long> pending : pool.invokeAll(readTasks)) {
                nnz += pending.get().longValue();
            }
            matrixBlock.setNonZeros(nnz);
            if (matrixBlock.isInSparseFormat()) {
                sortSparseRowsParallel(matrixBlock, j, this._numThreads, pool);
            }
        } catch (Exception e) {
            throw new IOException("Threadpool issue, while parallel read.", e);
        } finally {
            // Release the pool on failure as well as on success.
            if (pool != null) {
                pool.shutdown();
            }
        }
    }
}
