package org.apache.sysds.runtime.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.parser.DataExpression;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.UtilFunctions;

/* loaded from: input_file:org/apache/sysds/runtime/io/ReaderTextCSVParallel.class */
public class ReaderTextCSVParallel extends MatrixReader {
    private FileFormatPropertiesCSV _props;
    private int _numThreads;
    private SplitOffsetInfos _offsets = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/io/ReaderTextCSVParallel$CSVReadTask.class */
    /**
     * Worker that parses the lines of a single input split and writes the parsed cells
     * into the shared target matrix block, starting at the row offset recorded for that
     * split in the given {@link SplitOffsetInfos}.
     *
     * <p>The outcome is exposed through {@link #getReturnCode()}, {@link #getException()}
     * and {@link #getPartialNnz()}; in addition, a failure is rethrown from {@link #call()}
     * as an {@link IOException}.
     */
    public static class CSVReadTask implements Callable<Object> {
        // immutable task configuration
        private final InputSplit _split;
        private final SplitOffsetInfos _splitoffsets;
        private final boolean _sparse;
        private final TextInputFormat _informat;
        private final JobConf _job;
        private final MatrixBlock _dest;
        private final long _rlen;
        private final long _clen;
        private final boolean _isFirstSplit;
        private final boolean _hasHeader;
        private final boolean _fill;
        private final double _fillValue;
        private final String _delim;
        private final int _splitCount;
        private final HashSet<String> _naStrings;

        // task outcome, written by call()
        private boolean _rc = true;
        private Exception _exception = null;
        private long _nnz;

        /**
         * @param inputSplit       split to read
         * @param splitOffsetInfos per-split row offsets and row counts
         * @param textInputFormat  input format used to open a record reader on the split
         * @param jobConf          job configuration passed to the record reader
         * @param matrixBlock      target block; its current sparse/dense format selects the write path
         * @param j                overall number of rows (used for error reporting)
         * @param j2               overall number of columns (used for validation and error reporting)
         * @param z                whether the file has a header line (skipped only by the split with index 0)
         * @param str              cell delimiter
         * @param z2               whether empty cells may be filled with {@code d}
         * @param d                value used for empty cells
         * @param i                index of this split; split 0 is treated as the first split
         * @param hashSet          strings passed to the double parser as NA markers
         */
        public CSVReadTask(InputSplit inputSplit, SplitOffsetInfos splitOffsetInfos, TextInputFormat textInputFormat, JobConf jobConf, MatrixBlock matrixBlock, long j, long j2, boolean z, String str, boolean z2, double d, int i, HashSet<String> hashSet) {
            this._split = inputSplit;
            this._splitoffsets = splitOffsetInfos;
            this._sparse = matrixBlock.isInSparseFormat();
            this._informat = textInputFormat;
            this._job = jobConf;
            this._dest = matrixBlock;
            this._rlen = j;
            this._clen = j2;
            this._isFirstSplit = i == 0;
            this._hasHeader = z;
            this._fill = z2;
            this._fillValue = d;
            this._delim = str;
            this._splitCount = i;
            this._naStrings = hashSet;
        }

        /** @return {@code false} once {@link #call()} has failed with an exception, {@code true} otherwise */
        public boolean getReturnCode() {
            return this._rc;
        }

        /** @return the exception recorded by a failed {@link #call()}, or {@code null} */
        public Exception getException() {
            return this._exception;
        }

        /** @return the number of cells written by this task; set only when {@link #call()} completes normally */
        public long getPartialNnz() {
            return this._nnz;
        }

        /**
         * Reads all lines of the split, parses every delimited cell and stores it in the
         * target block at (row, column), where the row starts at this split's offset.
         *
         * @return always {@code null}
         * @throws Exception an {@link IOException} wrapping any failure; the original
         *                   exception is also recorded and the return code set to {@code false}
         */
        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            // row/col live outside the try so the catch block can report the failing cell
            int row = 0;
            int col = 0;
            long lnnz = 0;
            try {
                RecordReader<LongWritable, Text> reader = this._informat.getRecordReader(this._split, this._job, Reporter.NULL);
                // everything after opening the reader is covered by the finally,
                // so the reader is also closed when the header skip or offset lookup fails
                try {
                    LongWritable key = new LongWritable();
                    Text value = new Text();
                    if (this._isFirstSplit && this._hasHeader) {
                        reader.next(key, value); // skip header line
                    }
                    boolean noFillEmpty = false;
                    row = this._splitoffsets.getOffsetPerSplit(this._splitCount);
                    // the dense block is only fetched for dense targets
                    DenseBlock dense = this._sparse ? null : this._dest.getDenseBlock();

                    while (reader.next(key, value)) {
                        String line = value.toString().trim();
                        String[] parts = IOUtilFunctions.split(line, this._delim);
                        col = 0;
                        for (String part : parts) {
                            String cell = part.trim();
                            double cellValue;
                            if (cell.isEmpty()) {
                                noFillEmpty |= !this._fill;
                                cellValue = this._fillValue;
                            } else {
                                cellValue = UtilFunctions.parseToDouble(cell, this._naStrings);
                            }
                            // cells equal to DEFAULT_DELIM_FILL_VALUE are neither stored nor counted
                            // NOTE(review): presumably this constant is 0 - confirm in DataExpression
                            if (cellValue != DataExpression.DEFAULT_DELIM_FILL_VALUE) {
                                if (this._sparse) {
                                    this._dest.appendValue(row, col, cellValue);
                                } else {
                                    dense.set(row, col, cellValue);
                                }
                                lnnz++;
                            }
                            col++;
                        }
                        IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(line, this._fill, noFillEmpty);
                        IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(this._split.toString(), line, parts, this._clen);
                        row++;
                    }

                    // the number of rows read must match the count recorded for this split
                    int expectedEndRow = this._splitoffsets.getOffsetPerSplit(this._splitCount) + this._splitoffsets.getLenghtPerSplit(this._splitCount);
                    if (row != expectedEndRow) {
                        throw new IOException("Incorrect number of rows (" + row + ") found in delimited file (" + expectedEndRow + "): " + value);
                    }
                    this._nnz = lnnz;
                    return null;
                } finally {
                    IOUtilFunctions.closeSilently((RecordReader<?, ?>) reader);
                }
            } catch (Exception e) {
                this._rc = false;
                this._exception = e;
                if (row < 0 || row + 1 > this._rlen || col < 0 || col + 1 > this._clen) {
                    throw new IOException("CSV cell [" + (row + 1) + "," + (col + 1) + "] out of overall matrix range [1:" + this._rlen + ",1:" + this._clen + "]. " + e.getMessage(), this._exception);
                }
                throw new IOException("Unable to read matrix in text CSV format. " + e.getMessage(), this._exception);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysds/runtime/io/ReaderTextCSVParallel$SplitOffsetInfos.class */
    /**
     * Holds, for every input split, the row offset at which the split starts and the
     * number of rows it contains. Both arrays are indexed by split position and are
     * zero-initialized on construction.
     */
    public static class SplitOffsetInfos {
        // starting row per split
        private final int[] offsetPerSplit;
        // number of rows per split (field name keeps the historical spelling)
        private final int[] lenghtPerSplit;

        /**
         * @param i number of splits to keep information for
         */
        public SplitOffsetInfos(int i) {
            final int numSplits = i;
            offsetPerSplit = new int[numSplits];
            lenghtPerSplit = new int[numSplits];
        }

        /**
         * @param i split index
         * @return number of rows recorded for that split
         */
        public int getLenghtPerSplit(int i) {
            return lenghtPerSplit[i];
        }

        /**
         * @param i  split index
         * @param i2 number of rows of that split
         */
        public void setLenghtPerSplit(int i, int i2) {
            lenghtPerSplit[i] = i2;
        }

        /**
         * @param i split index
         * @return row offset recorded for that split
         */
        public int getOffsetPerSplit(int i) {
            return offsetPerSplit[i];
        }

        /**
         * @param i  split index
         * @param i2 row offset of that split
         */
        public void setOffsetPerSplit(int i, int i2) {
            offsetPerSplit[i] = i2;
        }
    }

    /**
     * Creates a parallel CSV reader for the given CSV format properties. The degree of
     * parallelism is taken from {@code OptimizerUtils.getParallelTextReadParallelism()}.
     *
     * @param fileFormatPropertiesCSV CSV properties read later via its accessors
     *                                (header, delimiter, fill, fill value, NA strings)
     */
    public ReaderTextCSVParallel(FileFormatPropertiesCSV fileFormatPropertiesCSV) {
        _props = fileFormatPropertiesCSV;
        _numThreads = OptimizerUtils.getParallelTextReadParallelism();
    }

    /**
     * Reads a CSV file into a matrix block in two passes: a first pass determines the
     * dimensions and allocates the output block, a second pass parses the cells.
     *
     * @param str path of the CSV input
     * @param j   number of rows from meta data, forwarded to the size computation
     * @param j2  number of columns from meta data, forwarded to the size computation
     * @param i   forwarded unchanged to the second read pass
     * @param j3  estimated number of non-zeros, forwarded to the size computation
     * @return the populated matrix block
     * @throws IOException         if reading fails
     * @throws DMLRuntimeException if the row count of the result differs from the row
     *                             count determined in the first pass
     */
    @Override // org.apache.sysds.runtime.io.MatrixReader
    public MatrixBlock readMatrixFromHDFS(String str, long j, long j2, int i, long j3) throws IOException, DMLRuntimeException {
        // set up job configuration, path and file system
        JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
        Path inputPath = new Path(str);
        FileSystem fs = IOUtilFunctions.getFileSystem(inputPath, job);

        // configure the text input format and compute sorted splits
        FileInputFormat.addInputPath(job, inputPath);
        TextInputFormat informat = new TextInputFormat();
        informat.configure(job);
        InputSplit[] unsortedSplits = informat.getSplits(job, this._numThreads);
        InputSplit[] splits = IOUtilFunctions.sortInputSplits(unsortedSplits);

        checkValidInputFile(fs, inputPath);

        // first pass: determine dimensions and allocate the output
        MatrixBlock result = computeCSVSizeAndCreateOutputMatrixBlock(splits, inputPath, job, this._props.hasHeader(), this._props.getDelim(), j, j2, j3);
        long nrow = result.getNumRows();
        long ncol = result.getNumColumns();

        // second pass: parse and store the cells
        readCSVMatrixFromHDFS(splits, inputPath, job, result, nrow, ncol, i, this._props.hasHeader(), this._props.getDelim(), this._props.isFill(), this._props.getFillValue(), this._props.getNAStrings());
        result.examSparsity();

        // sanity check of the row count after post-processing
        if (nrow >= 0 && nrow != result.getNumRows()) {
            throw new DMLRuntimeException("Read matrix inconsistent with given meta data: expected nrow=" + nrow + ", real nrow=" + result.getNumRows());
        }
        return result;
    }

    /**
     * Reads a CSV matrix from an input stream by delegating to a {@code ReaderTextCSV}
     * created with this reader's CSV properties; all arguments are passed through.
     *
     * @return the matrix block produced by the delegate reader
     * @throws IOException         as thrown by the delegate
     * @throws DMLRuntimeException as thrown by the delegate
     */
    @Override // org.apache.sysds.runtime.io.MatrixReader
    public MatrixBlock readMatrixFromInputStream(InputStream inputStream, long j, long j2, int i, long j3) throws IOException, DMLRuntimeException {
        ReaderTextCSV delegate = new ReaderTextCSV(this._props);
        return delegate.readMatrixFromInputStream(inputStream, j, j2, i, j3);
    }

    /**
     * Second read pass: creates one {@link CSVReadTask} per input split, runs all tasks
     * on a thread pool, and sets the number of non-zeros of the target block to the sum
     * of the per-task counts.
     *
     * <p>Relies on {@code _offsets} having been populated beforehand, since it is handed
     * to every task.
     *
     * @param inputSplitArr splits to read, one task each; the array position is the split index
     * @param path          input path, added to the job configuration
     * @param jobConf       job configuration
     * @param matrixBlock   target block written by all tasks
     * @param j             overall number of rows
     * @param j2            overall number of columns
     * @param i             not used by this method
     * @param z             whether the file has a header line
     * @param str           cell delimiter
     * @param z2            whether empty cells are filled
     * @param d             fill value for empty cells
     * @param hashSet       NA strings passed to the tasks
     * @throws IOException if any task failed, the pool was interrupted, or a worker
     *                     terminated abnormally; the cause is always attached
     */
    private void readCSVMatrixFromHDFS(InputSplit[] inputSplitArr, Path path, JobConf jobConf, MatrixBlock matrixBlock, long j, long j2, int i, boolean z, String str, boolean z2, double d, HashSet<String> hashSet) throws IOException {
        FileInputFormat.addInputPath(jobConf, path);
        TextInputFormat informat = new TextInputFormat();
        informat.configure(jobConf);

        ExecutorService pool = CommonThreadPool.get(this._numThreads);
        try {
            // one read task per split, numbered by position
            List<CSVReadTask> tasks = new ArrayList<>(inputSplitArr.length);
            for (int splitIx = 0; splitIx < inputSplitArr.length; splitIx++) {
                tasks.add(new CSVReadTask(inputSplitArr[splitIx], this._offsets, informat, jobConf, matrixBlock, j, j2, z, str, z2, d, splitIx, hashSet));
            }
            List<Future<Object>> results = pool.invokeAll(tasks);

            // check return codes and aggregate the non-zero counts
            long nnz = 0;
            for (CSVReadTask task : tasks) {
                nnz += task.getPartialNnz();
                if (!task.getReturnCode()) {
                    Exception err = task.getException();
                    throw new IOException("Read task for csv input failed: " + err.toString(), err);
                }
            }

            // surface abnormal worker termination (e.g. an Error) that a task could not
            // record in its return code; get() returns immediately after invokeAll
            for (Future<Object> result : results) {
                result.get();
            }
            matrixBlock.setNonZeros(nnz);
        } catch (InterruptedException e) {
            // keep the interrupt visible to callers further up the stack
            Thread.currentThread().interrupt();
            throw new IOException("Threadpool issue, while parallel read.", e);
        } catch (Exception e) {
            throw new IOException("Threadpool issue, while parallel read.", e);
        } finally {
            // release the pool on every exit path, not only on success
            pool.shutdown();
        }
    }

    /**
     * First read pass: determines the matrix dimensions and allocates the output block.
     *
     * <p>The column count is derived from the number of delimiter occurrences in the
     * first line of the first split. The row count is obtained by counting rows of all
     * splits in parallel; the per-split offsets and lengths are stored in
     * {@code _offsets} for the second pass.
     *
     * @param inputSplitArr splits of the input, in row order
     * @param path          input path, added to the job configuration
     * @param jobConf       job configuration
     * @param z             whether the file has a header line; only passed as {@code true}
     *                      to the row counter of the first split
     * @param str           cell delimiter
     * @param j             number of rows from meta data, or -1 if unknown
     * @param j2            number of columns from meta data, or -1 if unknown
     * @param j3            estimated number of non-zeros; if negative, rows * columns is used
     * @return the allocated output matrix block
     * @throws IOException         if reading the first line or counting rows fails
     * @throws DMLRuntimeException if the meta data dimensions are smaller than the
     *                             dimensions found in the file
     */
    private MatrixBlock computeCSVSizeAndCreateOutputMatrixBlock(InputSplit[] inputSplitArr, Path path, JobConf jobConf, boolean z, String str, long j, long j2, long j3) throws IOException, DMLRuntimeException {
        FileInputFormat.addInputPath(jobConf, path);
        TextInputFormat informat = new TextInputFormat();
        informat.configure(jobConf);

        // number of columns from the first line of the first split
        int ncol = 0;
        RecordReader<LongWritable, Text> reader = informat.getRecordReader(inputSplitArr[0], jobConf, Reporter.NULL);
        try {
            LongWritable key = new LongWritable();
            Text value = new Text();
            if (reader.next(key, value)) {
                ncol = StringUtils.countMatches(value.toString().trim(), str) + 1;
            }
        } finally {
            IOUtilFunctions.closeSilently((RecordReader<?, ?>) reader);
        }

        // number of rows via parallel row counting, one task per split
        int nrow = 0;
        ExecutorService pool = CommonThreadPool.get(this._numThreads);
        try {
            List<IOUtilFunctions.CountRowsTask> tasks = new ArrayList<>(inputSplitArr.length);
            boolean header = z;
            for (InputSplit split : inputSplitArr) {
                tasks.add(new IOUtilFunctions.CountRowsTask(split, informat, jobConf, header));
                header = false; // only the first split can contain the header
            }
            List<? extends Future<?>> counts = pool.invokeAll(tasks);

            // running sum of row counts yields the start offset of each split
            SplitOffsetInfos offsets = new SplitOffsetInfos(tasks.size());
            int splitIx = 0;
            for (Future<?> count : counts) {
                int lnrow = (int) ((Long) count.get()).longValue();
                offsets.setOffsetPerSplit(splitIx, nrow);
                offsets.setLenghtPerSplit(splitIx, lnrow);
                nrow += lnrow;
                splitIx++;
            }
            this._offsets = offsets;
        } catch (InterruptedException e) {
            // keep the interrupt visible to callers further up the stack
            Thread.currentThread().interrupt();
            throw new IOException("Threadpool Error " + e.getMessage(), e);
        } catch (Exception e) {
            throw new IOException("Threadpool Error " + e.getMessage(), e);
        } finally {
            // release the pool on every exit path, not only on success
            pool.shutdown();
        }

        // dimension validation is kept outside the thread-pool try block so that a
        // mismatch is reported as DMLRuntimeException instead of a "Threadpool Error"
        if ((j != -1 && nrow != j) || (j2 != -1 && ncol != j2)) {
            String msg = "Read matrix dimensions differ from meta data: [" + nrow + "x" + ncol + "] vs. [" + j + "x" + j2 + "].";
            if (j < nrow || j2 < ncol) {
                throw new DMLRuntimeException(msg);
            }
            // meta data is larger than the file content: warn and allocate per meta data
            LOG.warn(msg);
            nrow = (int) j;
            ncol = (int) j2;
        }

        // widen before multiplying; an int product overflows for large matrices
        long estnnz = j3 < 0 ? (long) nrow * ncol : j3;
        return createOutputMatrixBlock(nrow, ncol, nrow, estnnz, true, true);
    }
}
