package org.apache.samza.system;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Timer;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.system.chooser.MessageChooser;
import org.apache.samza.util.Logging;
import org.apache.samza.util.TimerUtil;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.generic.Growable;
import scala.collection.immutable.Map;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SystemConsumers.scala */
@ScalaSignature(bytes = "\u0006\u0001\t}u!B&M\u0011\u0003)f!B,M\u0011\u0003A\u0006\"B0\u0002\t\u0003\u0001\u0007bB1\u0002\u0005\u0004%\tA\u0019\u0005\u0007M\u0006\u0001\u000b\u0011B2\t\u000f\u001d\f!\u0019!C\u0001Q\"1A.\u0001Q\u0001\n%Dq!\\\u0001\u0012\u0002\u0013\u0005a\u000e\u0003\u0005��\u0003E\u0005I\u0011AA\u0001\u0011%\tY!AI\u0001\n\u0003\ti\u0001C\u0005\u0002\u0012\u0005\t\n\u0011\"\u0001\u0002\u0014!I\u0011qC\u0001\u0012\u0002\u0013\u0005\u0011Q\u0002\u0005\n\u00033\t\u0011\u0013!C\u0001\u00037A\u0011\"a\u000b\u0002#\u0003%\t!!\u0004\t\u0013\u00055\u0012!%A\u0005\u0002\u0005=b!B,M\u0001\u0005%\u0003BCA/\u001f\t\u0005\t\u0015!\u0003\u0002`!Q\u0011\u0011N\b\u0003\u0002\u0003\u0006I!a\u001b\t\u0015\u0005]tB!A!\u0002\u0013\tI\bC\u0005\u0002��=\u0011\t\u0011)A\u0005a\"Q\u0011\u0011Q\b\u0003\u0002\u0003\u0006I!!\u0002\t\u0013\u0005\ruB!A!\u0002\u0013\u0019\u0007\"CAC\u001f\t\u0005\t\u0015!\u0003j\u0011%\t9i\u0004BC\u0002\u0013\u0005!\rC\u0005\u0002\n>\u0011\t\u0011)A\u0005G\"Q\u00111R\b\u0003\u0006\u0004%\t!!$\t\u0015\u0005=uB!A!\u0002\u0013\ty\u0002C\u0005\u0002\u0012>\u0011)\u0019!C\u0001E\"I\u00111S\b\u0003\u0002\u0003\u0006Ia\u0019\u0005\u000b\u0003+{!Q1A\u0005\u0002\u0005]\u0005BCAM\u001f\t\u0005\t\u0015!\u0003\u00024!1ql\u0004C\u0001\u00037C\u0011\"!.\u0010\u0005\u0004%I!a.\t\u0011\u00055w\u0002)A\u0005\u0003sC\u0011\"a4\u0010\u0005\u0004%I!!5\t\u0011\u0005ew\u0002)A\u0005\u0003'D\u0011\"a7\u0010\u0005\u0004%I!!5\t\u0011\u0005uw\u0002)A\u0005\u0003'D\u0011\"a8\u0010\u0005\u0004%I!!9\t\u0011\u0005\u0015x\u0002)A\u0005\u0003GD\u0011\"a:\u0010\u0005\u0004%I!!;\t\u0011\u0005ex\u0002)A\u0005\u0003WD\u0011\"a?\u0010\u0005\u0004%I!!5\t\u0011\u0005ux\u0002)A\u0005\u0003'D\u0011\"a@\u0010\u0005\u0004%IA!\u0001\t\u0011\t-q\u0002)A\u0005\u0005\u0007A\u0001B!\u0004\u0010\u0001\u0004%I\u0001\u001b\u0005\n\u0005\u001fy\u0001\u0019!C\u0005\u0005#AqA!\b\u0010A\u0003&\u0011\u000e\u0003\u0005\u0003 
=\u0001\r\u0011\"\u0003i\u0011%\u0011\tc\u0004a\u0001\n\u0013\u0011\u0019\u0003C\u0004\u0003(=\u0001\u000b\u0015B5\t\u0011\tEr\u00021A\u0005\u0002\tD\u0011Ba\r\u0010\u0001\u0004%\tA!\u000e\t\u000f\ter\u0002)Q\u0005G\"I!1H\bA\u0002\u0013\u0005!Q\b\u0005\n\u0005\u007fy\u0001\u0019!C\u0001\u0005\u0003B\u0001B!\u0012\u0010A\u0003&\u0011Q\u0005\u0005\t\u0005\u000fz\u0001\u0019!C\u0001E\"I!\u0011J\bA\u0002\u0013\u0005!1\n\u0005\b\u0005\u001fz\u0001\u0015)\u0003d\u0011\u001d\u0011\tf\u0004C\u0001\u0005'BqA!\u0016\u0010\t\u0003\u0011\u0019\u0006C\u0004\u0003X=!\tA!\u0017\t\u000f\tms\u0002\"\u0001\u0003^!9!qM\b\u0005\u0002\t%\u0004b\u0002B7\u001f\u0011\u0005!q\u000e\u0005\b\u0005kzA\u0011\u0001B<\u0011%\u0011ihDI\u0001\n\u0003\t\u0019\u0002C\u0004\u0003��=!IA!!\t\u000f\t\u001du\u0002\"\u0001\u0003\n\"9!QR\b\u0005\n\tM\u0003b\u0002BH\u001f\u0011%!\u0011\f\u0005\b\u0005#{A\u0011\u0002BJ\u0011\u001d\u00119j\u0004C\u0005\u00053\u000bqbU=ti\u0016l7i\u001c8tk6,'o\u001d\u0006\u0003\u001b:\u000baa]=ti\u0016l'BA(Q\u0003\u0015\u0019\u0018-\u001c>b\u0015\t\t&+\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002'\u0006\u0019qN]4\u0004\u0001A\u0011a+A\u0007\u0002\u0019\ny1+_:uK6\u001cuN\\:v[\u0016\u00148o\u0005\u0002\u00023B\u0011!,X\u0007\u00027*\tA,A\u0003tG\u0006d\u0017-\u0003\u0002_7\n1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#A+\u0002?\u0011+e)Q+M)~suj\u0018(F/~kUiU*B\u000f\u0016\u001bv\fV%N\u000b>+F+F\u0001d!\tQF-\u0003\u0002f7\n\u0019\u0011J\u001c;\u0002A\u0011+e)Q+M)~suj\u0018(F/~kUiU*B\u000f\u0016\u001bv\fV%N\u000b>+F\u000bI\u0001!\t\u00163\u0015)\u0016'U?\u0012\u0013v\nU0T\u000bJK\u0015\tT%[\u0003RKuJT0F%J{%+F\u0001j!\tQ&.\u0003\u0002l7\n9!i\\8mK\u0006t\u0017!\t#F\r\u0006+F\nV0E%>\u0003vlU#S\u0013\u0006c\u0015JW!U\u0013>su,\u0012*S\u001fJ\u0003\u0013a\u0007\u0013mKN\u001c\u0018N\\5uI\u001d\u0014X-\u0019;fe\u0012\"WMZ1vYR$C'F\u0001pU\t\u0001h\u000f\u0005\u0002ri6\t!O\u0003\u0002t\u001d\u0006Y1/\u001a:jC2L'0\u001a:t\u0013\t)(O\u0001\u0007TKJ$W-T1oC\u001e,'oK\u0001x!\tAX0D\u0001z\u0015\tQ
80A\u0005v]\u000eDWmY6fI*\u0011ApW\u0001\u000bC:tw\u000e^1uS>t\u0017B\u0001@z\u0005E)hn\u00195fG.,GMV1sS\u0006t7-Z\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001b\u0016\u0005\u0005\r!fAA\u0003mB\u0019a+a\u0002\n\u0007\u0005%AJ\u0001\fTsN$X-\\\"p]N,X.\u001a:t\u001b\u0016$(/[2t\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%mU\u0011\u0011q\u0002\u0016\u0003GZ\f1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012:TCAA\u000bU\tIg/A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H\u0005O\u0001\u001cI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u001d\u0016\u0005\u0005u!fAA\u0010mB)!,!\t\u0002&%\u0019\u00111E.\u0003\u0013\u0019+hn\u0019;j_:\u0004\u0004c\u0001.\u0002(%\u0019\u0011\u0011F.\u0003\t1{gnZ\u0001\u001dI1,7o]5oSR$sM]3bi\u0016\u0014H\u0005Z3gCVdG\u000fJ\u00191\u0003q!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%cE*\"!!\r+\u0007\u0005Mb\u000f\u0005\u0003\u00026\u0005\rc\u0002BA\u001c\u0003\u007f\u00012!!\u000f\\\u001b\t\tYDC\u0002\u0002>Q\u000ba\u0001\u0010:p_Rt\u0014bAA!7\u00061\u0001K]3eK\u001aLA!!\u0012\u0002H\t11\u000b\u001e:j]\u001eT1!!\u0011\\'\u0019y\u0011,a\u0013\u0002XA!\u0011QJA*\u001b\t\tyEC\u0002\u0002R9\u000bA!\u001e;jY&!\u0011QKA(\u0005\u001daunZ4j]\u001e\u0004B!!\u0014\u0002Z%!\u00111LA(\u0005%!\u0016.\\3s+RLG.A\u0004dQ>|7/\u001a:\u0011\t\u0005\u0005\u0014QM\u0007\u0003\u0003GR1!!\u0018M\u0013\u0011\t9'a\u0019\u0003\u001d5+7o]1hK\u000eCwn\\:fe\u0006I1m\u001c8tk6,'o\u001d\t\t\u0003k\ti'a\r\u0002r%!\u0011qNA$\u0005\ri\u0015\r\u001d\t\u0004-\u0006M\u0014bAA;\u0019\nq1+_:uK6\u001cuN\\:v[\u0016\u0014\u0018\u0001D:zgR,W.\u00113nS:\u001c\bc\u0001,\u0002|%\u0019\u0011Q\u0010'\u0003\u0019MK8\u000f^3n\u0003\u0012l\u0017N\\:\u0002\u0019M,'\u000fZ3NC:\fw-\u001a:\u0002\u000f5,GO]5dg\u0006!bn\u001c(fo6+7o]1hKN$\u0016.\\3pkR\f\u0001\u0004\u001a:pa\u0012+7/\u001a:jC2L'0\u0019;j_:,%O]8s\u00039\u0001x\u000e\u001c7J]R,'O^1m\u001bN\fq\u0002]8mY&sG/\u001a:wC2l5\u000fI\u0001\u0006G2|7m[\
u000b\u0003\u0003?\taa\u00197pG.\u0004\u0013\u0001E3mCN$\u0018nY5us\u001a\u000b7\r^8s\u0003E)G.Y:uS\u000eLG/\u001f$bGR|'\u000fI\u0001\u0006eVt\u0017\nZ\u000b\u0003\u0003g\taA];o\u0013\u0012\u0004C\u0003GAO\u0003?\u000b\t+a)\u0002&\u0006\u001d\u0016\u0011VAV\u0003[\u000by+!-\u00024B\u0011ak\u0004\u0005\b\u0003;z\u0002\u0019AA0\u0011\u001d\tIg\ba\u0001\u0003WBq!a\u001e \u0001\u0004\tI\b\u0003\u0005\u0002��}\u0001\n\u00111\u0001q\u0011%\t\ti\bI\u0001\u0002\u0004\t)\u0001\u0003\u0005\u0002\u0004~\u0001\n\u00111\u0001d\u0011!\t)i\bI\u0001\u0002\u0004I\u0007\u0002CAD?A\u0005\t\u0019A2\t\u0013\u0005-u\u0004%AA\u0002\u0005}\u0001\u0002CAI?A\u0005\t\u0019A2\t\u0013\u0005Uu\u0004%AA\u0002\u0005M\u0012AF:taR{'+Z4jgR,'/\u001a3PM\u001a\u001cX\r^:\u0016\u0005\u0005e\u0006\u0003CA^\u0003\u0007\f9-a\r\u000e\u0005\u0005u&\u0002BA)\u0003\u007fS!!!1\u0002\t)\fg/Y\u0005\u0005\u0003\u000b\fiLA\u0004ICNDW*\u00199\u0011\u0007Y\u000bI-C\u0002\u0002L2\u0013QcU=ti\u0016l7\u000b\u001e:fC6\u0004\u0016M\u001d;ji&|g.A\ftgB$vNU3hSN$XM]3e\u001f\u001a47/\u001a;tA\u000592o\u001d9LKf\u0014UoY6fiN\u0014VmZ5ti\u0016\u0014X\rZ\u000b\u0003\u0003'\u0004b!a/\u0002V\u0006\u001d\u0017\u0002BAl\u0003{\u0013q\u0001S1tQN+G/\u0001\rtgB\\U-\u001f\"vG.,Go\u001d*fO&\u001cH/\u001a:fI\u0002\n\u0001#\u001b8uKJlW\rZ5bi\u0016\u001c6\u000bU:\u0002#%tG/\u001a:nK\u0012L\u0017\r^3T'B\u001b\b%A\nj]R,'/\\3eS\u0006$XmU=ti\u0016l7/\u0006\u0002\u0002dB1\u00111XAk\u0003g\tA#\u001b8uKJlW\rZ5bi\u0016\u001c\u0016p\u001d;f[N\u0004\u0013\u0001G;oaJ|7-Z:tK\u0012lUm]:bO\u0016\u001c()_*T!V\u0011\u00111\u001e\t\t\u0003w\u000b\u0019-a2\u0002nB1\u00111XAx\u0003gLA!!=\u0002>\n)\u0011+^3vKB\u0019a+!>\n\u0007\u0005]HJA\fJ]\u000e|W.\u001b8h\u001b\u0016\u001c8/Y4f\u000b:4X\r\\8qK\u0006IRO\u001c9s_\u000e,7o]3e\u001b\u0016\u001c8/Y4fg\nK8k\u0015)!\u0003=)g\u000eZ(g'R\u0014X-Y7T'B\u001b\u0018\u0001E3oI>37\u000b\u001e:fC6\u001c6\u000bU:!\u0003\r*W\u000e\u001d;z'f\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8og\nK8+_:uK6,\"Aa\u0001\u0011\u0011\u0005m\u00161YA\
u001a\u0005\u000b\u0001b!a/\u0003\b\u0005\u001d\u0017\u0002\u0002B\u0005\u0003{\u00131aU3u\u0003\u0011*W\u000e\u001d;z'f\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8og\nK8+_:uK6\u0004\u0013aB:uCJ$X\rZ\u0001\fgR\f'\u000f^3e?\u0012*\u0017\u000f\u0006\u0003\u0003\u0014\te\u0001c\u0001.\u0003\u0016%\u0019!qC.\u0003\tUs\u0017\u000e\u001e\u0005\t\u00057y\u0013\u0011!a\u0001S\u0006\u0019\u0001\u0010J\u0019\u0002\u0011M$\u0018M\u001d;fI\u0002\n!\"[:Ee\u0006Lg.\u001b8h\u00039I7\u000f\u0012:bS:LgnZ0%KF$BAa\u0005\u0003&!A!1\u0004\u001a\u0002\u0002\u0003\u0007\u0011.A\u0006jg\u0012\u0013\u0018-\u001b8j]\u001e\u0004\u0003fA\u001a\u0003,A\u0019!L!\f\n\u0007\t=2L\u0001\u0005w_2\fG/\u001b7f\u0003\u001d!\u0018.\\3pkR\f1\u0002^5nK>,Ho\u0018\u0013fcR!!1\u0003B\u001c\u0011!\u0011Y\"NA\u0001\u0002\u0004\u0019\u0017\u0001\u0003;j[\u0016|W\u000f\u001e\u0011\u0002\u00151\f7\u000f\u001e)pY2t5/\u0006\u0002\u0002&\u0005qA.Y:u!>dGNT:`I\u0015\fH\u0003\u0002B\n\u0005\u0007B\u0011Ba\u00079\u0003\u0003\u0005\r!!\n\u0002\u00171\f7\u000f\u001e)pY2t5\u000fI\u0001\u0019i>$\u0018\r\\+oaJ|7-Z:tK\u0012lUm]:bO\u0016\u001c\u0018\u0001\b;pi\u0006dWK\u001c9s_\u000e,7o]3e\u001b\u0016\u001c8/Y4fg~#S-\u001d\u000b\u0005\u0005'\u0011i\u0005\u0003\u0005\u0003\u001cm\n\t\u00111\u0001d\u0003e!x\u000e^1m+:\u0004(o\\2fgN,G-T3tg\u0006<Wm\u001d\u0011\u0002\u000bM$\u0018M\u001d;\u0016\u0005\tM\u0011\u0001B:u_B\fQ\u0001\u001a:bS:$\"Aa\u0005\u0002\u0011I,w-[:uKJ$bAa\u0005\u0003`\t\r\u0004b\u0002B1\u0001\u0002\u0007\u0011qY\u0001\u0004gN\u0004\bb\u0002B3\u0001\u0002\u0007\u00111G\u0001\u0007_\u001a47/\u001a;\u0002/I,w-[:uKJLe\u000e^3s[\u0016$\u0017.\u0019;f'N\u0003F\u0003\u0002B\n\u0005WBqA!\u0019B\u0001\u0004\t9-A\u0007jg\u0016sGm\u00144TiJ,\u0017-\u001c\u000b\u0004S\nE\u0004b\u0002B:\u0005\u0002\u0007\u0011qY\u0001\u0016gf\u001cH/Z7TiJ,\u0017-\u001c)beRLG/[8o\u0003\u0019\u0019\u0007n\\8tKR!\u00111\u001fB=\u0011!\u0011Yh\u0011I\u0001\u0002\u0004I\u0017!D;qI\u0006$Xm\u00115p_N,'/\u0001\tdQ>|7/\u001a\u0013eK\u001a\fW\u000f\u001c;%c\u
0005!\u0001o\u001c7m)\u0011\u0011\u0019Ba!\t\u000f\t\u0015U\t1\u0001\u00024\u0005Q1/_:uK6t\u0015-\\3\u0002\u0013Q\u0014\u00180\u00169eCR,G\u0003\u0002B\n\u0005\u0017CqA!\u0019G\u0001\u0004\t9-A\u0004sK\u001a\u0014Xm\u001d5\u0002E]\u0014\u0018\u000e^3Ee\u0006LgnQ8oiJ|G.T3tg\u0006<W\rV8TgB\fV/Z;f\u0003\u0019)\b\u000fZ1uKR\u0019\u0011N!&\t\u000f\tM\u0014\n1\u0001\u0002H\u0006y!/Z7pm\u0016\\U-\u001f\"vG.,G\u000f\u0006\u0003\u0002H\nm\u0005b\u0002BO\u0015\u0002\u0007\u0011qY\u0001\u0011gN\u0004x+\u001b;i\u0017\u0016L()^2lKR\u0004")
/* loaded from: input_file:org/apache/samza/system/SystemConsumers.class */
public class SystemConsumers implements Logging, TimerUtil {
    // ---- Collaborators and settings (all declared final) ----
    private final MessageChooser chooser;
    // Looked up by the system name passed to the poll method.
    private final Map<String, SystemConsumer> consumers;
    private final SystemAdmins systemAdmins;
    private final SerdeManager serdeManager;
    private final SystemConsumersMetrics metrics;
    // Used as the poll timeout after the chooser returned null and the caller asked for a chooser update.
    private final int noNewMessagesTimeout;
    private final boolean dropDeserializationError;
    // Compared against the milliseconds elapsed since lastPollNs to decide when to refresh.
    private final int pollIntervalMs;
    private final Function0<Object> clock;
    private final int elasticityFactor;
    private final String runId;

    // ---- Registration and buffering state ----
    // Offset recorded per SSP by register(); keys are the SSPs returned by removeKeyBucket.
    private final HashMap<SystemStreamPartition, String> sspToRegisteredOffsets;
    // SSPs exactly as they were passed to register().
    private final HashSet<SystemStreamPartition> sspKeyBucketsRegistered;
    private final HashSet<SystemStreamPartition> org$apache$samza$system$SystemConsumers$$intermediateSSPs;
    private final HashSet<String> intermediateSystems;
    // Per-SSP queue of envelopes; replaced with the freshly polled batch in the poll method.
    private final HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>> org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP;
    private final HashSet<SystemStreamPartition> endOfStreamSSPs;
    // Per-system set of SSPs that is handed to SystemConsumer.poll as the fetch set.
    private final HashMap<String, Set<SystemStreamPartition>> emptySystemStreamPartitionsBySystem;

    // ---- Mutable run-time state ----
    private boolean started;
    private volatile boolean isDraining;
    private int timeout;
    // Clock reading taken at the start of the most recent refresh().
    private long lastPollNs;
    private int totalUnprocessedMessages;

    // ---- Logging trait support ----
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    // Bit 1 is set once logger has been initialised, bit 2 once startupLogger has been initialised.
    private volatile byte bitmap$0;

    /**
     * Static forwarder to the companion object's {@code DEFAULT_DROP_SERIALIZATION_ERROR}.
     *
     * @return the value reported by {@code SystemConsumers$.MODULE$}
     */
    public static boolean DEFAULT_DROP_SERIALIZATION_ERROR() {
        final SystemConsumers$ companion = SystemConsumers$.MODULE$;
        return companion.DEFAULT_DROP_SERIALIZATION_ERROR();
    }

    /**
     * Static forwarder to the companion object's {@code DEFAULT_NO_NEW_MESSAGES_TIMEOUT}.
     *
     * @return the value reported by {@code SystemConsumers$.MODULE$}
     */
    public static int DEFAULT_NO_NEW_MESSAGES_TIMEOUT() {
        final SystemConsumers$ companion = SystemConsumers$.MODULE$;
        return companion.DEFAULT_NO_NEW_MESSAGES_TIMEOUT();
    }

    /**
     * Override of {@code TimerUtil.updateTimer}; the result of the call is cast to {@code T}
     * and returned.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the TimerUtil trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.TimerUtil
    public <T> T updateTimer(Timer timer, Function0<T> function0) {
        Object updateTimer;
        updateTimer = updateTimer(timer, function0);
        return (T) updateTimer;
    }

    /**
     * Override of {@code TimerUtil.updateTimerAndGetDuration}; returns the {@code long} produced
     * by the call.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the TimerUtil trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.TimerUtil
    public long updateTimerAndGetDuration(Timer timer, Function1<Object, BoxedUnit> function1) {
        long updateTimerAndGetDuration;
        updateTimerAndGetDuration = updateTimerAndGetDuration(timer, function1);
        return updateTimerAndGetDuration;
    }

    /**
     * Override of {@code Logging.startupLog} taking a lazily evaluated message.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void startupLog(Function0<Object> function0) {
        startupLog(function0);
    }

    /**
     * Override of {@code Logging.trace} taking a lazily evaluated message.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0) {
        trace(function0);
    }

    /**
     * Override of the two-argument {@code Logging.trace}: a lazily evaluated message plus a
     * lazily evaluated {@code Throwable}.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    /**
     * Override of {@code Logging.debug} taking a lazily evaluated message.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0) {
        debug(function0);
    }

    /**
     * Override of the two-argument {@code Logging.debug}: a lazily evaluated message plus a
     * lazily evaluated {@code Throwable}.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    /**
     * Override of {@code Logging.info} taking a lazily evaluated message.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0) {
        info(function0);
    }

    /**
     * Override of the two-argument {@code Logging.info}: a lazily evaluated message plus a
     * lazily evaluated {@code Throwable}.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    /**
     * Override of {@code Logging.warn} taking a lazily evaluated message.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0) {
        warn(function0);
    }

    /**
     * Override of the two-argument {@code Logging.warn}: a lazily evaluated message plus a
     * lazily evaluated {@code Throwable}.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    /**
     * Override of {@code Logging.error} taking a lazily evaluated message.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0) {
        error(function0);
    }

    /**
     * Override of the two-argument {@code Logging.error}: a lazily evaluated message plus a
     * lazily evaluated {@code Throwable}.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    /**
     * Override of {@code Logging.putMDC} taking two lazily evaluated strings.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     * The parameters are presumably the MDC key and value, in that order - verify against Logging.
     */
    @Override // org.apache.samza.util.Logging
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        putMDC(function0, function02);
    }

    /**
     * Override of {@code Logging.getMDC}; returns the string produced by the call.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public String getMDC(Function0<String> function0) {
        String mdc;
        mdc = getMDC(function0);
        return mdc;
    }

    /**
     * Override of {@code Logging.removeMDC} taking a lazily evaluated string.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void removeMDC(Function0<String> function0) {
        removeMDC(function0);
    }

    /**
     * Override of {@code Logging.clearMDC}.
     *
     * NOTE(review): as rendered by the decompiler this method calls itself; presumably the
     * compiled class delegates to the Logging trait implementation - confirm against the bytecode.
     */
    @Override // org.apache.samza.util.Logging
    public void clearMDC() {
        clearMDC();
    }

    /**
     * @return the logger name stored in the {@code loggerName} field
     */
    @Override // org.apache.samza.util.Logging
    public String loggerName() {
        return loggerName;
    }

    /*
     * Slow path of logger(): while holding this object's monitor, initialises the logger field
     * if bit 1 of bitmap$0 is still clear, then sets that bit and returns the field.
     *
     * NOTE(review): the inner "logger = logger()" call is how the decompiler rendered the
     * initialiser; presumably the compiled class invokes the Logging trait's implementation
     * there - confirm against the bytecode. The "?? r0" declaration is a decompiler artifact.
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.SystemConsumers] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            // Re-check the flag under the lock so the field is assigned at most once.
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                // Mark the logger as initialised.
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    /**
     * Returns the logger, taking the synchronised slow path while bit 1 of {@code bitmap$0}
     * is still clear.
     *
     * @return the {@code logger} field once it has been initialised
     */
    @Override // org.apache.samza.util.Logging
    public Logger logger() {
        final boolean initialised = ((byte) (bitmap$0 & 1)) != 0;
        if (initialised) {
            return logger;
        }
        return logger$lzycompute();
    }

    /**
     * @return the logger name stored in the {@code startupLoggerName} field
     */
    @Override // org.apache.samza.util.Logging
    public String startupLoggerName() {
        return startupLoggerName;
    }

    /*
     * Slow path of startupLogger(): while holding this object's monitor, initialises the
     * startupLogger field if bit 2 of bitmap$0 is still clear, then sets that bit and returns
     * the field.
     *
     * NOTE(review): the inner "startupLogger = startupLogger()" call is how the decompiler
     * rendered the initialiser; presumably the compiled class invokes the Logging trait's
     * implementation there - confirm against the bytecode. The "?? r0" declaration is a
     * decompiler artifact.
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.system.SystemConsumers] */
    private Logger startupLogger$lzycompute() {
        Logger startupLogger;
        ?? r0 = this;
        synchronized (r0) {
            // Re-check the flag under the lock so the field is assigned at most once.
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                startupLogger = startupLogger();
                this.startupLogger = startupLogger;
                r0 = this;
                // Mark the startup logger as initialised.
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    /**
     * Returns the startup logger, taking the synchronised slow path while bit 2 of
     * {@code bitmap$0} is still clear.
     *
     * @return the {@code startupLogger} field once it has been initialised
     */
    @Override // org.apache.samza.util.Logging
    public Logger startupLogger() {
        final boolean initialised = ((byte) (bitmap$0 & 2)) != 0;
        if (initialised) {
            return startupLogger;
        }
        return startupLogger$lzycompute();
    }

    /**
     * Setter declared by {@code Logging} that stores the logger name.
     *
     * NOTE(review): the target field is declared {@code final} in this decompiled listing;
     * presumably the setter is only invoked during trait initialisation - confirm.
     *
     * @param str the logger name to store
     */
    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        loggerName = str;
    }

    /**
     * Setter declared by {@code Logging} that stores the startup logger name.
     *
     * NOTE(review): the target field is declared {@code final} in this decompiled listing;
     * presumably the setter is only invoked during trait initialisation - confirm.
     *
     * @param str the startup logger name to store
     */
    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        startupLoggerName = str;
    }

    /**
     * @return the poll interval, in milliseconds, that {@code choose} compares against the
     *         time elapsed since the last poll
     */
    public int pollIntervalMs() {
        return pollIntervalMs;
    }

    /**
     * @return the clock function held by this instance
     */
    @Override // org.apache.samza.util.TimerUtil
    public Function0<Object> clock() {
        return clock;
    }

    /**
     * @return the elasticity factor held by this instance
     */
    public int elasticityFactor() {
        return elasticityFactor;
    }

    /**
     * @return the run id held by this instance
     */
    public String runId() {
        return runId;
    }

    /**
     * @return the live map from SSP to the offset recorded for it by {@code register}
     */
    private HashMap<SystemStreamPartition, String> sspToRegisteredOffsets() {
        return sspToRegisteredOffsets;
    }

    /**
     * @return the live set of SSPs exactly as they were passed to {@code register}
     */
    private HashSet<SystemStreamPartition> sspKeyBucketsRegistered() {
        return sspKeyBucketsRegistered;
    }

    /**
     * @return the live set of SSPs added through {@code registerIntermediateSSP}
     */
    public HashSet<SystemStreamPartition> org$apache$samza$system$SystemConsumers$$intermediateSSPs() {
        return org$apache$samza$system$SystemConsumers$$intermediateSSPs;
    }

    /**
     * @return the live set of system names added through {@code registerIntermediateSSP}
     */
    private HashSet<String> intermediateSystems() {
        return intermediateSystems;
    }

    /**
     * @return the live map from SSP to its queue of envelopes
     */
    public HashMap<SystemStreamPartition, Queue<IncomingMessageEnvelope>> org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP() {
        return org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP;
    }

    /**
     * @return the live set of SSPs recorded as having reached end of stream
     */
    private HashSet<SystemStreamPartition> endOfStreamSSPs() {
        return endOfStreamSSPs;
    }

    /**
     * @return the live per-system map of SSPs used to build the fetch set when polling
     */
    private HashMap<String, Set<SystemStreamPartition>> emptySystemStreamPartitionsBySystem() {
        return emptySystemStreamPartitionsBySystem;
    }

    /**
     * @return the current value of the {@code started} flag
     */
    private boolean started() {
        return started;
    }

    /**
     * Stores a new value in the {@code started} flag.
     *
     * @param z the new flag value
     */
    private void started_$eq(boolean z) {
        started = z;
    }

    /**
     * @return the current value of the volatile {@code isDraining} flag
     */
    private boolean isDraining() {
        return isDraining;
    }

    /**
     * Stores a new value in the volatile {@code isDraining} flag.
     *
     * @param z the new flag value
     */
    private void isDraining_$eq(boolean z) {
        isDraining = z;
    }

    /**
     * @return the timeout currently passed to {@code SystemConsumer.poll}
     */
    public int timeout() {
        return timeout;
    }

    /**
     * Stores a new poll timeout.
     *
     * @param i the new timeout value
     */
    public void timeout_$eq(int i) {
        timeout = i;
    }

    /**
     * @return the clock reading recorded by the most recent refresh
     */
    public long lastPollNs() {
        return lastPollNs;
    }

    /**
     * Stores a new last-poll clock reading.
     *
     * @param j the new reading
     */
    public void lastPollNs_$eq(long j) {
        lastPollNs = j;
    }

    /**
     * @return the running count held in {@code totalUnprocessedMessages}
     */
    public int totalUnprocessedMessages() {
        return totalUnprocessedMessages;
    }

    /**
     * Stores a new value for the unprocessed-message count.
     *
     * @param i the new count
     */
    public void totalUnprocessedMessages_$eq(int i) {
        totalUnprocessedMessages = i;
    }

    /**
     * Starts every registered consumer and the chooser, then performs an initial refresh.
     *
     * NOTE(review): the per-entry work done by the synthetic helpers {@code $anonfun$start$1},
     * {@code $anonfun$start$2} and {@code $anonfun$start$6} is not visible in this listing;
     * consult the Scala source or those helpers before relying on it.
     */
    public void start() {
        // Apply $anonfun$start$2 to each registered-offset entry accepted by the $anonfun$start$1 filter.
        ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(sspToRegisteredOffsets()).asScala()).withFilter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$start$1(tuple2));
        }).foreach(tuple22 -> {
            $anonfun$start$2(this, tuple22);
            return BoxedUnit.UNIT;
        });
        debug(() -> {
            return "Starting consumers.";
        });
        // Group the SSPs that have a message queue by system name and add each group, copied
        // into a HashSet, to emptySystemStreamPartitionsBySystem.
        ((Growable) JavaConverters$.MODULE$.mapAsScalaMapConverter(emptySystemStreamPartitionsBySystem()).asScala()).$plus$plus$eq(((TraversableLike) JavaConverters$.MODULE$.asScalaSetConverter(org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().keySet()).asScala()).groupBy(systemStreamPartition -> {
            return systemStreamPartition.getSystem();
        }).mapValues(set -> {
            return new HashSet((Collection) JavaConverters$.MODULE$.seqAsJavaListConverter(set.toSeq()).asJava());
        }));
        // Run $anonfun$start$6 once per consumer key.
        this.consumers.keySet().foreach(str -> {
            $anonfun$start$6(this, str);
            return BoxedUnit.UNIT;
        });
        this.consumers.values().foreach(systemConsumer -> {
            systemConsumer.start();
            return BoxedUnit.UNIT;
        });
        this.chooser.start();
        // drain() defers writing the drain control message until start when it is called first.
        if (isDraining()) {
            writeDrainControlMessageToSspQueue();
        }
        started_$eq(true);
        refresh();
    }

    /**
     * Stops every consumer and the chooser, then clears the started flag.
     * When {@code start()} has not set the started flag, only a debug message is logged.
     */
    public void stop() {
        if (started()) {
            debug(() -> "Stopping consumers.");
            this.consumers.values().foreach(consumer -> {
                consumer.stop();
                return BoxedUnit.UNIT;
            });
            this.chooser.stop();
            started_$eq(false);
        } else {
            debug(() -> "Ignoring the consumers stop request since it never started.");
        }
    }

    /**
     * Switches this instance into drain mode; repeated calls are no-ops.
     * If the instance is already started the drain control message is written immediately,
     * otherwise {@code start()} writes it.
     */
    public void drain() {
        if (!isDraining()) {
            isDraining_$eq(true);
            info(() -> "SystemConsumers is set to drain mode.");
            if (started()) {
                writeDrainControlMessageToSspQueue();
            }
        }
    }

    /**
     * Registers an SSP together with the offset to start from.
     *
     * The SSP is always remembered as passed in; all further bookkeeping uses the SSP returned
     * by {@code removeKeyBucket}. An offset equal to
     * {@code IncomingMessageEnvelope.END_OF_STREAM_OFFSET} only marks the SSP as ended.
     * Otherwise the SSP is registered with the metrics and the chooser, gets an empty message
     * queue, and its offset is recorded when none exists yet or when the system admin's
     * comparator reports a positive result for (existing, new).
     *
     * NOTE(review): the comparator is invoked even when no offset has been recorded yet, i.e.
     * with a null first argument - confirm every SystemAdmin implementation tolerates that.
     *
     * @param systemStreamPartition the SSP to register
     * @param str the offset to register for it
     * @throws SystemConsumersException when a {@code NoSuchElementException} is raised while
     *         recording the offset
     */
    public void register(SystemStreamPartition systemStreamPartition, String str) {
        sspKeyBucketsRegistered().add(systemStreamPartition);
        final SystemStreamPartition ssp = removeKeyBucket(systemStreamPartition);
        debug(() -> String.format("Registering stream: %s, %s", ssp, str));

        if (!IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(str)) {
            this.metrics.registerSystemStreamPartition(ssp);
            org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().put(ssp, new ArrayDeque());
            this.chooser.register(ssp, str);
            try {
                final String existingOffset = sspToRegisteredOffsets().get(ssp);
                final Integer comparison = this.systemAdmins.getSystemAdmin(ssp.getSystem()).offsetComparator(existingOffset, str);
                final boolean existingComparesGreater = comparison != null && comparison.intValue() > 0;
                if (existingOffset == null || existingComparesGreater) {
                    sspToRegisteredOffsets().put(ssp, str);
                }
            } catch (NoSuchElementException e) {
                throw new SystemConsumersException("can't register " + ssp.getSystem() + "'s consumer.", e);
            }
        } else {
            info(() -> String.format("Stream : %s is already at end of stream", ssp));
            endOfStreamSSPs().add(ssp);
        }
    }

    /**
     * Records an SSP, and the system it belongs to, as intermediate.
     *
     * @param systemStreamPartition the intermediate SSP
     */
    public void registerIntermediateSSP(SystemStreamPartition systemStreamPartition) {
        debug(() -> String.format("Registering intermediate stream: %s", systemStreamPartition));
        final HashSet<SystemStreamPartition> knownIntermediateSsps = org$apache$samza$system$SystemConsumers$$intermediateSSPs();
        knownIntermediateSsps.add(systemStreamPartition);
        final HashSet<String> knownIntermediateSystems = intermediateSystems();
        knownIntermediateSystems.add(systemStreamPartition.getSystem());
    }

    /**
     * Tells whether the given SSP, after {@code removeKeyBucket} has been applied to it,
     * is recorded as having reached end of stream.
     *
     * @param systemStreamPartition the SSP to look up
     * @return {@code true} when the SSP is in the end-of-stream set
     */
    public boolean isEndOfStream(SystemStreamPartition systemStreamPartition) {
        final HashSet<SystemStreamPartition> ended = endOfStreamSSPs();
        final SystemStreamPartition withoutKeyBucket = removeKeyBucket(systemStreamPartition);
        return ended.contains(withoutKeyBucket);
    }

    /**
     * Asks the chooser for the next envelope, updates metrics and the poll timeout, and
     * refreshes the consumers when needed.
     *
     * The bookkeeping runs under the {@code deserializationNs} timer and the refresh decision
     * under the {@code pollNs} timer. A refresh happens when the chooser returned nothing or
     * when more than {@code pollIntervalMs()} milliseconds have elapsed since the last poll.
     *
     * @param z whether the chooser should be updated for the chosen envelope's SSP; it also
     *          selects {@code noNewMessagesTimeout} (instead of 0) as the next poll timeout
     *          when the chooser returned nothing
     * @return the chosen envelope, or {@code null} when the chooser had none
     */
    public IncomingMessageEnvelope choose(boolean z) {
        final IncomingMessageEnvelope envelope = this.chooser.choose();
        updateTimer(this.metrics.deserializationNs(), () -> {
            if (envelope != null) {
                final SystemStreamPartition ssp = envelope.getSystemStreamPartition();
                if (envelope.isEndOfStream()) {
                    this.info(() -> String.format("End of stream reached for partition: %s", ssp));
                    this.endOfStreamSSPs().add(ssp);
                }
                this.trace(() -> String.format("Chooser returned an incoming message envelope: %s", envelope));
                this.timeout_$eq(0);

                // With elasticity enabled, count the envelope as chosen only when its
                // key-bucketed SSP was registered here.
                final boolean countsAsChosen = this.elasticityFactor() == 1
                        || this.sspKeyBucketsRegistered().contains(envelope.getSystemStreamPartition(this.elasticityFactor()));
                if (countsAsChosen) {
                    this.metrics.choseObject().inc();
                    ((Counter) this.metrics.systemStreamMessagesChosen().apply(envelope.getSystemStreamPartition())).inc();
                } else {
                    this.metrics.choseNull().inc();
                }

                if (z) {
                    this.trace(() -> "Update chooser for " + ssp.getPartition());
                    this.tryUpdate(ssp);
                }
            } else {
                this.trace(() -> "Chooser returned null.");
                this.metrics.choseNull().inc();
                this.timeout_$eq(z ? this.noNewMessagesTimeout : 0);
            }
        });
        updateTimer(this.metrics.pollNs(), () -> {
            final boolean pollDue = envelope == null
                    || TimeUnit.NANOSECONDS.toMillis(this.clock().apply$mcJ$sp() - this.lastPollNs()) > this.pollIntervalMs();
            if (pollDue) {
                this.refresh();
            }
        });
        return envelope;
    }

    /**
     * Synthetic accessor for the default value of the first parameter of {@code choose}.
     *
     * @return {@code true}
     */
    public boolean choose$default$1() {
        return true;
    }

    /**
     * Polls one system's consumer for the SSPs recorded for that system in
     * {@code emptySystemStreamPartitionsBySystem} that have not reached end of stream.
     *
     * Each non-empty batch returned replaces that SSP's queue; the SSP is then removed from
     * the per-system set and, if it was present there, handed to {@code tryUpdate}.
     * Nothing is polled when the fetch set is empty.
     *
     * @param str name of the system whose consumer should be polled
     */
    public void org$apache$samza$system$SystemConsumers$$poll(String str) {
        trace(() -> String.format("Polling system consumer: %s", str));
        ((Counter) this.metrics.systemPolls().apply(str)).inc();
        trace(() -> String.format("Getting fetch map for system: %s", str));

        // Fetch set = this system's recorded SSPs minus the ones that already ended.
        final Set fetchSet;
        if (!emptySystemStreamPartitionsBySystem().containsKey(str)) {
            fetchSet = Collections.emptySet();
        } else {
            HashSet candidates = new HashSet(emptySystemStreamPartitionsBySystem().get(str));
            candidates.removeAll(endOfStreamSSPs());
            fetchSet = candidates;
        }

        if (fetchSet != null && fetchSet.size() > 0) {
            SystemConsumer consumer = (SystemConsumer) this.consumers.apply(str);
            trace(() -> String.format("Fetching: %s", fetchSet));
            ((Counter) this.metrics.systemStreamPartitionFetchesPerPoll().apply(str)).inc(fetchSet.size());
            java.util.Map polled = consumer.poll(fetchSet, timeout());
            trace(() -> String.format("Got incoming message envelopes: %s", polled));
            ((Counter) this.metrics.systemMessagesPerPoll().apply(str)).inc();
            for (Map.Entry entry : polled.entrySet()) {
                SystemStreamPartition ssp = (SystemStreamPartition) entry.getKey();
                ArrayDeque batch = new ArrayDeque((Collection) entry.getValue());
                int batchSize = batch.size();
                totalUnprocessedMessages_$eq(totalUnprocessedMessages() + batchSize);
                if (batchSize > 0) {
                    org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().put(ssp, batch);
                    // Only SSPs that were still listed in the per-system set are offered to the chooser.
                    boolean wasListed = emptySystemStreamPartitionsBySystem().get(ssp.getSystem()).remove(ssp);
                    if (wasListed) {
                        tryUpdate(ssp);
                    }
                }
            }
        } else {
            trace(() -> String.format("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions.", str));
        }
    }

    /**
     * Attempts to update the chooser for the given SSP (after {@code removeKeyBucket} has been
     * applied) and, whenever that does not succeed, puts the SSP back into its system's entry
     * of {@code emptySystemStreamPartitionsBySystem} so that a later poll fetches it again.
     *
     * The fallback runs both when {@code update} returns {@code false} and when it throws;
     * in the latter case the exception still propagates to the caller.
     *
     * @param systemStreamPartition the SSP to update the chooser for
     */
    public void tryUpdate(SystemStreamPartition systemStreamPartition) {
        final SystemStreamPartition ssp = removeKeyBucket(systemStreamPartition);
        boolean updated = false;
        try {
            updated = update(ssp);
        } finally {
            // A single fallback path: the decompiled catch/rethrow form duplicated this call
            // and could run it twice when the first attempt itself failed.
            if (!updated) {
                emptySystemStreamPartitionsBySystem().get(ssp.getSystem()).add(ssp);
            }
        }
    }

    /**
     * Records the current clock reading as the last poll time and polls the consumers.
     *
     * While draining, only the systems registered as intermediate are polled; otherwise every
     * system that has a consumer is polled.
     */
    private void refresh() {
        lastPollNs_$eq(clock().apply$mcJ$sp());
        if (isDraining()) {
            trace(() -> {
                return "Refreshing chooser with new messages from intermediate systems.";
            });
            // A lambda replaces the decompiler's malformed synthetic anonymous Consumer, whose
            // only job was to forward each system name to the poll method of this instance.
            intermediateSystems().forEach(system -> this.org$apache$samza$system$SystemConsumers$$poll(system));
        } else {
            trace(() -> {
                return "Refreshing chooser with new messages.";
            });
            this.consumers.keys().foreach(str -> {
                this.org$apache$samza$system$SystemConsumers$$poll(str);
                return BoxedUnit.UNIT;
            });
        }
    }

    /**
     * Appends drain control messages to the unprocessed-message queue of each partition to drain.
     *
     * <p>The partitions to drain are a copy of {@code sspKeyBucketsRegistered()} minus the
     * intermediate partitions and minus the end-of-stream partitions. For each one:
     * <ol>
     *   <li>its existing queue is reused, or a new empty queue is created if none is mapped;</li>
     *   <li>if any intermediate partitions exist, a watermark envelope with
     *       {@code Long.MAX_VALUE} is appended first;</li>
     *   <li>a drain message built with {@code runId()} is appended;</li>
     *   <li>the queue is stored back in the map and {@code tryUpdate} is invoked.</li>
     * </ol>
     * {@code totalUnprocessedMessages} is incremented once per appended envelope.
     */
    private void writeDrainControlMessageToSspQueue() {
        Set<SystemStreamPartition> sspsToDrain = new HashSet<>(sspKeyBucketsRegistered());
        sspsToDrain.removeAll(org$apache$samza$system$SystemConsumers$$intermediateSSPs());
        sspsToDrain.removeAll(endOfStreamSSPs());

        for (SystemStreamPartition ssp : sspsToDrain) {
            Queue<IncomingMessageEnvelope> pending;
            if (org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().containsKey(ssp)) {
                pending = org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().get(ssp);
            } else {
                pending = new ArrayDeque<>();
            }

            // The watermark is only emitted when intermediate partitions exist.
            if (!org$apache$samza$system$SystemConsumers$$intermediateSSPs().isEmpty()) {
                pending.add(IncomingMessageEnvelope.buildWatermarkEnvelope(ssp, Long.MAX_VALUE));
                totalUnprocessedMessages_$eq(totalUnprocessedMessages() + 1);
            }

            pending.add(IncomingMessageEnvelope.buildDrainMessage(ssp, runId()));
            totalUnprocessedMessages_$eq(totalUnprocessedMessages() + 1);

            org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().put(ssp, pending);
            tryUpdate(ssp);
        }
    }

    /**
     * Drains the buffered queue of the given partition until one message has been handed to the
     * chooser or the queue is empty.
     *
     * <p>Each removed message is passed through {@code serdeManager.fromBytes}. If that throws
     * and {@code dropDeserializationError} is set, the failure is logged at debug level, the
     * deserialization-error metric is incremented, and the loop moves on to the next message.
     * {@code totalUnprocessedMessages} is decremented once per message that completes an
     * iteration.
     *
     * @param systemStreamPartition key used to look up the buffered queue
     * @return {@code true} if a message was passed to {@code chooser.update}, {@code false} if
     *     the queue ran empty first
     * @throws SystemConsumersException if deserialization fails and
     *     {@code dropDeserializationError} is not set; the cause is preserved
     */
    private boolean update(SystemStreamPartition systemStreamPartition) {
        boolean updated = false;
        Queue<IncomingMessageEnvelope> queue = org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP().get(systemStreamPartition);
        while (!queue.isEmpty() && !updated) {
            IncomingMessageEnvelope deserialized;
            // A separate flag (rather than a null sentinel) keeps a null result from fromBytes
            // distinguishable from a dropped message.
            boolean deserializedOk;
            try {
                deserialized = this.serdeManager.fromBytes(queue.remove());
                deserializedOk = true;
            } catch (Throwable cause) {
                if (!this.dropDeserializationError) {
                    // NOTE(review): the message has already been removed from the queue here, but
                    // the unprocessed counter below is not decremented on this path.
                    throw new SystemConsumersException(String.format("Cannot deserialize an incoming message for %s", systemStreamPartition.getSystemStream().toString()), cause);
                }
                debug(() -> String.format("Cannot deserialize an incoming message for %s. Dropping the error message.", systemStreamPartition.getSystemStream().toString()), () -> cause);
                this.metrics.deserializationError().inc();
                deserialized = null;
                deserializedOk = false;
            }
            if (deserializedOk) {
                this.chooser.update(deserialized);
                updated = true;
            }
            totalUnprocessedMessages_$eq(totalUnprocessedMessages() - 1);
        }
        return updated;
    }

    /**
     * Creates a new {@code SystemStreamPartition} carrying only the system, stream and partition
     * of the argument (presumably leaving behind any key bucket the argument had).
     *
     * @param systemStreamPartition the partition to copy from
     * @return a fresh instance built with the three-argument constructor
     */
    private SystemStreamPartition removeKeyBucket(SystemStreamPartition systemStreamPartition) {
        final String system = systemStreamPartition.getSystem();
        final String stream = systemStreamPartition.getStream();
        return new SystemStreamPartition(system, stream, systemStreamPartition.getPartition());
    }

    /**
     * Compiler-generated filter: reports whether the given tuple is non-null.
     *
     * @param tuple2 the tuple to test
     * @return {@code false} for {@code null}, {@code true} otherwise
     */
    public static final /* synthetic */ boolean $anonfun$start$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            return false;
        }
        return true;
    }

    /**
     * Compiler-generated body that registers one tuple with the consumer of its system.
     *
     * <p>The first tuple element is the partition. The consumer is looked up by the partition's
     * system and is given the partition after {@code removeKeyBucket}, together with the second
     * tuple element (a {@code String}).
     *
     * @param systemConsumers the owning instance whose consumers are used
     * @param tuple2 pair of partition and string value; must not be {@code null}
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$start$2(SystemConsumers systemConsumers, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SystemStreamPartition ssp = (SystemStreamPartition) tuple2._1();
        SystemConsumer consumer = (SystemConsumer) systemConsumers.consumers.apply(ssp.getSystem());
        consumer.register(systemConsumers.removeKeyBucket(ssp), (String) tuple2._2());
    }

    /**
     * Compiler-generated body that registers the named system with the metrics object.
     *
     * @param systemConsumers the owning instance whose metrics are used
     * @param str the system name passed to {@code registerSystem}
     */
    public static final /* synthetic */ void $anonfun$start$6(SystemConsumers systemConsumers, String str) {
        SystemConsumersMetrics consumerMetrics = systemConsumers.metrics;
        consumerMetrics.registerSystem(str);
    }

    /**
     * Stores the supplied collaborators and settings, creates the empty bookkeeping collections,
     * logs the received configuration, and registers value suppliers on the metrics object.
     *
     * @param messageChooser stored as {@code chooser}
     * @param map stored as {@code consumers}
     * @param systemAdmins stored as {@code systemAdmins}
     * @param serdeManager stored as {@code serdeManager}
     * @param systemConsumersMetrics stored as {@code metrics}; also receives the timeout,
     *     needed-by-chooser and unprocessed-messages suppliers
     * @param i stored as {@code noNewMessagesTimeout} and used as the initial {@code timeout}
     * @param z stored as {@code dropDeserializationError}
     * @param i2 stored as {@code pollIntervalMs}
     * @param function0 stored as {@code clock}
     * @param i3 stored as {@code elasticityFactor}
     * @param str stored as {@code runId}
     */
    public SystemConsumers(MessageChooser messageChooser, scala.collection.immutable.Map<String, SystemConsumer> map, SystemAdmins systemAdmins, SerdeManager serdeManager, SystemConsumersMetrics systemConsumersMetrics, int i, boolean z, int i2, Function0<Object> function0, int i3, String str) {
        // Constructor arguments.
        this.chooser = messageChooser;
        this.consumers = map;
        this.systemAdmins = systemAdmins;
        this.serdeManager = serdeManager;
        this.metrics = systemConsumersMetrics;
        this.noNewMessagesTimeout = i;
        this.dropDeserializationError = z;
        this.pollIntervalMs = i2;
        this.clock = function0;
        this.elasticityFactor = i3;
        this.runId = str;

        // Trait initializers.
        Logging.$init$(this);
        TimerUtil.$init$(this);

        // Bookkeeping state, all initially empty.
        this.sspToRegisteredOffsets = new HashMap<>();
        this.sspKeyBucketsRegistered = new HashSet<>();
        this.org$apache$samza$system$SystemConsumers$$intermediateSSPs = new HashSet<>();
        this.intermediateSystems = new HashSet<>();
        this.org$apache$samza$system$SystemConsumers$$unprocessedMessagesBySSP = new HashMap<>();
        this.endOfStreamSSPs = new HashSet<>();
        this.emptySystemStreamPartitionsBySystem = new HashMap<>();
        this.started = false;
        this.isDraining = false;
        this.timeout = i;
        this.lastPollNs = 0L;
        this.totalUnprocessedMessages = 0;

        // Log what was received.
        info(() -> String.format("Got elasticity factor: %s", this.elasticityFactor()));
        debug(() -> String.format("Got stream consumers: %s", this.consumers));
        debug(() -> String.format("Got no new message timeout: %s", this.noNewMessagesTimeout));

        // Suppliers that the metrics object reads from.
        systemConsumersMetrics.setTimeout(() -> this.timeout());
        systemConsumersMetrics.setNeededByChooser(() -> this.emptySystemStreamPartitionsBySystem().size());
        systemConsumersMetrics.setUnprocessedMessages(() -> this.totalUnprocessedMessages());
    }
}
