package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.class */
public class ConsumerNetworkClientTest {
    private String topicName = "test";
    private MockTime time = new MockTime(1);
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 1);
    private Node node = (Node) this.cluster.nodes().get(0);
    private Metadata metadata = new Metadata(100, 50000, new LogContext(), new ClusterResourceListeners());
    private MockClient client = new MockClient(this.time, this.metadata);
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), this.client, this.metadata, this.time, 100, 1000, Integer.MAX_VALUE);

    @Test
    public void send() {
        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse(send.isDone());
        this.consumerClient.poll(send);
        Assert.assertTrue(send.isDone());
        Assert.assertTrue(send.succeeded());
        Assert.assertEquals(Errors.NONE, ((ClientResponse) send.value()).responseBody().error());
    }

    @Test
    public void sendWithinBlackoutPeriodAfterAuthenticationFailure() {
        this.client.authenticationFailed(this.node, 300L);
        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        this.consumerClient.poll(send);
        Assert.assertTrue(send.failed());
        Assert.assertTrue("Expected only an authentication error.", send.exception() instanceof AuthenticationException);
        this.time.sleep(30L);
        Assert.assertTrue(this.client.connectionFailed(this.node));
        RequestFuture send2 = this.consumerClient.send(this.node, heartbeat());
        this.consumerClient.poll(send2);
        Assert.assertTrue(send2.failed());
        Assert.assertTrue("Expected only an authentication error.", send2.exception() instanceof AuthenticationException);
    }

    @Test
    public void multiSend() {
        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        RequestFuture send2 = this.consumerClient.send(this.node, heartbeat());
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount(this.node));
        this.consumerClient.awaitPendingRequests(this.node, this.time.timer(Long.MAX_VALUE));
        Assert.assertTrue(send.succeeded());
        Assert.assertTrue(send2.succeeded());
    }

    @Test
    public void testDisconnectWithUnsentRequests() {
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        Assert.assertTrue(this.consumerClient.hasPendingRequests(this.node));
        Assert.assertFalse(this.client.hasInFlightRequests(this.node.idString()));
        this.consumerClient.disconnectAsync(this.node);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(send.failed());
        Assert.assertTrue(send.exception() instanceof DisconnectException);
    }

    @Test
    public void testDisconnectWithInFlightRequests() {
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.consumerClient.hasPendingRequests(this.node));
        Assert.assertTrue(this.client.hasInFlightRequests(this.node.idString()));
        this.consumerClient.disconnectAsync(this.node);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(send.failed());
        Assert.assertTrue(send.exception() instanceof DisconnectException);
    }

    @Test
    public void testTimeoutUnsentRequest() {
        this.client.delayReady(this.node, 1000L);
        RequestFuture send = this.consumerClient.send(this.node, heartbeat(), 500);
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.consumerClient.hasPendingRequests());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.time.sleep(501L);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.consumerClient.hasPendingRequests());
        Assert.assertTrue(send.failed());
        Assert.assertTrue(send.exception() instanceof TimeoutException);
    }

    @Test
    public void doNotBlockIfPollConditionIsSatisfied() {
        NetworkClient networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
        new ConsumerNetworkClient(new LogContext(), networkClient, this.metadata, this.time, 100L, 1000, Integer.MAX_VALUE).poll(this.time.timer(Long.MAX_VALUE), () -> {
            return false;
        });
        ((NetworkClient) Mockito.verify(networkClient)).poll(ArgumentMatchers.eq(0L), ArgumentMatchers.anyLong());
    }

    @Test
    public void blockWhenPollConditionNotSatisfied() {
        NetworkClient networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
        ConsumerNetworkClient consumerNetworkClient = new ConsumerNetworkClient(new LogContext(), networkClient, this.metadata, this.time, 100L, 1000, Integer.MAX_VALUE);
        Mockito.when(Integer.valueOf(networkClient.inFlightRequestCount())).thenReturn(1);
        consumerNetworkClient.poll(this.time.timer(4000L), () -> {
            return true;
        });
        ((NetworkClient) Mockito.verify(networkClient)).poll(ArgumentMatchers.eq(4000L), ArgumentMatchers.anyLong());
    }

    @Test
    public void blockOnlyForRetryBackoffIfNoInflightRequests() {
        NetworkClient networkClient = (NetworkClient) Mockito.mock(NetworkClient.class);
        ConsumerNetworkClient consumerNetworkClient = new ConsumerNetworkClient(new LogContext(), networkClient, this.metadata, this.time, 100L, 1000, Integer.MAX_VALUE);
        Mockito.when(Integer.valueOf(networkClient.inFlightRequestCount())).thenReturn(0);
        consumerNetworkClient.poll(this.time.timer(Long.MAX_VALUE), () -> {
            return true;
        });
        ((NetworkClient) Mockito.verify(networkClient)).poll(ArgumentMatchers.eq(100L), ArgumentMatchers.anyLong());
    }

    @Test
    public void wakeup() {
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        this.consumerClient.wakeup();
        try {
            this.consumerClient.poll(this.time.timer(0L));
            Assert.fail();
        } catch (WakeupException e) {
        }
        this.client.respond(heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(send);
        Assert.assertTrue(send.isDone());
    }

    @Test
    public void testDisconnectWakesUpPoll() throws Exception {
        final RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        this.client.enableBlockingUntilWakeup(1);
        Thread thread = new Thread() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.poll(send);
            }
        };
        thread.start();
        this.consumerClient.disconnectAsync(this.node);
        thread.join();
        Assert.assertTrue(send.failed());
        Assert.assertTrue(send.exception() instanceof DisconnectException);
    }

    @Test
    public void testAuthenticationExceptionPropagatedFromMetadata() {
        this.metadata.failedUpdate(this.time.milliseconds(), new AuthenticationException("Authentication failed"));
        try {
            this.consumerClient.poll(this.time.timer(Duration.ZERO));
            Assert.fail("Expected authentication error thrown");
        } catch (AuthenticationException e) {
            Assert.assertNull(this.metadata.getAndClearMetadataException());
        }
    }

    @Test(expected = InvalidTopicException.class)
    public void testInvalidTopicExceptionPropagatedFromMetadata() {
        this.metadata.update(TestUtils.metadataUpdateWith("clusterId", 1, Collections.singletonMap("topic", Errors.INVALID_TOPIC_EXCEPTION), Collections.emptyMap()), this.time.milliseconds());
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
    }

    @Test(expected = TopicAuthorizationException.class)
    public void testTopicAuthorizationExceptionPropagatedFromMetadata() {
        this.metadata.update(TestUtils.metadataUpdateWith("clusterId", 1, Collections.singletonMap("topic", Errors.TOPIC_AUTHORIZATION_FAILED), Collections.emptyMap()), this.time.milliseconds());
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
    }

    @Test
    public void testMetadataFailurePropagated() {
        KafkaException kafkaException = new KafkaException();
        this.metadata.failedUpdate(this.time.milliseconds(), kafkaException);
        try {
            this.consumerClient.poll(this.time.timer(Duration.ZERO));
            Assert.fail("Expected poll to throw exception");
        } catch (Exception e) {
            Assert.assertEquals(kafkaException, e);
        }
    }

    @Test
    public void testFutureCompletionOutsidePoll() throws Exception {
        final RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        this.consumerClient.pollNoWakeup();
        this.client.enableBlockingUntilWakeup(2);
        Thread thread = new Thread() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.pollNoWakeup();
            }
        };
        thread.start();
        Thread.sleep(50L);
        Thread thread2 = new Thread() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ConsumerNetworkClientTest.this.consumerClient.poll(send);
            }
        };
        thread2.start();
        Thread.sleep(50L);
        this.client.respond(heartbeatResponse(Errors.NONE));
        this.client.wakeup();
        thread.join();
        thread2.join();
        Assert.assertTrue(send.succeeded());
    }

    @Test
    public void testAwaitForMetadataUpdateWithTimeout() {
        Assert.assertFalse(this.consumerClient.awaitMetadataUpdate(this.time.timer(10L)));
    }

    @Test
    public void sendExpiry() {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        this.client = new MockClient(this.time, this.metadata) { // from class: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.4
            @Override // org.apache.kafka.clients.MockClient
            public boolean ready(Node node, long j) {
                if (atomicBoolean.get()) {
                    return super.ready(node, j);
                }
                return false;
            }

            @Override // org.apache.kafka.clients.MockClient
            public boolean connectionFailed(Node node) {
                return atomicBoolean2.get();
            }
        };
        this.consumerClient = new ConsumerNetworkClient(new LogContext(), this.client, this.metadata, this.time, 100L, 10, Integer.MAX_VALUE);
        RequestFuture send = this.consumerClient.send(this.node, heartbeat());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse(send.isDone());
        this.time.sleep(10 + 1);
        RequestFuture send2 = this.consumerClient.send(this.node, heartbeat());
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse(send2.isDone());
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(send.isDone());
        Assert.assertFalse(send.succeeded());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertFalse(send2.isDone());
        atomicBoolean.set(true);
        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(send2);
        Assert.assertEquals(Errors.NONE, ((ClientResponse) send2.value()).responseBody().error());
        atomicBoolean.set(false);
        RequestFuture send3 = this.consumerClient.send(this.node, heartbeat());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount(this.node));
        atomicBoolean2.set(true);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(send3.isDone());
        Assert.assertFalse(send3.succeeded());
        Assert.assertEquals(0L, this.consumerClient.pendingRequestCount());
        Assert.assertEquals(0L, this.consumerClient.pendingRequestCount(this.node));
    }

    @Test
    public void testTrySend() {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicInteger atomicInteger = new AtomicInteger();
        this.client = new MockClient(this.time, this.metadata) { // from class: org.apache.kafka.clients.consumer.internals.ConsumerNetworkClientTest.5
            @Override // org.apache.kafka.clients.MockClient
            public boolean ready(Node node, long j) {
                atomicInteger.incrementAndGet();
                if (atomicBoolean.get()) {
                    return super.ready(node, j);
                }
                return false;
            }
        };
        this.consumerClient = new ConsumerNetworkClient(new LogContext(), this.client, this.metadata, this.time, 100L, 10, Integer.MAX_VALUE);
        this.consumerClient.send(this.node, heartbeat());
        this.consumerClient.send(this.node, heartbeat());
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertEquals(0L, this.client.inFlightRequestCount(this.node.idString()));
        this.consumerClient.trySend(this.time.milliseconds());
        Assert.assertEquals(1L, atomicInteger.getAndSet(0));
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertEquals(0L, this.client.inFlightRequestCount(this.node.idString()));
        atomicBoolean.set(true);
        this.consumerClient.trySend(this.time.milliseconds());
        Assert.assertEquals(2L, atomicInteger.getAndSet(0));
        Assert.assertEquals(2L, this.consumerClient.pendingRequestCount(this.node));
        Assert.assertEquals(2L, this.client.inFlightRequestCount(this.node.idString()));
    }

    private HeartbeatRequest.Builder heartbeat() {
        return new HeartbeatRequest.Builder(new HeartbeatRequestData().setGroupId("group").setGenerationId(1).setMemberId("memberId"));
    }

    private HeartbeatResponse heartbeatResponse(Errors errors) {
        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(errors.code()));
    }
}
