package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.common.Types;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.FastStringTokenizer;

/* loaded from: input_file:org/apache/sysds/runtime/io/TensorReaderTextCellParallel.class */
/**
 * Multi-threaded variant of {@link TensorReaderTextCell}: the input path is divided
 * into input splits and every split is parsed by its own {@link ReadTask}, with all
 * tasks writing into one shared, pre-allocated {@link TensorBlock}.
 */
public class TensorReaderTextCellParallel extends TensorReaderTextCell {
    /** Requested number of input splits and size of the thread pool used for reading. */
    private final int _numThreads = OptimizerUtils.getParallelTextReadParallelism();

    /**
     * Task that parses all text lines of a single input split into the destination tensor.
     */
    public static class ReadTask implements Callable<Object> {
        private final InputSplit _split;
        private final TextInputFormat _informat;
        private final JobConf _job;
        private final TensorBlock _dest;

        /**
         * @param inputSplit      the split this task reads
         * @param textInputFormat input format used to create the record reader for the split
         * @param jobConf         job configuration handed to the record reader
         * @param tensorBlock     destination tensor the parsed cells are written into
         */
        public ReadTask(InputSplit inputSplit, TextInputFormat textInputFormat, JobConf jobConf, TensorBlock tensorBlock) {
            this._split = inputSplit;
            this._informat = textInputFormat;
            this._job = jobConf;
            this._dest = tensorBlock;
        }

        /**
         * Reads the split line by line. Each line is tokenized on single spaces: the first
         * {@code _dest.getNumDims()} tokens are parsed as integer indexes (one is subtracted
         * from each before use) and the following token is stored as the cell value.
         *
         * @return always {@code null}
         * @throws IOException if reading or parsing fails; the original exception is the cause
         */
        @Override
        public Object call() throws Exception {
            LongWritable key = new LongWritable();
            Text value = new Text();
            try {
                FastStringTokenizer tokenizer = new FastStringTokenizer(' ');
                int[] cellIndex = new int[this._dest.getNumDims()];
                RecordReader<LongWritable, Text> reader =
                    this._informat.getRecordReader(this._split, this._job, Reporter.NULL);
                try {
                    while (reader.next(key, value)) {
                        tokenizer.reset(value.toString());
                        for (int i = 0; i < cellIndex.length; i++) {
                            cellIndex[i] = tokenizer.nextInt() - 1;
                        }
                        this._dest.set(cellIndex, tokenizer.nextToken());
                    }
                } finally {
                    // Close on every exit path, including a failure inside reader.next(...).
                    IOUtilFunctions.closeSilently(reader);
                }
                return null;
            } catch (Exception e) {
                throw new IOException("Unable to read tensor in text cell format.", e);
            }
        }
    }

    /**
     * Reads a text cell tensor from the given path using one {@link ReadTask} per input split.
     *
     * @param path         input path, registered on {@code jobConf} as an input path
     * @param jobConf      job configuration used for split computation and record reading
     * @param jArr         tensor dimensions; each value is narrowed to {@code int}
     * @param valueTypeArr schema; a single entry selects the single-value-type constructor
     *                     of {@link TensorBlock}, otherwise the schema-array constructor is used
     * @return the allocated tensor block filled by the read tasks
     * @throws IOException if creating the tasks or executing any of them fails
     */
    @Override // org.apache.sysds.runtime.io.TensorReaderTextCell
    protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf jobConf, long[] jArr, Types.ValueType[] valueTypeArr) throws IOException {
        FileInputFormat.addInputPath(jobConf, path);
        TextInputFormat informat = new TextInputFormat();
        informat.configure(jobConf);

        int[] dims = Arrays.stream(jArr).mapToInt(d -> (int) d).toArray();
        final TensorBlock ret = valueTypeArr.length == 1
            ? new TensorBlock(valueTypeArr[0], dims).allocateBlock()
            : new TensorBlock(valueTypeArr, dims).allocateBlock();

        ExecutorService pool = null;
        try {
            pool = CommonThreadPool.get(this._numThreads);
            List<ReadTask> tasks = Arrays.stream(informat.getSplits(jobConf, this._numThreads))
                .map(split -> new ReadTask(split, informat, jobConf, ret))
                .collect(Collectors.toList());
            // Wait for every task; get() rethrows the first failure of a task.
            for (Future<Object> result : pool.invokeAll(tasks)) {
                result.get();
            }
            return ret;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Threadpool issue, while parallel read.", e);
        } catch (Exception e) {
            throw new IOException("Threadpool issue, while parallel read.", e);
        } finally {
            // Release the pool on success and on failure alike.
            if (pool != null) {
                pool.shutdown();
            }
        }
    }
}
