package org.apache.kafka.common.network;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.IMocksControl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/common/network/SelectorTest.class */
public class SelectorTest {
    protected static final int BUFFER_SIZE = 4096;
    protected EchoServer server;
    protected Time time;
    protected Selector selector;
    protected ChannelBuilder channelBuilder;
    protected Metrics metrics;

    @Before
    public void setUp() throws Exception {
        HashMap hashMap = new HashMap();
        this.server = new EchoServer(SecurityProtocol.PLAINTEXT, hashMap);
        this.server.start();
        this.time = new MockTime();
        this.channelBuilder = new PlaintextChannelBuilder();
        this.channelBuilder.configure(hashMap);
        this.metrics = new Metrics();
        this.selector = new Selector(5000L, this.metrics, this.time, "MetricGroup", this.channelBuilder, new LogContext());
    }

    @After
    public void tearDown() throws Exception {
        this.selector.close();
        this.server.close();
        this.metrics.close();
    }

    public SecurityProtocol securityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    @Test
    public void testServerDisconnect() throws Exception {
        blockingConnect("0");
        Assert.assertEquals("hello", blockingRequest("0", "hello"));
        this.server.closeConnections();
        while (!this.selector.disconnected().containsKey("0")) {
            this.selector.poll(1000L);
        }
        blockingConnect("0");
        Assert.assertEquals("hello", blockingRequest("0", "hello"));
    }

    @Test
    public void testCantSendWithInProgress() throws Exception {
        blockingConnect("0");
        this.selector.send(createSend("0", "test1"));
        try {
            this.selector.send(createSend("0", "test2"));
            Assert.fail("IllegalStateException not thrown when sending a request with one in flight");
        } catch (IllegalStateException e) {
        }
        this.selector.poll(0L);
        Assert.assertTrue("Channel not closed", this.selector.disconnected().containsKey("0"));
        Assert.assertEquals(ChannelState.FAILED_SEND, this.selector.disconnected().get("0"));
    }

    @Test(expected = IllegalStateException.class)
    public void testCantSendWithoutConnecting() throws Exception {
        this.selector.send(createSend("0", "test"));
        this.selector.poll(1000L);
    }

    @Test(expected = IOException.class)
    public void testNoRouteToHost() throws Exception {
        this.selector.connect("0", new InetSocketAddress("some.invalid.hostname.foo.bar.local", this.server.port), BUFFER_SIZE, BUFFER_SIZE);
    }

    @Test
    public void testConnectionRefused() throws Exception {
        ServerSocket serverSocket = new ServerSocket(0);
        this.selector.connect("0", new InetSocketAddress("localhost", serverSocket.getLocalPort()), BUFFER_SIZE, BUFFER_SIZE);
        while (this.selector.disconnected().containsKey("0")) {
            Assert.assertEquals(ChannelState.NOT_CONNECTED, this.selector.disconnected().get("0"));
            this.selector.poll(1000L);
        }
        serverSocket.close();
    }

    @Test
    public void testNormalOperation() throws Exception {
        InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", this.server.port);
        for (int i = 0; i < 5; i++) {
            connect(Integer.toString(i), inetSocketAddress);
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        int i2 = 0;
        for (int i3 = 0; i3 < 5; i3++) {
            String num = Integer.toString(i3);
            this.selector.send(createSend(num, num + "-0"));
        }
        while (i2 < 5 * 500) {
            this.selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0L, this.selector.disconnected().size());
            for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                String[] split = asString(networkReceive).split("-");
                Assert.assertEquals("Should be in the form 'conn-counter'", 2L, split.length);
                Assert.assertEquals("Check the source", networkReceive.source(), split[0]);
                Assert.assertEquals("Check that the receive has kindly been rewound", 0L, networkReceive.payload().position());
                if (hashMap2.containsKey(networkReceive.source())) {
                    Assert.assertEquals("Check the request counter", ((Integer) hashMap2.get(networkReceive.source())).intValue(), Integer.parseInt(split[1]));
                    hashMap2.put(networkReceive.source(), Integer.valueOf(((Integer) hashMap2.get(networkReceive.source())).intValue() + 1));
                } else {
                    Assert.assertEquals("Check the request counter", 0L, Integer.parseInt(split[1]));
                    hashMap2.put(networkReceive.source(), 1);
                }
                i2++;
            }
            Iterator it = this.selector.completedSends().iterator();
            while (it.hasNext()) {
                String destination = ((Send) it.next()).destination();
                if (hashMap.containsKey(destination)) {
                    hashMap.put(destination, Integer.valueOf(((Integer) hashMap.get(destination)).intValue() + 1));
                } else {
                    hashMap.put(destination, 1);
                }
                if (((Integer) hashMap.get(destination)).intValue() < 500) {
                    this.selector.send(createSend(destination, destination + "-" + hashMap.get(destination)));
                }
            }
        }
    }

    @Test
    public void testSendLargeRequest() throws Exception {
        blockingConnect("0");
        String randomString = TestUtils.randomString(40960);
        Assert.assertEquals(randomString, blockingRequest("0", randomString));
    }

    @Test
    public void testLargeMessageSequence() throws Exception {
        connect("0", new InetSocketAddress("localhost", this.server.port));
        sendAndReceive("0", TestUtils.randomString(524288), 0, 50);
    }

    @Test
    public void testEmptyRequest() throws Exception {
        blockingConnect("0");
        Assert.assertEquals("", blockingRequest("0", ""));
    }

    @Test(expected = IllegalStateException.class)
    public void testExistingConnectionId() throws IOException {
        blockingConnect("0");
        blockingConnect("0");
    }

    @Test
    public void testMute() throws Exception {
        blockingConnect("0");
        blockingConnect("1");
        this.selector.send(createSend("0", "hello"));
        this.selector.send(createSend("1", "hi"));
        this.selector.mute("1");
        while (this.selector.completedReceives().isEmpty()) {
            this.selector.poll(5L);
        }
        Assert.assertEquals("We should have only one response", 1L, this.selector.completedReceives().size());
        Assert.assertEquals("The response should not be from the muted node", "0", ((NetworkReceive) this.selector.completedReceives().get(0)).source());
        this.selector.unmute("1");
        do {
            this.selector.poll(5L);
        } while (this.selector.completedReceives().isEmpty());
        Assert.assertEquals("We should have only one response", 1L, this.selector.completedReceives().size());
        Assert.assertEquals("The response should be from the previously muted node", "1", ((NetworkReceive) this.selector.completedReceives().get(0)).source());
    }

    @Test
    public void registerFailure() throws Exception {
        Selector selector = new Selector(5000L, new Metrics(), new MockTime(), "MetricGroup", new PlaintextChannelBuilder() { // from class: org.apache.kafka.common.network.SelectorTest.1
            public KafkaChannel buildChannel(String str, SelectionKey selectionKey, int i, MemoryPool memoryPool) throws KafkaException {
                throw new RuntimeException("Test exception");
            }

            public void close() {
            }
        }, new LogContext());
        SocketChannel open = SocketChannel.open();
        open.configureBlocking(false);
        try {
            selector.register("1", open);
            Assert.fail("Register did not fail");
        } catch (IOException e) {
            Assert.assertTrue("Unexpected exception: " + e, e.getCause().getMessage().contains("Test exception"));
            Assert.assertFalse("Socket not closed", open.isOpen());
        }
        selector.close();
    }

    @Test
    public void testCloseConnectionInClosingState() throws Exception {
        KafkaChannel createConnectionWithStagedReceives = createConnectionWithStagedReceives(5);
        String id = createConnectionWithStagedReceives.id();
        this.selector.mute(id);
        this.time.sleep(6000L);
        this.selector.poll(0L);
        Assert.assertNull("Channel not expired", this.selector.channel(id));
        Assert.assertEquals(createConnectionWithStagedReceives, this.selector.closingChannel(id));
        Assert.assertEquals(ChannelState.EXPIRED, createConnectionWithStagedReceives.state());
        this.selector.close(id);
        Assert.assertNull("Channel not removed from channels", this.selector.channel(id));
        Assert.assertNull("Channel not removed from closingChannels", this.selector.closingChannel(id));
        Assert.assertTrue("Unexpected disconnect notification", this.selector.disconnected().isEmpty());
        Assert.assertEquals(ChannelState.EXPIRED, createConnectionWithStagedReceives.state());
        this.selector.poll(0L);
        Assert.assertTrue("Unexpected disconnect notification", this.selector.disconnected().isEmpty());
    }

    @Test
    public void testCloseOldestConnection() throws Exception {
        blockingConnect("0");
        this.time.sleep(6000L);
        this.selector.poll(0L);
        Assert.assertTrue("The idle connection should have been closed", this.selector.disconnected().containsKey("0"));
        Assert.assertEquals(ChannelState.EXPIRED, this.selector.disconnected().get("0"));
    }

    @Test
    public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
        verifyCloseOldestConnectionWithStagedReceives(1);
    }

    @Test
    public void testCloseOldestConnectionWithMultipleStagedReceives() throws Exception {
        verifyCloseOldestConnectionWithStagedReceives(5);
    }

    private KafkaChannel createConnectionWithStagedReceives(int i) throws Exception {
        blockingConnect("0");
        KafkaChannel channel = this.selector.channel("0");
        int i2 = 100;
        do {
            this.selector.mute("0");
            for (int i3 = 0; i3 <= i; i3++) {
                this.selector.send(createSend("0", String.valueOf(i3)));
                this.selector.poll(1000L);
            }
            this.selector.unmute("0");
            do {
                this.selector.poll(1000L);
            } while (this.selector.completedReceives().isEmpty());
            if (this.selector.numStagedReceives(channel) != 0) {
                break;
            }
            i2--;
        } while (i2 > 0);
        Assert.assertTrue("No staged receives after 100 attempts", this.selector.numStagedReceives(channel) > 0);
        return channel;
    }

    private void verifyCloseOldestConnectionWithStagedReceives(int i) throws Exception {
        KafkaChannel createConnectionWithStagedReceives = createConnectionWithStagedReceives(i);
        String id = createConnectionWithStagedReceives.id();
        int numStagedReceives = this.selector.numStagedReceives(createConnectionWithStagedReceives);
        int i2 = 0;
        while (this.selector.disconnected().isEmpty()) {
            this.time.sleep(6000L);
            this.selector.poll(0L);
            i2 += this.selector.completedReceives().size();
            int numStagedReceives2 = this.selector.numStagedReceives(createConnectionWithStagedReceives) - (numStagedReceives - i2);
            if (numStagedReceives2 > 0) {
                numStagedReceives += numStagedReceives2;
                Assert.assertNotNull("Channel should not have been expired", this.selector.channel(id));
                Assert.assertFalse("Channel should not have been disconnected", this.selector.disconnected().containsKey(id));
            } else if (!this.selector.completedReceives().isEmpty()) {
                Assert.assertEquals(1L, this.selector.completedReceives().size());
                Assert.assertTrue("Channel not found", (this.selector.closingChannel(id) == null && this.selector.channel(id) == null) ? false : true);
                Assert.assertFalse("Disconnect notified too early", this.selector.disconnected().containsKey(id));
            }
        }
        Assert.assertEquals(numStagedReceives, i2);
        Assert.assertNull("Channel not removed", this.selector.channel(id));
        Assert.assertNull("Channel not removed", this.selector.closingChannel(id));
        Assert.assertTrue("Disconnect not notified", this.selector.disconnected().containsKey(id));
        Assert.assertTrue("Unexpected receive", this.selector.completedReceives().isEmpty());
    }

    @Test
    public void testMuteOnOOM() throws Exception {
        this.selector.close();
        SimpleMemoryPool simpleMemoryPool = new SimpleMemoryPool(900L, 900, false, (Sensor) null);
        this.selector = new Selector(-1, 5000L, this.metrics, this.time, "MetricGroup", new HashMap(), true, false, this.channelBuilder, simpleMemoryPool, new LogContext());
        ServerSocketChannel open = ServerSocketChannel.open();
        Throwable th = null;
        try {
            try {
                open.bind((SocketAddress) new InetSocketAddress(0));
                InetSocketAddress inetSocketAddress = (InetSocketAddress) open.getLocalAddress();
                Thread createSender = createSender(inetSocketAddress, randomPayload(900));
                Thread createSender2 = createSender(inetSocketAddress, randomPayload(900));
                createSender.start();
                createSender2.start();
                createSender.join(5000L);
                createSender2.join(5000L);
                SocketChannel accept = open.accept();
                accept.configureBlocking(false);
                SocketChannel accept2 = open.accept();
                accept2.configureBlocking(false);
                this.selector.register("clientX", accept);
                this.selector.register("clientY", accept2);
                List emptyList = Collections.emptyList();
                long currentTimeMillis = System.currentTimeMillis() + 5000;
                while (System.currentTimeMillis() < currentTimeMillis && emptyList.isEmpty()) {
                    this.selector.poll(1000L);
                    emptyList = this.selector.completedReceives();
                }
                Assert.assertEquals("could not read a single request within timeout", 1L, emptyList.size());
                NetworkReceive networkReceive = (NetworkReceive) emptyList.get(0);
                Assert.assertEquals(0L, simpleMemoryPool.availableMemory());
                Assert.assertTrue(this.selector.isOutOfMemory());
                this.selector.poll(10L);
                Assert.assertTrue(this.selector.completedReceives().isEmpty());
                Assert.assertEquals(0L, simpleMemoryPool.availableMemory());
                Assert.assertTrue(this.selector.isOutOfMemory());
                networkReceive.close();
                Assert.assertEquals(900L, simpleMemoryPool.availableMemory());
                List emptyList2 = Collections.emptyList();
                long currentTimeMillis2 = System.currentTimeMillis() + 5000;
                while (System.currentTimeMillis() < currentTimeMillis2 && emptyList2.isEmpty()) {
                    this.selector.poll(1000L);
                    emptyList2 = this.selector.completedReceives();
                }
                Assert.assertEquals("could not read a single request within timeout", 1L, this.selector.completedReceives().size());
                Assert.assertEquals(0L, simpleMemoryPool.availableMemory());
                Assert.assertFalse(this.selector.isOutOfMemory());
                if (open != null) {
                    if (0 == 0) {
                        open.close();
                        return;
                    }
                    try {
                        open.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (open != null) {
                if (th != null) {
                    try {
                        open.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    open.close();
                }
            }
            throw th4;
        }
    }

    private Thread createSender(InetSocketAddress inetSocketAddress, byte[] bArr) {
        return new PlaintextSender(inetSocketAddress, bArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public byte[] randomPayload(int i) throws Exception {
        byte[] bArr = new byte[i + 4];
        new Random().nextBytes(bArr);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        dataOutputStream.writeInt(i);
        dataOutputStream.flush();
        dataOutputStream.close();
        byteArrayOutputStream.flush();
        byteArrayOutputStream.close();
        byte[] byteArray = byteArrayOutputStream.toByteArray();
        System.arraycopy(byteArray, 0, bArr, 0, byteArray.length);
        return bArr;
    }

    @Test
    public void testConnectDisconnectDuringInSinglePoll() throws Exception {
        IMocksControl createControl = EasyMock.createControl();
        KafkaChannel kafkaChannel = (KafkaChannel) createControl.createMock(KafkaChannel.class);
        EasyMock.expect(kafkaChannel.id()).andStubReturn("1");
        EasyMock.expect(kafkaChannel.socketDescription()).andStubReturn("");
        EasyMock.expect(kafkaChannel.state()).andStubReturn(ChannelState.NOT_CONNECTED);
        EasyMock.expect(Boolean.valueOf(kafkaChannel.finishConnect())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(kafkaChannel.isConnected())).andStubReturn(true);
        kafkaChannel.disconnect();
        kafkaChannel.close();
        EasyMock.expect(Boolean.valueOf(kafkaChannel.ready())).andReturn(false).anyTimes();
        kafkaChannel.prepare();
        EasyMock.expectLastCall().andThrow(new IOException());
        SelectionKey selectionKey = (SelectionKey) createControl.createMock(SelectionKey.class);
        EasyMock.expect(selectionKey.channel()).andReturn(SocketChannel.open());
        EasyMock.expect(Integer.valueOf(selectionKey.readyOps())).andStubReturn(8);
        createControl.replay();
        selectionKey.attach(kafkaChannel);
        this.selector.pollSelectionKeys(Utils.mkSet(new SelectionKey[]{selectionKey}), false, System.nanoTime());
        Assert.assertFalse(this.selector.connected().contains(kafkaChannel.id()));
        Assert.assertTrue(this.selector.disconnected().containsKey(kafkaChannel.id()));
        createControl.verify();
    }

    private String blockingRequest(String str, String str2) throws IOException {
        this.selector.send(createSend(str, str2));
        this.selector.poll(1000L);
        while (true) {
            this.selector.poll(1000L);
            for (NetworkReceive networkReceive : this.selector.completedReceives()) {
                if (networkReceive.source().equals(str)) {
                    return asString(networkReceive);
                }
            }
        }
    }

    protected void connect(String str, InetSocketAddress inetSocketAddress) throws IOException {
        this.selector.connect(str, inetSocketAddress, BUFFER_SIZE, BUFFER_SIZE);
    }

    private void blockingConnect(String str) throws IOException {
        blockingConnect(str, new InetSocketAddress("localhost", this.server.port));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void blockingConnect(String str, InetSocketAddress inetSocketAddress) throws IOException {
        this.selector.connect(str, inetSocketAddress, BUFFER_SIZE, BUFFER_SIZE);
        while (!this.selector.connected().contains(str)) {
            this.selector.poll(10000L);
        }
        while (!this.selector.isChannelReady(str)) {
            this.selector.poll(10000L);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public NetworkSend createSend(String str, String str2) {
        return new NetworkSend(str, ByteBuffer.wrap(str2.getBytes()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String asString(NetworkReceive networkReceive) {
        return new String(Utils.toArray(networkReceive.payload()));
    }

    private void sendAndReceive(String str, String str2, int i, int i2) throws Exception {
        int i3 = i;
        this.selector.send(createSend(str, str2 + "-" + i));
        int i4 = i + 1;
        while (i3 < i2) {
            this.selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0L, this.selector.disconnected().size());
            Iterator it = this.selector.completedReceives().iterator();
            while (it.hasNext()) {
                Assert.assertEquals(str2 + "-" + i3, asString((NetworkReceive) it.next()));
                i3++;
            }
            int i5 = 0;
            while (i5 < this.selector.completedSends().size() && i4 < i2) {
                this.selector.send(createSend(str, str2 + "-" + i4));
                i5++;
                i4++;
            }
        }
    }
}
