package kafka.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManager;
import kafka.network.RequestChannel;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SystemTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Iterable$;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;

/* compiled from: SocketServerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-f\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU8dW\u0016$8+\u001a:wKJ$Vm\u001d;\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!A\u0003&V]&$8+^5uK\")1\u0003\u0001C\u0001)\u00051A(\u001b8jiz\"\u0012!\u0006\t\u0003-\u0001i\u0011A\u0001\u0005\b1\u0001\u0011\r\u0011\"\u0001\u001a\u0003\u0015\u0001(o\u001c9t+\u0005Q\u0002CA\u000e!\u001b\u0005a\"BA\u000f\u001f\u0003\u0011)H/\u001b7\u000b\u0003}\tAA[1wC&\u0011\u0011\u0005\b\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bBB\u0012\u0001A\u0003%!$\u0001\u0004qe>\u00048\u000f\t\u0005\bK\u0001\u0011\r\u0011\"\u0001'\u0003\u0019\u0019wN\u001c4jOV\tq\u0005\u0005\u0002)W5\t\u0011F\u0003\u0002+\t\u000511/\u001a:wKJL!\u0001L\u0015\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0007]\u0001\u0001\u000b\u0011B\u0014\u0002\u000f\r|gNZ5hA!9\u0001\u0007\u0001b\u0001\n\u0003\t\u0014aB7fiJL7m]\u000b\u0002eA\u00111GO\u0007\u0002i)\u0011\u0001'\u000e\u0006\u0003m]\naaY8n[>t'BA\u00039\u0015\tId\"\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003wQ\u0012q!T3ue&\u001c7\u000f\u0003\u0004>\u0001\u0001\u0006IAM\u0001\t[\u0016$(/[2tA!9!\u0006\u0001b\u0001\n\u0003yT#\u0001!\u0011\u0005Y\t\u0015B\u0001\"\u0003\u00051\u0019vnY6fiN+'O^3s\u0011\u0019!\u0005\u0001)A\u0005\u0001\u000691/\u001a:wKJ\u0004\u0003b\u0002$\u0001\u0005\u0004%\taR\u0001\bg>\u001c7.\u001a;t+\u0005A\u0005cA%Q%6\t!J\u0003\u0002L\u0019\u00069Q.\u001e;bE2,'BA'O\u0003)\u0019w\u000e\u001c7fGRLwN\u001c\u0006\u0002\u001f\u0006)1oY1mC&\u0011\u0011K\u0013\u0002\f\u0003J\u0014\u0018-\u001f\"vM\u001a,'\u000f\u0005\u0002T-6\tAK\u0003\u0002V=\u0005\u0019a.\u001a;\n\u0005]#&AB*pG.,G\u000f\u0003\u0004Z\u0001\u0001\u0006I\u0001S\u0001\tg>\u001c7.\u001a;tA!)1\f\u0001C\u00019\u0006Y1/\u001a8e%\u0016\fX/Z:u)\u0011i\u0016mY6\u0011\u0005y{V\"\u0001(\n\u0005\u0001t%\u000
1B+oSRDQA\u0019.A\u0002I\u000baa]8dW\u0016$\b\"\u00023[\u0001\u0004)\u0017a\u0002:fcV,7\u000f\u001e\t\u0004=\u001aD\u0017BA4O\u0005\u0015\t%O]1z!\tq\u0016.\u0003\u0002k\u001d\n!!)\u001f;f\u0011\u001da'\f%AA\u00025\f!!\u001b3\u0011\u0007ys\u0007/\u0003\u0002p\u001d\n1q\n\u001d;j_:\u0004\"AX9\n\u0005It%!B*i_J$\b\"\u0002;\u0001\t\u0003)\u0018a\u0004:fG\u0016Lg/\u001a*fgB|gn]3\u0015\u0005\u00154\b\"\u00022t\u0001\u0004\u0011\u0006\"\u0002=\u0001\t\u0003I\u0018A\u00049s_\u000e,7o\u001d*fcV,7\u000f\u001e\u000b\u0003;jDQa_<A\u0002q\fqa\u00195b]:,G\u000e\u0005\u0002\u0017{&\u0011aP\u0001\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011\u0019A\b\u0001\"\u0001\u0002\u0002Q)Q,a\u0001\u0002\u0006!)1p a\u0001y\"1Am a\u0001\u0003\u000f\u0001B!!\u0003\u0002\u00109\u0019a#a\u0003\n\u0007\u00055!!\u0001\bSKF,Xm\u001d;DQ\u0006tg.\u001a7\n\t\u0005E\u00111\u0003\u0002\b%\u0016\fX/Z:u\u0015\r\tiA\u0001\u0005\b\u0003/\u0001A\u0011AA\r\u0003\u001d\u0019wN\u001c8fGR$RAUA\u000e\u0003?A\u0011\"!\b\u0002\u0016A\u0005\t\u0019\u0001!\u0002\u0003MD!\"!\t\u0002\u0016A\u0005\t\u0019AA\u0012\u0003!\u0001(o\u001c;pG>d\u0007\u0003BA\u0013\u0003Si!!a\n\u000b\u0007\u0005\u0005R'\u0003\u0003\u0002,\u0005\u001d\"\u0001E*fGV\u0014\u0018\u000e^=Qe>$xnY8m\u0011\u001d\ty\u0003\u0001C\u0001\u0003c\t\u0001\u0002^3be\u0012{wO\u001c\u000b\u0002;\"\"\u0011QFA\u001b!\u0011\t9$a\u000f\u000e\u0005\u0005e\"BA\u0006\u000f\u0013\u0011\ti$!\u000f\u0003\u000b\u00053G/\u001a:\t\u000f\u0005\u0005\u0003\u0001\"\u0003\u0002D\u0005!\u0002O]8ek\u000e,'OU3rk\u0016\u001cHOQ=uKN,\u0012!\u001a\u0005\b\u0003\u000f\u0002A\u0011AA\u0019\u00035\u0019\u0018.\u001c9mKJ+\u0017/^3ti\"\"\u0011QIA&!\u0011\t9$!\u0014\n\t\u0005=\u0013\u0011\b\u0002\u0005)\u0016\u001cH\u000fC\u0004\u0002T\u0001!\t!!\r\u0002/Q|wNQ5h%\u0016\fX/Z:u\u0013N\u0014VM[3di\u0016$\u0007\u0006BA)\u0003\u0017Bq!!\u0017\u0001\t\u0003\t\t$\u0001\u000euKN$8k\\2lKR\u001c8\t\\8tK>s7\u000b[;uI><h\u000e\u000b\u0003\u0002X\u0005-\u0003bBA0\u0001\u0011\u0005\u0011\u0011G\u0001\u001
8i\u0016\u001cH/T1y\u0007>tg.Z2uS>t7\u000fU3s\u0013BDC!!\u0018\u0002L!9\u0011Q\r\u0001\u0005\u0002\u0005E\u0012\u0001\t;fgRl\u0015\r_\"p]:,7\r^5p]N\u0004VM]%q\u001fZ,'O]5eKNDC!a\u0019\u0002L!9\u00111\u000e\u0001\u0005\u0002\u0005E\u0012a\u0005;fgR\u001c6\u000f\\*pG.,GoU3sm\u0016\u0014\b\u0006BA5\u0003\u0017Bq!!\u001d\u0001\t\u0003\t\t$\u0001\u000buKN$8+Z:tS>t\u0007K]5oG&\u0004\u0018\r\u001c\u0015\u0005\u0003_\nY\u0005C\u0004\u0002x\u0001!\t!!\r\u0002YQ,7\u000f^\"mS\u0016tG\u000fR5tG>tg.Z2uS>tW\u000b\u001d3bi\u0016\u001c(+Z9vKN$X*\u001a;sS\u000e\u001c\b\u0006BA;\u0003\u0017Bq!! \u0001\t\u0003\t\t$A\u001buKN$(I]8lKJ\u001cVM\u001c3BMR,'o\u00115b]:,Gn\u00117pg\u0016$W\u000b\u001d3bi\u0016\u001c(+Z9vKN$X*\u001a;sS\u000e\u001c\b\u0006BA>\u0003\u0017B\u0011\"a!\u0001#\u0003%\t!!\"\u0002+M,g\u000e\u001a*fcV,7\u000f\u001e\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011q\u0011\u0016\u0004[\u0006%5FAAF!\u0011\ti)a&\u000e\u0005\u0005=%\u0002BAI\u0003'\u000b\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005Ue*\u0001\u0006b]:|G/\u0019;j_:LA!!'\u0002\u0010\n\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\t\u0013\u0005u\u0005!%A\u0005\u0002\u0005}\u0015!E2p]:,7\r\u001e\u0013eK\u001a\fW\u000f\u001c;%cU\u0011\u0011\u0011\u0015\u0016\u0004\u0001\u0006%\u0005\"CAS\u0001E\u0005I\u0011AAT\u0003E\u0019wN\u001c8fGR$C-\u001a4bk2$HEM\u000b\u0003\u0003SSC!a\t\u0002\n\u0002")
/* loaded from: input_file:kafka/network/SocketServerTest.class */
public class SocketServerTest extends JUnitSuite {
    // Base broker properties produced by TestUtils.createBrokerConfig; the constructor adds
    // listener, socket and connection-limit overrides on top of these before building `config`.
    private final Properties props = TestUtils$.MODULE$.createBrokerConfig(0, TestUtils$.MODULE$.MockZkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), 0, TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16());
    // Broker configuration parsed from `props` in the constructor.
    private final KafkaConfig config;
    // Metrics registry handed to the shared server; closed in tearDown().
    private final Metrics metrics;
    // Shared socket server under test; started in the constructor and shut down in tearDown().
    private final SocketServer server;
    // Client sockets opened through connect(); handed to a closure and cleared in tearDown().
    private final ArrayBuffer<Socket> sockets;

    /** Returns the broker properties used to configure the shared server. */
    public Properties props() {
        return this.props;
    }

    /** Returns the broker configuration built from {@link #props()} in the constructor. */
    public KafkaConfig config() {
        return this.config;
    }

    /** Returns the metrics registry passed to the shared server. */
    public Metrics metrics() {
        return this.metrics;
    }

    /** Returns the shared socket server that is started in the constructor. */
    public SocketServer server() {
        return this.server;
    }

    /** Returns the buffer of client sockets opened via {@code connect}. */
    public ArrayBuffer<Socket> sockets() {
        return this.sockets;
    }

    /**
     * Writes a size-prefixed request to the socket and flushes it.
     *
     * <p>When {@code option} is a {@code Some}, its value is unboxed as a short and written
     * immediately after the size field, and the size field is the payload length plus two.
     * When it is {@code None}, only the payload length and the payload are written.
     *
     * @param socket socket whose output stream receives the request
     * @param bArr   request payload
     * @param option optional short id to place in front of the payload
     * @throws MatchError if {@code option} is neither {@code Some} nor {@code None}
     */
    public void sendRequest(Socket socket, byte[] bArr, Option<Object> option) {
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        if (option instanceof Some) {
            short requestId = BoxesRunTime.unboxToShort(((Some) option).x());
            // The two extra bytes account for the short id that precedes the payload.
            out.writeInt(bArr.length + 2);
            out.writeShort(requestId);
        } else if (None$.MODULE$.equals(option)) {
            out.writeInt(bArr.length);
        } else {
            throw new MatchError(option);
        }
        out.write(bArr);
        out.flush();
    }

    /** Default for the third argument of {@code sendRequest}: no id ({@code None}). */
    public Option<Object> sendRequest$default$3() {
        return None$.MODULE$;
    }

    /**
     * Reads one size-prefixed response from the socket.
     *
     * @param socket socket whose input stream is read
     * @return the response payload, i.e. the bytes following the 4-byte size field
     */
    public byte[] receiveResponse(Socket socket) {
        DataInputStream in = new DataInputStream(socket.getInputStream());
        int payloadSize = in.readInt();
        byte[] payload = new byte[payloadSize];
        // readFully blocks until the whole payload has arrived (or the stream ends).
        in.readFully(payload);
        return payload;
    }

    /**
     * Takes the next request off the channel (waiting up to 2000 ms), fails the test if none
     * arrived, and echoes it back via {@link #processRequest(RequestChannel, RequestChannel.Request)}.
     *
     * @param requestChannel channel to receive from and respond on
     */
    public void processRequest(RequestChannel requestChannel) {
        RequestChannel.Request pending = requestChannel.receiveRequest(2000L);
        Assert.assertNotNull("receiveRequest timed out", pending);
        processRequest(requestChannel, pending);
    }

    /**
     * Echoes a request back to its sender: the request header and body are serialized into a
     * fresh buffer which is then sent as the response on the request's connection.
     *
     * @param requestChannel channel used to send the response
     * @param request        request to echo
     */
    public void processRequest(RequestChannel requestChannel, RequestChannel.Request request) {
        int headerSize = request.header().sizeOf();
        int bodySize = request.body().sizeOf();
        ByteBuffer echo = ByteBuffer.allocate(headerSize + bodySize);
        request.header().writeTo(echo);
        request.body().writeTo(echo);
        // Reset the position so the send starts from the first written byte.
        echo.rewind();
        RequestChannel.Response response = new RequestChannel.Response(request.processor(), request, new NetworkSend(request.connectionId(), new ByteBuffer[]{echo}));
        requestChannel.sendResponse(response);
    }

    /**
     * Opens a client socket to the given server on the port bound for the given protocol and
     * records it in {@link #sockets()}.
     *
     * @param socketServer     server to connect to
     * @param securityProtocol protocol whose bound port is used
     * @return the connected socket
     */
    public Socket connect(SocketServer socketServer, SecurityProtocol securityProtocol) {
        int port = socketServer.boundPort(securityProtocol);
        Socket client = new Socket("localhost", port);
        sockets().$plus$eq(client);
        return client;
    }

    /** Default for the first argument of {@code connect}: the shared server. */
    public SocketServer connect$default$1() {
        return server();
    }

    /** Default for the second argument of {@code connect}: {@code PLAINTEXT}. */
    public SecurityProtocol connect$default$2() {
        return SecurityProtocol.PLAINTEXT;
    }

    /**
     * Per-test cleanup: closes the metrics registry, shuts the shared server down, applies the
     * tearDown closure to every tracked socket and empties the socket buffer.
     */
    @After
    public void tearDown() {
        metrics().close();
        server().shutdown();
        // NOTE(review): the closure body is not visible here; presumably it closes each socket.
        sockets().foreach(new SocketServerTest$$anonfun$tearDown$1(this));
        sockets().clear();
    }

    /**
     * Builds the serialized bytes of a produce request with an empty payload map.
     *
     * @return the request header followed by the request body, as one byte array
     */
    private byte[] producerRequestBytes() {
        RequestHeader header = new RequestHeader((short) 0, "", -1);
        ProduceRequest body = new ProduceRequest((short) 0, 10000, new HashMap());
        int totalSize = header.sizeOf() + body.sizeOf();
        ByteBuffer serialized = ByteBuffer.allocate(totalSize);
        header.writeTo(serialized);
        body.writeTo(serialized);
        // Rewind so the copy below starts at the first written byte.
        serialized.rewind();
        byte[] requestBytes = new byte[serialized.remaining()];
        serialized.get(requestBytes);
        return requestBytes;
    }

    /**
     * Sends the same produce request over a PLAINTEXT and a TRACE connection and checks that
     * the echoed response equals the request bytes on each.
     */
    @Test
    public void simpleRequest() {
        Socket plainSocket = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        Socket traceSocket = connect(connect$default$1(), SecurityProtocol.TRACE);
        byte[] requestBytes = producerRequestBytes();
        // Both sockets are connected up front; the round trips then run one after the other.
        for (Socket client : new Socket[]{plainSocket, traceSocket}) {
            sendRequest(client, requestBytes, sendRequest$default$3());
            processRequest(server().requestChannel());
            Assert.assertEquals(Predef$.MODULE$.byteArrayOps(requestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(client)).toSeq());
        }
    }

    /**
     * Sends a request one byte larger than the configured maximum request size and then tries
     * to read a response. An {@link IOException} while writing the payload or reading the
     * response is tolerated.
     */
    @Test
    public void tooBigRequestIsRejected() {
        int maxRequestBytes = Predef$.MODULE$.Integer2int(server().config().socketRequestMaxBytes());
        byte[] oversized = new byte[maxRequestBytes + 1];
        new Random().nextBytes(oversized);
        Socket client = connect(connect$default$1(), connect$default$2());
        DataOutputStream out = new DataOutputStream(client.getOutputStream());
        out.writeInt(oversized.length);
        try {
            out.write(oversized);
            out.flush();
            receiveResponse(client);
        } catch (IOException ignored) {
            // Deliberately ignored. NOTE(review): presumably the server drops the connection
            // once it sees the oversized length, so the write or the read may fail - confirm
            // against SocketServer.
        }
    }

    /**
     * Opens a PLAINTEXT and a TRACE connection, sends a request on each, processes one of
     * them, shuts the server down and then expects a large write on either socket to fail
     * with an {@link IOException}.
     */
    @Test
    public void testSocketsCloseOnShutdown() {
        Socket plainSocket = connect(connect$default$1(), SecurityProtocol.PLAINTEXT);
        Socket traceSocket = connect(connect$default$1(), SecurityProtocol.TRACE);
        byte[] smallRequest = new byte[40];
        Option<Object> requestId = new Some<Object>(BoxesRunTime.boxToShort((short) 0));
        sendRequest(plainSocket, smallRequest, requestId);
        sendRequest(traceSocket, smallRequest, requestId);
        processRequest(server().requestChannel());
        // NOTE(review): closure body not visible here; it is applied to every acceptor.
        server().acceptors().values().map(new SocketServerTest$$anonfun$testSocketsCloseOnShutdown$1(this), Iterable$.MODULE$.canBuildFrom());
        server().shutdown();
        byte[] largeRequest = new byte[1000000];
        try {
            sendRequest(plainSocket, largeRequest, requestId);
            throw fail("expected exception when writing to closed plain socket");
        } catch (IOException expected) {
            // Expected: the write must fail once the server is shut down.
        }
        try {
            sendRequest(traceSocket, largeRequest, requestId);
            throw fail("expected exception when writing to closed trace socket");
        } catch (IOException expected) {
            // Expected: the write must fail once the server is shut down.
        }
    }

    /**
     * Opens as many connections as the per-IP limit allows, checks that one more connection
     * reaches end-of-stream when read, then closes one of the original connections and checks
     * that a new connection can deliver a request.
     */
    @Test
    public void testMaxConnectionsPerIp() {
        int limit = Predef$.MODULE$.Integer2int(server().config().maxConnectionsPerIp());
        IndexedSeq conns = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), limit).map(new SocketServerTest$$anonfun$1(this), IndexedSeq$.MODULE$.canBuildFrom());

        // One connection over the limit: a read is expected to return -1 (end of stream).
        Socket overLimit = connect(connect$default$1(), connect$default$2());
        overLimit.setSoTimeout(3000);
        Assert.assertEquals(-1L, overLimit.getInputStream().read());
        overLimit.close();

        // Free one slot and wait for the condition checked by the closure to become true.
        Socket first = (Socket) conns.head();
        InetAddress address = first.getInetAddress();
        first.close();
        TestUtils$.MODULE$.waitUntilTrue(new SocketServerTest$$anonfun$testMaxConnectionsPerIp$1(this, conns, address), "Failed to decrement connection count after close", TestUtils$.MODULE$.waitUntilTrue$default$3());

        // A fresh connection must now be able to get a request through.
        Socket replacement = connect(connect$default$1(), connect$default$2());
        sendRequest(replacement, producerRequestBytes(), sendRequest$default$3());
        Assert.assertNotNull(server().requestChannel().receiveRequest(2000L));
    }

    /**
     * Starts a separate server whose per-IP connection limit for {@code localhost} is
     * overridden to the shared server's limit plus one, then checks that exactly that many
     * connections are served and that one more connection reaches end-of-stream when read.
     */
    @Test
    public void testMaxConnectionsPerIpOverrides() {
        int overrideNum = Predef$.MODULE$.Integer2int(server().config().maxConnectionsPerIp()) + 1;
        Properties overrideProps = TestUtils$.MODULE$.createBrokerConfig(
                0,
                TestUtils$.MODULE$.MockZkConnect(),
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                0,
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16());
        overrideProps.put(KafkaConfig$.MODULE$.MaxConnectionsPerIpOverridesProp(), "localhost:" + overrideNum);
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(overrideProps), serverMetrics, new SystemTime());
        try {
            overrideServer.startup();
            // Open `overrideNum` connections; the last one must still be able to send a request.
            IndexedSeq conns = (IndexedSeq) RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), overrideNum).map(new SocketServerTest$$anonfun$2(this, overrideServer), IndexedSeq$.MODULE$.canBuildFrom());
            sendRequest((Socket) conns.last(), producerRequestBytes(), sendRequest$default$3());
            Assert.assertNotNull(overrideServer.requestChannel().receiveRequest(2000L));

            // One connection beyond the override: a read is expected to return -1 (end of stream).
            // The socket is bound to a local so the read targets the same connection that got
            // the timeout (the original referenced an undeclared `r0` here).
            Socket overLimit = connect(overrideServer, connect$default$2());
            overLimit.setSoTimeout(3000);
            Assert.assertEquals(-1L, overLimit.getInputStream().read());
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /**
     * Starts a server with a single SSL listener, connects with a TLSv1.2 client socket that
     * uses the trust manager supplied by {@code TestUtils.trustAllCerts()}, sends a produce
     * request and checks that the echoed response equals the request bytes.
     *
     * <p>The temporary trust-store file is registered for deletion at JVM exit, and the client
     * socket is closed even when an assertion or I/O step fails.
     */
    @Test
    public void testSslSocketServer() {
        File trustStoreFile = File.createTempFile("truststore", ".jks");
        // Do not leave the temp file behind after the test JVM exits.
        trustStoreFile.deleteOnExit();
        Properties sslProps = TestUtils$.MODULE$.createBrokerConfig(
                0,
                TestUtils$.MODULE$.MockZkConnect(),
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                new Some<>(SecurityProtocol.SSL),
                new Some<>(trustStoreFile),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16());
        sslProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
        Metrics serverMetrics = new Metrics();
        SocketServer sslServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(sslProps), serverMetrics, new SystemTime());
        try {
            sslServer.startup();
            SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
            // No key managers: the client presents no certificate of its own.
            sslContext.init(null, new TrustManager[]{TestUtils$.MODULE$.trustAllCerts()}, new SecureRandom());
            SSLSocket sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket("localhost", sslServer.boundPort(SecurityProtocol.SSL));
            try {
                sslSocket.setNeedClientAuth(false);
                RequestHeader header = new RequestHeader(ApiKeys.PRODUCE.id, "", -1);
                ProduceRequest body = new ProduceRequest((short) 0, 10000, new HashMap());
                ByteBuffer serialized = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
                header.writeTo(serialized);
                body.writeTo(serialized);
                serialized.rewind();
                byte[] requestBytes = new byte[serialized.remaining()];
                serialized.get(requestBytes);
                sendRequest(sslSocket, requestBytes, sendRequest$default$3());
                processRequest(sslServer.requestChannel());
                Assert.assertEquals(Predef$.MODULE$.byteArrayOps(requestBytes).toSeq(), Predef$.MODULE$.byteArrayOps(receiveResponse(sslSocket)).toSeq());
            } finally {
                // This socket is not tracked in sockets(), so it has to be closed here.
                sslSocket.close();
            }
        } finally {
            sslServer.shutdown();
            serverMetrics.close();
        }
    }

    /**
     * Sends a request over a default (PLAINTEXT) connection and checks that the session
     * attached to the received request carries the anonymous principal.
     */
    @Test
    public void testSessionPrincipal() {
        Socket client = connect(connect$default$1(), connect$default$2());
        sendRequest(client, new byte[40], new Some(BoxesRunTime.boxToShort((short) 0)));
        RequestChannel.Request received = server().requestChannel().receiveRequest(2000L);
        // Fail with a clear message on timeout instead of a NullPointerException below.
        Assert.assertNotNull("receiveRequest timed out", received);
        Assert.assertEquals(KafkaPrincipal.ANONYMOUS, received.session().principal());
    }

    /**
     * Sends a request to a specialised server (an anonymous {@code SocketServer} subclass that
     * is given a reference to the client socket), responds with a large buffer and waits until
     * the total-time histogram count for the request's API has grown by one.
     *
     * <p>NOTE(review): the anonymous server subclass is not visible here; judging by the test
     * name it presumably disconnects the client while the response is being sent - confirm.
     */
    @Test
    public void testClientDisconnectionUpdatesRequestMetrics() {
        Properties serverProps = TestUtils$.MODULE$.createBrokerConfig(
                0,
                TestUtils$.MODULE$.MockZkConnect(),
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                0,
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16());
        Metrics serverMetrics = new Metrics();
        // Holder shared with the anonymous server so it can see the socket assigned below.
        ObjectRef connRef = new ObjectRef((Object) null);
        SocketServerTest$$anon$2 overrideServer = new SocketServerTest$$anon$2(this, serverProps, serverMetrics, connRef);
        try {
            overrideServer.startup();
            connRef.elem = connect(overrideServer, connect$default$2());
            sendRequest((Socket) connRef.elem, producerRequestBytes(), sendRequest$default$3());
            RequestChannel channel = overrideServer.requestChannel();
            RequestChannel.Request received = channel.receiveRequest(2000L);
            // Fail with a clear message on timeout instead of a NullPointerException below.
            Assert.assertNotNull("receiveRequest timed out", received);
            RequestMetrics requestMetrics = (RequestMetrics) RequestMetrics$.MODULE$.metricsMap().apply(ApiKeys.forId(received.requestId()).name);
            // Snapshot the expected count before the response is sent.
            long expectedTotalTimeCount = kafka$network$SocketServerTest$$totalTimeHistCount$1(requestMetrics) + 1;
            channel.sendResponse(new RequestChannel.Response(received.processor(), received, new NetworkSend(received.connectionId(), new ByteBuffer[]{ByteBuffer.allocate(550000)})));
            TestUtils$.MODULE$.waitUntilTrue(
                    new SocketServerTest$$anonfun$testClientDisconnectionUpdatesRequestMetrics$1(this, requestMetrics, expectedTotalTimeCount),
                    "request metrics not updated, expected: " + expectedTotalTimeCount + ", actual: " + kafka$network$SocketServerTest$$totalTimeHistCount$1(requestMetrics),
                    TestUtils$.MODULE$.waitUntilTrue$default$3());
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /**
     * Starts a server with a 100 ms idle-connection timeout, receives a request, waits for the
     * first condition (reported as "Idle connection ... was not closed by selector" on
     * failure), then echoes the request and waits until the total-time histogram count for the
     * request's API has grown by one.
     */
    @Test
    public void testBrokerSendAfterChannelClosedUpdatesRequestMetrics() {
        Properties serverProps = TestUtils$.MODULE$.createBrokerConfig(
                0,
                TestUtils$.MODULE$.MockZkConnect(),
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                0,
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16());
        serverProps.setProperty(KafkaConfig$.MODULE$.ConnectionsMaxIdleMsProp(), "100");
        Metrics serverMetrics = new Metrics();
        SocketServer overrideServer = new SocketServer(KafkaConfig$.MODULE$.fromProps(serverProps), serverMetrics, new SystemTime());
        try {
            overrideServer.startup();
            Socket client = connect(overrideServer, connect$default$2());
            sendRequest(client, producerRequestBytes(), sendRequest$default$3());
            RequestChannel channel = overrideServer.requestChannel();
            RequestChannel.Request received = channel.receiveRequest(2000L);
            // Fail with a clear message on timeout instead of a NullPointerException below.
            Assert.assertNotNull("receiveRequest timed out", received);
            TestUtils$.MODULE$.waitUntilTrue(
                    new SocketServerTest$$anonfun$testBrokerSendAfterChannelClosedUpdatesRequestMetrics$1(this, overrideServer, received),
                    "Idle connection `" + received.connectionId() + "` was not closed by selector",
                    TestUtils$.MODULE$.waitUntilTrue$default$3());
            RequestMetrics requestMetrics = (RequestMetrics) RequestMetrics$.MODULE$.metricsMap().apply(ApiKeys.forId(received.requestId()).name);
            // Snapshot the expected count before the response is sent.
            long expectedTotalTimeCount = kafka$network$SocketServerTest$$totalTimeHistCount$2(requestMetrics) + 1;
            processRequest(channel, received);
            TestUtils$.MODULE$.waitUntilTrue(
                    new SocketServerTest$$anonfun$testBrokerSendAfterChannelClosedUpdatesRequestMetrics$2(this, requestMetrics, expectedTotalTimeCount),
                    "request metrics not updated, expected: " + expectedTotalTimeCount + ", actual: " + kafka$network$SocketServerTest$$totalTimeHistCount$2(requestMetrics),
                    TestUtils$.MODULE$.waitUntilTrue$default$3());
        } finally {
            overrideServer.shutdown();
            serverMetrics.close();
        }
    }

    /**
     * Returns the current sample count of the given request metrics' total-time histogram.
     * Used by {@code testClientDisconnectionUpdatesRequestMetrics}.
     */
    public final long kafka$network$SocketServerTest$$totalTimeHistCount$1(RequestMetrics requestMetrics) {
        return requestMetrics.totalTimeHist().count();
    }

    /**
     * Returns the current sample count of the given request metrics' total-time histogram.
     * Used by {@code testBrokerSendAfterChannelClosedUpdatesRequestMetrics}.
     */
    public final long kafka$network$SocketServerTest$$totalTimeHistCount$2(RequestMetrics requestMetrics) {
        return requestMetrics.totalTimeHist().count();
    }

    /**
     * Builds the shared fixture: overrides the base broker properties (PLAINTEXT and TRACE
     * listeners on ephemeral ports, one network thread, small request-size and per-IP
     * connection limits), then creates and starts the shared socket server.
     */
    public SocketServerTest() {
        Properties brokerProps = props();
        brokerProps.put("listeners", "PLAINTEXT://localhost:0,TRACE://localhost:0");
        brokerProps.put("num.network.threads", "1");
        brokerProps.put("socket.send.buffer.bytes", "300000");
        brokerProps.put("socket.receive.buffer.bytes", "300000");
        brokerProps.put("queued.max.requests", "50");
        brokerProps.put("socket.request.max.bytes", "50");
        brokerProps.put("max.connections.per.ip", "5");
        brokerProps.put("connections.max.idle.ms", "60000");

        this.config = KafkaConfig$.MODULE$.fromProps(brokerProps);
        this.metrics = new Metrics();
        this.server = new SocketServer(config(), metrics(), new SystemTime());
        server().startup();
        // The socket buffer is only created after the server has been started.
        this.sockets = new ArrayBuffer<>();
    }
}
