/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark;

import com.lucidworks.spark.example.hadoop.HdfsToSolrRDDProcessor;
import com.lucidworks.spark.example.hadoop.Logs2SolrRDDProcessor;
import com.lucidworks.spark.example.ml.SVMExample;
import com.lucidworks.spark.example.query.KMeansAnomaly;
import com.lucidworks.spark.example.query.QueryBenchmark;
import com.lucidworks.spark.example.query.ReadTermVectors;
import com.lucidworks.spark.example.query.WordCount;
import com.lucidworks.spark.example.streaming.DocumentFilteringStreamProcessor;
import com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor;
import com.lucidworks.spark.port.example.events.EventsimIndexer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkApp
implements Serializable {
    private static final String sparkExecutorExtraJavaOptionsParam = "spark.executor.extraJavaOptions";
    public static Logger log = Logger.getLogger(SparkApp.class);

    public static void main(String[] args) throws Exception {
        RDDProcessor procImpl;
        if (args == null || args.length == 0 || args[0] == null || args[0].trim().length() == 0) {
            System.err.println("Invalid command-line args! Must pass the name of a processor to run.\nSupported processors:\n");
            SparkApp.displayProcessorOptions(System.err);
            System.exit(1);
        }
        ClassLoader myCL = SparkApp.class.getClassLoader();
        try {
            Class<?> clazz = myCL.loadClass(args[0]);
            procImpl = (RDDProcessor)clazz.newInstance();
        }
        catch (ClassNotFoundException cnfe) {
            procImpl = SparkApp.newProcessor(args[0].trim().toLowerCase(Locale.ROOT));
        }
        SparkApp.assertSerializable(procImpl);
        String[] procImplArgs = new String[args.length - 1];
        System.arraycopy(args, 1, procImplArgs, 0, procImplArgs.length);
        CommandLine cli = SparkApp.processCommandLineArgs(SparkApp.joinCommonAndProcessorOptions(procImpl.getOptions()), procImplArgs);
        SparkConf sparkConf = new SparkConf().setAppName(procImpl.getName());
        sparkConf.set("spark.task.maxFailures", "10");
        SparkApp.setupSolrAuthenticationProps(cli, sparkConf);
        String masterUrl = cli.getOptionValue("master");
        if (masterUrl != null) {
            sparkConf.setMaster(masterUrl);
        }
        log.info((Object)("Running processor " + procImpl.getName()));
        int exitCode = procImpl.run(sparkConf, cli);
        System.exit(exitCode);
    }

    protected static void setupSolrAuthenticationProps(CommandLine cli, SparkConf sparkConf) {
        String sparkExecutorExtraJavaOptions;
        String solrJaasAuthConfig = cli.getOptionValue("solrJaasAuthConfig");
        if (solrJaasAuthConfig == null || solrJaasAuthConfig.isEmpty()) {
            return;
        }
        String solrJaasAppName = cli.getOptionValue("solrJaasAppName", "Client");
        String solrJaasOpts = String.format(Locale.ROOT, "-D%s=%s -Dsolr.kerberos.jaas.appname=%s", "java.security.auth.login.config", solrJaasAuthConfig, solrJaasAppName);
        String string = sparkExecutorExtraJavaOptions = sparkConf.contains(sparkExecutorExtraJavaOptionsParam) ? sparkConf.get(sparkExecutorExtraJavaOptionsParam) : null;
        if (sparkExecutorExtraJavaOptions == null) {
            sparkExecutorExtraJavaOptions = solrJaasOpts;
        } else if (!sparkExecutorExtraJavaOptions.contains("java.security.auth.login.config")) {
            sparkExecutorExtraJavaOptions = sparkExecutorExtraJavaOptions + " " + solrJaasOpts;
        }
        sparkConf.set(sparkExecutorExtraJavaOptionsParam, sparkExecutorExtraJavaOptions);
        System.setProperty("java.security.auth.login.config", solrJaasAuthConfig);
        System.setProperty("solr.kerberos.jaas.appname", solrJaasAppName);
        log.info((Object)("Added " + solrJaasOpts + " to " + sparkExecutorExtraJavaOptionsParam + " for authenticating to Solr."));
    }

    public static Option[] getCommonOptions() {
        return new Option[]{Option.builder().hasArg().required(false).desc("Batch interval (seconds) for streaming applications; default is 1 second").longOpt("batchInterval").build(), Option.builder().hasArg().required(false).desc("The master URL to connect to, such as \"local\" to run locally with one thread, \"local[4]\" to run locally with 4 cores, or \"spark://master:7077\" to run on a Spark standalone cluster.").longOpt("master").build(), Option.builder().hasArg().required(false).desc("Address of the Zookeeper ensemble; defaults to: localhost:9983").longOpt("zkHost").build(), Option.builder().hasArg().required(false).desc("Name of collection; no default").longOpt("collection").build(), Option.builder().hasArg().required(false).desc("Number of docs to queue up on the client before sending to Solr; default is 10").longOpt("batchSize").build(), Option.builder().hasArg().required(false).desc("For authenticating to Solr using JAAS, sets the 'java.security.auth.login.config' system property.").longOpt("solrJaasAuthConfig").build(), Option.builder().hasArg().required(false).desc("For authenticating to Solr using JAAS, sets the 'solr.kerberos.jaas.appname' system property; default is Client").longOpt("solrJaasAppName").build()};
    }

    private static RDDProcessor newProcessor(String streamProcType) throws Exception {
        if ("twitter-to-solr".equals(streamProcType = streamProcType.trim())) {
            return new TwitterToSolrStreamProcessor();
        }
        if ("word-count".equals(streamProcType)) {
            return new WordCount();
        }
        if ("term-vectors".equals(streamProcType)) {
            return new ReadTermVectors();
        }
        if ("docfilter".equals(streamProcType)) {
            return new DocumentFilteringStreamProcessor();
        }
        if ("hdfs-to-solr".equals(streamProcType)) {
            return new HdfsToSolrRDDProcessor();
        }
        if ("logs2solr".equals(streamProcType)) {
            return new Logs2SolrRDDProcessor();
        }
        if ("query-solr-benchmark".equals(streamProcType)) {
            return new QueryBenchmark();
        }
        if ("kmeans-anomaly".equals(streamProcType)) {
            return new KMeansAnomaly();
        }
        if ("eventsim".equals(streamProcType)) {
            return new EventsimIndexer();
        }
        if ("mllib-svm".equals(streamProcType)) {
            return new SVMExample();
        }
        for (Class<RDDProcessor> next : SparkApp.findProcessorClassesInPackage("com.lucidworks.spark")) {
            RDDProcessor streamProc = next.newInstance();
            if (!streamProcType.equals(streamProc.getName())) continue;
            return streamProc;
        }
        System.err.println("\n\n " + streamProcType + " not supported! Please check your command-line arguments and re-try. \n\n");
        System.exit(1);
        return null;
    }

    private static void displayProcessorOptions(PrintStream out) throws Exception {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("twitter-to-solr", SparkApp.getProcessorOptions(new TwitterToSolrStreamProcessor()));
        formatter.printHelp("word-count", SparkApp.getProcessorOptions(new WordCount()));
        formatter.printHelp("term-vectors", SparkApp.getProcessorOptions(new ReadTermVectors()));
        formatter.printHelp("docfilter", SparkApp.getProcessorOptions(new DocumentFilteringStreamProcessor()));
        formatter.printHelp("hdfs-to-solr", SparkApp.getProcessorOptions(new HdfsToSolrRDDProcessor()));
        formatter.printHelp("logs2solr", SparkApp.getProcessorOptions(new Logs2SolrRDDProcessor()));
        formatter.printHelp("query-solr-benchmark", SparkApp.getProcessorOptions(new QueryBenchmark()));
        formatter.printHelp("kmeans-anomaly", SparkApp.getProcessorOptions(new KMeansAnomaly()));
        formatter.printHelp("eventsim", SparkApp.getProcessorOptions(new EventsimIndexer()));
        formatter.printHelp("mllib-svm", SparkApp.getProcessorOptions(new SVMExample()));
        List<Class<RDDProcessor>> toolClasses = SparkApp.findProcessorClassesInPackage("com.lucidworks.spark");
        for (Class<RDDProcessor> next : toolClasses) {
            RDDProcessor tool = next.newInstance();
            formatter.printHelp(tool.getName(), SparkApp.getProcessorOptions(tool));
        }
    }

    private static Options getProcessorOptions(RDDProcessor tool) {
        Options options = new Options();
        options.addOption("h", "help", false, "Print this message");
        options.addOption("v", "verbose", false, "Generate verbose log messages");
        Option[] toolOpts = SparkApp.joinCommonAndProcessorOptions(tool.getOptions());
        for (int i = 0; i < toolOpts.length; ++i) {
            options.addOption(toolOpts[i]);
        }
        return options;
    }

    public static Option[] joinCommonAndProcessorOptions(Option[] toolOpts) {
        return SparkApp.joinOptions(SparkApp.getCommonOptions(), toolOpts);
    }

    public static Option[] joinOptions(Option[] lhs, Option[] rhs) {
        ArrayList<Option> options = new ArrayList<Option>();
        if (lhs != null && lhs.length > 0) {
            for (Option opt : lhs) {
                options.add(opt);
            }
        }
        if (rhs != null) {
            for (Option opt : rhs) {
                options.add(opt);
            }
        }
        return options.toArray(new Option[0]);
    }

    public static CommandLine processCommandLineArgs(Option[] customOptions, String[] args) {
        Options options = new Options();
        options.addOption("h", "help", false, "Print this message");
        options.addOption("v", "verbose", false, "Generate verbose log messages");
        if (customOptions != null) {
            for (int i = 0; i < customOptions.length; ++i) {
                options.addOption(customOptions[i]);
            }
        }
        CommandLine cli = null;
        try {
            cli = new GnuParser().parse(options, args);
        }
        catch (ParseException exp) {
            boolean hasHelpArg = false;
            if (args != null && args.length > 0) {
                for (int z = 0; z < args.length; ++z) {
                    if (!"-h".equals(args[z]) && !"-help".equals(args[z])) continue;
                    hasHelpArg = true;
                    break;
                }
            }
            if (!hasHelpArg) {
                System.err.println("Failed to parse command-line arguments due to: " + exp.getMessage());
            }
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(SparkApp.class.getName(), options);
            System.exit(1);
        }
        if (cli.hasOption("help")) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp(SparkApp.class.getName(), options);
            System.exit(0);
        }
        return cli;
    }

    private static List<Class<RDDProcessor>> findProcessorClassesInPackage(String packageName) {
        ArrayList<Class<RDDProcessor>> streamProcClasses = new ArrayList<Class<RDDProcessor>>();
        try {
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            String path = packageName.replace('.', '/');
            Enumeration<URL> resources = classLoader.getResources(path);
            TreeSet<String> classes = new TreeSet<String>();
            while (resources.hasMoreElements()) {
                URL resource = resources.nextElement();
                classes.addAll(SparkApp.findClasses(resource.getFile(), packageName));
            }
            for (String classInPackage : classes) {
                Class<?> theClass = classLoader.loadClass(classInPackage);
                if (!RDDProcessor.class.isAssignableFrom(theClass)) continue;
                streamProcClasses.add(theClass);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return streamProcClasses;
    }

    private static Set<String> findClasses(String path, String packageName) throws Exception {
        TreeSet<String> classes;
        block4: {
            block3: {
                ZipEntry entry;
                classes = new TreeSet<String>();
                if (!path.startsWith("file:") || !path.contains("!")) break block3;
                String[] split = path.split("!");
                URL jar = new URL(split[0]);
                ZipInputStream zip = new ZipInputStream(jar.openStream());
                while ((entry = zip.getNextEntry()) != null) {
                    String className;
                    if (!entry.getName().endsWith(".class") || !(className = entry.getName().replaceAll("[$].*", "").replaceAll("[.]class", "").replace('/', '.')).startsWith(packageName)) continue;
                    classes.add(className);
                }
                break block4;
            }
            File dir = new File(path);
            if (!dir.isDirectory()) break block4;
            String packagePath = packageName.replace('.', File.separatorChar);
            if (dir.getAbsolutePath().endsWith(packagePath)) {
                for (File file : dir.listFiles()) {
                    String className;
                    if (!file.getName().endsWith(".class") || (className = file.getName().replaceAll("[$].*", "").replaceAll("[.]class", "").replace('/', '.')).indexOf("$") != -1) continue;
                    classes.add(packageName + "." + className);
                }
            }
        }
        return classes;
    }

    public static Serializable assertSerializable(Object obj) {
        Serializable ser = (Serializable)obj;
        try {
            ser = SparkApp.bytes2ser(SparkApp.ser2bytes(ser));
        }
        catch (Exception exc) {
            throw new IllegalArgumentException("Object of type [" + obj.getClass().getName() + "] is not Serializable due to: " + exc);
        }
        return ser;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Serializable bytes2ser(byte[] objBytes) throws Exception {
        Serializable ser = null;
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new ByteArrayInputStream(objBytes));
            ser = (Serializable)ois.readObject();
        }
        finally {
            if (ois != null) {
                try {
                    ois.close();
                }
                catch (Exception exception) {}
            }
        }
        return ser;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static byte[] ser2bytes(Serializable obj) throws Exception {
        byte[] objBytes = null;
        ObjectOutputStream oos = null;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(obj);
            oos.flush();
            objBytes = baos.toByteArray();
        }
        finally {
            if (oos != null) {
                try {
                    oos.close();
                }
                catch (Exception exception) {}
            }
        }
        return objBytes;
    }

    public static abstract class StreamProcessor
    implements RDDProcessor {
        protected String zkHost;
        protected String collection;
        protected int batchSize;

        @Override
        public int run(SparkConf conf, CommandLine cli) throws Exception {
            this.zkHost = cli.getOptionValue("zkHost", "localhost:9983");
            this.collection = cli.getOptionValue("collection", "collection1");
            this.batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "10"));
            int batchIntervalSecs = Integer.parseInt(cli.getOptionValue("batchInterval", "1"));
            JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration((long)batchIntervalSecs * 1000L));
            if (cli.hasOption("pipeline")) {
                File pipelineFile = new File(cli.getOptionValue("pipeline"));
                if (!pipelineFile.isFile()) {
                    throw new FileNotFoundException(pipelineFile.getAbsolutePath() + " not found!");
                }
                jssc.sparkContext().addFile(cli.getOptionValue("pipeline"));
            }
            this.setup(jssc, cli);
            jssc.start();
            jssc.awaitTermination();
            return 0;
        }

        public String getCollection() {
            return this.collection;
        }

        public String getZkHost() {
            return this.zkHost;
        }

        public int getBatchSize() {
            return this.batchSize;
        }

        public abstract void setup(JavaStreamingContext var1, CommandLine var2) throws Exception;
    }

    public static interface RDDProcessor
    extends Serializable {
        public String getName();

        public Option[] getOptions();

        public int run(SparkConf var1, CommandLine var2) throws Exception;
    }
}

