package org.apache.sysml.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.parser.Expression;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.util.CommonThreadPool;

/* loaded from: input_file:org/apache/sysml/runtime/io/FrameReaderTextCellParallel.class */
public class FrameReaderTextCellParallel extends FrameReaderTextCell {

    /* loaded from: input_file:org/apache/sysml/runtime/io/FrameReaderTextCellParallel$ReadTask.class */
    public class ReadTask implements Callable<Object> {
        private InputSplit _split;
        private TextInputFormat _informat;
        private JobConf _job;
        private FrameBlock _dest;

        public ReadTask(InputSplit inputSplit, TextInputFormat textInputFormat, JobConf jobConf, FrameBlock frameBlock) {
            this._split = null;
            this._informat = null;
            this._job = null;
            this._dest = null;
            this._split = inputSplit;
            this._informat = textInputFormat;
            this._job = jobConf;
            this._dest = frameBlock;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            FrameReaderTextCell.readTextCellFrameFromInputSplit(this._split, this._informat, this._job, this._dest);
            return null;
        }
    }

    @Override // org.apache.sysml.runtime.io.FrameReaderTextCell
    protected void readTextCellFrameFromHDFS(Path path, JobConf jobConf, FileSystem fileSystem, FrameBlock frameBlock, Expression.ValueType[] valueTypeArr, String[] strArr, long j, long j2) throws IOException {
        int parallelTextReadParallelism = OptimizerUtils.getParallelTextReadParallelism();
        FileInputFormat.addInputPath(jobConf, path);
        TextInputFormat textInputFormat = new TextInputFormat();
        textInputFormat.configure(jobConf);
        try {
            ExecutorService executorService = CommonThreadPool.get(parallelTextReadParallelism);
            InputSplit[] splits = textInputFormat.getSplits(jobConf, parallelTextReadParallelism);
            ArrayList arrayList = new ArrayList();
            for (InputSplit inputSplit : splits) {
                arrayList.add(new ReadTask(inputSplit, textInputFormat, jobConf, frameBlock));
            }
            List invokeAll = executorService.invokeAll(arrayList);
            executorService.shutdown();
            Iterator it = invokeAll.iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get();
            }
        } catch (Exception e) {
            throw new IOException("Failed parallel read of text cell input.", e);
        }
    }
}
