package org.apache.kafka.clients.producer.internals;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest.class */
public class SenderTest {
    private static final int MAX_REQUEST_SIZE = 1048576;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final String CLIENT_ID = "clientId";
    private static final String METRIC_GROUP = "producer-metrics";
    private static final double EPS = 1.0E-4d;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private TopicPartition tp = new TopicPartition("test", MAX_RETRIES);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private int batchSize = 16384;
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
    private Cluster cluster = TestUtils.singletonCluster("test", 1);
    private Metrics metrics = null;
    private RecordAccumulator accumulator = null;
    private Sender sender = null;

    @Before
    public void setup() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("client-id", CLIENT_ID);
        this.metrics = new Metrics(new MetricConfig().tags(linkedHashMap), this.time);
        this.accumulator = new RecordAccumulator(this.batchSize, 1048576L, CompressionType.NONE, 0L, 0L, this.metrics, this.time);
        this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, MAX_RETRIES, this.metrics, this.time, CLIENT_ID, 1000);
        this.metadata.update(this.cluster, this.time.milliseconds());
    }

    @After
    public void tearDown() {
        this.metrics.close();
    }

    @Test
    public void testSimple() throws Exception {
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp, 0L, "key".getBytes(), "value".getBytes(), (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("We should have a single produce request in flight.", 1L, this.client.inFlightRequestCount());
        this.client.respond(produceResponse(this.tp, 0L, Errors.NONE.code(), MAX_RETRIES));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("All requests completed.", 0L, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
    }

    @Test
    public void testQuotaMetrics() throws Exception {
        for (int i = 1; i <= 3; i++) {
            FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp, 0L, "key".getBytes(), "value".getBytes(), (Callback) null, 1000L).future;
            this.sender.run(this.time.milliseconds());
            this.client.respond(produceResponse(this.tp, 0L, Errors.NONE.code(), 100 * i));
            this.sender.run(this.time.milliseconds());
        }
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
        Assert.assertEquals(200.0d, kafkaMetric.value(), EPS);
        Assert.assertEquals(300.0d, kafkaMetric2.value(), EPS);
    }

    @Test
    public void testRetries() throws Exception {
        Metrics metrics = new Metrics();
        try {
            Sender sender = new Sender(this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, metrics, this.time, CLIENT_ID, 1000);
            FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp, 0L, "key".getBytes(), "value".getBytes(), (Callback) null, 1000L).future;
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            String destination = this.client.requests().peek().request().destination();
            Node node = new Node(Integer.valueOf(destination).intValue(), "localhost", MAX_RETRIES);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
            this.client.disconnect(destination);
            Assert.assertEquals(0L, this.client.inFlightRequestCount());
            Assert.assertFalse("Client ready status should be false", this.client.isReady(node, 0L));
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            this.client.respond(produceResponse(this.tp, 0L, Errors.NONE.code(), MAX_RETRIES));
            sender.run(this.time.milliseconds());
            Assert.assertTrue("Request should have retried and completed", futureRecordMetadata.isDone());
            Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
            FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp, 0L, "key".getBytes(), "value".getBytes(), (Callback) null, 1000L).future;
            sender.run(this.time.milliseconds());
            for (int i = MAX_RETRIES; i < 1 + 1; i++) {
                this.client.disconnect(this.client.requests().peek().request().destination());
                sender.run(this.time.milliseconds());
                sender.run(this.time.milliseconds());
                sender.run(this.time.milliseconds());
            }
            sender.run(this.time.milliseconds());
            completedWithError(futureRecordMetadata2, Errors.NETWORK_EXCEPTION);
            metrics.close();
        } catch (Throwable th) {
            metrics.close();
            throw th;
        }
    }

    @Test
    public void testSendInOrder() throws Exception {
        Metrics metrics = new Metrics();
        try {
            Sender sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 1, metrics, this.time, CLIENT_ID, 1000);
            this.metadata.update(TestUtils.clusterWith(2, "test", 2), this.time.milliseconds());
            TopicPartition topicPartition = new TopicPartition("test", 1);
            this.accumulator.append(topicPartition, 0L, "key1".getBytes(), "value1".getBytes(), (Callback) null, 1000L);
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            String destination = this.client.requests().peek().request().destination();
            Assert.assertEquals(ApiKeys.PRODUCE.id, this.client.requests().peek().request().header().apiKey());
            Node node = new Node(Integer.valueOf(destination).intValue(), "localhost", MAX_RETRIES);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
            this.time.sleep(900L);
            this.accumulator.append(topicPartition, 0L, "key2".getBytes(), "value2".getBytes(), (Callback) null, 1000L);
            this.metadata.update(TestUtils.singletonCluster("test", 2), this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            metrics.close();
        } catch (Throwable th) {
            metrics.close();
            throw th;
        }
    }

    private void completedWithError(Future<RecordMetadata> future, Errors errors) throws Exception {
        Assert.assertTrue("Request should be completed", future.isDone());
        try {
            future.get();
            Assert.fail("Should have thrown an exception.");
        } catch (ExecutionException e) {
            Assert.assertEquals(errors.exception().getClass(), e.getCause().getClass());
        }
    }

    private Struct produceResponse(TopicPartition topicPartition, long j, int i, int i2) {
        return new ProduceResponse(Collections.singletonMap(topicPartition, new ProduceResponse.PartitionResponse((short) i, j, -1L)), i2).toStruct();
    }
}
