package com.stratio.deep.rdd;

import com.stratio.deep.config.ICassandraDeepJobConfig;
import com.stratio.deep.cql.DeepRecordReader;
import com.stratio.deep.cql.DeepTokenRange;
import com.stratio.deep.cql.RangeUtils;
import com.stratio.deep.entity.Cells;
import com.stratio.deep.entity.IDeepType;
import com.stratio.deep.exception.DeepIOException;
import com.stratio.deep.functions.CellList2TupleFunction;
import com.stratio.deep.functions.DeepType2TupleFunction;
import com.stratio.deep.partition.impl.DeepPartition;
import com.stratio.deep.utils.Pair;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.spark.InterruptibleIterator;
import org.apache.spark.Partition;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:com/stratio/deep/rdd/CassandraRDD.class */
/**
 * Base Spark {@link RDD} whose partitions are read from Cassandra through a
 * {@link DeepRecordReader}.
 *
 * <p>The job configuration is broadcast once at construction time and read back through
 * {@link #config} whenever partitions are computed or enumerated. Subclasses decide how a raw
 * record (a pair of column-name to {@link ByteBuffer} maps) is turned into an element of type
 * {@code T} by implementing {@link #transformElement(Pair)}.
 *
 * @param <T> element type produced by this RDD
 */
public abstract class CassandraRDD<T> extends RDD<T> {
    private static final long serialVersionUID = -7338324965474684418L;

    /** Error raised when the configured entity class is neither an IDeepType nor Cells. */
    private static final String UNSUPPORTED_ENTITY_MESSAGE =
            "Provided RDD must be an RDD of Cells or an RDD of IDeepType";

    /** Broadcast handle to the job configuration this RDD was created with. */
    protected final Broadcast<ICassandraDeepJobConfig<T>> config;

    /**
     * Callback that closes the record reader it was created with when invoked.
     *
     * @param <R> nominal return type of the callback; {@link #apply()} always returns {@code null}
     */
    public class OnComputedRDDCallback<R> extends AbstractFunction0<R> {
        private final DeepRecordReader recordReader;

        /**
         * @param deepRecordReader reader to close when the callback fires
         * @param deepPartition    partition the reader was opened for; accepted to keep the
         *                         constructor signature stable, but not needed to close the reader
         */
        public OnComputedRDDCallback(DeepRecordReader deepRecordReader, DeepPartition deepPartition) {
            this.recordReader = deepRecordReader;
        }

        /**
         * Closes the wrapped record reader.
         *
         * @return always {@code null}
         */
        public R apply() {
            this.recordReader.close();
            return null;
        }
    }

    /**
     * Converts one raw record delivered by the record reader into an element of this RDD.
     *
     * @param pair raw record as a pair of column-name to value-buffer maps
     *             (presumably key columns and value columns; confirm against DeepRecordReader)
     * @return the element built from {@code pair}
     */
    protected abstract T transformElement(Pair<Map<String, ByteBuffer>, Map<String, ByteBuffer>> pair);

    /**
     * Persists {@code rdd} by delegating to {@code CassandraRDDUtils.doCql3SaveToCassandra},
     * choosing the tuple-conversion function from the entity class declared in the configuration.
     *
     * @param rdd                     RDD to persist
     * @param iCassandraDeepJobConfig write configuration; its entity class must be assignable to
     *                                {@link IDeepType} or {@link Cells}
     * @throws IllegalArgumentException if the entity class is neither an IDeepType nor Cells
     */
    public static <W, T extends IDeepType> void cql3SaveRDDToCassandra(RDD<W> rdd, ICassandraDeepJobConfig<W> iCassandraDeepJobConfig) {
        Class<?> entityClass = iCassandraDeepJobConfig.getEntityClass();
        if (IDeepType.class.isAssignableFrom(entityClass)) {
            CassandraRDDUtils.doCql3SaveToCassandra(rdd, iCassandraDeepJobConfig, new DeepType2TupleFunction());
        } else if (Cells.class.isAssignableFrom(entityClass)) {
            CassandraRDDUtils.doCql3SaveToCassandra(rdd, iCassandraDeepJobConfig, new CellList2TupleFunction());
        } else {
            throw new IllegalArgumentException(UNSUPPORTED_ENTITY_MESSAGE);
        }
    }

    /**
     * Persists {@code rdd} by delegating to {@code CassandraRDDUtils.doSaveToCassandra},
     * choosing the tuple-conversion function from the entity class declared in the configuration.
     *
     * @param rdd                     RDD to persist
     * @param iCassandraDeepJobConfig write configuration; its entity class must be assignable to
     *                                {@link IDeepType} or {@link Cells}
     * @throws IllegalArgumentException if the entity class is neither an IDeepType nor Cells
     */
    public static <W, T extends IDeepType> void saveRDDToCassandra(RDD<W> rdd, ICassandraDeepJobConfig<W> iCassandraDeepJobConfig) {
        Class<?> entityClass = iCassandraDeepJobConfig.getEntityClass();
        if (IDeepType.class.isAssignableFrom(entityClass)) {
            CassandraRDDUtils.doSaveToCassandra(rdd, iCassandraDeepJobConfig, new DeepType2TupleFunction());
        } else if (Cells.class.isAssignableFrom(entityClass)) {
            CassandraRDDUtils.doSaveToCassandra(rdd, iCassandraDeepJobConfig, new CellList2TupleFunction());
        } else {
            throw new IllegalArgumentException(UNSUPPORTED_ENTITY_MESSAGE);
        }
    }

    /**
     * Java-API convenience overload: unwraps the {@link JavaRDD} and forwards to
     * {@link #saveRDDToCassandra(RDD, ICassandraDeepJobConfig)}.
     *
     * @param javaRDD                 RDD to persist
     * @param iCassandraDeepJobConfig write configuration
     */
    public static <W> void saveRDDToCassandra(JavaRDD<W> javaRDD, ICassandraDeepJobConfig<W> iCassandraDeepJobConfig) {
        saveRDDToCassandra(javaRDD.rdd(), iCassandraDeepJobConfig);
    }

    /**
     * Creates an RDD with no parent dependencies and broadcasts the supplied configuration.
     *
     * @param sparkContext            context used to register the RDD and create the broadcast
     * @param iCassandraDeepJobConfig job configuration; its entity class provides the element class tag
     */
    public CassandraRDD(SparkContext sparkContext, ICassandraDeepJobConfig<T> iCassandraDeepJobConfig) {
        super(sparkContext, Seq$.MODULE$.empty(), ClassTag$.MODULE$.apply(iCassandraDeepJobConfig.getEntityClass()));
        this.config = sparkContext.broadcast(iCassandraDeepJobConfig, ClassTag$.MODULE$.apply(iCassandraDeepJobConfig.getClass()));
    }

    /**
     * Opens a record reader for the given partition and exposes its records, converted through
     * {@link #transformElement(Pair)}, as an interruptible Scala iterator.
     *
     * @param partition   partition to read; must be a {@link DeepPartition}
     * @param taskContext context of the running task; also receives the reader-closing callback
     * @return iterator over the transformed records of the partition
     */
    public Iterator<T> compute(Partition partition, TaskContext taskContext) {
        final DeepPartition deepPartition = (DeepPartition) partition;
        // Guarded so the message string is only built when it will actually be logged.
        if (log().isDebugEnabled()) {
            log().debug("Executing compute for split: " + deepPartition);
        }
        final DeepRecordReader recordReader = initRecordReader(taskContext, deepPartition);
        java.util.Iterator<T> elements = new java.util.Iterator<T>() {
            @Override
            public boolean hasNext() {
                return recordReader.hasNext();
            }

            @Override
            public T next() {
                return CassandraRDD.this.transformElement(recordReader.next());
            }

            @Override
            public void remove() {
                throw new DeepIOException("Method not implemented (and won't be implemented anytime soon!!!)");
            }
        };
        return new InterruptibleIterator<T>(taskContext, JavaConversions.asScalaIterator(elements));
    }

    /**
     * Builds the callback registered for a partition's record reader. Subclasses may override
     * this to run additional work when the callback fires.
     *
     * @param deepRecordReader reader opened for the partition
     * @param deepPartition    partition being computed
     * @return callback that closes {@code deepRecordReader}
     */
    protected AbstractFunction0<BoxedUnit> getComputeCallback(DeepRecordReader deepRecordReader, DeepPartition deepPartition) {
        return new OnComputedRDDCallback<BoxedUnit>(deepRecordReader, deepPartition);
    }

    /**
     * Creates one {@link DeepPartition} per token range returned by {@code RangeUtils.getSplits}
     * for the broadcast configuration, indexed in the order the ranges are returned.
     *
     * @return the partitions of this RDD
     */
    public Partition[] getPartitions() {
        List<DeepTokenRange> tokenRanges = RangeUtils.getSplits(this.config.value());
        DeepPartition[] partitions = new DeepPartition[tokenRanges.size()];
        final int rddId = id();
        final boolean debugEnabled = log().isDebugEnabled();
        int index = 0;
        for (DeepTokenRange tokenRange : tokenRanges) {
            DeepPartition current = new DeepPartition(rddId, index, tokenRange);
            partitions[index] = current;
            if (debugEnabled) {
                log().debug("Detected partition: " + current);
            }
            index++;
        }
        return partitions;
    }

    /**
     * Reports the replicas of the partition's token range as its preferred locations.
     *
     * @param partition partition to locate; must be a {@link DeepPartition}
     * @return replica list of the partition's token range as a Scala sequence
     */
    public Seq<String> getPreferredLocations(Partition partition) {
        DeepPartition deepPartition = (DeepPartition) partition;
        List<String> replicas = deepPartition.splitWrapper().getReplicas();
        if (log().isDebugEnabled()) {
            log().debug("getPreferredLocations: " + deepPartition);
        }
        return JavaConversions.asScalaBuffer(replicas);
    }

    /**
     * Opens a record reader over the partition's token range and registers the callback from
     * {@link #getComputeCallback(DeepRecordReader, DeepPartition)} through
     * {@code TaskContext.addOnCompleteCallback}.
     *
     * @param taskContext   context that receives the callback
     * @param deepPartition partition whose token range is read
     * @return the newly created reader
     */
    private DeepRecordReader initRecordReader(TaskContext taskContext, DeepPartition deepPartition) {
        DeepRecordReader recordReader = new DeepRecordReader(this.config.value(), deepPartition.splitWrapper());
        taskContext.addOnCompleteCallback(getComputeCallback(recordReader, deepPartition));
        return recordReader;
    }
}
