/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark.example.hadoop;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.util.SolrSupport;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.input.PortableDataStream;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.JavaConversions;

public class Logs2SolrRDDProcessor
implements SparkApp.RDDProcessor {
    public static Logger log = Logger.getLogger(Logs2SolrRDDProcessor.class);

    @Override
    public String getName() {
        return "logs2solr";
    }

    @Override
    public Option[] getOptions() {
        Option[] optionArray = new Option[1];
        OptionBuilder.withArgName((String)"PATH");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)"HDFS path identifying the directories / files to index");
        optionArray[0] = OptionBuilder.create((String)"hdfsPath");
        return optionArray;
    }

    @Override
    public int run(SparkConf conf, CommandLine cli) throws Exception {
        JavaSparkContext jsc = new JavaSparkContext(conf);
        final String zkHost = cli.getOptionValue("zkHost", "localhost:9983");
        final String collection = cli.getOptionValue("collection", "collection1");
        final int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "1000"));
        jsc.binaryFiles(cli.getOptionValue("hdfsPath")).foreach((VoidFunction)new VoidFunction<Tuple2<String, PortableDataStream>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void call(Tuple2<String, PortableDataStream> t2) throws Exception {
                CloudSolrClient solrServer = SolrSupport.getCachedCloudClient(zkHost);
                ArrayList<SolrInputDocument> batch = new ArrayList<SolrInputDocument>(batchSize);
                String path = (String)t2._1();
                BufferedReader br = null;
                String line = null;
                int lineNum = 0;
                try {
                    br = new BufferedReader(new InputStreamReader(this.openPortableDataStream((PortableDataStream)t2._2()), "UTF-8"));
                    while ((line = br.readLine()) != null) {
                        SolrInputDocument doc = new SolrInputDocument(new String[0]);
                        doc.setField("id", (Object)(path + ":" + ++lineNum));
                        doc.setField("path_s", (Object)path);
                        doc.setField("line_t", (Object)line);
                        batch.add(doc);
                        if (batch.size() >= batchSize) {
                            SolrSupport.sendBatchToSolr((SolrClient)solrServer, collection, (Iterable<SolrInputDocument>)JavaConversions.collectionAsScalaIterable(batch));
                        }
                        if (lineNum % 10000 != 0) continue;
                        log.info((Object)("Sent " + lineNum + " docs to Solr from " + path));
                    }
                    if (!batch.isEmpty()) {
                        SolrSupport.sendBatchToSolr((SolrClient)solrServer, collection, (Iterable<SolrInputDocument>)JavaConversions.collectionAsScalaIterable(batch));
                    }
                }
                catch (Exception exc) {
                    log.error((Object)("Failed to read '" + path + "' due to: " + exc));
                }
                finally {
                    if (br != null) {
                        try {
                            br.close();
                        }
                        catch (Exception exception) {}
                    }
                }
            }

            InputStream openPortableDataStream(PortableDataStream pds) throws Exception {
                Object in = null;
                String path = pds.getPath();
                log.info((Object)("Opening InputStream to " + path));
                if (path.endsWith(".zip")) {
                    ZipInputStream zipIn = new ZipInputStream(pds.open());
                    zipIn.getNextEntry();
                    in = zipIn;
                } else if (path.endsWith(".bz2")) {
                    in = new BZip2CompressorInputStream((InputStream)pds.open());
                } else if (path.endsWith(".gz")) {
                    in = new GzipCompressorInputStream((InputStream)pds.open());
                }
                return in;
            }
        });
        return 0;
    }
}

