package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.class */
public class ConsumerCoordinatorTest {
    private MockClient client;
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;
    private MockCommitCallback mockOffsetCommitCallback;
    private ConsumerCoordinator coordinator;
    private final String topic1 = "test1";
    private final String topic2 = "test2";
    private final TopicPartition t1p = new TopicPartition("test1", 0);
    private final TopicPartition t2p = new TopicPartition("test2", 0);
    private final String groupId = "test-group";
    private final Optional<String> groupInstanceId = Optional.of("test-instance");
    private final int rebalanceTimeoutMs = 60000;
    private final int sessionTimeoutMs = 10000;
    private final int heartbeatIntervalMs = 5000;
    private final long retryBackoffMs = 100;
    private final int autoCommitIntervalMs = 2000;
    private final int requestTimeoutMs = 30000;
    private final MockTime time = new MockTime();
    private final Heartbeat heartbeat = new Heartbeat(this.time, 10000, 5000, 60000, 100);
    private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
    private List<PartitionAssignor> assignors = Collections.singletonList(this.partitionAssignor);
    private MetadataResponse metadataResponse = TestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.1
        {
            put("test1", 1);
            put("test2", 1);
        }
    });
    private Node node = (Node) this.metadataResponse.brokers().iterator().next();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$MockCommitCallback.class */
    public static class MockCommitCallback implements OffsetCommitCallback {
        public int invoked;
        public Exception exception;

        private MockCommitCallback() {
            this.invoked = 0;
            this.exception = null;
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.invoked++;
            this.exception = exc;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$MockRebalanceListener.class */
    public static class MockRebalanceListener implements ConsumerRebalanceListener {
        public Collection<TopicPartition> revoked;
        public Collection<TopicPartition> assigned;
        public int revokedCount;
        public int assignedCount;

        private MockRebalanceListener() {
            this.revokedCount = 0;
            this.assignedCount = 0;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            this.assigned = collection;
            this.assignedCount++;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.revoked = collection;
            this.revokedCount++;
        }
    }

    @Before
    public void setup() {
        LogContext logContext = new LogContext();
        this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
        this.metadata = new ConsumerMetadata(0L, Long.MAX_VALUE, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient((Time) this.time, (Metadata) this.metadata);
        this.client.updateMetadata(this.metadataResponse);
        this.consumerClient = new ConsumerNetworkClient(logContext, this.client, this.metadata, this.time, 100L, 30000, Integer.MAX_VALUE);
        this.metrics = new Metrics(this.time);
        this.rebalanceListener = new MockRebalanceListener();
        this.mockOffsetCommitCallback = new MockCommitCallback();
        this.partitionAssignor.clear();
        this.coordinator = buildCoordinator(this.metrics, this.assignors, false, Optional.empty());
    }

    @After
    public void teardown() {
        this.metrics.close();
        this.coordinator.close(this.time.timer(0L));
    }

    @Test
    public void testNormalHeartbeat() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.succeeded());
    }

    @Test(expected = GroupAuthorizationException.class)
    public void testGroupDescribeUnauthorized() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.GROUP_AUTHORIZATION_FAILED));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected = GroupAuthorizationException.class)
    public void testGroupReadUnauthorized() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.COORDINATOR_NOT_AVAILABLE));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() throws Exception {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        for (int i = 0; i < 1000; i++) {
            this.coordinator.commitOffsetsAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(i)), new OffsetCommitCallback() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.2
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                    atomicInteger.incrementAndGet();
                    Throwable cause = exc.getCause();
                    Assert.assertTrue("Unexpected exception cause type: " + (cause == null ? null : cause.getClass()), cause instanceof DisconnectException);
                }
            });
        }
        this.coordinator.markCoordinatorUnknown();
        this.consumerClient.pollNoWakeup();
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals(1000, atomicInteger.get());
    }

    @Test
    public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() throws Exception {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.consumerClient.send(this.coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(new OffsetCommitRequestData().setGroupId("test-group").setTopics(Collections.singletonList(new OffsetCommitRequestData.OffsetCommitRequestTopic().setName("foo").setPartitions(Collections.singletonList(new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(0).setCommittedLeaderEpoch(-1).setCommittedMetadata("").setCommittedOffset(13L).setCommitTimestamp(0L))))))).compose(new RequestFutureAdapter<ClientResponse, Object>() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.3
            public void onSuccess(ClientResponse clientResponse, RequestFuture<Object> requestFuture) {
            }

            public void onFailure(RuntimeException runtimeException, RequestFuture<Object> requestFuture) {
                Assert.assertTrue("Unexpected exception type: " + runtimeException.getClass(), runtimeException instanceof DisconnectException);
                Assert.assertTrue(ConsumerCoordinatorTest.this.coordinator.coordinatorUnknown());
                atomicBoolean.set(true);
            }

            public /* bridge */ /* synthetic */ void onSuccess(Object obj, RequestFuture requestFuture) {
                onSuccess((ClientResponse) obj, (RequestFuture<Object>) requestFuture);
            }
        });
        this.coordinator.markCoordinatorUnknown();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testNotCoordinator() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.NOT_COORDINATOR.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testIllegalGeneration() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(10000L);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.ILLEGAL_GENERATION.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.rejoinNeededOrPending());
    }

    @Test
    public void testUnknownConsumerId() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(10000L);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.rejoinNeededOrPending());
    }

    @Test
    public void testCoordinatorDisconnect() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(10000L);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse((AbstractResponse) heartbeatResponse(Errors.NONE), true);
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertTrue(sendHeartbeatRequest.exception() instanceof DisconnectException);
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test(expected = ApiException.class)
    public void testJoinGroupInvalidGroupId() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupLeaderResponse(0, "leader", Collections.emptyMap(), Errors.INVALID_GROUP_ID));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testNormalJoinGroupLeader() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.4
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("leader") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().containsKey("leader");
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton("test1"), this.subscriptions.groupSubscription());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testOutdatedCoordinatorAssignment() {
        this.subscriptions.subscribe(Collections.singleton("test2"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.partitionAssignor.prepare(Collections.singletonMap("outdated_assignment", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "outdated_assignment", Collections.singletonMap("outdated_assignment", Collections.singletonList("test2")), Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.5
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("outdated_assignment") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().containsKey("outdated_assignment");
            }
        }, (AbstractResponse) syncGroupResponse(Arrays.asList(this.t2p), Errors.NONE));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "outdated_assignment", Collections.singletonMap("outdated_assignment", Collections.singletonList("test1")), Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.6
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("outdated_assignment") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().containsKey("outdated_assignment");
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(0L));
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.coordinator.poll(this.time.timer(0L));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton("test1"), this.subscriptions.groupSubscription());
        Assert.assertEquals(2L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testInvalidCoordinatorAssignment() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("invalid_assignment", Collections.singletonList("test2"));
        this.partitionAssignor.prepare(Collections.singletonMap("invalid_assignment", Collections.singletonList(this.t2p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "invalid_assignment", singletonMap, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.7
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("invalid_assignment") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().containsKey("invalid_assignment");
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t2p), Errors.NONE));
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        });
    }

    @Test
    public void testPatternJoinGroupLeader() {
        this.subscriptions.subscribe(Pattern.compile("test.*"), this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Arrays.asList(this.t1p, this.t2p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.8
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("leader") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().containsKey("leader");
            }
        }, (AbstractResponse) syncGroupResponse(Arrays.asList(this.t1p, this.t2p), Errors.NONE));
        this.client.prepareMetadataUpdate(this.metadataResponse);
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(2L, this.subscriptions.numAssignedPartitions());
        Assert.assertEquals(2L, this.subscriptions.groupSubscription().size());
        Assert.assertEquals(2L, this.subscriptions.subscription().size());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(2L, this.rebalanceListener.assigned.size());
    }

    @Test
    public void testMetadataRefreshDuringRebalance() {
        this.subscriptions.subscribe(Pattern.compile(".*"), this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertEquals(Collections.singleton("test1"), this.subscriptions.subscription());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        final List asList = Arrays.asList("test1", "test2");
        final HashSet hashSet = new HashSet(asList);
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.9
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                HashMap hashMap = new HashMap();
                Iterator it = asList.iterator();
                while (it.hasNext()) {
                    hashMap.put((String) it.next(), 1);
                }
                ConsumerCoordinatorTest.this.client.updateMetadata(TestUtils.metadataUpdateWith(1, hashMap));
                return true;
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        List<TopicPartition> asList2 = Arrays.asList(this.t1p, this.t2p);
        HashSet hashSet2 = new HashSet(asList2);
        Map<String, List<String>> singletonMap2 = Collections.singletonMap("leader", Arrays.asList("test1", "test2"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", asList2));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.10
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                Iterator it = ((JoinGroupRequest) abstractRequest).data().protocols().iterator();
                Assert.assertTrue(it.hasNext());
                ByteBuffer wrap = ByteBuffer.wrap(((JoinGroupRequestData.JoinGroupRequestProtocol) it.next()).metadata());
                PartitionAssignor.Subscription deserializeSubscription = ConsumerProtocol.deserializeSubscription(wrap);
                wrap.rewind();
                return deserializeSubscription.topics().containsAll(hashSet);
            }
        }, (AbstractResponse) joinGroupLeaderResponse(2, "leader", singletonMap2, Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(asList2, Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(hashSet, this.subscriptions.subscription());
        Assert.assertEquals(hashSet2, this.subscriptions.assignedPartitions());
        Assert.assertEquals(2L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        Assert.assertEquals(2L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(hashSet2, this.rebalanceListener.assigned);
    }

    @Test
    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
        this.subscriptions.subscribe(Pattern.compile(".*"), this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertEquals(Collections.singleton("test1"), this.subscriptions.subscription());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareMetadataUpdate(this.metadataResponse);
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.11
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("consumer") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().isEmpty();
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(new HashSet(Arrays.asList("test1", "test2")), this.subscriptions.subscription());
        this.metadata.requestUpdate();
        this.consumerClient.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
    }

    @Test
    public void testWakeupDuringJoin() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE));
        this.consumerClient.wakeup();
        try {
            this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        } catch (WakeupException e) {
        }
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testNormalJoinGroupFollower() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.12
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("consumer") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().isEmpty();
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton("test1"), this.subscriptions.groupSubscription());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testUpdateLastHeartbeatPollWhenCoordinatorUnknown() throws Exception {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(5000L);
        TestUtils.waitForCondition(() -> {
            return !this.client.hasPendingResponses();
        }, "Failed to observe expected heartbeat from background thread");
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertFalse(this.coordinator.poll(this.time.timer(0L)));
        Assert.assertEquals(this.time.milliseconds(), this.heartbeat.lastPollTime());
        this.time.sleep(59999L);
        Assert.assertFalse(this.heartbeat.pollTimeoutExpired(this.time.milliseconds()));
    }

    @Test
    public void testPatternJoinGroupFollower() {
        this.subscriptions.subscribe(Pattern.compile("test.*"), this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.13
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.data.memberId().equals("consumer") && syncGroupRequest.data.generationId() == 1 && syncGroupRequest.groupAssignments().isEmpty();
            }
        }, (AbstractResponse) syncGroupResponse(Arrays.asList(this.t1p, this.t2p), Errors.NONE));
        this.client.prepareMetadataUpdate(this.metadataResponse);
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(2L, this.subscriptions.numAssignedPartitions());
        Assert.assertEquals(2L, this.subscriptions.subscription().size());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(2L, this.rebalanceListener.assigned.size());
    }

    @Test
    public void testLeaveGroupOnClose() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", this.coordinator, Collections.singletonList(this.t1p));
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.14
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                LeaveGroupRequest leaveGroupRequest = (LeaveGroupRequest) abstractRequest;
                return leaveGroupRequest.data().memberId().equals("consumer") && leaveGroupRequest.data().groupId().equals("test-group");
            }
        }, (AbstractResponse) new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.coordinator.close(this.time.timer(0L));
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testMaybeLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", this.coordinator, Collections.singletonList(this.t1p));
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.15
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                LeaveGroupRequest leaveGroupRequest = (LeaveGroupRequest) abstractRequest;
                return leaveGroupRequest.data().memberId().equals("consumer") && leaveGroupRequest.data().groupId().equals("test-group");
            }
        }, (AbstractResponse) new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.coordinator.maybeLeaveGroup();
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertNull(this.coordinator.generation());
    }

    @Test
    public void testPendingMemberShouldLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(-1, "consumer-id", "leader-id", Errors.MEMBER_ID_REQUIRED));
        this.coordinator.joinGroupIfNeeded(this.time.timer(0L));
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.16
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                return ((LeaveGroupRequest) abstractRequest).data().memberId().equals("consumer-id");
            }
        }, (AbstractResponse) new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.coordinator.maybeLeaveGroup();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test(expected = KafkaException.class)
    public void testUnexpectedErrorOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testUnknownMemberIdOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.17
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return ((JoinGroupRequest) abstractRequest).data().memberId().equals("");
            }
        }, (AbstractResponse) joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testRebalanceInProgressOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.REBALANCE_IN_PROGRESS));
        this.client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testIllegalGenerationOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.18
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return ((JoinGroupRequest) abstractRequest).data().memberId().equals("");
            }
        }, (AbstractResponse) joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testMetadataChangeTriggersRebalance() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("consumer", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "consumer", singletonMap, Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        this.metadata.update(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 2)), this.time.milliseconds());
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertTrue(this.coordinator.rejoinNeededOrPending());
    }

    @Test
    public void testUpdateMetadataDuringRebalance() {
        TopicPartition topicPartition = new TopicPartition("topic1", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic2", 0);
        List asList = Arrays.asList("topic1", "topic2");
        this.subscriptions.subscribe(new HashSet(asList), this.rebalanceListener);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic1", 1)));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", asList);
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(topicPartition)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.19
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                if (!syncGroupRequest.data.memberId().equals("leader") || syncGroupRequest.data.generationId() != 1 || !syncGroupRequest.groupAssignments().containsKey("leader")) {
                    return false;
                }
                HashMap hashMap = new HashMap();
                hashMap.put("topic1", 1);
                hashMap.put("topic2", 1);
                ConsumerCoordinatorTest.this.client.updateMetadata(TestUtils.metadataUpdateWith(1, hashMap));
                return true;
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(topicPartition), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupLeaderResponse(2, "leader", singletonMap, Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Arrays.asList(topicPartition, topicPartition2), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(new HashSet(Arrays.asList(topicPartition, topicPartition2)), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testWakeupFromAssignmentCallback() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, false, Optional.empty());
        TopicPartition topicPartition = new TopicPartition("topic1", 0);
        this.subscriptions.subscribe(Collections.singleton("topic1"), new MockRebalanceListener() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.20
            @Override // org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.MockRebalanceListener
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                boolean z = this.assignedCount == 0;
                super.onPartitionsAssigned(collection);
                if (z) {
                    throw new WakeupException();
                }
            }
        });
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        buildCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.partitionAssignor.prepare(Collections.singletonMap("follower", Collections.singletonList(topicPartition)));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "follower", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(topicPartition), Errors.NONE));
        try {
            buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
            Assert.fail("Expected exception thrown from assignment callback");
        } catch (WakeupException e) {
        }
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(buildCoordinator.rejoinNeededOrPending());
        Assert.assertEquals(1L, r0.revokedCount);
        Assert.assertEquals(2L, r0.assignedCount);
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithSubscribe() {
        unavailableTopicTest(false, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
        unavailableTopicTest(true, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSubscribe() {
        unavailableTopicTest(true, Collections.singleton("notmatching"));
    }

    private void unavailableTopicTest(boolean z, Set<String> set) {
        if (z) {
            this.subscriptions.subscribe(Pattern.compile("test.*"), this.rebalanceListener);
        } else {
            this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        }
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.singletonMap("test1", Errors.UNKNOWN_TOPIC_OR_PARTITION), Collections.emptyMap()));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Map<String, List<String>> singletonMap = Collections.singletonMap("consumer", Collections.singletonList("test1"));
        this.partitionAssignor.prepare(Collections.emptyMap());
        this.client.prepareResponse(joinGroupLeaderResponse(1, "consumer", singletonMap, Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.assigned);
        Assert.assertTrue("Metadata refresh not requested for unavailable partitions", this.metadata.updateRequested());
        HashMap hashMap = new HashMap();
        Iterator<String> it = set.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith("kafka-cluster", 1, hashMap, Collections.singletonMap("test1", 1)));
        this.consumerClient.poll(this.time.timer(0L));
        this.client.prepareResponse(joinGroupLeaderResponse(2, "consumer", singletonMap, Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse("Metadata refresh requested unnecessarily", this.metadata.updateRequested());
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testExcludeInternalTopicsConfigOption() {
        testInternalTopicInclusion(false);
    }

    @Test
    public void testIncludeInternalTopicsConfigOption() {
        testInternalTopicInclusion(true);
    }

    private void testInternalTopicInclusion(boolean z) {
        this.metadata = new ConsumerMetadata(0L, Long.MAX_VALUE, z, false, this.subscriptions, new LogContext(), new ClusterResourceListeners());
        this.client = new MockClient((Time) this.time, (Metadata) this.metadata);
        this.coordinator = buildCoordinator(new Metrics(), this.assignors, false, Optional.empty());
        this.subscriptions.subscribe(Pattern.compile(".*"), this.rebalanceListener);
        Node node = new Node(0, "localhost", 9999);
        this.client.updateMetadata(MetadataResponse.prepareResponse(Collections.singletonList(node), "clusterId", node.id(), Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, "__consumer_offsets", true, Collections.singletonList(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node, Optional.empty(), Collections.singletonList(node), Collections.singletonList(node), Collections.singletonList(node)))))));
        this.coordinator.maybeUpdateSubscriptionMetadata();
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(this.subscriptions.subscription().contains("__consumer_offsets")));
    }

    @Test
    public void testRejoinGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", this.coordinator, Collections.singletonList(this.t1p));
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertTrue(this.rebalanceListener.revoked.isEmpty());
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
        this.subscriptions.subscribe(new HashSet(Arrays.asList("test1", "otherTopic")), this.rebalanceListener);
        this.client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(2L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        Assert.assertEquals(2L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testDisconnectInJoin() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse((AbstractResponse) joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.rejoinNeededOrPending());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test(expected = ApiException.class)
    public void testInvalidSessionTimeout() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
        this.coordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetOnly() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), callback(atomicBoolean));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testCoordinatorDisconnectAfterNotCoordinatorError() {
        testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.NOT_COORDINATOR);
    }

    @Test
    public void testCoordinatorDisconnectAfterCoordinatorNotAvailableError() {
        testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.COORDINATOR_NOT_AVAILABLE);
    }

    private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors errors) {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        MockCommitCallback mockCommitCallback2 = new MockCommitCallback();
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback2);
        respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), errors);
        this.consumerClient.pollNoWakeup();
        this.consumerClient.pollNoWakeup();
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
        Assert.assertTrue(mockCommitCallback2.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testAutoCommitDynamicAssignment() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", buildCoordinator, Collections.singletonList(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.time.sleep(2000L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.client.hasPendingResponses());
    }

    @Test
    public void testAutoCommitRetryBackoff() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", buildCoordinator, Collections.singletonList(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.time.sleep(2000L);
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NOT_COORDINATOR);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertTrue(buildCoordinator.coordinatorUnknown());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.seek(this.t1p, 200L);
        this.time.sleep(50L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        this.time.sleep(50L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 200L), Errors.NONE);
    }

    @Test
    public void testAutoCommitAwaitsInterval() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", buildCoordinator, Collections.singletonList(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.time.sleep(2000L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.time.sleep(1000L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        this.subscriptions.seek(this.t1p, 200L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        this.time.sleep(1000L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, 200L), Errors.NONE);
    }

    @Test
    public void testAutoCommitDynamicAssignmentRebalance() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        buildCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(2000L);
        this.consumerClient.poll(this.time.timer(0L));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        buildCoordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.seek(this.t1p, 100L);
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.time.sleep(2000L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.client.hasPendingResponses());
    }

    @Test
    public void testAutoCommitManualAssignment() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        buildCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.time.sleep(2000L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.client.hasPendingResponses());
    }

    @Test
    public void testAutoCommitManualAssignmentCoordinatorUnknown() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.consumerClient.poll(this.time.timer(0L));
        this.time.sleep(2000L);
        this.consumerClient.poll(this.time.timer(0L));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        buildCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.time.sleep(100L);
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.client.hasPendingResponses());
    }

    @Test
    public void testCommitOffsetMetadata() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "hello"));
        this.coordinator.commitOffsetsAsync(singletonMap, callback(singletonMap, atomicBoolean));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testCommitOffsetAsyncWithDefaultCallback() {
        int i = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals(i + 1, this.mockOffsetCommitCallback.invoked);
        Assert.assertNull(this.mockOffsetCommitCallback.exception);
    }

    @Test
    public void testCommitAfterLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
        joinAsFollowerAndReceiveAssignment("consumer", this.coordinator, Collections.singletonList(this.t1p));
        this.client.prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        this.subscriptions.unsubscribe();
        this.coordinator.maybeLeaveGroup();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.21
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                OffsetCommitRequest offsetCommitRequest = (OffsetCommitRequest) abstractRequest;
                return offsetCommitRequest.data().memberId().equals("") && offsetCommitRequest.data().generationId() == -1;
            }
        }, (AbstractResponse) offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), callback(atomicBoolean));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
        int i = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals(i + 1, this.mockOffsetCommitCallback.invoked);
        Assert.assertTrue(this.mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertEquals(1L, mockCommitCallback.invoked);
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetAsyncNotCoordinator() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertEquals(1L, mockCommitCallback.invoked);
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetAsyncDisconnected() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        prepareOffsetCommitRequestDisconnect(Collections.singletonMap(this.t1p, 100L));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertEquals(1L, mockCommitCallback.invoked);
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetSyncNotCoordinator() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NOT_COORDINATOR);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetSyncCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetSyncCoordinatorDisconnected() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequestDisconnect(Collections.singletonMap(this.t1p, 100L));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        final List synchronizedList = Collections.synchronizedList(new ArrayList());
        final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(0L);
        final OffsetAndMetadata offsetAndMetadata2 = new OffsetAndMetadata(1L);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, offsetAndMetadata), new OffsetCommitCallback() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.22
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                synchronizedList.add(offsetAndMetadata);
            }
        });
        Thread thread = new Thread() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.23
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ConsumerCoordinatorTest.this.coordinator.commitOffsetsSync(Collections.singletonMap(ConsumerCoordinatorTest.this.t1p, offsetAndMetadata2), ConsumerCoordinatorTest.this.time.timer(10000L));
                synchronizedList.add(offsetAndMetadata2);
            }
        };
        thread.start();
        this.client.waitForRequests(2, 5000L);
        respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, Long.valueOf(offsetAndMetadata.offset())), Errors.NONE);
        respondToOffsetCommitRequest(Collections.singletonMap(this.t1p, Long.valueOf(offsetAndMetadata2.offset())), Errors.NONE);
        thread.join();
        Assert.assertEquals(Arrays.asList(offsetAndMetadata, offsetAndMetadata2), synchronizedList);
    }

    @Test
    public void testRetryCommitUnknownTopicOrPartition() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        Assert.assertTrue(this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(10000L)));
    }

    @Test(expected = OffsetMetadataTooLarge.class)
    public void testCommitOffsetMetadataTooLarge() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.OFFSET_METADATA_TOO_LARGE);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected = CommitFailedException.class)
    public void testCommitOffsetIllegalGeneration() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.ILLEGAL_GENERATION);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected = CommitFailedException.class)
    public void testCommitOffsetUnknownMemberId() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected = CommitFailedException.class)
    public void testCommitOffsetRebalanceInProgress() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected = KafkaException.class)
    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.UNKNOWN_SERVER_ERROR);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testCommitOffsetSyncWithoutFutureGetsCompleted() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Assert.assertFalse(this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(0L)));
    }

    @Test
    public void testRefreshOffset() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.missingFetchPositions());
        Assert.assertTrue(this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals(100L, this.subscriptions.position(this.t1p).offset);
    }

    @Test
    public void testRefreshOffsetWithValidation() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.updateMetadata(TestUtils.metadataUpdateWith("kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test1", 1), topicPartition -> {
            return 4;
        }));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L, 3));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.missingFetchPositions());
        Assert.assertFalse(this.subscriptions.hasAllFetchPositions());
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.t1p));
        Assert.assertEquals(this.subscriptions.position(this.t1p).offset, 100L);
        Assert.assertNull(this.subscriptions.validPosition(this.t1p));
    }

    @Test
    public void testFetchCommittedOffsets() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        Optional of = Optional.of(15);
        this.client.prepareResponse(new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, new OffsetFetchResponse.PartitionData(500L, of, "blahblah", Errors.NONE))));
        Map fetchCommittedOffsets = this.coordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE));
        Assert.assertNotNull(fetchCommittedOffsets);
        Assert.assertEquals(new OffsetAndMetadata(500L, of, "blahblah"), fetchCommittedOffsets.get(this.t1p));
    }

    @Test
    public void testTopicAuthorizationFailedInOffsetFetch() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, new OffsetFetchResponse.PartitionData(-1L, Optional.empty(), "", Errors.TOPIC_AUTHORIZATION_FAILED))));
        Assert.assertEquals(Collections.singleton("test1"), Assert.assertThrows(TopicAuthorizationException.class, () -> {
            this.coordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE));
        }).unauthorizedTopics());
    }

    @Test
    public void testRefreshOffsetLoadInProgress() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.missingFetchPositions());
        Assert.assertTrue(this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals(100L, this.subscriptions.position(this.t1p).offset);
    }

    @Test
    public void testRefreshOffsetsGroupNotAuthorized() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
        try {
            this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
            Assert.fail("Expected group authorization error");
        } catch (GroupAuthorizationException e) {
            Assert.assertEquals("test-group", e.groupId());
        }
    }

    @Test(expected = KafkaException.class)
    public void testRefreshOffsetUnknownTopicOrPartition() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    @Test
    public void testRefreshOffsetNotCoordinatorForConsumer() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.missingFetchPositions());
        Assert.assertTrue(this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals(100L, this.subscriptions.position(this.t1p).offset);
    }

    @Test
    public void testRefreshOffsetWithNoFetchableOffsets() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", -1L));
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.missingFetchPositions());
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.partitionsNeedingReset(this.time.milliseconds()));
        Assert.assertFalse(this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals((Object) null, this.subscriptions.position(this.t1p));
    }

    @Test
    public void testNoCoordinatorDiscoveryIfPositionsKnown() {
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 500L);
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.missingFetchPositions());
        Assert.assertTrue(this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals(500L, this.subscriptions.position(this.t1p).offset);
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testNoCoordinatorDiscoveryIfPartitionAwaitingReset() {
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.requestOffsetReset(this.t1p, OffsetResetStrategy.EARLIEST);
        this.coordinator.refreshCommittedOffsetsIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assert.assertEquals(Collections.emptySet(), this.subscriptions.missingFetchPositions());
        Assert.assertFalse(this.subscriptions.hasAllFetchPositions());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.partitionsNeedingReset(this.time.milliseconds()));
        Assert.assertEquals(OffsetResetStrategy.EARLIEST, this.subscriptions.resetStrategy(this.t1p));
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testAuthenticationFailureInEnsureActiveGroup() {
        this.client.createPendingAuthenticationError(this.node, 300L);
        try {
            this.coordinator.ensureActiveGroup();
            Assert.fail("Expected an authentication error.");
        } catch (AuthenticationException e) {
        }
    }

    @Test
    public void testThreadSafeAssignedPartitionsMetric() throws Exception {
        final KafkaMetric metric = this.metrics.metric(new MetricName("assigned-partitions", "consumertest-group-coordinator-metrics", "", Collections.emptyMap()));
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicReference atomicReference = new AtomicReference();
        final AtomicInteger atomicInteger = new AtomicInteger();
        Thread thread = new Thread() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.24
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!atomicBoolean.get()) {
                    try {
                        atomicInteger.set(((Double) metric.metricValue()).intValue());
                    } catch (Exception e) {
                        atomicReference.set(e);
                        return;
                    }
                }
            }
        };
        thread.start();
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        HashSet hashSet = new HashSet();
        final int i = 10;
        for (int i2 = 0; i2 < 10; i2++) {
            hashSet.add(new TopicPartition("test1", i2));
            this.subscriptions.assignFromUser(hashSet);
        }
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.25
            @Override // org.apache.kafka.test.TestCondition
            public boolean conditionMet() {
                return atomicInteger.get() == i || atomicReference.get() != null;
            }
        }, "Failed to observe expected assignment change");
        atomicBoolean.set(true);
        thread.join();
        Assert.assertNull("Failed fetching the metric at least once", atomicReference.get());
    }

    @Test
    public void testCloseDynamicAssignment() throws Exception {
        gracefulCloseTest(prepareCoordinatorForCloseTest(true, true, Optional.empty()), true);
    }

    @Test
    public void testCloseManualAssignment() throws Exception {
        gracefulCloseTest(prepareCoordinatorForCloseTest(false, true, Optional.empty()), false);
    }

    @Test
    public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(false, true, Optional.empty());
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.NOT_COORDINATOR);
        this.time.sleep(2000L);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 1000L, 1000L);
    }

    @Test
    public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, false, Optional.empty());
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.NOT_COORDINATOR);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 0L, 0L);
    }

    @Test
    public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.NOT_COORDINATOR);
        this.time.sleep(2000L);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 1000L, 1000L);
    }

    @Test
    public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, false, Optional.empty());
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.COORDINATOR_NOT_AVAILABLE);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 0L, 0L);
    }

    @Test
    public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.COORDINATOR_NOT_AVAILABLE);
        this.time.sleep(2000L);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 1000L, 1000L);
    }

    @Test
    public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.COORDINATOR_NOT_AVAILABLE);
        this.time.sleep(2000L);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, Long.MAX_VALUE, 30000L, 30000L);
    }

    @Test
    public void testCloseNoResponseForCommit() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);
        this.time.sleep(2000L);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, Long.MAX_VALUE, 30000L, 30000L);
    }

    @Test
    public void testCloseNoResponseForLeaveGroup() throws Exception {
        closeVerifyTimeout(prepareCoordinatorForCloseTest(true, false, Optional.empty()), Long.MAX_VALUE, 30000L, 30000L);
    }

    @Test
    public void testCloseNoWait() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);
        this.time.sleep(2000L);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 0L, 0L, 0L);
    }

    @Test
    public void testHeartbeatThreadClose() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId);
        prepareCoordinatorForCloseTest.ensureActiveGroup();
        this.time.sleep(5100L);
        Thread.yield();
        closeVerifyTimeout(prepareCoordinatorForCloseTest, Long.MAX_VALUE, 30000L, 30000L);
        Thread[] threadArr = new Thread[Thread.activeCount()];
        int enumerate = Thread.enumerate(threadArr);
        for (int i = 0; i < enumerate; i++) {
            Assert.assertFalse("Heartbeat thread active after close", threadArr[i].getName().contains("test-group"));
        }
    }

    @Test
    public void testAutoCommitAfterCoordinatorBackToService() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, this.groupInstanceId);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        buildCoordinator.markCoordinatorUnknown();
        Assert.assertTrue(buildCoordinator.coordinatorUnknown());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);
        this.time.sleep(2000L);
        buildCoordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());
        Assert.assertFalse(buildCoordinator.coordinatorUnknown());
        Assert.assertEquals(100L, this.subscriptions.position(this.t1p).offset);
    }

    @Test(expected = FencedInstanceIdException.class)
    public void testCommitOffsetRequestSyncWithFencedInstanceIdException() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.FENCED_INSTANCE_ID);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
    }

    @Test(expected = FencedInstanceIdException.class)
    public void testCommitOffsetRequestAsyncWithFencedInstanceIdException() {
        receiveFencedInstanceIdException();
    }

    @Test
    public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() {
        Assert.assertThrows(FencedInstanceIdException.class, this::receiveFencedInstanceIdException);
        Assert.assertThrows(FencedInstanceIdException.class, () -> {
            this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
        });
        Assert.assertThrows(FencedInstanceIdException.class, () -> {
            this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
        });
    }

    private void receiveFencedInstanceIdException() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.FENCED_INSTANCE_ID);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), new MockCommitCallback());
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
    }

    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean z, boolean z2, Optional<String> optional) {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, z2, optional);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        buildCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        if (z) {
            this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
            this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
            buildCoordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        } else {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        }
        this.subscriptions.seek(this.t1p, 100L);
        buildCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        return buildCoordinator;
    }

    private void makeCoordinatorUnknown(ConsumerCoordinator consumerCoordinator, Errors errors) {
        this.time.sleep(10000L);
        consumerCoordinator.sendHeartbeatRequest();
        this.client.prepareResponse(heartbeatResponse(errors));
        this.time.sleep(10000L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(consumerCoordinator.coordinatorUnknown());
    }

    private void closeVerifyTimeout(final ConsumerCoordinator consumerCoordinator, final long j, long j2, long j3) throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            boolean coordinatorUnknown = consumerCoordinator.coordinatorUnknown();
            Future<?> submit = newSingleThreadExecutor.submit(new Runnable() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.26
                @Override // java.lang.Runnable
                public void run() {
                    consumerCoordinator.close(ConsumerCoordinatorTest.this.time.timer(Math.min(j, 30000L)));
                }
            });
            if (coordinatorUnknown) {
                Thread.sleep(200L);
            } else {
                this.client.waitForRequests(1, 1000L);
            }
            if (j2 > 0) {
                this.time.sleep(j2 - 1);
                try {
                    submit.get(500L, TimeUnit.MILLISECONDS);
                    Assert.fail("Close completed ungracefully without waiting for timeout");
                } catch (TimeoutException e) {
                }
            }
            if (j3 >= 0) {
                this.time.sleep((j3 - j2) + 2);
            }
            submit.get(2000L, TimeUnit.MILLISECONDS);
            newSingleThreadExecutor.shutdownNow();
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }

    private void gracefulCloseTest(ConsumerCoordinator consumerCoordinator, boolean z) throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.27
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                return ((OffsetCommitRequest) abstractRequest).data().groupId().equals("test-group");
            }
        }, (AbstractResponse) new OffsetCommitResponse(new OffsetCommitResponseData()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.28
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean2.set(true);
                return ((LeaveGroupRequest) abstractRequest).data().groupId().equals("test-group");
            }
        }, (AbstractResponse) new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
        consumerCoordinator.close();
        Assert.assertTrue("Commit not requested", atomicBoolean.get());
        Assert.assertEquals("leaveGroupRequested should be " + z, Boolean.valueOf(z), Boolean.valueOf(atomicBoolean2.get()));
    }

    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> list, boolean z, Optional<String> optional) {
        return new ConsumerCoordinator(new LogContext(), this.consumerClient, "test-group", optional, 60000, 10000, this.heartbeat, list, this.metadata, this.subscriptions, metrics, "consumertest-group", this.time, 100L, z, 2000, (ConsumerInterceptors) null, !optional.isPresent());
    }

    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors errors) {
        return FindCoordinatorResponse.prepareResponse(errors, node);
    }

    private HeartbeatResponse heartbeatResponse(Errors errors) {
        return new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(errors.code()));
    }

    private JoinGroupResponse joinGroupLeaderResponse(int i, String str, Map<String, List<String>> map, Errors errors) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            arrayList.add(new JoinGroupResponseData.JoinGroupResponseMember().setMemberId(entry.getKey()).setMetadata(ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(entry.getValue())).array()));
        }
        return new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(errors.code()).setGenerationId(i).setProtocolName(this.partitionAssignor.name()).setLeader(str).setMemberId(str).setMembers(Collections.emptyList()));
    }

    private JoinGroupResponse joinGroupFollowerResponse(int i, String str, String str2, Errors errors) {
        return new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(errors.code()).setGenerationId(i).setProtocolName(this.partitionAssignor.name()).setLeader(str2).setMemberId(str).setMembers(Collections.emptyList()));
    }

    private SyncGroupResponse syncGroupResponse(List<TopicPartition> list, Errors errors) {
        return new SyncGroupResponse(new SyncGroupResponseData().setErrorCode(errors.code()).setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(list)))));
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> map) {
        return new OffsetCommitResponse(map);
    }

    private OffsetFetchResponse offsetFetchResponse(Errors errors) {
        return new OffsetFetchResponse(errors, Collections.emptyMap());
    }

    private OffsetFetchResponse offsetFetchResponse(TopicPartition topicPartition, Errors errors, String str, long j) {
        return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(topicPartition, new OffsetFetchResponse.PartitionData(j, Optional.empty(), str, errors)));
    }

    private OffsetFetchResponse offsetFetchResponse(TopicPartition topicPartition, Errors errors, String str, long j, int i) {
        return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(topicPartition, new OffsetFetchResponse.PartitionData(j, Optional.of(Integer.valueOf(i)), str, errors)));
    }

    private OffsetCommitCallback callback(final AtomicBoolean atomicBoolean) {
        return new OffsetCommitCallback() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.29
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                if (exc == null) {
                    atomicBoolean.set(true);
                }
            }
        };
    }

    private void joinAsFollowerAndReceiveAssignment(String str, ConsumerCoordinator consumerCoordinator, List<TopicPartition> list) {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        consumerCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));
        this.client.prepareResponse(joinGroupFollowerResponse(1, str, "leader", Errors.NONE));
        this.client.prepareResponse(syncGroupResponse(list, Errors.NONE));
        consumerCoordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
    }

    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors) {
        prepareOffsetCommitRequest(map, errors, false);
    }

    private void prepareOffsetCommitRequestDisconnect(Map<TopicPartition, Long> map) {
        prepareOffsetCommitRequest(map, Errors.NONE, true);
    }

    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors, boolean z) {
        this.client.prepareResponse(offsetCommitRequestMatcher(map), offsetCommitResponse(partitionErrors(map.keySet(), errors)), z);
    }

    private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> collection, Errors errors) {
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), errors);
        }
        return hashMap;
    }

    private void respondToOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors) {
        this.client.respond(offsetCommitRequestMatcher(map), (AbstractResponse) offsetCommitResponse(partitionErrors(map.keySet(), errors)));
    }

    private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPartition, Long> map) {
        return new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.30
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                Map offsets = ((OffsetCommitRequest) abstractRequest).offsets();
                if (offsets.size() != map.size()) {
                    return false;
                }
                for (Map.Entry entry : map.entrySet()) {
                    if (!offsets.containsKey(entry.getKey()) || !((Long) offsets.get(entry.getKey())).equals(entry.getValue())) {
                        return false;
                    }
                }
                return true;
            }
        };
    }

    private OffsetCommitCallback callback(final Map<TopicPartition, OffsetAndMetadata> map, final AtomicBoolean atomicBoolean) {
        return new OffsetCommitCallback() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.31
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map2, Exception exc) {
                if (map.equals(map2) && exc == null) {
                    atomicBoolean.set(true);
                }
            }
        };
    }
}
