package com.lucidworks.spark;

import com.lucidworks.spark.example.hadoop.HdfsToSolrRDDProcessor;
import com.lucidworks.spark.example.hadoop.Logs2SolrRDDProcessor;
import com.lucidworks.spark.example.ml.SVMExample;
import com.lucidworks.spark.example.query.KMeansAnomaly;
import com.lucidworks.spark.example.query.QueryBenchmark;
import com.lucidworks.spark.example.query.ReadTermVectors;
import com.lucidworks.spark.example.query.WordCount;
import com.lucidworks.spark.example.streaming.DocumentFilteringStreamProcessor;
import com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor;
import com.lucidworks.spark.port.example.events.EventsimIndexer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/* loaded from: input_file:com/lucidworks/spark/SparkApp.class */
public class SparkApp implements Serializable {
    private static final String sparkExecutorExtraJavaOptionsParam = "spark.executor.extraJavaOptions";
    public static Logger log = Logger.getLogger(SparkApp.class);

    /* loaded from: input_file:com/lucidworks/spark/SparkApp$RDDProcessor.class */
    /**
     * Contract for a unit of work that {@link SparkApp#main(String[])} can launch.
     *
     * <p>Implementations must be {@link Serializable}; {@code main} verifies this with a
     * serialization round trip before running the processor.
     */
    public interface RDDProcessor extends Serializable {
        /**
         * Returns the processor's name. {@code main} uses it as the Spark application name, and
         * the classpath-scanning lookup compares it against the name given on the command line.
         */
        String getName();

        /**
         * Returns the processor-specific command-line options. They are appended to the common
         * options before the command line is parsed; the joining code tolerates {@code null}.
         */
        Option[] getOptions();

        /**
         * Executes the processor.
         *
         * @param sparkConf   Spark configuration prepared by the launcher
         * @param commandLine parsed command-line arguments (common plus processor-specific options)
         * @return a status code; {@code main} passes it straight to {@link System#exit(int)}
         * @throws Exception if the processor fails
         */
        int run(SparkConf sparkConf, CommandLine commandLine) throws Exception;
    }

    /* loaded from: input_file:com/lucidworks/spark/SparkApp$StreamProcessor.class */
    /**
     * Base class for processors that run on a Spark {@link JavaStreamingContext}.
     *
     * <p>{@link #run(SparkConf, CommandLine)} reads the common Solr-related options, creates the
     * streaming context, lets the subclass wire up its streams through
     * {@link #setup(JavaStreamingContext, CommandLine)}, and then blocks until the context terminates.
     */
    public static abstract class StreamProcessor implements RDDProcessor {
        /** Value of the {@code zkHost} option; {@code localhost:9983} when not given. */
        protected String zkHost;
        /** Value of the {@code collection} option; {@code collection1} when not given. */
        protected String collection;
        /** Value of the {@code batchSize} option; {@code 10} when not given. */
        protected int batchSize;

        /**
         * Configures and starts the streaming job, then waits for it to terminate.
         *
         * @param sparkConf   Spark configuration used to create the streaming context
         * @param commandLine parsed command-line arguments
         * @return always {@code 0} once the streaming context has terminated
         * @throws FileNotFoundException if the {@code pipeline} option is given but does not name a regular file
         * @throws NumberFormatException if {@code batchSize} or {@code batchInterval} is not an integer
         * @throws Exception             if {@link #setup} or the streaming context fails
         */
        @Override
        public int run(SparkConf sparkConf, CommandLine commandLine) throws Exception {
            this.zkHost = commandLine.getOptionValue("zkHost", "localhost:9983");
            this.collection = commandLine.getOptionValue("collection", "collection1");
            this.batchSize = Integer.parseInt(commandLine.getOptionValue("batchSize", "10"));
            int batchIntervalSecs = Integer.parseInt(commandLine.getOptionValue("batchInterval", "1"));

            // Validate the optional pipeline file before any Spark resources are created, so a bad
            // path fails fast instead of leaving a freshly created streaming context behind.
            String pipelinePath = null;
            if (commandLine.hasOption("pipeline")) {
                pipelinePath = commandLine.getOptionValue("pipeline");
                File pipelineFile = new File(pipelinePath);
                if (!pipelineFile.isFile()) {
                    throw new FileNotFoundException(pipelineFile.getAbsolutePath() + " not found!");
                }
            }

            // Multiply in long arithmetic: seconds * 1000 overflows an int for large intervals.
            JavaStreamingContext javaStreamingContext =
                    new JavaStreamingContext(sparkConf, new Duration(batchIntervalSecs * 1000L));
            if (pipelinePath != null) {
                javaStreamingContext.sparkContext().addFile(pipelinePath);
            }
            setup(javaStreamingContext, commandLine);
            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();
            return 0;
        }

        /** Returns the collection name read by the last call to {@code run}. */
        public String getCollection() {
            return this.collection;
        }

        /** Returns the ZooKeeper address read by the last call to {@code run}. */
        public String getZkHost() {
            return this.zkHost;
        }

        /** Returns the batch size read by the last call to {@code run}. */
        public int getBatchSize() {
            return this.batchSize;
        }

        /**
         * Hook in which subclasses define their streams; called after the streaming context is
         * created and before it is started.
         *
         * @param javaStreamingContext the not-yet-started streaming context
         * @param commandLine          parsed command-line arguments
         * @throws Exception if the streams cannot be set up
         */
        public abstract void setup(JavaStreamingContext javaStreamingContext, CommandLine commandLine) throws Exception;
    }

    /**
     * Command-line entry point. The first argument names the processor, either as a fully
     * qualified class name or as one of the supported short names; the remaining arguments are
     * parsed as options for that processor. The JVM exits with the processor's return code.
     *
     * @param strArr raw command-line arguments
     * @throws Exception if the processor cannot be created or fails while running
     */
    public static void main(String[] strArr) throws Exception {
        boolean processorNameMissing =
                strArr == null || strArr.length == 0 || strArr[0] == null || strArr[0].trim().length() == 0;
        if (processorNameMissing) {
            System.err.println("Invalid command-line args! Must pass the name of a processor to run.\nSupported processors:\n");
            displayProcessorOptions(System.err);
            System.exit(1);
        }

        // Try the argument as a class name first; fall back to the short-name lookup.
        RDDProcessor processor;
        try {
            Class<?> processorClass = SparkApp.class.getClassLoader().loadClass(strArr[0]);
            processor = (RDDProcessor) processorClass.newInstance();
        } catch (ClassNotFoundException notAClassName) {
            String shortName = strArr[0].trim().toLowerCase(Locale.ROOT);
            processor = newProcessor(shortName);
        }
        assertSerializable(processor);

        // Everything after the processor name belongs to the processor.
        int processorArgCount = strArr.length - 1;
        String[] processorArgs = new String[processorArgCount];
        System.arraycopy(strArr, 1, processorArgs, 0, processorArgCount);

        Option[] allOptions = joinCommonAndProcessorOptions(processor.getOptions());
        CommandLine cli = processCommandLineArgs(allOptions, processorArgs);

        SparkConf conf = new SparkConf().setAppName(processor.getName());
        conf.set("spark.task.maxFailures", "10");
        setupSolrAuthenticationProps(cli, conf);

        String master = cli.getOptionValue("master");
        if (master != null) {
            conf.setMaster(master);
        }

        log.info("Running processor " + processor.getName());
        int exitCode = processor.run(conf, cli);
        System.exit(exitCode);
    }

    /**
     * Propagates Solr JAAS authentication settings when the {@code solrJaasAuthConfig} option is
     * present and non-empty: the settings are added to the executor JVM options in the Spark
     * configuration (unless a login config is already mentioned there) and set as system
     * properties in the current JVM. Does nothing when the option is absent or empty.
     *
     * @param commandLine parsed command-line arguments
     * @param sparkConf   Spark configuration to update
     */
    protected static void setupSolrAuthenticationProps(CommandLine commandLine, SparkConf sparkConf) {
        final String loginConfigProp = "java.security.auth.login.config";
        final String jaasConfigPath = commandLine.getOptionValue("solrJaasAuthConfig");
        if (jaasConfigPath != null && !jaasConfigPath.isEmpty()) {
            final String jaasAppName = commandLine.getOptionValue("solrJaasAppName", "Client");
            final String authJvmOpts = String.format(Locale.ROOT, "-D%s=%s -Dsolr.kerberos.jaas.appname=%s", loginConfigProp, jaasConfigPath, jaasAppName);

            // Start from the auth options alone; merge with any pre-existing executor options.
            String executorJvmOpts = authJvmOpts;
            if (sparkConf.contains(sparkExecutorExtraJavaOptionsParam)) {
                final String existingOpts = sparkConf.get(sparkExecutorExtraJavaOptionsParam);
                if (existingOpts != null) {
                    // An existing login config setting is left untouched.
                    executorJvmOpts = existingOpts.contains(loginConfigProp)
                            ? existingOpts
                            : existingOpts + " " + authJvmOpts;
                }
            }
            sparkConf.set(sparkExecutorExtraJavaOptionsParam, executorJvmOpts);

            System.setProperty(loginConfigProp, jaasConfigPath);
            System.setProperty("solr.kerberos.jaas.appname", jaasAppName);
            log.info("Added " + authJvmOpts + " to " + sparkExecutorExtraJavaOptionsParam + " for authenticating to Solr.");
        }
    }

    /**
     * Builds the options shared by every processor. Each one is optional, takes an argument, and
     * has only a long name.
     *
     * @return a new array holding the common options, in a fixed order
     */
    public static Option[] getCommonOptions() {
        // Each row is {long option name, description}.
        final String[][] specs = {
            {"batchInterval", "Batch interval (seconds) for streaming applications; default is 1 second"},
            {"master", "The master URL to connect to, such as \"local\" to run locally with one thread, \"local[4]\" to run locally with 4 cores, or \"spark://master:7077\" to run on a Spark standalone cluster."},
            {"zkHost", "Address of the Zookeeper ensemble; defaults to: localhost:9983"},
            {"collection", "Name of collection; no default"},
            {"batchSize", "Number of docs to queue up on the client before sending to Solr; default is 10"},
            {"solrJaasAuthConfig", "For authenticating to Solr using JAAS, sets the 'java.security.auth.login.config' system property."},
            {"solrJaasAppName", "For authenticating to Solr using JAAS, sets the 'solr.kerberos.jaas.appname' system property; default is Client"},
        };

        final Option[] commonOptions = new Option[specs.length];
        for (int i = 0; i < specs.length; i++) {
            commonOptions[i] = Option.builder()
                    .longOpt(specs[i][0])
                    .desc(specs[i][1])
                    .hasArg()
                    .required(false)
                    .build();
        }
        return commonOptions;
    }

    /**
     * Creates the processor registered under the given short name. Built-in names are checked
     * first; otherwise every processor class found in {@code com.lucidworks.spark} is
     * instantiated and matched on {@link RDDProcessor#getName()}. If nothing matches, an error is
     * printed and the JVM exits with status 1.
     *
     * @param str processor short name (surrounding whitespace is ignored)
     * @return a new processor instance
     * @throws Exception if a discovered processor class cannot be instantiated
     */
    private static RDDProcessor newProcessor(String str) throws Exception {
        final String name = str.trim();
        switch (name) {
            case "twitter-to-solr":
                return new TwitterToSolrStreamProcessor();
            case "word-count":
                return new WordCount();
            case "term-vectors":
                return new ReadTermVectors();
            case "docfilter":
                return new DocumentFilteringStreamProcessor();
            case "hdfs-to-solr":
                return new HdfsToSolrRDDProcessor();
            case "logs2solr":
                return new Logs2SolrRDDProcessor();
            case "query-solr-benchmark":
                return new QueryBenchmark();
            case "kmeans-anomaly":
                return new KMeansAnomaly();
            case "eventsim":
                return new EventsimIndexer();
            case "mllib-svm":
                return new SVMExample();
            default:
                break;
        }

        // Not a built-in name: look through the processors discovered on the classpath.
        for (Class<RDDProcessor> candidateClass : findProcessorClassesInPackage("com.lucidworks.spark")) {
            RDDProcessor candidate = candidateClass.newInstance();
            if (name.equals(candidate.getName())) {
                return candidate;
            }
        }

        System.err.println("\n\n " + name + " not supported! Please check your command-line arguments and re-try. \n\n");
        System.exit(1);
        return null; // only reached if System.exit returns
    }

    /**
     * Prints usage help for every built-in processor, followed by every processor discovered in
     * {@code com.lucidworks.spark}, to the given stream.
     *
     * <p>The help is written to {@code printStream} so it appears on the same stream as the
     * caller's surrounding messages; a {@code null} stream falls back to {@code System.out}.
     *
     * @param printStream destination for the help text
     * @throws Exception if a discovered processor class cannot be instantiated
     */
    private static void displayProcessorOptions(PrintStream printStream) throws Exception {
        PrintStream target = printStream != null ? printStream : System.out;
        PrintWriter writer = new PrintWriter(target);
        HelpFormatter helpFormatter = new HelpFormatter();
        try {
            printProcessorHelp(helpFormatter, writer, "twitter-to-solr", new TwitterToSolrStreamProcessor());
            printProcessorHelp(helpFormatter, writer, "word-count", new WordCount());
            printProcessorHelp(helpFormatter, writer, "term-vectors", new ReadTermVectors());
            printProcessorHelp(helpFormatter, writer, "docfilter", new DocumentFilteringStreamProcessor());
            printProcessorHelp(helpFormatter, writer, "hdfs-to-solr", new HdfsToSolrRDDProcessor());
            printProcessorHelp(helpFormatter, writer, "logs2solr", new Logs2SolrRDDProcessor());
            printProcessorHelp(helpFormatter, writer, "query-solr-benchmark", new QueryBenchmark());
            printProcessorHelp(helpFormatter, writer, "kmeans-anomaly", new KMeansAnomaly());
            printProcessorHelp(helpFormatter, writer, "eventsim", new EventsimIndexer());
            printProcessorHelp(helpFormatter, writer, "mllib-svm", new SVMExample());
            for (Class<RDDProcessor> processorClass : findProcessorClassesInPackage("com.lucidworks.spark")) {
                RDDProcessor discovered = processorClass.newInstance();
                printProcessorHelp(helpFormatter, writer, discovered.getName(), discovered);
            }
        } finally {
            // Flush but do not close: the underlying stream (e.g. System.err) belongs to the caller.
            writer.flush();
        }
    }

    /** Writes one processor's usage block, using the formatter's own width and padding settings. */
    private static void printProcessorHelp(HelpFormatter helpFormatter, PrintWriter writer, String name, RDDProcessor processor) {
        helpFormatter.printHelp(
                writer,
                helpFormatter.getWidth(),
                name,
                null,
                getProcessorOptions(processor),
                helpFormatter.getLeftPadding(),
                helpFormatter.getDescPadding(),
                null);
    }

    /**
     * Assembles the full option set shown in a processor's help: the help and verbose flags, then
     * the common options, then the processor's own options.
     *
     * @param rDDProcessor processor whose options are being described
     * @return a new {@link Options} instance
     */
    private static Options getProcessorOptions(RDDProcessor rDDProcessor) {
        final Options all = new Options();
        all.addOption("h", "help", false, "Print this message");
        all.addOption("v", "verbose", false, "Generate verbose log messages");

        final Option[] merged = joinCommonAndProcessorOptions(rDDProcessor.getOptions());
        for (int i = 0; i < merged.length; i++) {
            all.addOption(merged[i]);
        }
        return all;
    }

    /**
     * Prepends the common options to the given processor-specific options.
     *
     * @param optionArr processor-specific options; may be {@code null}
     * @return a new array with the common options first
     */
    public static Option[] joinCommonAndProcessorOptions(Option[] optionArr) {
        final Option[] common = getCommonOptions();
        return joinOptions(common, optionArr);
    }

    /**
     * Concatenates two option arrays, treating {@code null} as empty.
     *
     * @param optionArr  options placed first; may be {@code null}
     * @param optionArr2 options placed second; may be {@code null}
     * @return a new array containing the elements of both inputs in order
     */
    public static Option[] joinOptions(Option[] optionArr, Option[] optionArr2) {
        final int firstLen = optionArr == null ? 0 : optionArr.length;
        final int secondLen = optionArr2 == null ? 0 : optionArr2.length;

        final Option[] joined = new Option[firstLen + secondLen];
        if (firstLen > 0) {
            System.arraycopy(optionArr, 0, joined, 0, firstLen);
        }
        if (secondLen > 0) {
            System.arraycopy(optionArr2, 0, joined, firstLen, secondLen);
        }
        return joined;
    }

    /**
     * Parses the given arguments against the help and verbose flags plus the supplied options.
     *
     * <p>On a parse failure the usage is printed and the JVM exits with status 1; the failure
     * message is suppressed when the user was asking for help ({@code -h}, {@code -help} or
     * {@code --help}). When parsing succeeds and help was requested, the usage is printed and the
     * JVM exits with status 0.
     *
     * @param optionArr options to accept in addition to help/verbose; may be {@code null}
     * @param strArr    arguments to parse
     * @return the parsed command line
     */
    public static CommandLine processCommandLineArgs(Option[] optionArr, String[] strArr) {
        Options options = new Options();
        options.addOption("h", "help", false, "Print this message");
        options.addOption("v", "verbose", false, "Generate verbose log messages");
        if (optionArr != null) {
            for (Option option : optionArr) {
                options.addOption(option);
            }
        }

        CommandLine commandLine = null;
        try {
            commandLine = new GnuParser().parse(options, strArr);
        } catch (ParseException e) {
            // A help request typically omits required options; don't report that as an error.
            if (!isHelpRequested(strArr)) {
                System.err.println("Failed to parse command-line arguments due to: " + e.getMessage());
            }
            new HelpFormatter().printHelp(SparkApp.class.getName(), options);
            System.exit(1);
        }

        if (commandLine.hasOption("help")) {
            new HelpFormatter().printHelp(SparkApp.class.getName(), options);
            System.exit(0);
        }
        return commandLine;
    }

    /** Returns whether the raw arguments contain any spelling of the help flag the parser accepts. */
    private static boolean isHelpRequested(String[] args) {
        if (args == null) {
            return false;
        }
        for (String arg : args) {
            if ("-h".equals(arg) || "-help".equals(arg) || "--help".equals(arg)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Scans the context class loader's resources for the given package and returns every class
     * found there that is assignable to {@link RDDProcessor}.
     *
     * <p>Scanning is best-effort: if anything fails, the error is logged and the classes
     * collected up to that point are returned.
     *
     * @param str package name in dotted form, e.g. {@code com.lucidworks.spark}
     * @return the processor classes found; never {@code null}
     */
    private static List<Class<RDDProcessor>> findProcessorClassesInPackage(String str) {
        List<Class<RDDProcessor>> processorClasses = new ArrayList<>();
        try {
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Enumeration<URL> resources = contextClassLoader.getResources(str.replace('.', '/'));

            // Sorted set: de-duplicates names seen in several locations and fixes the load order.
            Set<String> classNames = new TreeSet<>();
            while (resources.hasMoreElements()) {
                classNames.addAll(findClasses(resources.nextElement().getFile(), str));
            }

            for (String className : classNames) {
                Class<?> candidate = contextClassLoader.loadClass(className);
                if (RDDProcessor.class.isAssignableFrom(candidate)) {
                    // Safe: the isAssignableFrom check above guarantees the class is an RDDProcessor.
                    @SuppressWarnings("unchecked")
                    Class<RDDProcessor> processorClass = (Class<RDDProcessor>) candidate;
                    processorClasses.add(processorClass);
                }
            }
        } catch (Exception e) {
            log.error("Failed to scan package " + str + " for processor classes", e);
        }
        return processorClasses;
    }

    /**
     * Lists the names of the classes found at a class-path location.
     *
     * <p>Two kinds of location are understood:
     * <ul>
     *   <li>a {@code file:...!...} path pointing into an archive: every {@code .class} entry of
     *       the archive whose dotted name starts with {@code str2} is reported;</li>
     *   <li>a directory whose absolute path ends with the package path: every {@code .class}
     *       file directly inside it is reported, prefixed with {@code str2}.</li>
     * </ul>
     * Nested and anonymous classes are reported under the name of their outer class.
     *
     * @param str  location to scan
     * @param str2 package name in dotted form
     * @return sorted set of class names; empty when the location matches neither form
     * @throws Exception if the archive cannot be opened or read
     */
    private static Set<String> findClasses(String str, String str2) throws Exception {
        Set<String> classNames = new TreeSet<>();

        if (str.startsWith("file:") && str.contains("!")) {
            // The part before '!' is the archive URL. Close the stream when done so the
            // underlying file handle is not leaked.
            try (ZipInputStream archive = new ZipInputStream(new URL(str.split("!")[0]).openStream())) {
                for (ZipEntry entry = archive.getNextEntry(); entry != null; entry = archive.getNextEntry()) {
                    String entryName = entry.getName();
                    if (entryName.endsWith(".class")) {
                        String className = toOuterClassName(entryName);
                        if (className.startsWith(str2)) {
                            classNames.add(className);
                        }
                    }
                }
            }
            return classNames;
        }

        File directory = new File(str);
        if (!directory.isDirectory()
                || !directory.getAbsolutePath().endsWith(str2.replace('.', File.separatorChar))) {
            return classNames;
        }
        // listFiles() returns null if the directory cannot be read.
        File[] children = directory.listFiles();
        if (children == null) {
            return classNames;
        }
        for (File child : children) {
            String fileName = child.getName();
            if (fileName.endsWith(".class")) {
                classNames.add(str2 + "." + toOuterClassName(fileName));
            }
        }
        return classNames;
    }

    /**
     * Converts a class-file path such as {@code a/b/Outer$Inner.class} to the dotted name of its
     * outer class ({@code a.b.Outer}): everything from the first {@code $} on is dropped, then a
     * trailing {@code .class} suffix, then slashes become dots.
     */
    private static String toOuterClassName(String classFilePath) {
        String name = classFilePath;
        int dollar = name.indexOf('$');
        if (dollar >= 0) {
            name = name.substring(0, dollar);
        }
        if (name.endsWith(".class")) {
            name = name.substring(0, name.length() - ".class".length());
        }
        return name.replace('/', '.');
    }

    /**
     * Verifies that an object survives Java serialization by writing it to bytes and reading it
     * back.
     *
     * @param obj object to check
     * @return the copy produced by the serialization round trip
     * @throws IllegalArgumentException if the object is not {@link Serializable} or the round trip
     *                                  fails; the underlying failure is attached as the cause
     */
    public static Serializable assertSerializable(Object obj) {
        try {
            return bytes2ser(ser2bytes((Serializable) obj));
        } catch (Exception e) {
            // Keep the original exception as the cause so the offending field/class stays in the stack trace.
            throw new IllegalArgumentException("Object of type [" + obj.getClass().getName() + "] is not Serializable due to: " + e, e);
        }
    }

    /**
     * Deserializes a single object from a byte array produced by Java serialization.
     *
     * @param bArr serialized form of one object
     * @return the deserialized object
     * @throws Exception if the bytes are not a valid serialization stream or a class cannot be resolved
     */
    public static Serializable bytes2ser(byte[] bArr) throws Exception {
        ObjectInputStream reader = null;
        try {
            reader = new ObjectInputStream(new ByteArrayInputStream(bArr));
            return (Serializable) reader.readObject();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (Exception ignored) {
                    // best-effort close; a failure here must not mask the result or the real error
                }
            }
        }
    }

    /**
     * Serializes an object to a byte array using Java serialization.
     *
     * @param serializable object to serialize
     * @return the serialized bytes
     * @throws Exception if the object, or something it references, cannot be serialized
     */
    private static byte[] ser2bytes(Serializable serializable) throws Exception {
        ObjectOutputStream writer = null;
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            writer = new ObjectOutputStream(sink);
            writer.writeObject(serializable);
            writer.flush();
            return sink.toByteArray();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception ignored) {
                    // best-effort close; a failure here must not mask the result or the real error
                }
            }
        }
    }
}
