/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark.example.streaming;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.util.SolrSupport;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;

public class TwitterToSolrStreamProcessor
extends SparkApp.StreamProcessor {
    public static Logger log = Logger.getLogger(TwitterToSolrStreamProcessor.class);

    @Override
    public String getName() {
        return "twitter-to-solr";
    }

    @Override
    public void setup(JavaStreamingContext jssc, CommandLine cli) throws Exception {
        String filtersArg = cli.getOptionValue("tweetFilters");
        String[] filters = filtersArg != null ? filtersArg.split(",") : new String[]{};
        JavaReceiverInputDStream tweets = TwitterUtils.createStream((JavaStreamingContext)jssc, null, (String[])filters);
        String fusionUrl = cli.getOptionValue("fusion");
        if (fusionUrl != null) {
            SolrSupport.sendDStreamOfDocsToFusion(fusionUrl, cli.getOptionValue("fusionCredentials"), tweets.dstream(), this.batchSize);
        } else {
            JavaDStream docs = tweets.map((Function)new Function<Status, SolrInputDocument>(){

                public SolrInputDocument call(Status status) {
                    if (log.isDebugEnabled()) {
                        log.debug((Object)("Received tweet: " + status.getId() + ": " + status.getText().replaceAll("\\s+", " ")));
                    }
                    SolrInputDocument doc = SolrSupport.autoMapToSolrInputDoc("tweet-" + status.getId(), status, null);
                    doc.setField("provider_s", (Object)"twitter");
                    doc.setField("author_s", (Object)status.getUser().getScreenName());
                    doc.setField("type_s", (Object)(status.isRetweet() ? "echo" : "post"));
                    if (log.isDebugEnabled()) {
                        log.debug((Object)("Transformed document: " + doc.toString()));
                    }
                    return doc;
                }
            });
            SolrSupport.indexDStreamOfDocs(this.zkHost, this.collection, this.batchSize, (DStream<SolrInputDocument>)docs.dstream());
        }
    }

    @Override
    public Option[] getOptions() {
        Option[] optionArray = new Option[3];
        OptionBuilder.withArgName((String)"LIST");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)"List of Twitter keywords to filter on, separated by commas");
        optionArray[0] = OptionBuilder.create((String)"tweetFilters");
        OptionBuilder.withArgName((String)"URL(s)");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)"Fusion endpoint");
        optionArray[1] = OptionBuilder.create((String)"fusion");
        OptionBuilder.withArgName((String)"user:password:realm");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)"Fusion credentials user:password:realm");
        optionArray[2] = OptionBuilder.create((String)"fusionCredentials");
        return optionArray;
    }
}

