package io.druid.indexer.hadoop;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.realtime.firehose.IngestSegmentFirehose;
import io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/* loaded from: input_file:io/druid/indexer/hadoop/DatasourceRecordReader.class */
public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow> {
    private static final Logger logger = new Logger(DatasourceRecordReader.class);
    private DatasourceIngestionSpec spec;
    private IngestSegmentFirehose firehose;
    private int rowNum;
    private MapBasedRow currRow;
    private List<QueryableIndex> indexes = Lists.newArrayList();
    private List<File> tmpSegmentDirs = Lists.newArrayList();
    private int numRows;

    public void initialize(InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.spec = readAndVerifyDatasourceIngestionSpec(taskAttemptContext.getConfiguration(), HadoopDruidIndexerConfig.JSON_MAPPER);
        this.firehose = new IngestSegmentFirehose(Lists.transform(((DatasourceInputSplit) inputSplit).getSegments(), new Function<WindowedDataSegment, WindowedStorageAdapter>() { // from class: io.druid.indexer.hadoop.DatasourceRecordReader.1
            public WindowedStorageAdapter apply(WindowedDataSegment windowedDataSegment) {
                try {
                    DatasourceRecordReader.logger.info("Getting storage path for segment [%s]", new Object[]{windowedDataSegment.getSegment().getIdentifier()});
                    Path path = new Path(JobHelper.getURIFromSegment(windowedDataSegment.getSegment()));
                    DatasourceRecordReader.logger.info("Fetch segment files from [%s]", new Object[]{path});
                    File createTempDir = Files.createTempDir();
                    DatasourceRecordReader.this.tmpSegmentDirs.add(createTempDir);
                    DatasourceRecordReader.logger.info("Locally storing fetched segment at [%s]", new Object[]{createTempDir});
                    JobHelper.unzipNoGuava(path, taskAttemptContext.getConfiguration(), createTempDir, taskAttemptContext);
                    DatasourceRecordReader.logger.info("finished fetching segment files", new Object[0]);
                    QueryableIndex loadIndex = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(createTempDir);
                    DatasourceRecordReader.this.indexes.add(loadIndex);
                    DatasourceRecordReader.this.numRows += loadIndex.getNumRows();
                    return new WindowedStorageAdapter(new QueryableIndexStorageAdapter(loadIndex), windowedDataSegment.getInterval());
                } catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }
        }), this.spec.getDimensions(), this.spec.getMetrics(), this.spec.getFilter(), this.spec.getGranularity());
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!this.firehose.hasMore()) {
            return false;
        }
        this.currRow = this.firehose.nextRow();
        this.rowNum++;
        return true;
    }

    /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
    public NullWritable m23getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
    public InputRow m22getCurrentValue() throws IOException, InterruptedException {
        return new SegmentInputRow(new MapBasedInputRow(this.currRow.getTimestamp(), this.spec.getDimensions(), this.currRow.getEvent()));
    }

    public float getProgress() throws IOException, InterruptedException {
        if (this.numRows > 0) {
            return (this.rowNum * 1.0f) / this.numRows;
        }
        return 0.0f;
    }

    public void close() throws IOException {
        Closeables.close(this.firehose, true);
        Iterator<QueryableIndex> it = this.indexes.iterator();
        while (it.hasNext()) {
            Closeables.close(it.next(), true);
        }
        Iterator<File> it2 = this.tmpSegmentDirs.iterator();
        while (it2.hasNext()) {
            FileUtils.deleteDirectory(it2.next());
        }
    }

    private DatasourceIngestionSpec readAndVerifyDatasourceIngestionSpec(Configuration configuration, ObjectMapper objectMapper) {
        try {
            String str = (String) Preconditions.checkNotNull(configuration.get(DatasourceInputFormat.CONF_DRUID_SCHEMA), "null schema");
            logger.info("load schema [%s]", new Object[]{str});
            DatasourceIngestionSpec datasourceIngestionSpec = (DatasourceIngestionSpec) objectMapper.readValue(str, DatasourceIngestionSpec.class);
            if (datasourceIngestionSpec.getDimensions() == null || datasourceIngestionSpec.getDimensions().size() == 0) {
                throw new ISE("load schema does not have dimensions", new Object[0]);
            }
            if (datasourceIngestionSpec.getMetrics() == null || datasourceIngestionSpec.getMetrics().size() == 0) {
                throw new ISE("load schema does not have metrics", new Object[0]);
            }
            return datasourceIngestionSpec;
        } catch (IOException e) {
            throw new RuntimeException("couldn't load segment load spec", e);
        }
    }
}
