/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark.example.query;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.rdd.SolrJavaRDD;
import com.lucidworks.spark.util.ConfigurationConstants;
import com.lucidworks.spark.util.PivotField;
import com.lucidworks.spark.util.SolrQuerySupport;
import com.lucidworks.spark.util.SolrSupport;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

/**
 * Example {@link SparkApp.RDDProcessor} that sessionizes log events stored in Solr, writes
 * per-session aggregates to a companion {@code <collection>_aggr} collection, and trains a k-means
 * model over a few per-session features, printing the model's within-set sum of squared errors.
 *
 * <p>Events are grouped per user ({@link #UID_FIELD}) and ordered by {@link #TS_FIELD}; a new
 * session starts whenever the gap between a user's consecutive requests exceeds
 * {@link #MAX_SESSION_GAP_MS}.
 */
public class KMeansAnomaly implements SparkApp.RDDProcessor {
    /** Field identifying the user a log event belongs to. */
    private static final String UID_FIELD = "clientip_s";
    /** Event timestamp field used to order a user's requests. */
    private static final String TS_FIELD = "timestamp_tdt";

    /** Default Solr query: only events that carry every field this job reads. */
    private static final String DEFAULT_LOGS_QUERY =
        "+clientip_s:[* TO *] +timestamp_tdt:[* TO *] +bytes_s:[* TO *] +verb_s:[* TO *] +response_s:[* TO *]";

    /** Inactivity gap, in milliseconds, after which a user's next request opens a new session. */
    private static final long MAX_SESSION_GAP_MS = 30000L;

    /** Number of k-means clusters. */
    private static final int NUM_CLUSTERS = 8;
    /** Maximum number of k-means iterations. */
    private static final int MAX_ITERATIONS = 20;

    /**
     * Aggregation used when {@code -aggregationSQL} is not supplied. It reads the {@code sessions}
     * view; any replacement SQL must also produce the columns read by
     * {@link #toFeatureVector(Row)}.
     */
    private static final String DEFAULT_AGGREGATION_SQL =
        "SELECT concat_ws('||', clientip_s, session_id) AS id, "
            + "first(clientip_s) AS clientip_s, "
            + "min(timestamp_tdt) AS session_start_tdt, "
            + "max(timestamp_tdt) AS session_end_tdt, "
            + "(ts2ms(max(timestamp_tdt)) - ts2ms(min(timestamp_tdt))) AS session_len_ms_l, "
            + "sum(asInt(bytes_s)) AS total_bytes_l, "
            + "count(*) AS total_requests_l, "
            + "sum(http_method_get) AS num_get_l, "
            + "sum(http_method_head) AS num_head_l, "
            + "sum(http_method_post) AS num_post_l "
            + "FROM sessions GROUP BY clientip_s, session_id";

    @Override
    public String getName() {
        return "kmeans-anomaly";
    }

    /**
     * Declares the app-specific command-line options.
     *
     * @return the optional {@code query} and {@code aggregationSQL} options
     */
    @Override
    public Option[] getOptions() {
        return new Option[]{
            optionalArgOption("QUERY", "URL encoded Solr query to send to Solr", "query"),
            optionalArgOption("SQL",
                "File containing a SQL query to execute to generate the aggregated data.",
                "aggregationSQL")
        };
    }

    /**
     * Runs the read, sessionize, aggregate, write-back and k-means pipeline.
     *
     * <p>Recognized options: {@code zkHost} (default {@code localhost:9983}), {@code collection}
     * (default {@code apache_logs}), {@code query} (default {@link #DEFAULT_LOGS_QUERY}) and
     * {@code aggregationSQL} (path to a file whose SQL replaces {@link #DEFAULT_AGGREGATION_SQL}).
     *
     * @param conf Spark configuration used to build the session
     * @param cli parsed command line
     * @return {@code 0} on success
     * @throws Exception if the aggregation SQL file cannot be read, or Solr/Spark work fails
     */
    @Override
    public int run(SparkConf conf, CommandLine cli) throws Exception {
        String zkHost = cli.getOptionValue("zkHost", "localhost:9983");
        String collection = cli.getOptionValue("collection", "apache_logs");
        String queryStr = cli.getOptionValue("query", DEFAULT_LOGS_QUERY);
        // Resolve the optional SQL file before starting Spark so a bad path fails fast.
        String aggregationSql = loadAggregationSql(cli.getOptionValue("aggregationSQL"));

        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        try {
            Map<String, String> readOptions = new HashMap<>();
            readOptions.put("zkhost", zkHost);
            readOptions.put("collection", collection);
            readOptions.put("query", queryStr);
            readOptions.put(ConfigurationConstants.SOLR_SPLIT_FIELD_PARAM(), "_version_");
            readOptions.put(ConfigurationConstants.SOLR_SPLITS_PER_SHARD_PARAM(), "4");
            readOptions.put(ConfigurationConstants.SOLR_FIELD_PARAM(),
                "id,_version_,clientip_s,timestamp_tdt,bytes_s,response_s,verb_s");
            Dataset<Row> logEvents = sparkSession.read().format("solr").options(readOptions).load();

            // NOTE(review): presumably expands each distinct verb/response value into a prefixed
            // column (e.g. http_method_get); the default aggregation SQL relies on those names.
            PivotField[] pivotFields = new PivotField[]{
                new PivotField("verb_s", "http_method_"),
                new PivotField("response_s", "http_code_")};
            SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
            Dataset<Row> solrDataWithPivots =
                SolrQuerySupport.withPivotFields(logEvents, pivotFields, solrRDD.rdd(), false);
            solrDataWithPivots.createOrReplaceTempView("logs");

            registerUdfs(sparkSession);

            // diff_ms is the gap to the user's previous request (null for the first one); the
            // running count of gaps larger than MAX_SESSION_GAP_MS numbers the user's sessions.
            String lagWindowSpec = "(PARTITION BY " + UID_FIELD + " ORDER BY " + TS_FIELD + ")";
            String lagSql = "SELECT *, sum(IF(diff_ms > " + MAX_SESSION_GAP_MS + ", 1, 0)) OVER "
                + lagWindowSpec + " session_id FROM (SELECT *, ts2ms(" + TS_FIELD + ") - lag(ts2ms("
                + TS_FIELD + ")) OVER " + lagWindowSpec + " as diff_ms FROM logs) tmp";
            Dataset<Row> userSessions = sparkSession.sql(lagSql);
            userSessions.createOrReplaceTempView("sessions");

            Dataset<Row> sessionsAgg = sparkSession.sql(aggregationSql);
            // Reused for both the Solr write and the k-means input.
            sessionsAgg.cache();
            sessionsAgg.printSchema();

            String aggCollection = collection + "_aggr";
            Map<String, String> writeOptions = new HashMap<>();
            writeOptions.put("zkhost", zkHost);
            writeOptions.put("collection", aggCollection);
            sessionsAgg.write().format("solr").options(writeOptions).mode(SaveMode.Overwrite).save();
            SolrSupport.getCachedCloudClient(zkHost).commit(aggCollection);

            JavaRDD<Vector> vectors = sessionsAgg.javaRDD().map(KMeansAnomaly::toFeatureVector);
            // k-means iterates over the input several times.
            vectors.cache();
            KMeansModel clusters = KMeans.train(vectors.rdd(), NUM_CLUSTERS, MAX_ITERATIONS);
            double wssse = clusters.computeCost(vectors.rdd());
            System.out.println("Within Set Sum of Squared Errors = " + wssse);
        } finally {
            // Stop Spark even when a stage fails so the driver does not leak the context.
            jsc.stop();
        }
        return 0;
    }

    /** Builds an optional, single-argument command-line option via commons-cli's OptionBuilder. */
    private static Option optionalArgOption(String argName, String description, String name) {
        OptionBuilder.withArgName(argName);
        OptionBuilder.hasArg();
        OptionBuilder.isRequired(false);
        OptionBuilder.withDescription(description);
        return OptionBuilder.create(name);
    }

    /**
     * Returns the aggregation SQL to run against the {@code sessions} view.
     *
     * @param path file holding the SQL, or {@code null} to use {@link #DEFAULT_AGGREGATION_SQL}
     * @return the SQL text, decoded as UTF-8 with surrounding whitespace trimmed
     * @throws IOException if the file cannot be read
     * @throws IllegalArgumentException if the file contains only whitespace
     */
    private static String loadAggregationSql(String path) throws IOException {
        if (path == null) {
            return DEFAULT_AGGREGATION_SQL;
        }
        String sql = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8).trim();
        if (sql.isEmpty()) {
            throw new IllegalArgumentException("aggregationSQL file is empty: " + path);
        }
        return sql;
    }

    /**
     * Registers the SQL UDFs used by the session queries: {@code ts2ms(timestamp)} (epoch millis,
     * {@code 0} for null) and {@code asInt(string)} (see {@link #parseIntOrZero(String)}).
     *
     * <p>Lambdas and method references are used instead of anonymous inner classes because javac
     * gives an anonymous class declared in an instance method a reference to the enclosing
     * instance, which Spark would then have to serialize along with the function.
     */
    private static void registerUdfs(SparkSession sparkSession) {
        sparkSession.udf().register("ts2ms",
            (UDF1<Timestamp, Long>) ts -> ts != null ? ts.getTime() : 0L, DataTypes.LongType);
        sparkSession.udf().register("asInt",
            (UDF1<String, Integer>) KMeansAnomaly::parseIntOrZero, DataTypes.IntegerType);
    }

    /**
     * Parses a decimal integer, treating {@code null} and unparseable text as {@code 0}.
     *
     * @param str text to parse; surrounding whitespace is ignored
     * @return the parsed value, or {@code 0}
     */
    private static int parseIntOrZero(String str) {
        if (str == null) {
            return 0;
        }
        try {
            return Integer.parseInt(str.trim());
        } catch (NumberFormatException ignored) {
            // Deliberate: non-numeric values (Apache access logs write "-" when no body bytes
            // were sent) count as zero instead of failing the whole aggregation.
            return 0;
        }
    }

    /**
     * Builds the k-means feature vector for one aggregated session: session length (ms), total
     * bytes, number of GET requests and number of HEAD requests, in that order.
     */
    private static Vector toFeatureVector(Row row) {
        return Vectors.dense(new double[]{
            numericValue(row, "session_len_ms_l"),
            numericValue(row, "total_bytes_l"),
            numericValue(row, "num_get_l"),
            numericValue(row, "num_head_l")});
    }

    /**
     * Reads a numeric column as a double, mapping SQL NULL to {@code 0.0}; works whether the
     * aggregate was typed as long, int or double.
     */
    private static double numericValue(Row row, String field) {
        int index = row.fieldIndex(field);
        return row.isNullAt(index) ? 0.0 : ((Number) row.get(index)).doubleValue();
    }
}

