/*
 * Decompiled with CFR 0.152.
 */
package com.stratio.deep.commons.extractor.impl;

import com.stratio.deep.commons.config.BaseConfig;
import com.stratio.deep.commons.config.DeepJobConfig;
import com.stratio.deep.commons.config.ExtractorConfig;
import com.stratio.deep.commons.config.HadoopConfig;
import com.stratio.deep.commons.exception.DeepGenericException;
import com.stratio.deep.commons.querybuilder.UpdateQueryBuilder;
import com.stratio.deep.commons.rdd.IExtractor;
import com.stratio.deep.commons.utils.DeepSparkHadoopMapReduceUtil;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.log4j.Logger;
import org.apache.spark.Partition;
import org.apache.spark.rdd.NewHadoopPartition;
import scala.Tuple2;

public abstract class GenericHadoopExtractor<T, S extends BaseConfig<T>, K, V, kOut, vOut>
implements IExtractor<T, S> {
    protected DeepJobConfig deepJobConfig;
    protected transient RecordReader<K, V> reader;
    protected transient RecordWriter<kOut, vOut> writer;
    protected transient InputFormat<K, V> inputFormat;
    protected transient OutputFormat<kOut, vOut> outputFormat;
    protected transient String jobTrackerId;
    protected transient TaskAttemptContext hadoopAttemptContext;
    protected boolean havePair = false;
    protected boolean finished = false;
    protected transient JobID jobId = null;
    private static final Logger LOG = Logger.getLogger(GenericHadoopExtractor.class);

    public GenericHadoopExtractor() {
        SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
        this.jobTrackerId = formatter.format(new Date());
    }

    @Override
    public List<String> getPreferredLocations(Partition split) {
        return null;
    }

    @Override
    public Partition[] getPartitions(S config) {
        if (config instanceof ExtractorConfig) {
            this.addSparkIdToDeepJobConfig((ExtractorConfig)config);
        } else if (config instanceof DeepJobConfig) {
            this.deepJobConfig = (DeepJobConfig)((DeepJobConfig)config).initialize();
        }
        int id = ((BaseConfig)config).getRddId();
        this.jobId = new JobID(this.jobTrackerId, id);
        Configuration conf = ((HadoopConfig)this.deepJobConfig).getHadoopConfiguration();
        JobContext jobContext = DeepSparkHadoopMapReduceUtil.newJobContext(conf, this.jobId);
        try {
            List splits = this.inputFormat.getSplits(jobContext);
            Partition[] partitions = new Partition[splits.size()];
            for (int i = 0; i < splits.size(); ++i) {
                partitions[i] = new NewHadoopPartition(id, i, (InputSplit)splits.get(i));
            }
            return partitions;
        }
        catch (IOException | InterruptedException | RuntimeException e) {
            LOG.error((Object)("Impossible to calculate partitions " + e.getMessage()));
            throw new DeepGenericException("Impossible to calculate partitions " + e.getMessage());
        }
    }

    @Override
    public boolean hasNext() {
        if (!this.finished && !this.havePair) {
            try {
                this.finished = !this.reader.nextKeyValue();
            }
            catch (IOException | InterruptedException e) {
                LOG.error((Object)("Impossible to get hasNext " + e.getMessage()));
                throw new DeepGenericException("Impossible to get hasNext " + e.getMessage());
            }
            this.havePair = !this.finished;
        }
        return !this.finished;
    }

    @Override
    public T next() {
        if (!this.hasNext()) {
            throw new NoSuchElementException("End of stream");
        }
        this.havePair = false;
        Object tuple = null;
        try {
            return this.transformElement(new Tuple2(this.reader.getCurrentKey(), this.reader.getCurrentValue()), this.deepJobConfig);
        }
        catch (IOException | InterruptedException e) {
            LOG.error((Object)("Impossible to get next value " + e.getMessage()));
            throw new DeepGenericException("Impossible to get next value " + e.getMessage());
        }
    }

    @Override
    public void close() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.writer != null) {
                this.writer.close(this.hadoopAttemptContext);
            }
        }
        catch (IOException | InterruptedException e) {
            LOG.error((Object)("Impossible to close RecordReader " + e.getMessage()));
            throw new DeepGenericException("Impossible to close RecordReader " + e.getMessage());
        }
    }

    private void addSparkIdToDeepJobConfig(ExtractorConfig<T> config) {
        int id = config.getRddId();
        this.deepJobConfig = (DeepJobConfig)this.deepJobConfig.initialize(config);
        this.deepJobConfig.setRddId(id);
    }

    public abstract T transformElement(Tuple2<K, V> var1, DeepJobConfig<T, S> var2);

    @Override
    public void saveRDD(T t) {
        Tuple2<kOut, vOut> tuple = this.transformElement(t);
        try {
            this.writer.write(tuple._1(), tuple._2());
        }
        catch (IOException | InterruptedException e) {
            LOG.error((Object)("Impossible to saveRDD " + e.getMessage()));
            throw new DeepGenericException("Impossible to saveRDD " + e.getMessage());
        }
    }

    @Override
    public void initSave(S config, T first, UpdateQueryBuilder queryBuilder) {
        int id = ((BaseConfig)config).getRddId();
        int partitionIndex = ((BaseConfig)config).getPartitionId();
        TaskAttemptID attemptId = DeepSparkHadoopMapReduceUtil.newTaskAttemptID(this.jobTrackerId, id, true, partitionIndex, 0);
        Configuration configuration = null;
        configuration = config instanceof ExtractorConfig ? ((HadoopConfig)this.deepJobConfig.initialize((ExtractorConfig)config)).getHadoopConfiguration() : ((HadoopConfig)config).getHadoopConfiguration();
        this.hadoopAttemptContext = DeepSparkHadoopMapReduceUtil.newTaskAttemptContext(configuration, attemptId);
        try {
            this.writer = this.outputFormat.getRecordWriter(this.hadoopAttemptContext);
        }
        catch (IOException | InterruptedException e) {
            throw new DeepGenericException(e);
        }
    }

    @Override
    public void initIterator(Partition dp, S config) {
        int id = ((BaseConfig)config).getRddId();
        NewHadoopPartition split = (NewHadoopPartition)dp;
        TaskAttemptID attemptId = DeepSparkHadoopMapReduceUtil.newTaskAttemptID(this.jobTrackerId, id, true, split.index(), 0);
        Configuration configuration = null;
        configuration = config instanceof ExtractorConfig ? ((HadoopConfig)this.deepJobConfig.initialize((ExtractorConfig)config)).getHadoopConfiguration() : ((HadoopConfig)config).getHadoopConfiguration();
        TaskAttemptContext hadoopAttemptContext = DeepSparkHadoopMapReduceUtil.newTaskAttemptContext(configuration, attemptId);
        try {
            this.reader = this.inputFormat.createRecordReader((InputSplit)split.serializableHadoopSplit().value(), hadoopAttemptContext);
            this.reader.initialize((InputSplit)split.serializableHadoopSplit().value(), hadoopAttemptContext);
        }
        catch (IOException | InterruptedException e) {
            throw new DeepGenericException(e);
        }
    }

    public abstract Tuple2<kOut, vOut> transformElement(T var1);
}

