/*
 * Decompiled with CFR 0.152.
 */
package com.holdenkarau.spark.testing;

import com.holdenkarau.spark.testing.StreamingSuiteCommon;
import com.holdenkarau.spark.testing.StreamingSuiteCommon$;
import com.holdenkarau.spark.testing.TestInputStream;
import com.holdenkarau.spark.testing.TestOutputStream;
import com.holdenkarau.spark.testing.TestOutputStream$;
import com.holdenkarau.spark.testing.Utils$;
import java.io.File;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.TestStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.util.TestManualClock;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.time.Seconds$;
import org.scalatest.time.Span$;
import org.scalatest.time.Units;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

public abstract class StreamingSuiteCommon$class {
    public static TestInputStream createTestInputStream(StreamingSuiteCommon $this, SparkContext sc, TestStreamingContext ssc_, Seq input, ClassTag evidence$2) {
        return new TestInputStream(sc, ssc_, input, $this.numInputPartitions(), evidence$2);
    }

    public static Duration batchDuration(StreamingSuiteCommon $this) {
        return org.apache.spark.streaming.Seconds$.MODULE$.apply(1L);
    }

    public static String framework(StreamingSuiteCommon $this) {
        return $this.getClass().getSimpleName();
    }

    public static int numInputPartitions(StreamingSuiteCommon $this) {
        return 2;
    }

    public static int maxWaitTimeMillis(StreamingSuiteCommon $this) {
        return 10000;
    }

    public static boolean useManualClock(StreamingSuiteCommon $this) {
        return true;
    }

    public static boolean actuallyWait(StreamingSuiteCommon $this) {
        return false;
    }

    public static String master(StreamingSuiteCommon $this) {
        return "local[4]";
    }

    public static String checkpointDir(StreamingSuiteCommon $this) {
        File dir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1());
        $this.logDebug((Function0)new Serializable($this, dir){
            public static final long serialVersionUID = 0L;
            private final File dir$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"checkpointDir: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.dir$1}));
            }
            {
                this.dir$1 = dir$1;
            }
        });
        return dir.toString();
    }

    public static SparkConf conf(StreamingSuiteCommon $this) {
        return new SparkConf().setMaster($this.master()).setAppName($this.framework()).set("spark.driver.host", "localhost").set("spark.streaming.clock", "org.apache.spark.streaming.util.TestManualClock");
    }

    public static void withOutputAndStreamingContext(StreamingSuiteCommon $this, Tuple2 outputStreamSSC, Function2 block) {
        TestOutputStream outputStream = (TestOutputStream)outputStreamSSC._1();
        TestStreamingContext ssc = (TestStreamingContext)((Object)outputStreamSSC._2());
        try {
            ssc.start();
            block.apply((Object)outputStream, (Object)ssc);
        }
        catch (Throwable throwable) {
            try {
                ssc.stop(false);
                Thread.sleep(200L);
            }
            catch (Exception exception) {
                $this.logError((Function0)new Serializable($this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Error stopping StreamingContext";
                    }
                }, exception);
            }
            throw throwable;
        }
        try {
            ssc.stop(false);
            Thread.sleep(200L);
        }
        catch (Exception exception) {
            $this.logError((Function0)new /* invalid duplicate definition of identical inner class */, exception);
        }
    }

    public static Tuple2 setupStreams(StreamingSuiteCommon $this, Seq input, Function1 operation, ClassTag evidence$3, ClassTag evidence$4) {
        TestStreamingContext ssc = new TestStreamingContext($this.sc(), $this.batchDuration());
        if ($this.checkpointDir() != null) {
            ssc.checkpoint($this.checkpointDir());
        }
        TestInputStream inputStream = $this.createTestInputStream($this.sc(), ssc, input, evidence$3);
        DStream operatedStream = (DStream)operation.apply(inputStream);
        TestOutputStream outputStream = new TestOutputStream(operatedStream, TestOutputStream$.MODULE$.$lessinit$greater$default$2(), evidence$4);
        return new Tuple2(outputStream, (Object)ssc);
    }

    public static Tuple2 setupStreams(StreamingSuiteCommon $this, Seq input1, Seq input2, Function2 operation, ClassTag evidence$5, ClassTag evidence$6, ClassTag evidence$7) {
        TestStreamingContext ssc = new TestStreamingContext($this.sc(), $this.batchDuration());
        if ($this.checkpointDir() != null) {
            ssc.checkpoint($this.checkpointDir());
        }
        TestInputStream inputStream1 = $this.createTestInputStream($this.sc(), ssc, input1, evidence$5);
        TestInputStream inputStream2 = $this.createTestInputStream($this.sc(), ssc, input2, evidence$6);
        DStream operatedStream = (DStream)operation.apply(inputStream1, inputStream2);
        TestOutputStream outputStream = new TestOutputStream(operatedStream, TestOutputStream$.MODULE$.$lessinit$greater$default$2(), evidence$7);
        return new Tuple2(outputStream, (Object)ssc);
    }

    public static Tuple2 setupStreamAndRDD(StreamingSuiteCommon $this, Seq input1, Seq input2, Function2 operation, ClassTag evidence$8, ClassTag evidence$9, ClassTag evidence$10) {
        TestStreamingContext ssc = new TestStreamingContext($this.sc(), $this.batchDuration());
        if ($this.checkpointDir() != null) {
            ssc.checkpoint($this.checkpointDir());
        }
        TestInputStream inputStream1 = $this.createTestInputStream($this.sc(), ssc, input1, evidence$8);
        SparkContext qual$1 = $this.sc();
        Seq x$1 = input2;
        int x$2 = qual$1.parallelize$default$2();
        RDD inputRDD2 = qual$1.parallelize(x$1, x$2, evidence$9);
        DStream operatedStream = (DStream)operation.apply(inputStream1, (Object)inputRDD2);
        TestOutputStream outputStream = new TestOutputStream(operatedStream, TestOutputStream$.MODULE$.$lessinit$greater$default$2(), evidence$10);
        return new Tuple2(outputStream, (Object)ssc);
    }

    public static Seq runStreams(StreamingSuiteCommon $this, TestOutputStream outputStream, TestStreamingContext ssc, int numBatches, int numExpectedOutput, ClassTag evidence$11) {
        Predef$.MODULE$.assert(numBatches > 0, (Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Number of batches to run stream computation is zero";
            }
        });
        Predef$.MODULE$.assert(numExpectedOutput > 0, (Function0)new Serializable($this, numBatches){
            public static final long serialVersionUID = 0L;
            private final int numBatches$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Number of expected outputs after ", " is zero"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.numBatches$1)}));
            }
            {
                this.numBatches$1 = numBatches$1;
            }
        });
        $this.logInfo((Function0)new Serializable($this, numBatches, numExpectedOutput){
            public static final long serialVersionUID = 0L;
            private final int numBatches$1;
            private final int numExpectedOutput$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"numBatches = ", ", numExpectedOutput = ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.numBatches$1), BoxesRunTime.boxToInteger((int)this.numExpectedOutput$1)}));
            }
            {
                this.numBatches$1 = numBatches$1;
                this.numExpectedOutput$1 = numExpectedOutput$1;
            }
        });
        ConcurrentLinkedQueue output = outputStream.output();
        TestManualClock clock = (TestManualClock)ssc.getScheduler().clock();
        $this.logInfo((Function0)new Serializable($this, clock){
            public static final long serialVersionUID = 0L;
            private final TestManualClock clock$1;

            public final String apply() {
                return new StringBuilder().append((Object)"Manual clock before advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.currentTime())).toString();
            }
            {
                this.clock$1 = clock$1;
            }
        });
        if ($this.actuallyWait()) {
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), numBatches).foreach$mVc$sp((Function1)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ StreamingSuiteCommon $outer;
                private final TestManualClock clock$1;

                public final void apply(int i) {
                    this.apply$mcVI$sp(i);
                }

                public void apply$mcVI$sp(int i) {
                    this.$outer.logInfo((Function0)new Serializable(this){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ StreamingSuiteCommon$.anonfun.runStreams.1 $outer;

                        public final String apply() {
                            return new StringBuilder().append((Object)"Actually waiting for ").append((Object)this.$outer.com$holdenkarau$spark$testing$StreamingSuiteCommon$$anonfun$$$outer().batchDuration()).toString();
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                        }
                    });
                    this.clock$1.addToTime(this.$outer.batchDuration().milliseconds());
                    Thread.sleep(this.$outer.batchDuration().milliseconds());
                }

                public /* synthetic */ StreamingSuiteCommon com$holdenkarau$spark$testing$StreamingSuiteCommon$$anonfun$$$outer() {
                    return this.$outer;
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                    this.clock$1 = clock$1;
                }
            });
        } else {
            clock.addToTime((long)numBatches * $this.batchDuration().milliseconds());
        }
        $this.logInfo((Function0)new Serializable($this, clock){
            public static final long serialVersionUID = 0L;
            private final TestManualClock clock$1;

            public final String apply() {
                return new StringBuilder().append((Object)"Manual clock after advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.currentTime())).toString();
            }
            {
                this.clock$1 = clock$1;
            }
        });
        long startTime = System.currentTimeMillis();
        while (output.size() < numExpectedOutput && System.currentTimeMillis() - startTime < (long)$this.maxWaitTimeMillis()) {
            $this.logInfo((Function0)new Serializable($this, output, numExpectedOutput){
                public static final long serialVersionUID = 0L;
                private final ConcurrentLinkedQueue output$1;
                private final int numExpectedOutput$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"output.size = ", ", expected = ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.output$1.size()), BoxesRunTime.boxToInteger((int)this.numExpectedOutput$1)}));
                }
                {
                    this.output$1 = output$1;
                    this.numExpectedOutput$1 = numExpectedOutput$1;
                }
            });
            ssc.awaitTerminationOrTimeout(50L);
        }
        long timeTaken = System.currentTimeMillis() - startTime;
        $this.logInfo((Function0)new Serializable($this, timeTaken){
            public static final long serialVersionUID = 0L;
            private final long timeTaken$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Output generated in ", " milliseconds"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.timeTaken$1)}));
            }
            {
                this.timeTaken$1 = timeTaken$1;
            }
        });
        ((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(output).asScala()).toSeq().foreach((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingSuiteCommon $outer;

            public final void apply(Seq<V> x) {
                this.$outer.logInfo((Function0)new Serializable(this, x){
                    public static final long serialVersionUID = 0L;
                    private final Seq x$3;

                    public final String apply() {
                        return new StringBuilder().append((Object)"[").append((Object)this.x$3.mkString(",")).append((Object)"]").toString();
                    }
                    {
                        this.x$3 = x$3;
                    }
                });
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
        BoxedUnit outputArray = BoxedUnit.UNIT;
        Predef$.MODULE$.assert(timeTaken < (long)$this.maxWaitTimeMillis(), (Function0)new Serializable($this, timeTaken){
            public static final long serialVersionUID = 0L;
            private final long timeTaken$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Operation timed out after ", " ms"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.timeTaken$1)}));
            }
            {
                this.timeTaken$1 = timeTaken$1;
            }
        });
        Thread.sleep(200L);
        return ((TraversableOnce)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(output).asScala()).toSeq();
    }

    public static SparkConf setupClock(StreamingSuiteCommon $this) {
        SparkConf sparkConf;
        if ($this.useManualClock()) {
            $this.logInfo((Function0)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Using manual clock";
                }
            });
            sparkConf = $this.conf().set("spark.streaming.clock", "org.apache.spark.streaming.util.TestManualClock");
        } else {
            $this.logInfo((Function0)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Using real clock";
                }
            });
            sparkConf = $this.conf().set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
        }
        return sparkConf;
    }

    public static void $init$(StreamingSuiteCommon $this) {
        $this.com$holdenkarau$spark$testing$StreamingSuiteCommon$_setter_$eventuallyTimeout_$eq(Eventually$.MODULE$.timeout(Span$.MODULE$.apply(10L, (Units)Seconds$.MODULE$)));
    }
}

