/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark.example.hadoop;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.util.SolrSupport;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

/**
 * Indexes tab-delimited text files from HDFS into a SolrCloud collection, then commits.
 *
 * <p>Each input line must contain exactly one tab-separated cell per field of the fixed
 * Pig schema below, in that order. Lines with a different number of cells are skipped, and
 * empty cells are left out of the resulting document.
 */
public class HdfsToSolrRDDProcessor
implements SparkApp.RDDProcessor {
    public static final Logger log = Logger.getLogger(HdfsToSolrRDDProcessor.class);

    /** Solr field names, in the column order of each tab-delimited input line. */
    private static final String[] PIG_SCHEMA = "id,integer1_i,integer2_i,long1_l,long2_l,float1_f,float2_f,double1_d,double2_d,timestamp1_tdt,timestamp2_tdt,string1_s,string2_s,string3_s,boolean1_b,boolean2_b,text1_en,text2_en,text3_en,random_bucket".split(",");

    /** Number of documents handed to SolrSupport.indexDocs per batch. */
    private static final int BATCH_SIZE = 100;

    /** Returns the command name used to select this processor. */
    @Override
    public String getName() {
        return "hdfs-to-solr";
    }

    /**
     * Declares the command-line options understood by this processor.
     *
     * @return the {@code hdfsPath}, {@code queueSize}, {@code numRunners} and
     *     {@code pollQueueTime} options, all taking a single argument and none marked required
     */
    @Override
    public Option[] getOptions() {
        return new Option[] {
            optionalArg("PATH", "HDFS path identifying the directories / files to index", "hdfsPath"),
            optionalArg("INT", "Queue size for ConcurrentUpdateSolrClient; default is 1000", "queueSize"),
            optionalArg("INT", "Number of runner threads per ConcurrentUpdateSolrClient instance; default is 2", "numRunners"),
            optionalArg("INT", "Number of millis to wait until CUSS sees a doc on the queue before it closes the current request and starts another; default is 20 ms", "pollQueueTime")
        };
    }

    /** Builds a non-required option that takes one argument. */
    private static Option optionalArg(String argName, String description, String name) {
        // OptionBuilder keeps its settings in static state that create() consumes and resets,
        // so each option is configured and created in one uninterrupted sequence.
        OptionBuilder.withArgName(argName);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(false);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(name);
    }

    /**
     * Reads the files under {@code hdfsPath}, indexes each well-formed line as a Solr document
     * into {@code collection} (default "collection1") on {@code zkHost} (default
     * "localhost:9983"), and then issues a commit.
     *
     * @param conf Spark configuration used to create the context for this run
     * @param cli parsed command line
     * @return 0 on success
     * @throws IllegalArgumentException if {@code hdfsPath} is missing or empty
     * @throws NumberFormatException if a numeric option is not a valid integer
     */
    @Override
    public int run(SparkConf conf, CommandLine cli) throws Exception {
        String hdfsPath = cli.getOptionValue("hdfsPath");
        if (hdfsPath == null || hdfsPath.isEmpty()) {
            // Fail before starting Spark; a null path would only surface once the job runs.
            throw new IllegalArgumentException("Missing required option: -hdfsPath");
        }
        String zkHost = cli.getOptionValue("zkHost", "localhost:9983");
        String collection = cli.getOptionValue("collection", "collection1");

        // Still parsed so malformed values fail fast. However, the indexDocs call below only
        // takes a batch size, so these tuning values are not applied. Tell the user rather
        // than silently ignoring them.
        int queueSize = Integer.parseInt(cli.getOptionValue("queueSize", "1000"));
        int numRunners = Integer.parseInt(cli.getOptionValue("numRunners", "2"));
        int pollQueueTime = Integer.parseInt(cli.getOptionValue("pollQueueTime", "20"));
        if (cli.hasOption("queueSize") || cli.hasOption("numRunners") || cli.hasOption("pollQueueTime")) {
            log.warn("Ignoring queueSize=" + queueSize + ", numRunners=" + numRunners
                    + ", pollQueueTime=" + pollQueueTime
                    + "; documents are indexed in batches of " + BATCH_SIZE);
        }

        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
            JavaRDD<SolrInputDocument> docs = jsc.textFile(hdfsPath)
                    .map(new LineToDoc())
                    .filter(new IsNotNull());
            SolrSupport.indexDocs(zkHost, collection, BATCH_SIZE, docs.rdd());

            CloudSolrClient cloudSolrClient = SolrSupport.getCachedCloudClient(zkHost);
            cloudSolrClient.setDefaultCollection(collection);
            cloudSolrClient.commit(true, true);
        } finally {
            // This method created the context, so it is responsible for releasing it.
            jsc.stop();
        }
        return 0;
    }

    /**
     * Converts one tab-delimited line into a document. Returns null when the cell count does
     * not match the schema.
     *
     * <p>This is a static nested class, so the function Spark serializes does not capture an
     * enclosing processor instance.
     */
    private static final class LineToDoc implements Function<String, SolrInputDocument> {
        private static final long serialVersionUID = 1L;

        @Override
        public SolrInputDocument call(String line) {
            // A limit of -1 keeps trailing empty cells. Without it, a row whose last column(s)
            // are blank would be too short and wrongly rejected.
            String[] row = line.split("\t", -1);
            if (row.length != PIG_SCHEMA.length) {
                return null;
            }
            SolrInputDocument doc = new SolrInputDocument();
            for (int c = 0; c < row.length; ++c) {
                if (!row[c].isEmpty()) {
                    doc.setField(PIG_SCHEMA[c], row[c]);
                }
            }
            return doc;
        }
    }

    /** Drops the null placeholders that {@link LineToDoc} emits for malformed lines. */
    private static final class IsNotNull implements Function<SolrInputDocument, Boolean> {
        private static final long serialVersionUID = 1L;

        @Override
        public Boolean call(SolrInputDocument doc) {
            return doc != null;
        }
    }
}

