package org.apache.spark.streaming.scheduler;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logging;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: ReceivedBlockTracker.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dg!B\u0012%\u0001\u0019r\u0003\u0002C\u001e\u0001\u0005\u0003\u0005\u000b\u0011B\u001f\t\u0011\u0005\u0003!\u0011!Q\u0001\n\tC\u0001\"\u0013\u0001\u0003\u0002\u0003\u0006IA\u0013\u0005\t3\u0002\u0011\t\u0011)A\u00055\"A\u0001\r\u0001B\u0001B\u0003%\u0011\r\u0003\u0005e\u0001\t\u0005\t\u0015!\u0003f\u0011\u0015\u0001\b\u0001\"\u0001r\u000b\u0011Q\b\u0001B>\t\u0013\u00055\u0001A1A\u0005\n\u0005=\u0001\u0002CA\u000e\u0001\u0001\u0006I!!\u0005\t\u0013\u0005u\u0001A1A\u0005\n\u0005}\u0001\u0002CA\u0019\u0001\u0001\u0006I!!\t\t\u0013\u0005M\u0002A1A\u0005\n\u0005U\u0002\u0002CA\"\u0001\u0001\u0006I!a\u000e\t\u0013\u0005\u0015\u0003\u00011A\u0005\n\u0005\u001d\u0003\"CA%\u0001\u0001\u0007I\u0011BA&\u0011!\t9\u0006\u0001Q!\n\u0005\r\u0002bBA-\u0001\u0011\u0005\u00111\f\u0005\b\u0003C\u0002A\u0011AA2\u0011\u001d\tI\u0007\u0001C\u0001\u0003WBq!a\u001e\u0001\t\u0003\tI\bC\u0004\u0002\u0002\u0002!\t!a!\t\u000f\u0005\u0015\u0005\u0001\"\u0001\u0002\b\"9\u00111\u0012\u0001\u0005\u0002\u00055\u0005bBAL\u0001\u0011\u0005\u0011\u0011\u0014\u0005\b\u00037\u0003A\u0011BAM\u0011!\ti\n\u0001C\u0001M\u0005}\u0005bBAV\u0001\u0011%\u0011Q\u0016\u0005\b\u0003c\u0003A\u0011BAZ\u0011!\t)\f\u0001C\u0001M\u0005\ru\u0001CA\\I!\u0005a%!/\u0007\u000f\r\"\u0003\u0012\u0001\u0014\u0002<\"1\u0001\u000f\tC\u0001\u0003{Cq!a0!\t\u0003\t\tM\u0001\u000bSK\u000e,\u0017N^3e\u00052|7m\u001b+sC\u000e\\WM\u001d\u0006\u0003K\u0019\n\u0011b]2iK\u0012,H.\u001a:\u000b\u0005\u001dB\u0013!C:ue\u0016\fW.\u001b8h\u0015\tI#&A\u0003ta\u0006\u00148N\u0003\u0002,Y\u00051\u0011\r]1dQ\u0016T\u0011!L\u0001\u0004_J<7c\u0001\u00010kA\u0011\u0001gM\u0007\u0002c)\t!'A\u0003tG\u0006d\u0017-\u0003\u00025c\t1\u0011I\\=SK\u001a\u0004\"AN\u001d\u000e\u0003]R!\u0001\u000f\u0015\u0002\u0011%tG/\u001a:oC2L!AO\u001c\u0003\u000f1{wmZ5oO\u0006!1m\u001c8g\u0007\u0001\u0001\"AP \u000e\u0003!J!\u0001\u0011\u0015\u0003\u0013M\u0003\u0018M]6D_:4\u0017A\u00035bI>|\u0007oQ8oMB\u00111iR\u0007\u0002\t*\u00111(\u0012\u0006\u0003\r*\na\u0001[1e_>\u0004\u0018B\u0001%E\u00055\u0019uN\u001c4jOV\u0014\u0018\r^5p]\u0006I1\u000f\u001e:fC6LEm\u001d\t\u0004\u0017N3fB\u0001'R\u001d\ti\u0005+D\u0001O\u0015\tyE(\u0001\u0004=e>|GOP\u0005\u0002e%\u0011!+M\u0001\ba\u0006\u001c7.Y4f\u0013\t!VKA\u0002TKFT!AU\u0019\u0011\u0005A:\u0016B\u0001-2\u0005\rIe\u000e^\u0001\u0006G2|7m\u001b\t\u00037zk\u0011\u0001\u0018\u0006\u0003;\"\nA!\u001e;jY&\u0011q\f\u0018\u0002\u0006\u00072|7m[\u0001\u0019e\u0016\u001cwN^3s\rJ|Wn\u0016:ji\u0016\f\u0005.Z1e\u0019><\u0007C\u0001\u0019c\u0013\t\u0019\u0017GA\u0004C_>dW-\u00198\u0002'\rDWmY6q_&tG\u000fR5s\u001fB$\u0018n\u001c8\u0011\u0007A2\u0007.\u0003\u0002hc\t1q\n\u001d;j_:\u0004\"![7\u000f\u0005)\\\u0007CA'2\u0013\ta\u0017'\u0001\u0004Qe\u0016$WMZ\u0005\u0003]>\u0014aa\u0015;sS:<'B\u000172\u0003\u0019a\u0014N\\5u}Q9!\u000f^;wobL\bCA:\u0001\u001b\u0005!\u0003\"B\u001e\b\u0001\u0004i\u0004\"B!\b\u0001\u0004\u0011\u0005\"B%\b\u0001\u0004Q\u0005\"B-\b\u0001\u0004Q\u0006\"\u00021\b\u0001\u0004\t\u0007\"\u00023\b\u0001\u0004)'A\u0005*fG\u0016Lg/\u001a3CY>\u001c7.U;fk\u0016\u0004R\u0001`A\u0002\u0003\u000fi\u0011! \u0006\u0003}~\fq!\\;uC\ndWMC\u0002\u0002\u0002E\n!bY8mY\u0016\u001cG/[8o\u0013\r\t)! \u0002\u0006#V,W/\u001a\t\u0004g\u0006%\u0011bAA\u0006I\t\t\"+Z2fSZ,GM\u00117pG.LeNZ8\u0002AM$(/Z1n\u0013\u0012$v.\u00168bY2|7-\u0019;fI\ncwnY6Rk\u0016,Xm]\u000b\u0003\u0003#\u0001b\u0001`A\n-\u0006]\u0011bAA\u000b{\n9\u0001*Y:i\u001b\u0006\u0004\bcAA\r\u00115\t\u0001!A\u0011tiJ,\u0017-\\%e)>,f.\u00197m_\u000e\fG/\u001a3CY>\u001c7.U;fk\u0016\u001c\b%A\u000buS6,Gk\\!mY>\u001c\u0017\r^3e\u00052|7m[:\u0016\u0005\u0005\u0005\u0002c\u0002?\u0002\u0014\u0005\r\u00121\u0006\t\u0005\u0003K\t9#D\u0001'\u0013\r\tIC\n\u0002\u0005)&lW\rE\u0002t\u0003[I1!a\f%\u0005=\tE\u000e\\8dCR,GM\u00117pG.\u001c\u0018A\u0006;j[\u0016$v.\u00117m_\u000e\fG/\u001a3CY>\u001c7n\u001d\u0011\u0002']\u0014\u0018\u000e^3BQ\u0016\fG\rT8h\u001fB$\u0018n\u001c8\u0016\u0005\u0005]\u0002\u0003\u0002\u0019g\u0003s\u0001B!a\u000f\u0002@5\u0011\u0011Q\b\u0006\u0003;\u001aJA!!\u0011\u0002>\tiqK]5uK\u0006CW-\u00193M_\u001e\fAc\u001e:ji\u0016\f\u0005.Z1e\u0019><w\n\u001d;j_:\u0004\u0013A\u00067bgR\fE\u000e\\8dCR,GMQ1uG\"$\u0016.\\3\u0016\u0005\u0005\r\u0012A\u00077bgR\fE\u000e\\8dCR,GMQ1uG\"$\u0016.\\3`I\u0015\fH\u0003BA'\u0003'\u00022\u0001MA(\u0013\r\t\t&\r\u0002\u0005+:LG\u000fC\u0005\u0002VA\t\t\u00111\u0001\u0002$\u0005\u0019\u0001\u0010J\u0019\u0002/1\f7\u000f^!mY>\u001c\u0017\r^3e\u0005\u0006$8\r\u001b+j[\u0016\u0004\u0013\u0001C1eI\ncwnY6\u0015\u0007\u0005\fi\u0006C\u0004\u0002`I\u0001\r!a\u0002\u0002#I,7-Z5wK\u0012\u0014En\\2l\u0013:4w.A\u000bbY2|7-\u0019;f\u00052|7m[:U_\n\u000bGo\u00195\u0015\t\u00055\u0013Q\r\u0005\b\u0003O\u001a\u0002\u0019AA\u0012\u0003%\u0011\u0017\r^2i)&lW-\u0001\thKR\u0014En\\2lg>3')\u0019;dQR!\u0011QNA;!\u0019I\u0017q\u000e,\u0002t%\u0019\u0011\u0011O8\u0003\u00075\u000b\u0007\u000f\u0005\u0003L'\u0006\u001d\u0001bBA4)\u0001\u0007\u00111E\u0001\u001aO\u0016$(\t\\8dWN|eMQ1uG\"\fe\u000eZ*ue\u0016\fW\u000e\u0006\u0004\u0002t\u0005m\u0014Q\u0010\u0005\b\u0003O*\u0002\u0019AA\u0012\u0011\u0019\ty(\u0006a\u0001-\u0006A1\u000f\u001e:fC6LE-\u0001\u000fiCN,f.\u00197m_\u000e\fG/\u001a3SK\u000e,\u0017N^3e\u00052|7m[:\u0016\u0003\u0005\fAcZ3u+:\fG\u000e\\8dCR,GM\u00117pG.\u001cH\u0003BA:\u0003\u0013Ca!a \u0018\u0001\u00041\u0016!E2mK\u0006tW\u000f](mI\n\u000bGo\u00195fgR1\u0011QJAH\u0003'Cq!!%\u0019\u0001\u0004\t\u0019#A\tdY\u0016\fg.\u001e9UQJ,7\u000f\u001b+j[\u0016Da!!&\u0019\u0001\u0004\t\u0017!E<bSR4uN]\"p[BdW\r^5p]\u0006!1\u000f^8q)\t\ti%A\tsK\u000e|g/\u001a:QCN$XI^3oiN\f!b\u001e:ji\u0016$v\u000eT8h)\r\t\u0017\u0011\u0015\u0005\b\u0003G[\u0002\u0019AAS\u0003\u0019\u0011XmY8sIB\u00191/a*\n\u0007\u0005%FE\u0001\u000fSK\u000e,\u0017N^3e\u00052|7m\u001b+sC\u000e\\WM\u001d'pO\u00163XM\u001c;\u0002+\u001d,GOU3dK&4X\r\u001a\"m_\u000e\\\u0017+^3vKR!\u0011qCAX\u0011\u0019\ty\b\ba\u0001-\u0006\u00192M]3bi\u0016<&/\u001b;f\u0003\",\u0017\r\u001a'pOR\u0011\u0011qG\u0001\u0017SN<&/\u001b;f\u0003\",\u0017\r\u001a'pO\u0016s\u0017M\u00197fI\u0006!\"+Z2fSZ,GM\u00117pG.$&/Y2lKJ\u0004\"a\u001d\u0011\u0014\u0005\u0001zCCAA]\u0003U\u0019\u0007.Z2la>Lg\u000e\u001e#jeR{Gj\\4ESJ$2\u0001[Ab\u0011\u0019\t)M\ta\u0001Q\u0006i1\r[3dWB|\u0017N\u001c;ESJ\u0004")
/* loaded from: input_file:org/apache/spark/streaming/scheduler/ReceivedBlockTracker.class */
public class ReceivedBlockTracker implements Logging {
    private final SparkConf conf;
    private final Configuration hadoopConf;
    private final Seq<Object> streamIds;
    private final Clock clock;
    private final Option<String> checkpointDirOption;
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    private final Option<WriteAheadLog> writeAheadLogOption;
    private Time lastAllocatedBatchTime;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public static String checkpointDirToLogDir(String str) {
        return ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(str);
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    private Option<WriteAheadLog> writeAheadLogOption() {
        return this.writeAheadLogOption;
    }

    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    private void lastAllocatedBatchTime_$eq(Time time) {
        this.lastAllocatedBatchTime = time;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        try {
            boolean writeToLog = writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            if (writeToLog) {
                synchronized (this) {
                    getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq(receivedBlockInfo);
                }
                logDebug(() -> {
                    return new StringBuilder(17).append("Stream ").append(receivedBlockInfo.streamId()).append(" received ").append(new StringBuilder(6).append("block ").append(receivedBlockInfo.blockStoreResult().blockId()).toString()).toString();
                });
            } else {
                logDebug(() -> {
                    return new StringBuilder(40).append("Failed to acknowledge stream ").append(receivedBlockInfo.streamId()).append(" receiving ").append(new StringBuilder(30).append("block ").append(receivedBlockInfo.blockStoreResult().blockId()).append(" in the Write Ahead Log.").toString()).toString();
                });
            }
            return writeToLog;
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            logError(() -> {
                return new StringBuilder(19).append("Error adding block ").append(receivedBlockInfo).toString();
            }, (Throwable) unapply.get());
            return false;
        }
    }

    public synchronized void allocateBlocksToBatch(Time time) {
        if (lastAllocatedBatchTime() != null && !time.$greater(lastAllocatedBatchTime())) {
            logInfo(() -> {
                return new StringBuilder(69).append("Possibly processed batch ").append(time).append(" needs to be processed again in WAL recovery").toString();
            });
            return;
        }
        AllocatedBlocks allocatedBlocks = new AllocatedBlocks(((TraversableOnce) this.streamIds.map(obj -> {
            return $anonfun$allocateBlocksToBatch$1(this, BoxesRunTime.unboxToInt(obj));
        }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        if (!writeToLog(new BatchAllocationEvent(time, allocatedBlocks))) {
            logInfo(() -> {
                return new StringBuilder(69).append("Possibly processed batch ").append(time).append(" needs to be processed again in WAL recovery").toString();
            });
            return;
        }
        this.streamIds.foreach(i -> {
            this.getReceivedBlockQueue(i).clear();
        });
        timeToAllocatedBlocks().put(time, allocatedBlocks);
        lastAllocatedBatchTime_$eq(time);
    }

    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time time) {
        return (Map) timeToAllocatedBlocks().get(time).map(allocatedBlocks -> {
            return allocatedBlocks.streamIdToAllocatedBlocks();
        }).getOrElse(() -> {
            return Predef$.MODULE$.Map().empty();
        });
    }

    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time time, int i) {
        return (Seq) timeToAllocatedBlocks().get(time).map(allocatedBlocks -> {
            return allocatedBlocks.getBlocksOfStream(i);
        }).getOrElse(() -> {
            return Seq$.MODULE$.empty();
        });
    }

    public synchronized boolean hasUnallocatedReceivedBlocks() {
        return !streamIdToUnallocatedBlockQueues().values().forall(queue -> {
            return BoxesRunTime.boxToBoolean(queue.isEmpty());
        });
    }

    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int i) {
        return getReceivedBlockQueue(i).toSeq();
    }

    public synchronized void cleanupOldBatches(Time time, boolean z) {
        Predef$.MODULE$.require(time.milliseconds() < this.clock.getTimeMillis());
        Seq seq = ((TraversableOnce) timeToAllocatedBlocks().keys().filter(time2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$cleanupOldBatches$1(time, time2));
        })).toSeq();
        logInfo(() -> {
            return new StringBuilder(18).append("Deleting batches: ").append(seq.mkString(" ")).toString();
        });
        if (!writeToLog(new BatchCleanupEvent(seq))) {
            logWarning(() -> {
                return "Failed to acknowledge batch clean up in the Write Ahead Log.";
            });
        } else {
            timeToAllocatedBlocks().$minus$minus$eq(seq);
            writeAheadLogOption().foreach(writeAheadLog -> {
                $anonfun$cleanupOldBatches$3(time, z, writeAheadLog);
                return BoxedUnit.UNIT;
            });
        }
    }

    public void stop() {
        writeAheadLogOption().foreach(writeAheadLog -> {
            writeAheadLog.close();
            return BoxedUnit.UNIT;
        });
    }

    private synchronized void recoverPastEvents() {
        writeAheadLogOption().foreach(writeAheadLog -> {
            $anonfun$recoverPastEvents$5(this, writeAheadLog);
            return BoxedUnit.UNIT;
        });
    }

    public boolean writeToLog(ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent) {
        if (!isWriteAheadLogEnabled()) {
            return true;
        }
        logTrace(() -> {
            return new StringBuilder(16).append("Writing record: ").append(receivedBlockTrackerLogEvent).toString();
        });
        try {
            ((WriteAheadLog) writeAheadLogOption().get()).write(ByteBuffer.wrap(Utils$.MODULE$.serialize(receivedBlockTrackerLogEvent)), this.clock.getTimeMillis());
            return true;
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            logWarning(() -> {
                return new StringBuilder(61).append("Exception thrown while writing record: ").append(receivedBlockTrackerLogEvent).append(" to the WriteAheadLog.").toString();
            }, (Throwable) unapply.get());
            return false;
        }
    }

    private Queue<ReceivedBlockInfo> getReceivedBlockQueue(int i) {
        return (Queue) streamIdToUnallocatedBlockQueues().getOrElseUpdate(BoxesRunTime.boxToInteger(i), () -> {
            return new Queue();
        });
    }

    private Option<WriteAheadLog> createWriteAheadLog() {
        return this.checkpointDirOption.map(str -> {
            return WriteAheadLogUtils$.MODULE$.createLogForDriver(this.conf, ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir((String) this.checkpointDirOption.get()), this.hadoopConf);
        });
    }

    public boolean isWriteAheadLogEnabled() {
        return writeAheadLogOption().nonEmpty();
    }

    public static final /* synthetic */ Tuple2 $anonfun$allocateBlocksToBatch$1(ReceivedBlockTracker receivedBlockTracker, int i) {
        return new Tuple2(BoxesRunTime.boxToInteger(i), ArrayBuffer$.MODULE$.apply(receivedBlockTracker.getReceivedBlockQueue(i).clone()));
    }

    public static final /* synthetic */ boolean $anonfun$cleanupOldBatches$1(Time time, Time time2) {
        return time2.$less(time);
    }

    public static final /* synthetic */ void $anonfun$cleanupOldBatches$3(Time time, boolean z, WriteAheadLog writeAheadLog) {
        writeAheadLog.clean(time.milliseconds(), z);
    }

    private final void insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        logTrace(() -> {
            return new StringBuilder(32).append("Recovery: Inserting added block ").append(receivedBlockInfo).toString();
        });
        receivedBlockInfo.setBlockIdInvalid();
        getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq(receivedBlockInfo);
    }

    private final void insertAllocatedBatch$1(Time time, AllocatedBlocks allocatedBlocks) {
        logTrace(() -> {
            return new StringBuilder(49).append("Recovery: Inserting allocated batch for time ").append(time).append(" to ").append(String.valueOf(allocatedBlocks.streamIdToAllocatedBlocks())).toString();
        });
        allocatedBlocks.streamIdToAllocatedBlocks().foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return this.getReceivedBlockQueue(tuple2._1$mcI$sp()).dequeueAll(((Seq) tuple2._2()).toSet());
        });
        timeToAllocatedBlocks().put(time, allocatedBlocks);
        lastAllocatedBatchTime_$eq(time);
    }

    private final void cleanupBatches$1(Seq seq) {
        logTrace(() -> {
            return new StringBuilder(30).append("Recovery: Cleaning up batches ").append(seq).toString();
        });
        timeToAllocatedBlocks().$minus$minus$eq(seq);
    }

    public static final /* synthetic */ void $anonfun$recoverPastEvents$7(ReceivedBlockTracker receivedBlockTracker, ByteBuffer byteBuffer) {
        receivedBlockTracker.logInfo(() -> {
            return new StringBuilder(18).append("Recovering record ").append(byteBuffer).toString();
        });
        ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent = (ReceivedBlockTrackerLogEvent) Utils$.MODULE$.deserialize(JavaUtils.bufferToArray(byteBuffer), Thread.currentThread().getContextClassLoader());
        if (receivedBlockTrackerLogEvent instanceof BlockAdditionEvent) {
            receivedBlockTracker.insertAddedBlock$1(((BlockAdditionEvent) receivedBlockTrackerLogEvent).receivedBlockInfo());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else if (receivedBlockTrackerLogEvent instanceof BatchAllocationEvent) {
            BatchAllocationEvent batchAllocationEvent = (BatchAllocationEvent) receivedBlockTrackerLogEvent;
            receivedBlockTracker.insertAllocatedBatch$1(batchAllocationEvent.time(), batchAllocationEvent.allocatedBlocks());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        } else {
            if (!(receivedBlockTrackerLogEvent instanceof BatchCleanupEvent)) {
                throw new MatchError(receivedBlockTrackerLogEvent);
            }
            receivedBlockTracker.cleanupBatches$1(((BatchCleanupEvent) receivedBlockTrackerLogEvent).times());
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$recoverPastEvents$5(ReceivedBlockTracker receivedBlockTracker, WriteAheadLog writeAheadLog) {
        receivedBlockTracker.logInfo(() -> {
            return new StringBuilder(36).append("Recovering from write ahead logs in ").append(receivedBlockTracker.checkpointDirOption.get()).toString();
        });
        ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(writeAheadLog.readAll()).asScala()).foreach(byteBuffer -> {
            $anonfun$recoverPastEvents$7(receivedBlockTracker, byteBuffer);
            return BoxedUnit.UNIT;
        });
    }

    public ReceivedBlockTracker(SparkConf sparkConf, Configuration configuration, Seq<Object> seq, Clock clock, boolean z, Option<String> option) {
        this.conf = sparkConf;
        this.hadoopConf = configuration;
        this.streamIds = seq;
        this.clock = clock;
        this.checkpointDirOption = option;
        Logging.$init$(this);
        this.streamIdToUnallocatedBlockQueues = new HashMap<>();
        this.timeToAllocatedBlocks = new HashMap<>();
        this.writeAheadLogOption = createWriteAheadLog();
        this.lastAllocatedBatchTime = null;
        if (z) {
            recoverPastEvents();
        }
    }
}
