package org.apache.samza.checkpoint;

import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Predef$any2stringadd$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.generic.Growable;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: OffsetManager.scala */
@ScalaSignature(bytes = "\u0006\u0001\tur!B\u0001\u0003\u0011\u0003Y\u0011!D(gMN,G/T1oC\u001e,'O\u0003\u0002\u0004\t\u0005Q1\r[3dWB|\u0017N\u001c;\u000b\u0005\u00151\u0011!B:b[j\f'BA\u0004\t\u0003\u0019\t\u0007/Y2iK*\t\u0011\"A\u0002pe\u001e\u001c\u0001\u0001\u0005\u0002\r\u001b5\t!AB\u0003\u000f\u0005!\u0005qBA\u0007PM\u001a\u001cX\r^'b]\u0006<WM]\n\u0004\u001bA1\u0002CA\t\u0015\u001b\u0005\u0011\"\"A\n\u0002\u000bM\u001c\u0017\r\\1\n\u0005U\u0011\"AB!osJ+g\r\u0005\u0002\u001855\t\u0001D\u0003\u0002\u001a\t\u0005!Q\u000f^5m\u0013\tY\u0002DA\u0004M_\u001e<\u0017N\\4\t\u000buiA\u0011\u0001\u0010\u0002\rqJg.\u001b;?)\u0005Y\u0001\"\u0002\u0011\u000e\t\u0003\t\u0013!B1qa2LH#\u0004\u0012\u0002R\u0006u\u00171^Aw\u0003_\f\t\u0010\u0005\u0002\rG\u0019!aB\u0001\u0001%'\r\u0019\u0003C\u0006\u0005\tM\r\u0012)\u0019!C\u0001O\u0005qqN\u001a4tKR\u001cV\r\u001e;j]\u001e\u001cX#\u0001\u0015\u0011\t%bc\u0006N\u0007\u0002U)\u00111FE\u0001\u000bG>dG.Z2uS>t\u0017BA\u0017+\u0005\ri\u0015\r\u001d\t\u0003_Ij\u0011\u0001\r\u0006\u0003c\u0011\taa]=ti\u0016l\u0017BA\u001a1\u00051\u0019\u0016p\u001d;f[N#(/Z1n!\taQ'\u0003\u00027\u0005\tiqJ\u001a4tKR\u001cV\r\u001e;j]\u001eD\u0001\u0002O\u0012\u0003\u0002\u0003\u0006I\u0001K\u0001\u0010_\u001a47/\u001a;TKR$\u0018N\\4tA!A!h\tBC\u0002\u0013\u00051(A\tdQ\u0016\u001c7\u000e]8j]Rl\u0015M\\1hKJ,\u0012\u0001\u0010\t\u0003\u0019uJ!A\u0010\u0002\u0003#\rCWmY6q_&tG/T1oC\u001e,'\u000f\u0003\u0005AG\t\u0005\t\u0015!\u0003=\u0003I\u0019\u0007.Z2la>Lg\u000e^'b]\u0006<WM\u001d\u0011\t\u0011\t\u001b#Q1A\u0005\u0002\r\u000bAb]=ti\u0016l\u0017\tZ7j]N,\u0012\u0001\u0012\t\u0005S1*\u0005\u000b\u0005\u0002G\u001b:\u0011qi\u0013\t\u0003\u0011Ji\u0011!\u0013\u0006\u0003\u0015*\ta\u0001\u0010:p_Rt\u0014B\u0001'\u0013\u0003\u0019\u0001&/\u001a3fM&\u0011aj\u0014\u0002\u0007'R\u0014\u0018N\\4\u000b\u00051\u0013\u0002CA\u0018R\u0013\t\u0011\u0006GA\u0006TsN$X-\\!e[&t\u0007\u0002\u0003+$\u0005\u0003\u0005\u000b\u0011\u0002#\u0002\u001bML8\u000f^3n\u0003\u0012l\u0017N\\:!\u0011!16E!A!\u0002\u00139\u0016aE2iK\u000e\\\u0007o\\5oi2K7\u000f^3oKJ\u001c\b\u0003B\u0015-\u000bb\u0003\"\u0001D-\n\u0005i\u0013!AE\"iK\u000e\\\u0007o\\5oi2K7\u000f^3oKJD\u0001\u0002X\u0012\u0003\u0006\u0004%\t!X\u0001\u0015_\u001a47/\u001a;NC:\fw-\u001a:NKR\u0014\u0018nY:\u0016\u0003y\u0003\"\u0001D0\n\u0005\u0001\u0014!\u0001F(gMN,G/T1oC\u001e,'/T3ue&\u001c7\u000f\u0003\u0005cG\t\u0005\t\u0015!\u0003_\u0003UygMZ:fi6\u000bg.Y4fe6+GO]5dg\u0002BQ!H\u0012\u0005\u0002\u0011$bAI3gO\"L\u0007b\u0002\u0014d!\u0003\u0005\r\u0001\u000b\u0005\bu\r\u0004\n\u00111\u0001=\u0011\u001d\u00115\r%AA\u0002\u0011CqAV2\u0011\u0002\u0003\u0007q\u000bC\u0004]GB\u0005\t\u0019\u00010\t\u000f-\u001c#\u0019!C\u0001Y\u0006!B.Y:u!J|7-Z:tK\u0012|eMZ:fiN,\u0012!\u001c\t\u0005]R4H0D\u0001p\u0015\t\u0001\u0018/\u0001\u0006d_:\u001cWO\u001d:f]RT!!\u0007:\u000b\u0003M\fAA[1wC&\u0011Qo\u001c\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\bCA<{\u001b\u0005A(BA=\u0005\u0003%\u0019wN\u001c;bS:,'/\u0003\u0002|q\nAA+Y:l\u001d\u0006lW\r\u0005\u0003oiv,\u0005CA\u0018\u007f\u0013\ty\bGA\u000bTsN$X-\\*ue\u0016\fW\u000eU1si&$\u0018n\u001c8\t\u000f\u0005\r1\u0005)A\u0005[\u0006)B.Y:u!J|7-Z:tK\u0012|eMZ:fiN\u0004\u0003\"CA\u0004G\u0001\u0007I\u0011AA\u0005\u0003=\u0019H/\u0019:uS:<wJ\u001a4tKR\u001cXCAA\u0006!\u0015ICF^A\u0007!\u0011IC&`#\t\u0013\u0005E1\u00051A\u0005\u0002\u0005M\u0011aE:uCJ$\u0018N\\4PM\u001a\u001cX\r^:`I\u0015\fH\u0003BA\u000b\u00037\u00012!EA\f\u0013\r\tIB\u0005\u0002\u0005+:LG\u000f\u0003\u0006\u0002\u001e\u0005=\u0011\u0011!a\u0001\u0003\u0017\t1\u0001\u001f\u00132\u0011!\t\tc\tQ!\n\u0005-\u0011\u0001E:uCJ$\u0018N\\4PM\u001a\u001cX\r^:!\u0011%\t)c\tb\u0001\n\u0003\t9#\u0001\ftsN$X-\\*ue\u0016\fW\u000eU1si&$\u0018n\u001c8t+\t\tI\u0003E\u0004\u0002,\u0005Eb/a\r\u000e\u0005\u00055\"bAA\u0018U\u00059Q.\u001e;bE2,\u0017bA\u0017\u0002.A)\u00111FA\u001b{&!\u0011qGA\u0017\u0005\r\u0019V\r\u001e\u0005\t\u0003w\u0019\u0003\u0015!\u0003\u0002*\u000592/_:uK6\u001cFO]3b[B\u000b'\u000f^5uS>t7\u000f\t\u0005\b\u0003\u007f\u0019C\u0011AA!\u0003!\u0011XmZ5ti\u0016\u0014HCBA\u000b\u0003\u0007\n9\u0005C\u0004\u0002F\u0005u\u0002\u0019\u0001<\u0002\u0011Q\f7o\u001b(b[\u0016D\u0001\"!\u0013\u0002>\u0001\u0007\u00111J\u0001!gf\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8ogR{'+Z4jgR,'\u000f\u0005\u0003*\u0003\u001bj\u0018bAA\u001cU!9\u0011\u0011K\u0012\u0005\u0002\u0005M\u0013!B:uCJ$XCAA\u000b\u0011\u001d\t9f\tC\u0001\u00033\na!\u001e9eCR,G\u0003CA\u000b\u00037\ni&!\u0019\t\u000f\u0005\u0015\u0013Q\u000ba\u0001m\"9\u0011qLA+\u0001\u0004i\u0018!F:zgR,Wn\u0015;sK\u0006l\u0007+\u0019:uSRLwN\u001c\u0005\b\u0003G\n)\u00061\u0001F\u0003\u0019ygMZ:fi\"9\u0011qM\u0012\u0005\u0002\u0005%\u0014AF4fi2\u000b7\u000f\u001e)s_\u000e,7o]3e\u001f\u001a47/\u001a;\u0015\r\u0005-\u0014\u0011OA:!\u0011\t\u0012QN#\n\u0007\u0005=$C\u0001\u0004PaRLwN\u001c\u0005\b\u0003\u000b\n)\u00071\u0001w\u0011\u001d\ty&!\u001aA\u0002uDq!a\u001e$\t\u0003\tI(A\thKR\u001cF/\u0019:uS:<wJ\u001a4tKR$b!a\u001b\u0002|\u0005u\u0004bBA#\u0003k\u0002\rA\u001e\u0005\b\u0003?\n)\b1\u0001~\u0011\u001d\t\ti\tC\u0001\u0003\u0007\u000b\u0011c]3u'R\f'\u000f^5oO>3gm]3u)!\t)\"!\"\u0002\b\u0006-\u0005bBA#\u0003\u007f\u0002\rA\u001e\u0005\b\u0003\u0013\u000by\b1\u0001~\u0003\r\u00198\u000f\u001d\u0005\b\u0003G\ny\b1\u0001F\u0011\u001d\tyi\tC\u0001\u0003#\u000bqBY;jY\u0012\u001c\u0005.Z2la>Lg\u000e\u001e\u000b\u0005\u0003'\u000bI\nE\u0002\r\u0003+K1!a&\u0003\u0005)\u0019\u0005.Z2la>Lg\u000e\u001e\u0005\b\u0003\u000b\ni\t1\u0001w\u0011\u001d\tij\tC\u0001\u0003?\u000bqb\u001e:ji\u0016\u001c\u0005.Z2la>Lg\u000e\u001e\u000b\u0007\u0003+\t\t+a)\t\u000f\u0005\u0015\u00131\u0014a\u0001m\"91!a'A\u0002\u0005M\u0005bBATG\u0011\u0005\u00111K\u0001\u0005gR|\u0007\u000fC\u0004\u0002,\u000e\"I!a\u0015\u00023I,w-[:uKJ\u001c\u0005.Z2la>Lg\u000e^'b]\u0006<WM\u001d\u0005\b\u0003_\u001bC\u0011BA*\u0003\u0001bw.\u00193PM\u001a\u001cX\r^:Ge>l7\t[3dWB|\u0017N\u001c;NC:\fw-\u001a:\t\u000f\u0005M6\u0005\"\u0003\u00026\u0006a\"/Z:u_J,wJ\u001a4tKR\u001chI]8n\u0007\",7m\u001b9pS:$H\u0003BA\u0006\u0003oCq!!\u0012\u00022\u0002\u0007a\u000fC\u0004\u0002<\u000e\"I!a\u0015\u0002#M$(/\u001b9SKN,Go\u0015;sK\u0006l7\u000fC\u0004\u0002@\u000e\"I!!1\u0002A\u001d,GoU=ti\u0016l7\u000b\u001e:fC6\u0004\u0016M\u001d;ji&|gn\u001d+p%\u0016\u001cX\r\u001e\u000b\u0005\u0003\u0007\f)\rE\u0003*YY\fY\u0005C\u0004\u0002H\u0006u\u0006\u0019A7\u0002AQ\f7o\u001b(b[\u0016$vn]=ti\u0016l7\u000b\u001e:fC6\u0004\u0016M\u001d;ji&|gn\u001d\u0005\b\u0003\u0017\u001cC\u0011BA*\u0003Maw.\u00193Ti\u0006\u0014H/\u001b8h\u001f\u001a47/\u001a;t\u0011\u001d\tym\tC\u0005\u0003'\nA\u0002\\8bI\u0012+g-Y;miNDq!a5 \u0001\u0004\t).\u0001\u000btsN$X-\\*ue\u0016\fW.T3uC\u0012\fG/\u0019\t\u0006S1r\u0013q\u001b\t\u0004_\u0005e\u0017bAAna\t!2+_:uK6\u001cFO]3b[6+G/\u00193bi\u0006Dq!a8 \u0001\u0004\t\t/\u0001\u0004d_:4\u0017n\u001a\t\u0005\u0003G\f9/\u0004\u0002\u0002f*\u0019\u0011q\u001c\u0003\n\t\u0005%\u0018Q\u001d\u0002\u0007\u0007>tg-[4\t\u000fiz\u0002\u0013!a\u0001y!9!i\bI\u0001\u0002\u0004!\u0005b\u0002, !\u0003\u0005\ra\u0016\u0005\b9~\u0001\n\u00111\u0001_\u0011%\t)0DI\u0001\n\u0003\t90A\bbaBd\u0017\u0010\n3fM\u0006,H\u000e\u001e\u00134+\t\tIPK\u0002=\u0003w\\#!!@\u0011\t\u0005}(\u0011B\u0007\u0003\u0005\u0003QAAa\u0001\u0003\u0006\u0005IQO\\2iK\u000e\\W\r\u001a\u0006\u0004\u0005\u000f\u0011\u0012AC1o]>$\u0018\r^5p]&!!1\u0002B\u0001\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a\u0005\n\u0005\u001fi\u0011\u0013!C\u0001\u0005#\tq\"\u00199qYf$C-\u001a4bk2$H\u0005N\u000b\u0003\u0005'Q3\u0001RA~\u0011%\u00119\"DI\u0001\n\u0003\u0011I\"A\bbaBd\u0017\u0010\n3fM\u0006,H\u000e\u001e\u00136+\t\u0011YBK\u0002X\u0003wD\u0011Ba\b\u000e#\u0003%\tA!\t\u0002\u001f\u0005\u0004\b\u000f\\=%I\u00164\u0017-\u001e7uIY*\"Aa\t+\u0007y\u000bY\u0010C\u0005\u0003(5\t\n\u0011\"\u0001\u0003*\u0005YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIE*\"Aa\u000b+\u0007!\nY\u0010C\u0005\u000305\t\n\u0011\"\u0001\u0002x\u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uIIB\u0011Ba\r\u000e#\u0003%\tA!\u0005\u00027\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00134\u0011%\u00119$DI\u0001\n\u0003\u0011I\"A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005\u000e\u0005\n\u0005wi\u0011\u0013!C\u0001\u0005C\t1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012*\u0004")
/* loaded from: input_file:org/apache/samza/checkpoint/OffsetManager.class */
public class OffsetManager implements Logging {
    private final Map<SystemStream, OffsetSetting> offsetSettings;
    private final CheckpointManager checkpointManager;
    private final Map<String, SystemAdmin> systemAdmins;
    private final Map<String, CheckpointListener> checkpointListeners;
    private final OffsetManagerMetrics offsetManagerMetrics;
    private final ConcurrentHashMap<TaskName, ConcurrentHashMap<SystemStreamPartition, String>> lastProcessedOffsets;
    private Map<TaskName, Map<SystemStreamPartition, String>> startingOffsets;
    private final scala.collection.mutable.Map<TaskName, Set<SystemStreamPartition>> systemStreamPartitions;
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    private volatile byte bitmap$0;

    public static OffsetManager apply(Map<SystemStream, SystemStreamMetadata> map, Config config, CheckpointManager checkpointManager, Map<String, SystemAdmin> map2, Map<String, CheckpointListener> map3, OffsetManagerMetrics offsetManagerMetrics) {
        return OffsetManager$.MODULE$.apply(map, config, checkpointManager, map2, map3, offsetManagerMetrics);
    }

    @Override // org.apache.samza.util.Logging
    public void startupLog(Function0<Object> function0) {
        startupLog(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0) {
        trace(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0) {
        debug(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0) {
        info(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0) {
        warn(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0) {
        error(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        putMDC(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public String getMDC(Function0<String> function0) {
        String mdc;
        mdc = getMDC(function0);
        return mdc;
    }

    @Override // org.apache.samza.util.Logging
    public void removeMDC(Function0<String> function0) {
        removeMDC(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void clearMDC() {
        clearMDC();
    }

    @Override // org.apache.samza.util.Logging
    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.checkpoint.OffsetManager] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    @Override // org.apache.samza.util.Logging
    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    @Override // org.apache.samza.util.Logging
    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.checkpoint.OffsetManager] */
    private Logger startupLogger$lzycompute() {
        Logger startupLogger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                startupLogger = startupLogger();
                this.startupLogger = startupLogger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    @Override // org.apache.samza.util.Logging
    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    public Map<SystemStream, OffsetSetting> offsetSettings() {
        return this.offsetSettings;
    }

    public CheckpointManager checkpointManager() {
        return this.checkpointManager;
    }

    public Map<String, SystemAdmin> systemAdmins() {
        return this.systemAdmins;
    }

    public OffsetManagerMetrics offsetManagerMetrics() {
        return this.offsetManagerMetrics;
    }

    public ConcurrentHashMap<TaskName, ConcurrentHashMap<SystemStreamPartition, String>> lastProcessedOffsets() {
        return this.lastProcessedOffsets;
    }

    public Map<TaskName, Map<SystemStreamPartition, String>> startingOffsets() {
        return this.startingOffsets;
    }

    public void startingOffsets_$eq(Map<TaskName, Map<SystemStreamPartition, String>> map) {
        this.startingOffsets = map;
    }

    public scala.collection.mutable.Map<TaskName, Set<SystemStreamPartition>> systemStreamPartitions() {
        return this.systemStreamPartitions;
    }

    public void register(TaskName taskName, scala.collection.Set<SystemStreamPartition> set) {
        ((Growable) systemStreamPartitions().getOrElseUpdate(taskName, () -> {
            return Set$.MODULE$.apply(Nil$.MODULE$);
        })).$plus$plus$eq(set);
        systemStreamPartitions().foreach(tuple2 -> {
            $anonfun$register$2(this, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    public void start() {
        registerCheckpointManager();
        loadOffsetsFromCheckpointManager();
        stripResetStreams();
        loadStartingOffsets();
        loadDefaults();
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Successfully loaded last processed offsets: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.lastProcessedOffsets()}));
        });
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Successfully loaded starting offsets: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.startingOffsets()}));
        });
    }

    public void update(TaskName taskName, SystemStreamPartition systemStreamPartition, String str) {
        lastProcessedOffsets().putIfAbsent(taskName, new ConcurrentHashMap<>());
        if (str == null || str.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
            return;
        }
        lastProcessedOffsets().get(taskName).put(systemStreamPartition, str);
    }

    public Option<String> getLastProcessedOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
        return Option$.MODULE$.apply(lastProcessedOffsets().get(taskName)).map(concurrentHashMap -> {
            return (String) concurrentHashMap.get(systemStreamPartition);
        });
    }

    public Option<String> getStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition) {
        Option<String> option;
        Some some = startingOffsets().get(taskName);
        if (some instanceof Some) {
            option = ((Map) some.value()).get(systemStreamPartition);
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            option = None$.MODULE$;
        }
        return option;
    }

    public void setStartingOffset(TaskName taskName, SystemStreamPartition systemStreamPartition, String str) {
        startingOffsets_$eq(startingOffsets().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(taskName), ((MapLike) startingOffsets().apply(taskName)).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStreamPartition), str)))));
    }

    public Checkpoint buildCheckpoint(TaskName taskName) {
        Map apply;
        if (checkpointManager() == null && !this.checkpointListeners.nonEmpty()) {
            debug(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Returning null checkpoint for taskName %s because no checkpoint manager/callback is defined.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskName}));
            });
            return null;
        }
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Getting checkpoint offsets for taskName %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskName}));
        });
        scala.collection.immutable.Set set = ((TraversableOnce) systemStreamPartitions().getOrElse(taskName, () -> {
            throw new SamzaException("No SSPs registered for task: " + taskName);
        })).toSet();
        ConcurrentHashMap<SystemStreamPartition, String> concurrentHashMap = lastProcessedOffsets().get(taskName);
        if (concurrentHashMap != null) {
            apply = ((MapLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(concurrentHashMap).asScala()).filterKeys(systemStreamPartition -> {
                return BoxesRunTime.boxToBoolean(set.contains(systemStreamPartition));
            });
        } else {
            warn(() -> {
                return Predef$any2stringadd$.MODULE$.$plus$extension(Predef$.MODULE$.any2stringadd(taskName), " is not found... ");
            });
            apply = Map$.MODULE$.apply(Nil$.MODULE$);
        }
        return new Checkpoint(new HashMap((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(apply).asJava()));
    }

    public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
        if (checkpoint != null) {
            if (checkpointManager() != null || this.checkpointListeners.nonEmpty()) {
                debug(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Writing checkpoint for taskName %s with offsets %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskName, checkpoint}));
                });
                if (checkpointManager() != null) {
                    checkpointManager().writeCheckpoint(taskName, checkpoint);
                    java.util.Map offsets = checkpoint.getOffsets();
                    if (offsets != null) {
                        ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(offsets).asScala()).foreach(tuple2 -> {
                            if (tuple2 == null) {
                                throw new MatchError(tuple2);
                            }
                            SystemStreamPartition systemStreamPartition = (SystemStreamPartition) tuple2._1();
                            return (String) this.offsetManagerMetrics().checkpointedOffsets().get(systemStreamPartition).set((String) tuple2._2());
                        });
                    }
                }
                ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(checkpoint.getOffsets()).asScala()).groupBy(tuple22 -> {
                    if (tuple22 != null) {
                        return ((SystemStreamPartition) tuple22._1()).getSystem();
                    }
                    throw new MatchError(tuple22);
                }).foreach(tuple23 -> {
                    $anonfun$writeCheckpoint$4(this, tuple23);
                    return BoxedUnit.UNIT;
                });
            }
        }
    }

    public void stop() {
        if (checkpointManager() == null) {
            debug(() -> {
                return "Skipping checkpoint manager shutdown because no checkpoint manager is defined.";
            });
        } else {
            debug(() -> {
                return "Shutting down checkpoint manager.";
            });
            checkpointManager().stop();
        }
    }

    private void registerCheckpointManager() {
        if (checkpointManager() == null) {
            debug(() -> {
                return "Skipping checkpoint manager registration because no manager was defined.";
            });
        } else {
            debug(() -> {
                return "Registering checkpoint manager.";
            });
            systemStreamPartitions().keys().foreach(taskName -> {
                $anonfun$registerCheckpointManager$2(this, taskName);
                return BoxedUnit.UNIT;
            });
        }
    }

    private void loadOffsetsFromCheckpointManager() {
        if (checkpointManager() == null) {
            debug(() -> {
                return "Skipping offset load from checkpoint manager because no manager was defined.";
            });
            return;
        }
        debug(() -> {
            return "Loading offsets from checkpoint manager.";
        });
        checkpointManager().start();
        ((TraversableOnce) systemStreamPartitions().keys().flatMap(taskName -> {
            return this.restoreOffsetsFromCheckpoint(taskName);
        }, Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return this.lastProcessedOffsets().put((TaskName) tuple2._1(), new ConcurrentHashMap<>((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter((Map) ((Map) tuple2._2()).filter(tuple2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$loadOffsetsFromCheckpointManager$4(this, tuple2));
            })).asJava()));
        }, scala.collection.immutable.Iterable$.MODULE$.canBuildFrom());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<TaskName, Map<SystemStreamPartition, String>> restoreOffsetsFromCheckpoint(TaskName taskName) {
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Loading checkpoints for taskName: %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskName}));
        });
        Checkpoint readLastCheckpoint = checkpointManager().readLastCheckpoint(taskName);
        if (readLastCheckpoint != null) {
            return Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(taskName), ((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(readLastCheckpoint.getOffsets()).asScala()).toMap(Predef$.MODULE$.$conforms()))}));
        }
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskName}));
        });
        return Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(taskName), Map$.MODULE$.apply(Nil$.MODULE$))}));
    }

    private void stripResetStreams() {
        Map<TaskName, scala.collection.Set<SystemStreamPartition>> systemStreamPartitionsToReset = getSystemStreamPartitionsToReset(lastProcessedOffsets());
        systemStreamPartitionsToReset.foreach(tuple2 -> {
            $anonfun$stripResetStreams$1(this, tuple2);
            return BoxedUnit.UNIT;
        });
        ((Iterator) JavaConverters$.MODULE$.enumerationAsScalaIteratorConverter(lastProcessedOffsets().keys()).asScala()).foreach(taskName -> {
            return BoxesRunTime.boxToBoolean($anonfun$stripResetStreams$4(this, systemStreamPartitionsToReset, taskName));
        });
    }

    private Map<TaskName, scala.collection.Set<SystemStreamPartition>> getSystemStreamPartitionsToReset(ConcurrentHashMap<TaskName, ConcurrentHashMap<SystemStreamPartition, String>> concurrentHashMap) {
        return (Map) ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(concurrentHashMap).asScala()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((TaskName) tuple2._1()), ((MapLike) ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter((ConcurrentHashMap) tuple2._2()).asScala()).filter(tuple2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$getSystemStreamPartitionsToReset$2(this, tuple2));
            })).keys().toSet());
        }, scala.collection.mutable.Map$.MODULE$.canBuildFrom());
    }

    private void loadStartingOffsets() {
        startingOffsets_$eq((Map) ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter(lastProcessedOffsets()).asScala()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((TaskName) tuple2._1()), ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaConcurrentMapConverter((ConcurrentHashMap) tuple2._2()).asScala()).groupBy(tuple2 -> {
                return ((SystemStream) tuple2._1()).getSystem();
            }).flatMap(tuple22 -> {
                if (tuple22 == null) {
                    throw new MatchError(tuple22);
                }
                String str = (String) tuple22._1();
                return (scala.collection.mutable.Map) JavaConverters$.MODULE$.mapAsScalaMapConverter(((SystemAdmin) this.systemAdmins().getOrElse(str, () -> {
                    throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("Missing system admin for %s. Need system admin to load starting offsets.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
                })).getOffsetsAfter((java.util.Map) JavaConverters$.MODULE$.mutableMapAsJavaMapConverter((scala.collection.mutable.Map) tuple22._2()).asJava())).asScala();
            }, scala.collection.immutable.Map$.MODULE$.canBuildFrom()));
        }, scala.collection.mutable.Map$.MODULE$.canBuildFrom()));
    }

    private void loadDefaults() {
        systemStreamPartitions().foreach(tuple2 -> {
            $anonfun$loadDefaults$1(this, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$register$3(OffsetManager offsetManager, SystemStreamPartition systemStreamPartition) {
        offsetManager.offsetManagerMetrics().addCheckpointedOffset(systemStreamPartition, "");
    }

    public static final /* synthetic */ void $anonfun$register$2(OffsetManager offsetManager, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        ((Set) tuple2._2()).foreach(systemStreamPartition -> {
            $anonfun$register$3(offsetManager, systemStreamPartition);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$writeCheckpoint$5(scala.collection.mutable.Map map, CheckpointListener checkpointListener) {
        checkpointListener.onCheckpoint((java.util.Map) JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(map).asJava());
    }

    public static final /* synthetic */ void $anonfun$writeCheckpoint$4(OffsetManager offsetManager, Tuple2 tuple2) {
        if (tuple2 != null) {
            String str = (String) tuple2._1();
            scala.collection.mutable.Map map = (scala.collection.mutable.Map) tuple2._2();
            if (str != null && map != null) {
                offsetManager.checkpointListeners.get(str).foreach(checkpointListener -> {
                    $anonfun$writeCheckpoint$5(map, checkpointListener);
                    return BoxedUnit.UNIT;
                });
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        throw new MatchError(tuple2);
    }

    public static final /* synthetic */ void $anonfun$registerCheckpointManager$2(OffsetManager offsetManager, TaskName taskName) {
        offsetManager.checkpointManager().register(taskName);
    }

    public static final /* synthetic */ boolean $anonfun$loadOffsetsFromCheckpointManager$4(OffsetManager offsetManager, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SystemStreamPartition systemStreamPartition = (SystemStreamPartition) tuple2._1();
        String str = (String) tuple2._2();
        boolean contains = offsetManager.offsetSettings().contains(systemStreamPartition.getSystemStream());
        if (!contains) {
            offsetManager.info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Ignoring previously checkpointed offset %s for %s since the offset is for a stream that is not currently an input stream.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str, systemStreamPartition}));
            });
        }
        offsetManager.info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Checkpointed offset is currently %s for %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str, systemStreamPartition}));
        });
        return contains;
    }

    public static final /* synthetic */ void $anonfun$stripResetStreams$2(OffsetManager offsetManager, TaskName taskName, SystemStreamPartition systemStreamPartition) {
        String str = offsetManager.lastProcessedOffsets().get(taskName).get(systemStreamPartition);
        offsetManager.info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got offset %s for %s, but ignoring, since stream was configured to reset offsets.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str, systemStreamPartition}));
        });
    }

    public static final /* synthetic */ void $anonfun$stripResetStreams$1(OffsetManager offsetManager, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        TaskName taskName = (TaskName) tuple2._1();
        ((scala.collection.Set) tuple2._2()).foreach(systemStreamPartition -> {
            $anonfun$stripResetStreams$2(offsetManager, taskName, systemStreamPartition);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ boolean $anonfun$stripResetStreams$4(OffsetManager offsetManager, Map map, TaskName taskName) {
        return offsetManager.lastProcessedOffsets().get(taskName).keySet().removeAll((Collection) JavaConverters$.MODULE$.setAsJavaSetConverter((scala.collection.Set) map.apply(taskName)).asJava());
    }

    public static final /* synthetic */ boolean $anonfun$getSystemStreamPartitionsToReset$2(OffsetManager offsetManager, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SystemStream systemStream = ((SystemStreamPartition) tuple2._1()).getSystemStream();
        return ((OffsetSetting) offsetManager.offsetSettings().getOrElse(systemStream, () -> {
            throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("Attempting to reset a stream that doesn't have offset settings %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStream})));
        })).resetOffset();
    }

    public static final /* synthetic */ void $anonfun$loadDefaults$2(OffsetManager offsetManager, TaskName taskName, SystemStreamPartition systemStreamPartition) {
        String str;
        if (offsetManager.startingOffsets().contains(taskName) && ((MapLike) offsetManager.startingOffsets().apply(taskName)).contains(systemStreamPartition)) {
            return;
        }
        SystemStream systemStream = systemStreamPartition.getSystemStream();
        Partition partition = systemStreamPartition.getPartition();
        OffsetSetting offsetSetting = (OffsetSetting) offsetManager.offsetSettings().getOrElse(systemStream, () -> {
            throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("Attempting to load defaults for stream %s, which has no offset settings.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStream})));
        });
        SystemStreamMetadata metadata = offsetSetting.metadata();
        SystemStreamMetadata.OffsetType defaultOffset = offsetSetting.defaultOffset();
        offsetManager.debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got default offset type %s for %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{defaultOffset, systemStreamPartition}));
        });
        SystemStreamMetadata.SystemStreamPartitionMetadata systemStreamPartitionMetadata = (SystemStreamMetadata.SystemStreamPartitionMetadata) metadata.getSystemStreamPartitionMetadata().get(partition);
        if (systemStreamPartitionMetadata == null) {
            throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("No metadata available for partition %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartitionMetadata})));
        }
        String offset = systemStreamPartitionMetadata.getOffset(defaultOffset);
        if (offset == null) {
            offsetManager.warn(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Requested offset type %s in %s, but the stream is empty. Defaulting to the upcoming offset.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{defaultOffset, systemStreamPartition}));
            });
            str = systemStreamPartitionMetadata.getOffset(SystemStreamMetadata.OffsetType.UPCOMING);
        } else {
            str = offset;
        }
        String str2 = str;
        offsetManager.debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got next default offset %s for %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str2, systemStreamPartition}));
        });
        Some some = offsetManager.startingOffsets().get(taskName);
        if (some instanceof Some) {
            offsetManager.startingOffsets_$eq(offsetManager.startingOffsets().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(taskName), ((Map) some.value()).$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStreamPartition), str2)))));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            offsetManager.startingOffsets_$eq(offsetManager.startingOffsets().$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(taskName), Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(systemStreamPartition), str2)})))));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$loadDefaults$1(OffsetManager offsetManager, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        TaskName taskName = (TaskName) tuple2._1();
        ((scala.collection.Set) tuple2._2()).foreach(systemStreamPartition -> {
            $anonfun$loadDefaults$2(offsetManager, taskName, systemStreamPartition);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    public OffsetManager(Map<SystemStream, OffsetSetting> map, CheckpointManager checkpointManager, Map<String, SystemAdmin> map2, Map<String, CheckpointListener> map3, OffsetManagerMetrics offsetManagerMetrics) {
        this.offsetSettings = map;
        this.checkpointManager = checkpointManager;
        this.systemAdmins = map2;
        this.checkpointListeners = map3;
        this.offsetManagerMetrics = offsetManagerMetrics;
        Logging.$init$(this);
        this.lastProcessedOffsets = new ConcurrentHashMap<>();
        this.startingOffsets = Map$.MODULE$.apply(Nil$.MODULE$);
        this.systemStreamPartitions = scala.collection.mutable.Map$.MODULE$.apply(Nil$.MODULE$);
    }
}
