/*
 * Decompiled with CFR 0.152.
 */
package kafka.network;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Random;
import junit.framework.Assert;
import kafka.api.ProducerRequest;
import kafka.network.BoundedByteBufferSend;
import kafka.network.RequestChannel;
import kafka.network.Send;
import kafka.network.SocketServer;
import kafka.producer.SyncProducerConfig$;
import kafka.utils.TestUtils$;
import org.junit.After;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001da\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU8dW\u0016$8+\u001a:wKJ$Vm\u001d;\u000b\u0005\r!\u0011a\u00028fi^|'o\u001b\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!A\u0003&V]&$8+^5uK\")1\u0003\u0001C\u0001)\u00051A(\u001b8jiz\"\u0012!\u0006\t\u0003-\u0001i\u0011A\u0001\u0005\b1\u0001\u0011\r\u0011\"\u0001\u001a\u0003\u0019\u0019XM\u001d<feV\t!\u0004\u0005\u0002\u00177%\u0011AD\u0001\u0002\r'>\u001c7.\u001a;TKJ4XM\u001d\u0005\u0007=\u0001\u0001\u000b\u0011\u0002\u000e\u0002\u000fM,'O^3sA!)\u0001\u0005\u0001C\u0001C\u0005Y1/\u001a8e%\u0016\fX/Z:u)\u0011\u0011\u0003FM\u001c\u0011\u0005\r2S\"\u0001\u0013\u000b\u0003\u0015\nQa]2bY\u0006L!a\n\u0013\u0003\tUs\u0017\u000e\u001e\u0005\u0006S}\u0001\rAK\u0001\u0007g>\u001c7.\u001a;\u0011\u0005-\u0002T\"\u0001\u0017\u000b\u00055r\u0013a\u00018fi*\tq&\u0001\u0003kCZ\f\u0017BA\u0019-\u0005\u0019\u0019vnY6fi\")1g\ba\u0001i\u0005\u0011\u0011\u000e\u001a\t\u0003GUJ!A\u000e\u0013\u0003\u000bMCwN\u001d;\t\u000baz\u0002\u0019A\u001d\u0002\u000fI,\u0017/^3tiB\u00191E\u000f\u001f\n\u0005m\"#!B!se\u0006L\bCA\u0012>\u0013\tqDE\u0001\u0003CsR,\u0007\"\u0002!\u0001\t\u0003\t\u0015a\u0004:fG\u0016Lg/\u001a*fgB|gn]3\u0015\u0005e\u0012\u0005\"B\u0015@\u0001\u0004Q\u0003\"\u0002#\u0001\t\u0003)\u0015A\u00049s_\u000e,7o\u001d*fcV,7\u000f\u001e\u000b\u0003E\u0019CQaR\"A\u0002!\u000bqa\u00195b]:,G\u000e\u0005\u0002\u0017\u0013&\u0011!J\u0001\u0002\u000f%\u0016\fX/Z:u\u0007\"\fgN\\3m\u0011\u0015a\u0005\u0001\"\u0001N\u0003\u001d\u0019wN\u001c8fGR$\"A\u000b(\t\u000f=[\u0005\u0013!a\u00015\u0005\t1\u000fC\u0003R\u0001\u0011\u0005!+A\u0004dY\u0016\fg.\u001e9\u0015\u0003\tB#\u0001\u0015+\u0011\u0005U;V\"\u0001,\u000b\u0005-q\u0011B\u0001-W\u0005\u0015\te\r^3s\u0011\u0015Q\u0006\u0001\"\u0001S\u00035\u0019\u0018.\u001c9mKJ+\u0017/^3ti\"\u0012\u0011\f\u0018\t\u0003+vK!A\u0018,\u0003\tQ+7\u000f\u001e\u0005\u0006A\u0002!\tAU\u0001\u0018i>|')[4SKF,Xm\u001d;JgJ+'.Z2uK\u0012DCa\u0018/cG\u0006AQ\r\u001f9fGR,GmI\u0001e!\t)\u0007.D\u0001g\u0015\t9g&\u0001\u0002j_&\u0011\u0011N\u001a\u0002\f\u0013>+\u0005pY3qi&|g\u000eC\u0003l\u0001\u0011\u0005!+\u0001\tuKN$h*\u001e7m%\u0016\u001c\bo\u001c8tK\"\u0012!\u000e\u0018\u0005\u0006]\u0002!\tAU\u0001\u001bi\u0016\u001cHoU8dW\u0016$8o\u00117pg\u0016|en\u00155vi\u0012|wO\u001c\u0015\u0005[r\u00137\rC\u0003r\u0001\u0011\u0005!+A\fuKN$X*\u0019=D_:tWm\u0019;j_:\u001c\b+\u001a:Ja\"\u0012\u0001\u000f\u0018\u0005\u0006i\u0002!\tAU\u0001!i\u0016\u001cH/T1y\u0007>tg.Z2uS>t7\u000fU3s\u0013B{e/\u001a:sS\u0012,7\u000f\u000b\u0002t9\"9q\u000fAI\u0001\n\u0003A\u0018!E2p]:,7\r\u001e\u0013eK\u001a\fW\u000f\u001c;%cU\t\u0011P\u000b\u0002\u001bu.\n1\u0010E\u0002}\u0003\u0007i\u0011! \u0006\u0003}~\f\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005\u0005A%\u0001\u0006b]:|G/\u0019;j_:L1!!\u0002~\u0005E)hn\u00195fG.,GMV1sS\u0006t7-\u001a")
public class SocketServerTest
extends JUnitSuite {
    private final SocketServer server = new SocketServer(0, null, TestUtils$.MODULE$.choosePort(), 1, 50, 300000, 300000, 50, 5, 60000L, (Map)Map$.MODULE$.empty());

    public SocketServer server() {
        return this.server;
    }

    public void sendRequest(Socket socket, short id, byte[] request) {
        DataOutputStream outgoing = new DataOutputStream(socket.getOutputStream());
        outgoing.writeInt(request.length + 2);
        outgoing.writeShort(id);
        outgoing.write(request);
        outgoing.flush();
    }

    public byte[] receiveResponse(Socket socket) {
        DataInputStream incoming = new DataInputStream(socket.getInputStream());
        int len = incoming.readInt();
        byte[] response = new byte[len];
        incoming.readFully(response);
        return response;
    }

    public void processRequest(RequestChannel channel) {
        RequestChannel.Request request = channel.receiveRequest();
        ByteBuffer byteBuffer = ByteBuffer.allocate(request.requestObj().sizeInBytes());
        request.requestObj().writeTo(byteBuffer);
        byteBuffer.rewind();
        BoundedByteBufferSend send = new BoundedByteBufferSend(byteBuffer);
        channel.sendResponse(new RequestChannel.Response(request.processor(), request, (Send)send));
    }

    public Socket connect(SocketServer s) {
        return new Socket("localhost", s.port());
    }

    public SocketServer connect$default$1() {
        return this.server();
    }

    @After
    public void cleanup() {
        this.server().shutdown();
    }

    @Test
    public void simpleRequest() {
        Socket socket = this.connect(this.connect$default$1());
        int correlationId = -1;
        String clientId = SyncProducerConfig$.MODULE$.DefaultClientId();
        int ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
        short ack = SyncProducerConfig$.MODULE$.DefaultRequiredAcks();
        ProducerRequest emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, (scala.collection.mutable.Map)scala.collection.mutable.Map$.MODULE$.apply((Seq)Nil$.MODULE$));
        ByteBuffer byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes());
        emptyRequest.writeTo(byteBuffer);
        byteBuffer.rewind();
        byte[] serializedBytes = new byte[byteBuffer.remaining()];
        byteBuffer.get(serializedBytes);
        this.sendRequest(socket, (short)0, serializedBytes);
        this.processRequest(this.server().requestChannel());
        Assert.assertEquals((Object)Predef$.MODULE$.byteArrayOps(serializedBytes).toSeq(), (Object)Predef$.MODULE$.byteArrayOps(this.receiveResponse(socket)).toSeq());
    }

    @Test(expected=IOException.class)
    public void tooBigRequestIsRejected() {
        byte[] tooManyBytes = new byte[this.server().maxRequestSize() + 1];
        new Random().nextBytes(tooManyBytes);
        Socket socket = this.connect(this.connect$default$1());
        this.sendRequest(socket, (short)0, tooManyBytes);
        this.receiveResponse(socket);
    }

    @Test
    public void testNullResponse() {
        Socket socket = this.connect(this.connect$default$1());
        byte[] bytes = new byte[40];
        this.sendRequest(socket, (short)0, bytes);
        RequestChannel.Request request = this.server().requestChannel().receiveRequest();
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, request){
            public static final long serialVersionUID = 0L;
            private final RequestChannel.Request request$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return (((SelectionKey)this.request$1.requestKey()).interestOps() & 1) != 1;
            }
            {
                this.request$1 = request$1;
            }
        }, "Socket key shouldn't be available for read", TestUtils$.MODULE$.waitUntilTrue$default$3());
        this.server().requestChannel().sendResponse(new RequestChannel.Response(0, request, null));
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, request){
            public static final long serialVersionUID = 0L;
            private final RequestChannel.Request request$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return (((SelectionKey)this.request$1.requestKey()).interestOps() & 1) == 1;
            }
            {
                this.request$1 = request$1;
            }
        }, "Socket key should be available for reads", TestUtils$.MODULE$.waitUntilTrue$default$3());
    }

    @Test(expected=IOException.class)
    public void testSocketsCloseOnShutdown() {
        Socket socket = this.connect(this.connect$default$1());
        byte[] bytes = new byte[40];
        this.sendRequest(socket, (short)0, bytes);
        this.processRequest(this.server().requestChannel());
        this.server().shutdown();
        this.sendRequest(socket, (short)0, bytes);
    }

    @Test
    public void testMaxConnectionsPerIp() {
        IndexedSeq conns = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.server().maxConnectionsPerIp()).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SocketServerTest $outer;

            public final Socket apply(int i) {
                return this.$outer.connect(this.$outer.connect$default$1());
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
        Socket conn = this.connect(this.connect$default$1());
        conn.setSoTimeout(3000);
        Assert.assertEquals((int)-1, (int)conn.getInputStream().read());
    }

    @Test
    public void testMaxConnectionsPerIPOverrides() {
        int overrideNum = 6;
        Map overrides = (Map)Map$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)"localhost"), (Object)BoxesRunTime.boxToInteger((int)overrideNum))}));
        SocketServer overrideServer = new SocketServer(0, null, TestUtils$.MODULE$.choosePort(), 1, 50, 300000, 300000, 50, 5, 60000L, overrides);
        overrideServer.startup();
        IndexedSeq conns = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), overrideNum).map((Function1)new Serializable(this, overrideServer){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ SocketServerTest $outer;
            private final SocketServer overrideServer$1;

            public final Socket apply(int i) {
                return this.$outer.connect(this.overrideServer$1);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.overrideServer$1 = overrideServer$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
        Socket conn = this.connect(overrideServer);
        conn.setSoTimeout(3000);
        Assert.assertEquals((int)-1, (int)conn.getInputStream().read());
        overrideServer.shutdown();
    }

    public SocketServerTest() {
        this.server().startup();
    }
}

