package org.apache.samza.system.chooser;

import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: BootstrappingChooser.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%g\u0001B\u0001\u0003\u00015\u0011ACQ8piN$(/\u00199qS:<7\t[8pg\u0016\u0014(BA\u0002\u0005\u0003\u001d\u0019\u0007n\\8tKJT!!\u0002\u0004\u0002\rML8\u000f^3n\u0015\t9\u0001\"A\u0003tC6T\u0018M\u0003\u0002\n\u0015\u00051\u0011\r]1dQ\u0016T\u0011aC\u0001\u0004_J<7\u0001A\n\u0005\u000191\"\u0004\u0005\u0002\u0010)5\t\u0001C\u0003\u0002\u0012%\u0005!A.\u00198h\u0015\u0005\u0019\u0012\u0001\u00026bm\u0006L!!\u0006\t\u0003\r=\u0013'.Z2u!\t9\u0002$D\u0001\u0003\u0013\tI\"A\u0001\bNKN\u001c\u0018mZ3DQ>|7/\u001a:\u0011\u0005mqR\"\u0001\u000f\u000b\u0005u1\u0011\u0001B;uS2L!a\b\u000f\u0003\u000f1{wmZ5oO\"A\u0011\u0005\u0001B\u0001B\u0003%a#A\u0004xe\u0006\u0004\b/\u001a3\t\u0011\r\u0002!\u00111A\u0005\u0002\u0011\nqCY8piN$(/\u00199TiJ,\u0017-\\'fi\u0006$\u0017\r^1\u0016\u0003\u0015\u0002BAJ\u00183m9\u0011q%\f\t\u0003Q-j\u0011!\u000b\u0006\u0003U1\ta\u0001\u0010:p_Rt$\"\u0001\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u00059Z\u0013A\u0002)sK\u0012,g-\u0003\u00021c\t\u0019Q*\u00199\u000b\u00059Z\u0003CA\u001a5\u001b\u0005!\u0011BA\u001b\u0005\u00051\u0019\u0016p\u001d;f[N#(/Z1n!\t\u0019t'\u0003\u00029\t\t!2+_:uK6\u001cFO]3b[6+G/\u00193bi\u0006D\u0001B\u000f\u0001\u0003\u0002\u0004%\taO\u0001\u001cE>|Go\u001d;sCB\u001cFO]3b[6+G/\u00193bi\u0006|F%Z9\u0015\u0005q\u0002\u0005CA\u001f?\u001b\u0005Y\u0013BA ,\u0005\u0011)f.\u001b;\t\u000f\u0005K\u0014\u0011!a\u0001K\u0005\u0019\u0001\u0010J\u0019\t\u0011\r\u0003!\u0011!Q!\n\u0015\n\u0001DY8piN$(/\u00199TiJ,\u0017-\\'fi\u0006$\u0017\r^1!\u0011!)\u0005A!A!\u0002\u00131\u0015aB7fiJL7m\u001d\t\u0003/\u001dK!\u0001\u0013\u0002\u00037\t{w\u000e^:ue\u0006\u0004\b/\u001b8h\u0007\"|wn]3s\u001b\u0016$(/[2t\u0011!Q\u0005A!A!\u0002\u0013Y\u0015\u0001D:zgR,W.\u00113nS:\u001c\b\u0003\u0002\u00140\u0019>\u0003\"AJ'\n\u00059\u000b$AB*ue&tw\r\u0005\u00024!&\u0011\u0011\u000b\u0002\u0002\f'f\u001cH/Z7BI6Lg\u000eC\u0003T\u0001\u0011\u0005A+\u0001\u0004=S:LGO\u0010\u000b\u0006+Z;\u0006,\u0017\t\u0003/\u0001AQ!\t*A\u0002YAqa\t*\u0011\u0002\u0003\u0007Q\u0005C\u0004F%B\u0005\t\u0019\u0001$\t\u000f)\u0013\u0006\u0013!a\u0001\u0017\"91\f\u0001a\u0001\n\u0003a\u0016!F:zgR,Wn\u0015;sK\u0006lG*Y4D_VtGo]\u000b\u0002;B!al\u0019\u001ae\u001b\u0005y&B\u00011b\u0003%IW.\\;uC\ndWM\u0003\u0002cW\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005Az\u0006CA\u001ff\u0013\t17FA\u0002J]RDq\u0001\u001b\u0001A\u0002\u0013\u0005\u0011.A\rtsN$X-\\*ue\u0016\fW\u000eT1h\u0007>,h\u000e^:`I\u0015\fHC\u0001\u001fk\u0011\u001d\tu-!AA\u0002uCa\u0001\u001c\u0001!B\u0013i\u0016AF:zgR,Wn\u0015;sK\u0006lG*Y4D_VtGo\u001d\u0011\t\u000f9\u0004\u0001\u0019!C\u0001_\u0006iB.Y4hS:<7+_:uK6\u001cFO]3b[B\u000b'\u000f^5uS>t7/F\u0001q!\rq\u0016o]\u0005\u0003e~\u00131aU3u!\t\u0019D/\u0003\u0002v\t\t)2+_:uK6\u001cFO]3b[B\u000b'\u000f^5uS>t\u0007bB<\u0001\u0001\u0004%\t\u0001_\u0001\"Y\u0006<w-\u001b8h'f\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8og~#S-\u001d\u000b\u0003yeDq!\u0011<\u0002\u0002\u0003\u0007\u0001\u000f\u0003\u0004|\u0001\u0001\u0006K\u0001]\u0001\u001fY\u0006<w-\u001b8h'f\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8og\u0002Bq! \u0001A\u0002\u0013\u0005a0\u0001\u0011sK\u001eL7\u000f^3sK\u0012\u001c\u0016p\u001d;f[N#(/Z1n!\u0006\u0014H/\u001b;j_:\u001cX#A@\u0011\r\u0005\u0005\u0011qA:M\u001b\t\t\u0019AC\u0002\u0002\u0006\u0005\fq!\\;uC\ndW-C\u00021\u0003\u0007A\u0011\"a\u0003\u0001\u0001\u0004%\t!!\u0004\u0002II,w-[:uKJ,GmU=ti\u0016l7\u000b\u001e:fC6\u0004\u0016M\u001d;ji&|gn]0%KF$2\u0001PA\b\u0011!\t\u0015\u0011BA\u0001\u0002\u0004y\bbBA\n\u0001\u0001\u0006Ka`\u0001\"e\u0016<\u0017n\u001d;fe\u0016$7+_:uK6\u001cFO]3b[B\u000b'\u000f^5uS>t7\u000f\t\u0005\t\u0003/\u0001\u0001\u0019!C\u00019\u0006!R\u000f\u001d3bi\u0016$7+_:uK6\u001cFO]3b[ND\u0011\"a\u0007\u0001\u0001\u0004%\t!!\b\u00021U\u0004H-\u0019;fINK8\u000f^3n'R\u0014X-Y7t?\u0012*\u0017\u000fF\u0002=\u0003?A\u0001\"QA\r\u0003\u0003\u0005\r!\u0018\u0005\b\u0003G\u0001\u0001\u0015)\u0003^\u0003U)\b\u000fZ1uK\u0012\u001c\u0016p\u001d;f[N#(/Z1ng\u0002Bq!a\n\u0001\t\u0003\tI#A\u0003ti\u0006\u0014H\u000fF\u0001=\u0011\u001d\ti\u0003\u0001C\u0001\u0003S\tAa\u001d;pa\"9\u0011\u0011\u0007\u0001\u0005B\u0005M\u0012\u0001\u0003:fO&\u001cH/\u001a:\u0015\u000bq\n)$!\u000f\t\u000f\u0005]\u0012q\u0006a\u0001g\u0006)2/_:uK6\u001cFO]3b[B\u000b'\u000f^5uS>t\u0007bBA\u001e\u0003_\u0001\r\u0001T\u0001\u0007_\u001a47/\u001a;\t\u000f\u0005}\u0002\u0001\"\u0001\u0002B\u00051Q\u000f\u001d3bi\u0016$2\u0001PA\"\u0011!\t)%!\u0010A\u0002\u0005\u001d\u0013\u0001C3om\u0016dw\u000e]3\u0011\u0007M\nI%C\u0002\u0002L\u0011\u0011q#\u00138d_6LgnZ'fgN\fw-Z#om\u0016dw\u000e]3\t\u000f\u0005=\u0003\u0001\"\u0001\u0002R\u000511\r[8pg\u0016$\"!a\u0012\t\u000f\u0005U\u0003\u0001\"\u0003\u0002X\u0005Y1\r[3dW>3gm]3u)\u001da\u0014\u0011LA.\u0003;Bq!a\u000e\u0002T\u0001\u00071\u000fC\u0004\u0002<\u0005M\u0003\u0019\u0001'\t\u0011\u0005}\u00131\u000ba\u0001\u0003C\n!b\u001c4gg\u0016$H+\u001f9f!\u0011\t\u0019'!\u001f\u000f\t\u0005\u0015\u0014Q\u000f\b\u0005\u0003O\n\u0019H\u0004\u0003\u0002j\u0005Ed\u0002BA6\u0003_r1\u0001KA7\u0013\u0005Y\u0011BA\u0005\u000b\u0013\t9\u0001\"\u0003\u0002\u0006\r%\u0019\u0011q\u000f\u0003\u0002)MK8\u000f^3n'R\u0014X-Y7NKR\fG-\u0019;b\u0013\u0011\tY(! \u0003\u0015=3gm]3u)f\u0004XMC\u0002\u0002x\u0011Aq!!!\u0001\t\u0013\t\u0019)\u0001\u0006pWR{7\t[8pg\u0016,\"!!\"\u0011\u0007u\n9)C\u0002\u0002\n.\u0012qAQ8pY\u0016\fgnB\u0005\u0002\u000e\n\t\t\u0011#\u0001\u0002\u0010\u0006!\"i\\8ugR\u0014\u0018\r\u001d9j]\u001e\u001c\u0005n\\8tKJ\u00042aFAI\r!\t!!!A\t\u0002\u0005M5\u0003BAI\u0003+\u00032!PAL\u0013\r\tIj\u000b\u0002\u0007\u0003:L(+\u001a4\t\u000fM\u000b\t\n\"\u0001\u0002\u001eR\u0011\u0011q\u0012\u0005\u000b\u0003C\u000b\t*%A\u0005\u0002\u0005\r\u0016a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$#'\u0006\u0002\u0002&*\u001aQ%a*,\u0005\u0005%\u0006\u0003BAV\u0003kk!!!,\u000b\t\u0005=\u0016\u0011W\u0001\nk:\u001c\u0007.Z2lK\u0012T1!a-,\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003o\u000biKA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016D!\"a/\u0002\u0012F\u0005I\u0011AA_\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011q\u0018\u0016\u0004\r\u0006\u001d\u0006BCAb\u0003#\u000b\n\u0011\"\u0001\u0002F\u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIQ*\"!a2+\u0007-\u000b9\u000b")
/* loaded from: input_file:org/apache/samza/system/chooser/BootstrappingChooser.class */
public class BootstrappingChooser implements MessageChooser, Logging {
    private final MessageChooser wrapped;
    private Map<SystemStream, SystemStreamMetadata> bootstrapStreamMetadata;
    private final BootstrappingChooserMetrics metrics;
    private final Map<String, SystemAdmin> systemAdmins;
    private Map<SystemStream, Object> systemStreamLagCounts;
    private Set<SystemStreamPartition> laggingSystemStreamPartitions;
    private scala.collection.mutable.Map<SystemStreamPartition, String> registeredSystemStreamPartitions;
    private Map<SystemStream, Object> updatedSystemStreams;
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    private volatile byte bitmap$0;

    @Override // org.apache.samza.util.Logging
    public void startupLog(Function0<Object> function0) {
        startupLog(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0) {
        trace(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0) {
        debug(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0) {
        info(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0) {
        warn(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0) {
        error(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        putMDC(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public String getMDC(Function0<String> function0) {
        String mdc;
        mdc = getMDC(function0);
        return mdc;
    }

    @Override // org.apache.samza.util.Logging
    public void removeMDC(Function0<String> function0) {
        removeMDC(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void clearMDC() {
        clearMDC();
    }

    @Override // org.apache.samza.util.Logging
    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.chooser.BootstrappingChooser] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    @Override // org.apache.samza.util.Logging
    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    @Override // org.apache.samza.util.Logging
    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.chooser.BootstrappingChooser] */
    private Logger startupLogger$lzycompute() {
        Logger startupLogger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                startupLogger = startupLogger();
                this.startupLogger = startupLogger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    @Override // org.apache.samza.util.Logging
    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    public Map<SystemStream, SystemStreamMetadata> bootstrapStreamMetadata() {
        return this.bootstrapStreamMetadata;
    }

    public void bootstrapStreamMetadata_$eq(Map<SystemStream, SystemStreamMetadata> map) {
        this.bootstrapStreamMetadata = map;
    }

    public Map<SystemStream, Object> systemStreamLagCounts() {
        return this.systemStreamLagCounts;
    }

    public void systemStreamLagCounts_$eq(Map<SystemStream, Object> map) {
        this.systemStreamLagCounts = map;
    }

    public Set<SystemStreamPartition> laggingSystemStreamPartitions() {
        return this.laggingSystemStreamPartitions;
    }

    public void laggingSystemStreamPartitions_$eq(Set<SystemStreamPartition> set) {
        this.laggingSystemStreamPartitions = set;
    }

    public scala.collection.mutable.Map<SystemStreamPartition, String> registeredSystemStreamPartitions() {
        return this.registeredSystemStreamPartitions;
    }

    public void registeredSystemStreamPartitions_$eq(scala.collection.mutable.Map<SystemStreamPartition, String> map) {
        this.registeredSystemStreamPartitions = map;
    }

    public Map<SystemStream, Object> updatedSystemStreams() {
        return this.updatedSystemStreams;
    }

    public void updatedSystemStreams_$eq(Map<SystemStream, Object> map) {
        this.updatedSystemStreams = map;
    }

    public void start() {
        registeredSystemStreamPartitions().withFilter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$start$1(tuple2));
        }).foreach(tuple22 -> {
            $anonfun$start$2(this, tuple22);
            return BoxedUnit.UNIT;
        });
        laggingSystemStreamPartitions_$eq((Set) laggingSystemStreamPartitions().filter(systemStreamPartition -> {
            return BoxesRunTime.boxToBoolean($anonfun$start$3(this, systemStreamPartition));
        }));
        systemStreamLagCounts_$eq((Map) laggingSystemStreamPartitions().groupBy(systemStreamPartition2 -> {
            return systemStreamPartition2.getSystemStream();
        }).map(tuple23 -> {
            if (tuple23 == null) {
                throw new MatchError(tuple23);
            }
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((SystemStream) tuple23._1()), BoxesRunTime.boxToInteger(((Set) tuple23._2()).size()));
        }, Map$.MODULE$.canBuildFrom()));
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Starting bootstrapping chooser with bootstrap metadata: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.bootstrapStreamMetadata()}));
        });
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got lagging partition counts for bootstrap streams: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.systemStreamLagCounts()}));
        });
        this.metrics.setLaggingSystemStreams(() -> {
            return this.laggingSystemStreamPartitions().size();
        });
        systemStreamLagCounts().keys().foreach(systemStream -> {
            $anonfun$start$9(this, systemStream);
            return BoxedUnit.UNIT;
        });
        this.wrapped.start();
    }

    public void stop() {
        this.wrapped.stop();
    }

    public void register(SystemStreamPartition systemStreamPartition, String str) {
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Registering stream partition with offset: %s, %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition, str}));
        });
        this.wrapped.register(systemStreamPartition, str);
        String system = systemStreamPartition.getSystem();
        SystemAdmin systemAdmin = (SystemAdmin) this.systemAdmins.getOrElse(system, () -> {
            throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("SystemAdmin is undefined for System: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{system})));
        });
        if (!registeredSystemStreamPartitions().contains(systemStreamPartition)) {
            registeredSystemStreamPartitions().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStreamPartition), str));
            return;
        }
        if (str != null) {
            String str2 = (String) registeredSystemStreamPartitions().apply(systemStreamPartition);
            Integer offsetComparator = systemAdmin.offsetComparator(str2, str);
            if (offsetComparator == null) {
                warn(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Existing offset: %s and incoming offset: %s of system stream partition: %s are not comparable.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str2, str, systemStreamPartition}));
                });
            } else if (Predef$.MODULE$.Integer2int(offsetComparator) > 0) {
                registeredSystemStreamPartitions().$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStreamPartition), str));
            }
        }
    }

    public void update(IncomingMessageEnvelope incomingMessageEnvelope) {
        this.wrapped.update(incomingMessageEnvelope);
        if (laggingSystemStreamPartitions().contains(incomingMessageEnvelope.getSystemStreamPartition())) {
            trace(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Bumping available message count for stream partition: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{incomingMessageEnvelope.getSystemStreamPartition()}));
            });
            SystemStream systemStream = incomingMessageEnvelope.getSystemStreamPartition().getSystemStream();
            updatedSystemStreams_$eq(updatedSystemStreams().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStream), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(updatedSystemStreams().getOrElse(systemStream, () -> {
                return 0;
            })) + 1))));
        }
    }

    public IncomingMessageEnvelope choose() {
        if (laggingSystemStreamPartitions().size() == 0) {
            trace(() -> {
                return "No streams are lagging, so bypassing bootstrap chooser.";
            });
            return this.wrapped.choose();
        }
        if (!okToChoose()) {
            trace(() -> {
                return "Blocking wrapped.chooser since bootstrapping is not done, but not all streams have messages available.";
            });
            return null;
        }
        trace(() -> {
            return "Choosing from wrapped chooser, since wrapped choser has an envelope from all bootstrap streams.";
        });
        IncomingMessageEnvelope choose = this.wrapped.choose();
        if (choose != null) {
            trace(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Wrapped chooser chose non-null envelope: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{choose}));
            });
            SystemStreamPartition systemStreamPartition = choose.getSystemStreamPartition();
            String offset = choose.getOffset();
            if (laggingSystemStreamPartitions().contains(systemStreamPartition)) {
                SystemStream systemStream = systemStreamPartition.getSystemStream();
                updatedSystemStreams_$eq(updatedSystemStreams().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStream), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(updatedSystemStreams().getOrElse(systemStream, () -> {
                    return 0;
                })) - 1))));
            }
            checkOffset(systemStreamPartition, offset, SystemStreamMetadata.OffsetType.NEWEST);
        }
        return choose;
    }

    private void checkOffset(SystemStreamPartition systemStreamPartition, String str, SystemStreamMetadata.OffsetType offsetType) {
        SystemStream systemStream = systemStreamPartition.getSystemStream();
        SystemStreamMetadata systemStreamMetadata = (SystemStreamMetadata) bootstrapStreamMetadata().getOrElse(systemStreamPartition.getSystemStream(), () -> {
            return null;
        });
        SystemStreamMetadata.SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata != null ? (SystemStreamMetadata.SystemStreamPartitionMetadata) systemStreamMetadata.getSystemStreamPartitionMetadata().get(systemStreamPartition.getPartition()) : null;
        String offset = systemStreamPartitionMetadata == null ? null : systemStreamPartitionMetadata.getOffset(offsetType);
        trace(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Check %s offset %s against %s for %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{offsetType, str, offset, systemStreamPartition}));
        });
        if (str == null || !str.equals(offset)) {
            return;
        }
        laggingSystemStreamPartitions_$eq((Set) laggingSystemStreamPartitions().$minus(systemStreamPartition));
        systemStreamLagCounts_$eq(systemStreamLagCounts().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStream), BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(systemStreamLagCounts().apply(systemStream)) - 1))));
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Bootstrap stream partition is fully caught up: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition}));
        });
        if (BoxesRunTime.unboxToInt(systemStreamLagCounts().apply(systemStream)) == 0) {
            info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Bootstrap stream is fully caught up: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStream}));
            });
            systemStreamLagCounts_$eq((Map) systemStreamLagCounts().$minus(systemStream));
        }
    }

    private boolean okToChoose() {
        return ((TraversableOnce) updatedSystemStreams().values().filter(i -> {
            return i > 0;
        })).size() == laggingSystemStreamPartitions().groupBy(systemStreamPartition -> {
            return systemStreamPartition.getSystemStream();
        }).size();
    }

    public static final /* synthetic */ int $anonfun$systemStreamLagCounts$1(SystemStreamMetadata systemStreamMetadata) {
        return systemStreamMetadata.getSystemStreamPartitionMetadata().size();
    }

    public static final /* synthetic */ boolean $anonfun$start$1(Tuple2 tuple2) {
        return tuple2 != null;
    }

    public static final /* synthetic */ void $anonfun$start$2(BootstrappingChooser bootstrappingChooser, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        bootstrappingChooser.checkOffset((SystemStreamPartition) tuple2._1(), (String) tuple2._2(), SystemStreamMetadata.OffsetType.UPCOMING);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ boolean $anonfun$start$3(BootstrappingChooser bootstrappingChooser, SystemStreamPartition systemStreamPartition) {
        return bootstrappingChooser.registeredSystemStreamPartitions().contains(systemStreamPartition);
    }

    public static final /* synthetic */ void $anonfun$start$9(BootstrappingChooser bootstrappingChooser, SystemStream systemStream) {
        bootstrappingChooser.metrics.setLagCount(systemStream, () -> {
            return BoxesRunTime.unboxToInt(bootstrappingChooser.systemStreamLagCounts().getOrElse(systemStream, () -> {
                return 0;
            }));
        });
    }

    public BootstrappingChooser(MessageChooser messageChooser, Map<SystemStream, SystemStreamMetadata> map, BootstrappingChooserMetrics bootstrappingChooserMetrics, Map<String, SystemAdmin> map2) {
        this.wrapped = messageChooser;
        this.bootstrapStreamMetadata = map;
        this.metrics = bootstrappingChooserMetrics;
        this.systemAdmins = map2;
        Logging.$init$(this);
        this.systemStreamLagCounts = bootstrapStreamMetadata().mapValues(systemStreamMetadata -> {
            return BoxesRunTime.boxToInteger($anonfun$systemStreamLagCounts$1(systemStreamMetadata));
        });
        this.laggingSystemStreamPartitions = ((TraversableOnce) bootstrapStreamMetadata().flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            SystemStream systemStream = (SystemStream) tuple2._1();
            return (Iterable) ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(((SystemStreamMetadata) tuple2._2()).getSystemStreamPartitionMetadata()).asScala()).keys().map(partition -> {
                return new SystemStreamPartition(systemStream, partition);
            }, Iterable$.MODULE$.canBuildFrom());
        }, scala.collection.immutable.Iterable$.MODULE$.canBuildFrom())).toSet();
        this.registeredSystemStreamPartitions = scala.collection.mutable.Map$.MODULE$.apply(Nil$.MODULE$);
        this.updatedSystemStreams = Predef$.MODULE$.Map().apply(Nil$.MODULE$);
    }
}
