package org.apache.spark.streaming.util;

import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.Logging;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.StreamingContext$;
import org.apache.spark.streaming.dstream.DStream;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;
import scala.util.Random$;

/* compiled from: MasterFailureTest.scala */
/* loaded from: input_file:org/apache/spark/streaming/util/MasterFailureTest$.class */
public final class MasterFailureTest$ implements Logging {
    public static final MasterFailureTest$ MODULE$ = null;
    private volatile boolean killed;
    private volatile int killCount;
    private volatile boolean setupCalled;
    private transient Logger org$apache$spark$Logging$$log_;

    static {
        new MasterFailureTest$();
    }

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$Logging$$log_ = logger;
    }

    public String logName() {
        return Logging.class.logName(this);
    }

    public Logger log() {
        return Logging.class.log(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    public boolean killed() {
        return this.killed;
    }

    public void killed_$eq(boolean z) {
        this.killed = z;
    }

    public int killCount() {
        return this.killCount;
    }

    public void killCount_$eq(int i) {
        this.killCount = i;
    }

    public boolean setupCalled() {
        return this.setupCalled;
    }

    public void setupCalled_$eq(boolean z) {
        this.setupCalled = z;
    }

    public void main(String[] strArr) {
        if (Predef$.MODULE$.refArrayOps(strArr).size() < 2) {
            Predef$.MODULE$.println("Usage: MasterFailureTest <local/HDFS directory> <# batches> [<batch size in milliseconds>]");
            System.exit(1);
        }
        String str = strArr[0];
        int i = new StringOps(Predef$.MODULE$.augmentString(strArr[1])).toInt();
        Duration apply = Predef$.MODULE$.refArrayOps(strArr).size() > 2 ? Milliseconds$.MODULE$.apply(new StringOps(Predef$.MODULE$.augmentString(strArr[2])).toInt()) : Seconds$.MODULE$.apply(1L);
        Predef$.MODULE$.println("\n\n========================= MAP TEST =========================\n\n");
        testMap(str, i, apply);
        Predef$.MODULE$.println("\n\n================= UPDATE-STATE-BY-KEY TEST =================\n\n");
        testUpdateStateByKey(str, i, apply);
        Predef$.MODULE$.println("\n\nSUCCESS\n\n");
    }

    public void testMap(String str, int i, Duration duration) {
        Seq seq = ((Seq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i).map(new MasterFailureTest$$anonfun$2(), IndexedSeq$.MODULE$.canBuildFrom())).toSeq();
        Range.Inclusive inclusive = RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i);
        scala.collection.Seq testOperation = testOperation(str, duration, seq, new MasterFailureTest$$anonfun$3(), inclusive, ClassTag$.MODULE$.Int());
        logInfo(new MasterFailureTest$$anonfun$testMap$1(inclusive));
        logInfo(new MasterFailureTest$$anonfun$testMap$2(inclusive));
        logInfo(new MasterFailureTest$$anonfun$testMap$3(testOperation));
        logInfo(new MasterFailureTest$$anonfun$testMap$4(testOperation));
        Predef$ predef$ = Predef$.MODULE$;
        Set set = ((TraversableOnce) testOperation.distinct()).toSet();
        Set set2 = inclusive.toSet();
        predef$.assert(set != null ? set.equals(set2) : set2 == null);
    }

    public void testUpdateStateByKey(String str, int i, Duration duration) {
        Seq seq = ((Seq) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), i).map(new MasterFailureTest$$anonfun$4(), IndexedSeq$.MODULE$.canBuildFrom())).toSeq();
        IndexedSeq indexedSeq = (IndexedSeq) ((TraversableLike) new RichLong(Predef$.MODULE$.longWrapper(1L)).to(BoxesRunTime.boxToLong(i)).map(new MasterFailureTest$$anonfun$1(), IndexedSeq$.MODULE$.canBuildFrom())).map(new MasterFailureTest$$anonfun$5(), IndexedSeq$.MODULE$.canBuildFrom());
        scala.collection.Seq testOperation = testOperation(str, duration, seq, new MasterFailureTest$$anonfun$6(duration), indexedSeq, ClassTag$.MODULE$.apply(Tuple2.class));
        logInfo(new MasterFailureTest$$anonfun$testUpdateStateByKey$1(indexedSeq));
        logInfo(new MasterFailureTest$$anonfun$testUpdateStateByKey$2(testOperation));
        testOperation.foreach(new MasterFailureTest$$anonfun$testUpdateStateByKey$3(indexedSeq));
        Predef$.MODULE$.assert(BoxesRunTime.equals(testOperation.last(), indexedSeq.last()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T> scala.collection.Seq<T> testOperation(String str, Duration duration, scala.collection.Seq<String> seq, Function1<DStream<String>, DStream<T>> function1, scala.collection.Seq<T> seq2, ClassTag<T> classTag) {
        Predef$ predef$ = Predef$.MODULE$;
        Set set = ((TraversableOnce) seq2.distinct()).toSet();
        Set set2 = seq2.toSet();
        predef$.assert(set != null ? set.equals(set2) : set2 == null);
        reset();
        Path path = new Path(str, UUID.randomUUID().toString());
        FileSystem fileSystem = path.getFileSystem(new Configuration());
        Path path2 = new Path(path, "checkpoint");
        Path path3 = new Path(path, "test");
        fileSystem.mkdirs(path2);
        fileSystem.mkdirs(path3);
        StreamingContext orCreate = StreamingContext$.MODULE$.getOrCreate(path2.toString(), new MasterFailureTest$$anonfun$8(duration, function1, classTag, path2, path3), StreamingContext$.MODULE$.getOrCreate$default$3(), StreamingContext$.MODULE$.getOrCreate$default$4());
        Predef$.MODULE$.assert(setupCalled(), new MasterFailureTest$$anonfun$testOperation$1());
        FileGeneratingThread fileGeneratingThread = new FileGeneratingThread(seq, path3, duration.milliseconds());
        fileGeneratingThread.start();
        scala.collection.Seq<T> runStreams = runStreams(orCreate, seq2.last(), seq2.size() * duration.milliseconds() * 2, classTag);
        fileGeneratingThread.join();
        fileSystem.delete(path2, true);
        fileSystem.delete(path3, true);
        logInfo(new MasterFailureTest$$anonfun$testOperation$2());
        return runStreams;
    }

    public <T> StreamingContext org$apache$spark$streaming$util$MasterFailureTest$$setupStreams(Duration duration, Function1<DStream<String>, DStream<T>> function1, Path path, Path path2, ClassTag<T> classTag) {
        setupCalled_$eq(true);
        StreamingContext streamingContext = new StreamingContext("local[4]", "MasterFailureTest", duration, null, Nil$.MODULE$, Predef$.MODULE$.Map().apply(Nil$.MODULE$));
        streamingContext.checkpoint(path.toString());
        new TestOutputStream((DStream) function1.apply(streamingContext.textFileStream(path2.toString())), TestOutputStream$.MODULE$.$lessinit$greater$default$2(), classTag).register();
        return streamingContext;
    }

    private <T> scala.collection.Seq<T> runStreams(StreamingContext streamingContext, T t, long j, ClassTag<T> classTag) {
        StreamingContext streamingContext2 = streamingContext;
        LongRef longRef = new LongRef(0L);
        BooleanRef booleanRef = new BooleanRef(false);
        BooleanRef booleanRef2 = new BooleanRef(false);
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        String checkpointDir = streamingContext2.checkpointDir();
        Duration batchDuration = streamingContext2.graph().batchDuration();
        while (!booleanRef.elem && !booleanRef2.elem) {
            ArrayBuffer<scala.collection.Seq<T>> output = ((TestOutputStream) Predef$.MODULE$.refArrayOps(streamingContext2.graph().getOutputStreams()).head()).output();
            killed_$eq(false);
            KillingThread killingThread = new KillingThread(streamingContext2, batchDuration.milliseconds() * 10);
            killingThread.start();
            LongRef longRef2 = new LongRef(0L);
            try {
                System.clearProperty("spark.streaming.clock");
                System.clearProperty("spark.driver.port");
                streamingContext2.start();
                long currentTimeMillis = System.currentTimeMillis();
                while (!killed() && !booleanRef.elem && !booleanRef2.elem) {
                    Thread.sleep(100L);
                    longRef2.elem = System.currentTimeMillis() - currentTimeMillis;
                    booleanRef.elem = !org$apache$spark$streaming$util$MasterFailureTest$$output$4(output).isEmpty() && BoxesRunTime.equals(org$apache$spark$streaming$util$MasterFailureTest$$output$4(output).last(), t);
                    booleanRef2.elem = longRef2.elem + longRef.elem > j;
                }
            } catch (Exception e) {
                logError(new MasterFailureTest$$anonfun$runStreams$1(), e);
            }
            if (killingThread.isAlive()) {
                killingThread.interrupt();
            }
            StreamingContext streamingContext3 = streamingContext2;
            streamingContext3.stop(streamingContext3.stop$default$1());
            logInfo(new MasterFailureTest$$anonfun$runStreams$2());
            logInfo(new MasterFailureTest$$anonfun$runStreams$3(booleanRef));
            logInfo(new MasterFailureTest$$anonfun$runStreams$4(booleanRef2));
            arrayBuffer.$plus$plus$eq(org$apache$spark$streaming$util$MasterFailureTest$$output$4(output));
            longRef.elem += longRef2.elem;
            logInfo(new MasterFailureTest$$anonfun$runStreams$5(output));
            logInfo(new MasterFailureTest$$anonfun$runStreams$6(arrayBuffer));
            logInfo(new MasterFailureTest$$anonfun$runStreams$7(longRef2));
            logInfo(new MasterFailureTest$$anonfun$runStreams$8(longRef));
            if (!booleanRef.elem && !booleanRef2.elem) {
                int nextInt = Random$.MODULE$.nextInt(((int) batchDuration.milliseconds()) * 10);
                logInfo(new MasterFailureTest$$anonfun$runStreams$9(nextInt));
                Thread.sleep(nextInt);
                streamingContext2 = StreamingContext$.MODULE$.getOrCreate(checkpointDir, new MasterFailureTest$$anonfun$runStreams$10(), StreamingContext$.MODULE$.getOrCreate$default$3(), StreamingContext$.MODULE$.getOrCreate$default$4());
            }
        }
        return arrayBuffer;
    }

    private <T> void verifyOutput(scala.collection.Seq<T> seq, scala.collection.Seq<T> seq2, ClassTag<T> classTag) {
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), seq2.size() - 1).foreach$mVc$sp(new MasterFailureTest$$anonfun$verifyOutput$1(seq2));
        Predef$.MODULE$.println(new StringBuilder().append("Expected output, size = ").append(BoxesRunTime.boxToInteger(seq2.size())).toString());
        Predef$.MODULE$.println(seq2.mkString("[", ",", "]"));
        Predef$.MODULE$.println(new StringBuilder().append("Output, size = ").append(BoxesRunTime.boxToInteger(seq.size())).toString());
        Predef$.MODULE$.println(seq.mkString("[", ",", "]"));
        seq.foreach(new MasterFailureTest$$anonfun$verifyOutput$2(seq2));
    }

    private void reset() {
        killed_$eq(false);
        killCount_$eq(0);
        setupCalled_$eq(false);
    }

    public final ArrayBuffer org$apache$spark$streaming$util$MasterFailureTest$$output$4(ArrayBuffer arrayBuffer) {
        return (ArrayBuffer) arrayBuffer.flatMap(new MasterFailureTest$$anonfun$org$apache$spark$streaming$util$MasterFailureTest$$output$4$1(), ArrayBuffer$.MODULE$.canBuildFrom());
    }

    private MasterFailureTest$() {
        MODULE$ = this;
        Logging.class.$init$(this);
        this.killed = false;
        this.killCount = 0;
        this.setupCalled = false;
    }
}
