package org.apache.sysml.runtime.controlprogram.parfor;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.conf.DMLConfig;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.runtime.util.ProgramConverter;
import org.apache.sysml.utils.Statistics;
import org.apache.sysml.yarn.DMLAppMasterUtils;

/* loaded from: input_file:org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.class */
public class RemoteDPParForMR {
    protected static final Log LOG = LogFactory.getLog(RemoteDPParForMR.class.getName());

    public static RemoteParForJobReturn runJob(long j, String str, String str2, String str3, String str4, MatrixObject matrixObject, ParForProgramBlock.PartitionFormat partitionFormat, OutputInfo outputInfo, boolean z, boolean z2, int i, int i2) {
        long nanoTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        JobConf jobConf = new JobConf(RemoteDPParForMR.class);
        jobConf.setJobName("ParFor-DPEMR" + j);
        Statistics.incrementNoOfCompiledMRJobs();
        try {
            try {
                MRJobConfiguration.setProgramBlocks(jobConf, str3);
                MRJobConfiguration.setParforCachingConfig(jobConf, z2);
                Path path = new Path(matrixObject.getFileName());
                MRJobConfiguration.setPartitioningInfo(jobConf, matrixObject.getNumRows(), matrixObject.getNumColumns(), (int) matrixObject.getNumRowsPerBlock(), (int) matrixObject.getNumColumnsPerBlock(), InputInfo.BinaryBlockInputInfo, outputInfo, partitionFormat._dpf, partitionFormat._N, matrixObject.getFileName(), str, str2, z);
                jobConf.setInputFormat(InputInfo.BinaryBlockInputInfo.inputFormatClass);
                FileInputFormat.setInputPaths(jobConf, path);
                jobConf.setMapperClass(DataPartitionerRemoteMapper.class);
                jobConf.setReducerClass(RemoteDPParWorkerReducer.class);
                jobConf.setOutputFormat(SequenceFileOutputFormat.class);
                MapReduceTool.deleteFileIfExistOnHDFS(str4);
                FileOutputFormat.setOutputPath(jobConf, new Path(str4));
                jobConf.setMapOutputKeyClass(LongWritable.class);
                if (outputInfo == OutputInfo.BinaryBlockOutputInfo) {
                    jobConf.setMapOutputValueClass(PairWritableBlock.class);
                } else {
                    if (outputInfo != OutputInfo.BinaryCellOutputInfo) {
                        throw new DMLRuntimeException("Unsupported intermrediate output info: " + outputInfo);
                    }
                    jobConf.setMapOutputValueClass(PairWritableCell.class);
                }
                jobConf.setOutputKeyClass(LongWritable.class);
                jobConf.setOutputValueClass(Text.class);
                jobConf.setNumReduceTasks(i);
                jobConf.setInt(MRConfigurationNames.MR_TASK_TIMEOUT, 0);
                jobConf.setMapSpeculativeExecution(false);
                MRJobConfiguration.addBinaryBlockSerializationFramework(jobConf);
                DMLConfig dMLConfig = ConfigurationManager.getDMLConfig();
                DMLAppMasterUtils.setupMRJobRemoteMaxMemory(jobConf, dMLConfig);
                MRJobConfiguration.setupCustomMRConfigurations(jobConf, dMLConfig);
                jobConf.setNumTasksToExecutePerJvm(1);
                jobConf.setInt(MRConfigurationNames.DFS_REPLICATION, i2);
                MRJobConfiguration.setUniqueWorkingDir(jobConf);
                RunningJob runJob = JobClient.runJob(jobConf);
                Statistics.incrementNoOfExecutedMRJobs();
                Counters.Group group = runJob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME);
                int counter = (int) group.getCounter(Stat.PARFOR_NUMTASKS.toString());
                int counter2 = (int) group.getCounter(Stat.PARFOR_NUMITERS.toString());
                if (DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode()) {
                    Statistics.incrementJITCompileTime(group.getCounter(Stat.PARFOR_JITCOMPILE.toString()));
                    Statistics.incrementJVMgcCount(group.getCounter(Stat.PARFOR_JVMGC_COUNT.toString()));
                    Statistics.incrementJVMgcTime(group.getCounter(Stat.PARFOR_JVMGC_TIME.toString()));
                    Counters.Group group2 = runJob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString());
                    CacheStatistics.incrementMemHits((int) group2.getCounter(CacheStatistics.Stat.CACHE_HITS_MEM.toString()));
                    CacheStatistics.incrementFSBuffHits((int) group2.getCounter(CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString()));
                    CacheStatistics.incrementFSHits((int) group2.getCounter(CacheStatistics.Stat.CACHE_HITS_FS.toString()));
                    CacheStatistics.incrementHDFSHits((int) group2.getCounter(CacheStatistics.Stat.CACHE_HITS_HDFS.toString()));
                    CacheStatistics.incrementFSBuffWrites((int) group2.getCounter(CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString()));
                    CacheStatistics.incrementFSWrites((int) group2.getCounter(CacheStatistics.Stat.CACHE_WRITES_FS.toString()));
                    CacheStatistics.incrementHDFSWrites((int) group2.getCounter(CacheStatistics.Stat.CACHE_WRITES_HDFS.toString()));
                    CacheStatistics.incrementAcquireRTime(group2.getCounter(CacheStatistics.Stat.CACHE_TIME_ACQR.toString()));
                    CacheStatistics.incrementAcquireMTime(group2.getCounter(CacheStatistics.Stat.CACHE_TIME_ACQM.toString()));
                    CacheStatistics.incrementReleaseTime(group2.getCounter(CacheStatistics.Stat.CACHE_TIME_RLS.toString()));
                    CacheStatistics.incrementExportTime(group2.getCounter(CacheStatistics.Stat.CACHE_TIME_EXP.toString()));
                }
                RemoteParForJobReturn remoteParForJobReturn = new RemoteParForJobReturn(runJob.isSuccessful(), counter, counter2, readResultFile(jobConf, str4));
                try {
                    MapReduceTool.deleteFileIfExistOnHDFS(new Path(str4), jobConf);
                    if (DMLScript.STATISTICS) {
                        Statistics.maintainCPHeavyHitters("MR-Job_ParFor-DPEMR", System.nanoTime() - nanoTime);
                    }
                    return remoteParForJobReturn;
                } catch (IOException e) {
                    throw new DMLRuntimeException(e);
                }
            } catch (Exception e2) {
                throw new DMLRuntimeException(e2);
            }
        } catch (Throwable th) {
            try {
                MapReduceTool.deleteFileIfExistOnHDFS(new Path(str4), jobConf);
                throw th;
            } catch (IOException e3) {
                throw new DMLRuntimeException(e3);
            }
        }
    }

    public static LocalVariableMap[] readResultFile(JobConf jobConf, String str) throws IOException {
        HashMap hashMap = new HashMap();
        Path path = new Path(str);
        FileSystem fileSystem = IOUtilFunctions.getFileSystem(path, jobConf);
        LongWritable longWritable = new LongWritable();
        Text text = new Text();
        int i = 0;
        for (Path path2 : IOUtilFunctions.getSequenceFilePaths(fileSystem, path)) {
            SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, path2, jobConf);
            while (reader.next(longWritable, text)) {
                try {
                    if (!hashMap.containsKey(Long.valueOf(longWritable.get()))) {
                        hashMap.put(Long.valueOf(longWritable.get()), new LocalVariableMap());
                    }
                    Object[] parseDataObject = ProgramConverter.parseDataObject(text.toString());
                    ((LocalVariableMap) hashMap.get(Long.valueOf(longWritable.get()))).put((String) parseDataObject[0], (Data) parseDataObject[1]);
                    i++;
                } finally {
                    IOUtilFunctions.closeSilently((Closeable) reader);
                }
            }
        }
        LOG.debug("Num remote worker results (before deduplication): " + i);
        LOG.debug("Num remote worker results: " + hashMap.size());
        return (LocalVariableMap[]) hashMap.values().toArray(new LocalVariableMap[0]);
    }
}
