/*
 * Decompiled with CFR 0.152.
 */
package com.holdenkarau.spark.testing;

import com.holdenkarau.spark.testing.StreamingSuiteBase;
import com.holdenkarau.spark.testing.StreamingSuiteBase$;
import com.holdenkarau.spark.testing.TestOutputStream;
import com.holdenkarau.spark.testing.Utils$;
import java.io.File;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.TestStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.util.TestManualClock;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.TripleEquals;
import org.scalactic.TripleEqualsSupport;
import org.scalatest.Assertions;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.time.Span$;
import org.scalatest.time.Units;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.IndexedSeqLike;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Traversable;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.BufferLike;
import scala.collection.mutable.Queue;
import scala.collection.mutable.ResizableArray;
import scala.collection.mutable.StringBuilder;
import scala.collection.mutable.SynchronizedBuffer;
import scala.collection.script.Message;
import scala.reflect.ClassTag;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

public abstract class StreamingSuiteBase$class {
    public static String framework(StreamingSuiteBase $this) {
        return $this.getClass().getSimpleName();
    }

    public static String master(StreamingSuiteBase $this) {
        return "local[2]";
    }

    public static Duration batchDuration(StreamingSuiteBase $this) {
        return Seconds$.MODULE$.apply(1L);
    }

    public static String checkpointDir(StreamingSuiteBase $this) {
        File dir = Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1());
        $this.logDebug((Function0)new Serializable($this, dir){
            public static final long serialVersionUID = 0L;
            private final File dir$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"checkpointDir: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.dir$1}));
            }
            {
                this.dir$1 = dir$1;
            }
        });
        return dir.toString();
    }

    public static InputDStream createTestInputStream(StreamingSuiteBase $this, SparkContext sc, TestStreamingContext ssc_, Seq input, ClassTag evidence$2) {
        Queue rdds = (Queue)new Queue().$plus$plus$eq((TraversableOnce)input.map((Function1)new Serializable($this, sc, evidence$2){
            public static final long serialVersionUID = 0L;
            private final SparkContext sc$1;
            private final ClassTag evidence$2$1;

            public final RDD<T> apply(Seq<T> elems) {
                return this.sc$1.parallelize(elems, this.sc$1.parallelize$default$2(), this.evidence$2$1);
            }
            {
                this.sc$1 = sc$1;
                this.evidence$2$1 = evidence$2$1;
            }
        }, Seq$.MODULE$.canBuildFrom()));
        RDD defaultRDD = sc.parallelize((Seq)Seq$.MODULE$.apply((Seq)Nil$.MODULE$), sc.parallelize$default$2(), evidence$2);
        return ssc_.queueStream(rdds, true, defaultRDD, evidence$2);
    }

    public static int numInputPartitions(StreamingSuiteBase $this) {
        return 2;
    }

    public static int maxWaitTimeMillis(StreamingSuiteBase $this) {
        return 10000;
    }

    public static boolean useManualClock(StreamingSuiteBase $this) {
        return true;
    }

    public static boolean actuallyWait(StreamingSuiteBase $this) {
        return false;
    }

    public static void beforeAll(StreamingSuiteBase $this) {
        SparkConf sparkConf;
        if ($this.useManualClock()) {
            $this.logInfo((Function0)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Using manual clock";
                }
            });
            sparkConf = $this.conf().set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
        } else {
            $this.logInfo((Function0)new Serializable($this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Using real clock";
                }
            });
            sparkConf = $this.conf().set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
        }
        $this.com$holdenkarau$spark$testing$StreamingSuiteBase$$super$beforeAll();
    }

    public static void afterAll(StreamingSuiteBase $this) {
        System.clearProperty("spark.streaming.clock");
        $this.com$holdenkarau$spark$testing$StreamingSuiteBase$$super$afterAll();
    }

    public static void withOutputAndStreamingContext(StreamingSuiteBase $this, Tuple2 outputStreamSSC, Function2 block) {
        TestOutputStream outputStream = (TestOutputStream)outputStreamSSC._1();
        TestStreamingContext ssc = (TestStreamingContext)((Object)outputStreamSSC._2());
        try {
            block.apply((Object)outputStream, (Object)ssc);
        }
        catch (Throwable throwable) {
            try {
                ssc.stop(true);
            }
            catch (Exception exception) {
                $this.logError((Function0)new Serializable($this){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Error stopping StreamingContext";
                    }
                }, exception);
            }
            throw throwable;
        }
        try {
            ssc.stop(true);
        }
        catch (Exception exception) {
            $this.logError((Function0)new /* invalid duplicate definition of identical inner class */, exception);
        }
    }

    public static Tuple2 setupStreams(StreamingSuiteBase $this, Seq input, Function1 operation, int numPartitions, ClassTag evidence$3, ClassTag evidence$4) {
        TestStreamingContext ssc = new TestStreamingContext($this.sc(), $this.batchDuration());
        if ($this.checkpointDir() != null) {
            ssc.checkpoint($this.checkpointDir());
        }
        InputDStream inputStream = $this.createTestInputStream($this.sc(), ssc, input, evidence$3);
        DStream operatedStream = (DStream)operation.apply(inputStream);
        TestOutputStream outputStream = new TestOutputStream(operatedStream, new SynchronizedBuffer<Seq<V>>($this){

            public int scala$collection$mutable$SynchronizedBuffer$$super$length() {
                return ResizableArray.class.length((ResizableArray)this);
            }

            public Iterator scala$collection$mutable$SynchronizedBuffer$$super$iterator() {
                return IndexedSeqLike.class.iterator((IndexedSeqLike)this);
            }

            public Object scala$collection$mutable$SynchronizedBuffer$$super$apply(int n) {
                return ResizableArray.class.apply((ResizableArray)this, (int)n);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$eq(Object elem) {
                return (SynchronizedBuffer)super.$plus$eq(elem);
            }

            public Buffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$plus(GenTraversableOnce xs) {
                return BufferLike.class.$plus$plus((Buffer)this, (GenTraversableOnce)xs);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$plus$eq(TraversableOnce xs) {
                return (SynchronizedBuffer)super.$plus$plus$eq(xs);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$appendAll(TraversableOnce xs) {
                BufferLike.class.appendAll((Buffer)this, (TraversableOnce)xs);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$eq$colon(Object elem) {
                return (SynchronizedBuffer)super.$plus$eq$colon(elem);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$plus$eq$colon(TraversableOnce xs) {
                return (SynchronizedBuffer)super.$plus$plus$eq$colon(xs);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$prependAll(TraversableOnce xs) {
                BufferLike.class.prependAll((Buffer)this, (TraversableOnce)xs);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$insertAll(int n, Traversable elems) {
                super.insertAll(n, elems);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$update(int n, Object newelem) {
                ResizableArray.class.update((ResizableArray)this, (int)n, (Object)newelem);
            }

            public Object scala$collection$mutable$SynchronizedBuffer$$super$remove(int n) {
                return super.remove(n);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$clear() {
                super.clear();
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$$less$less(Message cmd) {
                BufferLike.class.$less$less((Buffer)this, (Message)cmd);
            }

            public Buffer scala$collection$mutable$SynchronizedBuffer$$super$clone() {
                return BufferLike.class.clone((Buffer)this);
            }

            public int scala$collection$mutable$SynchronizedBuffer$$super$hashCode() {
                return IndexedSeqLike.class.hashCode((IndexedSeqLike)this);
            }

            public int length() {
                return SynchronizedBuffer.class.length((SynchronizedBuffer)this);
            }

            public Iterator<Seq<V>> iterator() {
                return SynchronizedBuffer.class.iterator((SynchronizedBuffer)this);
            }

            public Seq<V> apply(int n) {
                return SynchronizedBuffer.class.apply((SynchronizedBuffer)this, (int)n);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            public SynchronizedBuffer $plus$eq(Object elem) {
                return SynchronizedBuffer.class.$plus$eq((SynchronizedBuffer)this, (Object)elem);
            }

            public Buffer<Seq<V>> $plus$plus(GenTraversableOnce<Seq<V>> xs) {
                return SynchronizedBuffer.class.$plus$plus((SynchronizedBuffer)this, xs);
            }

            public SynchronizedBuffer<Seq<V>> $plus$plus$eq(TraversableOnce<Seq<V>> xs) {
                return SynchronizedBuffer.class.$plus$plus$eq((SynchronizedBuffer)this, xs);
            }

            public void append(Seq<Seq<V>> elems) {
                SynchronizedBuffer.class.append((SynchronizedBuffer)this, elems);
            }

            public void appendAll(TraversableOnce<Seq<V>> xs) {
                SynchronizedBuffer.class.appendAll((SynchronizedBuffer)this, xs);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            public SynchronizedBuffer $plus$eq$colon(Object elem) {
                return SynchronizedBuffer.class.$plus$eq$colon((SynchronizedBuffer)this, (Object)elem);
            }

            public SynchronizedBuffer<Seq<V>> $plus$plus$eq$colon(TraversableOnce<Seq<V>> xs) {
                return SynchronizedBuffer.class.$plus$plus$eq$colon((SynchronizedBuffer)this, xs);
            }

            public void prepend(Seq<Seq<V>> elems) {
                SynchronizedBuffer.class.prepend((SynchronizedBuffer)this, elems);
            }

            public void prependAll(TraversableOnce<Seq<V>> xs) {
                SynchronizedBuffer.class.prependAll((SynchronizedBuffer)this, xs);
            }

            public void insert(int n, Seq<Seq<V>> elems) {
                SynchronizedBuffer.class.insert((SynchronizedBuffer)this, (int)n, elems);
            }

            public void insertAll(int n, Traversable<Seq<V>> xs) {
                SynchronizedBuffer.class.insertAll((SynchronizedBuffer)this, (int)n, xs);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            public void update(int n, Object newelem) {
                SynchronizedBuffer.class.update((SynchronizedBuffer)this, (int)n, (Object)newelem);
            }

            public Seq<V> remove(int n) {
                return SynchronizedBuffer.class.remove((SynchronizedBuffer)this, (int)n);
            }

            public void clear() {
                SynchronizedBuffer.class.clear((SynchronizedBuffer)this);
            }

            public void $less$less(Message<Seq<V>> cmd) {
                SynchronizedBuffer.class.$less$less((SynchronizedBuffer)this, cmd);
            }

            public Buffer<Seq<V>> clone() {
                return SynchronizedBuffer.class.clone((SynchronizedBuffer)this);
            }

            public int hashCode() {
                return SynchronizedBuffer.class.hashCode((SynchronizedBuffer)this);
            }
            {
                SynchronizedBuffer.class.$init$((SynchronizedBuffer)this);
            }
        }, evidence$4);
        return new Tuple2(outputStream, (Object)ssc);
    }

    public static Tuple2 setupStreams(StreamingSuiteBase $this, Seq input1, Seq input2, Function2 operation, ClassTag evidence$5, ClassTag evidence$6, ClassTag evidence$7) {
        TestStreamingContext ssc = new TestStreamingContext($this.sc(), $this.batchDuration());
        if ($this.checkpointDir() != null) {
            ssc.checkpoint($this.checkpointDir());
        }
        InputDStream inputStream1 = $this.createTestInputStream($this.sc(), ssc, input1, evidence$5);
        InputDStream inputStream2 = $this.createTestInputStream($this.sc(), ssc, input2, evidence$6);
        DStream operatedStream = (DStream)operation.apply(inputStream1, inputStream2);
        TestOutputStream outputStream = new TestOutputStream(operatedStream, new SynchronizedBuffer<Seq<W>>($this){

            public int scala$collection$mutable$SynchronizedBuffer$$super$length() {
                return ResizableArray.class.length((ResizableArray)this);
            }

            public Iterator scala$collection$mutable$SynchronizedBuffer$$super$iterator() {
                return IndexedSeqLike.class.iterator((IndexedSeqLike)this);
            }

            public Object scala$collection$mutable$SynchronizedBuffer$$super$apply(int n) {
                return ResizableArray.class.apply((ResizableArray)this, (int)n);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$eq(Object elem) {
                return (SynchronizedBuffer)super.$plus$eq(elem);
            }

            public Buffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$plus(GenTraversableOnce xs) {
                return BufferLike.class.$plus$plus((Buffer)this, (GenTraversableOnce)xs);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$plus$eq(TraversableOnce xs) {
                return (SynchronizedBuffer)super.$plus$plus$eq(xs);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$appendAll(TraversableOnce xs) {
                BufferLike.class.appendAll((Buffer)this, (TraversableOnce)xs);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$eq$colon(Object elem) {
                return (SynchronizedBuffer)super.$plus$eq$colon(elem);
            }

            public SynchronizedBuffer scala$collection$mutable$SynchronizedBuffer$$super$$plus$plus$eq$colon(TraversableOnce xs) {
                return (SynchronizedBuffer)super.$plus$plus$eq$colon(xs);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$prependAll(TraversableOnce xs) {
                BufferLike.class.prependAll((Buffer)this, (TraversableOnce)xs);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$insertAll(int n, Traversable elems) {
                super.insertAll(n, elems);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$update(int n, Object newelem) {
                ResizableArray.class.update((ResizableArray)this, (int)n, (Object)newelem);
            }

            public Object scala$collection$mutable$SynchronizedBuffer$$super$remove(int n) {
                return super.remove(n);
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$clear() {
                super.clear();
            }

            public void scala$collection$mutable$SynchronizedBuffer$$super$$less$less(Message cmd) {
                BufferLike.class.$less$less((Buffer)this, (Message)cmd);
            }

            public Buffer scala$collection$mutable$SynchronizedBuffer$$super$clone() {
                return BufferLike.class.clone((Buffer)this);
            }

            public int scala$collection$mutable$SynchronizedBuffer$$super$hashCode() {
                return IndexedSeqLike.class.hashCode((IndexedSeqLike)this);
            }

            public int length() {
                return SynchronizedBuffer.class.length((SynchronizedBuffer)this);
            }

            public Iterator<Seq<W>> iterator() {
                return SynchronizedBuffer.class.iterator((SynchronizedBuffer)this);
            }

            public Seq<W> apply(int n) {
                return SynchronizedBuffer.class.apply((SynchronizedBuffer)this, (int)n);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            public SynchronizedBuffer $plus$eq(Object elem) {
                return SynchronizedBuffer.class.$plus$eq((SynchronizedBuffer)this, (Object)elem);
            }

            public Buffer<Seq<W>> $plus$plus(GenTraversableOnce<Seq<W>> xs) {
                return SynchronizedBuffer.class.$plus$plus((SynchronizedBuffer)this, xs);
            }

            public SynchronizedBuffer<Seq<W>> $plus$plus$eq(TraversableOnce<Seq<W>> xs) {
                return SynchronizedBuffer.class.$plus$plus$eq((SynchronizedBuffer)this, xs);
            }

            public void append(Seq<Seq<W>> elems) {
                SynchronizedBuffer.class.append((SynchronizedBuffer)this, elems);
            }

            public void appendAll(TraversableOnce<Seq<W>> xs) {
                SynchronizedBuffer.class.appendAll((SynchronizedBuffer)this, xs);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            public SynchronizedBuffer $plus$eq$colon(Object elem) {
                return SynchronizedBuffer.class.$plus$eq$colon((SynchronizedBuffer)this, (Object)elem);
            }

            public SynchronizedBuffer<Seq<W>> $plus$plus$eq$colon(TraversableOnce<Seq<W>> xs) {
                return SynchronizedBuffer.class.$plus$plus$eq$colon((SynchronizedBuffer)this, xs);
            }

            public void prepend(Seq<Seq<W>> elems) {
                SynchronizedBuffer.class.prepend((SynchronizedBuffer)this, elems);
            }

            public void prependAll(TraversableOnce<Seq<W>> xs) {
                SynchronizedBuffer.class.prependAll((SynchronizedBuffer)this, xs);
            }

            public void insert(int n, Seq<Seq<W>> elems) {
                SynchronizedBuffer.class.insert((SynchronizedBuffer)this, (int)n, elems);
            }

            public void insertAll(int n, Traversable<Seq<W>> xs) {
                SynchronizedBuffer.class.insertAll((SynchronizedBuffer)this, (int)n, xs);
            }

            /*
             * Ignored method signature, as it can't be verified against descriptor
             */
            public void update(int n, Object newelem) {
                SynchronizedBuffer.class.update((SynchronizedBuffer)this, (int)n, (Object)newelem);
            }

            public Seq<W> remove(int n) {
                return SynchronizedBuffer.class.remove((SynchronizedBuffer)this, (int)n);
            }

            public void clear() {
                SynchronizedBuffer.class.clear((SynchronizedBuffer)this);
            }

            public void $less$less(Message<Seq<W>> cmd) {
                SynchronizedBuffer.class.$less$less((SynchronizedBuffer)this, cmd);
            }

            public Buffer<Seq<W>> clone() {
                return SynchronizedBuffer.class.clone((SynchronizedBuffer)this);
            }

            public int hashCode() {
                return SynchronizedBuffer.class.hashCode((SynchronizedBuffer)this);
            }
            {
                SynchronizedBuffer.class.$init$((SynchronizedBuffer)this);
            }
        }, evidence$7);
        return new Tuple2(outputStream, (Object)ssc);
    }

    public static int setupStreams$default$3(StreamingSuiteBase $this) {
        return $this.numInputPartitions();
    }

    public static Seq runStreams(StreamingSuiteBase $this, TestOutputStream outputStream, TestStreamingContext ssc, int numBatches, int numExpectedOutput, ClassTag evidence$8) {
        int $org_scalatest_assert_macro_left = numBatches;
        int $org_scalatest_assert_macro_right = 0;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > $org_scalatest_assert_macro_right);
        ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Number of batches to run stream computation is zero");
        int $org_scalatest_assert_macro_left2 = numExpectedOutput;
        int $org_scalatest_assert_macro_right2 = 0;
        Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left2), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2 > $org_scalatest_assert_macro_right2);
        ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)new StringBuilder().append((Object)"Number of expected outputs after ").append((Object)BoxesRunTime.boxToInteger((int)numBatches)).append((Object)" is zero").toString());
        $this.logInfo((Function0)new Serializable($this, numBatches, numExpectedOutput){
            public static final long serialVersionUID = 0L;
            private final int numBatches$1;
            private final int numExpectedOutput$1;

            public final String apply() {
                return new StringBuilder().append((Object)"numBatches = ").append((Object)BoxesRunTime.boxToInteger((int)this.numBatches$1)).append((Object)", numExpectedOutput = ").append((Object)BoxesRunTime.boxToInteger((int)this.numExpectedOutput$1)).toString();
            }
            {
                this.numBatches$1 = numBatches$1;
                this.numExpectedOutput$1 = numExpectedOutput$1;
            }
        });
        ArrayBuffer output = outputStream.output();
        try {
            ssc.start();
            TestManualClock clock = new TestManualClock(ssc.getScheduler().clock());
            $this.logInfo((Function0)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final TestManualClock clock$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Manual clock before advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.currentTime())).toString();
                }
                {
                    this.clock$1 = clock$1;
                }
            });
            if ($this.actuallyWait()) {
                RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), numBatches).foreach$mVc$sp((Function1)new Serializable($this, clock){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ StreamingSuiteBase $outer;
                    private final TestManualClock clock$1;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    public void apply$mcVI$sp(int i) {
                        this.$outer.logInfo((Function0)new Serializable(this){
                            public static final long serialVersionUID = 0L;
                            private final /* synthetic */ StreamingSuiteBase$.anonfun.runStreams.1 $outer;

                            public final String apply() {
                                return new StringBuilder().append((Object)"Actually waiting for ").append((Object)this.$outer.com$holdenkarau$spark$testing$StreamingSuiteBase$$anonfun$$$outer().batchDuration()).toString();
                            }
                            {
                                if ($outer == null) {
                                    throw new NullPointerException();
                                }
                                this.$outer = $outer;
                            }
                        });
                        this.clock$1.addToTime().apply$mcVJ$sp(this.$outer.batchDuration().milliseconds());
                        Thread.sleep(this.$outer.batchDuration().milliseconds());
                    }

                    public /* synthetic */ StreamingSuiteBase com$holdenkarau$spark$testing$StreamingSuiteBase$$anonfun$$$outer() {
                        return this.$outer;
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                        this.clock$1 = clock$1;
                    }
                });
            } else {
                clock.addToTime().apply$mcVJ$sp((long)numBatches * $this.batchDuration().milliseconds());
            }
            $this.logInfo((Function0)new Serializable($this, clock){
                public static final long serialVersionUID = 0L;
                private final TestManualClock clock$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Manual clock after advancing = ").append((Object)BoxesRunTime.boxToLong((long)this.clock$1.currentTime())).toString();
                }
                {
                    this.clock$1 = clock$1;
                }
            });
            long startTime = System.currentTimeMillis();
            while (output.size() < numExpectedOutput && System.currentTimeMillis() - startTime < (long)$this.maxWaitTimeMillis()) {
                $this.logInfo((Function0)new Serializable($this, output, numExpectedOutput){
                    public static final long serialVersionUID = 0L;
                    private final ArrayBuffer output$1;
                    private final int numExpectedOutput$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)"output.size = ").append((Object)BoxesRunTime.boxToInteger((int)this.output$1.size())).append((Object)", numExpectedOutput = ").append((Object)BoxesRunTime.boxToInteger((int)this.numExpectedOutput$1)).toString();
                    }
                    {
                        this.output$1 = output$1;
                        this.numExpectedOutput$1 = numExpectedOutput$1;
                    }
                });
                ssc.awaitTermination(50L);
            }
            long timeTaken = System.currentTimeMillis() - startTime;
            $this.logInfo((Function0)new Serializable($this, timeTaken){
                public static final long serialVersionUID = 0L;
                private final long timeTaken$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Output generated in ").append((Object)BoxesRunTime.boxToLong((long)this.timeTaken$1)).append((Object)" milliseconds").toString();
                }
                {
                    this.timeTaken$1 = timeTaken$1;
                }
            });
            output.foreach((Function1)new Serializable($this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ StreamingSuiteBase $outer;

                public final void apply(Seq<V> x) {
                    this.$outer.logInfo((Function0)new Serializable(this, x){
                        public static final long serialVersionUID = 0L;
                        private final Seq x$1;

                        public final String apply() {
                            return new StringBuilder().append((Object)"[").append((Object)this.x$1.mkString(",")).append((Object)"]").toString();
                        }
                        {
                            this.x$1 = x$1;
                        }
                    });
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
            long $org_scalatest_assert_macro_left3 = timeTaken;
            int $org_scalatest_assert_macro_right3 = $this.maxWaitTimeMillis();
            Bool $org_scalatest_assert_macro_expr3 = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToLong((long)$org_scalatest_assert_macro_left3), "<", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right3), $org_scalatest_assert_macro_left3 < (long)$org_scalatest_assert_macro_right3);
            ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr3, (Object)new StringBuilder().append((Object)"Operation timed out after ").append((Object)BoxesRunTime.boxToLong((long)timeTaken)).append((Object)" ms").toString());
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left4 = ((TripleEquals)$this).convertToEqualizer((Object)BoxesRunTime.boxToInteger((int)output.size()));
            int $org_scalatest_assert_macro_right4 = numExpectedOutput;
            Bool $org_scalatest_assert_macro_expr4 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left4, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), $org_scalatest_assert_macro_left4.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right4), Equality$.MODULE$.default()));
            ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr4, (Object)"Unexpected number of outputs generated");
            Thread.sleep(100L);
            return output.toSeq();
        }
        finally {
            ssc.stop(true);
        }
    }

    public static void verifyOutput(StreamingSuiteBase $this, Seq output, Seq expectedOutput, boolean useSet, ClassTag evidence$9) {
        $this.logInfo((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "--------------------------------";
            }
        });
        $this.logInfo((Function0)new Serializable($this, output){
            public static final long serialVersionUID = 0L;
            private final Seq output$2;

            public final String apply() {
                return new StringBuilder().append((Object)"output.size = ").append((Object)BoxesRunTime.boxToInteger((int)this.output$2.size())).toString();
            }
            {
                this.output$2 = output$2;
            }
        });
        $this.logInfo((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "output";
            }
        });
        output.foreach((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingSuiteBase $outer;

            public final void apply(Seq<V> x) {
                this.$outer.logInfo((Function0)new Serializable(this, x){
                    public static final long serialVersionUID = 0L;
                    private final Seq x$2;

                    public final String apply() {
                        return new StringBuilder().append((Object)"[").append((Object)this.x$2.mkString(",")).append((Object)"]").toString();
                    }
                    {
                        this.x$2 = x$2;
                    }
                });
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        $this.logInfo((Function0)new Serializable($this, expectedOutput){
            public static final long serialVersionUID = 0L;
            private final Seq expectedOutput$1;

            public final String apply() {
                return new StringBuilder().append((Object)"expected output.size = ").append((Object)BoxesRunTime.boxToInteger((int)this.expectedOutput$1.size())).toString();
            }
            {
                this.expectedOutput$1 = expectedOutput$1;
            }
        });
        $this.logInfo((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "expected output";
            }
        });
        expectedOutput.foreach((Function1)new Serializable($this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingSuiteBase $outer;

            public final void apply(Seq<V> x) {
                this.$outer.logInfo((Function0)new Serializable(this, x){
                    public static final long serialVersionUID = 0L;
                    private final Seq x$3;

                    public final String apply() {
                        return new StringBuilder().append((Object)"[").append((Object)this.x$3.mkString(",")).append((Object)"]").toString();
                    }
                    {
                        this.x$3 = x$3;
                    }
                });
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
        $this.logInfo((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "--------------------------------";
            }
        });
        TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = ((TripleEquals)$this).convertToEqualizer((Object)BoxesRunTime.boxToInteger((int)output.size()));
        int $org_scalatest_assert_macro_right = expectedOutput.size();
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()));
        ((Assertions)$this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Number of outputs do not match");
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), output.size()).foreach$mVc$sp((Function1)new Serializable($this, output, expectedOutput, useSet){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingSuiteBase $outer;
            private final Seq output$2;
            private final Seq expectedOutput$1;
            private final boolean useSet$1;

            public final void apply(int i) {
                this.apply$mcVI$sp(i);
            }

            public void apply$mcVI$sp(int i) {
                if (this.useSet$1) {
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = ((TripleEquals)this.$outer).convertToEqualizer((Object)((TraversableOnce)this.output$2.apply(i)).toSet());
                    Set $org_scalatest_assert_macro_right = ((TraversableOnce)this.expectedOutput$1.apply(i)).toSet();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()));
                    ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                } else {
                    TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = ((TripleEquals)this.$outer).convertToEqualizer((Object)((TraversableOnce)this.output$2.apply(i)).toList());
                    List $org_scalatest_assert_macro_right = ((TraversableOnce)this.expectedOutput$1.apply(i)).toList();
                    Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)$org_scalatest_assert_macro_right, $org_scalatest_assert_macro_left.$eq$eq$eq((Object)$org_scalatest_assert_macro_right, Equality$.MODULE$.default()));
                    ((Assertions)this.$outer).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"");
                }
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.output$2 = output$2;
                this.expectedOutput$1 = expectedOutput$1;
                this.useSet$1 = useSet$1;
            }
        });
        $this.logInfo((Function0)new Serializable($this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Output verified successfully";
            }
        });
    }

    public static void testOperation(StreamingSuiteBase $this, Seq input, Function1 operation, Seq expectedOutput, boolean useSet, ClassTag evidence$10, ClassTag evidence$11) {
        $this.testOperation(input, operation, expectedOutput, -1, useSet, evidence$10, evidence$11);
    }

    public static void testOperation(StreamingSuiteBase $this, Seq input, Function1 operation, Seq expectedOutput, int numBatches, boolean useSet, ClassTag evidence$12, ClassTag evidence$13) {
        int numBatches_ = numBatches > 0 ? numBatches : expectedOutput.size();
        $this.withOutputAndStreamingContext($this.setupStreams(input, operation, $this.setupStreams$default$3(), evidence$12, evidence$13), new Serializable($this, numBatches_, expectedOutput, useSet, evidence$13){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingSuiteBase $outer;
            private final int numBatches_$1;
            private final Seq expectedOutput$2;
            private final boolean useSet$2;
            private final ClassTag evidence$13$1;

            public final void apply(TestOutputStream<V> outputStream, TestStreamingContext ssc) {
                Seq<Seq<V>> output = this.$outer.runStreams(outputStream, ssc, this.numBatches_$1, this.expectedOutput$2.size(), this.evidence$13$1);
                this.$outer.verifyOutput(output, this.expectedOutput$2, this.useSet$2, this.evidence$13$1);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.numBatches_$1 = numBatches_$1;
                this.expectedOutput$2 = expectedOutput$2;
                this.useSet$2 = useSet$2;
                this.evidence$13$1 = evidence$13$1;
            }
        });
        BoxedUnit output = BoxedUnit.UNIT;
    }

    public static void testOperation(StreamingSuiteBase $this, Seq input1, Seq input2, Function2 operation, Seq expectedOutput, boolean useSet, ClassTag evidence$14, ClassTag evidence$15, ClassTag evidence$16) {
        $this.testOperation(input1, input2, operation, expectedOutput, -1, useSet, evidence$14, evidence$15, evidence$16);
    }

    public static void testOperation(StreamingSuiteBase $this, Seq input1, Seq input2, Function2 operation, Seq expectedOutput, int numBatches, boolean useSet, ClassTag evidence$17, ClassTag evidence$18, ClassTag evidence$19) {
        int numBatches_ = numBatches > 0 ? numBatches : expectedOutput.size();
        $this.withOutputAndStreamingContext($this.setupStreams(input1, input2, operation, evidence$17, evidence$18, evidence$19), new Serializable($this, numBatches_, expectedOutput, useSet, evidence$19){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ StreamingSuiteBase $outer;
            private final int numBatches_$2;
            private final Seq expectedOutput$3;
            private final boolean useSet$3;
            private final ClassTag evidence$19$1;

            public final void apply(TestOutputStream<W> outputStream, TestStreamingContext ssc) {
                Seq<Seq<W>> output = this.$outer.runStreams(outputStream, ssc, this.numBatches_$2, this.expectedOutput$3.size(), this.evidence$19$1);
                this.$outer.verifyOutput(output, this.expectedOutput$3, this.useSet$3, this.evidence$19$1);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.numBatches_$2 = numBatches_$2;
                this.expectedOutput$3 = expectedOutput$3;
                this.useSet$3 = useSet$3;
                this.evidence$19$1 = evidence$19$1;
            }
        });
    }

    public static boolean testOperation$default$4(StreamingSuiteBase $this) {
        return false;
    }

    public static void $init$(StreamingSuiteBase $this) {
        $this.com$holdenkarau$spark$testing$StreamingSuiteBase$_setter_$conf_$eq(new SparkConf().setMaster($this.master()).setAppName($this.framework()));
        $this.com$holdenkarau$spark$testing$StreamingSuiteBase$_setter_$eventuallyTimeout_$eq(Eventually$.MODULE$.timeout(Span$.MODULE$.apply(10L, (Units)org.scalatest.time.Seconds$.MODULE$)));
    }
}

