/*
 * Decompiled with CFR 0.152.
 */
package kafka.integration;

import java.io.File;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import kafka.cluster.Broker;
import kafka.cluster.Broker$;
import kafka.cluster.Cluster;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerFetcherManager;
import kafka.consumer.FetchedDataChunk;
import kafka.consumer.PartitionTopicInfo;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.KafkaServerTestHarness$class;
import kafka.message.MessageAndOffset;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness$class;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005eb\u0001B\u0001\u0003\u0001\u001d\u00111BR3uG\",'\u000fV3ti*\u00111\u0001B\u0001\fS:$Xm\u001a:bi&|gNC\u0001\u0006\u0003\u0015Y\u0017MZ6b\u0007\u0001\u00192\u0001\u0001\u0005\u0013!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!A\u0003&V]&$8+^5uKB\u00111\u0003F\u0007\u0002\u0005%\u0011QC\u0001\u0002\u0017\u0017\u000647.Y*feZ,'\u000fV3ti\"\u000b'O\\3tg\")q\u0003\u0001C\u00011\u00051A(\u001b8jiz\"\u0012!\u0007\t\u0003'\u0001Aqa\u0007\u0001C\u0002\u0013\u0005A$\u0001\u0005ok6tu\u000eZ3t+\u0005i\u0002C\u0001\u0010\"\u001b\u0005y\"\"\u0001\u0011\u0002\u000bM\u001c\u0017\r\\1\n\u0005\tz\"aA%oi\"1A\u0005\u0001Q\u0001\nu\t\u0011B\\;n\u001d>$Wm\u001d\u0011\t\u000b\u0019\u0002A\u0011A\u0014\u0002\u001f\u001d,g.\u001a:bi\u0016\u001cuN\u001c4jON$\u0012\u0001\u000b\t\u0004SE\"dB\u0001\u00160\u001d\tYc&D\u0001-\u0015\tic!\u0001\u0004=e>|GOP\u0005\u0002A%\u0011\u0001gH\u0001\ba\u0006\u001c7.Y4f\u0013\t\u00114GA\u0002TKFT!\u0001M\u0010\u0011\u0005UBT\"\u0001\u001c\u000b\u0005]\"\u0011AB:feZ,'/\u0003\u0002:m\tY1*\u00194lC\u000e{gNZ5h\u0011\u001dY\u0004A1A\u0005\u0002q\n\u0001\"\\3tg\u0006<Wm]\u000b\u0002{A!ahQ\u000fF\u001b\u0005y$B\u0001!B\u0003\u001diW\u000f^1cY\u0016T!AQ\u0010\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002E\u007f\t9\u0001*Y:i\u001b\u0006\u0004\bc\u0001$H\u00116\t\u0011)\u0003\u00023\u0003B\u0019a$S&\n\u0005){\"!B!se\u0006L\bC\u0001\u0010M\u0013\tiuD\u0001\u0003CsR,\u0007BB(\u0001A\u0003%Q(A\u0005nKN\u001c\u0018mZ3tA!9\u0011\u000b\u0001b\u0001\n\u0003\u0011\u0016!\u0002;pa&\u001cW#A*\u0011\u0005QKV\"A+\u000b\u0005Y;\u0016\u0001\u00027b]\u001eT\u0011\u0001W\u0001\u0005U\u00064\u0018-\u0003\u0002[+\n11\u000b\u001e:j]\u001eDa\u0001\u0018\u0001!\u0002\u0013\u0019\u0016A\u0002;pa&\u001c\u0007\u0005C\u0004_\u0001\t\u0007I\u0011A0\u0002\u000bE,X-^3\u0016\u0003\u0001\u00042!\u00194i\u001b\u0005\u0011'BA2e\u0003)\u0019wN\\2veJ,g\u000e\u001e\u
0006\u0003K^\u000bA!\u001e;jY&\u0011qM\u0019\u0002\u0014\u0019&t7.\u001a3CY>\u001c7.\u001b8h#V,W/\u001a\t\u0003S2l\u0011A\u001b\u0006\u0003W\u0012\t\u0001bY8ogVlWM]\u0005\u0003[*\u0014\u0001CR3uG\",G\rR1uC\u000eCWO\\6\t\r=\u0004\u0001\u0015!\u0003a\u0003\u0019\tX/Z;fA!9\u0011\u000f\u0001a\u0001\n\u0003\u0011\u0018a\u00024fi\u000eDWM]\u000b\u0002gB\u0011\u0011\u000e^\u0005\u0003k*\u0014acQ8ogVlWM\u001d$fi\u000eDWM]'b]\u0006<WM\u001d\u0005\bo\u0002\u0001\r\u0011\"\u0001y\u0003-1W\r^2iKJ|F%Z9\u0015\u0005ed\bC\u0001\u0010{\u0013\tYxD\u0001\u0003V]&$\bbB?w\u0003\u0003\u0005\ra]\u0001\u0004q\u0012\n\u0004BB@\u0001A\u0003&1/\u0001\u0005gKR\u001c\u0007.\u001a:!\u0011\u001d\t\u0019\u0001\u0001C!\u0003\u000b\tQa]3u+B$\u0012!\u001f\u0015\u0005\u0003\u0003\tI\u0001\u0005\u0003\u0002\f\u0005=QBAA\u0007\u0015\tYa\"\u0003\u0003\u0002\u0012\u00055!A\u0002\"fM>\u0014X\rC\u0004\u0002\u0016\u0001!\t%!\u0002\u0002\u0011Q,\u0017M\u001d#po:DC!a\u0005\u0002\u001aA!\u00111BA\u000e\u0013\u0011\ti\"!\u0004\u0003\u000b\u00053G/\u001a:\t\u000f\u0005\u0005\u0002\u0001\"\u0001\u0002\u0006\u0005YA/Z:u\r\u0016$8\r[3sQ\u0011\ty\"!\n\u0011\t\u0005-\u0011qE\u0005\u0005\u0003S\tiA\u0001\u0003UKN$\bbBA\u0017\u0001\u0011\u0005\u0011QA\u0001\u0011CN\u001cXM\u001d;Rk\u0016,X-R7qifDq!!\r\u0001\t\u0003\t\u0019$A\u0003gKR\u001c\u0007\u000eF\u0002z\u0003kAq!a\u000e\u00020\u0001\u0007Q$\u0001\u0005fqB,7\r^3e\u0001")
public class FetcherTest
extends JUnitSuite
implements KafkaServerTestHarness {
    // Fixture state assigned exactly once, in the constructor.
    private final int numNodes;
    private final HashMap<Object, Seq<byte[]>> messages;
    private final String topic;
    private final LinkedBlockingQueue<FetchedDataChunk> queue;
    // Set to null by the constructor and replaced in setUp() through fetcher_$eq.
    private ConsumerFetcherManager fetcher;
    // Mutable harness state, each with a getter and a *_$eq setter below.
    private Seq<KafkaConfig> instanceConfigs;
    private Buffer<KafkaServer> servers;
    private String brokerList;
    private boolean[] alive;
    private EmbeddedZookeeper zookeeper;
    private int zkPort;
    private ZkUtils zkUtils;
    // The following fields are written by the *_setter_$..._$eq methods or by
    // logger$lzycompute(), i.e. outside the constructor, so they cannot be declared
    // final in Java source (assigning a final field in a method is a compile error).
    private int zkConnectionTimeout;
    private int zkSessionTimeout;
    private String loggerName;
    private Logger logger;
    private String logIdent;
    private Log4jController$ kafka$utils$Logging$$log4jController;
    // Initialization flag for the lazily created logger; read in logger() and
    // written under the instance lock in logger$lzycompute().
    private volatile boolean bitmap$0;

    /** Returns the current value of the {@code instanceConfigs} field. */
    @Override
    public Seq<KafkaConfig> instanceConfigs() {
        return this.instanceConfigs;
    }

    /** Replaces the {@code instanceConfigs} field with {@code x$1}. */
    @Override
    public void instanceConfigs_$eq(Seq<KafkaConfig> x$1) {
        this.instanceConfigs = x$1;
    }

    /** Returns the current value of the {@code servers} field. */
    @Override
    public Buffer<KafkaServer> servers() {
        return this.servers;
    }

    /** Replaces the {@code servers} field with {@code x$1}. */
    @Override
    public void servers_$eq(Buffer<KafkaServer> x$1) {
        this.servers = x$1;
    }

    /** Returns the current value of the {@code brokerList} field. */
    @Override
    public String brokerList() {
        return this.brokerList;
    }

    /** Replaces the {@code brokerList} field with {@code x$1}. */
    @Override
    public void brokerList_$eq(String x$1) {
        this.brokerList = x$1;
    }

    /** Returns the {@code alive} array itself (no copy is made). */
    @Override
    public boolean[] alive() {
        return this.alive;
    }

    /** Replaces the {@code alive} field with the given array reference. */
    @Override
    public void alive_$eq(boolean[] x$1) {
        this.alive = x$1;
    }

    /** Super-accessor: forwards to {@code ZooKeeperTestHarness$class.setUp(this)}. */
    @Override
    public void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness$class.setUp(this);
    }

    /** Super-accessor: forwards to {@code ZooKeeperTestHarness$class.tearDown(this)}. */
    @Override
    public void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness$class.tearDown(this);
    }

    /** Returns the result of {@code KafkaServerTestHarness$class.configs(this)}. */
    @Override
    public Seq<KafkaConfig> configs() {
        return KafkaServerTestHarness$class.configs(this);
    }

    /**
     * Delegates the lookup to {@code KafkaServerTestHarness$class.serverForId}.
     *
     * @param id identifier passed through unchanged to the harness implementation
     * @return whatever option the harness implementation yields for {@code id}
     */
    @Override
    public Option<KafkaServer> serverForId(int id) {
        return KafkaServerTestHarness$class.serverForId(this, id);
    }

    /** Returns the result of {@code KafkaServerTestHarness$class.securityProtocol(this)}. */
    @Override
    public SecurityProtocol securityProtocol() {
        return KafkaServerTestHarness$class.securityProtocol(this);
    }

    /** Returns the result of {@code KafkaServerTestHarness$class.trustStoreFile(this)}. */
    @Override
    public Option<File> trustStoreFile() {
        return KafkaServerTestHarness$class.trustStoreFile(this);
    }

    /**
     * Delegates to {@code KafkaServerTestHarness$class.killRandomBroker(this)}.
     *
     * @return the int produced by the harness implementation
     *         (presumably the index of the stopped broker; confirm against the harness)
     */
    @Override
    public int killRandomBroker() {
        return KafkaServerTestHarness$class.killRandomBroker(this);
    }

    /** Delegates to {@code KafkaServerTestHarness$class.restartDeadBrokers(this)}. */
    @Override
    public void restartDeadBrokers() {
        KafkaServerTestHarness$class.restartDeadBrokers(this);
    }

    /** Returns the current value of the {@code zookeeper} field. */
    @Override
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    /** Replaces the {@code zookeeper} field with {@code x$1}. */
    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    /** Returns the current value of the {@code zkPort} field. */
    @Override
    public int zkPort() {
        return this.zkPort;
    }

    /** Replaces the {@code zkPort} field with {@code x$1}. */
    @Override
    public void zkPort_$eq(int x$1) {
        this.zkPort = x$1;
    }

    /** Returns the current value of the {@code zkUtils} field. */
    @Override
    public ZkUtils zkUtils() {
        return this.zkUtils;
    }

    /** Replaces the {@code zkUtils} field with {@code x$1}. */
    @Override
    public void zkUtils_$eq(ZkUtils x$1) {
        this.zkUtils = x$1;
    }

    /** Returns the value of the {@code zkConnectionTimeout} field. */
    @Override
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    /** Returns the value of the {@code zkSessionTimeout} field. */
    @Override
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    /** Trait-initialization setter: stores {@code x$1} in the {@code zkConnectionTimeout} field. */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    /** Trait-initialization setter: stores {@code x$1} in the {@code zkSessionTimeout} field. */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    /** Returns the result of {@code ZooKeeperTestHarness$class.zkConnect(this)}. */
    @Override
    public String zkConnect() {
        return ZooKeeperTestHarness$class.zkConnect(this);
    }

    /** Returns the result of {@code ZooKeeperTestHarness$class.confFile(this)}. */
    @Override
    public String confFile() {
        return ZooKeeperTestHarness$class.confFile(this);
    }

    /** Returns the value of the {@code loggerName} field. */
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Slow path of {@link #logger()}: creates the logger on first use.
     *
     * <p>Runs while holding this instance's monitor. The {@code bitmap$0} flag is
     * re-checked under the lock so the logger is created only while the flag is
     * still unset; the flag is set only after the {@code logger} field has been assigned.
     *
     * @return the value of the {@code logger} field after initialization
     */
    private Logger logger$lzycompute() {
        FetcherTest fetcherTest = this;
        synchronized (fetcherTest) {
            if (!this.bitmap$0) {
                this.logger = Logging.class.logger((Logging)this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    /**
     * Returns the logger, creating it on first access.
     *
     * @return the cached {@code logger} field when the {@code bitmap$0} flag is already set,
     *         otherwise the result of {@code logger$lzycompute()}
     */
    public Logger logger() {
        if (this.bitmap$0) {
            // Already initialized: hand back the cached instance without locking.
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /** Returns the current value of the {@code logIdent} field. */
    public String logIdent() {
        return this.logIdent;
    }

    /** Replaces the {@code logIdent} field with {@code x$1}. */
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /** Returns the value of the {@code kafka$utils$Logging$$log4jController} field. */
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    /** Trait-initialization setter: stores {@code x$1} in the {@code loggerName} field. */
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    /** Trait-initialization setter: stores {@code x$1} in the {@code kafka$utils$Logging$$log4jController} field. */
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    /** Forwards the deferred message {@code msg} to the Logging trait's {@code trace} implementation. */
    public void trace(Function0<String> msg) {
        Logging.class.trace((Logging)this, msg);
    }

    /** Forwards the deferred throwable {@code e} to the Logging trait's {@code trace} implementation and returns its result. */
    public Object trace(Function0<Throwable> e) {
        return Logging.class.trace((Logging)this, e);
    }

    /** Forwards the deferred message and throwable to the Logging trait's {@code trace} implementation. */
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.trace((Logging)this, msg, e);
    }

    /** Passes {@code action} to the Logging trait's {@code swallowTrace} implementation. */
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.class.swallowTrace((Logging)this, action);
    }

    /** Forwards the deferred message {@code msg} to the Logging trait's {@code debug} implementation. */
    public void debug(Function0<String> msg) {
        Logging.class.debug((Logging)this, msg);
    }

    /** Forwards the deferred throwable {@code e} to the Logging trait's {@code debug} implementation and returns its result. */
    public Object debug(Function0<Throwable> e) {
        return Logging.class.debug((Logging)this, e);
    }

    /** Forwards the deferred message and throwable to the Logging trait's {@code debug} implementation. */
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.debug((Logging)this, msg, e);
    }

    /** Passes {@code action} to the Logging trait's {@code swallowDebug} implementation. */
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.class.swallowDebug((Logging)this, action);
    }

    /** Forwards the deferred message {@code msg} to the Logging trait's {@code info} implementation. */
    public void info(Function0<String> msg) {
        Logging.class.info((Logging)this, msg);
    }

    /** Forwards the deferred throwable {@code e} to the Logging trait's {@code info} implementation and returns its result. */
    public Object info(Function0<Throwable> e) {
        return Logging.class.info((Logging)this, e);
    }

    /** Forwards the deferred message and throwable to the Logging trait's {@code info} implementation. */
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.info((Logging)this, msg, e);
    }

    /** Passes {@code action} to the Logging trait's {@code swallowInfo} implementation. */
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.class.swallowInfo((Logging)this, action);
    }

    /** Forwards the deferred message {@code msg} to the Logging trait's {@code warn} implementation. */
    public void warn(Function0<String> msg) {
        Logging.class.warn((Logging)this, msg);
    }

    /** Forwards the deferred throwable {@code e} to the Logging trait's {@code warn} implementation and returns its result. */
    public Object warn(Function0<Throwable> e) {
        return Logging.class.warn((Logging)this, e);
    }

    /** Forwards the deferred message and throwable to the Logging trait's {@code warn} implementation. */
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.warn((Logging)this, msg, e);
    }

    /** Passes {@code action} to the Logging trait's {@code swallowWarn} implementation. */
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.class.swallowWarn((Logging)this, action);
    }

    /** Passes {@code action} to the Logging trait's {@code swallow} implementation. */
    public void swallow(Function0<BoxedUnit> action) {
        Logging.class.swallow((Logging)this, action);
    }

    /** Forwards the deferred message {@code msg} to the Logging trait's {@code error} implementation. */
    public void error(Function0<String> msg) {
        Logging.class.error((Logging)this, msg);
    }

    /** Forwards the deferred throwable {@code e} to the Logging trait's {@code error} implementation and returns its result. */
    public Object error(Function0<Throwable> e) {
        return Logging.class.error((Logging)this, e);
    }

    /** Forwards the deferred message and throwable to the Logging trait's {@code error} implementation. */
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.error((Logging)this, msg, e);
    }

    /** Passes {@code action} to the Logging trait's {@code swallowError} implementation. */
    public void swallowError(Function0<BoxedUnit> action) {
        Logging.class.swallowError((Logging)this, action);
    }

    /** Forwards the deferred message {@code msg} to the Logging trait's {@code fatal} implementation. */
    public void fatal(Function0<String> msg) {
        Logging.class.fatal((Logging)this, msg);
    }

    /** Forwards the deferred throwable {@code e} to the Logging trait's {@code fatal} implementation and returns its result. */
    public Object fatal(Function0<Throwable> e) {
        return Logging.class.fatal((Logging)this, e);
    }

    /** Forwards the deferred message and throwable to the Logging trait's {@code fatal} implementation. */
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.fatal((Logging)this, msg, e);
    }

    /** Returns the {@code numNodes} field, which the constructor sets to 1. */
    public int numNodes() {
        return this.numNodes;
    }

    /**
     * Builds the broker configurations for this test.
     *
     * <p>Calls {@code TestUtils.createBrokerConfigs} with {@link #numNodes()} and
     * {@link #zkConnect()}, passing the compiler-generated {@code $default$N} accessors for
     * arguments 3 to 10, then maps each resulting {@link Properties} through
     * {@code KafkaConfig.fromProps}.
     *
     * @return one {@code KafkaConfig} per {@code Properties} object produced by the helper
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(this.numNodes(), this.zkConnect(), TestUtils$.MODULE$.createBrokerConfigs$default$3(), TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            // Converts a single Properties object into a KafkaConfig.
            public final KafkaConfig apply(Properties props) {
                return KafkaConfig$.MODULE$.fromProps(props);
            }
        }, Seq$.MODULE$.canBuildFrom());
    }

    /** Returns the {@code messages} map created by the constructor (the instance itself, not a copy). */
    public HashMap<Object, Seq<byte[]>> messages() {
        return this.messages;
    }

    /** Returns the {@code topic} field, which the constructor sets to {@code "topic"}. */
    public String topic() {
        return this.topic;
    }

    /** Returns the chunk queue created by the constructor; {@code setUp()} hands it to each {@code PartitionTopicInfo}. */
    public LinkedBlockingQueue<FetchedDataChunk> queue() {
        return this.queue;
    }

    /** Returns the current {@code fetcher}; it is {@code null} until {@code setUp()} assigns one. */
    public ConsumerFetcherManager fetcher() {
        return this.fetcher;
    }

    /** Replaces the {@code fetcher} field with {@code x$1}. */
    public void fetcher_$eq(ConsumerFetcherManager x$1) {
        this.fetcher = x$1;
    }

    /**
     * Prepares the fixture before each test.
     *
     * <p>Steps, in order: run the harness {@code setUp}; create the topic; build a
     * {@code Cluster} with one {@code Broker} per running server; create the
     * {@code ConsumerFetcherManager} and store it via {@code fetcher_$eq}; call
     * {@code stopConnections()} on it; build one {@code PartitionTopicInfo} per config,
     * all sharing {@link #queue()}; finally call {@code startConnections} with those
     * infos and the cluster.
     */
    @Override
    @Before
    public void setUp() {
        KafkaServerTestHarness$class.setUp(this);
        // Creates the topic with a single map entry: key 0 -> Seq(brokerId of the first config).
        // Presumably this is the partition-to-replica assignment; confirm against TestUtils.createTopic.
        TestUtils$.MODULE$.createTopic(this.zkUtils(), this.topic(), (Map<Object, Seq<Object>>)((Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{((KafkaConfig)this.configs().head()).brokerId()})))}))), (Seq<KafkaServer>)this.servers());
        // Describe every running server as a Broker on "localhost" at its bound port.
        Cluster cluster = new Cluster((Iterable)this.servers().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Broker apply(KafkaServer s) {
                return new Broker(s.config().brokerId(), "localhost", s.boundPort(s.boundPort$default$1()), Broker$.MODULE$.$lessinit$greater$default$4());
            }
        }, Buffer$.MODULE$.canBuildFrom()));
        // Consumer properties are created from three empty strings plus the helper's default fourth argument.
        this.fetcher_$eq(new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils$.MODULE$.createConsumerProperties("", "", "", TestUtils$.MODULE$.createConsumerProperties$default$4())), this.zkUtils()));
        this.fetcher().stopConnections();
        // One PartitionTopicInfo per broker config; each uses index 0, zeroed counters and the shared queue.
        Seq topicInfos = (Seq)this.configs().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ FetcherTest $outer;

            public final PartitionTopicInfo apply(KafkaConfig c) {
                return new PartitionTopicInfo(this.$outer.topic(), 0, this.$outer.queue(), new AtomicLong(0L), new AtomicLong(0L), new AtomicInteger(0), "");
            }
            {
                // Captured enclosing instance must be non-null.
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        }, Seq$.MODULE$.canBuildFrom());
        this.fetcher().startConnections((Iterable)topicInfos, cluster);
    }

    /**
     * Releases the fixture after each test: stops the fetcher's connections (if a fetcher
     * was created) and then always runs the harness {@code tearDown}.
     *
     * <p>JUnit invokes {@code @After} methods even when {@code @Before} threw, so the
     * fetcher may still be {@code null} here. Guarding against that avoids a
     * {@code NullPointerException} that would hide the original failure. The
     * {@code finally} block ensures the harness teardown is not skipped when
     * {@code stopConnections()} itself throws.
     */
    @Override
    @After
    public void tearDown() {
        try {
            ConsumerFetcherManager manager = this.fetcher();
            if (manager != null) {
                manager.stopConnections();
            }
        } finally {
            KafkaServerTestHarness$class.tearDown(this);
        }
    }

    /**
     * Runs two identical rounds against the fetcher: send messages to the topic, fetch
     * exactly as many messages as {@code sendMessages} returned, and check that the chunk
     * queue is empty afterwards.
     */
    @Test
    public void testFetcher() {
        final int messagesPerNode = 2;
        for (int round = 0; round < 2; ++round) {
            // The size of the collection returned by sendMessages is the number of messages to expect.
            int sent = TestUtils$.MODULE$.sendMessages((Seq<KafkaServer>)this.servers(), this.topic(), messagesPerNode, TestUtils$.MODULE$.sendMessages$default$4(), TestUtils$.MODULE$.sendMessages$default$5()).size();
            this.fetch(sent);
            this.assertQueueEmpty();
        }
    }

    /** Asserts that {@link #queue()} currently holds no chunks. */
    public void assertQueueEmpty() {
        final long pendingChunks = this.queue().size();
        Assert.assertEquals(0L, pendingChunks);
    }

    /**
     * Takes chunks from {@link #queue()} and counts their messages until at least
     * {@code expected} messages have been seen, then asserts the count is exactly
     * {@code expected}.
     *
     * <p>Each poll waits up to two seconds; a {@code null} result fails the test with a
     * "Timed out" message. At least one chunk is always polled, as in the original loop.
     * If a chunk pushes the count past {@code expected}, the method fails immediately
     * with an explicit count mismatch instead of polling again until a timeout.
     *
     * @param expected number of messages that must be fetched
     * @throws AssertionError if a poll times out, the thread is interrupted while waiting,
     *         or more than {@code expected} messages are received
     */
    public void fetch(int expected) {
        // IntRef (not a plain int) because the counting closure below mutates it.
        IntRef count = new IntRef(0);
        do {
            FetchedDataChunk chunk;
            try {
                chunk = this.queue().poll(2L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // poll() declares a checked InterruptedException: restore the interrupt
                // flag for the test runner and fail instead of leaving it unhandled.
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for data chunk " + (count.elem + 1), e);
            }
            Assert.assertNotNull((String)new StringBuilder().append((Object)"Timed out waiting for data chunk ").append((Object)BoxesRunTime.boxToInteger((int)(count.elem + 1))).toString(), (Object)chunk);
            chunk.messages().foreach((Function1)new Serializable(this, count){
                public static final long serialVersionUID = 0L;
                private final IntRef count$1;

                public final void apply(MessageAndOffset message) {
                    ++this.count$1.elem;
                }
                {
                    this.count$1 = count$1;
                }
            });
        } while (count.elem < expected);
        // Overshooting means more messages arrived than were sent; report it directly.
        Assert.assertEquals("Unexpected number of fetched messages", (long)expected, (long)count.elem);
    }

    /**
     * Creates the test instance.
     *
     * <p>Runs the trait initializers for {@code Logging}, {@code ZooKeeperTestHarness} and
     * {@code KafkaServerTestHarness} in that order, then sets up the test's own state:
     * one node, an empty message map, the topic name {@code "topic"}, an empty chunk
     * queue, and a {@code null} fetcher (assigned later in {@code setUp()}).
     */
    public FetcherTest() {
        // Trait initializers run before this class's own fields are assigned; keep this order.
        Logging.class.$init$((Logging)this);
        ZooKeeperTestHarness$class.$init$(this);
        KafkaServerTestHarness$class.$init$(this);
        this.numNodes = 1;
        this.messages = new HashMap();
        this.topic = "topic";
        this.queue = new LinkedBlockingQueue();
        this.fetcher = null;
    }
}

