/*
 * Decompiled with CFR 0.152.
 */
package com.holdenkarau.spark.testing;

import com.holdenkarau.spark.testing.BatchCountListener;
import com.holdenkarau.spark.testing.StreamingSuiteBase;
import com.holdenkarau.spark.testing.TestInputStream;
import java.io.Serializable;
import org.apache.spark.streaming.TestStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.util.TestManualClock;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005ea!C\u0004\t!\u0003\r\t!EA\u0003\u0011\u0015a\u0002\u0001\"\u0001\u001e\u0011\u001d\t\u0003A1A\u0005\u0002\tBQA\n\u0001\u0005\u0002\u001dBQA\u0018\u0001\u0005\n}CQA\u001b\u0001\u0005\n-DQ!\u001f\u0001\u0005\ni\u00141c\u0015;sK\u0006l\u0017N\\4BGRLwN\u001c\"bg\u0016T!!\u0003\u0006\u0002\u000fQ,7\u000f^5oO*\u00111\u0002D\u0001\u0006gB\f'o\u001b\u0006\u0003\u001b9\t1\u0002[8mI\u0016t7.\u0019:bk*\tq\"A\u0002d_6\u001c\u0001aE\u0002\u0001%a\u0001\"a\u0005\f\u000e\u0003QQ\u0011!F\u0001\u0006g\u000e\fG.Y\u0005\u0003/Q\u0011a!\u00118z%\u00164\u0007CA\r\u001b\u001b\u0005A\u0011BA\u000e\t\u0005I\u0019FO]3b[&twmU;ji\u0016\u0014\u0015m]3\u0002\r\u0011Jg.\u001b;%)\u0005q\u0002CA\n \u0013\t\u0001CC\u0001\u0003V]&$\u0018A\u00052bi\u000eD7i\\;oi2K7\u000f^3oKJ,\u0012a\t\t\u00033\u0011J!!\n\u0005\u0003%\t\u000bGo\u00195D_VtG\u000fT5ti\u0016tWM]\u0001\neVt\u0017i\u0019;j_:,\"\u0001\u000b\u001b\u0015\u0007%jD\n\u0006\u0002\u001fU!91fAA\u0001\u0002\ba\u0013AC3wS\u0012,gnY3%cA\u0019Q\u0006\r\u001a\u000e\u00039R!a\f\u000b\u0002\u000fI,g\r\\3di&\u0011\u0011G\f\u0002\t\u00072\f7o\u001d+bOB\u00111\u0007\u000e\u0007\u0001\t\u0015)4A1\u00017\u0005\u0005)\u0016CA\u001c;!\t\u0019\u0002(\u0003\u0002:)\t9aj\u001c;iS:<\u0007CA\n<\u0013\taDCA\u0002B]fDQAP\u0002A\u0002}\nQ!\u001b8qkR\u00042\u0001\u0011%L\u001d\t\teI\u0004\u0002C\u000b6\t1I\u0003\u0002E!\u00051AH]8pizJ\u0011!F\u0005\u0003\u000fR\tq\u0001]1dW\u0006<W-\u0003\u0002J\u0015\n\u00191+Z9\u000b\u0005\u001d#\u0002c\u0001!Ie!)Qj\u0001a\u0001\u001d\u0006Iq\u000e]3sCRLwN\u001c\t\u0005'=\u000bf$\u0003\u0002Q)\tIa)\u001e8di&|g.\r\t\u0004%r\u0013T\"A*\u000b\u0005Q+\u0016a\u00023tiJ,\u0017-\u001c\u0006\u0003-^\u000b\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005-A&BA-[\u0003\u0019\t\u0007/Y2iK*\t1,A\u0002pe\u001eL!!X*\u0003\u000f\u0011\u001bFO]3b[\u0006!r/\u001b;i'R\u0014X-Y7j]\u001e\u001cuN\u001c;fqR$\"\u0001\u00195\u0015\u0005y\t\u0007\"\u00022\u0005\u0001\u0004\u0019\u0017!\u00022m_\u000e\\\u0007\u0003B\nPIz\u
0001\"!\u001a4\u000e\u0003UK!aZ+\u0003)Q+7\u000f^*ue\u0016\fW.\u001b8h\u0007>tG/\u001a=u\u0011\u0015IG\u00011\u0001e\u0003=yW\u000f\u001e9viN#(/Z1n'N\u001b\u0015aC:fiV\u00048\u000b\u001e:fC6,\"\u0001\u001c:\u0015\u00075\u001ch\u000f\u0006\u0002e]\"9q.BA\u0001\u0002\b\u0001\u0018AC3wS\u0012,gnY3%eA\u0019Q\u0006M9\u0011\u0005M\u0012H!B\u001b\u0006\u0005\u00041\u0004\"\u0002 \u0006\u0001\u0004!\bc\u0001!IkB\u0019\u0001\tS9\t\u000b5+\u0001\u0019A<\u0011\tMy\u0005P\u000f\t\u0004%r\u000b\u0018a\u0004:v]\u0006\u001bG/[8o'R\u0014X-Y7\u0015\u0007yYX\u0010C\u0003}\r\u0001\u0007A-A\u0002tg\u000eDQA \u0004A\u0002}\f!B\\;n\u0005\u0006$8\r[3t!\r\u0019\u0012\u0011A\u0005\u0004\u0003\u0007!\"aA%oiJ1\u0011qAA\u0006\u0003\u001b1a!!\u0003\u0001\u0001\u0005\u0015!\u0001\u0004\u001fsK\u001aLg.Z7f]Rt\u0004CA\r\u0001!\u0011\ty!!\u0006\u000e\u0005\u0005E!bAA\n5\u0006I1oY1mCR,7\u000f^\u0005\u0005\u0003/\t\tBA\u0003Tk&$X\r")
/**
 * Mixin for streaming test suites that runs a side-effecting operation over a
 * test input stream for a fixed number of batches.
 *
 * <p>{@link #runAction} is the entry point: it builds a
 * {@code TestStreamingContext}, feeds it the supplied batches, advances the
 * manual clock once per batch, waits for the batches to be reported by
 * {@link #batchCountListener()}, and finally stops the context.
 *
 * <p>The members {@code sc()}, {@code batchDuration()}, {@code checkpointDir()},
 * {@code actuallyWait()}, {@code maxWaitTimeMillis()},
 * {@code createTestInputStream(...)}, {@code logInfo(...)} and
 * {@code logError(...)} are used here but not declared in this interface;
 * presumably they come from {@code StreamingSuiteBase} or its parents.
 */
public interface StreamingActionBase
extends StreamingSuiteBase {
    /**
     * Compiler-generated setter for the {@code batchCountListener} value;
     * called once from {@link #$init$}.
     *
     * @param var1 the listener instance to store
     */
    public void com$holdenkarau$spark$testing$StreamingActionBase$_setter_$batchCountListener_$eq(BatchCountListener var1);

    /**
     * @return the listener whose {@code batchCount} is reset and polled while
     *         a stream is being run
     */
    public BatchCountListener batchCountListener();

    /**
     * Synthetic static forwarder to {@link #runAction}.
     */
    public static /* synthetic */ void runAction$(StreamingActionBase $this, Seq input, Function1 operation, ClassTag evidence$1) {
        $this.runAction(input, operation, evidence$1);
    }

    /**
     * Applies {@code operation} to a test stream built from {@code input} and
     * runs the stream for {@code input.size()} batches.
     *
     * @param input      one inner sequence per batch
     * @param operation  callback that receives the input stream and wires up
     *                   whatever it wants to do with it
     * @param evidence$1 class tag for the element type
     * @param <U>        element type of the input stream
     */
    default public <U> void runAction(Seq<Seq<U>> input, Function1<DStream<U>, BoxedUnit> operation, ClassTag<U> evidence$1) {
        // The batch count is read before the stream is set up.
        final int batchTotal = input.size();
        TestStreamingContext context = this.setupStream(input, operation, evidence$1);
        Function1<TestStreamingContext, BoxedUnit> body = (Function1<TestStreamingContext, BoxedUnit>)(Function1 & Serializable & scala.Serializable)ssc -> {
            this.runActionStream(ssc, batchTotal);
            return BoxedUnit.UNIT;
        };
        this.withStreamingContext(context, body);
    }

    /**
     * Runs {@code block} against the given context and always stops the
     * context afterwards.
     *
     * <p>Any failure raised while stopping is logged and not rethrown, so it
     * cannot replace an exception thrown by {@code block}.
     *
     * @param outputStreamSSC context handed to {@code block} and then stopped
     * @param block           work to perform with the context
     */
    private void withStreamingContext(TestStreamingContext outputStreamSSC, Function1<TestStreamingContext, BoxedUnit> block) {
        try {
            block.apply((Object)outputStreamSSC);
        }
        finally {
            try {
                // NOTE(review): the `false` argument presumably means "do not stop the
                // underlying SparkContext" - confirm against TestStreamingContext.stop.
                outputStreamSSC.stop(false);
            }
            catch (Throwable stopFailure) {
                Function0 message = (Function0 & Serializable & scala.Serializable)() -> "Error stopping StreamingContext";
                this.logError(message, stopFailure);
            }
        }
    }

    /**
     * Creates a streaming context, registers the batch-count listener, enables
     * checkpointing when a checkpoint directory is configured, and applies
     * {@code operation} to a test input stream built from {@code input}.
     *
     * @param input      one inner sequence per batch
     * @param operation  callback applied to the freshly created input stream
     * @param evidence$2 class tag for the element type
     * @param <U>        element type of the input stream
     * @return the configured (not yet started) streaming context
     */
    private <U> TestStreamingContext setupStream(Seq<Seq<U>> input, Function1<DStream<U>, Object> operation, ClassTag<U> evidence$2) {
        TestStreamingContext context = new TestStreamingContext(this.sc(), this.batchDuration());
        context.addStreamingListener(this.batchCountListener());
        if (null != this.checkpointDir()) {
            context.checkpoint(this.checkpointDir());
        }
        operation.apply(this.createTestInputStream(this.sc(), context, input, evidence$2));
        return context;
    }

    /**
     * Starts the context, advances its manual clock by {@code numBatches}
     * batch durations, and polls until the listener has counted that many
     * batches or {@code maxWaitTimeMillis()} has elapsed.
     *
     * <p>Running out of time is not reported as an error here: the method
     * logs the elapsed time and returns either way.
     *
     * @param ssc        context to start and drive
     * @param numBatches number of batches to run; asserted to be positive
     */
    private void runActionStream(TestStreamingContext ssc, int numBatches) {
        // Expanded form of the ScalaTest assertion `numBatches > 0`.
        // The cast assumes the implementing suite is also an Assertions instance.
        Bool positiveBatchCount = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)numBatches), ">", (Object)BoxesRunTime.boxToInteger((int)0), numBatches > 0, Prettifier$.MODULE$.default());
        ((Assertions)this).assertionsHelper().macroAssert(positiveBatchCount, (Object)"Number of batches to run stream computation is zero", Prettifier$.MODULE$.default(), new Position("StreamingActionBase.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 83));

        // Reset the counter before starting so only this run's batches are counted.
        this.batchCountListener().batchCount_$eq(0);
        ssc.start();

        TestManualClock manualClock = (TestManualClock)ssc.getScheduler().clock();
        this.logInfo((Function0 & Serializable & scala.Serializable)() -> "Manual clock before advancing = " + manualClock.currentTime());
        if (!this.actuallyWait()) {
            // Jump the clock forward by all batches at once.
            manualClock.addToTime((long)numBatches * this.batchDuration().milliseconds());
        } else {
            // Step the clock one batch at a time, sleeping a real batch duration after each step.
            for (int remaining = numBatches; remaining > 0; --remaining) {
                this.logInfo((Function0 & Serializable & scala.Serializable)() -> "Actually waiting for " + this.batchDuration());
                manualClock.addToTime(this.batchDuration().milliseconds());
                Thread.sleep(this.batchDuration().milliseconds());
            }
        }
        this.logInfo((Function0 & Serializable & scala.Serializable)() -> "Manual clock after advancing = " + manualClock.currentTime());

        // Poll in 50 ms slices until enough batches are counted or the time budget is spent.
        long waitStart = System.currentTimeMillis();
        while (true) {
            if (this.batchCountListener().batchCount() >= numBatches) {
                break;
            }
            if (System.currentTimeMillis() - waitStart >= (long)this.maxWaitTimeMillis()) {
                break;
            }
            this.logInfo((Function0 & Serializable & scala.Serializable)() -> "batches: run = " + this.batchCountListener().batchCount() + " " + "target = " + numBatches);
            ssc.awaitTerminationOrTimeout(50L);
        }
        long elapsedMillis = System.currentTimeMillis() - waitStart;
        this.logInfo((Function0 & Serializable & scala.Serializable)() -> "Output generated in " + elapsedMillis + " milliseconds");

        // Fixed 100 ms pause before returning; the reason is not visible in this file.
        Thread.sleep(100L);
    }

    /**
     * Trait initializer: installs a fresh {@link BatchCountListener} on the
     * given instance through the generated setter.
     *
     * @param $this the instance being initialized
     */
    public static void $init$(StreamingActionBase $this) {
        BatchCountListener listener = new BatchCountListener();
        $this.com$holdenkarau$spark$testing$StreamingActionBase$_setter_$batchCountListener_$eq(listener);
    }
}

