/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer;

import java.util.Hashtable;
import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.common.ErrorMapping$;
import kafka.common.FailedToSendMessageException;
import kafka.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.utils.ZkUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import kafka.zk.ZooKeeperTestHarness$class;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.scalatest.exceptions.TestFailedException;
import org.scalatest.junit.JUnit3Suite;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005%f\u0001B\u0001\u0003\u0001\u001d\u0011A\u0002\u0015:pIV\u001cWM\u001d+fgRT!a\u0001\u0003\u0002\u0011A\u0014x\u000eZ;dKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\u0011\u0001\u0001B\u0005\r\u0011\u0005%\u0001R\"\u0001\u0006\u000b\u0005-a\u0011!\u00026v]&$(BA\u0007\u000f\u0003%\u00198-\u00197bi\u0016\u001cHOC\u0001\u0010\u0003\ry'oZ\u0005\u0003#)\u00111BS+oSR\u001c4+^5uKB\u00111CF\u0007\u0002))\u0011Q\u0003B\u0001\u0003u.L!a\u0006\u000b\u0003)i{wnS3fa\u0016\u0014H+Z:u\u0011\u0006\u0014h.Z:t!\tIB$D\u0001\u001b\u0015\tYB!A\u0003vi&d7/\u0003\u0002\u001e5\t9Aj\\4hS:<\u0007\"B\u0010\u0001\t\u0003\u0001\u0013A\u0002\u001fj]&$h\bF\u0001\"!\t\u0011\u0003!D\u0001\u0003\u0011\u001d!\u0003A1A\u0005\n\u0015\n\u0011B\u0019:pW\u0016\u0014\u0018\nZ\u0019\u0016\u0003\u0019\u0002\"a\n\u0016\u000e\u0003!R\u0011!K\u0001\u0006g\u000e\fG.Y\u0005\u0003W!\u00121!\u00138u\u0011\u0019i\u0003\u0001)A\u0005M\u0005Q!M]8lKJLE-\r\u0011\t\u000f=\u0002!\u0019!C\u0005K\u0005I!M]8lKJLEM\r\u0005\u0007c\u0001\u0001\u000b\u0011\u0002\u0014\u0002\u0015\t\u0014xn[3s\u0013\u0012\u0014\u0004\u0005C\u00044\u0001\t\u0007I\u0011\u0002\u001b\u0002\u000bA|'\u000f^:\u0016\u0003U\u00022A\u000e '\u001d\t9DH\u0004\u00029w5\t\u0011H\u0003\u0002;\r\u00051AH]8pizJ\u0011!K\u0005\u0003{!\nq\u0001]1dW\u0006<W-\u0003\u0002@\u0001\n!A*[:u\u0015\ti\u0004\u0006\u0003\u0004C\u0001\u0001\u0006I!N\u0001\u0007a>\u0014Ho\u001d\u0011\t\u000f\u0011\u0003\u0011\u0011)A\u0005\u000b\u0006\u0019\u0001\u0010J\u0019\u0011\t\u001d2eEJ\u0005\u0003\u000f\"\u0012a\u0001V;qY\u0016\u0014\u0004bB%\u0001\u0005\u0004%I!J\u0001\u0006a>\u0014H/\r\u0005\u0007\u0017\u0002\u0001\u000b\u0011\u0002\u0014\u0002\rA|'\u000f^\u0019!\u0011\u001di\u0005A1A\u0005\n\u0015\nQ\u0001]8siJBaa\u0014\u0001!\u0002\u00131\u0013A\u00029peR\u0014\u0004\u0005C\u0004R\u0001\u0001\u0007I\u0011\u0002*\u0002\u000fM,'O^3scU\t1\u000b\u0005\u0002U/6\tQK\u0003\u0002W\t\u000511/\u001a:wKJL!\u0001W+\u0003\u0017-\u000bgm[1TKJ4XM\u001d\u0005\b5\u0002\u0001\r\u0011\"\u0003\\\u0003-\u0019XM\u001d<feFzF%Z9\u0015\u0005q{\u0006CA\u0014^\u0013\tq\u0006F\u0001\u0003V]&$\bb\u0002#Z\u0003\u0003\u0005\ra\u0015\u0005\u0007C\u0002\u0001\u000b\u0015B*\u0002\u0011M,'O^3sc\u0001Bqa\u0019\u0001A\u0002\u0013%!+A\u0004tKJ4XM\u001d\u001a\t\u000f\u0015\u0004\u0001\u0019!C\u0005M\u0006Y1/\u001a:wKJ\u0014t\fJ3r)\tav\rC\u0004EI\u0006\u0005\t\u0019A*\t\r%\u0004\u0001\u0015)\u0003T\u0003!\u0019XM\u001d<feJ\u0002\u0003bB6\u0001\u0001\u0004%I\u0001\\\u0001\nG>t7/^7feF*\u0012!\u001c\t\u0003]Fl\u0011a\u001c\u0006\u0003a\u0012\t\u0001bY8ogVlWM]\u0005\u0003e>\u0014abU5na2,7i\u001c8tk6,'\u000fC\u0004u\u0001\u0001\u0007I\u0011B;\u0002\u001b\r|gn];nKJ\ft\fJ3r)\taf\u000fC\u0004Eg\u0006\u0005\t\u0019A7\t\ra\u0004\u0001\u0015)\u0003n\u0003)\u0019wN\\:v[\u0016\u0014\u0018\u0007\t\u0005\bu\u0002\u0001\r\u0011\"\u0003m\u0003%\u0019wN\\:v[\u0016\u0014(\u0007C\u0004}\u0001\u0001\u0007I\u0011B?\u0002\u001b\r|gn];nKJ\u0014t\fJ3r)\taf\u0010C\u0004Ew\u0006\u0005\t\u0019A7\t\u000f\u0005\u0005\u0001\u0001)Q\u0005[\u0006Q1m\u001c8tk6,'O\r\u0011\t\u0013\u0005\u0015\u0001A1A\u0005\n\u0005\u001d\u0011\u0001\u0006:fcV,7\u000f\u001e%b]\u0012dWM\u001d'pO\u001e,'/\u0006\u0002\u0002\nA!\u00111BA\u000b\u001b\t\tiA\u0003\u0003\u0002\u0010\u0005E\u0011!\u00027pORR'bAA\n\u001d\u00051\u0011\r]1dQ\u0016LA!a\u0006\u0002\u000e\t1Aj\\4hKJD\u0001\"a\u0007\u0001A\u0003%\u0011\u0011B\u0001\u0016e\u0016\fX/Z:u\u0011\u0006tG\r\\3s\u0019><w-\u001a:!\u0011%\ty\u0002\u0001a\u0001\n\u0013\t\t#A\u0004tKJ4XM]:\u0016\u0005\u0005\r\u0002#BA\u0013\u0003_\u0019VBAA\u0014\u0015\u0011\tI#a\u000b\u0002\u0013%lW.\u001e;bE2,'bAA\u0017Q\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0007}\n9\u0003C\u0005\u00024\u0001\u0001\r\u0011\"\u0003\u00026\u0005Y1/\u001a:wKJ\u001cx\fJ3r)\ra\u0016q\u0007\u0005\n\t\u0006E\u0012\u0011!a\u0001\u0003GA\u0001\"a\u000f\u0001A\u0003&\u00111E\u0001\tg\u0016\u0014h/\u001a:tA!I\u0011q\b\u0001C\u0002\u0013%\u0011\u0011I\u0001\u0007aJ|\u0007o]\u0019\u0016\u0005\u0005\r\u0003\u0003BA#\u0003\u001fj!!a\u0012\u000b\t\u0005%\u00131J\u0001\u0005kRLGN\u0003\u0002\u0002N\u0005!!.\u0019<b\u0013\u0011\t\t&a\u0012\u0003\u0015A\u0013x\u000e]3si&,7\u000f\u0003\u0005\u0002V\u0001\u0001\u000b\u0011BA\"\u0003\u001d\u0001(o\u001c9tc\u0001B\u0011\"!\u0017\u0001\u0005\u0004%I!a\u0017\u0002\u000f\r|gNZ5hcU\u0011\u0011Q\f\t\u0004)\u0006}\u0013bAA1+\nY1*\u00194lC\u000e{gNZ5h\u0011!\t)\u0007\u0001Q\u0001\n\u0005u\u0013\u0001C2p]\u001aLw-\r\u0011\t\u0013\u0005%\u0004A1A\u0005\n\u0005\u0005\u0013A\u00029s_B\u001c(\u0007\u0003\u0005\u0002n\u0001\u0001\u000b\u0011BA\"\u0003\u001d\u0001(o\u001c9te\u0001B\u0011\"!\u001d\u0001\u0005\u0004%I!a\u0017\u0002\u000f\r|gNZ5he!A\u0011Q\u000f\u0001!\u0002\u0013\ti&\u0001\u0005d_:4\u0017n\u001a\u001a!\u0011\u001d\tI\b\u0001C!\u0003w\nQa]3u+B$\u0012\u0001\u0018\u0005\b\u0003\u007f\u0002A\u0011IA>\u0003!!X-\u0019:E_^t\u0007bBAB\u0001\u0011\u0005\u00111P\u0001\u001ei\u0016\u001cH/\u00169eCR,'I]8lKJ\u0004\u0016M\u001d;ji&|g.\u00138g_\"\"\u0011\u0011QAD!\u0011\tI)!$\u000e\u0005\u0005-%BA\u0006\u000f\u0013\u0011\ty)a#\u0003\tQ+7\u000f\u001e\u0005\b\u0003'\u0003A\u0011AA>\u0003I!Xm\u001d;TK:$Gk\u001c(foR{\u0007/[2)\t\u0005E\u0015q\u0011\u0005\b\u00033\u0003A\u0011AA>\u0003Y!Xm\u001d;TK:$w+\u001b;i\t\u0016\fGM\u0011:pW\u0016\u0014\b\u0006BAL\u0003\u000fCq!a(\u0001\t\u0003\tY(\u0001\u0015uKN$\u0018i]=oGN+g\u000eZ\"b]\u000e{'O]3di2Lh)Y5m/&$\b\u000eV5nK>,H\u000f\u000b\u0003\u0002\u001e\u0006\u001d\u0005bBAS\u0001\u0011\u0005\u00111P\u0001\u0014i\u0016\u001cHoU3oI:+H\u000e\\'fgN\fw-\u001a\u0015\u0005\u0003G\u000b9\t")
public class ProducerTest
extends JUnit3Suite
implements ZooKeeperTestHarness,
Logging {
    private final int brokerId1;
    private final int brokerId2;
    private final List<Object> ports;
    private final Tuple2<Object, Object> x$1;
    private final int port1;
    private final int port2;
    private KafkaServer server1;
    private KafkaServer server2;
    private SimpleConsumer consumer1;
    private SimpleConsumer consumer2;
    private final Logger requestHandlerLogger;
    private List<KafkaServer> servers;
    private final Properties props1;
    private final KafkaConfig config1;
    private final Properties props2;
    private final KafkaConfig config2;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;
    private volatile boolean bitmap$0;

    public String loggerName() {
        return this.loggerName;
    }

    private Logger logger$lzycompute() {
        ProducerTest producerTest = this;
        synchronized (producerTest) {
            if (!this.bitmap$0) {
                this.logger = Logging.class.logger((Logging)this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    public String logIdent() {
        return this.logIdent;
    }

    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    public void trace(Function0<String> msg) {
        Logging.class.trace((Logging)this, msg);
    }

    public Object trace(Function0<Throwable> e) {
        return Logging.class.trace((Logging)this, e);
    }

    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.trace((Logging)this, msg, e);
    }

    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.class.swallowTrace((Logging)this, action);
    }

    public void debug(Function0<String> msg) {
        Logging.class.debug((Logging)this, msg);
    }

    public Object debug(Function0<Throwable> e) {
        return Logging.class.debug((Logging)this, e);
    }

    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.debug((Logging)this, msg, e);
    }

    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.class.swallowDebug((Logging)this, action);
    }

    public void info(Function0<String> msg) {
        Logging.class.info((Logging)this, msg);
    }

    public Object info(Function0<Throwable> e) {
        return Logging.class.info((Logging)this, e);
    }

    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.info((Logging)this, msg, e);
    }

    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.class.swallowInfo((Logging)this, action);
    }

    public void warn(Function0<String> msg) {
        Logging.class.warn((Logging)this, msg);
    }

    public Object warn(Function0<Throwable> e) {
        return Logging.class.warn((Logging)this, e);
    }

    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.warn((Logging)this, msg, e);
    }

    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.class.swallowWarn((Logging)this, action);
    }

    public void swallow(Function0<BoxedUnit> action) {
        Logging.class.swallow((Logging)this, action);
    }

    public void error(Function0<String> msg) {
        Logging.class.error((Logging)this, msg);
    }

    public Object error(Function0<Throwable> e) {
        return Logging.class.error((Logging)this, e);
    }

    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.error((Logging)this, msg, e);
    }

    public void swallowError(Function0<BoxedUnit> action) {
        Logging.class.swallowError((Logging)this, action);
    }

    public void fatal(Function0<String> msg) {
        Logging.class.fatal((Logging)this, msg);
    }

    public Object fatal(Function0<Throwable> e) {
        return Logging.class.fatal((Logging)this, e);
    }

    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.fatal((Logging)this, msg, e);
    }

    @Override
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    @Override
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override
    public void zkClient_$eq(ZkClient x$1) {
        this.zkClient = x$1;
    }

    @Override
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super.setUp();
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super.tearDown();
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String x$1) {
        this.zkConnect = x$1;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    private int brokerId1() {
        return this.brokerId1;
    }

    private int brokerId2() {
        return this.brokerId2;
    }

    private List<Object> ports() {
        return this.ports;
    }

    private int port1() {
        return this.port1;
    }

    private int port2() {
        return this.port2;
    }

    private KafkaServer server1() {
        return this.server1;
    }

    private void server1_$eq(KafkaServer x$1) {
        this.server1 = x$1;
    }

    private KafkaServer server2() {
        return this.server2;
    }

    private void server2_$eq(KafkaServer x$1) {
        this.server2 = x$1;
    }

    private SimpleConsumer consumer1() {
        return this.consumer1;
    }

    private void consumer1_$eq(SimpleConsumer x$1) {
        this.consumer1 = x$1;
    }

    private SimpleConsumer consumer2() {
        return this.consumer2;
    }

    private void consumer2_$eq(SimpleConsumer x$1) {
        this.consumer2 = x$1;
    }

    private Logger requestHandlerLogger() {
        return this.requestHandlerLogger;
    }

    private List<KafkaServer> servers() {
        return this.servers;
    }

    private void servers_$eq(List<KafkaServer> x$1) {
        this.servers = x$1;
    }

    private Properties props1() {
        return this.props1;
    }

    private KafkaConfig config1() {
        return this.config1;
    }

    private Properties props2() {
        return this.props2;
    }

    private KafkaConfig config2() {
        return this.config2;
    }

    @Override
    public void setUp() {
        ZooKeeperTestHarness$class.setUp(this);
        this.server1_$eq(TestUtils$.MODULE$.createServer(this.config1(), TestUtils$.MODULE$.createServer$default$2()));
        this.server2_$eq(TestUtils$.MODULE$.createServer(this.config2(), TestUtils$.MODULE$.createServer$default$2()));
        this.servers_$eq((List<KafkaServer>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{this.server1(), this.server2()})));
        Properties props = new Properties();
        ((Hashtable)props).put("host", "localhost");
        ((Hashtable)props).put("port", ((Object)BoxesRunTime.boxToInteger((int)this.port1())).toString());
        this.consumer1_$eq(new SimpleConsumer("localhost", this.port1(), 1000000, 65536, ""));
        this.consumer2_$eq(new SimpleConsumer("localhost", this.port2(), 100, 65536, ""));
        this.requestHandlerLogger().setLevel(Level.FATAL);
    }

    @Override
    public void tearDown() {
        this.requestHandlerLogger().setLevel(Level.ERROR);
        this.server1().shutdown();
        this.server2().shutdown();
        Utils$.MODULE$.rm(this.server1().config().logDirs());
        Utils$.MODULE$.rm(this.server2().config().logDirs());
        ZooKeeperTestHarness$class.tearDown(this);
    }

    @Test
    public void testUpdateBrokerPartitionInfo() {
        String topic = "new-topic";
        AdminUtils$.MODULE$.createTopic(this.zkClient(), topic, 1, 2, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), topic, 0, 500L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        Properties props1 = new Properties();
        ((Hashtable)props1).put("metadata.broker.list", "localhost:80,localhost:81");
        ((Hashtable)props1).put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig1 = new ProducerConfig(props1);
        Producer producer1 = new Producer(producerConfig1);
        try {
            try {
                producer1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
                throw this.fail("Test should fail because the broker list provided are not valid");
            }
            catch (Throwable throwable) {
                throw this.fail("fails with exception", throwable);
            }
            catch (FailedToSendMessageException failedToSendMessageException) {
            }
        }
        finally {
            producer1.close();
        }
        Properties props2 = new Properties();
        ((Hashtable)props2).put("metadata.broker.list", new StringBuilder().append((Object)"localhost:80,").append((Object)TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config1()}))))).toString());
        ((Hashtable)props2).put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig2 = new ProducerConfig(props2);
        Producer producer2 = new Producer(producerConfig2);
        try {
            producer2.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
        }
        catch (Throwable throwable) {
            try {
                throw this.fail("Should succeed sending the message", throwable);
            }
            catch (Throwable throwable2) {
                producer2.close();
                throw throwable2;
            }
        }
        producer2.close();
        Properties props3 = new Properties();
        ((Hashtable)props3).put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config1(), this.config2()})))));
        ((Hashtable)props3).put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig3 = new ProducerConfig(props3);
        Producer producer3 = new Producer(producerConfig3);
        try {
            producer3.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
        }
        catch (Throwable throwable) {
            try {
                throw this.fail("Should succeed sending the message", throwable);
            }
            catch (Throwable throwable3) {
                producer3.close();
                throw throwable3;
            }
        }
        producer3.close();
    }

    @Test
    public void testSendToNewTopic() {
        Buffer buffer;
        Properties props1 = new Properties();
        ((Hashtable)props1).put("serializer.class", "kafka.serializer.StringEncoder");
        ((Hashtable)props1).put("partitioner.class", "kafka.utils.StaticPartitioner");
        ((Hashtable)props1).put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config1(), this.config2()})))));
        ((Hashtable)props1).put("request.required.acks", "2");
        ((Hashtable)props1).put("request.timeout.ms", "1000");
        Properties props2 = new Properties();
        ((Hashtable)props2).putAll(props1);
        ((Hashtable)props2).put("request.required.acks", "3");
        ((Hashtable)props2).put("request.timeout.ms", "1000");
        ProducerConfig producerConfig1 = new ProducerConfig(props1);
        ProducerConfig producerConfig2 = new ProducerConfig(props2);
        String topic = "new-topic";
        AdminUtils$.MODULE$.createTopic(this.zkClient(), topic, 1, 2, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), topic, 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        Producer producer1 = new Producer(producerConfig1);
        Producer producer2 = new Producer(producerConfig2);
        producer1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
        producer1.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test2")}));
        Option leaderOpt = ZkUtils$.MODULE$.getLeaderForPartition(this.zkClient(), topic, 0);
        Assert.assertTrue((String)"Leader for topic new-topic partition 0 should exist", (boolean)leaderOpt.isDefined());
        int leader = BoxesRunTime.unboxToInt((Object)leaderOpt.get());
        if (leader == this.server1().config().brokerId()) {
            FetchResponse response1 = this.consumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
            buffer = response1.messageSet("new-topic", 0).iterator().toBuffer();
        } else {
            FetchResponse response2 = this.consumer2().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
            buffer = response2.messageSet("new-topic", 0).iterator().toBuffer();
        }
        Buffer messageSet = buffer;
        Assert.assertEquals((String)"Should have fetched 2 messages", (Object)BoxesRunTime.boxToInteger((int)2), (Object)BoxesRunTime.boxToInteger((int)messageSet.size()));
        Assert.assertEquals((Object)new Message("test1".getBytes(), "test".getBytes()), (Object)((MessageAndOffset)messageSet.apply(0)).message());
        Assert.assertEquals((Object)new Message("test2".getBytes(), "test".getBytes()), (Object)((MessageAndOffset)messageSet.apply(1)).message());
        producer1.close();
        try {
            try {
                producer2.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test2")}));
                throw this.fail("Should have timed out for 3 acks.");
            }
            catch (Throwable throwable) {
                throw this.fail("Not expected", throwable);
            }
            catch (FailedToSendMessageException failedToSendMessageException) {
            }
        }
        finally {
            producer2.close();
        }
    }

    /*
     * Loose catch block
     */
    @Test
    public void testSendWithDeadBroker() {
        Properties props = new Properties();
        ((Hashtable)props).put("serializer.class", "kafka.serializer.StringEncoder");
        ((Hashtable)props).put("partitioner.class", "kafka.utils.StaticPartitioner");
        ((Hashtable)props).put("request.timeout.ms", "2000");
        ((Hashtable)props).put("request.required.acks", "1");
        ((Hashtable)props).put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config1(), this.config2()})))));
        String topic = "new-topic";
        AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK(this.zkClient(), topic, (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)1)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)2)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0}))), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)3)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0})))})), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), topic, 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 1, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 2, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 3, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        ProducerConfig config = new ProducerConfig(props);
        Producer producer = new Producer(config);
        try {
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
        }
        catch (Throwable throwable) {
            throw this.fail(new StringBuilder().append((Object)"Unexpected exception: ").append((Object)throwable).toString());
        }
        this.server1().shutdown();
        this.server1().awaitShutdown();
        try {
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test1")}));
            throw this.fail("Should fail since no leader exists for the partition.");
        }
        catch (Throwable throwable) {
            this.server1().startup();
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
            FetchResponse response1 = this.consumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
            Iterator messageSet1 = response1.messageSet(topic, 0).iterator();
            Assert.assertTrue((String)"Message set should have 1 message", (boolean)messageSet1.hasNext());
            Assert.assertEquals((Object)new Message("test1".getBytes(), "test".getBytes()), (Object)((MessageAndOffset)messageSet1.next()).message());
            Assert.assertFalse((String)"Message set should have another message", (boolean)messageSet1.hasNext());
            producer.close();
            return;
        }
        catch (TestFailedException testFailedException) {
            throw testFailedException;
        }
        {
            catch (Exception exception) {
                throw this.fail("Not expected", exception);
            }
        }
    }

    @Test
    public void testAsyncSendCanCorrectlyFailWithTimeout() {
        int timeoutMs = 500;
        Properties props = new Properties();
        ((Hashtable)props).put("serializer.class", "kafka.serializer.StringEncoder");
        ((Hashtable)props).put("partitioner.class", "kafka.utils.StaticPartitioner");
        ((Hashtable)props).put("request.timeout.ms", String.valueOf(timeoutMs));
        ((Hashtable)props).put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config1(), this.config2()})))));
        ((Hashtable)props).put("request.required.acks", "1");
        ((Hashtable)props).put("client.id", "ProducerTest-testAsyncSendCanCorrectlyFailWithTimeout");
        ProducerConfig config = new ProducerConfig(props);
        Producer producer = new Producer(config);
        String topic = "new-topic";
        AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK(this.zkClient(), topic, (Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))})), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$4(), AdminUtils$.MODULE$.createOrUpdateTopicPartitionAssignmentPathInZK$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)this.servers(), topic, 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), topic, 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        try {
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test")}));
            FetchResponse response1 = this.consumer1().fetch(new FetchRequestBuilder().addFetch(topic, 0, 0L, 10000).build());
            Iterator messageSet1 = response1.messageSet("new-topic", 0).iterator();
            Assert.assertTrue((String)"Message set should have 1 message", (boolean)messageSet1.hasNext());
            Assert.assertEquals((Object)new Message("test".getBytes()), (Object)((MessageAndOffset)messageSet1.next()).message());
        }
        catch (Exception exception) {
            producer.close();
            throw this.fail("Not expected", exception);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        this.server1().requestHandlerPool().shutdown();
        long t1 = SystemTime$.MODULE$.milliseconds();
        try {
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KeyedMessage[]{new KeyedMessage(topic, (Object)"test", (Object)"test")}));
        }
        catch (Exception exception) {
            throw this.fail("Not expected", exception);
        }
        catch (FailedToSendMessageException failedToSendMessageException) {
        }
        finally {
            producer.close();
        }
        long t2 = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue((t2 - t1 >= (long)(timeoutMs * config.messageSendMaxRetries()) ? 1 : 0) != 0);
    }

    @Test
    public void testSendNullMessage() {
        Properties props = new Properties();
        ((Hashtable)props).put("serializer.class", "kafka.serializer.StringEncoder");
        ((Hashtable)props).put("partitioner.class", "kafka.utils.StaticPartitioner");
        ((Hashtable)props).put("metadata.broker.list", TestUtils$.MODULE$.getBrokerListStrFromConfigs((Seq<KafkaConfig>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaConfig[]{this.config1(), this.config2()})))));
        ProducerConfig config = new ProducerConfig(props);
        Producer producer = new Producer(config);
        try {
            AdminUtils$.MODULE$.createTopic(this.zkClient(), "new-topic", 2, 1, AdminUtils$.MODULE$.createTopic$default$5());
            Assert.assertTrue((String)"Topic new-topic not created after timeout", (boolean)TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ProducerTest $outer;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return AdminUtils$.MODULE$.fetchTopicMetadataFromZk("new-topic", this.$outer.zkClient()).errorCode() != ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode();
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            }, this.zookeeper().tickTime()));
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
            KeyedMessage[] keyedMessageArray = new KeyedMessage[1];
            keyedMessageArray[0] = new KeyedMessage("new-topic", (Object)"key", null);
            producer.send((Seq)Predef$.MODULE$.wrapRefArray((Object[])keyedMessageArray));
            return;
        }
        finally {
            producer.close();
        }
    }

    public ProducerTest() {
        ZooKeeperTestHarness$class.$init$(this);
        Logging.class.$init$((Logging)this);
        this.brokerId1 = 0;
        this.brokerId2 = 1;
        this.ports = TestUtils$.MODULE$.choosePorts(2);
        Tuple2.mcII.sp sp2 = new Tuple2.mcII.sp(BoxesRunTime.unboxToInt((Object)this.ports().apply(0)), BoxesRunTime.unboxToInt((Object)this.ports().apply(1)));
        if (sp2 != null) {
            Tuple2.mcII.sp sp3;
            int port1 = sp2._1$mcI$sp();
            int port2 = sp2._2$mcI$sp();
            this.x$1 = sp3 = new Tuple2.mcII.sp(port1, port2);
            this.port1 = this.x$1._1$mcI$sp();
            this.port2 = this.x$1._2$mcI$sp();
            this.server1 = null;
            this.server2 = null;
            this.consumer1 = null;
            this.consumer2 = null;
            this.requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
            this.servers = List$.MODULE$.empty();
            this.props1 = TestUtils$.MODULE$.createBrokerConfig(this.brokerId1(), this.port1());
            ((Hashtable)this.props1()).put("num.partitions", "4");
            this.config1 = new KafkaConfig(this.props1());
            this.props2 = TestUtils$.MODULE$.createBrokerConfig(this.brokerId2(), this.port2());
            ((Hashtable)this.props2()).put("num.partitions", "4");
            this.config2 = new KafkaConfig(this.props2());
            return;
        }
        throw new MatchError((Object)sp2);
    }
}

