/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class FetcherTest {
    // NOTE(review): presumably the tolerance for floating-point comparisons; no use is visible in this part of the file.
    private static final double EPSILON = 1.0E-4;
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    // Identity of the single topic most fixtures below are built around.
    private String topicName = "test";
    private String groupId = "test-group";
    private Uuid topicId = Uuid.randomUuid();
    // Name -> id lookup for the test topic, populated through an instance initializer.
    private Map<String, Uuid> topicIds = new HashMap<String, Uuid>(){
        {
            this.put(FetcherTest.this.topicName, FetcherTest.this.topicId);
        }
    };
    // Reverse (id -> name) lookup; matchesOffset passes it to FetchRequest.fetchData.
    private Map<Uuid, String> topicNames = Collections.singletonMap(this.topicId, this.topicName);
    // Note: "consumer" and the group id are concatenated without any separator.
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    // Partitions 0-3 of the test topic ...
    private TopicPartition tp0 = new TopicPartition(this.topicName, 0);
    private TopicPartition tp1 = new TopicPartition(this.topicName, 1);
    private TopicPartition tp2 = new TopicPartition(this.topicName, 2);
    private TopicPartition tp3 = new TopicPartition(this.topicName, 3);
    // ... and the same partitions paired with the topic id.
    private TopicIdPartition tidp0 = new TopicIdPartition(this.topicId, this.tp0);
    private TopicIdPartition tidp1 = new TopicIdPartition(this.topicId, this.tp1);
    private TopicIdPartition tidp2 = new TopicIdPartition(this.topicId, this.tp2);
    private TopicIdPartition tidp3 = new TopicIdPartition(this.topicId, this.tp3);
    // Leader epoch supplied for every partition by the metadata updates the tests install.
    private int validLeaderEpoch = 0;
    // NOTE(review): looks like a metadata response describing one node and four partitions of the
    // test topic - confirm against RequestTestUtils.metadataUpdateWithIds.
    private MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 4), this.topicIds);
    // NOTE(review): presumably the fetch settings handed to the Fetcher by buildFetcher() (not visible
    // here). fetchSize is also the value the request matchers expect in FetchRequest.PartitionData.
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100L;
    private long requestTimeoutMs = 30000L;
    // NOTE(review): the constructor argument is presumably MockTime's auto-tick in milliseconds - confirm.
    private MockTime time = new MockTime(1L);
    // Collaborators; none of the uninitialised ones is assigned in this part of the file.
    // NOTE(review): presumably created by buildFetcher().
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private FetcherMetricsRegistry metricsRegistry;
    private MockClient client;
    private Metrics metrics;
    private ApiVersions apiVersions = new ApiVersions();
    private ConsumerNetworkClient consumerClient;
    private Fetcher<?, ?> fetcher;
    // Record fixtures, rebuilt before every test by setup().
    private MemoryRecords records;
    private MemoryRecords nextRecords;
    private MemoryRecords emptyRecords;
    private MemoryRecords partialRecords;
    // Shut down in teardown() when non-null; where it is created is not visible here.
    private ExecutorService executorService;

    /**
     * Rebuilds the shared record fixtures before each test.
     */
    @BeforeEach
    public void setup() {
        records = buildRecords(1L, 3, 1L);
        nextRecords = buildRecords(4L, 2, 4L);
        emptyRecords = buildRecords(0L, 0, 0L);

        MemoryRecords truncated = buildRecords(4L, 1, 0L);
        partialRecords = truncated;
        // Overwrite the int stored at byte 8 of the backing buffer with 10000.
        // NOTE(review): presumably this is the batch length field, so the batch claims more bytes
        // than the buffer actually holds (hence "partial") - confirm against the record layout.
        truncated.buffer().putInt(8, 10000);
    }

    /**
     * Manually assigns {@code partitions} and installs metadata for the test topic, first on the
     * mock client and then on the consumer metadata with {@code validLeaderEpoch} for every partition.
     *
     * @param partitions the partitions to assign
     */
    private void assignFromUser(Set<TopicPartition> partitions) {
        subscriptions.assignFromUser(partitions);
        client.updateMetadata(initialUpdateResponse);

        MetadataResponse epochAwareUpdate = RequestTestUtils.metadataUpdateWithIds(
                "dummy", 1, Collections.emptyMap(), Collections.singletonMap(topicName, 4),
                tp -> validLeaderEpoch, topicIds);
        metadata.updateWithCurrentRequestVersion(epochAwareUpdate, false, 0L);
    }

    /**
     * Variant of {@code assignFromUser} for the topic named "noId": the mock client receives
     * metadata built with an empty topic-id map, and the consumer metadata is updated with
     * request version 9.
     *
     * @param partitions the partitions to assign
     */
    private void assignFromUserNoId(Set<TopicPartition> partitions) {
        subscriptions.assignFromUser(partitions);

        MetadataResponse clientUpdate = RequestTestUtils.metadataUpdateWithIds(
                1, Collections.singletonMap("noId", 1), Collections.emptyMap());
        client.updateMetadata(clientUpdate);

        MetadataResponse consumerUpdate = RequestTestUtils.metadataUpdateWithIds(
                "dummy", 1, Collections.emptyMap(), Collections.singletonMap("noId", 1),
                tp -> validLeaderEpoch, topicIds);
        metadata.update(9, consumerUpdate, false, 0L);
    }

    /**
     * Releases per-test resources: metrics, the fetcher, and the executor service (if any).
     *
     * <p>The steps are chained with {@code try/finally} so that a failure while closing one
     * resource does not prevent the remaining ones from being released; otherwise a failed close
     * would leave executor threads running into subsequent tests.
     *
     * @throws Exception if closing a resource fails or the wait for executor termination is interrupted
     */
    @AfterEach
    public void teardown() throws Exception {
        try {
            if (this.metrics != null) {
                this.metrics.close();
            }
        } finally {
            try {
                if (this.fetcher != null) {
                    this.fetcher.close();
                }
            } finally {
                if (this.executorService != null) {
                    this.executorService.shutdownNow();
                    Assertions.assertTrue(this.executorService.awaitTermination(5L, TimeUnit.SECONDS),
                            "executor service did not terminate within 5 seconds");
                }
            }
        }
    }

    /**
     * Happy path: a single fetch for {@code tp0} returns the three fixture records and advances
     * the consumer position past them.
     */
    @Test
    public void testFetchNormal() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        // One request goes out; nothing is buffered until the response has been polled.
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        this.client.prepareResponse((AbstractResponse) this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Parameterized types are required here: with a raw Map, get() yields Object, which can
        // neither be assigned to a List nor iterated as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(this.tp0);
        Assertions.assertEquals(3, fetched.size());
        // The records carry offsets 1..3 (asserted below), so the next position is 4.
        Assertions.assertEquals(4L, this.subscriptions.position(this.tp0).offset);

        long expectedOffset = 1L;
        for (ConsumerRecord<byte[], byte[]> fetchedRecord : fetched) {
            Assertions.assertEquals(expectedOffset, fetchedRecord.offset());
            ++expectedOffset;
        }
    }

    /**
     * A fetch response that arrives after its partition was marked as pending revocation must
     * not surface any records for that partition.
     */
    @Test
    public void testInflightFetchOnPendingPartitions() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        // The request is in flight when the partition becomes pending revocation.
        Assertions.assertEquals(1, fetcher.sendFetches());
        subscriptions.markPendingRevocation(Collections.singleton(tp0));

        client.prepareResponse((AbstractResponse) fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));

        Assertions.assertNull(fetchedRecords().get(tp0));
    }

    /**
     * Once a partition is marked as pending revocation no further fetches are sent for it and
     * its position stays where the last successful fetch left it.
     */
    @Test
    public void testFetchingPendingPartitions() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        // Normal round trip first: the position moves to 4.
        Assertions.assertEquals(1, fetcher.sendFetches());
        client.prepareResponse((AbstractResponse) fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();
        Assertions.assertEquals(4L, subscriptions.position(tp0).offset);

        // After marking the partition, no request is sent and nothing completes.
        subscriptions.markPendingRevocation(Collections.singleton(tp0));
        Assertions.assertEquals(0, fetcher.sendFetches());
        consumerClient.poll(time.timer(0L));
        Assertions.assertFalse(fetcher.hasCompletedFetches());
        fetchedRecords();
        Assertions.assertEquals(4L, subscriptions.position(tp0).offset);
    }

    /**
     * Fetching a topic whose id is {@code Uuid.ZERO_UUID}: the request is expected to use fetch
     * version 12 and the records are returned as usual.
     */
    @Test
    public void testFetchWithNoTopicId() {
        this.buildFetcher();
        TopicIdPartition noId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("noId", 0));
        this.assignFromUserNoId(Collections.singleton(noId.topicPartition()));
        this.subscriptions.seek(noId.topicPartition(), 0L);

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        // The matcher pins the request version to 12 for the id-less topic.
        this.client.prepareResponse(
                this.fetchRequestMatcher((short) 12, noId, 0L, Optional.of(this.validLeaderEpoch)),
                (AbstractResponse) this.fullFetchResponse(noId, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Parameterized types are required here: with a raw Map, get() yields Object, which can
        // neither be assigned to a List nor iterated as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(noId.topicPartition()));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(noId.topicPartition());
        Assertions.assertEquals(3, fetched.size());
        Assertions.assertEquals(4L, this.subscriptions.position(noId.topicPartition()).offset);

        long expectedOffset = 1L;
        for (ConsumerRecord<byte[], byte[]> fetchedRecord : fetched) {
            Assertions.assertEquals(expectedOffset, fetchedRecord.offset());
            ++expectedOffset;
        }
    }

    /**
     * Fetching a topic with a known id: the request is expected to use the latest fetch version
     * and the records are returned as usual.
     */
    @Test
    public void testFetchWithTopicId() {
        this.buildFetcher();
        TopicIdPartition tp = new TopicIdPartition(this.topicId, new TopicPartition(this.topicName, 0));
        this.assignFromUser(Collections.singleton(tp.topicPartition()));
        this.subscriptions.seek(tp.topicPartition(), 0L);

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        this.client.prepareResponse(
                this.fetchRequestMatcher(ApiKeys.FETCH.latestVersion(), tp, 0L, Optional.of(this.validLeaderEpoch)),
                (AbstractResponse) this.fullFetchResponse(tp, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Parameterized types are required here: with a raw Map, get() yields Object, which can
        // neither be assigned to a List nor iterated as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(tp.topicPartition()));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(tp.topicPartition());
        Assertions.assertEquals(3, fetched.size());
        Assertions.assertEquals(4L, this.subscriptions.position(tp.topicPartition()).offset);

        long expectedOffset = 1L;
        for (ConsumerRecord<byte[], byte[]> fetchedRecord : fetched) {
            Assertions.assertEquals(expectedOffset, fetchedRecord.offset());
            ++expectedOffset;
        }
    }

    /**
     * After the assignment switches from "foo" to "bar", the next fetch request is expected to
     * fetch "bar" and list the previously fetched "foo" partition as forgotten.
     */
    @Test
    public void testFetchForgetTopicIdWhenUnassigned() {
        buildFetcher();
        TopicIdPartition foo = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition bar = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));

        // Round 1: fetch "foo"; nothing was fetched before, so nothing is forgotten.
        subscriptions.assignFromUser(Collections.singleton(foo.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(foo), tp -> validLeaderEpoch));
        subscriptions.seek(foo.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        short fooVersion = ApiKeys.FETCH.latestVersion();
        FetchRequest.PartitionData fooData =
                new FetchRequest.PartitionData(foo.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher fooMatcher =
                fetchRequestMatcher(fooVersion, Collections.singletonMap(foo, fooData), Collections.emptyList());
        client.prepareResponse(fooMatcher, (AbstractResponse) fullFetchResponse(1, foo, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();

        // Round 2: switch to "bar"; "foo" must now show up in the forgotten list.
        subscriptions.assignFromUser(Collections.singleton(bar.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(bar), tp -> validLeaderEpoch));
        subscriptions.seek(bar.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());
        Assertions.assertFalse(fetcher.hasCompletedFetches());

        short barVersion = ApiKeys.FETCH.latestVersion();
        FetchRequest.PartitionData barData =
                new FetchRequest.PartitionData(bar.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher barMatcher =
                fetchRequestMatcher(barVersion, Collections.singletonMap(bar, barData), Collections.singletonList(foo));
        client.prepareResponse(barMatcher, (AbstractResponse) fullFetchResponse(1, bar, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();
    }

    /**
     * When topic "foo" reappears with a different topic id, the next fetch request is expected
     * to fetch the new id and list the partition with the old id as forgotten.
     */
    @Test
    public void testFetchForgetTopicIdWhenReplaced() {
        buildFetcher();
        TopicIdPartition fooWithOldTopicId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition fooWithNewTopicId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        // Round 1: fetch "foo" under its old id; nothing is forgotten yet.
        subscriptions.assignFromUser(Collections.singleton(fooWithOldTopicId.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(fooWithOldTopicId), tp -> validLeaderEpoch));
        subscriptions.seek(fooWithOldTopicId.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        short oldVersion = ApiKeys.FETCH.latestVersion();
        FetchRequest.PartitionData oldData =
                new FetchRequest.PartitionData(fooWithOldTopicId.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher oldMatcher =
                fetchRequestMatcher(oldVersion, Collections.singletonMap(fooWithOldTopicId, oldData), Collections.emptyList());
        client.prepareResponse(oldMatcher, (AbstractResponse) fullFetchResponse(1, fooWithOldTopicId, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();

        // Round 2: metadata now reports a new id for "foo"; the old id must be forgotten.
        subscriptions.assignFromUser(Collections.singleton(fooWithNewTopicId.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(fooWithNewTopicId), tp -> validLeaderEpoch));
        subscriptions.seek(fooWithNewTopicId.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());
        Assertions.assertFalse(fetcher.hasCompletedFetches());

        short newVersion = ApiKeys.FETCH.latestVersion();
        FetchRequest.PartitionData newData =
                new FetchRequest.PartitionData(fooWithNewTopicId.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher newMatcher =
                fetchRequestMatcher(newVersion, Collections.singletonMap(fooWithNewTopicId, newData), Collections.singletonList(fooWithOldTopicId));
        client.prepareResponse(newMatcher, (AbstractResponse) fullFetchResponse(1, fooWithNewTopicId, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();
    }

    /**
     * Walks topic "foo" through three metadata states - no topic id, with topic id, no topic id
     * again - and checks the fetch version expected in each: 12 while the id is
     * {@code Uuid.ZERO_UUID}, the latest version while an id is known. The forgotten list is
     * expected to be empty in all three rounds.
     */
    @Test
    public void testFetchTopicIdUpgradeDowngrade() {
        buildFetcher();
        TopicIdPartition fooWithoutId = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition("foo", 0));
        TopicIdPartition fooWithId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        // Round 1: no topic id -> version 12.
        subscriptions.assignFromUser(Collections.singleton(fooWithoutId.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(fooWithoutId), tp -> validLeaderEpoch));
        subscriptions.seek(fooWithoutId.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        FetchRequest.PartitionData firstData =
                new FetchRequest.PartitionData(fooWithoutId.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher firstMatcher =
                fetchRequestMatcher((short) 12, Collections.singletonMap(fooWithoutId, firstData), Collections.emptyList());
        client.prepareResponse(firstMatcher, (AbstractResponse) fullFetchResponse(1, fooWithoutId, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();

        // Round 2: topic id becomes known -> latest version.
        subscriptions.assignFromUser(Collections.singleton(fooWithId.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(fooWithId), tp -> validLeaderEpoch));
        subscriptions.seek(fooWithId.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());
        Assertions.assertFalse(fetcher.hasCompletedFetches());

        short latestVersion = ApiKeys.FETCH.latestVersion();
        FetchRequest.PartitionData secondData =
                new FetchRequest.PartitionData(fooWithId.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher secondMatcher =
                fetchRequestMatcher(latestVersion, Collections.singletonMap(fooWithId, secondData), Collections.emptyList());
        client.prepareResponse(secondMatcher, (AbstractResponse) fullFetchResponse(1, fooWithId, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();

        // Round 3: topic id disappears again -> back to version 12.
        subscriptions.assignFromUser(Collections.singleton(fooWithoutId.topicPartition()));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singleton(fooWithoutId), tp -> validLeaderEpoch));
        subscriptions.seek(fooWithoutId.topicPartition(), 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());
        Assertions.assertFalse(fetcher.hasCompletedFetches());

        FetchRequest.PartitionData thirdData =
                new FetchRequest.PartitionData(fooWithoutId.topicId(), 0L, -1L, fetchSize, Optional.of(validLeaderEpoch));
        MockClient.RequestMatcher thirdMatcher =
                fetchRequestMatcher((short) 12, Collections.singletonMap(fooWithoutId, thirdData), Collections.emptyList());
        client.prepareResponse(thirdMatcher, (AbstractResponse) fullFetchResponse(1, fooWithoutId, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());
        fetchedRecords();
    }

    /**
     * Convenience overload for a request that fetches exactly one partition and forgets none.
     *
     * @param expectedVersion            fetch request version the matcher asserts
     * @param tp                         the only partition expected in the request
     * @param expectedFetchOffset        fetch offset expected for {@code tp}
     * @param expectedCurrentLeaderEpoch leader epoch expected for {@code tp}
     * @return a matcher delegating to the map-based overload
     */
    private MockClient.RequestMatcher fetchRequestMatcher(short expectedVersion, TopicIdPartition tp, long expectedFetchOffset, Optional<Integer> expectedCurrentLeaderEpoch) {
        FetchRequest.PartitionData expectedData =
                new FetchRequest.PartitionData(tp.topicId(), expectedFetchOffset, -1L, fetchSize, expectedCurrentLeaderEpoch);
        Map<TopicIdPartition, FetchRequest.PartitionData> expectedFetch = Collections.singletonMap(tp, expectedData);
        return fetchRequestMatcher(expectedVersion, expectedFetch, Collections.emptyList());
    }

    /**
     * Builds a matcher that asserts the request is a {@link FetchRequest} with the given version,
     * fetch data and forgotten partitions. Any other request type fails the test.
     *
     * @param expectedVersion fetch request version to assert
     * @param fetch           expected fetch data, keyed by topic-id partition
     * @param forgotten       expected forgotten partitions
     * @return a matcher that returns {@code true} once all assertions have passed
     */
    private MockClient.RequestMatcher fetchRequestMatcher(short expectedVersion, Map<TopicIdPartition, FetchRequest.PartitionData> fetch, List<TopicIdPartition> forgotten) {
        return body -> {
            if (!(body instanceof FetchRequest)) {
                Assertions.fail("Should have seen FetchRequest");
                return false;
            }
            FetchRequest fetchRequest = (FetchRequest) body;
            Assertions.assertEquals(expectedVersion, fetchRequest.version());

            // Names are resolved from the expected partitions themselves, not from the class-level map.
            Map<Uuid, String> fetchedTopicNames = topicNames(new ArrayList<TopicIdPartition>(fetch.keySet()));
            Assertions.assertEquals((Object) fetch, (Object) fetchRequest.fetchData(fetchedTopicNames));

            Map<Uuid, String> forgottenTopicNames = topicNames(forgotten);
            Assertions.assertEquals((Object) forgotten, (Object) fetchRequest.forgottenTopics(forgottenTopicNames));
            return true;
        };
    }

    /**
     * Collects a topic id to topic name mapping from the given partitions. If an id occurs more
     * than once, the name of its first occurrence is kept.
     *
     * @param partitions partitions to read ids and names from
     * @return a new mutable map from topic id to topic name
     */
    private Map<Uuid, String> topicNames(List<TopicIdPartition> partitions) {
        HashMap<Uuid, String> namesById = new HashMap<>();
        for (TopicIdPartition partition : partitions) {
            namesById.putIfAbsent(partition.topicId(), partition.topic());
        }
        return namesById;
    }

    /**
     * Records built with magic value 0 and leader epoch -1 must be returned with an empty
     * {@code leaderEpoch()}.
     */
    @Test
    public void testMissingLeaderEpochInRecords() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        // Explicit UTF-8 so the payload does not depend on the platform default charset.
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // NOTE(review): magic 0 is presumably the oldest message format and -1 the
        // "no partition leader epoch" marker - confirm against RecordBatch.
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, (byte) 0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1);
        builder.append(0L, key, "1".getBytes(StandardCharsets.UTF_8));
        builder.append(0L, key, "2".getBytes(StandardCharsets.UTF_8));
        MemoryRecords epochlessRecords = builder.build();

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        this.client.prepareResponse((AbstractResponse) this.fullFetchResponse(this.tidp0, epochlessRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Parameterized types are required here: with a raw Map, get() yields Object, which has
        // no size() and cannot be iterated as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(this.tp0);
        Assertions.assertEquals(2, fetched.size());
        for (ConsumerRecord<byte[], byte[]> fetchedRecord : fetched) {
            Assertions.assertEquals(Optional.empty(), fetchedRecord.leaderEpoch());
        }
    }

    /**
     * Writes three batches with different partition leader epochs (1, 8 and 13) into one buffer.
     * Each record's value is its batch's epoch as text, and the test checks that every returned
     * record reports that epoch from {@code leaderEpoch()}.
     */
    @Test
    public void testLeaderEpochInConsumerRecord() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        // Values are decoded with Utils.utf8 below, so they are encoded as UTF-8 explicitly
        // rather than with the platform default charset.
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Batch 1: base offset 0, two records, leader epoch 1.
        int partitionLeaderEpoch = 1;
        byte[] epochValue = Integer.toString(partitionLeaderEpoch).getBytes(StandardCharsets.UTF_8);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), partitionLeaderEpoch);
        builder.append(0L, key, epochValue);
        builder.append(0L, key, epochValue);
        builder.close();

        // Batch 2: base offset 2, one record, leader epoch 8.
        partitionLeaderEpoch += 7;
        epochValue = Integer.toString(partitionLeaderEpoch).getBytes(StandardCharsets.UTF_8);
        builder = MemoryRecords.builder(buffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 2L, System.currentTimeMillis(), partitionLeaderEpoch);
        builder.append(0L, key, epochValue);
        builder.close();

        // Batch 3: base offset 3, three records, leader epoch 13.
        partitionLeaderEpoch += 5;
        epochValue = Integer.toString(partitionLeaderEpoch).getBytes(StandardCharsets.UTF_8);
        builder = MemoryRecords.builder(buffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 3L, System.currentTimeMillis(), partitionLeaderEpoch);
        builder.append(0L, key, epochValue);
        builder.append(0L, key, epochValue);
        builder.append(0L, key, epochValue);
        builder.close();

        buffer.flip();
        MemoryRecords multiEpochRecords = MemoryRecords.readableRecords(buffer);

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        this.client.prepareResponse((AbstractResponse) this.fullFetchResponse(this.tidp0, multiEpochRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Parameterized types are required here: with a raw Map, get() yields Object, which has
        // no size() and cannot be iterated as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(this.tp0);
        Assertions.assertEquals(6, fetched.size());
        for (ConsumerRecord<byte[], byte[]> fetchedRecord : fetched) {
            int expectedLeaderEpoch = Integer.parseInt(Utils.utf8(fetchedRecord.value()));
            Assertions.assertEquals(Optional.of(expectedLeaderEpoch), fetchedRecord.leaderEpoch());
        }
    }

    /**
     * Buffered fetch data for {@code tp0} must be dropped when the fetcher is told that only
     * {@code tp1} remains assigned.
     */
    @Test
    public void testClearBufferedDataForTopicPartitions() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assertions.assertEquals(1, fetcher.sendFetches());
        Assertions.assertFalse(fetcher.hasCompletedFetches());

        client.prepareResponse((AbstractResponse) fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertTrue(fetcher.hasCompletedFetches());

        // The new assignment no longer contains tp0, so its completed fetch should be discarded.
        HashSet<TopicPartition> remainingAssignment = new HashSet<>();
        remainingAssignment.add(tp1);
        fetcher.clearBufferedDataForUnassignedPartitions(remainingAssignment);

        Assertions.assertFalse(fetcher.hasCompletedFetches());
    }

    /**
     * No fetch is sent to a node while the mock client has it backed off; once the back-off
     * period has elapsed, the fetch goes out.
     */
    @Test
    public void testFetchSkipsBlackedOutNodes() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        long backoffMs = 500L;
        Node firstBroker = (Node) initialUpdateResponse.brokers().iterator().next();
        client.backoff(firstBroker, backoffMs);
        Assertions.assertEquals(0, fetcher.sendFetches());

        time.sleep(backoffMs);
        Assertions.assertEquals(1, fetcher.sendFetches());
    }

    /**
     * A response containing one data record followed by an ABORT end-transaction marker must
     * yield only the data record, while the position still advances past the marker (to 2).
     */
    @Test
    public void testFetcherIgnoresControlRecords() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        long producerId = 1L;
        short producerEpoch = 0;
        int baseSequence = 0;
        int partitionLeaderEpoch = 0;
        // Explicit UTF-8 so the expected key bytes do not depend on the platform default charset.
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L, producerId, producerEpoch, baseSequence);
        builder.append(0L, key, null);
        builder.close();
        // The marker is written at offset 1, directly after the single data record at offset 0.
        MemoryRecords.writeEndTransactionalMarker(buffer, 1L, this.time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0));
        buffer.flip();

        this.client.prepareResponse((AbstractResponse) this.fullFetchResponse(this.tidp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Parameterized types are required here: with raw Map/List, get() yields Object, which
        // can be assigned neither to List nor to ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(this.tp0);
        Assertions.assertEquals(1, fetched.size());
        Assertions.assertEquals(2L, this.subscriptions.position(this.tp0).offset);

        ConsumerRecord<byte[], byte[]> onlyRecord = fetched.get(0);
        Assertions.assertArrayEquals(key, onlyRecord.key());
    }

    /**
     * Completes a fetch for tp0 with a NOT_LEADER_OR_FOLLOWER error and checks that the
     * collected records contain no entry for tp0.
     */
    @Test
    public void testFetchError() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        // Answer the in-flight fetch with the error response.
        AbstractResponse errorResponse = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0);
        this.client.prepareResponse(errorResponse);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // The errored partition must be absent from what the fetcher hands out.
        boolean tp0Returned = this.fetchedRecords().containsKey(this.tp0);
        Assertions.assertFalse(tp0Returned);
    }

    /**
     * Builds a request matcher that accepts a fetch request only when its fetch data contains
     * the given topic-id partition and the requested fetch offset for it equals {@code offset}.
     *
     * @param tp the topic-id partition that must be present in the request
     * @param offset the fetch offset the request must carry for that partition
     * @return the matcher
     */
    private MockClient.RequestMatcher matchesOffset(TopicIdPartition tp, long offset) {
        return body -> {
            FetchRequest request = (FetchRequest)body;
            Map partitions = request.fetchData(this.topicNames);
            if (!partitions.containsKey(tp)) {
                return false;
            }
            FetchRequest.PartitionData requested = (FetchRequest.PartitionData)partitions.get(tp);
            return requested.fetchOffset == offset;
        };
    }

    /**
     * Uses a deserializer that throws on every second invocation and checks that collecting the
     * fetch raises a {@code SerializationException} on two consecutive attempts, with the tp0
     * position still at offset 1 each time.
     */
    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        // The same instance is handed to buildFetcher twice; calls 1, 3, 5, ... (0-based) fail.
        ByteArrayDeserializer failingDeserializer = new ByteArrayDeserializer(){
            int calls = 0;

            @Override
            public byte[] deserialize(String topic, byte[] data) {
                boolean shouldFail = this.calls % 2 == 1;
                ++this.calls;
                if (!shouldFail) {
                    return data;
                }
                // The failing call is expected to be the one that sees the payload "value-1".
                Assertions.assertEquals("value-1", new String(data, StandardCharsets.UTF_8));
                throw new SerializationException();
            }
        };
        this.buildFetcher((Deserializer<?>)failingDeserializer, (Deserializer<?>)failingDeserializer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);

        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(this.matchesOffset(this.tidp0, 1L), response);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));

        // Both attempts must fail and leave the position where the test seeked to.
        for (int attempt = 0; attempt < 2; ++attempt) {
            try {
                this.fetcher.collectFetch();
                Assertions.fail("fetchedRecords should have raised");
            } catch (SerializationException e) {
                Assertions.assertEquals(1L, this.subscriptions.position(this.tp0).offset);
            }
        }
    }

    /**
     * Hand-writes five legacy (magic 1) entries into one buffer - valid, wrong checksum, valid,
     * a bare size field of 1, valid - and checks where the fetcher returns records, where it
     * keeps throwing, and where a seek lets it continue.
     *
     * @throws Exception if writing the hand-built entries to the buffer fails
     */
    @Test
    public void testParseCorruptedRecord() throws Exception {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        ByteBuffer payload = ByteBuffer.allocate(1024);
        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(payload));

        byte magic = 1;
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        long baseOffset = 0L;
        long timestamp = 500L;
        int recordSize = LegacyRecord.recordSize(magic, key.length, value.length);
        byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
        long validCrc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);

        // Offset 0: record written with the computed checksum.
        out.writeLong(baseOffset);
        out.writeInt(recordSize);
        LegacyRecord.write(out, magic, validCrc, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);

        // Offset 1: same record, but the stored checksum is off by one.
        out.writeLong(baseOffset + 1L);
        out.writeInt(recordSize);
        LegacyRecord.write(out, magic, validCrc + 1L, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);

        // Offset 2: record written with the computed checksum.
        out.writeLong(baseOffset + 2L);
        out.writeInt(recordSize);
        LegacyRecord.write(out, magic, validCrc, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);

        // Offset 3: only an offset and a size field of 1; no record body is written for it.
        out.writeLong(baseOffset + 3L);
        out.writeInt(1);

        // Offset 4: record written with the computed checksum.
        out.writeLong(baseOffset + 4L);
        out.writeInt(recordSize);
        LegacyRecord.write(out, magic, validCrc, LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME), timestamp, key, value);
        payload.flip();

        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), this.metadata.currentLeader(this.tp0)));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, MemoryRecords.readableRecords(payload), Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // The first collect returns one record and moves the position to 1.
        Assertions.assertEquals(1, this.fetchedRecords().get(this.tp0).size());
        Assertions.assertEquals(1L, this.subscriptions.position(this.tp0).offset);

        // Collecting keeps failing at offset 1; seeking to 2 consumes one record; then it blocks at 3.
        this.ensureBlockOnRecord(1L);
        this.seekAndConsumeRecord(payload, 2L);
        this.ensureBlockOnRecord(3L);
        try {
            this.seekAndConsumeRecord(payload, 4L);
            Assertions.fail("Should have thrown exception when fail to retrieve a record from iterator.");
        } catch (KafkaException ignored) {
            // Expected: this seek-and-consume attempt must throw; nothing more to verify here.
        }
        this.ensureBlockOnRecord(4L);
    }

    /**
     * Asserts that two consecutive {@code collectFetch()} calls each throw a
     * {@code KafkaException} and that the tp0 position equals {@code blockedOffset} after each.
     *
     * @param blockedOffset the offset the tp0 position is expected to hold after each failure
     */
    private void ensureBlockOnRecord(long blockedOffset) {
        int attempt = 0;
        while (attempt < 2) {
            try {
                this.fetcher.collectFetch();
                Assertions.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException e) {
                long currentOffset = this.subscriptions.position(this.tp0).offset;
                Assertions.assertEquals(blockedOffset, currentOffset);
            }
            ++attempt;
        }
    }

    /**
     * Seeks tp0 to {@code toOffset}, re-fetches the given buffer and asserts that exactly one
     * record with that offset is returned and that the position moves to {@code toOffset + 1}.
     *
     * @param responseBuffer the raw record bytes served as the fetch response for tp0
     * @param toOffset the offset to seek to and the offset expected on the returned record
     */
    private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) {
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(toOffset, Optional.empty(), this.metadata.currentLeader(this.tp0)));
        // Collect once right after the seek; the result of this call is not inspected.
        this.fetcher.collectFetch();

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, MemoryRecords.readableRecords(responseBuffer), Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map/List locals so that offset() resolves on the element.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = this.fetchedRecords();
        List<ConsumerRecord<byte[], byte[]>> consumed = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(1, consumed.size());
        Assertions.assertEquals(toOffset, consumed.get(0).offset());
        Assertions.assertEquals(toOffset + 1L, this.subscriptions.position(this.tp0).offset);
    }

    /**
     * Builds a one-record batch, overwrites four bytes at buffer position 17 with "beef" and
     * checks that collecting the fetch throws a {@code KafkaException} twice in a row while the
     * tp0 position stays at 0.
     */
    @Test
    public void testInvalidDefaultRecordBatch() {
        this.buildFetcher();

        ByteBuffer payload = ByteBuffer.allocate(1024);
        ByteBufferOutputStream out = new ByteBufferOutputStream(payload);
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(out, 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 10L, 0L, 0, 0, false, false, 0, 1024);
        builder.append(10L, "key".getBytes(), "value".getBytes());
        builder.close();
        payload.flip();

        // Clobber part of the serialized batch, then rewind so the whole buffer is served.
        // NOTE(review): presumably byte 17 falls inside a validated header field - confirm which one.
        payload.position(17);
        payload.put("beef".getBytes());
        payload.position(0);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, MemoryRecords.readableRecords(payload), Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // Each attempt must fail without moving the position.
        for (int attempt = 0; attempt < 2; ++attempt) {
            try {
                this.fetcher.collectFetch();
                Assertions.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException e) {
                Assertions.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
            }
        }
    }

    /**
     * Builds a three-record batch, rewrites the int at buffer index 32 with a scrambled value
     * and checks that collecting the fetch throws a {@code KafkaException} while the tp0
     * position stays at 0.
     */
    @Test
    public void testParseInvalidRecordBatch() {
        this.buildFetcher();

        SimpleRecord[] simpleRecords = new SimpleRecord[]{
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        MemoryRecords batch = MemoryRecords.withRecords((byte)2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords);
        ByteBuffer payload = batch.buffer();

        // XOR the single byte read at index 32 with a constant and store the result as an int there.
        int scrambled = payload.get(32) ^ 0x5332717;
        payload.putInt(32, scrambled);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, MemoryRecords.readableRecords(payload), Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        try {
            this.fetcher.collectFetch();
            Assertions.fail("fetchedRecords should have raised");
        } catch (KafkaException e) {
            // The failed collect must not move the position.
            Assertions.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
        }
    }

    /**
     * Fetches three records carrying zero, one and two "headerKey" headers respectively and
     * checks what {@code lastHeader("headerKey")} yields for each of them.
     */
    @Test
    public void testHeaders() {
        this.buildFetcher();

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        // Record 1: no headers.
        builder.append(0L, "key".getBytes(), "value-1".getBytes());
        // Record 2: one "headerKey" header.
        Header[] singleHeader = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))};
        builder.append(0L, "key".getBytes(), "value-2".getBytes(), singleHeader);
        // Record 3: two headers sharing the key "headerKey"; "headerValue2" is added last.
        Header[] duplicateKeyHeaders = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))};
        builder.append(0L, "key".getBytes(), "value-3".getBytes(), duplicateKeyHeaders);
        MemoryRecords memoryRecords = builder.build();

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, memoryRecords, Errors.NONE, 100L, 0);
        this.client.prepareResponse(this.matchesOffset(this.tidp0, 1L), response);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map/List/Iterator locals, which do not type-check.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = this.fetchedRecords();
        List<ConsumerRecord<byte[], byte[]>> fetched = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(3, fetched.size());
        Iterator<ConsumerRecord<byte[], byte[]>> cursor = fetched.iterator();

        ConsumerRecord<byte[], byte[]> current = cursor.next();
        Assertions.assertNull(current.headers().lastHeader("headerKey"));

        current = cursor.next();
        Assertions.assertEquals("headerValue", new String(current.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assertions.assertEquals("headerKey", current.headers().lastHeader("headerKey").key());

        current = cursor.next();
        Assertions.assertEquals("headerValue2", new String(current.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assertions.assertEquals("headerKey", current.headers().lastHeader("headerKey").key());
    }

    /**
     * Builds the fetcher with {@code buildFetcher(2)} and checks that the first response is
     * handed out as two records (offsets 1, 2) and then one (offset 3), with no new fetch sent
     * in between, before the second response (offsets 4, 5) is fetched and returned.
     */
    @Test
    public void testFetchMaxPollRecords() {
        this.buildFetcher(2);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);

        // Queue one response per expected fetch offset: 1 first, then 4.
        this.client.prepareResponse(this.matchesOffset(this.tidp0, 1L), (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0));
        this.client.prepareResponse(this.matchesOffset(this.tidp0, 4L), (AbstractResponse)this.fullFetchResponse(this.tidp0, this.nextRecords, Errors.NONE, 100L, 0));

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map/List locals so that offset() resolves on the element.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = this.fetchedRecords();
        List<ConsumerRecord<byte[], byte[]>> batch = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(2, batch.size());
        Assertions.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        Assertions.assertEquals(1L, batch.get(0).offset());
        Assertions.assertEquals(2L, batch.get(1).offset());

        // One record of the first response is still pending, so no new fetch is sent yet.
        Assertions.assertEquals(0, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        recordsByPartition = this.fetchedRecords();
        batch = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(1, batch.size());
        Assertions.assertEquals(4L, this.subscriptions.position(this.tp0).offset);
        Assertions.assertEquals(3L, batch.get(0).offset());

        // With the first response drained, the next fetch goes out and returns offsets 4 and 5.
        Assertions.assertTrue(this.fetcher.sendFetches() > 0);
        this.consumerClient.poll(this.time.timer(0L));
        recordsByPartition = this.fetchedRecords();
        batch = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(2, batch.size());
        Assertions.assertEquals(6L, this.subscriptions.position(this.tp0).offset);
        Assertions.assertEquals(4L, batch.get(0).offset());
        Assertions.assertEquals(5L, batch.get(1).offset());
    }

    /**
     * Collects two records from tp0, then replaces the assignment with tp1 and checks that the
     * next collect returns nothing for tp0 and the two fetched tp1 records (offsets 4, 5).
     */
    @Test
    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
        this.buildFetcher(2);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(this.matchesOffset(this.tidp0, 1L), (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map/List locals so that offset() resolves on the element.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> firstCollect = this.fetchedRecords();
        List<ConsumerRecord<byte[], byte[]>> batch = firstCollect.get(this.tp0);
        Assertions.assertEquals(2, batch.size());
        Assertions.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        Assertions.assertEquals(1L, batch.get(0).offset());
        Assertions.assertEquals(2L, batch.get(1).offset());

        // Switch the assignment to tp1 only and fetch it from offset 4.
        this.assignFromUser(Collections.singleton(this.tp1));
        this.client.prepareResponse(this.matchesOffset(this.tidp1, 4L), (AbstractResponse)this.fullFetchResponse(this.tidp1, this.nextRecords, Errors.NONE, 100L, 0));
        this.subscriptions.seek(this.tp1, 4L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));

        // Nothing may be returned for tp0 any more; tp1 yields its two records.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> secondCollect = this.fetchedRecords();
        Assertions.assertNull(secondCollect.get(this.tp0));
        batch = secondCollect.get(this.tp1);
        Assertions.assertEquals(2, batch.size());
        Assertions.assertEquals(6L, this.subscriptions.position(this.tp1).offset);
        Assertions.assertEquals(4L, batch.get(0).offset());
        Assertions.assertEquals(5L, batch.get(1).offset());
    }

    /**
     * Fetches a batch whose records sit at the non-adjacent offsets 15, 20 and 30 and checks
     * that all three are returned in that order and that the tp0 position ends at 31.
     */
    @Test
    public void testFetchNonContinuousRecords() {
        this.buildFetcher();

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords gappedRecords = builder.build();

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, gappedRecords, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map/List locals so that offset() resolves on the element.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = this.fetchedRecords();
        List<ConsumerRecord<byte[], byte[]>> consumerRecords = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(3, consumerRecords.size());
        // The position lands one past the last returned offset.
        Assertions.assertEquals(31L, this.subscriptions.position(this.tp0).offset);
        Assertions.assertEquals(15L, consumerRecords.get(0).offset());
        Assertions.assertEquals(20L, consumerRecords.get(1).offset());
        Assertions.assertEquals(30L, consumerRecords.get(2).offset());
    }

    /**
     * Pins the node's FETCH API version range to 2..2, serves an incomplete record and checks
     * that collecting the fetch raises a {@code RecordTooLargeException} with the expected
     * message prefix while the tp0 position stays at 0.
     */
    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            this.buildFetcher();
            this.client.setNodeApiVersions(NodeApiVersions.create((short)ApiKeys.FETCH.id, (short)2, (short)2));
            this.makeFetchRequestWithIncompleteRecord();
            try {
                this.fetcher.collectFetch();
                Assertions.fail("RecordTooLargeException should have been raised");
            } catch (RecordTooLargeException tooLarge) {
                String message = tooLarge.getMessage();
                Assertions.assertTrue(message.startsWith("There are some messages at [Partition=Offset]: "));
                // The failed collect must not move the position.
                Assertions.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
            }
        } finally {
            // Reset the client to NodeApiVersions.create() whatever the outcome above.
            this.client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    /**
     * Serves an incomplete record (via {@code makeFetchRequestWithIncompleteRecord()}) without
     * changing the node's API versions and checks that collecting the fetch raises a
     * {@code KafkaException} whose message starts with "Failed to make progress reading
     * messages", leaving the tp0 position at 0.
     */
    @Test
    public void testFetchRequestInternalError() {
        this.buildFetcher();
        this.makeFetchRequestWithIncompleteRecord();
        try {
            this.fetcher.collectFetch();
            // The failure message now names the exception this test actually expects.
            Assertions.fail("KafkaException should have been raised");
        } catch (KafkaException e) {
            Assertions.assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
            // The failed collect must not move the position.
            Assertions.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
        }
    }

    /**
     * Assigns tp0 at offset 0, sends one fetch and completes it with a response whose record
     * data is just eight zero bytes, asserting that the fetcher then holds a completed fetch.
     */
    private void makeFetchRequestWithIncompleteRecord() {
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        // A freshly allocated byte array is all zeros, i.e. the same eight 0 bytes.
        byte[] eightZeroBytes = new byte[8];
        MemoryRecords partialRecord = MemoryRecords.readableRecords(ByteBuffer.wrap(eightZeroBytes));
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, partialRecord, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());
    }

    /**
     * Completes a fetch for tp0 with TOPIC_AUTHORIZATION_FAILED and checks that collecting the
     * fetch throws a {@code TopicAuthorizationException} naming exactly the test topic.
     */
    @Test
    public void testUnauthorizedTopic() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());

        AbstractResponse denied = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0);
        this.client.prepareResponse(denied);
        this.consumerClient.poll(this.time.timer(0L));

        try {
            this.fetcher.collectFetch();
            Assertions.fail("fetchedRecords should have thrown");
        } catch (TopicAuthorizationException authFailure) {
            // The exception must report the single topic used by this test.
            Assertions.assertEquals(Collections.singleton(this.topicName), authFailure.unauthorizedTopics());
        }
    }

    /**
     * Sends a fetch for tp0, then empties the subscribed assignment and re-adds tp0 before the
     * response arrives, and checks that the response yields no records.
     */
    @Test
    public void testFetchDuringEagerRebalance() {
        this.buildFetcher();
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 4), partition -> this.validLeaderEpoch, this.topicIds));
        Assertions.assertEquals(1, this.fetcher.sendFetches());

        // While the fetch is in flight: drop every partition, then assign tp0 again.
        this.subscriptions.assignFromSubscribed(Collections.emptyList());
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));

        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // The response to the fetch sent before the reassignment must produce nothing.
        boolean nothingReturned = this.fetchedRecords().isEmpty();
        Assertions.assertTrue(nothingReturned);
    }

    /**
     * Sends a fetch for tp0, re-applies the same subscribed assignment (tp0 is never dropped)
     * before the response arrives, and checks that the three fetched records are returned.
     */
    @Test
    public void testFetchDuringCooperativeRebalance() {
        this.buildFetcher();
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 4), partition -> this.validLeaderEpoch, this.topicIds));
        Assertions.assertEquals(1, this.fetcher.sendFetches());

        // While the fetch is in flight: assign tp0 again without removing it first.
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));

        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map local so that size() resolves on the per-partition list.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = this.fetchedRecords();
        Assertions.assertEquals(1, recordsByPartition.size());
        Assertions.assertEquals(3, recordsByPartition.get(this.tp0).size());
    }

    /**
     * Pauses tp0 while its fetch is in flight and checks that no records are returned for tp0
     * once the response has been delivered.
     */
    @Test
    public void testInFlightFetchOnPausedPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());

        // Pause after the request was sent but before its response is processed.
        this.subscriptions.pause(this.tp0);

        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertNull(this.fetchedRecords().get(this.tp0));
    }

    /**
     * Pauses tp0 before any fetch is attempted and checks that {@code sendFetches()} sends
     * nothing and the client's request list stays empty.
     */
    @Test
    public void testFetchOnPausedPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.pause(this.tp0);

        boolean anyFetchSent = this.fetcher.sendFetches() > 0;
        Assertions.assertFalse(anyFetchSent);
        Assertions.assertTrue(this.client.requests().isEmpty());
    }

    /**
     * Pauses tp0 while its fetch is in flight, checks that the completed fetch is held back
     * while paused, and that resuming tp0 releases its three records exactly once.
     */
    @Test
    public void testFetchOnCompletedFetchesForPausedAndResumedPartitions() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.subscriptions.pause(this.tp0);

        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // While paused: the fetch is completed but not available, and no new fetch is sent.
        this.assertEmptyFetch("Should not return any records or advance position when partition is paused");
        Assertions.assertTrue(this.fetcher.hasCompletedFetches(), "Should still contain completed fetches");
        Assertions.assertFalse(this.fetcher.hasAvailableFetches(), "Should not have any available (non-paused) completed fetches");
        Assertions.assertEquals(0, this.fetcher.sendFetches());

        // After resuming: the held-back fetch becomes available and is returned.
        this.subscriptions.resume(this.tp0);
        Assertions.assertTrue(this.fetcher.hasAvailableFetches(), "Should have available (non-paused) completed fetches");
        this.consumerClient.poll(this.time.timer(0L));
        // Parameterized types replace the raw Map local so that size() resolves on the per-partition list.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> resumedRecords = this.fetchedRecords();
        Assertions.assertEquals(1, resumedRecords.size(), "Should return records when partition is resumed");
        Assertions.assertNotNull(resumedRecords.get(this.tp0));
        Assertions.assertEquals(3, resumedRecords.get(this.tp0).size());

        // A further collect finds nothing left.
        this.consumerClient.poll(this.time.timer(0L));
        this.assertEmptyFetch("Should not return records or advance position after previously paused partitions are fetched");
        Assertions.assertFalse(this.fetcher.hasCompletedFetches(), "Should no longer contain completed fetches");
    }

    /**
     * Completes a fetch for tp0, queues one for tp1, pauses only tp0 and checks that the
     * collect returns tp1 but not tp0, and that a completed fetch is still retained afterwards.
     */
    @Test
    public void testFetchOnCompletedFetchesForSomePausedPartitions() {
        this.buildFetcher();
        this.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));

        // Seek tp0 to offset 1 and deliver its response.
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.currentLeader(this.tp0)));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse tp0Response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(tp0Response);
        this.consumerClient.poll(this.time.timer(0L));

        // Seek tp1 to offset 1; tp0 is paused before the tp1 response is polled in.
        this.subscriptions.seekUnvalidated(this.tp1, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.currentLeader(this.tp1)));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse tp1Response = (AbstractResponse)this.fullFetchResponse(this.tidp1, this.nextRecords, Errors.NONE, 100L, 0);
        this.client.prepareResponse(tp1Response);
        this.subscriptions.pause(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));

        Map collected = this.fetchedRecords();
        Assertions.assertEquals(1, collected.size(), "Should return completed fetch for unpaused partitions");
        Assertions.assertTrue(this.fetcher.hasCompletedFetches(), "Should still contain completed fetches");
        Assertions.assertNotNull(collected.get(this.tp1));
        Assertions.assertNull(collected.get(this.tp0));

        // A second collect returns nothing, yet a completed fetch is still held.
        this.assertEmptyFetch("Should not return records or advance position for remaining paused partition");
        Assertions.assertTrue(this.fetcher.hasCompletedFetches(), "Should still contain completed fetches");
    }

    /**
     * Completes a fetch for tp0, queues one for tp1, pauses both partitions and checks that the
     * collect is empty while completed fetches are retained but none is available.
     */
    @Test
    public void testFetchOnCompletedFetchesForAllPausedPartitions() {
        this.buildFetcher();
        this.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));

        // Seek tp0 to offset 1 and deliver its response.
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.currentLeader(this.tp0)));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse tp0Response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(tp0Response);
        this.consumerClient.poll(this.time.timer(0L));

        // Seek tp1 to offset 1; both partitions are paused before the tp1 response is polled in.
        this.subscriptions.seekUnvalidated(this.tp1, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.currentLeader(this.tp1)));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse tp1Response = (AbstractResponse)this.fullFetchResponse(this.tidp1, this.nextRecords, Errors.NONE, 100L, 0);
        this.client.prepareResponse(tp1Response);
        this.subscriptions.pause(this.tp0);
        this.subscriptions.pause(this.tp1);
        this.consumerClient.poll(this.time.timer(0L));

        this.assertEmptyFetch("Should not return records or advance position for all paused partitions");
        Assertions.assertTrue(this.fetcher.hasCompletedFetches(), "Should still contain completed fetches");
        Assertions.assertFalse(this.fetcher.hasAvailableFetches(), "Should not have any available (non-paused) completed fetches");
    }

    /**
     * With {@code buildFetcher(2)}, collects two of three fetched tp0 records, pauses tp0 and
     * checks that nothing more is returned while paused, then resumes tp0 and checks that the
     * last remaining record is returned.
     */
    @Test
    public void testPartialFetchWithPausedPartitions() {
        this.buildFetcher(2);
        this.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 1L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        // Parameterized types replace the raw Map local so that size() resolves on the per-partition list.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> collected = this.fetchedRecords();
        Assertions.assertEquals(2, collected.get(this.tp0).size(), "Should return 2 records from fetch with 3 records");
        Assertions.assertFalse(this.fetcher.hasCompletedFetches(), "Should have no completed fetches");

        this.subscriptions.pause(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        // NOTE(review): the value collected here is never read; the call is kept as-is because
        // collecting may change fetcher state that the assertions below depend on.
        collected = this.fetchedRecords();
        this.assertEmptyFetch("Should not return records or advance position for paused partitions");
        Assertions.assertTrue(this.fetcher.hasCompletedFetches(), "Should have 1 entry in completed fetches");
        Assertions.assertFalse(this.fetcher.hasAvailableFetches(), "Should not have any available (non-paused) completed fetches");

        this.subscriptions.resume(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        collected = this.fetchedRecords();
        Assertions.assertEquals(1, collected.get(this.tp0).size(), "Should return last remaining record");
        Assertions.assertFalse(this.fetcher.hasCompletedFetches(), "Should have no completed fetches");
    }

    /**
     * Sends a fetch for tp0 at offset 0, then pauses, seeks to offset 3 and resumes before the
     * response is processed, and checks that the completed fetch is dropped without returning
     * records or advancing the position.
     */
    @Test
    public void testFetchDiscardedAfterPausedPartitionResumedAndSeekedToNewOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());

        this.subscriptions.pause(this.tp0);
        AbstractResponse response = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0);
        this.client.prepareResponse(response);

        // Move the position away from the offset the in-flight fetch was sent for.
        this.subscriptions.seek(this.tp0, 3L);
        this.subscriptions.resume(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches(), "Should have 1 entry in completed fetches");

        Fetch collected = this.collectFetch();
        Assertions.assertEquals(Collections.emptyMap(), collected.records(), "Should not return any records because we seeked to a new offset");
        Assertions.assertFalse(collected.positionAdvanced());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches(), "Should have no completed fetches");
    }

    /**
     * Answers the fetch for tp0 with NOT_LEADER_OR_FOLLOWER and checks that the fetch comes
     * back empty and that the metadata reports zero milliseconds until its next update.
     */
    @Test
    public void testFetchNotLeaderOrFollower() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse errorResponse = fullFetchResponse(tidp0, records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * Answers the fetch for tp0 with UNKNOWN_TOPIC_OR_PARTITION and checks that the fetch comes
     * back empty and that the metadata reports zero milliseconds until its next update.
     */
    @Test
    public void testFetchUnknownTopicOrPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse errorResponse = fullFetchResponse(tidp0, records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * Answers the fetch for tp0 with UNKNOWN_TOPIC_ID and checks that the fetch comes back
     * empty and that the metadata reports zero milliseconds until its next update.
     */
    @Test
    public void testFetchUnknownTopicId() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse errorResponse = fullFetchResponse(tidp0, records, Errors.UNKNOWN_TOPIC_ID, -1L, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * Answers the fetch for tp0 with a response carrying the top-level error
     * FETCH_SESSION_TOPIC_ID_ERROR and checks that the fetch comes back empty and that the
     * metadata reports zero milliseconds until its next update.
     */
    @Test
    public void testFetchSessionIdError() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse errorResponse = fetchResponseWithTopLevelError(tidp0, Errors.FETCH_SESSION_TOPIC_ID_ERROR, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * Answers the fetch for tp0 with INCONSISTENT_TOPIC_ID and checks that the fetch comes back
     * empty and that the metadata reports zero milliseconds until its next update.
     */
    @Test
    public void testFetchInconsistentTopicId() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse errorResponse = fullFetchResponse(tidp0, records, Errors.INCONSISTENT_TOPIC_ID, -1L, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * Answers the fetch for tp0 with FENCED_LEADER_EPOCH. The fetch must come back empty and a
     * metadata update must have been requested (time to next update is zero).
     */
    @Test
    public void testFetchFencedLeaderEpoch() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse fencedResponse = fullFetchResponse(tidp0, records, Errors.FENCED_LEADER_EPOCH, 100L, 0);
        client.prepareResponse(fencedResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate, "Should have requested metadata update");
    }

    /**
     * Answers the fetch for tp0 with UNKNOWN_LEADER_EPOCH. The fetch must come back empty and,
     * unlike the fenced-epoch case, no metadata update may have been requested (time to next
     * update is non-zero).
     */
    @Test
    public void testFetchUnknownLeaderEpoch() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse unknownEpochResponse = fullFetchResponse(tidp0, records, Errors.UNKNOWN_LEADER_EPOCH, 100L, 0);
        client.prepareResponse(unknownEpochResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        long msUntilMetadataUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assertions.assertNotEquals(0L, msUntilMetadataUpdate, "Should not have requested metadata update");
    }

    /**
     * Installs metadata whose leader epoch is 99 for every partition, then checks that the
     * fetch request sent for tp0 carries that epoch in each partition entry. The check runs
     * inside the request matcher handed to the mock client.
     */
    @Test
    public void testEpochSetInFetchRequest() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));
        MetadataResponse epochMetadata = RequestTestUtils.metadataUpdateWithIds(
                "dummy", 1, Collections.emptyMap(), Collections.singletonMap(topicName, 4), tp -> 99, topicIds);
        client.updateMetadata(epochMetadata);
        subscriptions.seek(tp0, 10L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        MockClient.RequestMatcher epochMatcher = body -> {
            // Anything other than a fetch request is a test failure.
            if (!(body instanceof FetchRequest)) {
                Assertions.fail("Should have seen FetchRequest");
                return false;
            }
            FetchRequest fetchRequest = (FetchRequest) body;
            fetchRequest.fetchData(topicNames).values().forEach(partitionData -> {
                Assertions.assertTrue(partitionData.currentLeaderEpoch.isPresent(), "Expected Fetcher to set leader epoch in request");
                Assertions.assertEquals(99L, partitionData.currentLeaderEpoch.get().longValue(), "Expected leader epoch to match epoch from metadata update");
            });
            return true;
        };
        AbstractResponse okResponse = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
        client.prepareResponse(epochMatcher, okResponse);
        consumerClient.pollNoWakeup();
    }

    /**
     * With the default fetcher, an OFFSET_OUT_OF_RANGE response for tp0 must yield an empty
     * fetch, mark the partition as needing an offset reset, and leave it with neither a valid
     * position nor any position.
     */
    @Test
    public void testFetchOffsetOutOfRange() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse outOfRangeResponse = fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0);
        client.prepareResponse(outOfRangeResponse);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertNull(subscriptions.validPosition(tp0));
        Assertions.assertNull(subscriptions.position(tp0));
    }

    /**
     * An OFFSET_OUT_OF_RANGE response that is polled after the partition was already sought
     * to a different offset (1) must be ignored: no reset is requested and the position stays
     * at the newly sought offset.
     */
    @Test
    public void testStaleOutOfRangeError() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse outOfRangeResponse = fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0);
        client.prepareResponse(outOfRangeResponse);
        // Seek before the error response is processed so the error refers to an old position.
        subscriptions.seek(tp0, 1L);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on fetch error");
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(1L, subscriptions.position(tp0).offset);
    }

    /**
     * With reset strategy NONE, an OFFSET_OUT_OF_RANGE response must not flag tp0 for an
     * offset reset; after a subsequent seek to offset 2 the fetch must come back empty.
     */
    @Test
    public void testFetchedRecordsAfterSeek() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertTrue(fetcher.sendFetches() > 0);

        AbstractResponse outOfRangeResponse = fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0);
        client.prepareResponse(outOfRangeResponse);
        consumerClient.poll(time.timer(0L));
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));

        subscriptions.seek(tp0, 2L);
        assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition");
    }

    /**
     * With reset strategy NONE, an OFFSET_OUT_OF_RANGE response does not request a reset;
     * instead collecting the fetch throws {@link OffsetOutOfRangeException} naming tp0 at
     * offset 0. The exception must be raised on two consecutive collect attempts.
     */
    @Test
    public void testFetchOffsetOutOfRangeException() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        fetcher.sendFetches();

        AbstractResponse outOfRangeResponse = fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0);
        client.prepareResponse(outOfRangeResponse);
        consumerClient.poll(time.timer(0L));
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));

        // The same failure must surface on each of the two attempts.
        int attempt = 0;
        while (attempt < 2) {
            OffsetOutOfRangeException thrown = Assertions.assertThrows(OffsetOutOfRangeException.class, () -> fetcher.collectFetch());
            Assertions.assertEquals(Collections.singleton(tp0), thrown.offsetOutOfRangePartitions().keySet());
            Assertions.assertEquals(0L, thrown.offsetOutOfRangePartitions().get(tp0).longValue());
            attempt++;
        }
    }

    /**
     * One response carries records for tp1 and OFFSET_OUT_OF_RANGE for tp0 (strategy NONE).
     * The first drain returns tp1's three records and moves tp1 to offset 4 while tp0 stays
     * at 1; the second drain throws {@link OffsetOutOfRangeException} for tp0 at offset 1 and
     * leaves both positions and the collected record count unchanged.
     */
    @Test
    public void testFetchPositionAfterException() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Utils.mkSet(tp0, tp1));
        subscriptions.seek(tp0, 1L);
        subscriptions.seek(tp1, 1L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        // Insertion order matters: the healthy partition comes before the failing one.
        Map<TopicIdPartition, FetchResponseData.PartitionData> responseData = new LinkedHashMap<>();
        responseData.put(tidp1, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp1.partition())
                .setHighWatermark(100L)
                .setRecords(records));
        responseData.put(tidp0, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp0.partition())
                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                .setHighWatermark(100L));
        AbstractResponse mixedResponse = FetchResponse.of(Errors.NONE, 0, 0, new LinkedHashMap<>(responseData));
        client.prepareResponse(mixedResponse);
        consumerClient.poll(time.timer(0L));

        List<ConsumerRecord<byte[], byte[]>> collected = new ArrayList<>();
        fetchRecordsInto(collected);
        Assertions.assertEquals(1L, subscriptions.position(tp0).offset);
        Assertions.assertEquals(4L, subscriptions.position(tp1).offset);
        Assertions.assertEquals(3, collected.size());

        OffsetOutOfRangeException thrown = Assertions.assertThrows(OffsetOutOfRangeException.class, () -> fetchRecordsInto(collected));
        Assertions.assertEquals(Collections.singleton(tp0), thrown.offsetOutOfRangePartitions().keySet());
        Assertions.assertEquals(1L, thrown.offsetOutOfRangePartitions().get(tp0).longValue());
        Assertions.assertEquals(1L, subscriptions.position(tp0).offset);
        Assertions.assertEquals(4L, subscriptions.position(tp1).offset);
        Assertions.assertEquals(3, collected.size());
    }

    /**
     * Drains the currently available fetched records and appends every partition's records
     * to the supplied list.
     *
     * <p>The map is declared with its full parameterisation on purpose: with a raw {@code Map}
     * the {@code values().forEach(...)} call targets a raw {@code Consumer}, against which the
     * {@code addAll} method reference cannot be resolved.
     *
     * @param allFetchedRecords accumulator that receives the records of every partition
     */
    private void fetchRecordsInto(List<ConsumerRecord<byte[], byte[]>> allFetchedRecords) {
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = this.fetchedRecords();
        fetchedRecords.values().forEach(allFetchedRecords::addAll);
    }

    /**
     * Drains a single fetch response that mixes outcomes across four partitions (strategy
     * NONE): records for tp1, OFFSET_OUT_OF_RANGE for tp0, the {@code nextRecords} fixture for
     * tp2 and the {@code partialRecords} fixture for tp3.
     *
     * <p>Expected sequence of drains:
     * <ol>
     *   <li>three records are returned and tp1 moves to offset 4;</li>
     *   <li>an {@link OffsetOutOfRangeException} naming only tp0 is thrown once;</li>
     *   <li>two more records are returned and tp2 moves to offset 6;</li>
     *   <li>each of the next three drains throws a {@link KafkaException}.</li>
     * </ol>
     */
    @Test
    public void testCompletedFetchRemoval() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));
        subscriptions.seek(tp0, 1L);
        subscriptions.seek(tp1, 1L);
        subscriptions.seek(tp2, 1L);
        subscriptions.seek(tp3, 1L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        // Insertion order is significant, hence the LinkedHashMap.
        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(tidp1, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp1.partition())
                .setHighWatermark(100L)
                .setRecords(records));
        partitions.put(tidp0, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp0.partition())
                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                .setHighWatermark(100L));
        partitions.put(tidp2, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp2.partition())
                .setHighWatermark(100L)
                .setLastStableOffset(4L)
                .setLogStartOffset(0L)
                .setRecords(nextRecords));
        partitions.put(tidp3, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp3.partition())
                .setHighWatermark(100L)
                .setLastStableOffset(4L)
                .setLogStartOffset(0L)
                .setRecords(partialRecords));
        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 0, new LinkedHashMap<>(partitions)));
        consumerClient.poll(time.timer(0L));

        // Fully parameterised so the for-each loops below iterate typed lists rather than Object.
        List<ConsumerRecord<byte[], byte[]>> collected = new ArrayList<>();
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetchedRecords();
        for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : recordsByPartition.values()) {
            collected.addAll(partitionRecords);
        }
        Assertions.assertEquals((long) collected.size(), subscriptions.position(tp1).offset - 1L);
        Assertions.assertEquals(4L, subscriptions.position(tp1).offset);
        Assertions.assertEquals(3, collected.size());

        List<OffsetOutOfRangeException> oorExceptions = new ArrayList<>();
        try {
            recordsByPartition = fetchedRecords();
            for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : recordsByPartition.values()) {
                collected.addAll(partitionRecords);
            }
        } catch (OffsetOutOfRangeException oor) {
            oorExceptions.add(oor);
        }
        // Exactly one out-of-range failure is expected, and it must name tp0 only.
        Assertions.assertEquals(1, oorExceptions.size());
        OffsetOutOfRangeException oor = oorExceptions.get(0);
        Assertions.assertTrue(oor.offsetOutOfRangePartitions().containsKey(tp0));
        Assertions.assertEquals(1, oor.offsetOutOfRangePartitions().size());

        recordsByPartition = fetchedRecords();
        for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : recordsByPartition.values()) {
            collected.addAll(partitionRecords);
        }
        Assertions.assertEquals(6L, subscriptions.position(tp2).offset);
        Assertions.assertEquals(5, collected.size());

        // Every remaining drain attempt is expected to fail with a KafkaException.
        int numExceptionsExpected = 3;
        List<KafkaException> kafkaExceptions = new ArrayList<>();
        for (int i = 1; i <= numExceptionsExpected; ++i) {
            try {
                recordsByPartition = fetchedRecords();
                for (List<ConsumerRecord<byte[], byte[]>> partitionRecords : recordsByPartition.values()) {
                    collected.addAll(partitionRecords);
                }
            } catch (KafkaException e) {
                kafkaExceptions.add(e);
            }
        }
        Assertions.assertEquals(numExceptionsExpected, kafkaExceptions.size());
    }

    /**
     * Fetches records for tp0 and drains part of them, then adds tp1 to the assignment and
     * receives OFFSET_OUT_OF_RANGE for it (strategy NONE). The remaining tp0 record is still
     * returned, and after tp1 is sought to offset 10 the next fetch is empty rather than
     * failing.
     *
     * <p>The first drain returns two records and the second returns one; the {@code 2} passed
     * to {@code buildFetcher} is presumably the per-poll record cap — confirm against
     * {@code buildFetcher}.
     */
    @Test
    public void testSeekBeforeException() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Utils.mkSet(tp0));
        subscriptions.seek(tp0, 1L);
        Assertions.assertEquals(1, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(time.timer(0L));
        Assertions.assertEquals(2, fetchedRecords().get(tp0).size());

        subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
        subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1L, Optional.empty(), metadata.currentLeader(tp1)));
        Assertions.assertEquals(1, fetcher.sendFetches());

        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
        partitions.put(tidp1, new FetchResponseData.PartitionData()
                .setPartitionIndex(tp1.partition())
                .setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code())
                .setHighWatermark(100L));
        client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 0, new LinkedHashMap<>(partitions)));
        consumerClient.poll(time.timer(0L));
        Assertions.assertEquals(1, fetchedRecords().get(tp0).size());

        // Seeking tp1 before the next drain must discard its pending out-of-range error.
        subscriptions.seek(tp1, 10L);
        assertEmptyFetch("Should not return records or advance position after seeking to end of topic partitions");
    }

    /**
     * Prepares the fetch response with the extra {@code true} flag (a disconnect, per the
     * assertion message) and checks that nothing is returned, no reset is requested, and tp0
     * stays fetchable at offset 0.
     */
    @Test
    public void testFetchDisconnected() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assertions.assertEquals(1, fetcher.sendFetches());

        AbstractResponse droppedResponse = fullFetchResponse(tidp0, records, Errors.NONE, 100L, 0);
        client.prepareResponse(droppedResponse, true);
        consumerClient.poll(time.timer(0L));

        assertEmptyFetch("Should not return records or advance position on disconnect");
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(0L, subscriptions.position(tp0).offset);
    }

    /**
     * When tp0 already has a position (offset 5), {@code resetOffsetsIfNeeded} must send
     * nothing and leave the partition fetchable at that same offset.
     */
    @Test
    public void testUpdateFetchPositionNoOpWithPositionSet() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 5L);

        fetcher.resetOffsetsIfNeeded();

        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Requests an offset reset for tp0 without naming a strategy and expects a list-offsets
     * request matching timestamp -2 and the valid leader epoch. After the response (offset 5)
     * is processed the partition must be fetchable at offset 5 with no reset pending.
     */
    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0);

        MockClient.RequestMatcher expectedRequest = listOffsetRequestMatcher(-2L, validLeaderEpoch);
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(expectedRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Requests a LATEST reset for tp0, refreshes the client metadata, and expects a
     * list-offsets request matching timestamp -1. After the response (offset 5) is processed
     * the partition must be fetchable at offset 5 with no reset pending.
     */
    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
        client.updateMetadata(initialUpdateResponse);

        MockClient.RequestMatcher expectedRequest = listOffsetRequestMatcher(-1L);
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(expectedRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Walks a LATEST offset reset for tp0 through two failing list-offsets responses
     * (OFFSET_NOT_AVAILABLE, then LEADER_NOT_AVAILABLE) followed by a successful one.
     *
     * <p>After each failure the partition must still need a reset and be neither positioned
     * nor fetchable; the test then sleeps for {@code retryBackoffMs} before retrying. After
     * the successful response the partition must be fetchable at offset 5.
     */
    @Test
    public void testFetchOffsetErrors() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // First attempt fails with OFFSET_NOT_AVAILABLE.
        client.prepareResponse(listOffsetRequestMatcher(-1L, validLeaderEpoch), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));

        // Second attempt, after the backoff, fails with LEADER_NOT_AVAILABLE.
        time.sleep(retryBackoffMs);
        client.prepareResponse(listOffsetRequestMatcher(-1L, validLeaderEpoch), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));

        // Third attempt, after another backoff, succeeds.
        time.sleep(retryBackoffMs);
        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L), false);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertTrue(subscriptions.hasValidPosition(tp0));
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        // Expected value first, so a failure message reads "expected: 5 but was: <actual>".
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /** Runs the shared isolation-level check with {@code READ_UNCOMMITTED}. */
    @Test
    public void testListOffsetSendsReadUncommitted() {
        IsolationLevel level = IsolationLevel.READ_UNCOMMITTED;
        testListOffsetsSendsIsolationLevel(level);
    }

    /** Runs the shared isolation-level check with {@code READ_COMMITTED}. */
    @Test
    public void testListOffsetSendsReadCommitted() {
        IsolationLevel level = IsolationLevel.READ_COMMITTED;
        testListOffsetsSendsIsolationLevel(level);
    }

    /**
     * Builds a fetcher with the given isolation level, triggers a LATEST offset reset for tp0,
     * and only matches the outgoing list-offsets request if it carries that same isolation
     * level. The reset must then complete with tp0 fetchable at offset 5.
     *
     * @param isolationLevel isolation level the fetcher is built with and the request must carry
     */
    private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        MockClient.RequestMatcher isolationMatcher =
                body -> ((ListOffsetsRequest) body).isolationLevel() == isolationLevel;
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(isolationMatcher, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Puts the first broker from the initial metadata into a 500 ms backoff and checks that an
     * EARLIEST reset for tp0 sends no request while the backoff lasts, keeping the reset and
     * its strategy pending. Once 500 ms have elapsed the reset is sent and completes at
     * offset 5.
     */
    @Test
    public void testResetOffsetsSkipsBlackedOutConnections() {
        long backoffMs = 500L;
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        client.updateMetadata(initialUpdateResponse);
        Node blackedOutBroker = initialUpdateResponse.brokers().iterator().next();
        client.backoff(blackedOutBroker, backoffMs);

        // While the broker is backing off, nothing may be queued and the reset stays pending.
        fetcher.resetOffsetsIfNeeded();
        Assertions.assertEquals(0, consumerClient.pendingRequestCount());
        consumerClient.pollNoWakeup();
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0));

        time.sleep(backoffMs);
        MockClient.RequestMatcher expectedRequest = listOffsetRequestMatcher(-2L);
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(expectedRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Requests an EARLIEST reset for tp0 and expects a list-offsets request matching timestamp
     * -2 and the valid leader epoch. After the response (offset 5) is processed the partition
     * must be fetchable at offset 5 with no reset pending.
     */
    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);

        MockClient.RequestMatcher expectedRequest = listOffsetRequestMatcher(-2L, validLeaderEpoch);
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(expectedRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * A LATEST reset for tp0 first receives NOT_LEADER_OR_FOLLOWER, leaving the partition
     * without a valid position. The prepared metadata update must then be consumed by the
     * next poll, and after {@code retryBackoffMs} the retried reset completes at offset 5.
     */
    @Test
    public void testResetOffsetsMetadataRefresh() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        MockClient.RequestMatcher firstRequest = listOffsetRequestMatcher(-1L, validLeaderEpoch);
        AbstractResponse notLeaderResponse = listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L);
        client.prepareResponse(firstRequest, notLeaderResponse, false);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        // The queued metadata update must be picked up by the next poll.
        client.prepareMetadataUpdate(initialUpdateResponse);
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasPendingMetadataUpdates());

        time.sleep(retryBackoffMs);
        MockClient.RequestMatcher retryRequest = listOffsetRequestMatcher(-1L);
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(retryRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Starts from metadata that supplies no leader epoch for any partition. A list-offsets
     * response for tp0 built with a trailing argument of 1 (presumably its leader epoch —
     * confirm against {@code listOffsetResponse}) must complete the reset without requesting
     * a metadata update and without recording a last-seen leader epoch.
     */
    @Test
    public void testListOffsetNoUpdateMissingEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));
        MetadataResponse metadataWithNoLeaderEpochs = RequestTestUtils.metadataUpdateWithIds(
                "kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap(topicName, 4), tp -> null, topicIds);
        client.updateMetadata(metadataWithNoLeaderEpochs);
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        MockClient.RequestMatcher expectedRequest = listOffsetRequestMatcher(-1L);
        AbstractResponse offsetResponse = listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1);
        client.prepareResponse(expectedRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(metadata.updateRequested());
        Assertions.assertFalse(metadata.lastSeenLeaderEpoch(tp0).isPresent());
    }

    /**
     * Starts from metadata with leader epoch 1 for every partition. A list-offsets response
     * for tp0 built with a trailing argument of 2 (presumably a newer leader epoch — confirm
     * against {@code listOffsetResponse}) must complete the reset, request a metadata update,
     * and leave 2 as the last-seen leader epoch for tp0.
     */
    @Test
    public void testListOffsetUpdateEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));
        MetadataResponse metadataWithLeaderEpochs = RequestTestUtils.metadataUpdateWithIds(
                "kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap(topicName, 4), tp -> 1, topicIds);
        client.updateMetadata(metadataWithLeaderEpochs);
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        client.prepareResponse(listOffsetRequestMatcher(-1L, 1), listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 2));
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(metadata.updateRequested());
        // Expected value first, so a failure message reports the epoch actually seen as "was".
        TestUtils.assertOptional(metadata.lastSeenLeaderEpoch(tp0), epoch -> Assertions.assertEquals(2, epoch.intValue()));
    }

    /**
     * The first list-offsets response for a LATEST reset of tp0 is prepared with the extra
     * {@code true} flag (a disconnect, per the test name), so no position is set. After the
     * metadata update is consumed, an immediate retry must send nothing; only once
     * {@code retryBackoffMs} has elapsed does the reset go out and complete at offset 5.
     */
    @Test
    public void testUpdateFetchPositionDisconnect() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        MockClient.RequestMatcher firstRequest = listOffsetRequestMatcher(-1L, validLeaderEpoch);
        AbstractResponse droppedResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(firstRequest, droppedResponse, true);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        client.prepareMetadataUpdate(initialUpdateResponse);
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasPendingMetadataUpdates());

        // Retrying before the backoff has elapsed must not put a request in flight.
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        time.sleep(retryBackoffMs);
        MockClient.RequestMatcher retryRequest = listOffsetRequestMatcher(-1L);
        AbstractResponse offsetResponse = listOffsetResponse(Errors.NONE, 1L, 5L);
        client.prepareResponse(retryRequest, offsetResponse);
        fetcher.resetOffsetsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Changes the assignment from tp0 to tp1 while a reset request for tp0 is still in flight,
     * then delivers the response and checks that it is consumed and tp0 stays unassigned.
     */
    @Test
    public void testAssignmentChangeWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        // Send the reset request; no response is available yet, so it remains in flight.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        // Replace the assignment before the response arrives.
        this.assignFromUser(Collections.singleton(this.tp1));
        // Now answer the outstanding request for the partition that is no longer assigned.
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingResponses());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.isAssigned(this.tp0));
    }

    /**
     * Seeks tp0 to offset 237 while a reset request is in flight and verifies that the later
     * reset response (offset 5) does not overwrite the position set by the seek.
     */
    @Test
    public void testSeekWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        // Send the reset request; no response is available yet, so it remains in flight.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        // Explicit seek issued before the reset response arrives.
        this.subscriptions.seek(this.tp0, 237L);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingResponses());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        // The seek wins: the position is 237, not the 5 carried by the response.
        Assertions.assertEquals((long)237L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * Request-matcher helper: asserts that {@code request} is a ListOffsets request naming exactly
     * the topic and partition of {@code tp}, and that the requested timestamp is -2 for
     * {@code EARLIEST} or -1 for {@code LATEST}. Any other strategy skips the timestamp check.
     *
     * @param tp the only partition the request is expected to cover
     * @param strategy the reset strategy the request should correspond to
     * @param request the request handed to the matcher
     * @return always {@code true}; mismatches surface as assertion failures
     */
    private boolean listOffsetMatchesExpectedReset(TopicPartition tp, OffsetResetStrategy strategy, AbstractRequest request) {
        Assertions.assertTrue(request instanceof ListOffsetsRequest);
        List<ListOffsetsRequestData.ListOffsetsTopic> requestedTopics = ((ListOffsetsRequest)request).data().topics();
        Set<String> topicNames = requestedTopics.stream()
            .map(ListOffsetsRequestData.ListOffsetsTopic::name)
            .collect(Collectors.toSet());
        Assertions.assertEquals(Collections.singleton(tp.topic()), topicNames);

        List<ListOffsetsRequestData.ListOffsetsPartition> requestedPartitions = requestedTopics.get(0).partitions();
        Set<Integer> partitionIndexes = requestedPartitions.stream()
            .map(ListOffsetsRequestData.ListOffsetsPartition::partitionIndex)
            .collect(Collectors.toSet());
        Assertions.assertEquals(Collections.singleton(tp.partition()), partitionIndexes);

        long requestedTimestamp = requestedPartitions.get(0).timestamp();
        boolean wantsEarliest = strategy == OffsetResetStrategy.EARLIEST;
        if (wantsEarliest || strategy == OffsetResetStrategy.LATEST) {
            Assertions.assertEquals(wantsEarliest ? -2L : -1L, requestedTimestamp);
        }
        return true;
    }

    /**
     * Requests an EARLIEST reset, then switches the requested strategy to LATEST from inside the
     * request matcher (i.e. while the EARLIEST request is being answered). The EARLIEST response
     * must be ignored and a follow-up LATEST reset must set the position to 10.
     */
    @Test
    public void testEarlierOffsetResetArrivesLate() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.fetcher.resetOffsetsIfNeeded();

        AbstractResponse earliestResponse = (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 0L);
        this.client.prepareResponse(request -> {
            if (!this.listOffsetMatchesExpectedReset(this.tp0, OffsetResetStrategy.EARLIEST, request)) {
                return false;
            }
            // Change the strategy before the EARLIEST response is handed back.
            this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
            return true;
        }, earliestResponse);
        this.consumerClient.pollNoWakeup();

        // The reset is still pending, now with the LATEST strategy.
        Assertions.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((Object)OffsetResetStrategy.LATEST, (Object)this.subscriptions.resetStrategy(this.tp0));

        this.fetcher.resetOffsetsIfNeeded();
        AbstractResponse latestResponse = (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 10L);
        this.client.prepareResponse(
            request -> this.listOffsetMatchesExpectedReset(this.tp0, OffsetResetStrategy.LATEST, request),
            latestResponse);
        this.consumerClient.pollNoWakeup();

        Assertions.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals(10L, this.subscriptions.position(this.tp0).offset);
    }

    /**
     * Switches the requested reset strategy from LATEST to EARLIEST while the LATEST request is
     * in flight, and verifies that the late LATEST response leaves the EARLIEST reset pending.
     */
    @Test
    public void testChangeResetWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        // Send the LATEST reset; no response is available yet, so it remains in flight.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        // Ask for a different strategy before the response arrives.
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingResponses());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        // The reset is still needed and carries the newer EARLIEST strategy.
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((Object)OffsetResetStrategy.EARLIEST, (Object)this.subscriptions.resetStrategy(this.tp0));
    }

    /**
     * Re-requests the same LATEST reset while the first request is in flight and verifies that
     * the response is still applied: the reset completes and the position becomes 5.
     */
    @Test
    public void testIdempotentResetWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        // Send the reset; no response is available yet, so it remains in flight.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        // Same strategy requested again before the response arrives.
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * Verifies how a TOPIC_AUTHORIZATION_FAILED ListOffsets response is surfaced: the next call to
     * {@code resetOffsetsIfNeeded()} throws a {@link TopicAuthorizationException} naming the
     * topic, a further call does not throw again, and after {@code retryBackoffMs} the reset can
     * be retried successfully.
     *
     * <p>The method name keeps its existing spelling ("Rest") so the test identifier is unchanged.
     */
    @Test
    public void testRestOffsetsAuthorizationFailure() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);

        // First attempt is answered with an authorization failure.
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, this.validLeaderEpoch), (AbstractResponse)this.listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));

        // The failure is raised by the following reset call; assertThrows matches the style used
        // by the other exception tests in this class and fails with the same message otherwise.
        TopicAuthorizationException authFailure = Assertions.assertThrows(
            TopicAuthorizationException.class,
            () -> this.fetcher.resetOffsetsIfNeeded(),
            "Expected authorization error to be raised");
        Assertions.assertEquals(Collections.singleton(this.tp0.topic()), (Object)authFailure.unauthorizedTopics());

        // A further call neither throws again nor sends a request before the backoff is waited out.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));

        // After retryBackoffMs the reset is retried and succeeds with offset 5.
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * Checks fetchability of tp0 around a pending revocation: once marked pending-revocation the
     * partition keeps its position (100) but is not fetchable, and after unsubscribing and
     * re-assigning it becomes fetchable again.
     */
    @Test
    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        // NOTE(review): the three identical seek calls look redundant — confirm whether the
        // repetition is intentional before collapsing them.
        this.subscriptions.seek(this.tp0, 100L);
        this.subscriptions.seek(this.tp0, 100L);
        this.subscriptions.seek(this.tp0, 100L);
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        // Mark the partition as pending revocation: position stays valid, fetching stops.
        this.subscriptions.markPendingRevocation(Collections.singleton(this.tp0));
        this.fetcher.resetOffsetsIfNeeded();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        // Seeking while pending revocation still updates/keeps the position.
        this.subscriptions.seek(this.tp0, 100L);
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        // Drop the subscription state and assign the partition afresh: it is fetchable again.
        this.subscriptions.unsubscribe();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 100L);
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
    }

    /**
     * A paused partition that needs an offset reset still has the reset applied (position 10),
     * but remains non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        // Pause first, then request the reset.
        this.subscriptions.pause(this.tp0);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, this.validLeaderEpoch), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 10L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        // Reset completed with a valid position, yet the partition is not fetchable.
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * A paused partition whose reset was requested, with no ListOffsets response prepared, must
     * stay in the reset-needed state: not fetchable and without a valid position.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        this.buildFetcher();
        final TopicPartition pausedPartition = this.tp0;
        this.assignFromUser(Collections.singleton(pausedPartition));

        // Request the reset first, then pause.
        this.subscriptions.requestOffsetReset(pausedPartition);
        this.subscriptions.pause(pausedPartition);

        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assertions.assertTrue(this.subscriptions.isOffsetResetNeeded(pausedPartition));
        Assertions.assertFalse(this.subscriptions.isFetchable(pausedPartition));
        Assertions.assertFalse(this.subscriptions.hasValidPosition(pausedPartition));
    }

    /**
     * A paused partition that already has a position (seek to 10) needs no reset, keeps that
     * position, and is not fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        this.buildFetcher();
        final TopicPartition pausedPartition = this.tp0;
        this.assignFromUser(Collections.singleton(pausedPartition));

        this.subscriptions.seek(pausedPartition, 10L);
        this.subscriptions.pause(pausedPartition);
        this.fetcher.resetOffsetsIfNeeded();

        Assertions.assertFalse(this.subscriptions.isOffsetResetNeeded(pausedPartition));
        Assertions.assertFalse(this.subscriptions.isFetchable(pausedPartition));
        Assertions.assertTrue(this.subscriptions.hasValidPosition(pausedPartition));
        Assertions.assertEquals(10L, this.subscriptions.position(pausedPartition).offset);
    }

    /**
     * Fetching all topic metadata returns as many topics as the initial metadata update holds.
     */
    @Test
    public void testGetAllTopics() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        AbstractResponse metadataResponse = (AbstractResponse)this.newMetadataResponse(this.topicName, Errors.NONE);
        this.client.prepareResponse(metadataResponse);

        Map<?, ?> fetchedTopics = this.fetcher.getAllTopicMetadata(this.time.timer(5000L));
        int expectedTopicCount = this.initialUpdateResponse.topicMetadata().size();
        Assertions.assertEquals(expectedTopicCount, fetchedTopics.size());
    }

    /**
     * Fetching all topic metadata still succeeds when the first queued response is a
     * {@code null} response with the trailing {@code true} flag and a normal metadata response
     * follows it.
     */
    @Test
    public void testGetAllTopicsDisconnect() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        // First queued entry: null response with the extra flag set; second: a regular response.
        this.client.prepareResponse(null, true);
        AbstractResponse metadataResponse = (AbstractResponse)this.newMetadataResponse(this.topicName, Errors.NONE);
        this.client.prepareResponse(metadataResponse);

        Map<?, ?> fetchedTopics = this.fetcher.getAllTopicMetadata(this.time.timer(5000L));
        int expectedTopicCount = this.initialUpdateResponse.topicMetadata().size();
        Assertions.assertEquals(expectedTopicCount, fetchedTopics.size());
    }

    /**
     * With no response prepared, fetching all topic metadata on a 50 ms timer must throw
     * {@code TimeoutException}.
     */
    @Test
    public void testGetAllTopicsTimeout() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        Assertions.assertThrows(TimeoutException.class, () -> {
            this.fetcher.getAllTopicMetadata(this.time.timer(50L));
        });
    }

    /**
     * A metadata response carrying TOPIC_AUTHORIZATION_FAILED must make
     * {@code getAllTopicMetadata} throw a {@link TopicAuthorizationException} that names the
     * unauthorized topic.
     */
    @Test
    public void testGetAllTopicsUnauthorized() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.TOPIC_AUTHORIZATION_FAILED));

        // assertThrows replaces the try/fail/catch idiom, matching testGetAllTopicsTimeout.
        TopicAuthorizationException authFailure = Assertions.assertThrows(
            TopicAuthorizationException.class,
            () -> this.fetcher.getAllTopicMetadata(this.time.timer(10L)));
        Assertions.assertEquals(Collections.singleton(this.topicName), (Object)authFailure.unauthorizedTopics());
    }

    /**
     * A metadata response carrying INVALID_TOPIC_EXCEPTION must make {@code getTopicMetadata}
     * throw {@code InvalidTopicException}.
     */
    @Test
    public void testGetTopicMetadataInvalidTopic() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.INVALID_TOPIC_EXCEPTION));

        MetadataRequest.Builder topicRequest = new MetadataRequest.Builder(Collections.singletonList(this.topicName), true);
        Assertions.assertThrows(InvalidTopicException.class, () -> {
            this.fetcher.getTopicMetadata(topicRequest, this.time.timer(5000L));
        });
    }

    /**
     * A metadata response carrying UNKNOWN_TOPIC_OR_PARTITION yields a result map that has no
     * value for the requested topic.
     */
    @Test
    public void testGetTopicMetadataUnknownTopic() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));

        MetadataRequest.Builder topicRequest = new MetadataRequest.Builder(Collections.singletonList(this.topicName), true);
        Map<?, ?> metadataByTopic = this.fetcher.getTopicMetadata(topicRequest, this.time.timer(5000L));
        Assertions.assertNull(metadataByTopic.get(this.topicName));
    }

    /**
     * A LEADER_NOT_AVAILABLE metadata response followed by a successful one results in the
     * requested topic being present in the returned map.
     */
    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        // Queue the failing response first, then the successful one.
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.LEADER_NOT_AVAILABLE));
        this.client.prepareResponse((AbstractResponse)this.newMetadataResponse(this.topicName, Errors.NONE));

        MetadataRequest.Builder topicRequest = new MetadataRequest.Builder(Collections.singletonList(this.topicName), true);
        Map<?, ?> metadataByTopic = this.fetcher.getTopicMetadata(topicRequest, this.time.timer(5000L));
        Assertions.assertTrue(metadataByTopic.containsKey(this.topicName));
    }

    /**
     * Rebuilds a successful metadata response with every partition's two {@code Optional}
     * constructor arguments emptied (presumably leader id and leader epoch, i.e. "offline"
     * partitions — confirm against {@code MetadataResponse.PartitionMetadata}) and verifies that
     * {@code getTopicMetadata} still returns one entry per partition of the topic.
     */
    @Test
    public void testGetTopicMetadataOfflinePartitions() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        MetadataResponse originalResponse = this.newMetadataResponse(this.topicName, Errors.NONE);

        // Copy every topic, replacing each partition with a variant whose Optional fields are empty.
        List<MetadataResponse.TopicMetadata> altTopics = new ArrayList<>();
        for (MetadataResponse.TopicMetadata item : originalResponse.topicMetadata()) {
            // Typed list: iterating a raw List with a typed loop variable does not compile.
            List<MetadataResponse.PartitionMetadata> partitions = item.partitionMetadata();
            List<MetadataResponse.PartitionMetadata> altPartitions = new ArrayList<>();
            for (MetadataResponse.PartitionMetadata p : partitions) {
                altPartitions.add(new MetadataResponse.PartitionMetadata(p.error, p.topicPartition, Optional.empty(), Optional.empty(), p.replicaIds, p.inSyncReplicaIds, p.offlineReplicaIds));
            }
            MetadataResponse.TopicMetadata alteredTopic = new MetadataResponse.TopicMetadata(item.error(), item.topic(), item.isInternal(), altPartitions);
            altTopics.add(alteredTopic);
        }

        // Re-assemble the response around the altered topics; -1 stands in for a missing controller.
        Node controller = originalResponse.controller();
        MetadataResponse altered = RequestTestUtils.metadataResponse(originalResponse.brokers(), originalResponse.clusterId(), controller != null ? controller.id() : -1, altTopics);
        this.client.prepareResponse((AbstractResponse)altered);

        Map topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), false), this.time.timer(5000L));
        Assertions.assertNotNull((Object)topicMetadata);
        Assertions.assertNotNull(topicMetadata.get(this.topicName));
        Assertions.assertEquals((long)this.metadata.fetch().partitionCountForTopic(this.topicName).longValue(), (long)((List)topicMetadata.get(this.topicName)).size());
    }

    /**
     * Drives a real {@code NetworkClient} over a {@code MockSelector} to check the fetch
     * throttle-time metrics: after an ApiVersions exchange and three fetch responses throttled at
     * 100, 200 and 300 ms, the average must be 250 and the maximum 400.
     */
    @Test
    public void testQuotaMetrics() {
        this.buildFetcher();
        MockSelector selector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor((Metrics)this.metrics, (FetcherMetricsRegistry)this.metricsRegistry);
        Cluster cluster = TestUtils.singletonCluster("test", 1);
        Node node = (Node)cluster.nodes().get(0);
        NetworkClient client = new NetworkClient((Selectable)selector, (Metadata)this.metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, 10000L, 127000L, (Time)this.time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
        // Feed an ApiVersions response built with the value 400 (presumably its throttle time —
        // TODO confirm), then poll until the client reports the node as ready.
        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse((int)400, (ApiMessageType.ListenerType)ApiMessageType.ListenerType.ZK_BROKER);
        ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader((AbstractResponse)apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
        while (!client.ready(node, this.time.milliseconds())) {
            client.poll(1L, this.time.milliseconds());
            // Advance the mock clock past any throttle delay so the loop can make progress.
            this.time.sleep(client.throttleDelayMs(node, this.time.milliseconds()));
        }
        selector.clear();
        // Three fetch round trips with throttle times of 100, 200 and 300 ms.
        for (int i = 1; i <= 3; ++i) {
            int throttleTimeMs = 100 * i;
            FetchRequest.Builder builder = FetchRequest.Builder.forConsumer((short)ApiKeys.FETCH.latestVersion(), (int)100, (int)100, new LinkedHashMap());
            builder.rackId("");
            ClientRequest request = client.newClientRequest(node.idString(), (AbstractRequest.Builder)builder, this.time.milliseconds(), true);
            client.send(request, this.time.milliseconds());
            client.poll(1L, this.time.milliseconds());
            FetchResponse response = this.fullFetchResponse(this.tidp0, this.nextRecords, Errors.NONE, i, throttleTimeMs);
            // Serialize with the request's correlation id so the client can match the response.
            buffer = RequestTestUtils.serializeResponseWithHeader((AbstractResponse)response, ApiKeys.FETCH.latestVersion(), request.correlationId());
            selector.completeReceive(new NetworkReceive(node.idString(), buffer));
            client.poll(1L, this.time.milliseconds());
            this.time.sleep(client.throttleDelayMs(node, this.time.milliseconds()));
            selector.clear();
        }
        Map allMetrics = this.metrics.metrics();
        KafkaMetric avgMetric = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeAvg, new String[0]));
        KafkaMetric maxMetric = (KafkaMetric)allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeMax, new String[0]));
        // Expected values are consistent with the 400 from the ApiVersions response also being
        // recorded: (400 + 100 + 200 + 300) / 4 = 250, max = 400 — TODO confirm.
        Assertions.assertEquals((double)250.0, (double)((Double)avgMetric.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)400.0, (double)((Double)maxMetric.metricValue()), (double)1.0E-4);
        client.close();
    }

    /**
     * Verifies the records-lag-max and per-partition records-lag metrics: NaN before any fetch,
     * 100 after an empty fetch with high watermark 100, 197 after fetching three records with
     * high watermark 200, and removal of the per-partition metric after unsubscribing.
     */
    @Test
    public void testFetcherMetrics() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        MetricName maxLagMetric = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        Map<String, String> tags = new HashMap<>();
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLagMetric = this.metrics.metricName("records-lag", this.metricGroup, tags);

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
        Assertions.assertEquals((double)Double.NaN, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);

        // Empty fetch with high watermark 100 while positioned at 0.
        this.fetchRecords(this.tidp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
        Assertions.assertEquals((double)100.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
        Assertions.assertEquals((double)100.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);

        // Three records at offsets 0..2; an explicit charset keeps the payload platform-independent.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        this.fetchRecords(this.tidp0, builder.build(), Errors.NONE, 200L, 0);
        Assertions.assertEquals((double)197.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)197.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);

        // After unsubscribing, the next sendFetches call must drop the per-partition metric.
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assertions.assertFalse((boolean)allMetrics.containsKey(partitionLagMetric));
    }

    /**
     * Verifies the records-lead-min and per-partition records-lead metrics: NaN before any fetch,
     * 0 after an empty fetch, then a per-partition lead of 3 (with the minimum still 0) after
     * three records are fetched, and removal of the per-partition metric after unsubscribing.
     */
    @Test
    public void testFetcherLeadMetric() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        MetricName minLeadMetric = this.metrics.metricInstance(this.metricsRegistry.recordsLeadMin, new String[0]);
        Map<String, String> tags = new HashMap<>(2);
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLeadMetric = this.metrics.metricName("records-lead", this.metricGroup, "", tags);

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric recordsFetchLeadMin = allMetrics.get(minLeadMetric);
        Assertions.assertEquals((double)Double.NaN, (double)((Double)recordsFetchLeadMin.metricValue()), (double)1.0E-4);

        // Empty fetch while positioned at 0.
        this.fetchRecords(this.tidp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
        Assertions.assertEquals((double)0.0, (double)((Double)recordsFetchLeadMin.metricValue()), (double)1.0E-4);
        KafkaMetric partitionLead = allMetrics.get(partitionLeadMetric);
        Assertions.assertEquals((double)0.0, (double)((Double)partitionLead.metricValue()), (double)1.0E-4);

        // Three records at offsets 0..2; an explicit charset keeps the payload platform-independent.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        this.fetchRecords(this.tidp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
        Assertions.assertEquals((double)0.0, (double)((Double)recordsFetchLeadMin.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)3.0, (double)((Double)partitionLead.metricValue()), (double)1.0E-4);

        // After unsubscribing, the next sendFetches call must drop the per-partition metric.
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assertions.assertFalse((boolean)allMetrics.containsKey(partitionLeadMetric));
    }

    /**
     * Same scenario as the plain lag-metric test but with a READ_COMMITTED fetcher and an extra
     * offset argument (50, then 150) passed to {@code fetchRecords}: the expected lag values
     * (50, then 147) follow that argument rather than the high watermark (100, then 200).
     */
    @Test
    public void testReadCommittedLagMetric() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        MetricName maxLagMetric = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        Map<String, String> tags = new HashMap<>();
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLagMetric = this.metrics.metricName("records-lag", this.metricGroup, tags);

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);
        Assertions.assertEquals((double)Double.NaN, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);

        // Empty fetch: high watermark 100, second offset argument 50.
        this.fetchRecords(this.tidp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
        Assertions.assertEquals((double)50.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
        Assertions.assertEquals((double)50.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);

        // Three records at offsets 0..2; an explicit charset keeps the payload platform-independent.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        this.fetchRecords(this.tidp0, builder.build(), Errors.NONE, 200L, 150L, 0);
        Assertions.assertEquals((double)147.0, (double)((Double)recordsFetchLagMax.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)147.0, (double)((Double)partitionLag.metricValue()), (double)1.0E-4);

        // After unsubscribing, the next sendFetches call must drop the per-partition metric.
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assertions.assertFalse((boolean)allMetrics.containsKey(partitionLagMetric));
    }

    /**
     * Verifies the fetch-size-avg and records-per-request-avg metrics after one fetch response
     * that carries three records for each of two partitions on different topics: the size
     * metric must equal the summed record sizes and the record-count metric must be 6.
     */
    @Test
    public void testFetchResponseMetrics() {
        this.buildFetcher();
        String topic1 = "foo";
        String topic2 = "bar";
        TopicPartition tp1 = new TopicPartition(topic1, 0);
        TopicPartition tp2 = new TopicPartition(topic2, 0);
        // Plain varargs call: casting the array to Object[] would produce a Set<Object>.
        this.subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(topic1, 1);
        partitionCounts.put(topic2, 1);
        this.topicIds.put(topic1, Uuid.randomUuid());
        this.topicIds.put(topic2, Uuid.randomUuid());
        TopicIdPartition tidp1 = new TopicIdPartition(this.topicIds.get(topic1), tp1);
        TopicIdPartition tidp2 = new TopicIdPartition(this.topicIds.get(topic2), tp2);
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, partitionCounts, tp -> this.validLeaderEpoch, this.topicIds));

        // Build one partition payload of three records per topic and total up the record sizes.
        int expectedBytes = 0;
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> fetchPartitionData = new LinkedHashMap<>();
        for (TopicIdPartition tidp : Utils.mkSet(tidp1, tidp2)) {
            this.subscriptions.seek(tidp.topicPartition(), 0L);
            MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
            for (int v = 0; v < 3; ++v) {
                // Explicit charset keeps the payload platform-independent.
                builder.appendWithOffset((long)v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
            }
            MemoryRecords records = builder.build();
            for (Record record : records.records()) {
                expectedBytes += record.sizeInBytes();
            }
            fetchPartitionData.put(tidp, new FetchResponseData.PartitionData().setPartitionIndex(tidp.topicPartition().partition()).setHighWatermark(15L).setLogStartOffset(0L).setRecords((BaseRecords)records));
        }

        // Exactly one fetch request is sent; answer it with both partitions.
        Assertions.assertEquals((int)1, (int)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)FetchResponse.of((Errors)Errors.NONE, (int)0, (int)0, fetchPartitionData));
        this.consumerClient.poll(this.time.timer(0L));

        // Typed map: with a raw Map, get(...) returns Object and .size() would not compile.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = this.fetchedRecords();
        Assertions.assertEquals((int)3, (int)fetchedRecords.get(tp1).size());
        Assertions.assertEquals((int)3, (int)fetchedRecords.get(tp2).size());

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        Assertions.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)6.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    /**
     * With the position at offset 1, a fetch that returns records at offsets 0..2 must only count
     * the records at or beyond the position: two records, and only their bytes.
     */
    @Test
    public void testFetchResponseMetricsPartialResponse() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));

        // Three records at offsets 0..2; an explicit charset keeps the payload platform-independent.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords records = builder.build();

        // Only records at offset >= 1 (the seek position) contribute to the expected size.
        int expectedBytes = 0;
        for (Record record : records.records()) {
            if (record.offset() < 1L) continue;
            expectedBytes += record.sizeInBytes();
        }

        this.fetchRecords(this.tidp0, records, Errors.NONE, 100L, 0);
        Assertions.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)2.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    /**
     * A fetch response in which tp0 carries three records and tp1 carries OFFSET_OUT_OF_RANGE
     * must record metrics for the successful partition only: three records and their byte size.
     */
    @Test
    public void testFetchResponseMetricsWithOnePartitionError() {
        this.buildFetcher();
        // Plain varargs call: casting the array to Object[] would produce a Set<Object>.
        this.assignFromUser(Utils.mkSet(this.tp0, this.tp1));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));

        // Three records at offsets 0..2; an explicit charset keeps the payload platform-independent.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long)v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords records = builder.build();

        // tp0 succeeds with the records; tp1 reports OFFSET_OUT_OF_RANGE and carries no records.
        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
        partitions.put(this.tidp0, new FetchResponseData.PartitionData().setPartitionIndex(this.tp0.partition()).setHighWatermark(100L).setLogStartOffset(0L).setRecords((BaseRecords)records));
        partitions.put(this.tidp1, new FetchResponseData.PartitionData().setPartitionIndex(this.tp1.partition()).setErrorCode(Errors.OFFSET_OUT_OF_RANGE.code()).setHighWatermark(100L).setLogStartOffset(0L));

        Assertions.assertEquals((int)1, (int)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)FetchResponse.of((Errors)Errors.NONE, (int)0, (int)0, new LinkedHashMap<>(partitions)));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetcher.collectFetch();

        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assertions.assertEquals((double)expectedBytes, (double)((Double)fetchSizeAverage.metricValue()), (double)1.0E-4);
        Assertions.assertEquals((double)3.0, (double)((Double)recordsCountAverage.metricValue()), (double)1.0E-4);
    }

    /**
     * Sends a fetch for {@code tp0} and {@code tp1} at offset 0, then seeks {@code tp1} to
     * offset 5 before the response arrives. The response carries three records for {@code tp0}
     * and one record for {@code tp1}; the test checks that the fetch-size and
     * records-per-request averages equal the size and count of the {@code tp0} records only.
     */
    @Test
    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
        this.buildFetcher();
        // Plain varargs call: an (Object[]) cast would make mkSet infer Set<Object>,
        // which is not assignable to the partition set the helper expects.
        this.assignFromUser(Utils.mkSet(this.tp0, this.tp1));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);

        Map<?, ?> allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        // Move tp1 after the request went out, so its position no longer matches the in-flight fetch.
        this.subscriptions.seek(this.tp1, 5L);

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; ++v) {
            builder.appendWithOffset((long) v, -1L, "key".getBytes(), ("value-" + v).getBytes());
        }
        MemoryRecords records = builder.build();

        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
        partitions.put(this.tidp0, new FetchResponseData.PartitionData()
                .setPartitionIndex(this.tp0.partition())
                .setHighWatermark(100L)
                .setLogStartOffset(0L)
                .setRecords(records));
        partitions.put(this.tidp1, new FetchResponseData.PartitionData()
                .setPartitionIndex(this.tp1.partition())
                .setHighWatermark(100L)
                .setLogStartOffset(0L)
                .setRecords(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes()))));

        this.client.prepareResponse(FetchResponse.of(Errors.NONE, 0, 0, new LinkedHashMap<>(partitions)));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetcher.collectFetch();

        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assertions.assertEquals((double) expectedBytes, (double) ((Double) fetchSizeAverage.metricValue()), 1.0E-4);
        Assertions.assertEquals(3.0, (double) ((Double) recordsCountAverage.metricValue()), 1.0E-4);
    }

    /**
     * Builds a fetcher tagged with {@code client-id=clientA}, completes one fetch for
     * {@code tp0}, and checks that every registered metric (except those in the
     * {@code kafka-metrics-count} group) corresponds to a template exposed by the metrics
     * registry, and vice versa.
     */
    @Test
    public void testFetcherMetricsTemplates() {
        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
        this.buildFetcher(new MetricConfig().tags(clientTags), OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.client.prepareResponse(this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        // NOTE(review): the return value is ignored; presumably called so the throttle-time
        // metrics exist before the comparison below. Confirm against Fetcher.throttleTimeSensor.
        Fetcher.throttleTimeSensor(this.metrics, this.metricsRegistry);

        Set<MetricNameTemplate> allMetrics = new HashSet<>();
        for (MetricName n : this.metrics.metrics().keySet()) {
            if (n.group().equals("kafka-metrics-count")) {
                continue;
            }
            // Literal replacement: the partition name is plain text, not a regular expression.
            String name = n.name().replace(this.tp0.toString(), "{topic}-{partition}");
            allMetrics.add(new MetricNameTemplate(name, n.group(), "", n.tags().keySet()));
        }
        TestUtils.checkEquals(allMetrics, new HashSet<MetricNameTemplate>(this.metricsRegistry.getAllTemplates()), "metrics", "templates");
    }

    /**
     * Convenience overload that delegates to the six-argument {@code fetchRecords} with a last
     * stable offset of {@code -1}.
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        final long lastStableOffset = -1L;
        return this.fetchRecords(tp, records, error, hw, lastStableOffset, throttleTime);
    }

    /**
     * Sends one fetch (asserting exactly one request goes out), queues a full fetch response
     * built from the given arguments, polls the network client once and returns whatever
     * {@code fetchedRecords()} yields.
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        int requestsSent = this.fetcher.sendFetches();
        Assertions.assertEquals(1, requestsSent);

        AbstractResponse response = this.fullFetchResponse(tp, records, error, hw, lastStableOffset, throttleTime);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        return this.fetchedRecords();
    }

    /**
     * Variant of {@code fetchRecords} that also takes a log start offset: sends one fetch
     * (asserting exactly one request goes out), queues a response built by
     * {@code fetchResponse}, polls the network client once and returns the fetched records.
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
        int requestsSent = this.fetcher.sendFetches();
        Assertions.assertEquals(1, requestsSent);

        AbstractResponse response = this.fetchResponse(tp, records, error, hw, lastStableOffset, logStartOffset, throttleTime);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));

        return this.fetchedRecords();
    }

    /**
     * Expects {@code offsetsForTimes} to throw {@code TimeoutException} when asked about
     * partition 2 of the test topic with a 100 ms timer and no response prepared.
     */
    @Test
    public void testGetOffsetsForTimesTimeout() {
        this.buildFetcher();
        Assertions.assertThrows(TimeoutException.class, () -> {
            Map<TopicPartition, Long> lookup = Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L);
            this.fetcher.offsetsForTimes(lookup, this.time.timer(100L));
        });
    }

    /**
     * Checks that an empty lookup yields an empty result, runs the unknown-offset scenario, and
     * then drives {@code testGetOffsetsForTimesWithError} through a series of error
     * combinations for the two partitions.
     */
    @Test
    public void testGetOffsetsForTimes() {
        this.buildFetcher();

        // An empty request must produce an empty answer.
        Map<TopicPartition, Long> nothingToLookUp = new HashMap<>();
        boolean emptyResult = this.fetcher.offsetsForTimes(nothingToLookUp, this.time.timer(100L)).isEmpty();
        Assertions.assertTrue(emptyResult);

        this.testGetOffsetsForTimesWithUnknownOffset();

        final long offsetP0 = 10L;
        final long offsetP1 = 100L;
        // Both partitions succeed.
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, offsetP1, null, offsetP1);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, offsetP0, offsetP1, offsetP0, offsetP1);
        // Errors on one or both partitions.
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.INVALID_REQUEST, offsetP0, offsetP1, offsetP0, offsetP1);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, offsetP0, offsetP1, offsetP0, offsetP1);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE, offsetP0, offsetP1, offsetP0, offsetP1);
        this.testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, offsetP0, offsetP1, offsetP0, offsetP1);
        this.testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, offsetP0, offsetP1, null, offsetP1);
        this.testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, offsetP0, offsetP1, offsetP0, offsetP1);
    }

    /**
     * Requests a LATEST reset for {@code tp0}, answers the list-offsets request with
     * {@code FENCED_LEADER_EPOCH} and checks that the reset is still pending, that the
     * partition is neither fetchable nor validly positioned, and that the time to the next
     * metadata update is zero.
     */
    @Test
    public void testGetOffsetsFencedLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);

        AbstractResponse fencedResponse = this.listOffsetResponse(Errors.FENCED_LEADER_EPOCH, 1L, 5L);
        this.client.prepareResponse(fencedResponse);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        boolean resetStillPending = this.subscriptions.isOffsetResetNeeded(this.tp0);
        Assertions.assertTrue(resetStillPending);
        boolean fetchable = this.subscriptions.isFetchable(this.tp0);
        Assertions.assertFalse(fetchable);
        boolean positioned = this.subscriptions.hasValidPosition(this.tp0);
        Assertions.assertFalse(positioned);
        long msUntilMetadataUpdate = this.metadata.timeToNextUpdate(this.time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * For each error in a list of retriable list-offsets errors, builds a fresh fetcher and
     * looks up offsets by timestamp for {@code tp0} and {@code tp1}.
     *
     * <p>The first prepared response (from the original leader of {@code tp1}) succeeds for
     * {@code tp0} with offset 4 and reports the retriable error for {@code tp1}. A metadata
     * update with leader epoch 3 is then prepared, followed by a response from the new leader
     * that is only matched by a request for {@code tp1} carrying leader epoch 3. The lookup is
     * expected to return offset 4 for {@code tp0} and offset 5 for {@code tp1}.
     */
    @Test
    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER, Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
        // Not referenced again in this method; the literal 3 appears inline below.
        int newLeaderEpoch = 3;
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds("dummy", 3, Collections.singletonMap(this.topicName, Errors.NONE), Collections.singletonMap(this.topicName, 4), tp -> 3, this.topicIds);
        Node originalLeader = this.initialUpdateResponse.buildCluster().leaderFor(this.tp1);
        Node newLeader = updatedMetadata.buildCluster().leaderFor(this.tp1);
        // The scenario only makes sense if the metadata update actually moves tp1 to another node.
        Assertions.assertNotEquals((Object)originalLeader, (Object)newLeader);
        for (Errors retriableError : retriableErrors) {
            this.buildFetcher();
            this.subscriptions.assignFromUser(Utils.mkSet((Object[])new TopicPartition[]{this.tp0, this.tp1}));
            this.client.updateMetadata(this.initialUpdateResponse);
            // Not referenced again in this method; the literal 10L appears inline below.
            long fetchTimestamp = 10L;
            // First response: tp0 succeeds (timestamp 10, offset 4), tp1 fails with the retriable error.
            ListOffsetsResponseData.ListOffsetsPartitionResponse tp0NoError = new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp0.partition()).setErrorCode(Errors.NONE.code()).setTimestamp(10L).setOffset(4L);
            List<ListOffsetsResponseData.ListOffsetsTopicResponse> topics = Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(this.tp0.topic()).setPartitions(Arrays.asList(tp0NoError, new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp1.partition()).setErrorCode(retriableError.code()).setTimestamp(-1L).setOffset(-1L))));
            ListOffsetsResponseData data = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(topics);
            // Matcher for the first request: it must be a ListOffsetsRequest whose topics equal
            // one topic with tp1 then tp0, both at timestamp 10 and leader epoch -1.
            this.client.prepareResponseFrom(body -> {
                boolean isListOffsetRequest = body instanceof ListOffsetsRequest;
                if (isListOffsetRequest) {
                    ListOffsetsRequest request = (ListOffsetsRequest)body;
                    List<ListOffsetsRequestData.ListOffsetsTopic> expectedTopics = Collections.singletonList(new ListOffsetsRequestData.ListOffsetsTopic().setName(this.tp0.topic()).setPartitions(Arrays.asList(new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp1.partition()).setTimestamp(10L).setCurrentLeaderEpoch(-1), new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp0.partition()).setTimestamp(10L).setCurrentLeaderEpoch(-1))));
                    return request.topics().equals(expectedTopics);
                }
                return false;
            }, (AbstractResponse)new ListOffsetsResponse(data), originalLeader);
            this.client.prepareMetadataUpdate(updatedMetadata);
            // A further response tied to the original leader that reports NOT_LEADER_OR_FOLLOWER for tp1.
            // NOTE(review): presumably this is the one response still pending at the final
            // numAwaitingResponses() assertion, i.e. the retry must not go to the old leader — confirm.
            List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicsWithFatalError = Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(this.tp0.topic()).setPartitions(Arrays.asList(tp0NoError, new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp1.partition()).setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).setTimestamp(-1L).setOffset(-1L))));
            ListOffsetsResponseData dataWithFatalError = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(topicsWithFatalError);
            this.client.prepareResponseFrom((AbstractResponse)new ListOffsetsResponse(dataWithFatalError), originalLeader);
            // Matcher for the retry: the first partition of the first topic must be tp1 at
            // timestamp 10 with leader epoch 3; the answer (offset 5) comes from the new leader.
            this.client.prepareResponseFrom(body -> {
                boolean isListOffsetRequest = body instanceof ListOffsetsRequest;
                if (isListOffsetRequest) {
                    ListOffsetsRequest request = (ListOffsetsRequest)body;
                    ListOffsetsRequestData.ListOffsetsTopic requestTopic = (ListOffsetsRequestData.ListOffsetsTopic)request.topics().get(0);
                    ListOffsetsRequestData.ListOffsetsPartition expectedPartition = new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp1.partition()).setTimestamp(10L).setCurrentLeaderEpoch(3);
                    return expectedPartition.equals(requestTopic.partitions().get(0));
                }
                return false;
            }, (AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, 10L, 5L), newLeader);
            Map offsetAndTimestampMap = this.fetcher.offsetsForTimes(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.tp0, (Object)10L), Utils.mkEntry((Object)this.tp1, (Object)10L)}), this.time.timer(Integer.MAX_VALUE));
            Assertions.assertEquals((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.tp0, (Object)new OffsetAndTimestamp(4L, 10L)), Utils.mkEntry((Object)this.tp1, (Object)new OffsetAndTimestamp(5L, 10L))}), (Object)offsetAndTimestampMap);
            Assertions.assertEquals((int)1, (int)this.client.numAwaitingResponses());
            // Close the per-iteration fetcher before the next error is exercised.
            this.fetcher.close();
        }
    }

    /**
     * Requests a LATEST reset for {@code tp0}, answers the list-offsets request with
     * {@code UNKNOWN_LEADER_EPOCH} and checks that the reset is still pending, that the
     * partition is neither fetchable nor validly positioned, and that the time to the next
     * metadata update is zero.
     */
    @Test
    public void testGetOffsetsUnknownLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);

        AbstractResponse unknownEpochResponse = this.listOffsetResponse(Errors.UNKNOWN_LEADER_EPOCH, 1L, 5L);
        this.client.prepareResponse(unknownEpochResponse);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        boolean resetStillPending = this.subscriptions.isOffsetResetNeeded(this.tp0);
        Assertions.assertTrue(resetStillPending);
        boolean fetchable = this.subscriptions.isFetchable(this.tp0);
        Assertions.assertFalse(fetchable);
        boolean positioned = this.subscriptions.hasValidPosition(this.tp0);
        Assertions.assertFalse(positioned);
        long msUntilMetadataUpdate = this.metadata.timeToNextUpdate(this.time.milliseconds());
        Assertions.assertEquals(0L, msUntilMetadataUpdate);
    }

    /**
     * Applies a metadata update whose leader epoch is 99 for every partition, requests an
     * offset reset for {@code tp0} and checks, through a request matcher, that the resulting
     * list-offsets request carries leader epoch 99 for its first partition.
     */
    @Test
    public void testGetOffsetsIncludesLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);

        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap(this.topicName, 4), tp -> 99, this.topicIds);
        this.client.updateMetadata(metadataResponse);

        this.subscriptions.requestOffsetReset(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();

        MockClient.RequestMatcher matcher = body -> {
            if (!(body instanceof ListOffsetsRequest)) {
                Assertions.fail("Should have seen ListOffsetRequest");
                return false;
            }
            ListOffsetsRequest offsetRequest = (ListOffsetsRequest) body;
            int epoch = ((ListOffsetsRequestData.ListOffsetsTopic) offsetRequest.topics().get(0)).partitions().get(0).currentLeaderEpoch();
            Assertions.assertTrue(epoch != -1, "Expected Fetcher to set leader epoch in request");
            // Expected value first, so a failure reports "expected 99 but was <epoch>".
            Assertions.assertEquals(99, epoch, "Expected leader epoch to match epoch from metadata update");
            return true;
        };
        this.client.prepareResponse(matcher, this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
    }

    /**
     * Looks up offsets for {@code tp0}, {@code tp1} and partition 0 of a second topic whose
     * metadata is only supplied by a later prepared metadata update, and checks that all three
     * partitions end up with the offsets from their prepared list-offsets responses
     * (11, 32 and 54).
     */
    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
        this.buildFetcher();
        // Plain varargs call: an (Object[]) cast would make mkSet infer Set<Object>.
        this.subscriptions.assignFromUser(Utils.mkSet(this.tp0, this.tp1));
        final String anotherTopic = "another-topic";
        TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);

        this.client.reset();
        // Initial metadata only knows the primary topic.
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(3, Collections.singletonMap(this.topicName, 2), this.topicIds);
        this.client.updateMetadata(initialMetadata);
        this.client.prepareMetadataUpdate(initialMetadata);
        this.client.prepareResponseFrom(this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));
        this.client.prepareResponseFrom(this.listOffsetResponse(this.tp1, Errors.NONE, 1000L, 32L), this.metadata.fetch().leaderFor(this.tp1));

        // Second metadata update adds the other topic.
        Map<String, Integer> partitionNumByTopic = new HashMap<>();
        partitionNumByTopic.put(this.topicName, 2);
        partitionNumByTopic.put(anotherTopic, 1);
        this.topicIds.put(anotherTopic, Uuid.randomUuid());
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds(3, partitionNumByTopic, this.topicIds);
        this.client.prepareMetadataUpdate(updatedMetadata);
        this.client.prepareResponseFrom(this.listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), this.metadata.fetch().leaderFor(t2p0));

        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
        timestampToSearch.put(this.tp0, -1L);
        timestampToSearch.put(this.tp1, -1L);
        timestampToSearch.put(t2p0, -1L);
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = this.fetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));

        Assertions.assertNotNull(offsetAndTimestampMap.get(this.tp0), "Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp0);
        Assertions.assertNotNull(offsetAndTimestampMap.get(this.tp1), "Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp1);
        Assertions.assertNotNull(offsetAndTimestampMap.get(t2p0), "Expect Fetcher.offsetsForTimes() to return non-null result for " + t2p0);
        Assertions.assertEquals(11L, offsetAndTimestampMap.get(this.tp0).offset());
        Assertions.assertEquals(32L, offsetAndTimestampMap.get(this.tp1).offset());
        Assertions.assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset());
    }

    /**
     * Assigns {@code tp0} and partition 0 of a second topic, prepares a metadata update that
     * adds the second topic, then queues two list-offsets responses for {@code tp0}: the first
     * is prepared with the trailing {@code true} flag and the second is tied to the leader of
     * {@code tp0}. The lookup for {@code tp0} must return offset 11, and the second topic must
     * be present in the metadata afterwards.
     */
    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersDisconnectException() {
        this.buildFetcher();
        final String anotherTopic = "another-topic";
        TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
        // Plain varargs call: an (Object[]) cast would make mkSet infer Set<Object>.
        this.subscriptions.assignFromUser(Utils.mkSet(this.tp0, t2p0));

        this.client.reset();
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 1), this.topicIds);
        this.client.updateMetadata(initialMetadata);

        Map<String, Integer> partitionNumByTopic = new HashMap<>();
        partitionNumByTopic.put(this.topicName, 1);
        partitionNumByTopic.put(anotherTopic, 1);
        this.topicIds.put(anotherTopic, Uuid.randomUuid());
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds(1, partitionNumByTopic, this.topicIds);
        this.client.prepareMetadataUpdate(updatedMetadata);

        // NOTE(review): the trailing `true` presumably marks this response as a disconnect,
        // matching the test name — confirm against MockClient.prepareResponse.
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), true);
        this.client.prepareResponseFrom(this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));

        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
        timestampToSearch.put(this.tp0, -1L);
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = this.fetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));

        Assertions.assertNotNull(offsetAndTimestampMap.get(this.tp0), "Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp0);
        Assertions.assertEquals(11L, offsetAndTimestampMap.get(this.tp0).offset());
        Assertions.assertNotNull(this.metadata.fetch().partitionCountForTopic(anotherTopic));
    }

    /**
     * With a zero-length timer, {@code offsetsForTimes} is expected to return a map that holds
     * a {@code null} value for each requested partition.
     */
    @Test
    public void testListOffsetsWithZeroTimeout() {
        this.buildFetcher();

        Map<TopicPartition, Long> lookups = new HashMap<>();
        Map<TopicPartition, Object> expectedNulls = new HashMap<>();
        for (TopicPartition partition : Arrays.asList(this.tp0, this.tp1)) {
            lookups.put(partition, -2L);
            expectedNulls.put(partition, null);
        }

        Object actual = this.fetcher.offsetsForTimes(lookups, this.time.timer(0L));
        Assertions.assertEquals(expectedNulls, actual);
    }

    /**
     * Prepares a single list-offsets response in which {@code tp0} reports
     * {@code NOT_LEADER_OR_FOLLOWER} and {@code tp1} reports {@code UNKNOWN_TOPIC_OR_PARTITION},
     * then expects a lookup with a 1 ms timer to end in {@code TimeoutException}.
     */
    @Test
    public void testBatchedListOffsetsMetadataErrors() {
        this.buildFetcher();

        ListOffsetsResponseData.ListOffsetsPartitionResponse notLeader = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(this.tp0.partition())
                .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
                .setTimestamp(-1L)
                .setOffset(-1L);
        ListOffsetsResponseData.ListOffsetsPartitionResponse unknownPartition = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(this.tp1.partition())
                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                .setTimestamp(-1L)
                .setOffset(-1L);
        ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse = new ListOffsetsResponseData.ListOffsetsTopicResponse()
                .setName(this.tp0.topic())
                .setPartitions(Arrays.asList(notLeader, unknownPartition));
        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
                .setThrottleTimeMs(0)
                .setTopics(Collections.singletonList(topicResponse));
        this.client.prepareResponse(new ListOffsetsResponse(responseData));

        Map<TopicPartition, Long> lookups = new HashMap<>();
        lookups.put(this.tp0, -2L);
        lookups.put(this.tp1, -2L);
        Assertions.assertThrows(TimeoutException.class, () -> {
            this.fetcher.offsetsForTimes(lookups, this.time.timer(1L));
        });
    }

    /**
     * With a READ_COMMITTED fetcher, serves two transactional records from producer 1 followed
     * by an abort marker, lists that transaction as aborted in the response, and checks that
     * the collected fetch contains no records while still reporting an advanced position.
     */
    @Test
    public void testSkippingAbortedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SimpleRecord firstRecord = new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes());
        SimpleRecord secondRecord = new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes());
        int offsetAfterData = this.appendTransactionalRecords(buffer, 1L, 0L, firstRecord, secondRecord);
        this.abortTransaction(buffer, 1L, offsetAfterData);
        buffer.flip();

        FetchResponseData.AbortedTransaction aborted = new FetchResponseData.AbortedTransaction().setProducerId(1L).setFirstOffset(0L);
        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(aborted);
        MemoryRecords batch = MemoryRecords.readableRecords(buffer);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        int requestsSent = this.fetcher.sendFetches();
        Assertions.assertEquals(1, requestsSent);
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        AbstractResponse response = this.fullFetchResponseWithAbortedTransactions(batch, abortedTransactions, Errors.NONE, 100L, 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        Fetch<?, ?> collected = this.collectFetch();
        Object returnedRecords = collected.records();
        Assertions.assertEquals(Collections.emptyMap(), returnedRecords);
        Assertions.assertTrue(collected.positionAdvanced());
    }

    /**
     * With a READ_COMMITTED fetcher, serves two transactional records from producer 1 followed
     * by a commit marker and no aborted transactions. The request matcher checks that the
     * fetch request itself uses READ_COMMITTED, and the test expects both records to be
     * returned for {@code tp0}.
     */
    @Test
    public void testReturnCommittedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long) currentOffset,
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        this.client.prepareResponse(body -> {
            FetchRequest request = (FetchRequest) body;
            Assertions.assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
            return true;
        }, this.fullFetchResponseWithAbortedTransactions(records, Collections.emptyList(), Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Typed map: with a raw Map, get(...) yields Object and size() is not available.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = this.fetchedRecords();
        Assertions.assertTrue(fetchedRecords.containsKey(this.tp0));
        // Expected value first, so a failure reports "expected 2 but was <size>".
        Assertions.assertEquals(2, fetchedRecords.get(this.tp0).size());
    }

    /**
     * With a READ_COMMITTED fetcher, interleaves committed and aborted transactions from two
     * producers in one batch sequence, lists the two aborted transactions in the response, and
     * checks that only the records keyed {@code commit1-1}, {@code commit1-2} and
     * {@code commit2-1} are returned for {@code tp0}.
     */
    @Test
    public void testReadCommittedWithCommittedAndAbortedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
        long pid1 = 1L;
        long pid2 = 2L;
        // Keys are decoded as UTF-8 below, so they are encoded as UTF-8 here as well.
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);

        // Offsets 0-1: pid1 data (committed at 3). Offset 2: pid2 data (aborted at 5).
        this.appendTransactionalRecords(buffer, pid1, 0L,
                new SimpleRecord("commit1-1".getBytes(StandardCharsets.UTF_8), value),
                new SimpleRecord("commit1-2".getBytes(StandardCharsets.UTF_8), value));
        this.appendTransactionalRecords(buffer, pid2, 2L, new SimpleRecord("abort2-1".getBytes(StandardCharsets.UTF_8), value));
        this.commitTransaction(buffer, pid1, 3L);
        this.appendTransactionalRecords(buffer, pid2, 4L, new SimpleRecord("abort2-2".getBytes(StandardCharsets.UTF_8), value));
        this.abortTransaction(buffer, pid2, 5L);
        abortedTransactions.add(new FetchResponseData.AbortedTransaction().setProducerId(pid2).setFirstOffset(2L));

        // Offsets 6 and 8: pid1 data (aborted at 9). Offset 7: pid2 data (committed at 10).
        this.appendTransactionalRecords(buffer, pid1, 6L, new SimpleRecord("abort1-1".getBytes(StandardCharsets.UTF_8), value));
        this.appendTransactionalRecords(buffer, pid2, 7L, new SimpleRecord("commit2-1".getBytes(StandardCharsets.UTF_8), value));
        this.appendTransactionalRecords(buffer, pid1, 8L, new SimpleRecord("abort1-2".getBytes(StandardCharsets.UTF_8), value));
        this.abortTransaction(buffer, pid1, 9L);
        abortedTransactions.add(new FetchResponseData.AbortedTransaction().setProducerId(pid1).setFirstOffset(6L));
        this.commitTransaction(buffer, pid2, 10L);
        buffer.flip();
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Typed collections: raw Map/List would not allow iterating as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = this.fetchedRecords();
        Assertions.assertTrue(fetchedRecords.containsKey(this.tp0));
        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(this.tp0);
        Set<String> fetchedKeys = new HashSet<>();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
            fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
        }
        Assertions.assertEquals(Utils.mkSet("commit1-1", "commit1-2", "commit2-1"), fetchedKeys);
    }

    /**
     * With a READ_COMMITTED fetcher, serves two records from producer 1 followed by two
     * consecutive abort markers, then two more records followed by a commit marker. Only the
     * transaction starting at offset 0 is listed as aborted; the test expects exactly the two
     * records keyed {@code commit1-1} and {@code commit1-2} to be returned for {@code tp0}.
     */
    @Test
    public void testMultipleAbortMarkers() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Keys are decoded as UTF-8 below, so they are encoded as UTF-8 here as well.
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long) currentOffset,
                new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(StandardCharsets.UTF_8), value),
                new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(StandardCharsets.UTF_8), value));
        // Two abort markers in a row for the same producer.
        currentOffset += this.abortTransaction(buffer, 1L, currentOffset);
        currentOffset += this.abortTransaction(buffer, 1L, currentOffset);
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long) currentOffset,
                new SimpleRecord(this.time.milliseconds(), "commit1-1".getBytes(StandardCharsets.UTF_8), value),
                new SimpleRecord(this.time.milliseconds(), "commit1-2".getBytes(StandardCharsets.UTF_8), value));
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();

        List<FetchResponseData.AbortedTransaction> abortedTransactions = Collections.singletonList(new FetchResponseData.AbortedTransaction().setProducerId(1L).setFirstOffset(0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(this.fullFetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // Typed collections: raw Map/List would not allow size() or iterating as ConsumerRecord.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = this.fetchedRecords();
        Assertions.assertTrue(fetchedRecords.containsKey(this.tp0));
        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(this.tp0);
        // Expected value first in both assertions, so failure messages read correctly.
        Assertions.assertEquals(2, fetchedConsumerRecords.size());

        Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2"));
        Set<String> actuallyCommittedKeys = new HashSet<>();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
            actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
        }
        Assertions.assertEquals(committedKeys, actuallyCommittedKeys);
    }

    /**
     * READ_COMMITTED fetch where the log begins with an abort marker that has no
     * preceding data from its producer in the fetched buffer. The committed
     * records that follow the marker must still be delivered.
     */
    @Test
    public void testReadCommittedAbortMarkerWithNoData() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        final long pid = 1L;
        ByteBuffer log = ByteBuffer.allocate(1024);

        // Log layout: abort marker @5, transactional records @6..8, commit marker @9.
        this.abortTransaction(log, pid, 5L);
        this.appendTransactionalRecords(log, pid, 6L, new SimpleRecord("6".getBytes(), null), new SimpleRecord("7".getBytes(), null), new SimpleRecord("8".getBytes(), null));
        this.commitTransaction(log, pid, 9L);
        log.flip();

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());

        // The broker-side aborted-transaction index claims the transaction started at offset 0.
        FetchResponseData.AbortedTransaction abortedEntry = new FetchResponseData.AbortedTransaction().setProducerId(pid).setFirstOffset(0L);
        List<FetchResponseData.AbortedTransaction> abortedIndex = Collections.singletonList(abortedEntry);
        MemoryRecords payload = MemoryRecords.readableRecords(log);
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(payload, abortedIndex, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        Map recordsByPartition = this.fetchedRecords();
        Assertions.assertTrue(recordsByPartition.containsKey(this.tp0));
        List delivered = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(3, delivered.size());
        Assertions.assertEquals(Arrays.asList(6L, 7L, 8L), this.collectRecordOffsets(delivered));
    }

    /**
     * Builds a four-record batch, filters out the trailing record (the only one with a
     * null key), and checks that after consuming the three surviving records the
     * consumer position is 4, i.e. past the removed record as well.
     */
    @Test
    public void testUpdatePositionWithLastRecordMissingFromBatch() {
        this.buildFetcher();
        SimpleRecord[] input = new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord(null, "value".getBytes())};
        MemoryRecords uncompacted = MemoryRecords.withRecords(CompressionType.NONE, input);

        // Filter that drops every record whose key is null (only the last record here).
        MemoryRecords.RecordFilter keyedOnly = new MemoryRecords.RecordFilter(0L, 0L){

            protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY, false);
            }

            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return record.key() != null;
            }
        };
        MemoryRecords.FilterResult filtered = uncompacted.filterTo(this.tp0, keyedOnly, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.outputBuffer().flip();
        MemoryRecords compacted = MemoryRecords.readableRecords((ByteBuffer)filtered.outputBuffer());

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, compacted, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        Map recordsByPartition = this.fetchedRecords();
        Assertions.assertTrue(recordsByPartition.containsKey(this.tp0));
        List delivered = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(3, delivered.size());
        // Keys of the surviving records are "0", "1", "2" in order.
        for (int idx = 0; idx < 3; idx++) {
            Assertions.assertEquals((Object)Integer.toString(idx), (Object)new String((byte[])delivered.get(idx).key()));
        }
        Assertions.assertEquals(4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * Feeds the fetcher a response holding only an empty batch header covering offsets
     * 37..54. No records are returned, but the fetch must report that the position
     * advanced, and the new position must be one past the batch's last offset.
     */
    @Test
    public void testUpdatePositionOnEmptyBatch() {
        this.buildFetcher();
        final long pid = 1L;
        final short epoch = 0;
        final int baseSequence = 1;
        final long firstOffset = 37L;
        final long finalOffset = 54L;
        final int leaderEpoch = 7;
        // 61 bytes: presumably the size of a v2 batch header with no records - TODO confirm.
        ByteBuffer headerOnly = ByteBuffer.allocate(61);
        DefaultRecordBatch.writeEmptyHeader(headerOnly, (byte)2, pid, epoch, baseSequence, firstOffset, finalOffset, leaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), false, false);
        headerOnly.flip();
        MemoryRecords emptyBatch = MemoryRecords.readableRecords(headerOnly);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, emptyBatch, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        Fetch collected = this.collectFetch();
        Assertions.assertEquals(Collections.emptyMap(), (Object)collected.records());
        Assertions.assertTrue((boolean)collected.positionAdvanced());
        Assertions.assertEquals(finalOffset + 1L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * READ_COMMITTED fetch over a log with offset gaps and three interleaved producers:
     * one whose transaction commits, one whose transaction aborts, and one that has no
     * end marker in the buffer but is listed as aborted. Only the committed producer's
     * records (offsets 3, 4, 30, 31, 32) may be delivered.
     */
    @Test
    public void testReadCommittedWithCompactedTopic() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer log = ByteBuffer.allocate(1024);
        final long unfinishedPid = 1L;
        final long abortedPid = 2L;
        final long committedPid = 3L;

        this.appendTransactionalRecords(log, committedPid, 3L, new SimpleRecord("3".getBytes(), "value".getBytes()), new SimpleRecord("4".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(log, abortedPid, 15L, new SimpleRecord("15".getBytes(), "value".getBytes()), new SimpleRecord("16".getBytes(), "value".getBytes()), new SimpleRecord("17".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(log, unfinishedPid, 22L, new SimpleRecord("22".getBytes(), "value".getBytes()), new SimpleRecord("23".getBytes(), "value".getBytes()));
        this.abortTransaction(log, abortedPid, 28L);
        this.appendTransactionalRecords(log, committedPid, 30L, new SimpleRecord("30".getBytes(), "value".getBytes()), new SimpleRecord("31".getBytes(), "value".getBytes()), new SimpleRecord("32".getBytes(), "value".getBytes()));
        this.commitTransaction(log, committedPid, 35L);
        this.appendTransactionalRecords(log, unfinishedPid, 39L, new SimpleRecord("39".getBytes(), "value".getBytes()), new SimpleRecord("40".getBytes(), "value".getBytes()));
        log.flip();

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());

        // Aborted-transaction index returned with the fetch: abortedPid from 6, unfinishedPid from 0.
        FetchResponseData.AbortedTransaction abortedEntry = new FetchResponseData.AbortedTransaction().setProducerId(abortedPid).setFirstOffset(6L);
        FetchResponseData.AbortedTransaction unfinishedEntry = new FetchResponseData.AbortedTransaction().setProducerId(unfinishedPid).setFirstOffset(0L);
        List<FetchResponseData.AbortedTransaction> abortedIndex = Arrays.asList(abortedEntry, unfinishedEntry);
        MemoryRecords payload = MemoryRecords.readableRecords(log);
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(payload, abortedIndex, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        Map recordsByPartition = this.fetchedRecords();
        Assertions.assertTrue(recordsByPartition.containsKey(this.tp0));
        List delivered = recordsByPartition.get(this.tp0);
        Assertions.assertEquals(5, delivered.size());
        Assertions.assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), this.collectRecordOffsets(delivered));
    }

    /**
     * With READ_UNCOMMITTED isolation, records belonging to an aborted transaction are
     * still handed to the consumer: the partition must appear in the fetched records.
     */
    @Test
    public void testReturnAbortedTransactionsinUncommittedMode() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        ByteBuffer log = ByteBuffer.allocate(1024);

        // Two transactional records starting at offset 0, followed by the abort marker.
        int markerOffset = this.appendTransactionalRecords(log, 1L, 0L, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.abortTransaction(log, 1L, markerOffset);
        log.flip();

        FetchResponseData.AbortedTransaction abortedEntry = new FetchResponseData.AbortedTransaction().setProducerId(1L).setFirstOffset(0L);
        List<FetchResponseData.AbortedTransaction> abortedIndex = Collections.singletonList(abortedEntry);
        MemoryRecords payload = MemoryRecords.readableRecords(log);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        Assertions.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(payload, abortedIndex, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        Map recordsByPartition = this.fetchedRecords();
        Assertions.assertTrue(recordsByPartition.containsKey(this.tp0));
    }

    /**
     * With READ_COMMITTED isolation, a fetch containing only an aborted transaction
     * yields no records for the partition, yet the consumer position must still move
     * past the aborted records and the abort marker.
     */
    @Test
    public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer log = ByteBuffer.allocate(1024);

        // Track the next free offset: two aborted records from offset 0, then the abort marker.
        long expectedPosition = this.appendTransactionalRecords(log, 1L, 0L, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        expectedPosition += this.abortTransaction(log, 1L, expectedPosition);
        log.flip();

        FetchResponseData.AbortedTransaction abortedEntry = new FetchResponseData.AbortedTransaction().setProducerId(1L).setFirstOffset(0L);
        List<FetchResponseData.AbortedTransaction> abortedIndex = Collections.singletonList(abortedEntry);
        MemoryRecords payload = MemoryRecords.readableRecords(log);

        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        Assertions.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponseWithAbortedTransactions(payload, abortedIndex, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        Map recordsByPartition = this.fetchedRecords();
        Assertions.assertFalse(recordsByPartition.containsKey(this.tp0));
        Assertions.assertEquals(expectedPosition, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    /**
     * Drives three fetch round trips against two partitions using the same session id (123):
     * a first response carrying data for tp0 and an empty record set for tp1, a second
     * response with no partitions at all, and a third response carrying more data for tp0
     * only. After each step the delivered records and both consumer positions are checked;
     * tp1's position must stay at 1 throughout.
     */
    @Test
    public void testConsumingViaIncrementalFetchRequests() {
        this.buildFetcher(2);
        this.assignFromUser(new HashSet<TopicPartition>(Arrays.asList(this.tp0, this.tp1)));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), this.metadata.currentLeader(this.tp0)));
        this.subscriptions.seekValidated(this.tp1, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.currentLeader(this.tp1)));

        // Round 1: tp0 returns this.records, tp1 returns an empty record set.
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> firstPayload = new LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>();
        firstPayload.put(this.tidp0, new FetchResponseData.PartitionData().setPartitionIndex(this.tp0.partition()).setHighWatermark(2L).setLastStableOffset(2L).setLogStartOffset(0L).setRecords((BaseRecords)this.records));
        firstPayload.put(this.tidp1, new FetchResponseData.PartitionData().setPartitionIndex(this.tp1.partition()).setHighWatermark(100L).setLogStartOffset(0L).setRecords((BaseRecords)this.emptyRecords));
        FetchResponse firstResponse = FetchResponse.of(Errors.NONE, 0, 123, firstPayload);
        this.client.prepareResponse((AbstractResponse)firstResponse);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        Assertions.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());

        // First drain: two records (offsets 1 and 2) for tp0, nothing for tp1.
        Map drained = this.fetchedRecords();
        Assertions.assertFalse(drained.containsKey(this.tp1));
        List tp0Records = drained.get(this.tp0);
        Assertions.assertEquals(2, tp0Records.size());
        Assertions.assertEquals(3L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertEquals(1L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assertions.assertEquals(1L, (long)tp0Records.get(0).offset());
        Assertions.assertEquals(2L, (long)tp0Records.get(1).offset());

        // No new request is sent here; the second drain yields the remaining record (offset 3).
        Assertions.assertEquals(0, (int)this.fetcher.sendFetches());
        drained = this.fetchedRecords();
        Assertions.assertFalse(drained.containsKey(this.tp1));
        tp0Records = drained.get(this.tp0);
        Assertions.assertEquals(1, tp0Records.size());
        Assertions.assertEquals(3L, (long)tp0Records.get(0).offset());
        Assertions.assertEquals(4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);

        // Round 2: a response with no partitions leaves records and positions unchanged.
        LinkedHashMap emptyPayload = new LinkedHashMap();
        FetchResponse secondResponse = FetchResponse.of(Errors.NONE, 0, 123, emptyPayload);
        this.client.prepareResponse((AbstractResponse)secondResponse);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        drained = this.fetchedRecords();
        Assertions.assertTrue(drained.isEmpty());
        Assertions.assertEquals(4L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertEquals(1L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);

        // Round 3: only tp0 is present in the response, carrying this.nextRecords.
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> thirdPayload = new LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>();
        thirdPayload.put(this.tidp0, new FetchResponseData.PartitionData().setPartitionIndex(this.tp0.partition()).setHighWatermark(100L).setLastStableOffset(4L).setLogStartOffset(0L).setRecords((BaseRecords)this.nextRecords));
        FetchResponse thirdResponse = FetchResponse.of(Errors.NONE, 0, 123, thirdPayload);
        this.client.prepareResponse((AbstractResponse)thirdResponse);
        Assertions.assertEquals(1, (int)this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        drained = this.fetchedRecords();
        Assertions.assertFalse(drained.containsKey(this.tp1));
        tp0Records = drained.get(this.tp0);
        Assertions.assertEquals(2, tp0Records.size());
        Assertions.assertEquals(6L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertEquals(1L, (long)this.subscriptions.position((TopicPartition)this.tp1).offset);
        Assertions.assertEquals(4L, (long)tp0Records.get(0).offset());
        Assertions.assertEquals(5L, (long)tp0Records.get(1).offset());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises the fetcher from two threads at once: the test thread sends fetches and
     * drains records, while a single-threaded executor task answers each pending fetch
     * request with two records per requested partition. The loop runs until 1000
     * non-empty fetches have been consumed, verifying for every partition that each
     * delivery holds exactly two records at consecutive, expected offsets.
     *
     * @throws Exception if the background responder task fails ({@code future.get()})
     */
    @Test
    public void testFetcherConcurrency() throws Exception {
        int numPartitions = 20;
        HashSet<TopicPartition> topicPartitions = new HashSet<TopicPartition>();
        for (int i = 0; i < numPartitions; ++i) {
            topicPartitions.add(new TopicPartition(this.topicName, i));
        }
        LogContext logContext = new LogContext();
        this.buildDependencies(new MetricConfig(), Long.MAX_VALUE, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), logContext);
        // Fetcher subclass whose session handlers are wrapped so that every handler call
        // first walks the underlying handler's sessionPartitions map.
        this.fetcher = new Fetcher<byte[], byte[]>(new LogContext(), this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, 2 * numPartitions, true, "", (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), this.metadata, this.subscriptions, this.metrics, this.metricsRegistry, (Time)this.time, this.retryBackoffMs, this.requestTimeoutMs, IsolationLevel.READ_UNCOMMITTED, this.apiVersions){

            protected FetchSessionHandler sessionHandler(int id) {
                final FetchSessionHandler handler = super.sessionHandler(id);
                if (handler == null) {
                    return null;
                }
                // Delegating wrapper: verify, then forward to the real handler.
                return new FetchSessionHandler(new LogContext(), id){

                    public FetchSessionHandler.Builder newBuilder() {
                        this.verifySessionPartitions();
                        return handler.newBuilder();
                    }

                    public boolean handleResponse(FetchResponse response, short version) {
                        this.verifySessionPartitions();
                        return handler.handleResponse(response, version);
                    }

                    public void handleError(Throwable t) {
                        this.verifySessionPartitions();
                        handler.handleError(t);
                    }

                    // Reflectively reads the wrapped handler's private "sessionPartitions" map and
                    // iterates it, yielding between entries. NOTE(review): presumably this widens the
                    // window in which an unsynchronized concurrent modification would surface as an
                    // exception from the iterator - confirm intent.
                    private void verifySessionPartitions() {
                        try {
                            Field field = FetchSessionHandler.class.getDeclaredField("sessionPartitions");
                            field.setAccessible(true);
                            LinkedHashMap sessionPartitions = (LinkedHashMap)field.get(handler);
                            for (Map.Entry entry : sessionPartitions.entrySet()) {
                                Thread.yield();
                            }
                        }
                        catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
            }
        };
        MetadataResponse initialMetadataResponse = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, numPartitions), tp -> this.validLeaderEpoch, this.topicIds);
        this.client.updateMetadata(initialMetadataResponse);
        // Note: this field is assigned after the fetcher above was constructed with the previous value.
        this.fetchSize = 10000;
        this.assignFromUser(topicPartitions);
        topicPartitions.forEach(tp -> this.subscriptions.seek(tp, 0L));
        // Number of non-empty fetches the test thread still has to consume.
        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
        this.executorService = Executors.newSingleThreadExecutor();
        // Background responder: while fetches remain, answer the oldest pending request (if any)
        // under the consumerClient monitor with two records per requested partition, starting at
        // that partition's requested fetch offset.
        Future<Integer> future = this.executorService.submit(() -> {
            while (fetchesRemaining.get() > 0) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    if (!this.client.requests().isEmpty()) {
                        ClientRequest request = this.client.requests().peek();
                        FetchRequest fetchRequest = (FetchRequest)request.requestBuilder().build();
                        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseMap = new LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>();
                        for (Map.Entry entry : fetchRequest.fetchData(this.topicNames).entrySet()) {
                            TopicIdPartition tp = (TopicIdPartition)entry.getKey();
                            long offset = ((FetchRequest.PartitionData)entry.getValue()).fetchOffset;
                            responseMap.put(tp, new FetchResponseData.PartitionData().setPartitionIndex(tp.topicPartition().partition()).setHighWatermark(offset + 2L).setLastStableOffset(offset + 2L).setLogStartOffset(0L).setRecords((BaseRecords)this.buildRecords(offset, 2, offset)));
                        }
                        this.client.respondToRequest(request, (AbstractResponse)FetchResponse.of((Errors)Errors.NONE, (int)0, (int)123, responseMap));
                        this.consumerClient.poll(this.time.timer(0L));
                    }
                }
            }
            return fetchesRemaining.get();
        });
        // Expected next offset per partition, all starting at 0.
        Map nextFetchOffsets = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), t -> 0L));
        // Test thread: send fetches, poll under the same monitor as the responder, and validate
        // whatever has been delivered. Stops early if the responder task has terminated.
        while (fetchesRemaining.get() > 0 && !future.isDone()) {
            Map<TopicPartition, List<ConsumerRecord<TopicPartition, List>>> fetchedRecords;
            if (this.fetcher.sendFetches() == 1) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    this.consumerClient.poll(this.time.timer(0L));
                }
            }
            // Skip the iteration when nothing has completed or the drained result is empty.
            if (!this.fetcher.hasCompletedFetches() || (fetchedRecords = this.fetchedRecords()).isEmpty()) continue;
            fetchesRemaining.decrementAndGet();
            fetchedRecords.forEach((tp, records) -> {
                Assertions.assertEquals((int)2, (int)records.size());
                long nextOffset = (Long)nextFetchOffsets.get(tp);
                Assertions.assertEquals((long)nextOffset, (long)((ConsumerRecord)records.get(0)).offset());
                Assertions.assertEquals((long)(nextOffset + 1L), (long)((ConsumerRecord)records.get(1)).offset());
                nextFetchOffsets.put(tp, nextOffset + 2L);
            });
        }
        // The responder returns the remaining count on exit; 0 means all fetches were consumed.
        Assertions.assertEquals((Object)0, (Object)future.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs 1000 fetch round trips for a single partition with a background responder
     * thread and checks, on the responder side, that the session epoch carried by each
     * fetch request is either 0 or the next expected value, and, on the test thread,
     * that each delivery contains two records at the expected consecutive offsets.
     *
     * @throws Exception if the background responder task fails ({@code future.get()})
     */
    @Test
    public void testFetcherSessionEpochUpdate() throws Exception {
        this.buildFetcher(2);
        MetadataResponse initialMetadataResponse = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 1), this.topicIds);
        this.client.updateMetadata(initialMetadataResponse);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        // Number of non-empty fetches the test thread still has to consume.
        AtomicInteger fetchesRemaining = new AtomicInteger(1000);
        this.executorService = Executors.newSingleThreadExecutor();
        // Background responder: under the consumerClient monitor, validates the request's
        // session epoch and replies with two records starting at nextOffset.
        Future<Integer> future = this.executorService.submit(() -> {
            long nextOffset = 0L;
            long nextEpoch = 0L;
            while (fetchesRemaining.get() > 0) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    if (!this.client.requests().isEmpty()) {
                        ClientRequest request = this.client.requests().peek();
                        FetchRequest fetchRequest = (FetchRequest)request.requestBuilder().build();
                        int epoch = fetchRequest.metadata().epoch();
                        // Epoch 0 is always accepted; otherwise it must match the responder's counter.
                        Assertions.assertTrue((epoch == 0 || (long)epoch == nextEpoch ? 1 : 0) != 0, (String)String.format("Unexpected epoch expected %d got %d", nextEpoch, epoch));
                        ++nextEpoch;
                        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> responseMap = new LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData>();
                        responseMap.put(this.tidp0, new FetchResponseData.PartitionData().setPartitionIndex(this.tp0.partition()).setHighWatermark(nextOffset + 2L).setLastStableOffset(nextOffset + 2L).setLogStartOffset(0L).setRecords((BaseRecords)this.buildRecords(nextOffset, 2, nextOffset)));
                        nextOffset += 2L;
                        this.client.respondToRequest(request, (AbstractResponse)FetchResponse.of((Errors)Errors.NONE, (int)0, (int)123, responseMap));
                        this.consumerClient.poll(this.time.timer(0L));
                    }
                }
            }
            return fetchesRemaining.get();
        });
        // Offset the test thread expects for the first record of the next delivery.
        long nextFetchOffset = 0L;
        // Test thread: send fetches, poll under the same monitor as the responder, and validate
        // deliveries. Stops early if the responder task has terminated.
        while (fetchesRemaining.get() > 0 && !future.isDone()) {
            if (this.fetcher.sendFetches() == 1) {
                ConsumerNetworkClient consumerNetworkClient = this.consumerClient;
                synchronized (consumerNetworkClient) {
                    this.consumerClient.poll(this.time.timer(0L));
                }
            }
            if (!this.fetcher.hasCompletedFetches()) continue;
            Map fetchedRecords = this.fetchedRecords();
            if (!fetchedRecords.isEmpty()) {
                fetchesRemaining.decrementAndGet();
                List records = fetchedRecords.get(this.tp0);
                Assertions.assertEquals((int)2, (int)records.size());
                Assertions.assertEquals((long)nextFetchOffset, (long)records.get(0).offset());
                Assertions.assertEquals((long)(nextFetchOffset + 1L), (long)records.get(1).offset());
                nextFetchOffset += 2L;
            }
            // A second drain immediately afterwards must find nothing left.
            Assertions.assertTrue((boolean)this.fetchedRecords().isEmpty());
        }
        // The responder returns the remaining count on exit; 0 means all fetches were consumed.
        Assertions.assertEquals((Object)0, (Object)future.get());
    }

    /**
     * READ_COMMITTED fetch whose log starts with an empty batch header at offset 0,
     * followed by two transactional records and a commit marker. The request must be
     * sent with READ_COMMITTED isolation, and both data records must be delivered.
     */
    @Test
    public void testEmptyControlBatch() {
        this.buildFetcher(OffsetResetStrategy.EARLIEST, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // Data starts at offset 1 because offset 0 is taken by the empty batch written below.
        int currentOffset = 1;
        // NOTE(review): the two trailing `true` flags presumably mark the batch as transactional
        // and as a control batch (per the test name) - confirm against writeEmptyHeader's signature.
        DefaultRecordBatch.writeEmptyHeader((ByteBuffer)buffer, (byte)2, (long)1L, (short)0, (int)-1, (long)0L, (long)0L, (int)-1, (TimestampType)TimestampType.CREATE_TIME, (long)this.time.milliseconds(), (boolean)true, (boolean)true);
        currentOffset += this.appendTransactionalRecords(buffer, 1L, (long)currentOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();
        MemoryRecords records = MemoryRecords.readableRecords((ByteBuffer)buffer);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assertions.assertEquals((int)1, (int)this.fetcher.sendFetches());
        Assertions.assertFalse((boolean)this.fetcher.hasCompletedFetches());
        // The request matcher asserts the isolation level carried by the outgoing fetch request.
        this.client.prepareResponse(body -> {
            FetchRequest request = (FetchRequest)body;
            Assertions.assertEquals((Object)IsolationLevel.READ_COMMITTED, (Object)request.isolationLevel());
            return true;
        }, (AbstractResponse)this.fullFetchResponseWithAbortedTransactions(records, Collections.emptyList(), Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.fetcher.hasCompletedFetches());
        Map fetchedRecords = this.fetchedRecords();
        Assertions.assertTrue((boolean)fetchedRecords.containsKey(this.tp0));
        // Expected value goes first so a failure message reports expected/actual correctly.
        Assertions.assertEquals((int)2, (int)fetchedRecords.get(this.tp0).size());
    }

    /**
     * Builds an uncompressed, CREATE_TIME record set in a fresh 1024-byte buffer.
     * Every record has timestamp 0, key "key", and value "value-&lt;id&gt;", where the id
     * starts at {@code firstMessageId} and increases by one per record.
     *
     * @param baseOffset     offset handed to the records builder as its base offset
     * @param count          number of records to append; nothing is appended when not positive
     * @param firstMessageId id embedded in the first record's value
     * @return the built records
     */
    private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
        ByteBuffer backing = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(backing, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
        long messageId = firstMessageId;
        int remaining = count;
        while (remaining > 0) {
            recordsBuilder.append(0L, "key".getBytes(), ("value-" + messageId).getBytes());
            messageId++;
            remaining--;
        }
        return recordsBuilder.build();
    }

    /**
     * Appends one transactional batch (magic 2, uncompressed, producer epoch 0) holding the
     * given records to {@code buffer}.
     *
     * @param buffer       destination buffer the batch is written into
     * @param pid          producer id stamped on the batch
     * @param baseOffset   offset of the first record in the batch
     * @param baseSequence sequence number of the first record in the batch
     * @param records      records to append, in order
     * @return the number of records appended
     */
    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord ... records) {
        long logAppendTime = this.time.milliseconds();
        MemoryRecordsBuilder batch = MemoryRecords.builder(buffer, (byte)2, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset, logAppendTime, pid, (short)0, baseSequence, true, -1);
        for (int idx = 0; idx < records.length; idx++) {
            batch.append(records[idx]);
        }
        // build() finalizes the batch into the buffer; the returned MemoryRecords is not needed.
        batch.build();
        return records.length;
    }

    /**
     * Convenience overload that reuses the base offset, narrowed to {@code int}, as the
     * batch's base sequence number.
     *
     * @return the number of records appended
     */
    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord ... records) {
        int sequenceFromOffset = (int)baseOffset;
        return this.appendTransactionalRecords(buffer, pid, baseOffset, sequenceFromOffset, records);
    }

    /**
     * Writes a COMMIT end-transaction marker for {@code producerId} into {@code buffer}
     * at {@code baseOffset}, using producer epoch 0 and partition leader epoch 0.
     */
    private void commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
        long markerTimestamp = this.time.milliseconds();
        EndTransactionMarker commitMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0);
        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, markerTimestamp, 0, producerId, (short)0, commitMarker);
    }

    /**
     * Writes an ABORT end-transaction marker for {@code producerId} into {@code buffer}
     * at {@code baseOffset}, using producer epoch 0 and partition leader epoch 0.
     *
     * @return always 1, which callers add to their running offset
     */
    private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
        long markerTimestamp = this.time.milliseconds();
        EndTransactionMarker abortMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0);
        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, markerTimestamp, 0, producerId, (short)0, abortMarker);
        return 1;
    }

    /**
     * Shared scenario for offsetsForTimes with per-partition errors. Two partitions on a
     * two-node cluster ("topic2"-0 and tp1) first receive list-offsets responses with the
     * supplied errors, then responses with {@code Errors.NONE}. The resulting map is checked
     * against the expected values; a {@code null} expectation means the partition must map
     * to {@code null}.
     *
     * @param errorForP0          error returned first for "topic2"-0
     * @param errorForP1          error returned first for tp1
     * @param offsetForP0         value used as both timestamp and offset in "topic2"-0 responses
     * @param offsetForP1         value used as both timestamp and offset in tp1 responses
     * @param expectedOffsetForP0 expected timestamp and offset for "topic2"-0, or {@code null}
     * @param expectedOffsetForP1 expected timestamp and offset for tp1, or {@code null}
     */
    private void testGetOffsetsForTimesWithError(Errors errorForP0, Errors errorForP1, long offsetForP0, long offsetForP1, Long expectedOffsetForP0, Long expectedOffsetForP1) {
        this.client.reset();
        String secondTopic = "topic2";
        TopicPartition secondTopicP0 = new TopicPartition(secondTopic, 0);
        this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.USE_ALL_DNS_IPS));

        // Metadata: two partitions for the primary topic, one for "topic2", on two nodes.
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.topicName, 2);
        partitionCounts.put(secondTopic, 1);
        MetadataResponse metadataUpdate = RequestTestUtils.metadataUpdateWithIds(2, partitionCounts, this.topicIds);
        Cluster cluster = metadataUpdate.buildCluster();
        this.client.prepareMetadataUpdate(metadataUpdate, true);

        // First round of responses carries the caller-supplied errors; the second round succeeds.
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(secondTopicP0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(secondTopicP0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(this.tp1));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(secondTopicP0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(secondTopicP0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(this.tp1));

        HashMap<TopicPartition, Long> searchTimestamps = new HashMap<TopicPartition, Long>();
        searchTimestamps.put(secondTopicP0, 0L);
        searchTimestamps.put(this.tp1, 0L);
        Map found = this.fetcher.offsetsForTimes(searchTimestamps, this.time.timer(Long.MAX_VALUE));

        if (expectedOffsetForP0 != null) {
            Assertions.assertEquals(expectedOffsetForP0.longValue(), (long)((OffsetAndTimestamp)found.get(secondTopicP0)).timestamp());
            Assertions.assertEquals(expectedOffsetForP0.longValue(), (long)((OffsetAndTimestamp)found.get(secondTopicP0)).offset());
        } else {
            Assertions.assertNull(found.get(secondTopicP0));
        }
        if (expectedOffsetForP1 != null) {
            Assertions.assertEquals(expectedOffsetForP1.longValue(), (long)((OffsetAndTimestamp)found.get(this.tp1)).timestamp());
            Assertions.assertEquals(expectedOffsetForP1.longValue(), (long)((OffsetAndTimestamp)found.get(this.tp1)).offset());
        } else {
            Assertions.assertNull(found.get(this.tp1));
        }
    }

    /**
     * Checks that a successful list-offsets response whose timestamp and offset are both
     * {@code -1} for {@code tp0} yields a result map that contains {@code tp0} mapped to
     * {@code null}.
     */
    private void testGetOffsetsForTimesWithUnknownOffset() {
        this.client.reset();
        MetadataResponse metadataUpdate = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 1), this.topicIds);
        this.client.updateMetadata(metadataUpdate);

        // Assemble the response from the innermost element outwards.
        ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
            .setPartitionIndex(this.tp0.partition())
            .setErrorCode(Errors.NONE.code())
            .setTimestamp(-1L)
            .setOffset(-1L);
        ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse = new ListOffsetsResponseData.ListOffsetsTopicResponse()
            .setName(this.tp0.topic())
            .setPartitions(Collections.singletonList(partitionResponse));
        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(Collections.singletonList(topicResponse));
        this.client.prepareResponseFrom((AbstractResponse)new ListOffsetsResponse(responseData), this.metadata.fetch().leaderFor(this.tp0));

        HashMap<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        timestampsToSearch.put(this.tp0, 0L);
        Map<?, ?> results = this.fetcher.offsetsForTimes(timestampsToSearch, this.time.timer(Long.MAX_VALUE));

        // The partition is present as a key, but no offset/timestamp is attached to it.
        Assertions.assertTrue(results.containsKey(this.tp0));
        Assertions.assertNull(results.get(this.tp0));
    }

    /**
     * Same scenario as the unknown-offset lookup, but with the leader node advertising only
     * version 0 of the list-offsets API and the response carrying an empty old-style offsets
     * list. The result map must contain {@code tp0} mapped to {@code null}.
     */
    @Test
    public void testGetOffsetsForTimesWithUnknownOffsetV0() {
        this.buildFetcher();
        // An empty search map yields an empty result. The map is parameterized rather than raw
        // so the call does not rely on an unchecked conversion.
        Assertions.assertTrue(this.fetcher.offsetsForTimes(new HashMap<TopicPartition, Long>(), this.time.timer(100L)).isEmpty());

        this.client.reset();
        MetadataResponse initialMetadataUpdate = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(this.topicName, 1), this.topicIds);
        this.client.updateMetadata(initialMetadataUpdate);

        // Restrict the node to LIST_OFFSETS versions [0, 0].
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create((short)ApiKeys.LIST_OFFSETS.id, (short)0, (short)0));

        ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
            .setPartitionIndex(this.tp0.partition())
            .setErrorCode(Errors.NONE.code())
            .setTimestamp(-1L)
            .setOldStyleOffsets(Collections.emptyList());
        ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse = new ListOffsetsResponseData.ListOffsetsTopicResponse()
            .setName(this.tp0.topic())
            .setPartitions(Collections.singletonList(partitionResponse));
        ListOffsetsResponseData data = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(Collections.singletonList(topicResponse));
        this.client.prepareResponseFrom((AbstractResponse)new ListOffsetsResponse(data), this.metadata.fetch().leaderFor(this.tp0));

        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(this.tp0, 0L);
        Map<?, ?> offsetAndTimestampMap = this.fetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        Assertions.assertTrue(offsetAndTimestampMap.containsKey(this.tp0));
        Assertions.assertNull(offsetAndTimestampMap.get(this.tp0));
    }

    /**
     * Verifies that after fetching three records for {@code tp0} the subscription position
     * is at offset 3 and carries offset epoch 1.
     */
    @Test
    public void testSubscriptionPositionUpdatedWithEpoch() {
        // Three records at offsets 0..2. NOTE(review): the trailing builder argument (1) is
        // presumably the partition leader epoch that the final assertion expects - confirm.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L, (long)-1L, (long)-1L, (short)-1, (int)-1, (boolean)false, (int)1);
        // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        builder.appendWithOffset(0L, 0L, key, "value-1".getBytes(StandardCharsets.UTF_8));
        builder.appendWithOffset(1L, 0L, key, "value-2".getBytes(StandardCharsets.UTF_8));
        builder.appendWithOffset(2L, 0L, key, "value-3".getBytes(StandardCharsets.UTF_8));
        MemoryRecords records = builder.build();

        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds);
        this.metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
        this.subscriptions.seek(this.tp0, 0L);

        // Send the fetch and deliver the prepared response.
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        Map<?, ?> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        // Expected value goes first so that a failure message reports the values correctly.
        Assertions.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        TestUtils.assertOptional(this.subscriptions.position(this.tp0).offsetEpoch, value -> Assertions.assertEquals(1, (int)value));
    }

    /**
     * Puts four assigned partitions into the unvalidated state and, for every node in the
     * metadata, queues an OffsetsForLeaderEpoch response whose request matcher accepts only a
     * request covering exactly the partitions led by that node. After validation and one
     * poll, no assigned partition may still be awaiting validation.
     */
    @Test
    public void testOffsetValidationRequestGrouping() {
        this.buildFetcher();
        this.assignFromUser(Utils.mkSet(this.tp0, this.tp1, this.tp2, this.tp3));
        MetadataResponse metadataUpdate = RequestTestUtils.metadataUpdateWithIds("dummy", 3, Collections.emptyMap(), Collections.singletonMap(this.topicName, 4), tp -> 5, this.topicIds);
        this.metadata.updateWithCurrentRequestVersion(metadataUpdate, false, 0L);

        // Seek every partition to offset 0 with epoch 4 so that each one needs validation.
        for (TopicPartition partition : this.subscriptions.assignedPartitions()) {
            Metadata.LeaderAndEpoch currentLeader = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(partition).leader, Optional.of(4));
            SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(0L, Optional.of(4), currentLeader);
            this.subscriptions.seekUnvalidated(partition, position);
        }

        Set<TopicPartition> requestedSoFar = new HashSet<TopicPartition>();
        for (Node node : this.metadata.fetch().nodes()) {
            this.apiVersions.update(node.idString(), NodeApiVersions.create());

            Optional<Node> thisNode = Optional.of(node);
            Set<TopicPartition> partitionsOnNode = this.subscriptions.assignedPartitions().stream()
                .filter(tp -> this.metadata.currentLeader(tp).leader.equals(thisNode))
                .collect(Collectors.toSet());

            // Each node leads at least one partition and no partition is led by two nodes.
            Assertions.assertTrue(Collections.disjoint(partitionsOnNode, requestedSoFar));
            Assertions.assertTrue(partitionsOnNode.size() > 0);
            requestedSoFar.addAll(partitionsOnNode);

            // Build a response with one successful entry per partition, grouped by topic.
            OffsetForLeaderEpochResponseData responseData = new OffsetForLeaderEpochResponseData();
            for (TopicPartition partition : partitionsOnNode) {
                OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topicResult = responseData.topics().find(partition.topic());
                if (topicResult == null) {
                    topicResult = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(partition.topic());
                    responseData.topics().add(topicResult);
                }
                OffsetForLeaderEpochResponseData.EpochEndOffset endOffset = new OffsetForLeaderEpochResponseData.EpochEndOffset()
                    .setPartition(partition.partition())
                    .setErrorCode(Errors.NONE.code())
                    .setLeaderEpoch(4)
                    .setEndOffset(0L);
                topicResult.partitions().add(endOffset);
            }

            OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(responseData);
            this.client.prepareResponseFrom(
                body -> partitionsOnNode.equals(this.offsetForLeaderPartitionMap(((OffsetsForLeaderEpochRequest)body).data()).keySet()),
                (AbstractResponse)response,
                node);
        }

        // Every assigned partition is covered by exactly one of the prepared responses.
        Assertions.assertEquals(this.subscriptions.assignedPartitions(), requestedSoFar);

        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue(this.subscriptions.assignedPartitions().stream().noneMatch(this.subscriptions::awaitingValidation));
    }

    /**
     * Verifies that validation of {@code tp0} stays pending while no API versions are
     * registered for the leader node (only a connection is established), and that it
     * completes, leaving the seeked offset in place, once API versions are known and an
     * OffsetsForLeaderEpoch response has been delivered.
     */
    @Test
    public void testOffsetValidationAwaitsNodeApiVersion() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        final int epochOne = 1;

        // Metadata is built with epochOne as the value supplied for every partition.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        Assertions.assertFalse(this.client.isConnected(node.idString()));

        // Seeking to an unvalidated position does not connect, but marks the partition as
        // awaiting validation.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
        Assertions.assertFalse(this.client.isConnected(node.idString()));
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));

        // First attempt: the node becomes connected, yet validation is still outstanding.
        this.fetcher.validateOffsetsIfNeeded();
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertTrue(this.client.isConnected(node.idString()));

        // Register API versions and deliver the response; validation now completes.
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        this.client.prepareResponseFrom((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, Errors.NONE, epochOne, 30L), node);
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertEquals(20L, this.subscriptions.position(this.tp0).offset);
    }

    /**
     * Verifies that {@code tp0} does not remain in the awaiting-validation state when the
     * node advertises only versions 0..2 of the OffsetsForLeaderEpoch API, both when
     * validation is triggered explicitly and when it is reached through {@code sendFetches()}.
     */
    @Test
    public void testOffsetValidationSkippedForOldBroker() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        final int epochOne = 1;
        final int epochTwo = 2;

        // Start with metadata built from epochOne.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne, this.topicIds), false, 0L);

        // Limit the node to OFFSET_FOR_LEADER_EPOCH versions [0, 2].
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short)0, (short)2));

        // Case 1: seek with epochOne, move metadata to epochTwo, validate explicitly.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(epochOne), leaderAndEpoch));
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochTwo, this.topicIds), false, 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0));

        // Case 2: same setup, but only sendFetches() is called; a fetch is still sent.
        leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(epochOne), leaderAndEpoch));
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochTwo, this.topicIds), false, 0L);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
    }

    /**
     * Verifies that a partition awaiting validation leaves that state after the metadata is
     * replaced by a version-8 metadata response whose epoch supplier returns {@code null}
     * and {@code validateOffsetsIfNeeded()} is called.
     */
    @Test
    public void testOffsetValidationSkippedForOldResponse() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        final int epochOne = 1;

        // Start with metadata built from epochOne.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        Assertions.assertFalse(this.client.isConnected(node.idString()));

        // Seek to an unvalidated position: no connection yet, validation pending.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
        Assertions.assertFalse(this.client.isConnected(node.idString()));
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));

        // Replace the metadata with a response of the given version that supplies no epochs.
        final short responseVersion = 8;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, responseVersion, this.topicIds), false, 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
    }

    /**
     * Runs the shared validation scenario with a response leader epoch of {@code -1}, an end
     * offset of {@code 0} and the {@code EARLIEST} reset strategy.
     */
    @Test
    public void testOffsetValidationResetOffsetForUndefinedEpochWithDefinedResetPolicy() {
        int undefinedLeaderEpoch = -1;
        long endOffset = 0L;
        OffsetResetStrategy resetStrategy = OffsetResetStrategy.EARLIEST;
        this.testOffsetValidationWithGivenEpochOffset(undefinedLeaderEpoch, endOffset, resetStrategy);
    }

    /**
     * Runs the shared validation scenario with a response leader epoch of {@code 2}, an end
     * offset of {@code -1} and the {@code EARLIEST} reset strategy.
     */
    @Test
    public void testOffsetValidationResetOffsetForUndefinedOffsetWithDefinedResetPolicy() {
        int leaderEpoch = 2;
        long undefinedEndOffset = -1L;
        OffsetResetStrategy resetStrategy = OffsetResetStrategy.EARLIEST;
        this.testOffsetValidationWithGivenEpochOffset(leaderEpoch, undefinedEndOffset, resetStrategy);
    }

    /**
     * Runs the shared validation scenario with a response leader epoch of {@code -1}, an end
     * offset of {@code 0} and the {@code NONE} reset strategy.
     */
    @Test
    public void testOffsetValidationResetOffsetForUndefinedEpochWithUndefinedResetPolicy() {
        int undefinedLeaderEpoch = -1;
        long endOffset = 0L;
        OffsetResetStrategy resetStrategy = OffsetResetStrategy.NONE;
        this.testOffsetValidationWithGivenEpochOffset(undefinedLeaderEpoch, endOffset, resetStrategy);
    }

    /**
     * Runs the shared validation scenario with a response leader epoch of {@code 2}, an end
     * offset of {@code -1} and the {@code NONE} reset strategy.
     */
    @Test
    public void testOffsetValidationResetOffsetForUndefinedOffsetWithUndefinedResetPolicy() {
        int leaderEpoch = 2;
        long undefinedEndOffset = -1L;
        OffsetResetStrategy resetStrategy = OffsetResetStrategy.NONE;
        this.testOffsetValidationWithGivenEpochOffset(leaderEpoch, undefinedEndOffset, resetStrategy);
    }

    /**
     * Runs the shared validation scenario with a response leader epoch of {@code 1}, an end
     * offset of {@code 1} and the {@code NONE} reset strategy.
     */
    @Test
    public void testOffsetValidationTriggerLogTruncationForBadOffsetWithUndefinedResetPolicy() {
        int leaderEpoch = 1;
        long endOffset = 1L;
        OffsetResetStrategy resetStrategy = OffsetResetStrategy.NONE;
        this.testOffsetValidationWithGivenEpochOffset(leaderEpoch, endOffset, resetStrategy);
    }

    /**
     * Shared scenario: seeks {@code tp0} to an unvalidated position (offset 5, epoch 1),
     * answers the resulting OffsetsForLeaderEpoch request with the given epoch and end
     * offset, and then checks the outcome of the next {@code validateOffsetsIfNeeded()} call.
     *
     * <p>With {@code OffsetResetStrategy.NONE} a {@code LogTruncationException} is expected
     * that reports the initial offset as out of range. Its divergent offsets are empty when
     * either response value is {@code -1} and otherwise hold the returned end offset and
     * epoch. The partition is still awaiting validation afterwards. With any other strategy
     * the call must succeed and the partition must no longer be awaiting validation.
     *
     * @param leaderEpoch leader epoch placed in the OffsetsForLeaderEpoch response
     * @param endOffset end offset placed in the OffsetsForLeaderEpoch response
     * @param offsetResetStrategy reset strategy the fetcher is built with
     */
    private void testOffsetValidationWithGivenEpochOffset(int leaderEpoch, long endOffset, OffsetResetStrategy offsetResetStrategy) {
        this.buildFetcher(offsetResetStrategy);
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        final int epochOne = 1;
        final long initialOffset = 5L;

        // Start with metadata built from epochOne and a node with API versions registered.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());

        // Seek to the initial offset; this leaves the partition awaiting validation.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(initialOffset, Optional.of(epochOne), leaderAndEpoch));
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertTrue(this.client.hasInFlightRequests());

        // Answer the in-flight validation request with the values under test.
        this.client.respond(this.offsetsForLeaderEpochRequestMatcher(this.tp0, epochOne, epochOne), (AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, Errors.NONE, leaderEpoch, endOffset));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));

        if (offsetResetStrategy == OffsetResetStrategy.NONE) {
            LogTruncationException thrown = (LogTruncationException)Assertions.assertThrows(LogTruncationException.class, () -> this.fetcher.validateOffsetsIfNeeded());
            Assertions.assertEquals(Collections.singletonMap(this.tp0, initialOffset), (Object)thrown.offsetOutOfRangePartitions());
            if (endOffset == -1L || leaderEpoch == -1) {
                Assertions.assertEquals(Collections.emptyMap(), (Object)thrown.divergentOffsets());
            } else {
                OffsetAndMetadata expectedDivergentOffset = new OffsetAndMetadata(endOffset, Optional.of(leaderEpoch), "");
                Assertions.assertEquals(Collections.singletonMap(this.tp0, expectedDivergentOffset), (Object)thrown.divergentOffsets());
            }
            Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        } else {
            this.fetcher.validateOffsetsIfNeeded();
            Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
        }
    }

    /**
     * Verifies that when {@code tp0} is seeked again while an OffsetsForLeaderEpoch request
     * is in flight, the response to that earlier request does not complete validation: the
     * partition is still awaiting validation after the response has been processed.
     */
    @Test
    public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        final int epochOne = 1;

        // Start with metadata built from epochOne and a node with API versions registered.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());

        // Seek to offset 0 and start validation; the request stays in flight.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(epochOne), leaderAndEpoch));
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertTrue(this.client.hasInFlightRequests());

        // Seek to a different offset before the response arrives.
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(5L, Optional.of(epochOne), leaderAndEpoch));
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));

        // Deliver the response to the original request; validation must remain pending.
        this.client.respond(this.offsetsForLeaderEpochRequestMatcher(this.tp0, epochOne, epochOne), (AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, Errors.NONE, 0, 0L));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
    }

    /**
     * Verifies that a validation response is not applied when the position's current leader
     * epoch has moved on in the meantime, and that a subsequent validation round carrying
     * the latest epoch succeeds.
     */
    @Test
    public void testOffsetValidationFencing() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        final int epochOne = 1;
        final int epochTwo = 2;
        final int epochThree = 3;

        // Start with metadata built from epochOne and a node with API versions registered.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());

        // Seek to a validated position at epochOne.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(epochOne));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(epochOne), leaderAndEpoch));

        // Moving the metadata to epochTwo puts the partition into validation.
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochTwo, this.topicIds), false, 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));

        // Complete validation by hand, advance the position to epochTwo, then re-enter
        // validation against a leader at epochThree.
        this.subscriptions.completeValidation(this.tp0);
        SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(10L, Optional.of(epochTwo), new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochTwo)));
        this.subscriptions.position(this.tp0, nextPosition);
        this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, this.tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(epochThree)));

        // A response prepared with epochTwo is delivered; validation must stay pending.
        this.client.prepareResponse((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, Errors.NONE, epochTwo, 10L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0), "Expected validation to fail since leader epoch changed");

        // A fresh round with a response prepared with epochThree completes validation.
        this.fetcher.validateOffsetsIfNeeded();
        this.client.prepareResponse((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, Errors.NONE, epochThree, 10L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0), "Expected validation to succeed with latest epoch");
    }

    /**
     * With node "0" limited to OffsetsForLeaderEpoch versions 0..2, checks that no fetch is
     * sent while an offset reset is pending for {@code tp0}, and that one fetch is sent once
     * the reset has been completed.
     */
    @Test
    public void testSkipValidationForOlderApiVersion() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        Map<String, Integer> partitionCounts = Collections.singletonMap(this.tp0.topic(), 4);
        NodeApiVersions oldBrokerVersions = NodeApiVersions.create((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short)0, (short)2);
        this.apiVersions.update("0", oldBrokerVersions);

        MetadataResponse metadataUpdate = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds);
        this.metadata.updateWithCurrentRequestVersion(metadataUpdate, false, 0L);

        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);

        // sendFetches() is deliberately called twice: a reset is pending, so neither call
        // may send anything.
        Assertions.assertEquals(0, this.fetcher.sendFetches());
        Assertions.assertEquals(0, this.fetcher.sendFetches());

        // Finish the reset; the next call sends exactly one fetch.
        Fetcher.ListOffsetData resetData = new Fetcher.ListOffsetData(100L, Long.valueOf(1L), Optional.empty());
        this.fetcher.resetOffsetIfNeeded(this.tp0, OffsetResetStrategy.LATEST, resetData);
        Assertions.assertEquals(1, this.fetcher.sendFetches());
    }

    /**
     * Seeks {@code tp0} to a position at epoch 1 while the metadata is built from epoch 2,
     * checks that fetching is held back until the OffsetsForLeaderEpoch response has been
     * processed, and then verifies that fetching three records moves the position to offset
     * 3 with offset epoch 1.
     */
    @Test
    public void testTruncationDetected() {
        // Three records at offsets 0..2. NOTE(review): the trailing builder argument (1) is
        // presumably the partition leader epoch that the final assertion expects - confirm.
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)ByteBuffer.allocate(1024), (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L, (long)-1L, (long)-1L, (short)-1, (int)-1, (boolean)false, (int)1);
        // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        builder.appendWithOffset(0L, 0L, key, "value-1".getBytes(StandardCharsets.UTF_8));
        builder.appendWithOffset(1L, 0L, key, "value-2".getBytes(StandardCharsets.UTF_8));
        builder.appendWithOffset(2L, 0L, key, "value-3".getBytes(StandardCharsets.UTF_8));
        MemoryRecords records = builder.build();

        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2, this.topicIds);
        this.metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());

        // Seek with epoch 1, then trigger validation.
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.fetcher.validateOffsetsIfNeeded();

        // No fetch is sent while validation is outstanding.
        Assertions.assertEquals(0, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        Assertions.assertTrue(this.subscriptions.awaitingValidation(this.tp0));

        // Deliver the validation response; fetching can resume afterwards.
        this.client.prepareResponse((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, Errors.NONE, 1, 10L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse(this.subscriptions.awaitingValidation(this.tp0));

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, records, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        Map<?, ?> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));

        // Expected value goes first so that a failure message reports the values correctly.
        Assertions.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        TestUtils.assertOptional(this.subscriptions.position(this.tp0).offsetEpoch, value -> Assertions.assertEquals(1, (int)value));
    }

    /**
     * Tracks which node {@code selectReadReplica} returns for {@code tp0} across two fetch
     * responses. {@code Node.noNode()} is passed as the leader throughout, so a selected id
     * of {@code -1} means that leader argument was returned.
     *
     * <p>NOTE(review): the trailing {@code Optional} passed to {@code fullFetchResponse} is
     * presumably the preferred read replica id - confirm against the helper.
     */
    @Test
    public void testPreferredReadReplica() {
        this.buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, (Deserializer)new BytesDeserializer(), (Deserializer)new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5L).toMillis());
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Collections.singletonMap(this.topicName, 4), tp -> this.validLeaderEpoch, this.topicIds));
        this.subscriptions.seek(this.tp0, 0L);

        // Before any fetch response has been seen, the supplied leader is selected.
        Node selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assertions.assertEquals(-1, selected.id());

        // First response is built with Optional.of(1); node 1 is selected afterwards.
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(1)));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());
        Map<?, ?> partitionRecords = this.fetchedRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));
        selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assertions.assertEquals(1, selected.id());

        // Second response is built with Optional.of(2); selection falls back to the
        // supplied leader. NOTE(review): presumably because the two-node metadata has no
        // node with id 2 - confirm.
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(2)));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());
        this.fetchedRecords();
        selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assertions.assertEquals(-1, selected.id());
    }

    /**
     * Verifies that after node 1 has been selected as read replica for {@code tp0}, a fetch
     * response carrying {@code OFFSET_OUT_OF_RANGE} makes {@code selectReadReplica} return
     * the supplied leader again ({@code Node.noNode()}, id {@code -1}).
     *
     * <p>NOTE(review): the trailing {@code Optional} passed to {@code fullFetchResponse} is
     * presumably the preferred read replica id - confirm against the helper.
     */
    @Test
    public void testPreferredReadReplicaOffsetError() {
        this.buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, (Deserializer)new BytesDeserializer(), (Deserializer)new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5L).toMillis());
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Collections.singletonMap(this.topicName, 4), tp -> this.validLeaderEpoch, this.topicIds));
        this.subscriptions.seek(this.tp0, 0L);

        // First response is built with Optional.of(1); node 1 is selected afterwards.
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(1)));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());
        this.fetchedRecords();
        Node selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assertions.assertEquals(1, selected.id());

        // Second response reports OFFSET_OUT_OF_RANGE and is built with Optional.empty().
        Assertions.assertEquals(1, this.fetcher.sendFetches());
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0, Optional.empty()));
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());
        this.fetchedRecords();
        selected = this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds());
        Assertions.assertEquals(-1, selected.id());
    }

    /**
     * After one completed fetch for {@code tp0}, installs a wakeup hook that disconnects the
     * selected read replica and polls the first time it fires. The test then expects
     * {@code sendFetches()} to report one request, disconnects the replica explicitly,
     * polls, and expects {@code sendFetches()} to report one request again.
     *
     * <p>NOTE(review): judging by the test name, the hook is meant to let the fetch complete
     * before its handler is registered - confirm against the fetcher implementation.
     */
    @Test
    public void testFetchCompletedBeforeHandlerAdded() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        // Complete one ordinary fetch round trip first.
        this.fetcher.sendFetches();
        this.client.prepareResponse((AbstractResponse)this.fullFetchResponse(this.tidp0, this.buildRecords(1L, 1, 1L), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetchedRecords();

        Metadata.LeaderAndEpoch currentLeader = this.subscriptions.position(this.tp0).currentLeader;
        Assertions.assertTrue(currentLeader.leader.isPresent());
        Node readReplica = this.fetcher.selectReadReplica(this.tp0, (Node)currentLeader.leader.get(), this.time.milliseconds());

        // The hook acts only on its first invocation.
        AtomicBoolean hookFired = new AtomicBoolean(false);
        this.client.setWakeupHook(() -> {
            if (hookFired.compareAndSet(false, true)) {
                this.consumerClient.disconnectAsync(readReplica);
                this.consumerClient.poll(this.time.timer(0L));
            }
        });

        Assertions.assertEquals(1, this.fetcher.sendFetches());
        this.consumerClient.disconnectAsync(readReplica);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertEquals(1, this.fetcher.sendFetches());
    }

    /**
     * Verifies that a fetch response for {@code tp0} carrying {@code CORRUPT_MESSAGE} is
     * recorded as a completed fetch and that collecting the fetched records then throws a
     * {@code KafkaException}.
     */
    @Test
    public void testCorruptMessageError() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        int fetchesSent = this.fetcher.sendFetches();
        Assertions.assertEquals(1, fetchesSent);
        Assertions.assertFalse(this.fetcher.hasCompletedFetches());

        // Answer the fetch with a CORRUPT_MESSAGE error.
        AbstractResponse corruptResponse = (AbstractResponse)this.fullFetchResponse(this.tidp0, this.buildRecords(1L, 1, 1L), Errors.CORRUPT_MESSAGE, 100L, 0);
        this.client.prepareResponse(corruptResponse);
        this.consumerClient.poll(this.time.timer(0L));
        Assertions.assertTrue(this.fetcher.hasCompletedFetches());

        // The error surfaces when the records are collected.
        Assertions.assertThrows(KafkaException.class, this::fetchedRecords);
    }

    /**
     * Verifies that {@code beginningOffsets} for {@code tp0} returns the offset (2) carried
     * by the prepared list-offsets response.
     */
    @Test
    public void testBeginningOffsets() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        // NOTE(review): -2L is presumably the "earliest" timestamp sentinel - confirm.
        AbstractResponse response = (AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, -2L, 2L);
        this.client.prepareResponse(response);

        Map<?, ?> actualOffsets = this.fetcher.beginningOffsets(Collections.singleton(this.tp0), this.time.timer(5000L));
        Assertions.assertEquals(Collections.singletonMap(this.tp0, 2L), actualOffsets);
    }

    /**
     * Same as the single-partition beginning-offsets test, but the requested collection lists
     * {@code tp0} twice; the result must still be the single entry {@code {tp0 -> 2}}.
     */
    @Test
    public void testBeginningOffsetsDuplicateTopicPartition() {
        long responseTimestamp = -2L;
        long expectedOffset = 2L;

        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        AbstractResponse response = this.listOffsetResponse(this.tp0, Errors.NONE, responseTimestamp, expectedOffset);
        this.client.prepareResponse(response);

        Assertions.assertEquals(
            Collections.singletonMap(this.tp0, expectedOffset),
            this.fetcher.beginningOffsets(Arrays.asList(this.tp0, this.tp0), this.time.timer(5000L)));
    }

    /**
     * Assigns {@code tp0}, {@code tp1} and {@code tp2}, prepares one ListOffsets response carrying
     * an offset for each of them, and checks that {@code beginningOffsets} returns the same map.
     */
    @Test
    public void testBeginningOffsetsMultipleTopicPartitions() {
        this.buildFetcher();

        // This map doubles as the response payload and as the expected result.
        Map<TopicPartition, Long> offsetsByPartition = new HashMap<>();
        offsetsByPartition.put(this.tp0, 2L);
        offsetsByPartition.put(this.tp1, 4L);
        offsetsByPartition.put(this.tp2, 6L);
        this.assignFromUser(offsetsByPartition.keySet());

        AbstractResponse response = this.listOffsetResponse(offsetsByPartition, Errors.NONE, -2L, -1);
        this.client.prepareResponse(response);

        Assertions.assertEquals(
            offsetsByPartition,
            this.fetcher.beginningOffsets(Arrays.asList(this.tp0, this.tp1, this.tp2), this.time.timer(5000L)));
    }

    /**
     * Requesting beginning offsets for an empty partition list must yield an empty map; no
     * response is prepared on the mock client for this call.
     */
    @Test
    public void testBeginningOffsetsEmpty() {
        this.buildFetcher();

        Map<?, ?> expected = Collections.emptyMap();
        Assertions.assertEquals(
            expected,
            this.fetcher.beginningOffsets(Collections.emptyList(), this.time.timer(5000L)));
    }

    /**
     * Prepares a single ListOffsets response for {@code tp0} (timestamp -1, offset 5) and checks
     * that {@code endOffsets} returns exactly {@code {tp0 -> 5}}.
     */
    @Test
    public void testEndOffsets() {
        long responseTimestamp = -1L;
        long expectedOffset = 5L;

        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        AbstractResponse response = this.listOffsetResponse(this.tp0, Errors.NONE, responseTimestamp, expectedOffset);
        this.client.prepareResponse(response);

        Assertions.assertEquals(
            Collections.singletonMap(this.tp0, expectedOffset),
            this.fetcher.endOffsets(Collections.singleton(this.tp0), this.time.timer(5000L)));
    }

    /**
     * Same as the single-partition end-offsets test, but the requested collection lists
     * {@code tp0} twice; the result must still be the single entry {@code {tp0 -> 5}}.
     */
    @Test
    public void testEndOffsetsDuplicateTopicPartition() {
        long responseTimestamp = -1L;
        long expectedOffset = 5L;

        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));

        AbstractResponse response = this.listOffsetResponse(this.tp0, Errors.NONE, responseTimestamp, expectedOffset);
        this.client.prepareResponse(response);

        Assertions.assertEquals(
            Collections.singletonMap(this.tp0, expectedOffset),
            this.fetcher.endOffsets(Arrays.asList(this.tp0, this.tp0), this.time.timer(5000L)));
    }

    /**
     * Assigns {@code tp0}, {@code tp1} and {@code tp2}, prepares one ListOffsets response carrying
     * an offset for each of them, and checks that {@code endOffsets} returns the same map.
     */
    @Test
    public void testEndOffsetsMultipleTopicPartitions() {
        this.buildFetcher();

        // This map doubles as the response payload and as the expected result.
        Map<TopicPartition, Long> offsetsByPartition = new HashMap<>();
        offsetsByPartition.put(this.tp0, 5L);
        offsetsByPartition.put(this.tp1, 7L);
        offsetsByPartition.put(this.tp2, 9L);
        this.assignFromUser(offsetsByPartition.keySet());

        AbstractResponse response = this.listOffsetResponse(offsetsByPartition, Errors.NONE, -1L, -1);
        this.client.prepareResponse(response);

        Assertions.assertEquals(
            offsetsByPartition,
            this.fetcher.endOffsets(Arrays.asList(this.tp0, this.tp1, this.tp2), this.time.timer(5000L)));
    }

    /**
     * Requesting end offsets for an empty partition list must yield an empty map; no response is
     * prepared on the mock client for this call.
     */
    @Test
    public void testEndOffsetsEmpty() {
        this.buildFetcher();

        Map<?, ?> expected = Collections.emptyMap();
        Assertions.assertEquals(
            expected,
            this.fetcher.endOffsets(Collections.emptyList(), this.time.timer(5000L)));
    }

    /**
     * Builds a matcher that accepts an OffsetsForLeaderEpoch request only if it contains an entry
     * for {@code topicPartition} whose current leader epoch and requested leader epoch equal the
     * given values.
     *
     * @param topicPartition     partition that must be present in the request
     * @param currentLeaderEpoch expected {@code currentLeaderEpoch} of that partition entry
     * @param leaderEpoch        expected {@code leaderEpoch} of that partition entry
     * @return the request matcher
     */
    private MockClient.RequestMatcher offsetsForLeaderEpochRequestMatcher(TopicPartition topicPartition, int currentLeaderEpoch, int leaderEpoch) {
        return body -> {
            OffsetForLeaderEpochRequestData requestData = ((OffsetsForLeaderEpochRequest) body).data();
            OffsetForLeaderEpochRequestData.OffsetForLeaderPartition requested = this.offsetForLeaderPartitionMap(requestData).get(topicPartition);
            if (requested == null) {
                // The partition is not part of this request at all.
                return false;
            }
            if (requested.currentLeaderEpoch() != currentLeaderEpoch) {
                return false;
            }
            return requested.leaderEpoch() == leaderEpoch;
        };
    }

    /**
     * Creates an OffsetsForLeaderEpoch response containing exactly one topic with exactly one
     * partition result.
     *
     * @param topicPartition partition the single result refers to
     * @param error          error whose code is stored on the partition result
     * @param leaderEpoch    leader epoch stored on the partition result
     * @param endOffset      end offset stored on the partition result
     * @return the assembled response
     */
    private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(TopicPartition topicPartition, Errors error, int leaderEpoch, long endOffset) {
        OffsetForLeaderEpochResponseData.EpochEndOffset partitionResult = new OffsetForLeaderEpochResponseData.EpochEndOffset()
            .setPartition(topicPartition.partition())
            .setErrorCode(error.code())
            .setLeaderEpoch(leaderEpoch)
            .setEndOffset(endOffset);

        OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topicResult = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult()
            .setTopic(topicPartition.topic())
            .setPartitions(Collections.singletonList(partitionResult));

        OffsetForLeaderEpochResponseData responseData = new OffsetForLeaderEpochResponseData();
        responseData.topics().add(topicResult);
        return new OffsetsForLeaderEpochResponse(responseData);
    }

    /**
     * Flattens the topic/partition structure of an OffsetForLeaderEpoch request into a map keyed by
     * {@code TopicPartition}.
     *
     * @param data request data to flatten
     * @return a new mutable map from each (topic, partition) in {@code data} to its partition entry
     */
    private Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> offsetForLeaderPartitionMap(OffsetForLeaderEpochRequestData data) {
        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsByTp = new HashMap<>();
        data.topics().forEach(topic -> {
            for (OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partition : topic.partitions()) {
                partitionsByTp.put(new TopicPartition(topic.topic(), partition.partition()), partition);
            }
        });
        return partitionsByTp;
    }

    /**
     * Convenience overload of {@code listOffsetRequestMatcher(long, int)} that expects a current
     * leader epoch of {@code -1}.
     *
     * @param timestamp timestamp the first partition of the request must carry
     * @return the request matcher
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(long timestamp) {
        int leaderEpoch = -1;
        return this.listOffsetRequestMatcher(timestamp, leaderEpoch);
    }

    /**
     * Builds a matcher that inspects only the first partition of the first topic of a ListOffsets
     * request and accepts it when it targets {@code tp0} with the given timestamp and current
     * leader epoch.
     *
     * @param timestamp   expected timestamp of that partition entry
     * @param leaderEpoch expected {@code currentLeaderEpoch} of that partition entry
     * @return the request matcher
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(long timestamp, int leaderEpoch) {
        return body -> {
            ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) body;
            ListOffsetsRequestData.ListOffsetsTopic firstTopic = (ListOffsetsRequestData.ListOffsetsTopic) listOffsetsRequest.topics().get(0);
            ListOffsetsRequestData.ListOffsetsPartition firstPartition = firstTopic.partitions().get(0);

            if (!this.tp0.topic().equals(firstTopic.name())) {
                return false;
            }
            if (this.tp0.partition() != firstPartition.partitionIndex()) {
                return false;
            }
            if (timestamp != firstPartition.timestamp()) {
                return false;
            }
            return leaderEpoch == firstPartition.currentLeaderEpoch();
        };
    }

    /**
     * Builds a ListOffsets response for {@code tp0}; delegates to the overload that takes an
     * explicit partition.
     *
     * @param error     error for the partition entry
     * @param timestamp timestamp for the partition entry
     * @param offset    offset for the partition entry
     * @return the assembled response
     */
    private ListOffsetsResponse listOffsetResponse(Errors error, long timestamp, long offset) {
        ListOffsetsResponse response = this.listOffsetResponse(this.tp0, error, timestamp, offset);
        return response;
    }

    /**
     * Builds a single-partition ListOffsets response with a leader epoch of {@code -1}.
     *
     * @param tp        partition the response entry refers to
     * @param error     error for the partition entry
     * @param timestamp timestamp for the partition entry
     * @param offset    offset for the partition entry
     * @return the assembled response
     */
    private ListOffsetsResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
        int leaderEpoch = -1;
        return this.listOffsetResponse(tp, error, timestamp, offset, leaderEpoch);
    }

    /**
     * Builds a single-partition ListOffsets response by wrapping {@code tp -> offset} in a
     * one-entry map and delegating to the map-based overload.
     *
     * @param tp          partition the response entry refers to
     * @param error       error for the partition entry
     * @param timestamp   timestamp for the partition entry
     * @param offset      offset for the partition entry
     * @param leaderEpoch leader epoch for the partition entry
     * @return the assembled response
     */
    private ListOffsetsResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset, int leaderEpoch) {
        // The map-based overload only iterates the entries, so an immutable singleton is enough.
        Map<TopicPartition, Long> singleOffset = Collections.singletonMap(tp, offset);
        return this.listOffsetResponse(singleOffset, error, timestamp, leaderEpoch);
    }

    /**
     * Builds a ListOffsets response with one partition entry per key of {@code offsets}. Entries
     * are grouped by topic name; every entry carries the same error, timestamp and leader epoch.
     *
     * @param offsets     offset to report for each partition
     * @param error       error whose code is stored on every partition entry
     * @param timestamp   timestamp stored on every partition entry
     * @param leaderEpoch leader epoch stored on every partition entry
     * @return the assembled response
     */
    private ListOffsetsResponse listOffsetResponse(Map<TopicPartition, Long> offsets, Errors error, long timestamp, int leaderEpoch) {
        // Typed collections replace the raw HashMap/ArrayList (and their unchecked casts).
        Map<String, List<ListOffsetsResponseData.ListOffsetsPartitionResponse>> partitionsByTopic = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            ListOffsetsResponseData.ListOffsetsPartitionResponse partitionResponse = new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(topicPartition.partition())
                .setErrorCode(error.code())
                .setOffset(entry.getValue())
                .setTimestamp(timestamp)
                .setLeaderEpoch(leaderEpoch);
            partitionsByTopic.computeIfAbsent(topicPartition.topic(), topic -> new ArrayList<>()).add(partitionResponse);
        }

        List<ListOffsetsResponseData.ListOffsetsTopicResponse> topics = new ArrayList<>();
        for (Map.Entry<String, List<ListOffsetsResponseData.ListOffsetsPartitionResponse>> topicEntry : partitionsByTopic.entrySet()) {
            topics.add(new ListOffsetsResponseData.ListOffsetsTopicResponse()
                .setName(topicEntry.getKey())
                .setPartitions(topicEntry.getValue()));
        }

        ListOffsetsResponseData listOffsetsResponseData = new ListOffsetsResponseData().setTopics(topics);
        return new ListOffsetsResponse(listOffsetsResponseData);
    }

    /**
     * Builds a fetch response (session id 0) that carries {@code error} both as the top-level
     * error and as the error of its single partition; the partition's high watermark is -1.
     *
     * @param tp           partition the single partition entry refers to
     * @param error        error used at the top level and on the partition
     * @param throttleTime throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error, int throttleTime) {
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
            .setPartitionIndex(tp.topicPartition().partition())
            .setErrorCode(error.code())
            .setHighWatermark(-1L);

        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(tp, partitionData);
        return FetchResponse.of(error, throttleTime, 0, partitions);
    }

    /**
     * Builds a fetch response (session id 0, top-level error {@code NONE}) with a single entry
     * for {@code tidp0} that includes the given aborted transactions. {@code error} is applied to
     * the partition only; the log start offset is fixed at 0.
     *
     * @param records             records returned for the partition
     * @param abortedTransactions aborted transactions attached to the partition
     * @param error               partition-level error
     * @param lastStableOffset    last stable offset of the partition
     * @param hw                  high watermark of the partition
     * @param throttleTime        throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords records, List<FetchResponseData.AbortedTransaction> abortedTransactions, Errors error, long lastStableOffset, long hw, int throttleTime) {
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
            .setPartitionIndex(this.tp0.partition())
            .setErrorCode(error.code())
            .setHighWatermark(hw)
            .setLastStableOffset(lastStableOffset)
            .setLogStartOffset(0L)
            .setAbortedTransactions(abortedTransactions)
            .setRecords((BaseRecords) records);

        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(this.tidp0, partitionData);
        return FetchResponse.of(Errors.NONE, throttleTime, 0, partitions);
    }

    /**
     * Overload that fixes the last stable offset at -1 and forwards everything else, including
     * the session id.
     *
     * @param sessionId    session id of the response
     * @param tp           partition the single partition entry refers to
     * @param records      records returned for the partition
     * @param error        partition-level error
     * @param hw           high watermark of the partition
     * @param throttleTime throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fullFetchResponse(int sessionId, TopicIdPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        long lastStableOffset = -1L;
        return this.fullFetchResponse(sessionId, tp, records, error, hw, lastStableOffset, throttleTime);
    }

    /**
     * Overload that fixes the last stable offset at -1 and forwards to the overload taking an
     * explicit last stable offset.
     *
     * @param tp           partition the single partition entry refers to
     * @param records      records returned for the partition
     * @param error        partition-level error
     * @param hw           high watermark of the partition
     * @param throttleTime throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        long lastStableOffset = -1L;
        return this.fullFetchResponse(tp, records, error, hw, lastStableOffset, throttleTime);
    }

    /**
     * Overload that fixes the session id at 0 and forwards to the overload taking an explicit
     * session id.
     *
     * @param tp               partition the single partition entry refers to
     * @param records          records returned for the partition
     * @param error            partition-level error
     * @param hw               high watermark of the partition
     * @param lastStableOffset last stable offset of the partition
     * @param throttleTime     throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        int sessionId = 0;
        return this.fullFetchResponse(sessionId, tp, records, error, hw, lastStableOffset, throttleTime);
    }

    /**
     * Builds a fetch response with top-level error {@code NONE} and a single partition entry.
     * {@code error} is applied to the partition only; the log start offset is fixed at 0.
     *
     * @param sessionId        session id of the response
     * @param tp               partition the single partition entry refers to
     * @param records          records returned for the partition
     * @param error            partition-level error
     * @param hw               high watermark of the partition
     * @param lastStableOffset last stable offset of the partition
     * @param throttleTime     throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fullFetchResponse(int sessionId, TopicIdPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
            .setPartitionIndex(tp.topicPartition().partition())
            .setErrorCode(error.code())
            .setHighWatermark(hw)
            .setLastStableOffset(lastStableOffset)
            .setLogStartOffset(0L)
            .setRecords((BaseRecords) records);

        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(tp, partitionData);
        return FetchResponse.of(Errors.NONE, throttleTime, sessionId, partitions);
    }

    /**
     * Builds a fetch response (session id 0, top-level error {@code NONE}) with a single
     * partition entry that also advertises a preferred read replica. An empty
     * {@code preferredReplicaId} is written as -1. The log start offset is fixed at 0.
     *
     * @param tp                 partition the single partition entry refers to
     * @param records            records returned for the partition
     * @param error              partition-level error
     * @param hw                 high watermark of the partition
     * @param lastStableOffset   last stable offset of the partition
     * @param throttleTime       throttle time of the response
     * @param preferredReplicaId preferred read replica id, if any
     * @return the assembled response
     */
    private FetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime, Optional<Integer> preferredReplicaId) {
        int preferredReadReplica = preferredReplicaId.orElse(-1);
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
            .setPartitionIndex(tp.topicPartition().partition())
            .setErrorCode(error.code())
            .setHighWatermark(hw)
            .setLastStableOffset(lastStableOffset)
            .setLogStartOffset(0L)
            .setRecords((BaseRecords) records)
            .setPreferredReadReplica(preferredReadReplica);

        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(tp, partitionData);
        return FetchResponse.of(Errors.NONE, throttleTime, 0, partitions);
    }

    /**
     * Builds a fetch response (session id 0, top-level error {@code NONE}) with a single
     * partition entry whose log start offset is caller-supplied. {@code error} is applied to the
     * partition only.
     *
     * @param tp               partition the single partition entry refers to
     * @param records          records returned for the partition
     * @param error            partition-level error
     * @param hw               high watermark of the partition
     * @param lastStableOffset last stable offset of the partition
     * @param logStartOffset   log start offset of the partition
     * @param throttleTime     throttle time of the response
     * @return the assembled response
     */
    private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, long logStartOffset, int throttleTime) {
        FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
            .setPartitionIndex(tp.topicPartition().partition())
            .setErrorCode(error.code())
            .setHighWatermark(hw)
            .setLastStableOffset(lastStableOffset)
            .setLogStartOffset(logStartOffset)
            .setRecords((BaseRecords) records);

        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> partitions = new LinkedHashMap<>();
        partitions.put(tp, partitionData);
        return FetchResponse.of(Errors.NONE, throttleTime, 0, partitions);
    }

    /**
     * Builds a metadata response describing a single topic, reusing the brokers, cluster id and
     * controller of {@code initialUpdateResponse}.
     *
     * <p>When {@code error} is {@code NONE}, the topic's partition metadata is copied from the
     * first topic with the same name in {@code initialUpdateResponse}, if there is one.
     * Otherwise the topic is reported with an empty partition list.
     *
     * @param topic topic name to describe
     * @param error topic-level error to report
     * @return the assembled response
     */
    private MetadataResponse newMetadataResponse(String topic, Errors error) {
        // Typed list instead of a raw ArrayList, so the element type is checked by the compiler.
        List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<>();
        if (error == Errors.NONE) {
            this.initialUpdateResponse.topicMetadata().stream()
                .filter(candidate -> candidate.topic().equals(topic))
                .findFirst()
                .ifPresent(found -> partitionsMetadata.addAll(found.partitionMetadata()));
        }
        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
        List<Node> brokers = new ArrayList<>(this.initialUpdateResponse.brokers());
        return RequestTestUtils.metadataResponse(brokers, this.initialUpdateResponse.clusterId(), this.initialUpdateResponse.controller().id(), Collections.singletonList(topicMetadata));
    }

    /**
     * Collects the current fetch and asserts that it holds no records, did not advance any
     * position, and reports itself as empty.
     *
     * @param reason message attached to each assertion
     */
    private void assertEmptyFetch(String reason) {
        // Wildcard type instead of the raw Fetch type; the record types are irrelevant here.
        Fetch<?, ?> fetch = this.collectFetch();
        Assertions.assertEquals(Collections.emptyMap(), fetch.records(), reason);
        Assertions.assertFalse(fetch.positionAdvanced(), reason);
        Assertions.assertTrue(fetch.isEmpty(), reason);
    }

    /**
     * Collects the current fetch and returns only its records, grouped by partition.
     *
     * @param <K> record key type
     * @param <V> record value type
     * @return the records of the collected fetch
     */
    private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        return this.<K, V>collectFetch().records();
    }

    /**
     * Returns whatever the fetcher under test currently has to collect, viewed with the caller's
     * key/value types.
     *
     * @param <K> record key type
     * @param <V> record value type
     * @return the result of {@code fetcher.collectFetch()}
     */
    private <K, V> Fetch<K, V> collectFetch() {
        // The conversion to the caller-chosen K/V is unchecked; spell it out as an explicit cast.
        @SuppressWarnings("unchecked")
        Fetch<K, V> collected = (Fetch<K, V>) this.fetcher.collectFetch();
        return collected;
    }

    /**
     * Builds the fetcher with the {@code EARLIEST} reset strategy, byte-array key and value
     * deserializers and {@code READ_UNCOMMITTED} isolation.
     *
     * @param maxPollRecords maximum number of records handed to the fetcher
     */
    private void buildFetcher(int maxPollRecords) {
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
        this.buildFetcher(OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, maxPollRecords, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Builds the fetcher with all defaults and {@code Integer.MAX_VALUE} as the max-poll-records
     * limit.
     */
    private void buildFetcher() {
        int maxPollRecords = Integer.MAX_VALUE;
        this.buildFetcher(maxPollRecords);
    }

    /**
     * Builds the fetcher with caller-supplied deserializers, the {@code EARLIEST} reset strategy,
     * {@code Integer.MAX_VALUE} max poll records and {@code READ_UNCOMMITTED} isolation.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     */
    private void buildFetcher(Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
        int maxPollRecords = Integer.MAX_VALUE;
        this.buildFetcher(OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, maxPollRecords, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Builds the fetcher with a caller-supplied reset strategy, a fresh {@code MetricConfig},
     * byte-array deserializers, {@code Integer.MAX_VALUE} max poll records and
     * {@code READ_UNCOMMITTED} isolation.
     *
     * @param offsetResetStrategy reset strategy for the subscription state
     */
    private void buildFetcher(OffsetResetStrategy offsetResetStrategy) {
        MetricConfig metricConfig = new MetricConfig();
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
        this.buildFetcher(metricConfig, offsetResetStrategy, keyDeserializer, valueDeserializer, Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Builds the fetcher with a fresh {@code MetricConfig} and forwards every other setting
     * unchanged.
     *
     * @param offsetResetStrategy reset strategy for the subscription state
     * @param keyDeserializer     deserializer for record keys
     * @param valueDeserializer   deserializer for record values
     * @param maxPollRecords      maximum number of records handed to the fetcher
     * @param isolationLevel      isolation level handed to the fetcher
     */
    private <K, V> void buildFetcher(OffsetResetStrategy offsetResetStrategy, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) {
        MetricConfig metricConfig = new MetricConfig();
        this.buildFetcher(metricConfig, offsetResetStrategy, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel);
    }

    /**
     * Builds the fetcher with {@code Long.MAX_VALUE} as the metadata expiry and forwards every
     * other setting unchanged.
     *
     * @param metricConfig        metric configuration
     * @param offsetResetStrategy reset strategy for the subscription state
     * @param keyDeserializer     deserializer for record keys
     * @param valueDeserializer   deserializer for record values
     * @param maxPollRecords      maximum number of records handed to the fetcher
     * @param isolationLevel      isolation level handed to the fetcher
     */
    private <K, V> void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) {
        long metadataExpireMs = Long.MAX_VALUE;
        this.buildFetcher(metricConfig, offsetResetStrategy, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel, metadataExpireMs);
    }

    /**
     * Creates a fresh {@code LogContext} and a {@code SubscriptionState} using the given reset
     * strategy, then delegates to the overload that takes both explicitly.
     *
     * @param metricConfig        metric configuration
     * @param offsetResetStrategy reset strategy for the new subscription state
     * @param keyDeserializer     deserializer for record keys
     * @param valueDeserializer   deserializer for record values
     * @param maxPollRecords      maximum number of records handed to the fetcher
     * @param isolationLevel      isolation level handed to the fetcher
     * @param metadataExpireMs    metadata expiry passed on to the dependencies
     */
    private <K, V> void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel, long metadataExpireMs) {
        // The same LogContext is shared by the subscription state and the delegate call.
        LogContext sharedLogContext = new LogContext();
        this.buildFetcher(
            metricConfig,
            keyDeserializer,
            valueDeserializer,
            maxPollRecords,
            isolationLevel,
            metadataExpireMs,
            new SubscriptionState(sharedLogContext, offsetResetStrategy),
            sharedLogContext);
    }

    /**
     * Builds the fetcher around a caller-supplied subscription state and log context, using a
     * fresh {@code MetricConfig}, byte-array deserializers, {@code Integer.MAX_VALUE} max poll
     * records, {@code READ_UNCOMMITTED} isolation and {@code Long.MAX_VALUE} metadata expiry.
     *
     * @param subscriptionState subscription state to install
     * @param logContext        log context for the dependencies
     */
    private void buildFetcher(SubscriptionState subscriptionState, LogContext logContext) {
        MetricConfig metricConfig = new MetricConfig();
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
        this.buildFetcher(metricConfig, keyDeserializer, valueDeserializer, Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Long.MAX_VALUE, subscriptionState, logContext);
    }

    /**
     * Rebuilds all test dependencies and then creates the {@code Fetcher} under test from them.
     *
     * @param metricConfig      metric configuration for the dependencies
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param maxPollRecords    maximum number of records handed to the fetcher
     * @param isolationLevel    isolation level handed to the fetcher
     * @param metadataExpireMs  metadata expiry for the dependencies
     * @param subscriptionState subscription state to install
     * @param logContext        log context for the dependencies
     */
    private <K, V> void buildFetcher(MetricConfig metricConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        this.buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext);
        // NOTE(review): the Fetcher gets a fresh LogContext, not the logContext parameter used for
        // the dependencies. Kept as in the original; confirm whether this is intentional.
        // Diamond instead of the raw Fetcher type, so K/V come from the deserializers.
        this.fetcher = new Fetcher<>(
            new LogContext(),
            this.consumerClient,
            this.minBytes,
            this.maxBytes,
            this.maxWaitMs,
            this.fetchSize,
            maxPollRecords,
            true,
            "",
            keyDeserializer,
            valueDeserializer,
            this.metadata,
            this.subscriptions,
            this.metrics,
            this.metricsRegistry,
            (Time) this.time,
            this.retryBackoffMs,
            this.requestTimeoutMs,
            isolationLevel,
            this.apiVersions);
    }

    /**
     * Re-creates the collaborators a fetcher needs and stores them in the test's fields: mock
     * time, subscription state, consumer metadata, mock client, metrics, consumer network client
     * and the fetcher metrics registry.
     *
     * @param metricConfig      configuration for the {@code Metrics} instance; its tag names are
     *                          also passed to the metrics registry
     * @param metadataExpireMs  second constructor argument of the new {@code ConsumerMetadata}
     * @param subscriptionState subscription state to install
     * @param logContext        log context for the metadata and the network client
     */
    private void buildDependencies(MetricConfig metricConfig, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        MockTime mockTime = new MockTime(1L);
        this.time = mockTime;
        this.subscriptions = subscriptionState;

        ConsumerMetadata consumerMetadata = new ConsumerMetadata(0L, metadataExpireMs, false, false, subscriptionState, logContext, new ClusterResourceListeners());
        this.metadata = consumerMetadata;

        // The mock client, metrics and network client all share the same mock clock.
        this.client = new MockClient(mockTime, consumerMetadata);
        this.metrics = new Metrics(metricConfig, mockTime);
        this.consumerClient = new ConsumerNetworkClient(logContext, this.client, consumerMetadata, mockTime, 100L, 1000, Integer.MAX_VALUE);
        this.metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumer" + this.groupId);
    }

    /**
     * Extracts the offset of every record, preserving the order of {@code records}.
     *
     * @param records records to read offsets from
     * @param <T>     key and value type of the records
     * @return a new list with one offset per record
     */
    private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
        List<Long> offsets = new ArrayList<>();
        for (ConsumerRecord<T, T> consumerRecord : records) {
            offsets.add(consumerRecord.offset());
        }
        return offsets;
    }
}

