package com.stratio.deep.commons.extractor.impl;

import com.stratio.deep.commons.config.BaseConfig;
import com.stratio.deep.commons.config.DeepJobConfig;
import com.stratio.deep.commons.config.ExtractorConfig;
import com.stratio.deep.commons.config.HadoopConfig;
import com.stratio.deep.commons.exception.DeepGenericException;
import com.stratio.deep.commons.querybuilder.UpdateQueryBuilder;
import com.stratio.deep.commons.rdd.IExtractor;
import com.stratio.deep.commons.utils.DeepSparkHadoopMapReduceUtil;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.spark.Partition;
import org.apache.spark.rdd.NewHadoopPartition;
import scala.Tuple2;

/* loaded from: input_file:com/stratio/deep/commons/extractor/impl/GenericHadoopExtractor.class */
/**
 * Base {@link IExtractor} that reads and writes data through Hadoop's {@code org.apache.hadoop.mapreduce} API.
 *
 * <p>Reading: {@link #getPartitions} turns the splits produced by {@link #inputFormat} into Spark
 * {@link NewHadoopPartition}s, {@link #initIterator} opens a {@link RecordReader} over one partition, and
 * {@link #hasNext}/{@link #next} iterate it, converting each key/value pair with
 * {@link #transformElement(Tuple2, DeepJobConfig)}.
 *
 * <p>Writing: {@link #initSave} opens a {@link RecordWriter} from {@link #outputFormat}, and {@link #saveRDD}
 * writes each element after converting it with {@link #transformElement(Object)}.
 *
 * <p>{@link #inputFormat} and {@link #outputFormat} are never assigned in this class, and
 * {@link #deepJobConfig} is dereferenced before being assigned on the {@code ExtractorConfig} path.
 * Presumably subclasses initialize them; TODO confirm.
 *
 * @param <T>    element type produced when reading and consumed when writing
 * @param <S>    configuration type accepted by the extractor
 * @param <K>    key type of the input format / record reader
 * @param <V>    value type of the input format / record reader
 * @param <kOut> key type of the output format / record writer
 * @param <vOut> value type of the output format / record writer
 */
public abstract class GenericHadoopExtractor<T, S extends BaseConfig<T>, K, V, kOut, vOut> implements IExtractor<T, S> {

    /** Job configuration; (re)initialized from the caller's configuration in {@link #getPartitions}. */
    protected DeepJobConfig deepJobConfig;

    /** Reader over the current split; replaced by {@link #initIterator} and closed by {@link #close}. */
    protected transient RecordReader<K, V> reader;

    /** Writer created by {@link #initSave} and closed by {@link #close}. */
    protected transient RecordWriter<kOut, vOut> writer;

    /** Supplies splits and record readers; not assigned in this class (presumably set by subclasses). */
    protected transient InputFormat<K, V> inputFormat;

    /** Supplies the record writer; not assigned in this class (presumably set by subclasses). */
    protected transient OutputFormat<kOut, vOut> outputFormat;

    /** Task context the writer was created with; handed back to the writer when it is closed. */
    protected transient TaskAttemptContext hadoopAttemptContext;

    private static final Logger LOG = Logger.getLogger(GenericHadoopExtractor.class);

    /** True when the reader has advanced to a record that {@link #next} has not consumed yet. */
    protected boolean havePair = false;

    /** True once the reader has reported that no further records exist. */
    protected boolean finished = false;

    /** Hadoop job id built in {@link #getPartitions} from {@link #jobTrackerId} and the RDD id. */
    protected transient JobID jobId = null;

    /**
     * Job-tracker identifier used for job and task-attempt ids: the construction time formatted as
     * {@code yyyyMMddHHmm}.
     * NOTE(review): field initializers of transient fields do not run on Java deserialization, so this is
     * null on a deserialized instance. Confirm instances are always constructed where they are used.
     */
    protected transient String jobTrackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());

    /**
     * Returns the preferred hosts for {@code partition}.
     *
     * @return always {@code null} in this base implementation (no locality hints)
     */
    @Override
    public List<String> getPreferredLocations(Partition partition) {
        return null;
    }

    /**
     * Initializes {@link #deepJobConfig} from {@code s}. It then asks {@link #inputFormat} for its splits
     * and wraps each one in a {@link NewHadoopPartition} whose index is its position in the split list.
     *
     * @param s extractor configuration; its RDD id is used for the job id and the partitions
     * @return one partition per input split, in split order
     * @throws DeepGenericException if the splits cannot be computed; the original exception is the cause
     */
    @Override
    public Partition[] getPartitions(S s) {
        if (s instanceof ExtractorConfig) {
            addSparkIdToDeepJobConfig((ExtractorConfig) s);
        } else if (s instanceof DeepJobConfig) {
            this.deepJobConfig = (DeepJobConfig) ((DeepJobConfig) s).initialize();
        }
        int rddId = s.getRddId();
        this.jobId = new JobID(this.jobTrackerId, rddId);
        try {
            List<InputSplit> splits = this.inputFormat.getSplits(DeepSparkHadoopMapReduceUtil.newJobContext(
                    ((HadoopConfig) this.deepJobConfig).getHadoopConfiguration(), this.jobId));
            Partition[] partitions = new Partition[splits.size()];
            for (int i = 0; i < partitions.length; i++) {
                partitions[i] = new NewHadoopPartition(rddId, i, splits.get(i));
            }
            return partitions;
        } catch (IOException | InterruptedException | RuntimeException e) {
            throw wrapFailure("Impossible to calculate partitions " + e.getMessage(), e);
        }
    }

    /**
     * Reports whether another record is available. If no record is currently pending, the reader is
     * advanced by one.
     *
     * @throws DeepGenericException if the reader fails; the original exception is the cause
     */
    @Override
    public boolean hasNext() {
        if (!this.finished && !this.havePair) {
            try {
                this.finished = !this.reader.nextKeyValue();
                this.havePair = !this.finished;
            } catch (IOException | InterruptedException e) {
                throw wrapFailure("Impossible to get hasNext " + e.getMessage(), e);
            }
        }
        return !this.finished;
    }

    /**
     * Consumes the pending record and converts it with {@link #transformElement(Tuple2, DeepJobConfig)}.
     *
     * @throws NoSuchElementException if no records remain
     * @throws DeepGenericException   if the current key/value cannot be read; the original exception is the cause
     */
    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("End of stream");
        }
        this.havePair = false;
        try {
            return transformElement(new Tuple2<>(this.reader.getCurrentKey(), this.reader.getCurrentValue()),
                    this.deepJobConfig);
        } catch (IOException | InterruptedException e) {
            throw wrapFailure("Impossible to get next value " + e.getMessage(), e);
        }
    }

    /**
     * Closes the reader and the writer, whichever are non-null. The writer is closed even if closing the
     * reader fails. If both fail, the writer's exception is attached to the reader's as suppressed.
     *
     * @throws DeepGenericException if either close fails; the first failure is the cause
     */
    @Override
    public void close() {
        Exception failure = null;
        String failureMessage = null;
        try {
            if (this.reader != null) {
                this.reader.close();
            }
        } catch (IOException e) {
            failure = e;
            failureMessage = "Impossible to close RecordReader ";
        }
        try {
            if (this.writer != null) {
                this.writer.close(this.hadoopAttemptContext);
            }
        } catch (IOException | InterruptedException e) {
            // Restore the flag here too: if the reader already failed, this exception is only suppressed.
            restoreInterrupt(e);
            if (failure == null) {
                failure = e;
                failureMessage = "Impossible to close RecordWriter ";
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw wrapFailure(failureMessage + failure.getMessage(), failure);
        }
    }

    /**
     * Initializes {@link #deepJobConfig} from {@code extractorConfig} and copies the RDD id onto it.
     * Requires {@link #deepJobConfig} to be non-null already.
     */
    private void addSparkIdToDeepJobConfig(ExtractorConfig<T> extractorConfig) {
        int rddId = extractorConfig.getRddId();
        this.deepJobConfig = (DeepJobConfig) this.deepJobConfig.initialize(extractorConfig);
        this.deepJobConfig.setRddId(rddId);
    }

    /**
     * Converts one record read from the input format into an element of type {@code T}.
     *
     * @param tuple2        key/value pair returned by the record reader
     * @param deepJobConfig the extractor's current job configuration
     */
    public abstract T transformElement(Tuple2<K, V> tuple2, DeepJobConfig<T, S> deepJobConfig);

    /**
     * Converts {@code t} with {@link #transformElement(Object)} and writes it. Requires {@link #initSave}
     * to have been called first, so that {@link #writer} exists.
     *
     * @throws DeepGenericException if the write fails; the original exception is the cause
     */
    @Override
    public void saveRDD(T t) {
        Tuple2<kOut, vOut> record = transformElement(t);
        try {
            this.writer.write(record._1(), record._2());
        } catch (IOException | InterruptedException e) {
            throw wrapFailure("Impossible to saveRDD " + e.getMessage(), e);
        }
    }

    /**
     * Creates the task context for {@code s}'s RDD and partition id and opens {@link #writer} from
     * {@link #outputFormat}.
     *
     * @throws DeepGenericException wrapping the failure if the writer cannot be created
     */
    @Override
    public void initSave(S s, T t, UpdateQueryBuilder updateQueryBuilder) {
        this.hadoopAttemptContext = createTaskAttemptContext(s, s.getPartitionId());
        try {
            this.writer = this.outputFormat.getRecordWriter(this.hadoopAttemptContext);
        } catch (IOException | InterruptedException e) {
            restoreInterrupt(e);
            throw new DeepGenericException(e);
        }
    }

    /**
     * Opens and initializes {@link #reader} over the split wrapped by {@code partition}.
     *
     * @param partition expected to be a {@link NewHadoopPartition} created by {@link #getPartitions}
     * @throws DeepGenericException wrapping the failure if the reader cannot be created or initialized
     */
    @Override
    public void initIterator(Partition partition, S s) {
        NewHadoopPartition hadoopPartition = (NewHadoopPartition) partition;
        TaskAttemptContext attemptContext = createTaskAttemptContext(s, hadoopPartition.index());
        InputSplit split = hadoopPartition.serializableHadoopSplit().value();
        try {
            this.reader = this.inputFormat.createRecordReader(split, attemptContext);
            this.reader.initialize(split, attemptContext);
        } catch (IOException | InterruptedException e) {
            restoreInterrupt(e);
            throw new DeepGenericException(e);
        }
    }

    /**
     * Converts one element into the key/value pair handed to the record writer.
     */
    public abstract Tuple2<kOut, vOut> transformElement(T t);

    /**
     * Builds a task-attempt context from the Hadoop configuration of {@code config}, shared by
     * {@link #initSave} and {@link #initIterator}. For an {@link ExtractorConfig}, the configuration is
     * obtained by initializing {@link #deepJobConfig} from it; otherwise {@code config} itself is used as
     * the {@link HadoopConfig}.
     * NOTE(review): the boolean passed to newTaskAttemptID is always true here; its meaning is defined by
     * DeepSparkHadoopMapReduceUtil, so confirm there.
     */
    private TaskAttemptContext createTaskAttemptContext(S config, int splitId) {
        HadoopConfig hadoopConfig = config instanceof ExtractorConfig
                ? (HadoopConfig) this.deepJobConfig.initialize((ExtractorConfig) config)
                : (HadoopConfig) config;
        return DeepSparkHadoopMapReduceUtil.newTaskAttemptContext(hadoopConfig.getHadoopConfiguration(),
                DeepSparkHadoopMapReduceUtil.newTaskAttemptID(this.jobTrackerId, config.getRddId(), true, splitId, 0));
    }

    /** Re-sets the current thread's interrupt flag if {@code e} is an {@link InterruptedException}. */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Logs {@code message} with {@code cause} and returns a {@link DeepGenericException} that carries
     * the same message and has {@code cause} attached. The thread's interrupt flag is restored when
     * {@code cause} is an {@link InterruptedException}.
     * NOTE(review): the cause is attached via initCause because only the (String) and (Throwable)
     * constructors are visible here; switch to a (String, Throwable) constructor if one exists.
     */
    private static DeepGenericException wrapFailure(String message, Exception cause) {
        restoreInterrupt(cause);
        LOG.error(message, cause);
        DeepGenericException failure = new DeepGenericException(message);
        failure.initCause(cause);
        return failure;
    }
}
