package org.apache.samza.system;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.system.chooser.MessageChooser;
import org.apache.samza.util.Logging;
import org.apache.samza.util.TimerUtils;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.generic.Growable;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SystemConsumers.scala */
@ScalaSignature(bytes = "\u0006\u0001\t5q!B\u0001\u0003\u0011\u0003Y\u0011aD*zgR,WnQ8ogVlWM]:\u000b\u0005\r!\u0011AB:zgR,WN\u0003\u0002\u0006\r\u0005)1/Y7{C*\u0011q\u0001C\u0001\u0007CB\f7\r[3\u000b\u0003%\t1a\u001c:h\u0007\u0001\u0001\"\u0001D\u0007\u000e\u0003\t1QA\u0004\u0002\t\u0002=\u0011qbU=ti\u0016l7i\u001c8tk6,'o]\n\u0003\u001bA\u0001\"!\u0005\u000b\u000e\u0003IQ\u0011aE\u0001\u0006g\u000e\fG.Y\u0005\u0003+I\u0011a!\u00118z%\u00164\u0007\"B\f\u000e\t\u0003A\u0012A\u0002\u001fj]&$h\bF\u0001\f\u0011\u001dQRB1A\u0005\u0002m\t\u0001\u0004R#G\u0003VcEk\u0018)P\u00192{\u0016J\u0014+F%Z\u000bEjX'T+\u0005a\u0002CA\t\u001e\u0013\tq\"CA\u0002J]RDa\u0001I\u0007!\u0002\u0013a\u0012!\u0007#F\r\u0006+F\nV0Q\u001f2cu,\u0013(U\u000bJ3\u0016\tT0N'\u0002BqAI\u0007C\u0002\u0013\u00051$A\u0010E\u000b\u001a\u000bU\u000b\u0014+`\u001d>{f*R,`\u001b\u0016\u001b6+Q$F'~#\u0016*T#P+RCa\u0001J\u0007!\u0002\u0013a\u0012\u0001\t#F\r\u0006+F\nV0O\u001f~sUiV0N\u000bN\u001b\u0016iR#T?RKU*R(V)\u0002BqAJ\u0007C\u0002\u0013\u0005q%\u0001\u0011E\u000b\u001a\u000bU\u000b\u0014+`\tJ{\u0005kX*F%&\u000bE*\u0013.B)&{ejX#S%>\u0013V#\u0001\u0015\u0011\u0005EI\u0013B\u0001\u0016\u0013\u0005\u001d\u0011un\u001c7fC:Da\u0001L\u0007!\u0002\u0013A\u0013!\t#F\r\u0006+F\nV0E%>\u0003vlU#S\u0013\u0006c\u0015JW!U\u0013>su,\u0012*S\u001fJ\u0003\u0003b\u0002\u0018\u000e#\u0003%\taL\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001a\u0016\u0003AR#!M\u001c\u0011\u0005I*T\"A\u001a\u000b\u0005Q\"\u0011aC:fe&\fG.\u001b>feNL!AN\u001a\u0003\u0019M+'\u000fZ3NC:\fw-\u001a:,\u0003a\u0002\"!\u000f 
\u000e\u0003iR!a\u000f\u001f\u0002\u0013Ut7\r[3dW\u0016$'BA\u001f\u0013\u0003)\tgN\\8uCRLwN\\\u0005\u0003\u007fi\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0011\u001d\tU\"%A\u0005\u0002\t\u000b1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\"T#A\"+\u0005\u0011;\u0004C\u0001\u0007F\u0013\t1%A\u0001\fTsN$X-\\\"p]N,X.\u001a:t\u001b\u0016$(/[2t\u0011\u001dAU\"%A\u0005\u0002%\u000b1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012*T#\u0001&+\u0005q9\u0004b\u0002'\u000e#\u0003%\t!T\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000f\n\u001c\u0016\u00039S#\u0001K\u001c\t\u000fAk\u0011\u0013!C\u0001\u0013\u0006YB\u0005\\3tg&t\u0017\u000e\u001e\u0013he\u0016\fG/\u001a:%I\u00164\u0017-\u001e7uI]BqAU\u0007\u0012\u0002\u0013\u00051+A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005O\u000b\u0002)*\u0012Qk\u000e\t\u0004#YC\u0016BA,\u0013\u0005%1UO\\2uS>t\u0007\u0007\u0005\u0002\u00123&\u0011!L\u0005\u0002\u0005\u0019>twM\u0002\u0003\u000f\u0005\u0001a6\u0003B.\u0011;\u000e\u0004\"AX1\u000e\u0003}S!\u0001\u0019\u0003\u0002\tU$\u0018\u000e\\\u0005\u0003E~\u0013q\u0001T8hO&tw\r\u0005\u0002_I&\u0011Qm\u0018\u0002\u000b)&lWM]+uS2\u001c\b\u0002C4\\\u0005\u0003\u0005\u000b\u0011\u00025\u0002\u000f\rDwn\\:feB\u0011\u0011n[\u0007\u0002U*\u0011qMA\u0005\u0003Y*\u0014a\"T3tg\u0006<Wm\u00115p_N,'\u000f\u0003\u0005o7\n\u0005\t\u0015!\u0003p\u0003%\u0019wN\\:v[\u0016\u00148\u000f\u0005\u0003qojlhBA9v!\t\u0011(#D\u0001t\u0015\t!(\"\u0001\u0004=e>|GOP\u0005\u0003mJ\ta\u0001\u0015:fI\u00164\u0017B\u0001=z\u0005\ri\u0015\r\u001d\u0006\u0003mJ\u0001\"\u0001]>\n\u0005qL(AB*ue&tw\r\u0005\u0002\r}&\u0011qP\u0001\u0002\u000f'f\u001cH/Z7D_:\u001cX/\\3s\u0011%\t\u0019a\u0017B\u0001B\u0003%\u0011'\u0001\u0007tKJ$W-T1oC\u001e,'\u000fC\u0005\u0002\bm\u0013\t\u0011)A\u0005\t\u00069Q.\u001a;sS\u000e\u001c\b\"CA\u00067\n\u0005\t\u0015!\u0003\u001d\u0003QqwNT3x\u001b\u0016\u001c8/Y4fgRKW.Z8vi\"I\u0011qB.\u0003\u0002\u0003\u0006I\u0001K\u0001\u0019IJ|\
u0007\u000fR3tKJL\u0017\r\\5{CRLwN\\#se>\u0014\b\"CA\n7\n\u0015\r\u0011\"\u0001\u001c\u00039\u0001x\u000e\u001c7J]R,'O^1m\u001bND\u0011\"a\u0006\\\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\u001fA|G\u000e\\%oi\u0016\u0014h/\u00197Ng\u0002B!\"a\u0007\\\u0005\u000b\u0007I\u0011AA\u000f\u0003\u0015\u0019Gn\\2l+\u0005)\u0006\"CA\u00117\n\u0005\t\u0015!\u0003V\u0003\u0019\u0019Gn\\2lA!1qc\u0017C\u0001\u0003K!\"#a\n\u0002*\u0005-\u0012QFA\u0018\u0003c\t\u0019$!\u000e\u00028A\u0011Ab\u0017\u0005\u0007O\u0006\r\u0002\u0019\u00015\t\r9\f\u0019\u00031\u0001p\u0011%\t\u0019!a\t\u0011\u0002\u0003\u0007\u0011\u0007C\u0005\u0002\b\u0005\r\u0002\u0013!a\u0001\t\"I\u00111BA\u0012!\u0003\u0005\r\u0001\b\u0005\n\u0003\u001f\t\u0019\u0003%AA\u0002!B\u0011\"a\u0005\u0002$A\u0005\t\u0019\u0001\u000f\t\u0013\u0005m\u00111\u0005I\u0001\u0002\u0004)\u0006\"CA\u001e7\n\u0007I\u0011BA\u001f\u0003a)h\u000e\u001d:pG\u0016\u001c8/\u001a3NKN\u001c\u0018mZ3t\u0005f\u001c6\u000bU\u000b\u0003\u0003\u007f\u0001\u0002\"!\u0011\u0002J\u00055\u00131K\u0007\u0003\u0003\u0007R1\u0001YA#\u0015\t\t9%\u0001\u0003kCZ\f\u0017\u0002BA&\u0003\u0007\u0012q\u0001S1tQ6\u000b\u0007\u000fE\u0002\r\u0003\u001fJ1!!\u0015\u0003\u0005U\u0019\u0016p\u001d;f[N#(/Z1n!\u0006\u0014H/\u001b;j_:\u0004b!!\u0011\u0002V\u0005e\u0013\u0002BA,\u0003\u0007\u0012Q!U;fk\u0016\u00042\u0001DA.\u0013\r\tiF\u0001\u0002\u0018\u0013:\u001cw.\\5oO6+7o]1hK\u0016sg/\u001a7pa\u0016D\u0001\"!\u0019\\A\u0003%\u0011qH\u0001\u001ak:\u0004(o\\2fgN,G-T3tg\u0006<Wm\u001d\"z'N\u0003\u0006\u0005C\u0005\u0002fm\u0013\r\u0011\"\u0003\u0002h\u0005yQM\u001c3PMN#(/Z1n'N\u00036/\u0006\u0002\u0002jA1\u0011\u0011IA6\u0003\u001bJA!!\u001c\u0002D\t9\u0001*Y:i'\u0016$\b\u0002CA97\u0002\u0006I!!\u001b\u0002!\u0015tGm\u00144TiJ,\u0017-\\*T!N\u0004\u0003\"CA;7\n\u0007I\u0011BA<\u0003\r*W\u000e\u001d;z'f\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8og\nK8+_:uK6,\"!!\u001f\u0011\u000f\u0005\u0005\u0013\u0011\n>\u0002|A1\u0011\u0011IA?\u0003\u001bJA!a 
\u0002D\t\u00191+\u001a;\t\u0011\u0005\r5\f)A\u0005\u0003s\nA%Z7qif\u001c\u0016p\u001d;f[N#(/Z1n!\u0006\u0014H/\u001b;j_:\u001c()_*zgR,W\u000e\t\u0005\t\u0003\u000f[\u0006\u0019!C\u00017\u00059A/[7f_V$\b\"CAF7\u0002\u0007I\u0011AAG\u0003-!\u0018.\\3pkR|F%Z9\u0015\t\u0005=\u0015Q\u0013\t\u0004#\u0005E\u0015bAAJ%\t!QK\\5u\u0011%\t9*!#\u0002\u0002\u0003\u0007A$A\u0002yIEBq!a'\\A\u0003&A$\u0001\u0005uS6,w.\u001e;!\u0011%\tyj\u0017a\u0001\n\u0003\t\t+\u0001\u0006mCN$\bk\u001c7m\u001dN,\u0012\u0001\u0017\u0005\n\u0003K[\u0006\u0019!C\u0001\u0003O\u000ba\u0002\\1tiB{G\u000e\u001c(t?\u0012*\u0017\u000f\u0006\u0003\u0002\u0010\u0006%\u0006\"CAL\u0003G\u000b\t\u00111\u0001Y\u0011\u001d\tik\u0017Q!\na\u000b1\u0002\\1tiB{G\u000e\u001c(tA!A\u0011\u0011W.A\u0002\u0013\u00051$\u0001\ru_R\fG.\u00168qe>\u001cWm]:fI6+7o]1hKND\u0011\"!.\\\u0001\u0004%\t!a.\u00029Q|G/\u00197V]B\u0014xnY3tg\u0016$W*Z:tC\u001e,7o\u0018\u0013fcR!\u0011qRA]\u0011%\t9*a-\u0002\u0002\u0003\u0007A\u0004C\u0004\u0002>n\u0003\u000b\u0015\u0002\u000f\u00023Q|G/\u00197V]B\u0014xnY3tg\u0016$W*Z:tC\u001e,7\u000f\t\u0005\b\u0003\u0003\\F\u0011AAb\u0003\u0015\u0019H/\u0019:u+\t\ty\tC\u0004\u0002Hn#\t!a1\u0002\tM$x\u000e\u001d\u0005\b\u0003\u0017\\F\u0011AAg\u0003!\u0011XmZ5ti\u0016\u0014HCBAH\u0003\u001f\f\u0019\u000e\u0003\u0005\u0002R\u0006%\u0007\u0019AA'\u0003U\u0019\u0018p\u001d;f[N#(/Z1n!\u0006\u0014H/\u001b;j_:Dq!!6\u0002J\u0002\u0007!0\u0001\u0004pM\u001a\u001cX\r\u001e\u0005\b\u00033\\F\u0011AAn\u00035I7/\u00128e\u001f\u001a\u001cFO]3b[R\u0019\u0001&!8\t\u0011\u0005E\u0017q\u001ba\u0001\u0003\u001bBq!!9\\\t\u0003\t\u0019/\u0001\u0004dQ>|7/\u001a\u000b\u0005\u00033\n)\u000fC\u0005\u0002h\u0006}\u0007\u0013!a\u0001Q\u0005iQ\u000f\u001d3bi\u0016\u001c\u0005n\\8tKJDq!a;\\\t\u0013\ti/\u0001\u0003q_2dG\u0003BAH\u0003_Dq!!=\u0002j\u0002\u0007!0\u0001\u0006tsN$X-\u001c(b[\u0016Dq!!>\\\t\u0003\t90A\u0005uef,\u0006\u000fZ1uKR!\u0011qRA}\u0011!\tY0a=A\u0002\u00055\u0013aA:ta\"9\u0011q`.\u0005\n\u0005\r\u0017a\u0002:fMJ
,7\u000f\u001b\u0005\b\u0005\u0007YF\u0011\u0002B\u0003\u0003\u0019)\b\u000fZ1uKR\u0019\u0001Fa\u0002\t\u0011\u0005E'\u0011\u0001a\u0001\u0003\u001bB\u0001Ba\u0003\\#\u0003%\t!T\u0001\u0011G\"|wn]3%I\u00164\u0017-\u001e7uIE\u0002")
/* loaded from: input_file:org/apache/samza/system/SystemConsumers.class */
public class SystemConsumers implements Logging, TimerUtils {
    // Collaborators and settings handed to the constructor and never reassigned.
    private final MessageChooser chooser;
    // Fully qualified on purpose: the constructor passes a scala.collection.immutable.Map, and the
    // simple name Map is imported from both java.util and scala.collection.immutable in this file.
    private final scala.collection.immutable.Map<String, SystemConsumer> consumers;
    private final SerdeManager serdeManager;
    private final SystemConsumersMetrics metrics;
    private final int noNewMessagesTimeout;
    private final boolean dropDeserializationError;
    private final int pollIntervalMs;
    private final Function0<Object> clock;
    // Per-SSP queue of envelopes: register() inserts an empty queue, poll() stores fetched
    // envelopes, update() removes them one at a time.
    private final HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>> unprocessedMessagesBySSP;
    // SSPs registered with END_OF_STREAM_OFFSET or for which choose() saw an end-of-stream envelope.
    private final HashSet<SystemStreamPartition> endOfStreamSSPs;
    // Keyed by system name: SSPs added by tryUpdate() when update() returned false and removed by
    // poll() once envelopes arrive. poll() fetches only for these.
    private final HashMap<String, Set<SystemStreamPartition>> emptySystemStreamPartitionsBySystem;
    // Timeout handed to SystemConsumer.poll; choose() sets it to noNewMessagesTimeout or 0.
    private int timeout;
    // Clock reading taken by the most recent refresh().
    private long lastPollNs;
    // Incremented by poll() for fetched envelopes, decremented by update() per envelope removed.
    private int totalUnprocessedMessages;
    // Not final: assigned through org$apache$samza$util$Logging$_setter_$loggerName_$eq,
    // which Java does not accept for a final field.
    private String loggerName;
    private Logger logger;
    // Not final for the same reason as loggerName (assigned through its _setter_ method).
    private String startupLoggerName;
    private Logger startupLogger;
    // Initialisation flags for the lazy loggers: value 1 guards logger, value 2 guards startupLogger.
    private volatile byte bitmap$0;

    /**
     * Static accessor for the {@code DEFAULT_DROP_SERIALIZATION_ERROR} value held by the
     * {@code SystemConsumers$.MODULE$} singleton.
     *
     * @return whatever the singleton reports
     */
    public static boolean DEFAULT_DROP_SERIALIZATION_ERROR() {
        final SystemConsumers$ module = SystemConsumers$.MODULE$;
        return module.DEFAULT_DROP_SERIALIZATION_ERROR();
    }

    /**
     * Static accessor for the {@code DEFAULT_NO_NEW_MESSAGES_TIMEOUT} value held by the
     * {@code SystemConsumers$.MODULE$} singleton.
     *
     * @return whatever the singleton reports
     */
    public static int DEFAULT_NO_NEW_MESSAGES_TIMEOUT() {
        final SystemConsumers$ module = SystemConsumers$.MODULE$;
        return module.DEFAULT_NO_NEW_MESSAGES_TIMEOUT();
    }

    /**
     * Static accessor for the {@code DEFAULT_POLL_INTERVAL_MS} value held by the
     * {@code SystemConsumers$.MODULE$} singleton.
     *
     * @return whatever the singleton reports
     */
    public static int DEFAULT_POLL_INTERVAL_MS() {
        final SystemConsumers$ module = SystemConsumers$.MODULE$;
        return module.DEFAULT_POLL_INTERVAL_MS();
    }

    /**
     * Forwards to the {@code updateTimer} implementation supplied by {@link TimerUtils}.
     *
     * <p>The call is qualified with {@code TimerUtils.super}; an unqualified call resolves to
     * this override and recurses until the stack overflows.
     * NOTE(review): assumes {@code TimerUtils} provides this as a default method - confirm.
     *
     * @param timer     passed through unchanged
     * @param function0 passed through unchanged
     * @param <T>       result type of {@code function0}
     * @return the value returned by the inherited implementation
     */
    @Override // org.apache.samza.util.TimerUtils
    public <T> T updateTimer(Timer timer, Function0<T> function0) {
        return TimerUtils.super.updateTimer(timer, function0);
    }

    /**
     * Forwards to the {@code updateTimerAndGetDuration} implementation supplied by
     * {@link TimerUtils}.
     *
     * <p>The call is qualified with {@code TimerUtils.super}; an unqualified call resolves to
     * this override and recurses until the stack overflows.
     * NOTE(review): assumes {@code TimerUtils} provides this as a default method - confirm.
     *
     * @param timer     passed through unchanged
     * @param function1 passed through unchanged
     * @return the value returned by the inherited implementation
     */
    @Override // org.apache.samza.util.TimerUtils
    public long updateTimerAndGetDuration(Timer timer, Function1<Object, BoxedUnit> function1) {
        return TimerUtils.super.updateTimerAndGetDuration(timer, function1);
    }

    /**
     * Forwards to the {@code startupLog} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void startupLog(Function0<Object> function0) {
        Logging.super.startupLog(function0);
    }

    /**
     * Forwards to the one-argument {@code trace} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0) {
        Logging.super.trace(function0);
    }

    /**
     * Forwards to the two-argument {@code trace} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0  passed through unchanged
     * @param function02 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.trace(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code debug} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0) {
        Logging.super.debug(function0);
    }

    /**
     * Forwards to the two-argument {@code debug} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0  passed through unchanged
     * @param function02 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.debug(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code info} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0) {
        Logging.super.info(function0);
    }

    /**
     * Forwards to the two-argument {@code info} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0  passed through unchanged
     * @param function02 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.info(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code warn} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0) {
        Logging.super.warn(function0);
    }

    /**
     * Forwards to the two-argument {@code warn} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0  passed through unchanged
     * @param function02 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.warn(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code error} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0) {
        Logging.super.error(function0);
    }

    /**
     * Forwards to the two-argument {@code error} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0  passed through unchanged
     * @param function02 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.error(function0, function02);
    }

    /**
     * Forwards to the {@code putMDC} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0  passed through unchanged
     * @param function02 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.super.putMDC(function0, function02);
    }

    /**
     * Forwards to the {@code getMDC} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     * @return the value returned by the inherited implementation
     */
    @Override // org.apache.samza.util.Logging
    public String getMDC(Function0<String> function0) {
        return Logging.super.getMDC(function0);
    }

    /**
     * Forwards to the {@code removeMDC} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     *
     * @param function0 passed through unchanged
     */
    @Override // org.apache.samza.util.Logging
    public void removeMDC(Function0<String> function0) {
        Logging.super.removeMDC(function0);
    }

    /**
     * Forwards to the {@code clearMDC} implementation supplied by {@link Logging}.
     *
     * <p>Qualified with {@code Logging.super}; an unqualified call resolves to this override
     * and recurses until the stack overflows.
     * NOTE(review): assumes {@code Logging} provides this as a default method - confirm.
     */
    @Override // org.apache.samza.util.Logging
    public void clearMDC() {
        Logging.super.clearMDC();
    }

    /**
     * Accessor for the {@code loggerName} field.
     *
     * @return the name last stored by {@code org$apache$samza$util$Logging$_setter_$loggerName_$eq}
     */
    @Override // org.apache.samza.util.Logging
    public String loggerName() {
        final String name = this.loggerName;
        return name;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.SystemConsumers] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    /**
     * Lazy accessor for the {@code logger} field: returns the field when the flag with
     * value 1 in {@code bitmap$0} is set, otherwise goes through {@code logger$lzycompute()}.
     *
     * @return the logger instance
     */
    @Override // org.apache.samza.util.Logging
    public Logger logger() {
        final boolean initialised = ((byte) (this.bitmap$0 & 1)) != 0;
        if (initialised) {
            return this.logger;
        }
        return logger$lzycompute();
    }

    /**
     * Accessor for the {@code startupLoggerName} field.
     *
     * @return the name last stored by
     *         {@code org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq}
     */
    @Override // org.apache.samza.util.Logging
    public String startupLoggerName() {
        final String name = this.startupLoggerName;
        return name;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.SystemConsumers] */
    private Logger startupLogger$lzycompute() {
        Logger startupLogger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                startupLogger = startupLogger();
                this.startupLogger = startupLogger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    /**
     * Lazy accessor for the {@code startupLogger} field: returns the field when the flag
     * with value 2 in {@code bitmap$0} is set, otherwise goes through
     * {@code startupLogger$lzycompute()}.
     *
     * @return the startup logger instance
     */
    @Override // org.apache.samza.util.Logging
    public Logger startupLogger() {
        final boolean initialised = ((byte) (this.bitmap$0 & 2)) != 0;
        if (initialised) {
            return this.startupLogger;
        }
        return startupLogger$lzycompute();
    }

    /**
     * Stores the given value in the {@code loggerName} field.
     *
     * @param str the new logger name
     */
    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        loggerName = str;
    }

    /**
     * Stores the given value in the {@code startupLoggerName} field.
     *
     * @param str the new startup logger name
     */
    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        startupLoggerName = str;
    }

    /**
     * Accessor for the poll interval given to the constructor.
     *
     * @return the interval that {@code choose} compares against the milliseconds elapsed
     *         since the last poll
     */
    public int pollIntervalMs() {
        final int intervalMs = this.pollIntervalMs;
        return intervalMs;
    }

    /**
     * Accessor for the clock function given to the constructor.
     *
     * @return the function whose {@code apply$mcJ$sp()} result {@code choose} and
     *         {@code refresh} read as the current time
     */
    @Override // org.apache.samza.util.TimerUtils
    public Function0<Object> clock() {
        final Function0<Object> timeSource = this.clock;
        return timeSource;
    }

    /**
     * Accessor for the per-partition envelope queues.
     *
     * @return the live map itself, not a copy
     */
    private HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>> unprocessedMessagesBySSP() {
        final HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>> queues = this.unprocessedMessagesBySSP;
        return queues;
    }

    /**
     * Accessor for the set of partitions recorded as being at end of stream.
     *
     * @return the live set itself, not a copy
     */
    private HashSet<SystemStreamPartition> endOfStreamSSPs() {
        final HashSet<SystemStreamPartition> finished = this.endOfStreamSSPs;
        return finished;
    }

    /**
     * Accessor for the map from system name to the partitions that {@code poll} should fetch for.
     *
     * @return the live map itself, not a copy
     */
    private HashMap<String, Set<SystemStreamPartition>> emptySystemStreamPartitionsBySystem() {
        final HashMap<String, Set<SystemStreamPartition>> bySystem = this.emptySystemStreamPartitionsBySystem;
        return bySystem;
    }

    /**
     * Accessor for the current poll timeout.
     *
     * @return the value that {@code poll} passes to {@code SystemConsumer.poll}
     */
    public int timeout() {
        final int current = this.timeout;
        return current;
    }

    /**
     * Mutator for the current poll timeout.
     *
     * @param i the new timeout value
     */
    public void timeout_$eq(int i) {
        timeout = i;
    }

    /**
     * Accessor for the clock reading stored by the most recent {@code refresh}.
     *
     * @return the stored reading; 0 until the first refresh
     */
    public long lastPollNs() {
        final long reading = this.lastPollNs;
        return reading;
    }

    /**
     * Mutator for the last-poll clock reading.
     *
     * @param j the new reading
     */
    public void lastPollNs_$eq(long j) {
        lastPollNs = j;
    }

    /**
     * Accessor for the running count maintained by {@code poll} (increments) and
     * {@code update} (decrements).
     *
     * @return the current count
     */
    public int totalUnprocessedMessages() {
        final int count = this.totalUnprocessedMessages;
        return count;
    }

    /**
     * Mutator for the running count of unprocessed messages.
     *
     * @param i the new count
     */
    public void totalUnprocessedMessages_$eq(int i) {
        totalUnprocessedMessages = i;
    }

    /**
     * Starts this instance: seeds the per-system fetch sets, registers each system with the
     * metrics, starts every consumer and the chooser, then performs an initial refresh.
     */
    public void start() {
        debug(() -> {
            return "Starting consumers.";
        });
        // Group every SSP currently keyed in unprocessedMessagesBySSP by its system name and add
        // the groups (each copied into a new HashSet) to emptySystemStreamPartitionsBySystem.
        ((Growable) JavaConverters$.MODULE$.mapAsScalaMapConverter(emptySystemStreamPartitionsBySystem()).asScala()).$plus$plus$eq(((TraversableLike) JavaConverters$.MODULE$.asScalaSetConverter(unprocessedMessagesBySSP().keySet()).asScala()).groupBy(systemStreamPartition -> {
            return systemStreamPartition.getSystem();
        }).mapValues(set -> {
            return new HashSet((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(set.toSeq()).asJava());
        }));
        // Register each system name with the metrics (via the synthetic $anonfun$start$4 helper).
        this.consumers.keySet().foreach(str -> {
            $anonfun$start$4(this, str);
            return BoxedUnit.UNIT;
        });
        // Consumers are started before the chooser.
        this.consumers.values().foreach(systemConsumer -> {
            systemConsumer.start();
            return BoxedUnit.UNIT;
        });
        this.chooser.start();
        // Poll every system once so the chooser can be given initial messages.
        refresh();
    }

    /**
     * Stops every consumer in {@code consumers} and then stops the chooser.
     */
    public void stop() {
        debug(() -> "Stopping consumers.");
        this.consumers.values().foreach(consumer -> {
            consumer.stop();
            return BoxedUnit.UNIT;
        });
        this.chooser.stop();
    }

    /**
     * Registers a partition and its starting offset.
     *
     * <p>If the offset equals {@code IncomingMessageEnvelope.END_OF_STREAM_OFFSET} the
     * partition is only recorded in {@code endOfStreamSSPs}. Otherwise it is registered
     * with the metrics, given an empty envelope queue, and registered with the chooser and
     * with the consumer of its system.
     *
     * @param systemStreamPartition the partition to register
     * @param str                   the offset handed to the chooser and the consumer
     * @throws SystemConsumersException if looking up or registering with the partition's
     *                                  consumer raises {@link NoSuchElementException}
     */
    public void register(SystemStreamPartition systemStreamPartition, String str) {
        debug(() -> new StringOps(Predef$.MODULE$.augmentString("Registering stream: %s, %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition, str})));
        final boolean alreadyAtEnd = IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(str);
        if (!alreadyAtEnd) {
            this.metrics.registerSystemStreamPartition(systemStreamPartition);
            unprocessedMessagesBySSP().put(systemStreamPartition, new ArrayDeque());
            this.chooser.register(systemStreamPartition, str);
            try {
                final SystemConsumer consumer = (SystemConsumer) this.consumers.apply(systemStreamPartition.getSystem());
                consumer.register(systemStreamPartition, str);
            } catch (NoSuchElementException e) {
                throw new SystemConsumersException("can't register " + systemStreamPartition.getSystem() + "'s consumer.", e);
            }
            return;
        }
        // Nothing will ever be fetched for this partition, so only remember that it is finished.
        info(() -> new StringOps(Predef$.MODULE$.augmentString("Stream : %s is already at end of stream")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition})));
        endOfStreamSSPs().add(systemStreamPartition);
    }

    /**
     * Tells whether the given partition has been recorded in {@code endOfStreamSSPs}.
     *
     * @param systemStreamPartition the partition to look up
     * @return {@code true} if the set contains the partition
     */
    public boolean isEndOfStream(SystemStreamPartition systemStreamPartition) {
        final HashSet<SystemStreamPartition> finished = endOfStreamSSPs();
        return finished.contains(systemStreamPartition);
    }

    /**
     * Asks the chooser for the next envelope, updates bookkeeping and metrics, and polls
     * the consumers when needed.
     *
     * <p>The bookkeeping step runs under the {@code metrics.deserializationNs()} timer:
     * <ul>
     *   <li>chooser returned {@code null}: count it and set {@code timeout} to
     *       {@code noNewMessagesTimeout};</li>
     *   <li>otherwise: record end of stream if the envelope says so, set {@code timeout} to
     *       0, count the envelope, and (if {@code z}) call {@code tryUpdate} for its partition.</li>
     * </ul>
     * The polling step runs under the {@code metrics.pollNs()} timer and calls
     * {@code refresh()} when nothing was chosen or more than {@code pollIntervalMs()}
     * milliseconds have passed since {@code lastPollNs()}.
     *
     * <p>Both timed lambdas return {@code BoxedUnit.UNIT}: a {@code Function0} lambda must
     * produce a value, so a void body is not accepted.
     *
     * @param z whether to call {@code tryUpdate} for the chosen envelope's partition
     * @return the envelope returned by the chooser; may be {@code null}
     */
    public IncomingMessageEnvelope choose(boolean z) {
        final IncomingMessageEnvelope envelope = this.chooser.choose();
        updateTimer(this.metrics.deserializationNs(), () -> {
            if (envelope == null) {
                trace(() -> "Chooser returned null.");
                this.metrics.choseNull().inc();
                timeout_$eq(this.noNewMessagesTimeout);
            } else {
                final SystemStreamPartition systemStreamPartition = envelope.getSystemStreamPartition();
                if (envelope.isEndOfStream()) {
                    info(() -> new StringOps(Predef$.MODULE$.augmentString("End of stream reached for partition: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition})));
                    endOfStreamSSPs().add(systemStreamPartition);
                }
                trace(() -> new StringOps(Predef$.MODULE$.augmentString("Chooser returned an incoming message envelope: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{envelope})));
                timeout_$eq(0);
                this.metrics.choseObject().inc();
                ((Counter) this.metrics.systemStreamMessagesChosen().apply(envelope.getSystemStreamPartition())).inc();
                if (z) {
                    trace(() -> "Update chooser for " + systemStreamPartition.getPartition());
                    tryUpdate(systemStreamPartition);
                }
            }
            return BoxedUnit.UNIT;
        });
        updateTimer(this.metrics.pollNs(), () -> {
            final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(clock().apply$mcJ$sp() - lastPollNs());
            if (envelope == null || elapsedMs > pollIntervalMs()) {
                refresh();
            }
            return BoxedUnit.UNIT;
        });
        return envelope;
    }

    /**
     * Supplies the default value for the first parameter of {@code choose}.
     *
     * @return always {@code true}
     */
    public boolean choose$default$1() {
        final boolean updateChooserByDefault = true;
        return updateChooserByDefault;
    }

    /**
     * Polls one system's consumer for the partitions that currently need messages and
     * stores whatever comes back.
     *
     * <p>The fetch set is a copy of {@code emptySystemStreamPartitionsBySystem.get(str)}
     * minus {@code endOfStreamSSPs}; it is empty if the system has no entry. Nothing is
     * polled when the fetch set is empty. For each returned partition with at least one
     * envelope, the envelopes replace that partition's queue; if the partition was still in
     * the system's empty set it is removed from it and {@code tryUpdate} is called.
     *
     * <p>The poll result is handled as {@code java.util.Map<?, ?>} and its values are cast
     * to {@code Collection<IncomingMessageEnvelope>}. Iterating a raw map's entry set as
     * {@code Map.Entry} does not type-check, and the simple name {@code Map} is ambiguous
     * in this file.
     *
     * @param str name of the system whose consumer is polled
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void poll(String str) {
        trace(() -> new StringOps(Predef$.MODULE$.augmentString("Polling system consumer: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
        ((Counter) this.metrics.systemPolls().apply(str)).inc();
        trace(() -> new StringOps(Predef$.MODULE$.augmentString("Getting fetch map for system: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str})));

        final Set<SystemStreamPartition> fetchSet;
        if (emptySystemStreamPartitionsBySystem().containsKey(str)) {
            // Work on a copy so removing finished partitions does not alter the tracked set.
            fetchSet = new HashSet<>(emptySystemStreamPartitionsBySystem().get(str));
            fetchSet.removeAll(endOfStreamSSPs());
        } else {
            fetchSet = Collections.emptySet();
        }
        if (fetchSet.isEmpty()) {
            trace(() -> new StringOps(Predef$.MODULE$.augmentString("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
            return;
        }

        final SystemConsumer systemConsumer = (SystemConsumer) this.consumers.apply(str);
        trace(() -> new StringOps(Predef$.MODULE$.augmentString("Fetching: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{fetchSet})));
        ((Counter) this.metrics.systemStreamPartitionFetchesPerPoll().apply(str)).inc(fetchSet.size());
        final java.util.Map<?, ?> fetched = systemConsumer.poll(fetchSet, timeout());
        trace(() -> new StringOps(Predef$.MODULE$.augmentString("Got incoming message envelopes: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{fetched})));
        ((Counter) this.metrics.systemMessagesPerPoll().apply(str)).inc();

        for (java.util.Map.Entry<?, ?> entry : fetched.entrySet()) {
            final SystemStreamPartition systemStreamPartition = (SystemStreamPartition) entry.getKey();
            @SuppressWarnings("unchecked") // values are stored as queues of IncomingMessageEnvelope below
            final ArrayDeque<IncomingMessageEnvelope> envelopes =
                    new ArrayDeque<>((Collection<IncomingMessageEnvelope>) entry.getValue());
            final int size = envelopes.size();
            totalUnprocessedMessages_$eq(totalUnprocessedMessages() + size);
            if (size > 0) {
                unprocessedMessagesBySSP().put(systemStreamPartition, envelopes);
                // remove() returns true only if the partition was waiting for messages.
                if (emptySystemStreamPartitionsBySystem().get(systemStreamPartition.getSystem()).remove(systemStreamPartition)) {
                    tryUpdate(systemStreamPartition);
                }
            }
        }
    }

    /**
     * Calls {@code update} for the given partition. If it did not report success, whether
     * by returning {@code false} or by throwing, the partition is added to its system's set
     * in {@code emptySystemStreamPartitionsBySystem}. Anything thrown by {@code update}
     * still propagates to the caller.
     *
     * @param systemStreamPartition the partition to update the chooser for
     */
    public void tryUpdate(SystemStreamPartition systemStreamPartition) {
        boolean updated = false;
        try {
            updated = update(systemStreamPartition);
        } finally {
            if (!updated) {
                emptySystemStreamPartitionsBySystem().get(systemStreamPartition.getSystem()).add(systemStreamPartition);
            }
        }
    }

    /**
     * Stores the current clock reading as the last poll time and then calls {@code poll}
     * for every key of {@code consumers}.
     */
    private void refresh() {
        trace(() -> "Refreshing chooser with new messages.");
        final long now = clock().apply$mcJ$sp();
        lastPollNs_$eq(now);
        this.consumers.keys().map(systemName -> {
            poll(systemName);
            return BoxedUnit.UNIT;
        }, Iterable$.MODULE$.canBuildFrom());
    }

    /**
     * Removes envelopes from the partition's queue until one deserializes and has been
     * handed to the chooser, or the queue is empty.
     *
     * <p>Each removed envelope decrements {@code totalUnprocessedMessages}, including
     * envelopes that were dropped. If deserialization throws and
     * {@code dropDeserializationError} is {@code false}, the failure is rethrown wrapped in
     * {@link SystemConsumersException}. If the flag is {@code true}, the failure is logged
     * at debug level and counted in {@code metrics.deserializationError()}, and the loop
     * continues.
     *
     * <p>Success is tracked with a boolean flag, so a {@code null} result from
     * {@code fromBytes} is still handed to the chooser. This replaces a local typed
     * {@code Some} that was also assigned {@code None$.MODULE$}.
     *
     * @param systemStreamPartition the partition whose queue is drained
     * @return {@code true} if the chooser received an envelope
     * @throws SystemConsumersException on a deserialization failure when failures are not dropped
     */
    private boolean update(SystemStreamPartition systemStreamPartition) {
        boolean updated = false;
        final Queue<IncomingMessageEnvelope> queue = unprocessedMessagesBySSP().get(systemStreamPartition);
        while (!queue.isEmpty() && !updated) {
            IncomingMessageEnvelope envelope = null;
            boolean deserialized;
            try {
                envelope = (IncomingMessageEnvelope) this.serdeManager.fromBytes(queue.remove());
                deserialized = true;
            } catch (Throwable th) {
                if (!this.dropDeserializationError) {
                    throw new SystemConsumersException(new StringOps(Predef$.MODULE$.augmentString("Cannot deserialize an incoming message for %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition.getSystemStream().toString()})), th);
                }
                debug(() -> new StringOps(Predef$.MODULE$.augmentString("Cannot deserialize an incoming message for %s. Dropping the error message.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemStreamPartition.getSystemStream().toString()})), () -> th);
                this.metrics.deserializationError().inc();
                deserialized = false;
            }
            if (deserialized) {
                this.chooser.update(envelope);
                updated = true;
            }
            // The envelope has left the queue whether or not it was usable.
            totalUnprocessedMessages_$eq(totalUnprocessedMessages() - 1);
        }
        return updated;
    }

    /**
     * Synthetic helper used by {@code start()}: registers one system name with the metrics
     * of the given instance.
     *
     * @param systemConsumers the instance whose metrics are updated
     * @param str             the system name to register
     */
    public static final /* synthetic */ void $anonfun$start$4(SystemConsumers systemConsumers, String str) {
        final SystemConsumersMetrics registry = systemConsumers.metrics;
        registry.registerSystem(str);
    }

    /**
     * Creates an instance, stores the collaborators and settings, initialises the mixed-in
     * traits, creates the empty bookkeeping collections and registers gauge callbacks
     * with the metrics.
     *
     * @param messageChooser         stored as {@code chooser}
     * @param map                    stored as {@code consumers}; looked up by system name in
     *                               {@code register} and {@code poll}
     * @param serdeManager           used by {@code update} to deserialize envelopes
     * @param systemConsumersMetrics stored as {@code metrics}
     * @param i                      stored as {@code noNewMessagesTimeout} and used as the
     *                               initial {@code timeout}
     * @param z                      stored as {@code dropDeserializationError}
     * @param i2                     stored as {@code pollIntervalMs}
     * @param function0              stored as {@code clock}
     */
    public SystemConsumers(MessageChooser messageChooser, scala.collection.immutable.Map<String, SystemConsumer> map, SerdeManager serdeManager, SystemConsumersMetrics systemConsumersMetrics, int i, boolean z, int i2, Function0<Object> function0) {
        this.chooser = messageChooser;
        this.consumers = map;
        this.serdeManager = serdeManager;
        this.metrics = systemConsumersMetrics;
        this.noNewMessagesTimeout = i;
        this.dropDeserializationError = z;
        this.pollIntervalMs = i2;
        this.clock = function0;
        // Trait initialisers run after the constructor arguments are stored and before the
        // debug(...) calls below.
        Logging.$init$(this);
        TimerUtils.$init$(this);
        this.unprocessedMessagesBySSP = new HashMap<>();
        this.endOfStreamSSPs = new HashSet<>();
        this.emptySystemStreamPartitionsBySystem = new HashMap<>();
        // The poll timeout starts at the no-new-messages timeout.
        this.timeout = i;
        this.lastPollNs = 0L;
        this.totalUnprocessedMessages = 0;
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got stream consumers: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{this.consumers}));
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got no new message timeout: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.noNewMessagesTimeout)}));
        });
        // The metrics object receives callbacks, so each reading reflects the current state.
        systemConsumersMetrics.setTimeout(() -> {
            return this.timeout();
        });
        // Reports the number of systems keyed in the map, not the number of partitions.
        systemConsumersMetrics.setNeededByChooser(() -> {
            return this.emptySystemStreamPartitionsBySystem().size();
        });
        systemConsumersMetrics.setUnprocessedMessages(() -> {
            return this.totalUnprocessedMessages();
        });
    }
}
