/*
 * Decompiled with CFR 0.152.
 */
package com.lucidworks.spark.port.example.events;

import com.lucidworks.spark.SparkApp;
import com.lucidworks.spark.fusion.FusionPipelineClient;
import com.lucidworks.spark.port.example.events.EventsimIndexer$;
import java.net.URL;
import java.time.Instant;
import java.util.Calendar;
import java.util.TimeZone;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u00114A!\u0001\u0002\u0001\u001f\tyQI^3oiNLW.\u00138eKb,'O\u0003\u0002\u0004\t\u00051QM^3oiNT!!\u0002\u0004\u0002\u000f\u0015D\u0018-\u001c9mK*\u0011q\u0001C\u0001\u0005a>\u0014HO\u0003\u0002\n\u0015\u0005)1\u000f]1sW*\u00111\u0002D\u0001\u000bYV\u001c\u0017\u000eZ<pe.\u001c(\"A\u0007\u0002\u0007\r|Wn\u0001\u0001\u0014\u0007\u0001\u0001b\u0003\u0005\u0002\u0012)5\t!CC\u0001\u0014\u0003\u0015\u00198-\u00197b\u0013\t)\"C\u0001\u0004B]f\u0014VM\u001a\t\u0003/\rr!\u0001G\u0011\u000f\u0005e\u0001cB\u0001\u000e \u001d\tYb$D\u0001\u001d\u0015\tib\"\u0001\u0004=e>|GOP\u0005\u0002\u001b%\u00111\u0002D\u0005\u0003\u0013)I!A\t\u0005\u0002\u0011M\u0003\u0018M]6BaBL!\u0001J\u0013\u0003\u0019I#E\t\u0015:pG\u0016\u001c8o\u001c:\u000b\u0005\tB\u0001\"B\u0014\u0001\t\u0003A\u0013A\u0002\u001fj]&$h\bF\u0001*!\tQ\u0003!D\u0001\u0003\u0011\u001da\u0003A1A\u0005\u00025\n\u0001\u0003R#G\u0003VcEkX#O\tB{\u0015J\u0014+\u0016\u00039\u0002\"a\f\u001b\u000e\u0003AR!!\r\u001a\u0002\t1\fgn\u001a\u0006\u0002g\u0005!!.\u0019<b\u0013\t)\u0004G\u0001\u0004TiJLgn\u001a\u0005\u0007o\u0001\u0001\u000b\u0011\u0002\u0018\u0002#\u0011+e)Q+M)~+e\n\u0012)P\u0013:#\u0006\u0005C\u0003:\u0001\u0011\u0005!(A\u0004hKRt\u0015-\\3\u0015\u0003m\u0002\"\u0001P \u000f\u0005Ei\u0014B\u0001 
\u0013\u0003\u0019\u0001&/\u001a3fM&\u0011Q\u0007\u0011\u0006\u0003}IAQA\u0011\u0001\u0005\u0002\r\u000b!bZ3u\u001fB$\u0018n\u001c8t)\u0005!\u0005cA\tF\u000f&\u0011aI\u0005\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0003\u0011Fk\u0011!\u0013\u0006\u0003\u0015.\u000b1a\u00197j\u0015\taU*A\u0004d_6lwN\\:\u000b\u00059{\u0015AB1qC\u000eDWMC\u0001Q\u0003\ry'oZ\u0005\u0003%&\u0013aa\u00149uS>t\u0007\"\u0002+\u0001\t\u0003)\u0016a\u0001:v]R\u0019a+\u00171\u0011\u0005E9\u0016B\u0001-\u0013\u0005\rIe\u000e\u001e\u0005\u00065N\u0003\raW\u0001\u0005G>tg\r\u0005\u0002]=6\tQL\u0003\u0002\n\u001b&\u0011q,\u0018\u0002\n'B\f'o[\"p]\u001aDQAS*A\u0002\u0005\u0004\"\u0001\u00132\n\u0005\rL%aC\"p[6\fg\u000e\u001a'j]\u0016\u0004")
public class EventsimIndexer
implements SparkApp.RDDProcessor {
    private final String DEFAULT_ENDPOINT;

    public String DEFAULT_ENDPOINT() {
        return this.DEFAULT_ENDPOINT;
    }

    @Override
    public String getName() {
        return "eventsim";
    }

    @Override
    public Option[] getOptions() {
        return (Option[])((Object[])new Option[]{Option.builder().hasArg().required(true).desc("Path to an eventsim JSON file").longOpt("eventsimJson").build(), Option.builder().hasArg().desc(new StringBuilder().append((Object)"Fusion endpoint(s); default is ").append((Object)this.DEFAULT_ENDPOINT()).toString()).longOpt("fusion").build(), Option.builder().hasArg().desc("Fusion username; default is admin").longOpt("fusionUser").build(), Option.builder().hasArg().desc("Fusion password; required if fusionAuthEnbled=true").longOpt("fusionPass").build(), Option.builder().hasArg().desc("Fusion security realm; default is native").longOpt("fusionRealm").build(), Option.builder().hasArg().desc("Fusion authentication enabled; default is true").longOpt("fusionAuthEnabled").build(), Option.builder().hasArg().desc("Fusion indexing batch size; default is 100").longOpt("fusionBatchSize").build()});
    }

    @Override
    public int run(SparkConf conf, CommandLine cli) {
        String fusionEndpoints = cli.getOptionValue("fusion", this.DEFAULT_ENDPOINT());
        boolean fusionAuthEnabled = "true".equalsIgnoreCase(cli.getOptionValue("fusionAuthEnabled", "true"));
        String fusionUser = cli.getOptionValue("fusionUser", "admin");
        String fusionPass = cli.getOptionValue("fusionPass");
        if (fusionAuthEnabled && (fusionPass == null || fusionPass.isEmpty())) {
            throw new IllegalArgumentException("Fusion password is required when authentication is enabled!");
        }
        String fusionRealm = cli.getOptionValue("fusionRealm", "native");
        int fusionBatchSize = new StringOps(Predef$.MODULE$.augmentString(cli.getOptionValue("fusionBatchSize", "100"))).toInt();
        String[] urls = (String[])Predef$.MODULE$.refArrayOps((Object[])fusionEndpoints.split(",")).distinct();
        URL url = new URL(urls[0]);
        String pipelinePath = url.getPath();
        SparkSession sparkSession = SparkSession$.MODULE$.builder().config(conf).getOrCreate();
        sparkSession.read().json(cli.getOptionValue("eventsimJson")).foreachPartition((Function1)new Serializable(this, fusionEndpoints, fusionAuthEnabled, fusionUser, fusionPass, fusionRealm, fusionBatchSize, pipelinePath){
            public static final long serialVersionUID = 0L;
            private final String fusionEndpoints$1;
            private final boolean fusionAuthEnabled$1;
            private final String fusionUser$1;
            private final String fusionPass$1;
            private final String fusionRealm$1;
            public final int fusionBatchSize$1;
            public final String pipelinePath$1;

            public final void apply(Iterator<Row> rows) {
                FusionPipelineClient fusion = this.fusionAuthEnabled$1 ? new FusionPipelineClient(this.fusionEndpoints$1, this.fusionUser$1, this.fusionPass$1, this.fusionRealm$1) : new FusionPipelineClient(this.fusionEndpoints$1);
                ListBuffer batch = new ListBuffer();
                rows.foreach((Function1)new Serializable(this, fusion, batch){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$run$1 $outer;
                    private final FusionPipelineClient fusion$1;
                    private final ListBuffer batch$1;

                    public final void apply(Row next) {
                        ObjectRef userId = ObjectRef.create((Object)"");
                        ObjectRef sessionId = ObjectRef.create((Object)"");
                        LongRef ts = LongRef.create((long)0L);
                        ListBuffer fields = new ListBuffer();
                        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), next.length() - 1).foreach((Function1)new Serializable(this, userId, sessionId, ts, fields, next){
                            public static final long serialVersionUID = 0L;
                            private final ObjectRef userId$1;
                            private final ObjectRef sessionId$1;
                            private final LongRef ts$1;
                            private final ListBuffer fields$1;
                            private final Row next$1;

                            public final Object apply(int c) {
                                BoxedUnit boxedUnit;
                                Object obj = this.next$1.get(c);
                                if (obj == null) {
                                    boxedUnit = BoxedUnit.UNIT;
                                } else {
                                    Object colValue = obj;
                                    String fieldName = this.next$1.schema().fieldNames()[c];
                                    if ("ts".equals(fieldName)) {
                                        this.ts$1.elem = BoxesRunTime.unboxToLong((Object)obj);
                                        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
                                        cal.setTimeInMillis(this.ts$1.elem);
                                        colValue = cal.getTime().toInstant().toString();
                                    } else if ("userId".equals(fieldName)) {
                                        this.userId$1.elem = obj.toString();
                                    } else if ("sessionId".equals(fieldName)) {
                                        this.sessionId$1.elem = obj.toString();
                                    }
                                    boxedUnit = this.fields$1.$plus$eq((Object)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"name"), (Object)fieldName), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"value"), colValue)})));
                                }
                                return boxedUnit;
                            }
                            {
                                this.userId$1 = userId$1;
                                this.sessionId$1 = sessionId$1;
                                this.ts$1 = ts$1;
                                this.fields$1 = fields$1;
                                this.next$1 = next$1;
                            }
                        });
                        this.batch$1.$plus$eq((Object)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"id"), (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "-", "-", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{(String)userId.elem, (String)sessionId.elem, BoxesRunTime.boxToLong((long)ts.elem)}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"fields"), (Object)fields)})));
                        if (this.batch$1.size() == this.$outer.fusionBatchSize$1) {
                            this.fusion$1.postBatchToPipeline(this.$outer.pipelinePath$1, JavaConversions$.MODULE$.bufferAsJavaList((Buffer)this.batch$1));
                            this.batch$1.clear();
                        }
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.fusion$1 = fusion$1;
                        this.batch$1 = batch$1;
                    }
                });
                if (!batch.isEmpty()) {
                    fusion.postBatchToPipeline(this.pipelinePath$1, JavaConversions$.MODULE$.bufferAsJavaList((Buffer)batch));
                    batch.clear();
                }
            }
            {
                this.fusionEndpoints$1 = fusionEndpoints$1;
                this.fusionAuthEnabled$1 = fusionAuthEnabled$1;
                this.fusionUser$1 = fusionUser$1;
                this.fusionPass$1 = fusionPass$1;
                this.fusionRealm$1 = fusionRealm$1;
                this.fusionBatchSize$1 = fusionBatchSize$1;
                this.pipelinePath$1 = pipelinePath$1;
            }
        });
        sparkSession.stop();
        return 0;
    }

    public EventsimIndexer() {
        this.DEFAULT_ENDPOINT = "http://localhost:8764/api/apollo/index-pipelines/eventsim-default/collections/eventsim/index";
    }
}

