package kafka.consumer;

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import kafka.common.MessageStreamsExistException;
import kafka.integration.KafkaServerTestHarness;
import kafka.javaapi.consumer.ConsumerRebalanceListener;
import kafka.message.DefaultCompressionCodec$;
import kafka.message.GZIPCompressionCodec$;
import kafka.message.NoCompressionCodec$;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaRequestHandler;
import kafka.utils.Pool;
import kafka.utils.TestUtils$;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import kafka.utils.ZkUtils$;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Function0;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.math.Ordering$String$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: ZookeeperConsumerConnectorTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\tUb\u0001B\u0001\u0003\u0001\u001d\u0011aDW8pW\u0016,\u0007/\u001a:D_:\u001cX/\\3s\u0007>tg.Z2u_J$Vm\u001d;\u000b\u0005\r!\u0011\u0001C2p]N,X.\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001aE\u0002\u0001\u00119\u0001\"!\u0003\u0007\u000e\u0003)Q!a\u0003\u0003\u0002\u0017%tG/Z4sCRLwN\\\u0005\u0003\u001b)\u0011acS1gW\u0006\u001cVM\u001d<feR+7\u000f\u001e%be:,7o\u001d\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0003#\u0011\tQ!\u001e;jYNL!a\u0005\t\u0003\u000f1{wmZ5oO\")Q\u0003\u0001C\u0001-\u00051A(\u001b8jiz\"\u0012a\u0006\t\u00031\u0001i\u0011A\u0001\u0005\b5\u0001\u0011\r\u0011\"\u0001\u001c\u0003I\u0011VMY1mC:\u001cWMQ1dW>4g-T:\u0016\u0003q\u0001\"!\b\u0011\u000e\u0003yQ\u0011aH\u0001\u0006g\u000e\fG.Y\u0005\u0003Cy\u00111!\u00138u\u0011\u0019\u0019\u0003\u0001)A\u00059\u0005\u0019\"+\u001a2bY\u0006t7-\u001a\"bG.|gMZ'tA!9Q\u0005\u0001a\u0001\n\u00031\u0013\u0001\u00023jeN,\u0012a\n\t\u0003\u001f!J!!\u000b\t\u0003!i[uI]8vaR{\u0007/[2ESJ\u001c\bbB\u0016\u0001\u0001\u0004%\t\u0001L\u0001\tI&\u00148o\u0018\u0013fcR\u0011Q\u0006\r\t\u0003;9J!a\f\u0010\u0003\tUs\u0017\u000e\u001e\u0005\bc)\n\t\u00111\u0001(\u0003\rAH%\r\u0005\u0007g\u0001\u0001\u000b\u0015B\u0014\u0002\u000b\u0011L'o\u001d\u0011\t\u000fU\u0002!\u0019!C\u00017\u0005Aa.^7O_\u0012,7\u000f\u0003\u00048\u0001\u0001\u0006I\u0001H\u0001\n]Vlgj\u001c3fg\u0002Bq!\u000f\u0001C\u0002\u0013\u00051$\u0001\u0005ok6\u0004\u0016M\u001d;t\u0011\u0019Y\u0004\u0001)A\u00059\u0005Ia.^7QCJ$8\u000f\t\u0005\b{\u0001\u0011\r\u0011\"\u0001?\u0003\u0015!x\u000e]5d+\u0005y\u0004C\u0001!F\u001b\u0005\t%B\u0001\"D\u0003\u0011a\u0017M\\4\u000b\u0003\u0011\u000bAA[1wC&\u0011a)\u0011\u0002\u0007'R\u0014\u0018N\\4\t\r!\u0003\u0001\u0015!\u0003@\u0003\u0019!x\u000e]5dA!9!\n\u0001b\u0001\n\u0003Y\u0015aD8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:\u0016\u00031\u0003\"!\u0014)\u000e\u00039S!aT\"\u0002\tU$\u0018\u000e\\\u0005\u0003#:\u0013!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u0019\u0019\u0006\u0001)A\
u0005\u0019\u0006\u0001rN^3se&$\u0017N\\4Qe>\u00048\u000f\t\u0005\u0006+\u0002!\tEV\u0001\u0010O\u0016tWM]1uK\u000e{gNZ5hgV\tq\u000bE\u0002YA\u000et!!\u00170\u000f\u0005ikV\"A.\u000b\u0005q3\u0011A\u0002\u001fs_>$h(C\u0001 \u0013\tyf$A\u0004qC\u000e\\\u0017mZ3\n\u0005\u0005\u0014'aA*fc*\u0011qL\b\t\u0003I\u001el\u0011!\u001a\u0006\u0003M\u0012\taa]3sm\u0016\u0014\u0018B\u00015f\u0005-Y\u0015MZ6b\u0007>tg-[4\t\u000f)\u0004!\u0019!C\u0001}\u0005)qM]8va\"1A\u000e\u0001Q\u0001\n}\naa\u001a:pkB\u0004\u0003b\u00028\u0001\u0005\u0004%\tAP\u0001\nG>t7/^7feBBa\u0001\u001d\u0001!\u0002\u0013y\u0014AC2p]N,X.\u001a:1A!9!\u000f\u0001b\u0001\n\u0003q\u0014!C2p]N,X.\u001a:2\u0011\u0019!\b\u0001)A\u0005\u007f\u0005Q1m\u001c8tk6,'/\r\u0011\t\u000fY\u0004!\u0019!C\u0001}\u0005I1m\u001c8tk6,'O\r\u0005\u0007q\u0002\u0001\u000b\u0011B \u0002\u0015\r|gn];nKJ\u0014\u0004\u0005C\u0004{\u0001\t\u0007I\u0011\u0001 \u0002\u0013\r|gn];nKJ\u001c\u0004B\u0002?\u0001A\u0003%q(\u0001\u0006d_:\u001cX/\\3sg\u0001BqA \u0001C\u0002\u0013\u00051$A\u0005o\u001b\u0016\u001c8/Y4fg\"9\u0011\u0011\u0001\u0001!\u0002\u0013a\u0012A\u00038NKN\u001c\u0018mZ3tA!9\u0011Q\u0001\u0001\u0005B\u0005\u001d\u0011!B:fiV\u0003H#A\u0017)\t\u0005\r\u00111\u0002\t\u0005\u0003\u001b\t9\"\u0004\u0002\u0002\u0010)!\u0011\u0011CA\n\u0003\u0015QWO\\5u\u0015\t\t)\"A\u0002pe\u001eLA!!\u0007\u0002\u0010\t1!)\u001a4pe\u0016Dq!!\b\u0001\t\u0003\n9!\u0001\u0005uK\u0006\u0014Hi\\<oQ\u0011\tY\"!\t\u0011\t\u00055\u00111E\u0005\u0005\u0003K\tyAA\u0003BMR,'\u000fC\u0004\u0002*\u0001!\t!a\u0002\u0002\u0013Q,7\u000f\u001e\"bg&\u001c\u0007\u0006BA\u0014\u0003[\u0001B!!\u0004\u00020%!\u0011\u0011GA\b\u0005\u0011!Vm\u001d;\t\u000f\u0005U\u0002\u0001\"\u0001\u0002\b\u0005yA/Z:u\u0007>l\u0007O]3tg&|g\u000e\u000b\u0003\u00024\u00055\u0002bBA\u001e\u0001\u0011\u0005\u0011qA\u0001\u001ei\u0016\u001cHoQ8naJ,7o]5p]N+GoQ8ogVl\u0007\u000f^5p]\"\"\u0011\u0011HA\u0017\u0011\u001d\t\t\u0005\u0001C\u0001\u0003\u000f\t1\u0003^3ti\u000e{gn];nKJ$UmY8eKJDC!a\u0
010\u0002.!9\u0011q\t\u0001\u0005\u0002\u0005\u001d\u0011a\b;fgRdU-\u00193feN+G.Z2uS>tgi\u001c:QCJ$\u0018\u000e^5p]\"\"\u0011QIA\u0017\u0011\u001d\ti\u0005\u0001C\u0001\u0003\u000f\tQ\u0004^3ti\u000e{gn];nKJ\u0014VMY1mC:\u001cW\rT5ti\u0016tWM\u001d\u0015\u0005\u0003\u0017\ni\u0003C\u0004\u0002T\u0001!\t!!\u0016\u0002'\u001d,GOW&DQ&dGM]3o-\u0006dW/Z:\u0015\t\u0005]\u00131\u000f\t\u0007\u00033\ny&!\u0019\u000e\u0005\u0005m#bAA/=\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0007\u0005\fY\u0006E\u0004\u001e\u0003G\n9'a\u001a\n\u0007\u0005\u0015dD\u0001\u0004UkBdWM\r\t\u0005\u0003S\nyGD\u0002\u001e\u0003WJ1!!\u001c\u001f\u0003\u0019\u0001&/\u001a3fM&\u0019a)!\u001d\u000b\u0007\u00055d\u0004\u0003\u0005\u0002v\u0005E\u0003\u0019AA4\u0003\u0011\u0001\u0018\r\u001e5\u0007\r\u0005e\u0004\u0001BA>\u0005u!Vm\u001d;D_:\u001cX/\\3s%\u0016\u0014\u0017\r\\1oG\u0016d\u0015n\u001d;f]\u0016\u00148CBA<\u0003{\n\u0019\tE\u0002A\u0003\u007fJ1!!!B\u0005\u0019y%M[3diB!\u0011QQAG\u001b\t\t9IC\u0002\u0004\u0003\u0013S1!a#\u0005\u0003\u001dQ\u0017M^1ba&LA!a$\u0002\b\nI2i\u001c8tk6,'OU3cC2\fgnY3MSN$XM\\3s\u0011\u001d)\u0012q\u000fC\u0001\u0003'#\"!!&\u0011\t\u0005]\u0015qO\u0007\u0002\u0001!Q\u00111TA<\u0001\u0004%\t!!(\u0002?\t,gm\u001c:f%\u0016dW-Y:j]\u001e\u0004\u0016M\u001d;ji&|gn]\"bY2,G-\u0006\u0002\u0002 B\u0019Q$!)\n\u0007\u0005\rfDA\u0004C_>dW-\u00198\t\u0015\u0005\u001d\u0016q\u000fa\u0001\n\u0003\tI+A\u0012cK\u001a|'/\u001a*fY\u0016\f7/\u001b8h!\u0006\u0014H/\u001b;j_:\u001c8)\u00197mK\u0012|F%Z9\u0015\u00075\nY\u000bC\u00052\u0003K\u000b\t\u00111\u0001\u0002 
\"I\u0011qVA<A\u0003&\u0011qT\u0001!E\u00164wN]3SK2,\u0017m]5oOB\u000b'\u000f^5uS>t7oQ1mY\u0016$\u0007\u0005\u0003\u0006\u00024\u0006]\u0004\u0019!C\u0001\u0003;\u000bADY3g_J,7\u000b^1si&twMR3uG\",'o]\"bY2,G\r\u0003\u0006\u00028\u0006]\u0004\u0019!C\u0001\u0003s\u000b\u0001EY3g_J,7\u000b^1si&twMR3uG\",'o]\"bY2,Gm\u0018\u0013fcR\u0019Q&a/\t\u0013E\n),!AA\u0002\u0005}\u0005\"CA`\u0003o\u0002\u000b\u0015BAP\u0003u\u0011WMZ8sKN#\u0018M\u001d;j]\u001e4U\r^2iKJ\u001c8)\u00197mK\u0012\u0004\u0003BCAb\u0003o\u0002\r\u0011\"\u0001\u0002F\u0006Q1m\u001c8tk6,'/\u00133\u0016\u0005\u0005\u001d\u0004BCAe\u0003o\u0002\r\u0011\"\u0001\u0002L\u0006q1m\u001c8tk6,'/\u00133`I\u0015\fHcA\u0017\u0002N\"I\u0011'a2\u0002\u0002\u0003\u0007\u0011q\r\u0005\n\u0003#\f9\b)Q\u0005\u0003O\n1bY8ogVlWM]%eA!Q\u0011Q[A<\u0001\u0004%\t!a6\u0002%A\f'\u000f^5uS>twj\u001e8feND\u0017\u000e]\u000b\u0003\u00033\u0004r!TAn\u0003O\ny.C\u0002\u0002^:\u00131!T1q!\u0015i\u0015\u0011]As\u0013\r\t\u0019O\u0014\u0002\u0004'\u0016$\bc\u0001!\u0002h&\u0019\u0011\u0011^!\u0003\u000f%sG/Z4fe\"Q\u0011Q^A<\u0001\u0004%\t!a<\u0002-A\f'\u000f^5uS>twj\u001e8feND\u0017\u000e]0%KF$2!LAy\u0011%\t\u00141^A\u0001\u0002\u0004\tI\u000eC\u0005\u0002v\u0006]\u0004\u0015)\u0003\u0002Z\u0006\u0019\u0002/\u0019:uSRLwN\\(x]\u0016\u00148\u000f[5qA!Q\u0011\u0011`A<\u0001\u0004%\t!a?\u00021\u001ddwNY1m!\u0006\u0014H/\u001b;j_:|uO\\3sg\"L\u0007/\u0006\u0002\u0002~B9Q*a7\u0002h\u0005}\bcB'\u0002\\\u0006\u0015(\u0011\u0001\t\u00041\t\r\u0011b\u0001B\u0003\u0005\t\u00012i\u001c8tk6,'\u000f\u00165sK\u0006$\u0017\n\u001a\u0005\u000b\u0005\u0013\t9\b1A\u0005\u0002\t-\u0011\u0001H4m_\n\fG\u000eU1si&$\u0018n\u001c8Po:,'o\u001d5ja~#S-\u001d\u000b\u0004[\t5\u0001\"C\u0019\u0003\b\u0005\u0005\t\u0019AA\u007f\u0011%\u0011\t\"a\u001e!B\u0013\ti0A\rhY>\u0014\u0017\r\u001c)beRLG/[8o\u001f^tWM]:iSB\u0004\u0003\u0002\u0003B\u000b\u0003o\"\tEa\u0006\u00023\t,gm\u001c:f%\u0016dW-Y:j]\u001e\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0004[\te\u0001\u0002CAk\u0005'\
u0001\r!!7\t\u0011\tu\u0011q\u000fC!\u0005?\taCY3g_J,7\u000b^1si&twMR3uG\",'o\u001d\u000b\u0006[\t\u0005\"1\u0005\u0005\t\u0003\u0007\u0014Y\u00021\u0001\u0002h!A\u0011\u0011 B\u000e\u0001\u0004\ti\u0010K\u0004\u0001\u0005O\u0011iC!\r\u0011\u0007u\u0011I#C\u0002\u0003,y\u0011!\u0002Z3qe\u0016\u001c\u0017\r^3eC\t\u0011y#\u0001%UQ&\u001c\b\u0005^3ti\u0002B\u0017m\u001d\u0011cK\u0016t\u0007\u0005Z3qe\u0016\u001c\u0017\r^3eA\u0005tG\rI5uA]LG\u000e\u001c\u0011cK\u0002\u0012X-\\8wK\u0012\u0004\u0013N\u001c\u0011bA\u0019,H/\u001e:fAI,G.Z1tK\u0006\u0012!1G\u0001\ta9\n\u0004G\f\u0019/a\u0001")
/* loaded from: input_file:kafka/consumer/ZookeeperConsumerConnectorTest.class */
public class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness {
    // Backing value for RebalanceBackoffMs(); the anonymous ConsumerConfig subclasses in
    // testBasic and testCompression copy it into their rebalanceBackoffMs field.
    private final int RebalanceBackoffMs = 5000;
    // Starts as null; setUp() replaces it with new ZKGroupTopicDirs(group(), topic()).
    private ZKGroupTopicDirs dirs = null;
    // Passed as the first argument to TestUtils.createBrokerConfigs in the generateConfigs override.
    private final int numNodes = 2;
    // Exposed through numParts().
    private final int numParts = 2;
    // Topic name handed to every sendMessages / createMessageStreams call in the tests.
    private final String topic = "topic1";
    // Exposed through overridingProps().
    private final Properties overridingProps = new Properties();
    // The remaining fields are final but have no initializer on these lines; they are
    // presumably assigned in the constructor, which is not visible here -- TODO confirm.
    private final String group;
    private final String consumer0;
    private final String consumer1;
    private final String consumer2;
    private final String consumer3;
    private final int nMessages;

    /* compiled from: ZookeeperConsumerConnectorTest.scala */
    /* loaded from: input_file:kafka/consumer/ZookeeperConsumerConnectorTest$TestConsumerRebalanceListener.class */
    /**
     * {@link ConsumerRebalanceListener} that records which callbacks were invoked and the
     * arguments they received, so that a test can inspect them afterwards.
     *
     * <p>The accessor names ({@code x()} / {@code x_$eq(...)}) follow the Scala getter/setter
     * naming produced by the compiler for {@code var} members.
     */
    public class TestConsumerRebalanceListener implements ConsumerRebalanceListener {
        /** Set to {@code true} by {@link #beforeReleasingPartitions(Map)}. */
        private boolean beforeReleasingPartitionsCalled = false;
        /** Set to {@code true} by {@link #beforeStartingFetchers(String, Map)}. */
        private boolean beforeStartingFetchersCalled = false;
        /** Consumer id received by the most recent {@code beforeStartingFetchers} call. */
        private String consumerId = "";
        /** Map received by the most recent {@code beforeReleasingPartitions} call. */
        private Map<String, Set<Integer>> partitionOwnership = null;
        /** Map received by the most recent {@code beforeStartingFetchers} call. */
        private Map<String, Map<Integer, ConsumerThreadId>> globalPartitionOwnership = null;
        /** Reference to the enclosing test instance. */
        public final /* synthetic */ ZookeeperConsumerConnectorTest $outer;

        /**
         * Creates a listener bound to the given enclosing test instance.
         *
         * @param zookeeperConsumerConnectorTest the enclosing instance; a null value results in
         *     a {@link NullPointerException} via {@code throw null}
         */
        public TestConsumerRebalanceListener(ZookeeperConsumerConnectorTest zookeeperConsumerConnectorTest) {
            if (zookeeperConsumerConnectorTest == null) {
                throw null;
            }
            this.$outer = zookeeperConsumerConnectorTest;
        }

        /** @return whether {@code beforeReleasingPartitions} has been recorded */
        public boolean beforeReleasingPartitionsCalled() {
            return beforeReleasingPartitionsCalled;
        }

        /** Setter for the {@code beforeReleasingPartitionsCalled} flag. */
        public void beforeReleasingPartitionsCalled_$eq(boolean z) {
            beforeReleasingPartitionsCalled = z;
        }

        /** @return whether {@code beforeStartingFetchers} has been recorded */
        public boolean beforeStartingFetchersCalled() {
            return beforeStartingFetchersCalled;
        }

        /** Setter for the {@code beforeStartingFetchersCalled} flag. */
        public void beforeStartingFetchersCalled_$eq(boolean z) {
            beforeStartingFetchersCalled = z;
        }

        /** @return the stored consumer id (initially the empty string) */
        public String consumerId() {
            return consumerId;
        }

        /** Setter for the stored consumer id. */
        public void consumerId_$eq(String str) {
            consumerId = str;
        }

        /** @return the stored partition ownership map (initially {@code null}) */
        public Map<String, Set<Integer>> partitionOwnership() {
            return partitionOwnership;
        }

        /** Setter for the stored partition ownership map. */
        public void partitionOwnership_$eq(Map<String, Set<Integer>> map) {
            partitionOwnership = map;
        }

        /** @return the stored global partition ownership map (initially {@code null}) */
        public Map<String, Map<Integer, ConsumerThreadId>> globalPartitionOwnership() {
            return globalPartitionOwnership;
        }

        /** Setter for the stored global partition ownership map. */
        public void globalPartitionOwnership_$eq(Map<String, Map<Integer, ConsumerThreadId>> map) {
            globalPartitionOwnership = map;
        }

        /**
         * Callback: marks the "before releasing partitions" flag and stores the given map.
         * Goes through the setters rather than writing the fields directly.
         */
        public void beforeReleasingPartitions(Map<String, Set<Integer>> map) {
            this.beforeReleasingPartitionsCalled_$eq(true);
            this.partitionOwnership_$eq(map);
        }

        /**
         * Callback: marks the "before starting fetchers" flag and stores the consumer id and
         * the given global ownership map, again via the setters.
         */
        public void beforeStartingFetchers(String str, Map<String, Map<Integer, ConsumerThreadId>> map) {
            this.beforeStartingFetchersCalled_$eq(true);
            this.consumerId_$eq(str);
            this.globalPartitionOwnership_$eq(map);
        }

        /** @return the enclosing test instance captured at construction time */
        public /* synthetic */ ZookeeperConsumerConnectorTest kafka$consumer$ZookeeperConsumerConnectorTest$TestConsumerRebalanceListener$$$outer() {
            return $outer;
        }
    }

    /** @return the value of the {@code RebalanceBackoffMs} field (initialized to 5000) */
    public int RebalanceBackoffMs() {
        return RebalanceBackoffMs;
    }

    /** @return the current {@code dirs} value; {@code null} until {@link #setUp()} assigns it */
    public ZKGroupTopicDirs dirs() {
        return dirs;
    }

    /**
     * Scala-style setter for {@code dirs}.
     *
     * @param zKGroupTopicDirs the new value to store
     */
    public void dirs_$eq(ZKGroupTopicDirs zKGroupTopicDirs) {
        dirs = zKGroupTopicDirs;
    }

    /** @return the value of the {@code numNodes} field (initialized to 2) */
    public int numNodes() {
        return numNodes;
    }

    /** @return the value of the {@code numParts} field (initialized to 2) */
    public int numParts() {
        return numParts;
    }

    /** @return the topic name used by the tests ({@code "topic1"}) */
    public String topic() {
        return topic;
    }

    /** @return the {@code overridingProps} instance held by this test (the same object on every call) */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Produces the broker configurations for this harness.
     *
     * <p>Calls {@code TestUtils.createBrokerConfigs} with {@link #numNodes()} and
     * {@code zkConnect()} as the first two arguments and the Scala default-argument accessors
     * ({@code createBrokerConfigs$default$3} through {@code $13}) for the rest, then maps the
     * result through the compiler-generated closure
     * {@code ZookeeperConsumerConnectorTest$$anonfun$generateConfigs$1}, whose body is not
     * visible in this file.
     *
     * <p>Decompiler note: renamed from {@code generateConfigs}.
     */
    @Override // kafka.integration.KafkaServerTestHarness
    public Seq<KafkaConfig> mo435generateConfigs() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        return (Seq) testUtils
                .createBrokerConfigs(
                        numNodes(),
                        zkConnect(),
                        testUtils.createBrokerConfigs$default$3(),
                        testUtils.createBrokerConfigs$default$4(),
                        testUtils.createBrokerConfigs$default$5(),
                        testUtils.createBrokerConfigs$default$6(),
                        testUtils.createBrokerConfigs$default$7(),
                        testUtils.createBrokerConfigs$default$8(),
                        testUtils.createBrokerConfigs$default$9(),
                        testUtils.createBrokerConfigs$default$10(),
                        testUtils.createBrokerConfigs$default$11(),
                        testUtils.createBrokerConfigs$default$12(),
                        testUtils.createBrokerConfigs$default$13())
                .map(new ZookeeperConsumerConnectorTest$$anonfun$generateConfigs$1(this), Seq$.MODULE$.canBuildFrom());
    }

    /** @return the value of the {@code group} field */
    public String group() {
        return group;
    }

    /** @return the value of the {@code consumer0} field */
    public String consumer0() {
        return consumer0;
    }

    /** @return the value of the {@code consumer1} field */
    public String consumer1() {
        return consumer1;
    }

    /** @return the value of the {@code consumer2} field */
    public String consumer2() {
        return consumer2;
    }

    /** @return the value of the {@code consumer3} field */
    public String consumer3() {
        return consumer3;
    }

    /** @return the value of the {@code nMessages} field */
    public int nMessages() {
        return nMessages;
    }

    /**
     * JUnit {@code @Before} hook: runs the superclass set-up first, then stores a fresh
     * {@code ZKGroupTopicDirs} built from {@link #group()} and {@link #topic()} in {@code dirs}.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
        final ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(group(), topic());
        dirs_$eq(groupTopicDirs);
    }

    /**
     * JUnit {@code @After} hook. Adds no behaviour of its own; it only delegates to the
     * superclass tear-down.
     */
    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        super.tearDown();
    }

    /**
     * End-to-end scenario over several consumer connectors in the same group:
     * a consumer with a 200 ms consumer timeout (consumer0), a consumer that receives both
     * batches of a send (consumer1), a second consumer joining with a custom rebalance backoff
     * (consumer2), and a consumer created with an empty topic map (consumer3). After each stage
     * the received messages and the contents of the consumer owner directory are asserted.
     * Finally, a second createMessageStreams call on consumer3's connector is expected to throw
     * MessageStreamsExistException.
     *
     * NOTE(review): the connector shutdowns and the logger level change live in the catch block
     * at the end, so they run only when every earlier step and assertion has succeeded.
     */
    @Test
    public void testBasic() {
        // Raise the KafkaRequestHandler logger threshold to FATAL while the test runs.
        Logger logger = Logger.getLogger(KafkaRequestHandler.class);
        logger.setLevel(Level.FATAL);
        // Connector for consumer0, configured through an anonymous ConsumerConfig subclass whose
        // consumerTimeoutMs() returns 200.
        ZookeeperConsumerConnector zookeeperConsumerConnector = new ZookeeperConsumerConnector(new ConsumerConfig(this) { // from class: kafka.consumer.ZookeeperConsumerConnectorTest$$anon$1
            private final int consumerTimeoutMs;

            public int consumerTimeoutMs() {
                return this.consumerTimeoutMs;
            }

            {
                super(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer0(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
                this.consumerTimeoutMs = 200;
            }
        }, true);
        // Runs the compiler-generated closure $$anonfun$testBasic$1 over the range "0 until 2"
        // with consumer0's message streams (topic -> 1). The closure body is not visible in this
        // file; presumably it checks the consumer timeout path -- TODO confirm.
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp(new ZookeeperConsumerConnectorTest$$anonfun$testBasic$1(this, zookeeperConsumerConnector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()))));
        zookeeperConsumerConnector.shutdown();
        // First send: two sendMessages calls of nMessages() each, differing only in the fourth
        // argument (0 vs 1 -- presumably the target partition; confirm against TestUtils),
        // concatenated into one list.
        List list = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5()).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5()), List$.MODULE$.canBuildFrom());
        // Wait for leader election and metadata propagation for both 0 and 1 before consuming.
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        // Connector for consumer1 with default consumer properties; requests the topic with value 1.
        ZookeeperConsumerConnector zookeeperConsumerConnector2 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
        scala.collection.Map<String, List<KafkaStream<String, String>>> createMessageStreams = zookeeperConsumerConnector2.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // consumer1 must receive everything that was sent (2 * nMessages()), compared after sorting.
        Assert.assertEquals(list.sorted(Ordering$String$.MODULE$), TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages() * 2).sorted(Ordering$String$.MODULE$));
        // Owner directory is expected to list consumer1's thread for both "0" and "1".
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
        zookeeperConsumerConnector2.commitOffsets(true);
        // Connector for consumer2; its anonymous ConsumerConfig subclass returns
        // RebalanceBackoffMs() from rebalanceBackoffMs().
        ZookeeperConsumerConnector zookeeperConsumerConnector3 = new ZookeeperConsumerConnector(new ConsumerConfig(this) { // from class: kafka.consumer.ZookeeperConsumerConnectorTest$$anon$2
            private final int rebalanceBackoffMs;

            public int rebalanceBackoffMs() {
                return this.rebalanceBackoffMs;
            }

            {
                super(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer2(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
                this.rebalanceBackoffMs = this.RebalanceBackoffMs();
            }
        }, true);
        scala.collection.Map<String, List<KafkaStream<String, String>>> createMessageStreams2 = zookeeperConsumerConnector3.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Second send, same shape as the first.
        List list2 = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5()).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5()), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        // This time nMessages() are read from each of the two consumers' streams and combined.
        Assert.assertEquals(list2.sorted(Ordering$String$.MODULE$), ((List) TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages(createMessageStreams2, nMessages()), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
        // Ownership is now expected to be split: "0" -> consumer1, "1" -> consumer2.
        Seq<Tuple2<String, String>> zKChildrenValues = getZKChildrenValues(dirs().consumerOwnerDir());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer2-0")}));
        Assert.assertEquals(apply, zKChildrenValues);
        // Connector for consumer3, which creates message streams from an empty HashMap.
        ZookeeperConsumerConnector zookeeperConsumerConnector4 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer3(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
        zookeeperConsumerConnector4.createMessageStreams(new HashMap());
        // Third send; the expectations below are the same as after the second send, i.e. the
        // messages still arrive via consumer1 and consumer2 and the ownership list is unchanged.
        List list3 = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5()).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5()), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        Assert.assertEquals(list3.sorted(Ordering$String$.MODULE$), ((List) TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages(createMessageStreams2, nMessages()), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
        Assert.assertEquals(apply, getZKChildrenValues(dirs().consumerOwnerDir()));
        try {
            // A second createMessageStreams call on the same connector is expected to throw;
            // the fail(...) line is reached only if it does not.
            zookeeperConsumerConnector4.createMessageStreams(new HashMap());
            throw fail("Should fail with MessageStreamsExistException", new Position("ZookeeperConsumerConnectorTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 166));
        } catch (MessageStreamsExistException unused) {
            // Expected path: the exception itself is intentionally ignored. The remaining
            // connectors are shut down and the logger threshold is set to ERROR (not to
            // whatever level it had before the test).
            zookeeperConsumerConnector2.shutdown();
            zookeeperConsumerConnector3.shutdown();
            zookeeperConsumerConnector4.shutdown();
            info((Function0<String>) new ZookeeperConsumerConnectorTest$$anonfun$testBasic$2(this));
            logger.setLevel(Level.ERROR);
        }
    }

    /**
     * Same staged scenario as the later part of testBasic (consumer1, then consumer2 with a
     * custom rebalance backoff, then consumer3 with an empty topic map), but every
     * sendMessages call passes GZIPCompressionCodec as its fifth argument.
     *
     * NOTE(review): the connector shutdowns and the final logger.setLevel(Level.ERROR) are
     * plain trailing statements, so they are skipped if any earlier step or assertion throws.
     */
    @Test
    public void testCompression() {
        // Raise the KafkaRequestHandler logger threshold to FATAL while the test runs.
        Logger logger = Logger.getLogger(KafkaRequestHandler.class);
        logger.setLevel(Level.FATAL);
        // First send: two GZIP sendMessages calls of nMessages() each, differing only in the
        // fourth argument (0 vs 1 -- presumably the target partition; confirm against TestUtils).
        List list = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, GZIPCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        // Wait for leader election and metadata propagation for both 0 and 1 before consuming.
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        // Connector for consumer1 with default consumer properties; requests the topic with value 1.
        ZookeeperConsumerConnector zookeeperConsumerConnector = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
        scala.collection.Map<String, List<KafkaStream<String, String>>> createMessageStreams = zookeeperConsumerConnector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // consumer1 must receive everything that was sent (2 * nMessages()), compared after sorting.
        Assert.assertEquals(list.sorted(Ordering$String$.MODULE$), TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages() * 2).sorted(Ordering$String$.MODULE$));
        // Owner directory is expected to list consumer1's thread for both "0" and "1".
        Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
        zookeeperConsumerConnector.commitOffsets(true);
        // Connector for consumer2; its anonymous ConsumerConfig subclass returns
        // RebalanceBackoffMs() from rebalanceBackoffMs().
        ZookeeperConsumerConnector zookeeperConsumerConnector2 = new ZookeeperConsumerConnector(new ConsumerConfig(this) { // from class: kafka.consumer.ZookeeperConsumerConnectorTest$$anon$3
            private final int rebalanceBackoffMs;

            public int rebalanceBackoffMs() {
                return this.rebalanceBackoffMs;
            }

            {
                super(TestUtils$.MODULE$.createConsumerProperties(this.zkConnect(), this.group(), this.consumer2(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
                this.rebalanceBackoffMs = this.RebalanceBackoffMs();
            }
        }, true);
        scala.collection.Map<String, List<KafkaStream<String, String>>> createMessageStreams2 = zookeeperConsumerConnector2.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Second send, same shape as the first.
        List list2 = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, GZIPCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        // This time nMessages() are read from each of the two consumers' streams and combined.
        Assert.assertEquals(list2.sorted(Ordering$String$.MODULE$), ((List) TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages(createMessageStreams2, nMessages()), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
        // Ownership is now expected to be split: "0" -> consumer1, "1" -> consumer2.
        Seq<Tuple2<String, String>> zKChildrenValues = getZKChildrenValues(dirs().consumerOwnerDir());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer2-0")}));
        Assert.assertEquals(apply, zKChildrenValues);
        // Connector for consumer3, which creates message streams from an empty HashMap
        // (here using the overload that also takes the two StringDecoders).
        ZookeeperConsumerConnector zookeeperConsumerConnector3 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer3(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
        zookeeperConsumerConnector3.createMessageStreams(new HashMap(), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        // Third send; the expectations below are the same as after the second send, i.e. the
        // messages still arrive via consumer1 and consumer2 and the ownership list is unchanged.
        List list3 = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, GZIPCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, GZIPCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        Assert.assertEquals(list3.sorted(Ordering$String$.MODULE$), ((List) TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages()).$plus$plus(TestUtils$.MODULE$.getMessages(createMessageStreams2, nMessages()), List$.MODULE$.canBuildFrom())).sorted(Ordering$String$.MODULE$));
        Assert.assertEquals(apply, getZKChildrenValues(dirs().consumerOwnerDir()));
        // Shut down all three connectors, then set the logger threshold to ERROR (not to
        // whatever level it had before the test).
        zookeeperConsumerConnector.shutdown();
        zookeeperConsumerConnector2.shutdown();
        zookeeperConsumerConnector3.shutdown();
        info((Function0<String>) new ZookeeperConsumerConnectorTest$$anonfun$testCompression$1(this));
        logger.setLevel(Level.ERROR);
    }

    /**
     * Sends two batches of 200 messages with {@code DefaultCompressionCodec}, consumes 400
     * messages through a single connector for consumer0, and asserts that
     * <ul>
     *   <li>the sorted received messages equal the sorted sent messages, and</li>
     *   <li>the consumer owner directory lists {@code group1_consumer0-0} for both "0" and "1".</li>
     * </ul>
     */
    @Test
    public void testCompressionSetConsumption() {
        final TestUtils$ testUtils = TestUtils$.MODULE$;

        // Produce: the two sendMessages calls differ only in their fourth argument (0 vs 1).
        List sentMessages = (List) testUtils
                .sendMessages(servers(), topic(), 200, 0, DefaultCompressionCodec$.MODULE$)
                .$plus$plus(
                        testUtils.sendMessages(servers(), topic(), 200, 1, DefaultCompressionCodec$.MODULE$),
                        List$.MODULE$.canBuildFrom());
        testUtils.waitUntilMetadataIsPropagated(servers(), topic(), 0, testUtils.waitUntilMetadataIsPropagated$default$4());
        testUtils.waitUntilMetadataIsPropagated(servers(), topic(), 1, testUtils.waitUntilMetadataIsPropagated$default$4());

        // Consume everything through one connector registered as consumer0.
        ConsumerConfig consumerConfig = new ConsumerConfig(
                testUtils.createConsumerProperties(zkConnect(), group(), consumer0(), testUtils.createConsumerProperties$default$4()));
        ZookeeperConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfig, true);

        // The expected side is computed before the streams are created, matching the
        // left-to-right argument evaluation of a single assertEquals call.
        Object expectedSorted = sentMessages.sorted(Ordering$String$.MODULE$);
        scala.collection.Map<String, List<KafkaStream<String, String>>> streams = connector.createMessageStreams(
                Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})),
                new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()),
                new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        Object receivedSorted = testUtils.getMessages(streams, 400).sorted(Ordering$String$.MODULE$);
        Assert.assertEquals(expectedSorted, receivedSorted);

        // Ownership check against the consumer owner directory.
        List expectedOwners = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer0-0"), new Tuple2("1", "group1_consumer0-0")}));
        Seq<Tuple2<String, String>> actualOwners = getZKChildrenValues(dirs().consumerOwnerDir());
        Assert.assertEquals(expectedOwners, actualOwners);

        connector.shutdown();
    }

    /**
     * Verifies that messages consumed through {@link StringDecoder} key/value decoders match
     * the messages that were sent.
     *
     * <p>Sends {@code nMessages()} uncompressed messages to each of partitions 0 and 1, waits for
     * metadata propagation and leader election on both partitions, then collects everything
     * delivered by the streams of a {@code consumer1} connector and compares the sorted lists.
     *
     * <p>The {@link KafkaRequestHandler} logger is set to FATAL while the test runs and is set
     * to ERROR afterwards, also when the test fails.
     */
    @Test
    public void testConsumerDecoder() {
        Logger requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
        requestHandlerLogger.setLevel(Level.FATAL);
        try {
            List sentMessages = (List) TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, NoCompressionCodec$.MODULE$).$plus$plus(TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, NoCompressionCodec$.MODULE$), List$.MODULE$.canBuildFrom());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), topic(), 1, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            ConsumerConfig consumerConfig = new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4()));
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkUtils(), topic(), 1, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            ZookeeperConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true);
            try {
                scala.collection.Map createMessageStreams = consumerConnector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                // Accumulator handed to the per-stream closure; starts as the empty list.
                // NOTE(review): the closure presumably appends each stream's messages to it - confirm.
                ObjectRef received = ObjectRef.create(Nil$.MODULE$);
                createMessageStreams.values().foreach(new ZookeeperConsumerConnectorTest$$anonfun$testConsumerDecoder$1(this, received));
                Assert.assertEquals(sentMessages.sorted(Ordering$String$.MODULE$), ((List) received.elem).sorted(Ordering$String$.MODULE$));
            } finally {
                // Release the connector even when the comparison above fails.
                consumerConnector.shutdown();
            }
        } finally {
            // Undo the FATAL override on every exit path so later tests are not silenced.
            requestHandlerLogger.setLevel(Level.ERROR);
        }
    }

    /**
     * Verifies the topic registry, partition ownership and consumption for a topic created
     * explicitly by the test.
     *
     * <p>Opens a dedicated {@link ZkUtils} client, creates {@code topic()} through it (passing
     * {@code 1, 1} - presumably partition count and replication factor; confirm against
     * {@code TestUtils.createTopic}), sends {@code nMessages()} messages and starts a
     * {@code consumer1} connector with one stream. It then asserts that the connector's topic
     * registry yields exactly one entry for {@code topic()} whose first partition info has id 0,
     * that the owner directory holds the single entry {@code ("0", "group1_consumer1-0")}, and
     * that the consumed messages equal the sent ones.
     */
    @Test
    public void testLeaderSelectionForPartition() {
        ZkUtils zkClient = ZkUtils$.MODULE$.apply(zkConnect(), 6000, 30000, false);
        try {
            TestUtils$.MODULE$.createTopic(zkClient, topic(), 1, 1, servers(), TestUtils$.MODULE$.createTopic$default$6());
            List<String> sendMessages = TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), TestUtils$.MODULE$.sendMessages$default$4(), TestUtils$.MODULE$.sendMessages$default$5());
            ZookeeperConsumerConnector consumerConnector = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
            try {
                scala.collection.Map<String, List<KafkaStream<String, String>>> createMessageStreams = consumerConnector.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
                Pool topicRegistry = consumerConnector.getTopicRegistry();
                Assert.assertEquals(1L, ((TraversableOnce) topicRegistry.map(new ZookeeperConsumerConnectorTest$$anonfun$testLeaderSelectionForPartition$1(this), Iterable$.MODULE$.canBuildFrom())).size());
                Assert.assertEquals(topic(), ((IterableLike) topicRegistry.map(new ZookeeperConsumerConnectorTest$$anonfun$testLeaderSelectionForPartition$2(this), Iterable$.MODULE$.canBuildFrom())).head());
                Assert.assertEquals(0L, ((PartitionTopicInfo) ((IterableLike) ((Tuple2) ((Iterable) topicRegistry.map(new ZookeeperConsumerConnectorTest$$anonfun$1(this), Iterable$.MODULE$.canBuildFrom())).head())._2()).head()).partitionId());
                Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
                Assert.assertEquals(sendMessages, TestUtils$.MODULE$.getMessages(createMessageStreams, nMessages()));
            } finally {
                // Shut the connector down before the ZooKeeper client, matching the success path.
                consumerConnector.shutdown();
            }
        } finally {
            // The client opened by this test is closed on every exit path, including failures.
            zkClient.close();
        }
    }

    /**
     * Verifies the callbacks and ownership data reported to a consumer rebalance listener.
     *
     * <p>Phase 1: a {@code consumer1} connector with a listener creates one stream. The test
     * asserts that both listener flags were set, that the listener's {@code partitionOwnership()}
     * has no entry for the topic, that the global ownership assigns partitions 0 and 1 to
     * {@code group1_consumer1} thread 0, and that the owner directory agrees.
     *
     * <p>Phase 2: after resetting the first listener's flags, a {@code consumer2} connector with
     * its own listener joins. The test asserts that partition 1 is now owned by
     * {@code group1_consumer2}, that the first listener's flags were set again and that it
     * reports partitions {0, 1} in {@code partitionOwnership()}, and that both listeners report
     * the same global ownership.
     *
     * <p>Both connectors are shut down on every exit path.
     */
    @Test
    public void testConsumerRebalanceListener() {
        TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 0, TestUtils$.MODULE$.sendMessages$default$5());
        TestUtils$.MODULE$.sendMessages(servers(), topic(), nMessages(), 1, TestUtils$.MODULE$.sendMessages$default$5());
        ZookeeperConsumerConnector consumerConnector1 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer1(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
        // Assigned only once phase 2 starts; stays null if phase 1 fails first.
        ZookeeperConsumerConnector consumerConnector2 = null;
        try {
            TestConsumerRebalanceListener listener1 = new TestConsumerRebalanceListener(this);
            consumerConnector1.setConsumerRebalanceListener(listener1);
            scala.collection.Map<String, List<KafkaStream<String, String>>> streams1 = consumerConnector1.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));

            // Phase 1: only consumer1 is registered.
            Assert.assertEquals(BoxesRunTime.boxToBoolean(true), BoxesRunTime.boxToBoolean(listener1.beforeReleasingPartitionsCalled()));
            Assert.assertEquals(BoxesRunTime.boxToBoolean(true), BoxesRunTime.boxToBoolean(listener1.beforeStartingFetchersCalled()));
            Assert.assertEquals((Object) null, listener1.partitionOwnership().get(topic()));
            Assert.assertEquals("group1_consumer1", listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(0)).consumer());
            Assert.assertEquals("group1_consumer1", listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(1)).consumer());
            Assert.assertEquals(0L, listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(0)).threadId());
            Assert.assertEquals(0L, listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(1)).threadId());
            Assert.assertEquals("group1_consumer1", listener1.consumerId());
            // Clear the flags so the phase 2 assertions observe a fresh callback.
            listener1.beforeReleasingPartitionsCalled_$eq(false);
            listener1.beforeStartingFetchersCalled_$eq(false);
            Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer1-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));

            // Phase 2: consumer2 joins the same group.
            consumerConnector2 = new ZookeeperConsumerConnector(new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties(zkConnect(), group(), consumer2(), TestUtils$.MODULE$.createConsumerProperties$default$4())), true);
            TestConsumerRebalanceListener listener2 = new TestConsumerRebalanceListener(this);
            consumerConnector2.setConsumerRebalanceListener(listener2);
            consumerConnector2.createMessageStreams(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(topic()), BoxesRunTime.boxToInteger(1))})), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
            // Messages are read from the first connector's streams; the result itself is not checked.
            TestUtils$.MODULE$.getMessages(streams1, nMessages());
            Assert.assertEquals(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("0", "group1_consumer1-0"), new Tuple2("1", "group1_consumer2-0")})), getZKChildrenValues(dirs().consumerOwnerDir()));
            Assert.assertEquals(BoxesRunTime.boxToBoolean(true), BoxesRunTime.boxToBoolean(listener1.beforeReleasingPartitionsCalled()));
            Assert.assertEquals(BoxesRunTime.boxToBoolean(true), BoxesRunTime.boxToBoolean(listener1.beforeStartingFetchersCalled()));
            Assert.assertEquals(Set$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})), listener1.partitionOwnership().get(topic()));
            Assert.assertEquals("group1_consumer1", listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(0)).consumer());
            Assert.assertEquals("group1_consumer2", listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(1)).consumer());
            Assert.assertEquals(0L, listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(0)).threadId());
            Assert.assertEquals(0L, listener1.globalPartitionOwnership().get(topic()).get(BoxesRunTime.boxToInteger(1)).threadId());
            Assert.assertEquals("group1_consumer1", listener1.consumerId());
            Assert.assertEquals("group1_consumer2", listener2.consumerId());
            Assert.assertEquals(listener1.globalPartitionOwnership(), listener2.globalPartitionOwnership());
        } finally {
            // Same shutdown order as the success path (first connector, then second); the nested
            // finally guarantees the second is released even if the first shutdown throws.
            try {
                consumerConnector1.shutdown();
            } finally {
                if (consumerConnector2 != null) {
                    consumerConnector2.shutdown();
                }
            }
        }
    }

    /**
     * Lists the children of a ZooKeeper path in string-sorted order and maps each child through
     * the {@code getZKChildrenValues} closure.
     *
     * <p>NOTE(review): callers compare the result with (child name, owner id) pairs, so the
     * closure presumably pairs each child name with the data stored at that child - confirm
     * against the closure body, which is not visible here.
     *
     * @param str the ZooKeeper path whose children are listed
     * @return one tuple per child, ordered by child name
     */
    public Seq<Tuple2<String, String>> getZKChildrenValues(String str) {
        Seq sortedChildren = (Seq) zkUtils().getChildren(str).sorted(Ordering$String$.MODULE$);
        ZookeeperConsumerConnectorTest$$anonfun$getZKChildrenValues$1 childToPair = new ZookeeperConsumerConnectorTest$$anonfun$getZKChildrenValues$1(this, str);
        Object pairs = sortedChildren.map(childToPair, Seq$.MODULE$.canBuildFrom());
        return (Seq) pairs;
    }

    public ZookeeperConsumerConnectorTest() {
        overridingProps().put(KafkaConfig$.MODULE$.NumPartitionsProp(), BoxesRunTime.boxToInteger(numParts()).toString());
        this.group = "group1";
        this.consumer0 = "consumer0";
        this.consumer1 = "consumer1";
        this.consumer2 = "consumer2";
        this.consumer3 = "consumer3";
        this.nMessages = 2;
    }
}
