/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer;

import java.io.File;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.common.TopicAndPartition;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.KafkaServerTestHarness$class;
import kafka.message.ByteBufferMessageSet;
import kafka.message.CompressionCodec;
import kafka.message.Message;
import kafka.message.Message$;
import kafka.message.MessageSet$;
import kafka.message.NoCompressionCodec$;
import kafka.network.SocketServer;
import kafka.producer.SyncProducer;
import kafka.producer.SyncProducerConfig;
import kafka.producer.SyncProducerConfig$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness$class;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.collection.mutable.WrappedArray;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.TraitSetter;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005ub\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU=oGB\u0013x\u000eZ;dKJ$Vm\u001d;\u000b\u0005\r!\u0011\u0001\u00039s_\u0012,8-\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001aE\u0002\u0001\u0011I\u0001\"!\u0003\t\u000e\u0003)Q!a\u0003\u0007\u0002\u000b),h.\u001b;\u000b\u00055q\u0011!C:dC2\fG/Z:u\u0015\u0005y\u0011aA8sO&\u0011\u0011C\u0003\u0002\u000b\u0015Vs\u0017\u000e^*vSR,\u0007CA\n\u0017\u001b\u0005!\"BA\u000b\u0005\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8\n\u0005]!\"AF&bM.\f7+\u001a:wKJ$Vm\u001d;ICJtWm]:\t\u000be\u0001A\u0011\u0001\u000e\u0002\rqJg.\u001b;?)\u0005Y\u0002C\u0001\u000f\u0001\u001b\u0005\u0011\u0001b\u0002\u0010\u0001\u0005\u0004%IaH\u0001\r[\u0016\u001c8/Y4f\u0005f$Xm]\u000b\u0002AA\u0019\u0011\u0005\n\u0014\u000e\u0003\tR\u0011aI\u0001\u0006g\u000e\fG.Y\u0005\u0003K\t\u0012Q!\u0011:sCf\u0004\"!I\u0014\n\u0005!\u0012#\u0001\u0002\"zi\u0016DaA\u000b\u0001!\u0002\u0013\u0001\u0013!D7fgN\fw-\u001a\"zi\u0016\u001c\b\u0005C\u0003-\u0001\u0011\u0005Q&A\bhK:,'/\u0019;f\u0007>tg-[4t)\u0005q\u0003cA\u00185m5\t\u0001G\u0003\u00022e\u0005I\u0011.\\7vi\u0006\u0014G.\u001a\u0006\u0003g\t\n!bY8mY\u0016\u001cG/[8o\u0013\t)\u0004G\u0001\u0003MSN$\bCA\u001c;\u001b\u0005A$BA\u001d\u0005\u0003\u0019\u0019XM\u001d<fe&\u00111\b\u000f\u0002\f\u0017\u000647.Y\"p]\u001aLw\rC\u0003>\u0001\u0011%a(\u0001\bqe>$WoY3SKF,Xm\u001d;\u0015\u0011}*ej\u0015.]=\u0002\u0004\"\u0001Q\"\u000e\u0003\u0005S!A\u0011\u0003\u0002\u0007\u0005\u0004\u0018.\u0003\u0002E\u0003\ny\u0001K]8ek\u000e,'OU3rk\u0016\u001cH\u000fC\u0003Gy\u0001\u0007q)A\u0003u_BL7\r\u0005\u0002I\u0017:\u0011\u0011%S\u0005\u0003\u0015\n\na\u0001\u0015:fI\u00164\u0017B\u0001'N\u0005\u0019\u0019FO]5oO*\u0011!J\t\u0005\u0006\u001fr\u0002\r\u0001U\u0001\na\u0006\u0014H/\u001b;j_:\u0004\"!I)\n\u0005I\u0013#aA%oi\")A\u000b\u0010a\u0001+\u00069Q.Z:tC\u001e,\u0007C\u0001,Y\u001b\u00059&B\u0001+\u0005\u0013\tIvK\u0001\u000bCsR,')\u001e4gKJlUm]:bO\u0016\u001cV\r\u001e\u0005\u00067r\u0002\
r\u0001U\u0001\u0005C\u000e\\7\u000fC\u0004^yA\u0005\t\u0019\u0001)\u0002\u000fQLW.Z8vi\"9q\f\u0010I\u0001\u0002\u0004\u0001\u0016!D2peJ,G.\u0019;j_:LE\rC\u0004byA\u0005\t\u0019A$\u0002\u0011\rd\u0017.\u001a8u\u0013\u0012DQa\u0019\u0001\u0005\u0002\u0011\f1\u0003^3tiJ+\u0017m\u00195bE2,7+\u001a:wKJ$\u0012!\u001a\t\u0003C\u0019L!a\u001a\u0012\u0003\tUs\u0017\u000e\u001e\u0015\u0003E&\u0004\"A\u001b7\u000e\u0003-T!a\u0003\b\n\u00055\\'\u0001\u0002+fgRDQa\u001c\u0001\u0005\u0002\u0011\fq\u0003^3ti\u0016k\u0007\u000f^=Qe>$WoY3SKF,Xm\u001d;)\u00059L\u0007\"\u0002:\u0001\t\u0003!\u0017a\u0006;fgRlUm]:bO\u0016\u001c\u0016N_3U_>d\u0015M]4fQ\t\t\u0018\u000eC\u0003v\u0001\u0011\u0005A-\u0001\u0012uKN$X*Z:tC\u001e,7+\u001b>f)>|G*\u0019:hK^KG\u000f[!dWj+'o\u001c\u0015\u0003i&DQ\u0001\u001f\u0001\u0005\u0002\u0011\fA\u0005^3tiB\u0013x\u000eZ;dK\u000e{'O]3di2L(+Z2fSZ,7OU3ta>t7/\u001a\u0015\u0003o&DQa\u001f\u0001\u0005\u0002\u0011\fa\u0003^3tiB\u0013x\u000eZ;dKJ\u001c\u0015M\u001c+j[\u0016|W\u000f\u001e\u0015\u0003u&DQA 
\u0001\u0005\u0002\u0011\f\u0001\u0005^3tiB\u0013x\u000eZ;dKJ+\u0017/^3ti^KG\u000f\u001b(p%\u0016\u001c\bo\u001c8tK\"\u0012Q0\u001b\u0005\u0007\u0003\u0007\u0001A\u0011\u00013\u0002+Q,7\u000f\u001e(pi\u0016sw.^4i%\u0016\u0004H.[2bg\"\u001a\u0011\u0011A5\t\u0013\u0005%\u0001!%A\u0005\n\u0005-\u0011\u0001\u00079s_\u0012,8-\u001a*fcV,7\u000f\u001e\u0013eK\u001a\fW\u000f\u001c;%kU\u0011\u0011Q\u0002\u0016\u0004!\u0006=1FAA\t!\u0011\t\u0019\"!\b\u000e\u0005\u0005U!\u0002BA\f\u00033\t\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005m!%\u0001\u0006b]:|G/\u0019;j_:LA!a\b\u0002\u0016\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\t\u0013\u0005\r\u0002!%A\u0005\n\u0005-\u0011\u0001\u00079s_\u0012,8-\u001a*fcV,7\u000f\u001e\u0013eK\u001a\fW\u000f\u001c;%m!I\u0011q\u0005\u0001\u0012\u0002\u0013%\u0011\u0011F\u0001\u0019aJ|G-^2f%\u0016\fX/Z:uI\u0011,g-Y;mi\u0012:TCAA\u0016U\r9\u0015q\u0002\u0015\b\u0001\u0005=\u0012QGA\u001d!\r\t\u0013\u0011G\u0005\u0004\u0003g\u0011#A\u00033faJ,7-\u0019;fI\u0006\u0012\u0011qG\u0001I)\"L7\u000f\t;fgR\u0004\u0003.Y:!E\u0016,g\u000e\t3faJ,7-\u0019;fI\u0002\ng\u000e\u001a\u0011ji\u0002:\u0018\u000e\u001c7!E\u0016\u0004#/Z7pm\u0016$\u0007%\u001b8!C\u00022W\u000f^;sK\u0002\u0012X\r\\3bg\u0016\f#!a\u000f\u0002\u0011Ar\u0013\u0007\r\u00181]A\u0002")
public class SyncProducerTest
extends JUnitSuite
implements KafkaServerTestHarness {
    // Payload bytes wrapped in a Message by the tests; allocated in the constructor.
    private final byte[] messageBytes;
    // Backing state for the KafkaServerTestHarness accessors and setters declared in this class.
    private Seq<KafkaConfig> instanceConfigs;
    private Buffer<KafkaServer> servers;
    private String brokerList;
    private boolean[] alive;
    private final String kafkaPrincipalType;
    // Backing state for the ZooKeeperTestHarness accessors and setters declared in this class.
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;
    private ZkUtils zkUtils;
    private EmbeddedZookeeper zookeeper;
    // Backing state for the Logging accessors and setters declared in this class.
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    // Set to true by logger$lzycompute() once the logger field has been assigned.
    private volatile boolean bitmap$0;

    /** Returns the current value of the {@code instanceConfigs} field. */
    @Override
    public Seq<KafkaConfig> instanceConfigs() {
        return this.instanceConfigs;
    }

    /** Stores {@code x$1} in the {@code instanceConfigs} field. */
    @Override
    @TraitSetter
    public void instanceConfigs_$eq(Seq<KafkaConfig> x$1) {
        this.instanceConfigs = x$1;
    }

    /** Returns the current value of the {@code servers} field. */
    @Override
    public Buffer<KafkaServer> servers() {
        return this.servers;
    }

    /** Stores {@code x$1} in the {@code servers} field. */
    @Override
    @TraitSetter
    public void servers_$eq(Buffer<KafkaServer> x$1) {
        this.servers = x$1;
    }

    /** Returns the current value of the {@code brokerList} field. */
    @Override
    public String brokerList() {
        return this.brokerList;
    }

    /** Stores {@code x$1} in the {@code brokerList} field. */
    @Override
    @TraitSetter
    public void brokerList_$eq(String x$1) {
        this.brokerList = x$1;
    }

    /** Returns the {@code alive} array itself (no copy is made). */
    @Override
    public boolean[] alive() {
        return this.alive;
    }

    /** Stores the given array reference in the {@code alive} field (no copy is made). */
    @Override
    @TraitSetter
    public void alive_$eq(boolean[] x$1) {
        this.alive = x$1;
    }

    /** Returns the value of the {@code kafkaPrincipalType} field. */
    @Override
    public String kafkaPrincipalType() {
        return this.kafkaPrincipalType;
    }

    /** Bridge method: runs {@code ZooKeeperTestHarness$class.setUp} on this instance. */
    @Override
    public void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness$class.setUp(this);
    }

    /** Bridge method: runs {@code ZooKeeperTestHarness$class.tearDown} on this instance. */
    @Override
    public void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness$class.tearDown(this);
    }

    /**
     * Stores {@code x$1} in the {@code kafkaPrincipalType} field.
     * NOTE(review): the field is declared {@code final}, so javac would reject this assignment;
     * presumably an artifact of decompiling Scala trait-initialisation bytecode - confirm.
     */
    @Override
    public void kafka$integration$KafkaServerTestHarness$_setter_$kafkaPrincipalType_$eq(String x$1) {
        this.kafkaPrincipalType = x$1;
    }

    /** Delegates to {@code KafkaServerTestHarness$class.setAclsBeforeServersStart} for this instance. */
    @Override
    public void setAclsBeforeServersStart() {
        KafkaServerTestHarness$class.setAclsBeforeServersStart(this);
    }

    /** Returns whatever {@code KafkaServerTestHarness$class.configs} produces for this instance. */
    @Override
    public Seq<KafkaConfig> configs() {
        return KafkaServerTestHarness$class.configs(this);
    }

    /** Delegates to {@code KafkaServerTestHarness$class.serverForId}, passing this instance and {@code id}. */
    @Override
    public Option<KafkaServer> serverForId(int id) {
        return KafkaServerTestHarness$class.serverForId(this, id);
    }

    /** Returns whatever {@code KafkaServerTestHarness$class.securityProtocol} produces for this instance. */
    @Override
    public SecurityProtocol securityProtocol() {
        return KafkaServerTestHarness$class.securityProtocol(this);
    }

    /** Returns whatever {@code KafkaServerTestHarness$class.trustStoreFile} produces for this instance. */
    @Override
    public Option<File> trustStoreFile() {
        return KafkaServerTestHarness$class.trustStoreFile(this);
    }

    /** Returns whatever {@code KafkaServerTestHarness$class.saslProperties} produces for this instance. */
    @Override
    public Option<Properties> saslProperties() {
        return KafkaServerTestHarness$class.saslProperties(this);
    }

    /** JUnit {@code @Before} hook; delegates to {@code KafkaServerTestHarness$class.setUp} for this instance. */
    @Override
    @Before
    public void setUp() {
        KafkaServerTestHarness$class.setUp(this);
    }

    /** JUnit {@code @After} hook; delegates to {@code KafkaServerTestHarness$class.tearDown} for this instance. */
    @Override
    @After
    public void tearDown() {
        KafkaServerTestHarness$class.tearDown(this);
    }

    /** Delegates to {@code KafkaServerTestHarness$class.killRandomBroker} and returns its int result. */
    @Override
    public int killRandomBroker() {
        return KafkaServerTestHarness$class.killRandomBroker(this);
    }

    /** Delegates to {@code KafkaServerTestHarness$class.restartDeadBrokers} for this instance. */
    @Override
    public void restartDeadBrokers() {
        KafkaServerTestHarness$class.restartDeadBrokers(this);
    }

    /** Returns the value of the {@code zkConnectionTimeout} field. */
    @Override
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    /** Returns the value of the {@code zkSessionTimeout} field. */
    @Override
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    /** Returns the current value of the {@code zkUtils} field. */
    @Override
    public ZkUtils zkUtils() {
        return this.zkUtils;
    }

    /** Stores {@code x$1} in the {@code zkUtils} field. */
    @Override
    public void zkUtils_$eq(ZkUtils x$1) {
        this.zkUtils = x$1;
    }

    /** Returns the current value of the {@code zookeeper} field. */
    @Override
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    /** Stores {@code x$1} in the {@code zookeeper} field. */
    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    /**
     * Stores {@code x$1} in the {@code zkConnectionTimeout} field.
     * NOTE(review): the field is declared {@code final}, so javac would reject this assignment;
     * presumably an artifact of decompiling Scala trait-initialisation bytecode - confirm.
     */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    /**
     * Stores {@code x$1} in the {@code zkSessionTimeout} field.
     * NOTE(review): the field is declared {@code final}, so javac would reject this assignment;
     * presumably an artifact of decompiling Scala trait-initialisation bytecode - confirm.
     */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    /** Returns whatever {@code ZooKeeperTestHarness$class.zkPort} produces for this instance. */
    @Override
    public int zkPort() {
        return ZooKeeperTestHarness$class.zkPort(this);
    }

    /** Returns whatever {@code ZooKeeperTestHarness$class.zkConnect} produces for this instance. */
    @Override
    public String zkConnect() {
        return ZooKeeperTestHarness$class.zkConnect(this);
    }

    /** Returns the value of the {@code loggerName} field. */
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Slow path used by {@link #logger()}. While holding this instance's monitor it assigns the
     * {@code logger} field (only if {@code bitmap$0} is still false), marks {@code bitmap$0} as
     * true, and returns the field's value.
     */
    private Logger logger$lzycompute() {
        SyncProducerTest syncProducerTest = this;
        synchronized (syncProducerTest) {
            // Re-check under the lock so the logger is assigned at most once.
            if (!this.bitmap$0) {
                // NOTE(review): "Logging.class.logger(...)" looks like the decompiler's rendering of
                // the Logging trait's static implementation class - confirm before compiling.
                this.logger = Logging.class.logger((Logging)this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    /**
     * Returns the {@code logger} field when {@code bitmap$0} is already set; otherwise falls back
     * to {@code logger$lzycompute()} to initialise it.
     */
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    /** Returns the current value of the {@code logIdent} field. */
    public String logIdent() {
        return this.logIdent;
    }

    /** Stores {@code x$1} in the {@code logIdent} field. */
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /** Returns the value of the {@code kafka$utils$Logging$$log4jController} field. */
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    /**
     * Stores {@code x$1} in the {@code loggerName} field.
     * NOTE(review): the field is declared {@code final}, so javac would reject this assignment;
     * presumably an artifact of decompiling Scala trait-initialisation bytecode - confirm.
     */
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    /**
     * Stores {@code x$1} in the {@code kafka$utils$Logging$$log4jController} field.
     * NOTE(review): the field is declared {@code final}, so javac would reject this assignment;
     * presumably an artifact of decompiling Scala trait-initialisation bytecode - confirm.
     */
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    /** Forwards the message thunk {@code msg} to the Logging trait's {@code trace} implementation for this instance. */
    public void trace(Function0<String> msg) {
        Logging.class.trace((Logging)this, msg);
    }

    /** Forwards the throwable thunk {@code e} to the Logging trait's {@code trace} implementation and returns its result. */
    public Object trace(Function0<Throwable> e) {
        return Logging.class.trace((Logging)this, e);
    }

    /** Forwards {@code msg} and {@code e} to the Logging trait's two-argument {@code trace} implementation for this instance. */
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.trace((Logging)this, msg, e);
    }

    /** Forwards {@code action} to the Logging trait's {@code swallowTrace} implementation for this instance. */
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.class.swallowTrace((Logging)this, action);
    }

    /** Returns the result of the Logging trait's {@code isDebugEnabled} implementation for this instance. */
    public boolean isDebugEnabled() {
        return Logging.class.isDebugEnabled((Logging)this);
    }

    /** Forwards the message thunk {@code msg} to the Logging trait's {@code debug} implementation for this instance. */
    public void debug(Function0<String> msg) {
        Logging.class.debug((Logging)this, msg);
    }

    /** Forwards the throwable thunk {@code e} to the Logging trait's {@code debug} implementation and returns its result. */
    public Object debug(Function0<Throwable> e) {
        return Logging.class.debug((Logging)this, e);
    }

    /** Forwards {@code msg} and {@code e} to the Logging trait's two-argument {@code debug} implementation for this instance. */
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.debug((Logging)this, msg, e);
    }

    /** Forwards {@code action} to the Logging trait's {@code swallowDebug} implementation for this instance. */
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.class.swallowDebug((Logging)this, action);
    }

    /** Forwards the message thunk {@code msg} to the Logging trait's {@code info} implementation for this instance. */
    public void info(Function0<String> msg) {
        Logging.class.info((Logging)this, msg);
    }

    /** Forwards the throwable thunk {@code e} to the Logging trait's {@code info} implementation and returns its result. */
    public Object info(Function0<Throwable> e) {
        return Logging.class.info((Logging)this, e);
    }

    /** Forwards {@code msg} and {@code e} to the Logging trait's two-argument {@code info} implementation for this instance. */
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.info((Logging)this, msg, e);
    }

    /** Forwards {@code action} to the Logging trait's {@code swallowInfo} implementation for this instance. */
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.class.swallowInfo((Logging)this, action);
    }

    /** Forwards the message thunk {@code msg} to the Logging trait's {@code warn} implementation for this instance. */
    public void warn(Function0<String> msg) {
        Logging.class.warn((Logging)this, msg);
    }

    /** Forwards the throwable thunk {@code e} to the Logging trait's {@code warn} implementation and returns its result. */
    public Object warn(Function0<Throwable> e) {
        return Logging.class.warn((Logging)this, e);
    }

    /** Forwards {@code msg} and {@code e} to the Logging trait's two-argument {@code warn} implementation for this instance. */
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.warn((Logging)this, msg, e);
    }

    /** Forwards {@code action} to the Logging trait's {@code swallowWarn} implementation for this instance. */
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.class.swallowWarn((Logging)this, action);
    }

    /** Forwards {@code action} to the Logging trait's {@code swallow} implementation for this instance. */
    public void swallow(Function0<BoxedUnit> action) {
        Logging.class.swallow((Logging)this, action);
    }

    /** Forwards the message thunk {@code msg} to the Logging trait's {@code error} implementation for this instance. */
    public void error(Function0<String> msg) {
        Logging.class.error((Logging)this, msg);
    }

    /** Forwards the throwable thunk {@code e} to the Logging trait's {@code error} implementation and returns its result. */
    public Object error(Function0<Throwable> e) {
        return Logging.class.error((Logging)this, e);
    }

    /** Forwards {@code msg} and {@code e} to the Logging trait's two-argument {@code error} implementation for this instance. */
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.error((Logging)this, msg, e);
    }

    /** Forwards {@code action} to the Logging trait's {@code swallowError} implementation for this instance. */
    public void swallowError(Function0<BoxedUnit> action) {
        Logging.class.swallowError((Logging)this, action);
    }

    /** Forwards the message thunk {@code msg} to the Logging trait's {@code fatal} implementation for this instance. */
    public void fatal(Function0<String> msg) {
        Logging.class.fatal((Logging)this, msg);
    }

    /** Forwards the throwable thunk {@code e} to the Logging trait's {@code fatal} implementation and returns its result. */
    public Object fatal(Function0<Throwable> e) {
        return Logging.class.fatal((Logging)this, e);
    }

    /** Forwards {@code msg} and {@code e} to the Logging trait's two-argument {@code fatal} implementation for this instance. */
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.fatal((Logging)this, msg, e);
    }

    /** Returns the {@code messageBytes} array itself (no copy); the constructor allocates it with length 2. */
    private byte[] messageBytes() {
        return this.messageBytes;
    }

    /**
     * Builds the broker configuration list for this test class.
     *
     * <p>Asks {@code TestUtils$.createBrokerConfigs} for one broker's properties (using
     * {@link #zkConnect()}, {@code false} for the third argument and the helper's own defaults
     * for the rest), converts the first entry into a {@link KafkaConfig}, and wraps it in a
     * single-element Scala list.
     *
     * @return a one-element list holding the generated broker config
     */
    public List<KafkaConfig> generateConfigs() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = (Properties)testUtils.createBrokerConfigs(
                1,
                this.zkConnect(),
                false,
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(),
                testUtils.createBrokerConfigs$default$12()).head();
        KafkaConfig brokerConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);
        KafkaConfig[] singleConfig = new KafkaConfig[]{brokerConfig};
        return List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])singleConfig));
    }

    /**
     * Thin wrapper that passes all seven arguments, unchanged and in the same order, to
     * {@code TestUtils$.produceRequest} and returns the request it builds.
     */
    private ProducerRequest produceRequest(String topic, int partition, ByteBufferMessageSet message, int acks, int timeout, int correlationId, String clientId) {
        return TestUtils$.MODULE$.produceRequest(topic, partition, message, acks, timeout, correlationId, clientId);
    }

    /** Default for the fifth {@code produceRequest} argument ({@code timeout}): {@code SyncProducerConfig$.DefaultAckTimeoutMs()}. */
    private int produceRequest$default$5() {
        return SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
    }

    /** Default for the sixth {@code produceRequest} argument ({@code correlationId}): always 0. */
    private int produceRequest$default$6() {
        return 0;
    }

    /** Default for the seventh {@code produceRequest} argument ({@code clientId}): {@code SyncProducerConfig$.DefaultClientId()}. */
    private String produceRequest$default$7() {
        return SyncProducerConfig$.MODULE$.DefaultClientId();
    }

    /**
     * Sends the same single-message produce request (topic "test", partition 0, acks 1) three
     * times through one {@link SyncProducer} connected to the first broker. Every send must
     * yield a non-null response, and each of the first two sends must complete in under
     * 2000 ms as measured with {@code SystemTime$}.
     */
    @Test
    public void testReachableServer() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // First timed send.
        long firstStart = SystemTime$.MODULE$.milliseconds();
        this.sendTestMessageOrFail(producer);
        long firstEnd = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue(firstEnd - firstStart < 2000L);

        // Second timed send over the same producer.
        long secondStart = SystemTime$.MODULE$.milliseconds();
        this.sendTestMessageOrFail(producer);
        long secondEnd = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue(secondEnd - secondStart < 2000L);

        // Third send is only checked for a non-null response; it is not timed.
        this.sendTestMessageOrFail(producer);
    }

    /**
     * Builds a one-message request for topic "test" and sends it with {@code producer},
     * asserting the response is non-null. Any {@link Exception} raised while building or
     * sending is turned into a test failure carrying the exception's message.
     */
    private void sendTestMessageOrFail(SyncProducer producer) {
        try {
            Message[] messages = new Message[]{new Message(this.messageBytes())};
            ByteBufferMessageSet payload = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])messages));
            ProducerRequest request = this.produceRequest("test", 0, payload, 1, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7());
            ProducerResponse response = producer.send(request);
            Assert.assertNotNull((Object)response);
        }
        catch (Exception exception) {
            throw this.fail(new StringBuilder().append((Object)"Unexpected failure sending message to broker. ").append((Object)exception.getMessage()).toString());
        }
    }

    /**
     * Sends a {@link ProducerRequest} with an empty data map (correlation id 0, default client
     * id, acks 1, default ack timeout) and expects a non-null response that reports no error
     * and has an empty status map.
     */
    @Test
    public void testEmptyProduceRequest() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // Request fields, evaluated in the same order as the constructor arguments.
        int correlationId = 0;
        String clientId = SyncProducerConfig$.MODULE$.DefaultClientId();
        short ack = 1;
        int ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
        ProducerRequest emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$));

        ProducerResponse response = producer.send(emptyRequest);
        Assert.assertTrue(response != null);
        Assert.assertTrue(!response.hasError() && response.status().isEmpty());
    }

    /**
     * Checks how the broker answers produce requests around its configured
     * {@code messageMaxBytes} limit (acks = 1, topic "test", partition 0).
     *
     * <ol>
     *   <li>A message whose payload alone is {@code messageMaxBytes + 1} bytes must come back
     *       with exactly one failed partition, error {@code MESSAGE_TOO_LARGE} and offset -1.</li>
     *   <li>A message sized to stay below the limit once the message, timestamp and log
     *       overheads are subtracted must come back with no failed partition, error
     *       {@code NONE} and offset 0.</li>
     * </ol>
     *
     * <p>Fix: the error-count assertion after the second send used to re-inspect
     * {@code response1} (expecting 1), so it never examined the second response. It now checks
     * that {@code response2} contains zero failed partitions.
     */
    @Test
    public void testMessageSizeTooLarge() {
        KafkaServer server = (KafkaServer)this.servers().head();
        SocketServer qual$3 = server.socketServer();
        SecurityProtocol x$5 = qual$3.boundPort$default$1();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(qual$3.boundPort(x$5));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        TestUtils$.MODULE$.createTopic(this.zkUtils(), "test", 1, 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());

        // Oversized message: the payload by itself already exceeds the broker limit by one byte.
        Message message1 = new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().head()).messageMaxBytes()) + 1]);
        ByteBufferMessageSet messageSet1 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{message1}));
        ProducerResponse response1 = producer.send(this.produceRequest("test", 0, messageSet1, 1, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7()));
        Assert.assertEquals((long)1L, (long)response1.status().count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Tuple2<TopicAndPartition, ProducerResponseStatus> x$1) {
                return ((ProducerResponseStatus)x$1._2()).error() != Errors.NONE.code();
            }
        }));
        Assert.assertEquals((long)Errors.MESSAGE_TOO_LARGE.code(), (long)((ProducerResponseStatus)response1.status().apply((Object)new TopicAndPartition("test", 0))).error());
        Assert.assertEquals((long)-1L, (long)((ProducerResponseStatus)response1.status().apply((Object)new TopicAndPartition("test", 0))).offset());

        // Largest payload that still fits: limit minus per-message, timestamp and log overheads, minus one.
        int safeSize = Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().head()).messageMaxBytes()) - Message$.MODULE$.MinMessageOverhead() - Message$.MODULE$.TimestampLength() - MessageSet$.MODULE$.LogOverhead() - 1;
        Message message2 = new Message(new byte[safeSize]);
        ByteBufferMessageSet messageSet2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{message2}));
        ProducerResponse response2 = producer.send(this.produceRequest("test", 0, messageSet2, 1, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7()));
        // The accepted request must not report any failed partition.
        Assert.assertEquals((long)0L, (long)response2.status().count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Tuple2<TopicAndPartition, ProducerResponseStatus> x$2) {
                return ((ProducerResponseStatus)x$2._2()).error() != Errors.NONE.code();
            }
        }));
        Assert.assertEquals((long)Errors.NONE.code(), (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("test", 0))).error());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("test", 0))).offset());
    }

    /**
     * Sends two oversized messages ({@code messageMaxBytes + 1} payload bytes) with acks = 0 to
     * topic "test". The first send must complete without throwing. The second send is allowed
     * to fail with an {@link IOException}; any other throwable is propagated and fails the test.
     *
     * <p>Fix: the handlers were ordered {@code catch (Throwable)} then
     * {@code catch (IOException)}, which makes the IOException handler unreachable (javac
     * rejects it) and, read literally, rethrows the very exception the test means to tolerate.
     * A single handler now swallows only {@code IOException}.
     */
    @Test
    public void testMessageSizeTooLargeWithAckZero() {
        KafkaServer server = (KafkaServer)this.servers().head();
        SocketServer qual$4 = server.socketServer();
        SecurityProtocol x$6 = qual$4.boundPort$default$1();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(qual$4.boundPort(x$6));
        props.put("request.required.acks", "0");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), "test", 1, 1, AdminUtils$.MODULE$.createTopic$default$5(), AdminUtils$.MODULE$.createTopic$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), "test", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        // First oversized send: with acks = 0 no error is expected to surface here.
        producer.send(this.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().head()).messageMaxBytes()) + 1])})), 0, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7()));

        // Second oversized send: an IOException is tolerated.
        // NOTE(review): presumably the broker drops the connection after the first oversized
        // request, so this send hits a closed socket - confirm against broker behaviour.
        try {
            producer.send(this.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().head()).messageMaxBytes()) + 1])})), 0, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7()));
        }
        catch (Throwable throwable) {
            // producer.send declares no checked exceptions, so IOException cannot be caught
            // directly in Java; filter by type and rethrow everything else untouched.
            if (!(throwable instanceof IOException)) {
                throw throwable;
            }
        }
    }

    /**
     * Sends one produce request covering partition 0 of "topic1", "topic2" and "topic3", twice.
     *
     * <p>Before any topic is created, the response must echo the request's correlation id,
     * carry three statuses, and each status must be {@code UNKNOWN_TOPIC_OR_PARTITION} with
     * offset -1 and {@code Message$.NoTimestamp()}. After "topic1" and "topic3" are created and
     * have leaders, resending the same request must succeed for those two (error {@code NONE},
     * offset 0) while "topic2" still reports {@code UNKNOWN_TOPIC_OR_PARTITION} with offset -1.
     */
    @Test
    public void testProduceCorrectlyReceivesResponse() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));
        ByteBufferMessageSet payload = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));

        // Arguments for produceRequestWithAcks, evaluated in the original order.
        WrappedArray topics = Predef$.MODULE$.wrapRefArray((Object[])new String[]{"topic1", "topic2", "topic3"});
        WrappedArray partitions = Predef$.MODULE$.wrapIntArray(new int[]{0});
        int acks = 1;
        int ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
        String clientId = SyncProducerConfig$.MODULE$.DefaultClientId();
        int correlationId = TestUtils$.MODULE$.produceRequestWithAcks$default$6();
        ProducerRequest request = TestUtils$.MODULE$.produceRequestWithAcks((Seq<String>)topics, (Seq<Object>)partitions, payload, acks, ackTimeoutMs, correlationId, clientId);

        // Round 1: no topic exists yet, so every partition must be rejected.
        ProducerResponse firstResponse = producer.send(request);
        Assert.assertNotNull((Object)firstResponse);
        Assert.assertEquals((long)request.correlationId(), (long)firstResponse.correlationId());
        Assert.assertEquals((long)3L, (long)firstResponse.status().size());
        firstResponse.status().values().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(ProducerResponseStatus x0$1) {
                ProducerResponseStatus status = x0$1;
                if (status == null) {
                    throw new MatchError((Object)status);
                }
                short error = status.error();
                long nextOffset = status.offset();
                long timestamp = status.timestamp();
                Assert.assertEquals((long)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (long)error);
                Assert.assertEquals((long)-1L, (long)nextOffset);
                Assert.assertEquals((long)Message$.MODULE$.NoTimestamp(), (long)timestamp);
            }
        });

        // Create two of the three topics and wait for their partition-0 leaders.
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), "topic1", 1, 1, AdminUtils$.MODULE$.createTopic$default$5(), AdminUtils$.MODULE$.createTopic$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), "topic1", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), "topic3", 1, 1, AdminUtils$.MODULE$.createTopic$default$5(), AdminUtils$.MODULE$.createTopic$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), "topic3", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        // Round 2: same request; topic1/topic3 succeed, topic2 is still unknown.
        ProducerResponse secondResponse = producer.send(request);
        Assert.assertNotNull((Object)secondResponse);
        Assert.assertEquals((long)request.correlationId(), (long)secondResponse.correlationId());
        Assert.assertEquals((long)3L, (long)secondResponse.status().size());
        Assert.assertEquals((long)Errors.NONE.code(), (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic1", 0))).error());
        Assert.assertEquals((long)Errors.NONE.code(), (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic3", 0))).error());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic1", 0))).offset());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic3", 0))).offset());
        Assert.assertEquals((long)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic2", 0))).error());
        Assert.assertEquals((long)-1L, (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic2", 0))).offset());
    }

    /**
     * Verifies that a send times out when the broker stops handling requests.
     *
     * <p>The broker's request handler pool is shut down before a request for "topic1" is sent.
     * The send must end in a {@link SocketTimeoutException}, and at least {@code timeoutMs}
     * (500) milliseconds must have elapsed between the start of the send and the timeout. A
     * normal return or any other throwable fails the test.
     *
     * <p>Fix: the handlers were ordered {@code catch (Throwable)} then
     * {@code catch (SocketTimeoutException)}. The second handler was unreachable (javac rejects
     * it), so the expected timeout was reported as an unexpected exception and the elapsed-time
     * assertion never ran. The timeout is now recognised as the success path and the timing
     * check runs after it.
     */
    @Test
    public void testProducerCanTimeout() {
        int timeoutMs = 500;
        KafkaServer server = (KafkaServer)this.servers().head();
        SocketServer qual$6 = server.socketServer();
        SecurityProtocol x$15 = qual$6.boundPort$default$1();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(qual$6.boundPort(x$15));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        ByteBufferMessageSet messages2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
        ProducerRequest request = this.produceRequest("topic1", 0, messages2, 1, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7());
        // Stop request handling so the broker accepts the bytes but never answers.
        server.requestHandlerPool().shutdown();
        long t1 = SystemTime$.MODULE$.milliseconds();
        try {
            producer.send(request);
            throw this.fail("Should have received timeout exception since request handling is stopped.");
        }
        catch (Throwable throwable) {
            // producer.send declares no checked exceptions, so SocketTimeoutException cannot be
            // caught directly in Java; filter by type. Anything else (including the failure
            // raised above when send returns normally) is reported as unexpected.
            if (!(throwable instanceof SocketTimeoutException)) {
                throw this.fail(new StringBuilder().append((Object)"Unexpected exception when expecting timeout: ").append((Object)throwable).toString());
            }
        }
        long t2 = SystemTime$.MODULE$.milliseconds();
        // The producer must not give up before the timeout has elapsed.
        Assert.assertTrue(t2 - t1 >= (long)timeoutMs);
    }

    /**
     * Sends a {@link ProducerRequest} with an empty data map and acks = 0 over the broker's
     * PLAINTEXT port and expects {@code producer.send} to return {@code null}.
     */
    @Test
    public void testProduceRequestWithNoResponse() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        int port = broker.socketServer().boundPort(SecurityProtocol.PLAINTEXT);
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(port);
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // Request fields, evaluated in the same order as the constructor arguments.
        int correlationId = 0;
        String clientId = SyncProducerConfig$.MODULE$.DefaultClientId();
        short ack = 0;
        int ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
        ProducerRequest emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$));

        ProducerResponse response = producer.send(emptyRequest);
        Assert.assertTrue(response == null);
    }

    /**
     * Creates topic "minisrtest" with one partition, replication factor 1 and
     * {@code min.insync.replicas=2}, then produces to it with acks = -1 and expects the
     * partition status to carry {@code NOT_ENOUGH_REPLICAS}.
     */
    @Test
    public void testNotEnoughReplicas() {
        String topicName = "minisrtest";
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        producerProps.put("request.required.acks", "-1");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // The topic demands two in-sync replicas but is created with a replication factor of one.
        Properties topicProps = new Properties();
        topicProps.put("min.insync.replicas", "2");
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), topicName, 1, 1, topicProps, AdminUtils$.MODULE$.createTopic$default$6());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), topicName, 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        Message[] messages = new Message[]{new Message(this.messageBytes())};
        ByteBufferMessageSet payload = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])messages));
        ProducerRequest request = this.produceRequest(topicName, 0, payload, -1, this.produceRequest$default$5(), this.produceRequest$default$6(), this.produceRequest$default$7());
        ProducerResponse response = producer.send(request);

        ProducerResponseStatus partitionStatus = (ProducerResponseStatus)response.status().apply((Object)new TopicAndPartition(topicName, 0));
        Assert.assertEquals((long)Errors.NOT_ENOUGH_REPLICAS.code(), (long)partitionStatus.error());
    }

    /**
     * Runs the trait initialisers for Logging, ZooKeeperTestHarness and KafkaServerTestHarness
     * on this instance, in that order, then allocates the two-byte array that the tests wrap in
     * a {@code Message}.
     */
    public SyncProducerTest() {
        // NOTE(review): "Logging.class.$init$" looks like the decompiler's rendering of the
        // Logging trait's static initialiser - confirm before compiling.
        Logging.class.$init$((Logging)this);
        ZooKeeperTestHarness$class.$init$(this);
        KafkaServerTestHarness$class.$init$(this);
        this.messageBytes = new byte[2];
    }
}

