/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark.example.streaming;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.filter.DocFilterContext;
import com.lucidworks.spark.util.SolrSupport;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.twitter.TwitterUtils;
import twitter4j.Status;

public class DocumentFilteringStreamProcessor
extends SparkApp.StreamProcessor {
    public static Logger log = Logger.getLogger(DocumentFilteringStreamProcessor.class);

    @Override
    public String getName() {
        return "docfilter";
    }

    @Override
    public void setup(JavaStreamingContext jssc, CommandLine cli) throws Exception {
        DocFilterContext docFilterContext = this.loadDocFilterContext(jssc, cli);
        final String idFieldName = docFilterContext.getDocIdFieldName();
        String filtersArg = cli.getOptionValue("tweetFilters");
        String[] filters = filtersArg != null ? filtersArg.split(",") : new String[]{};
        JavaReceiverInputDStream tweets = TwitterUtils.createStream((JavaStreamingContext)jssc, null, (String[])filters);
        JavaDStream docs = tweets.map((Function)new Function<Status, SolrInputDocument>(){

            public SolrInputDocument call(Status status) {
                SolrInputDocument doc = SolrSupport.autoMapToSolrInputDoc(idFieldName, "tweet-" + status.getId(), status, null);
                doc.setField("provider_s", (Object)"twitter");
                doc.setField("author_s", (Object)status.getUser().getScreenName());
                doc.setField("type_s", (Object)(status.isRetweet() ? "echo" : "post"));
                return doc;
            }
        });
        String filterCollection = cli.getOptionValue("filterCollection", this.collection);
        DStream<SolrInputDocument> enriched = SolrSupport.filterDocuments(docFilterContext, this.zkHost, filterCollection, (DStream<SolrInputDocument>)docs.dstream());
        SolrSupport.indexDStreamOfDocs(this.zkHost, this.collection, this.batchSize, enriched);
    }

    protected DocFilterContext loadDocFilterContext(JavaStreamingContext jssc, CommandLine cli) throws Exception {
        DocFilterContext ctxt = null;
        String docFilterContextImplClass = cli.getOptionValue("docFilterContextImplClass");
        if (docFilterContextImplClass != null) {
            Class<?> implClass = this.getClass().getClassLoader().loadClass(docFilterContextImplClass);
            ctxt = (DocFilterContext)implClass.newInstance();
        } else {
            ctxt = new ExampleDocFilterContextImpl();
        }
        ctxt.init(jssc, cli);
        return ctxt;
    }

    @Override
    public Option[] getOptions() {
        Option[] optionArray = new Option[3];
        OptionBuilder.withArgName((String)"LIST");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)"List of Twitter keywords to filter on, separated by commas");
        optionArray[0] = OptionBuilder.create((String)"tweetFilters");
        OptionBuilder.withArgName((String)"NAME");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)"Collection to pull configuration files to create an EmbeddedSolrServer for document matching; defaults to the value of the collection option.");
        optionArray[1] = OptionBuilder.create((String)"filterCollection");
        OptionBuilder.withArgName((String)"CLASS");
        OptionBuilder.hasArg();
        OptionBuilder.isRequired((boolean)false);
        OptionBuilder.withDescription((String)("Name of the DocFilterContext implementation class; defaults to an internal example impl: " + ExampleDocFilterContextImpl.class.getName()));
        optionArray[2] = OptionBuilder.create((String)"docFilterContextImplClass");
        return optionArray;
    }

    class ExampleDocFilterContextImpl
    implements DocFilterContext {
        ExampleDocFilterContextImpl() {
        }

        @Override
        public void init(JavaStreamingContext jssc, CommandLine cli) {
        }

        @Override
        public String getDocIdFieldName() {
            return "id";
        }

        @Override
        public List<SolrQuery> getQueries() {
            ArrayList<SolrQuery> queryList = new ArrayList<SolrQuery>();
            SolrQuery q1 = new SolrQuery("type_s:post");
            q1.setParam("_qid_", new String[]{"POSTS"});
            queryList.add(q1);
            SolrQuery q2 = new SolrQuery("type_s:echo");
            q2.setParam("_qid_", new String[]{"ECHOS"});
            queryList.add(q2);
            return queryList;
        }

        @Override
        public void onMatch(SolrQuery query, SolrInputDocument inputDoc) {
            String[] qids = query.getParams("_qid_");
            if (qids == null || qids.length < 1) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug((Object)("document [" + inputDoc.getFieldValue("id") + "] matches query: " + qids[0]));
            }
            inputDoc.addField("_qid_ss", (Object)qids[0]);
        }
    }
}

