package com.lucidworks.spark.example.streaming;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.util.SolrSupport;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;
import twitter4j.auth.Authorization;

/* loaded from: input_file:com/lucidworks/spark/example/streaming/TwitterToSolrStreamProcessor.class */
/**
 * Stream processor registered under the name {@code twitter-to-solr}.
 *
 * <p>It opens a Twitter stream, optionally restricted by the comma-separated
 * {@code tweetFilters} option, and forwards the tweets either to a Fusion
 * endpoint (when the {@code fusion} option is given) or to the Solr collection
 * configured on this processor.
 */
public class TwitterToSolrStreamProcessor extends SparkApp.StreamProcessor {
    public static Logger log = Logger.getLogger(TwitterToSolrStreamProcessor.class);

    /**
     * @return the name of this processor, {@code "twitter-to-solr"}
     */
    @Override
    public String getName() {
        return "twitter-to-solr";
    }

    /**
     * Wires the Twitter input stream to its destination.
     *
     * <p>If the {@code fusion} option is present, the raw stream of {@link Status}
     * objects is handed to {@code SolrSupport.sendDStreamOfDocsToFusion} together
     * with the value of {@code fusionCredentials}. Otherwise every tweet is mapped
     * to a {@link SolrInputDocument} and passed to
     * {@code SolrSupport.indexDStreamOfDocs} using this processor's
     * {@code zkHost}, {@code collection} and {@code batchSize}.
     *
     * @param javaStreamingContext streaming context the Twitter stream is created on
     * @param commandLine parsed command line; the options {@code tweetFilters},
     *     {@code fusion} and {@code fusionCredentials} are read from it
     * @throws Exception declared by the overridden method
     */
    @Override
    public void setup(JavaStreamingContext javaStreamingContext, CommandLine commandLine) throws Exception {
        String filtersArg = commandLine.getOptionValue("tweetFilters");
        // No tweetFilters option => an empty filter array is passed to the stream.
        String[] filters = filtersArg != null ? filtersArg.split(",") : new String[0];

        // The explicit cast on null pins the (context, Authorization, String[]) overload.
        JavaReceiverInputDStream<Status> tweets =
                TwitterUtils.createStream(javaStreamingContext, (Authorization) null, filters);

        String fusionUrl = commandLine.getOptionValue("fusion");
        if (fusionUrl != null) {
            // Fusion receives the Status objects as they are, without the Solr mapping below.
            SolrSupport.sendDStreamOfDocsToFusion(
                    fusionUrl, commandLine.getOptionValue("fusionCredentials"), tweets.dstream(), this.batchSize);
            return;
        }

        SolrSupport.indexDStreamOfDocs(this.zkHost, this.collection, this.batchSize,
                tweets.map(new Function<Status, SolrInputDocument>() {
                    @Override
                    public SolrInputDocument call(Status status) {
                        if (log.isDebugEnabled()) {
                            log.debug("Received tweet: " + status.getId() + ": " + status.getText().replaceAll("\\s+", " "));
                        }
                        // The document id is "tweet-" followed by the tweet id.
                        SolrInputDocument doc = SolrSupport.autoMapToSolrInputDoc("tweet-" + status.getId(), status, null);
                        doc.setField("provider_s", "twitter");
                        doc.setField("author_s", status.getUser().getScreenName());
                        // Retweets are labelled "echo", everything else "post".
                        doc.setField("type_s", status.isRetweet() ? "echo" : "post");
                        if (log.isDebugEnabled()) {
                            log.debug("Transformed document: " + doc.toString());
                        }
                        return doc;
                    }
                }).dstream());
    }

    /**
     * Declares the command-line options read by
     * {@link #setup(JavaStreamingContext, CommandLine)}: {@code tweetFilters},
     * {@code fusion} and {@code fusionCredentials}. Each one is optional and
     * takes a single argument.
     *
     * <p>Every option is built independently. Configuring all three through the
     * static {@code OptionBuilder} before calling {@code create} would attach the
     * last configuration to the first created option and leave the remaining two
     * without an argument, because {@code OptionBuilder} keeps its pending
     * settings in shared static state that {@code create} resets.
     *
     * @return the three options, in the order listed above
     */
    @Override
    public Option[] getOptions() {
        return new Option[]{
            optionalArgOption("tweetFilters", "LIST", "List of Twitter keywords to filter on, separated by commas"),
            optionalArgOption("fusion", "URL(s)", "Fusion endpoint"),
            optionalArgOption("fusionCredentials", "user:password:realm", "Fusion credentials user:password:realm")
        };
    }

    /** Builds a non-required option that takes exactly one argument. */
    private static Option optionalArgOption(String name, String argName, String description) {
        Option option = new Option(name, true, description);
        option.setArgName(argName);
        option.setRequired(false);
        return option;
    }
}
