package com.twitter.elephantbird.mapreduce.input;

import com.twitter.elephantbird.util.HadoopCompat;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/elephantbird/mapreduce/input/LzoRecordReader.class */
public abstract class LzoRecordReader<K, V> extends RecordReader<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(LzoRecordReader.class);
    public static final String BAD_RECORD_THRESHOLD_CONF_KEY = "elephantbird.mapred.input.bad.record.threshold";
    public static final String BAD_RECORD_MIN_COUNT_CONF_KEY = "elephantbird.mapred.input.bad.record.min";
    protected long start_;
    protected long pos_;
    protected long end_;
    private FSDataInputStream fileIn_;
    protected InputErrorTracker errorTracker;

    /* loaded from: input_file:com/twitter/elephantbird/mapreduce/input/LzoRecordReader$InputErrorTracker.class */
    static class InputErrorTracker {
        long numRecords = 0;
        long numErrors = 0;
        double errorThreshold;
        long minErrors;

        InputErrorTracker(Configuration configuration) {
            this.errorThreshold = configuration.getFloat(LzoRecordReader.BAD_RECORD_THRESHOLD_CONF_KEY, 1.0E-4f);
            this.minErrors = configuration.getLong(LzoRecordReader.BAD_RECORD_MIN_COUNT_CONF_KEY, 2L);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void incRecords() {
            this.numRecords++;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void incErrors(Throwable th) {
            this.numErrors++;
            if (this.numErrors > this.numRecords) {
                throw new RuntimeException("Forgot to invoke incRecords()?");
            }
            if (th == null) {
                th = new Exception("Unknown error");
            }
            if (this.errorThreshold <= 0.0d) {
                throw new RuntimeException("error while reading input records", th);
            }
            LzoRecordReader.LOG.warn("Error while reading an input record (" + this.numErrors + " out of " + this.numRecords + " so far ): ", th);
            double d = this.numErrors / this.numRecords;
            if (this.numErrors < this.minErrors || d <= this.errorThreshold) {
                return;
            }
            LzoRecordReader.LOG.error(this.numErrors + " out of " + this.numRecords + " crosses configured threshold (" + this.errorThreshold + ")");
            throw new RuntimeException("error rate while reading input records crossed threshold", th);
        }
    }

    public float getProgress() {
        if (this.start_ == this.end_) {
            return 0.0f;
        }
        return Math.min(1.0f, ((float) (this.pos_ - this.start_)) / ((float) (this.end_ - this.start_)));
    }

    public synchronized long getPos() throws IOException {
        return this.pos_;
    }

    public long getLzoFilePos() throws IOException {
        return this.fileIn_.getPos();
    }

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) inputSplit;
        this.start_ = fileSplit.getStart();
        this.end_ = this.start_ + fileSplit.getLength();
        Path path = fileSplit.getPath();
        Configuration configuration = HadoopCompat.getConfiguration(taskAttemptContext);
        this.errorTracker = new InputErrorTracker(configuration);
        LOG.info("input split: " + path + " " + this.start_ + ":" + this.end_);
        FileSystem fileSystem = path.getFileSystem(configuration);
        CompressionCodec codec = new CompressionCodecFactory(configuration).getCodec(path);
        if (codec == null) {
            throw new IOException("No codec for file " + path + " found, cannot run");
        }
        this.fileIn_ = fileSystem.open(fileSplit.getPath());
        createInputReader(codec.createInputStream(this.fileIn_), configuration);
        if (this.start_ != 0) {
            this.fileIn_.seek(this.start_);
            skipToNextSyncPoint(false);
            this.start_ = this.fileIn_.getPos();
            LOG.info("Start is now " + this.start_);
        } else {
            skipToNextSyncPoint(true);
        }
        this.pos_ = this.start_;
    }

    protected abstract void createInputReader(InputStream inputStream, Configuration configuration) throws IOException;

    protected abstract void skipToNextSyncPoint(boolean z) throws IOException;
}
