package org.apache.kafka.clients.producer.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/ProducerMetadataTest.class */
/**
 * Tests for {@link ProducerMetadata}.
 *
 * <p>The cases below exercise when the next metadata update becomes due (initial state, explicit
 * requests, backoff, staleness), blocking waits for an update, removal of topics that have not
 * been re-added for {@link #METADATA_IDLE_MS}, updates that cover only a subset of topics, and
 * the behavior after {@code close()} or {@code fatalError(...)}.
 *
 * <p>Failures raised on helper threads started by {@link #asyncFetch(String, long)} are stored in
 * {@link #backgroundError}; {@link #tearDown()} fails the test if one is left behind.
 */
public class ProducerMetadataTest {
    private static final long METADATA_IDLE_MS = 60000;
    /** Clock step that moves the test time beyond the 100 ms refresh backoff. */
    private static final long PAST_BACKOFF_MS = 120;
    private final long refreshBackoffMs = 100;
    private final long refreshBackoffMaxMs = 1000;
    private final long metadataExpireMs = 1000;
    private final ProducerMetadata metadata = new ProducerMetadata(refreshBackoffMs, refreshBackoffMaxMs,
            metadataExpireMs, METADATA_IDLE_MS, new LogContext(), new ClusterResourceListeners(), Time.SYSTEM);
    /** Last exception caught on a thread started by {@link #asyncFetch(String, long)}, if any. */
    private final AtomicReference<Exception> backgroundError = new AtomicReference<>();

    /** Fails the test when a background thread recorded an exception that no test consumed. */
    @AfterEach
    public void tearDown() {
        Assertions.assertNull(backgroundError.get(), "Exception in background thread : " + backgroundError.get());
    }

    /**
     * Walks through the basic update cycle: no update is due right after a response, a requested
     * update waits for the backoff, waiting threads are released once metadata for their topic
     * arrives, and an update becomes due again after {@code metadataExpireMs}.
     */
    @Test
    public void testMetadata() throws Exception {
        final String topic = "my-topic";
        long time = Time.SYSTEM.milliseconds();
        metadata.add(topic, time);

        metadata.updateWithCurrentRequestVersion(responseWithTopics(Collections.emptySet()), false, time);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "No update needed.");
        metadata.requestUpdate(true);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "Still no updated needed due to backoff");
        time += PAST_BACKOFF_MS;
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(time), "Update needed now that backoff time expired");

        Thread firstWaiter = asyncFetch(topic, 500L);
        Thread secondWaiter = asyncFetch(topic, 500L);
        Assertions.assertTrue(firstWaiter.isAlive(), "Awaiting update");
        Assertions.assertTrue(secondWaiter.isAlive(), "Awaiting update");

        // Keep feeding responses until both waiters have observed partitions for the topic. The
        // waiters request updates themselves, so re-check on every pass whether one is due.
        while (firstWaiter.isAlive() || secondWaiter.isAlive()) {
            if (metadata.timeToNextUpdate(time) == 0) {
                metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
                time += PAST_BACKOFF_MS;
            }
            Thread.sleep(1L);
        }
        firstWaiter.join();
        secondWaiter.join();

        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "No update needed.");
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(time + metadataExpireMs),
                "Update needed due to stale metadata.");
    }

    /** Waiting for an update after {@code close()} must fail with a {@link KafkaException}. */
    @Test
    public void testMetadataAwaitAfterClose() throws InterruptedException {
        long time = 0L;
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "No update needed.");
        metadata.requestUpdate(true);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "Still no updated needed due to backoff");
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(time + PAST_BACKOFF_MS),
                "Update needed now that backoff time expired");

        assertFetchFailsAfterClose();
    }

    /**
     * After a second response is applied while an update was requested with {@code false}, the
     * next update is still not due one refresh backoff later and only becomes due after twice
     * {@link #PAST_BACKOFF_MS}.
     */
    @Test
    public void testMetadataEquivalentResponsesBackoff() throws InterruptedException {
        long time = 0L;
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "No update needed");
        metadata.requestUpdate(false);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "Still no update needed due to backoff");

        time += PAST_BACKOFF_MS;
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0,
                "No update needed after equivalent metadata response");
        metadata.requestUpdate(false);
        Assertions.assertTrue(metadata.timeToNextUpdate(time) > 0, "Still no update needed due to backoff");
        Assertions.assertTrue(metadata.timeToNextUpdate(time + refreshBackoffMs) > 0,
                "Still no updated needed due to exponential backoff");
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(time + 2 * PAST_BACKOFF_MS),
                "Update needed now that backoff time expired");

        assertFetchFailsAfterClose();
    }

    /**
     * {@code awaitUpdate} must time out, both with a zero and with a non-zero wait, when no
     * response arrives.
     */
    @Test
    public void testMetadataUpdateWaitTime() throws Exception {
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, 0L);
        Assertions.assertTrue(metadata.timeToNextUpdate(0L) > 0, "No update needed.");

        // Zero wait: times out without blocking for any configured duration.
        Assertions.assertThrows(TimeoutException.class,
                () -> metadata.awaitUpdate(metadata.requestUpdate(true), 0L),
                "Wait on metadata update was expected to timeout, but it didn't");
        // Non-zero wait: nothing updates the metadata in the meantime, so this also times out.
        Assertions.assertThrows(TimeoutException.class,
                () -> metadata.awaitUpdate(metadata.requestUpdate(true), 2000L),
                "Wait on metadata update was expected to timeout, but it didn't");
    }

    /** Adding a topic that is not yet tracked makes an update due immediately; re-adding does not. */
    @Test
    public void testTimeToNextUpdateOverwriteBackoff() {
        final long now = 10000L;

        // A topic added for the first time makes an update due right away.
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, now);
        metadata.add("new-topic", now);
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(now));

        // Re-adding the same topic after a response leaves the normal expiry interval in place.
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, now);
        metadata.add("new-topic", now);
        Assertions.assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now));

        // A further, previously unseen topic again makes an update due right away.
        metadata.add("another-new-topic", now);
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(now));
    }

    /**
     * Topics are dropped once {@link #METADATA_IDLE_MS} passes without a new {@code add}, kept
     * while they are re-added in time, and kept while their first metadata has not arrived yet.
     */
    @Test
    public void testTopicExpiry() {
        long time = 0L;

        // Not re-added within the idle interval: removed by the next update.
        metadata.add("topic1", time);
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        Assertions.assertTrue(metadata.containsTopic("topic1"));
        time += METADATA_IDLE_MS;
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        Assertions.assertFalse(metadata.containsTopic("topic1"), "Unused topic not expired");

        // Re-added every half idle interval: retained.
        metadata.add("topic2", time);
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        for (int i = 0; i < 3; i++) {
            time += METADATA_IDLE_MS / 2;
            metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
            Assertions.assertTrue(metadata.containsTopic("topic2"), "Topic expired even though in use");
            metadata.add("topic2", time);
        }

        // Added, but its first update only arrives two idle intervals later: still retained.
        metadata.add("topic3", time);
        time += 2 * METADATA_IDLE_MS;
        metadata.updateWithCurrentRequestVersion(responseWithCurrentTopics(), false, time);
        Assertions.assertTrue(metadata.containsTopic("topic3"), "Topic expired while awaiting metadata");
    }

    /** A fatal error reported to the metadata is rethrown to callers of {@code awaitUpdate}. */
    @Test
    public void testMetadataWaitAbortedOnFatalException() {
        metadata.fatalError(new AuthenticationException("Fatal exception from test"));
        Assertions.assertThrows(AuthenticationException.class, () -> metadata.awaitUpdate(0, 1000L));
    }

    /**
     * Tracks {@code topics()} versus {@code newTopics()} while updates are applied with the second
     * argument of {@code updateWithCurrentRequestVersion} set to {@code true} (presumably the
     * partial-update flag, going by this test's name).
     */
    @Test
    public void testMetadataPartialUpdate() {
        long now = 10000L;

        // A first-time topic requests an update and is reported as new.
        metadata.add("topic-one", now);
        Assertions.assertTrue(metadata.updateRequested());
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(now));
        Assertions.assertEquals(Collections.singleton("topic-one"), metadata.topics());
        Assertions.assertEquals(Collections.singleton("topic-one"), metadata.newTopics());

        // Once its metadata has been applied, the topic is no longer new.
        now += 1000;
        metadata.updateWithCurrentRequestVersion(responseWithTopics(Collections.singleton("topic-one")), true, now);
        Assertions.assertFalse(metadata.updateRequested());
        Assertions.assertEquals(Collections.singleton("topic-one"), metadata.topics());
        Assertions.assertEquals(Collections.emptySet(), metadata.newTopics());

        // Adding it again neither requests an update nor marks it new.
        metadata.add("topic-one", now);
        Assertions.assertFalse(metadata.updateRequested());
        Assertions.assertTrue(metadata.timeToNextUpdate(now) > 0);
        Assertions.assertEquals(Collections.singleton("topic-one"), metadata.topics());
        Assertions.assertEquals(Collections.emptySet(), metadata.newTopics());

        // Two more first-time topics: both are new and an update is due.
        now += 1000;
        metadata.add("topic-two", now);
        now += 1000;
        metadata.add("topic-three", now);
        Set<String> allTopics = new HashSet<>(Arrays.asList("topic-one", "topic-two", "topic-three"));
        Assertions.assertTrue(metadata.updateRequested());
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(now));
        Assertions.assertEquals(allTopics, metadata.topics());
        Assertions.assertEquals(new HashSet<>(Arrays.asList("topic-two", "topic-three")), metadata.newTopics());
        Assertions.assertTrue(metadata.updateRequested());

        // Applying metadata for only one of them leaves the other one new.
        now += 1000;
        metadata.updateWithCurrentRequestVersion(responseWithTopics(Collections.singleton("topic-two")), true, now);
        Assertions.assertEquals(allTopics, metadata.topics());
        Assertions.assertEquals(Collections.singleton("topic-three"), metadata.newTopics());
    }

    /**
     * Checks how {@code requestUpdateForTopic} interacts with {@code updateRequested()} across
     * updates applied with the second argument {@code true} and then {@code false}.
     */
    @Test
    public void testRequestUpdateForTopic() {
        long now = 10000L;
        metadata.add("topic-1", now);
        metadata.add("topic-2", now);
        Assertions.assertTrue(metadata.updateRequested());

        // Requesting an update for a topic that has not been updated yet.
        metadata.requestUpdateForTopic("topic-1");
        Assertions.assertTrue(metadata.updateRequested());

        // An update applied with 'true' that covers only topic-1 clears the request.
        now += 1000;
        metadata.updateWithCurrentRequestVersion(responseWithTopics(Collections.singleton("topic-1")), true, now);
        Assertions.assertFalse(metadata.updateRequested());

        // Requesting an update for the already-updated topic sets the request again ...
        metadata.requestUpdateForTopic("topic-1");
        Assertions.assertTrue(metadata.updateRequested());

        // ... and another update applied with 'true' does not clear it this time.
        now += 1000;
        metadata.updateWithCurrentRequestVersion(responseWithTopics(Collections.singleton("topic-1")), true, now);
        Assertions.assertTrue(metadata.updateRequested());

        // An update applied with 'false' that covers both topics clears it.
        now += 1000;
        metadata.updateWithCurrentRequestVersion(
                responseWithTopics(new HashSet<>(Arrays.asList("topic-1", "topic-2"))), false, now);
        Assertions.assertFalse(metadata.updateRequested());
    }

    /** Builds a response covering every topic the metadata currently tracks. */
    private MetadataResponse responseWithCurrentTopics() {
        return responseWithTopics(metadata.topics());
    }

    /**
     * Builds a response via {@code RequestTestUtils.metadataUpdateWith(1, ...)} in which every
     * given topic is mapped to the value 1.
     *
     * @param set topic names to include; may be empty
     * @return the generated response
     */
    private MetadataResponse responseWithTopics(Set<String> set) {
        HashMap<String, Integer> countsByTopic = new HashMap<>();
        for (String topic : set) {
            countsByTopic.put(topic, 1);
        }
        return RequestTestUtils.metadataUpdateWith(1, countsByTopic);
    }

    /** Resets the recorded background failure so that {@link #tearDown()} passes. */
    private void clearBackgroundError() {
        backgroundError.set(null);
    }

    /**
     * Closes the metadata, runs a background fetch to completion and verifies that it failed with
     * a {@link KafkaException} mentioning the update-after-close condition. The recorded error is
     * cleared afterwards because it is the expected outcome.
     */
    private void assertFetchFailsAfterClose() throws InterruptedException {
        metadata.close();
        asyncFetch("my-topic", 500L).join();

        Exception failure = backgroundError.get();
        // Report a missing failure as an assertion error rather than a NullPointerException.
        Assertions.assertNotNull(failure, "Expected the background fetch to fail after close");
        Assertions.assertEquals(KafkaException.class, failure.getClass());
        Assertions.assertTrue(failure.toString().contains("Requested metadata update after close"));
        clearBackgroundError();
    }

    /**
     * Starts a thread that keeps requesting and awaiting metadata updates until the fetched
     * cluster reports at least one partition for the topic. Any exception ends the thread and is
     * stored in {@link #backgroundError}.
     *
     * @param str topic to wait for
     * @param j   maximum wait, passed to each {@code awaitUpdate} call
     * @return the already started thread
     */
    private Thread asyncFetch(String str, long j) {
        Thread fetcher = new Thread(() -> {
            try {
                while (metadata.fetch().partitionsForTopic(str).isEmpty()) {
                    metadata.awaitUpdate(metadata.requestUpdate(false), j);
                }
            } catch (Exception e) {
                backgroundError.set(e);
            }
        });
        fetcher.start();
        return fetcher;
    }
}
