package kafka.server.share;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.log.remote.RemoteLogReaderTest;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.ShareFetchMetadata;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.group.share.NoOpShareStatePersister;
import org.apache.kafka.server.group.share.Persister;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.ShareSession;
import org.apache.kafka.server.share.ShareSessionCache;
import org.apache.kafka.server.share.ShareSessionKey;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.FetchParams;
import org.apache.kafka.storage.internals.log.FetchPartitionData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import scala.Function1;
import scala.Tuple2;
import scala.collection.Seq;

@Timeout(120)
/* loaded from: input_file:kafka/server/share/SharePartitionManagerTest.class */
public class SharePartitionManagerTest {
    private static final int RECORD_LOCK_DURATION_MS = 30000;
    private static final int MAX_DELIVERY_COUNT = 5;
    private static final short MAX_IN_FLIGHT_MESSAGES = 200;
    private static final int PARTITION_MAX_BYTES = 40000;
    private static Timer mockTimer;
    private static final List<TopicIdPartition> EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList());

    /* loaded from: input_file:kafka/server/share/SharePartitionManagerTest$SharePartitionManagerBuilder.class */
    private static class SharePartitionManagerBuilder {
        private ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        private Time time = new MockTime();
        private ShareSessionCache cache = new ShareSessionCache(10, 1000);
        private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap();
        private Persister persister = NoOpShareStatePersister.getInstance();
        private Timer timer = new MockTimer();
        private Metrics metrics = new Metrics();
        private ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue = new ConcurrentLinkedQueue<>();

        private SharePartitionManagerBuilder() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
            this.replicaManager = replicaManager;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withCache(ShareSessionCache shareSessionCache) {
            this.cache = shareSessionCache;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withPartitionCacheMap(Map<SharePartitionManager.SharePartitionKey, SharePartition> map) {
            this.partitionCacheMap = map;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withShareGroupPersister(Persister persister) {
            this.persister = persister;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withMetrics(Metrics metrics) {
            this.metrics = metrics;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public SharePartitionManagerBuilder withFetchQueue(ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> concurrentLinkedQueue) {
            this.fetchQueue = concurrentLinkedQueue;
            return this;
        }

        public static SharePartitionManagerBuilder builder() {
            return new SharePartitionManagerBuilder();
        }

        public SharePartitionManager build() {
            return new SharePartitionManager(this.replicaManager, this.time, this.cache, this.partitionCacheMap, this.fetchQueue, SharePartitionManagerTest.RECORD_LOCK_DURATION_MS, this.timer, SharePartitionManagerTest.MAX_DELIVERY_COUNT, SharePartitionManagerTest.MAX_IN_FLIGHT_MESSAGES, this.persister, this.metrics);
        }
    }

    @BeforeEach
    public void setUp() {
        mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper", new SystemTimer("sharePartitionManagerTestTimer"));
    }

    @AfterEach
    public void tearDown() throws Exception {
        mockTimer.close();
    }

    @Test
    public void testNewContextReturnsFinalContext() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().build();
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.ZERO_UUID, -1);
        Assertions.assertEquals(FinalContext.class, build.newContext("grp", Collections.emptyMap(), Collections.emptyList(), shareFetchMetadata).getClass());
        Uuid randomUuid = Uuid.randomUuid();
        Map singletonMap = Collections.singletonMap(new TopicIdPartition(randomUuid, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(randomUuid, 4000));
        Assertions.assertThrows(InvalidRequestException.class, () -> {
            build.newContext("grp", singletonMap, Collections.emptyList(), new ShareFetchMetadata(Uuid.ZERO_UUID, -1));
        });
        Assertions.assertEquals(FinalContext.class, build.newContext("grp", Collections.singletonMap(new TopicIdPartition(randomUuid, new TopicPartition("foo", 0)), new ShareFetchRequest.SharePartitionData(randomUuid, 0)), Collections.emptyList(), shareFetchMetadata).getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextForEmptyTopicPartitionsAndFinalEpoch() {
        Assertions.assertEquals(FinalContext.class, SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).withTime(new MockTime()).build().newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(Uuid.randomUuid(), -1)).getClass());
    }

    @Test
    public void testNewContext() {
        MockTime mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        String str = "grp";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        newContext.shareFetchData().forEach((topicIdPartition5, sharePartitionData) -> {
            Assertions.assertTrue(linkedHashMap.containsKey(topicIdPartition5));
            Assertions.assertEquals(linkedHashMap.get(topicIdPartition5), sharePartitionData);
        });
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(linkedHashMap2, updateAndGenerateResponseData.responseData(hashMap));
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareFetchMetadata.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT));
        });
        Uuid randomUuid3 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(randomUuid3, 1));
        });
        ShareSessionContext newContext2 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        ShareSessionContext shareSessionContext = newContext2;
        synchronized (shareSessionContext.session()) {
            shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                TopicIdPartition topicIdPartition6 = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                ShareFetchRequest.SharePartitionData reqData = cachedSharePartition.reqData();
                Assertions.assertTrue(linkedHashMap.containsKey(topicIdPartition6));
                Assertions.assertEquals(linkedHashMap.get(topicIdPartition6), reqData);
            });
        }
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(0, updateAndGenerateResponseData2.responseData(hashMap).size());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT));
        });
        ShareFetchResponse throttleResponse = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 2)).throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse.error());
        Assertions.assertEquals(100, throttleResponse.throttleTimeMs());
        ShareFetchContext newContext3 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareFetchMetadata.memberId(), -1));
        Assertions.assertEquals(FinalContext.class, newContext3.getClass());
        Assertions.assertEquals(0, shareSessionCache.size());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap3.put(topicIdPartition4, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        Assertions.assertEquals(Errors.NONE, newContext3.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap3).error());
    }

    @Test
    public void testShareSessionExpiration() {
        MockTime mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(2, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(newContext.getClass(), ShareSessionContext.class);
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareFetchMetadata.memberId());
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey));
        mockTime.sleep(500L);
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap3.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata2 = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext2 = build.newContext("grp", linkedHashMap3, EMPTY_PART_LIST, shareFetchMetadata2);
        Assertions.assertEquals(newContext2.getClass(), ShareSessionContext.class);
        LinkedHashMap linkedHashMap4 = new LinkedHashMap();
        linkedHashMap4.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap4.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata2.memberId(), linkedHashMap4);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData2.responseData(hashMap).size());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey("grp", shareFetchMetadata2.memberId());
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey));
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey2));
        mockTime.sleep(500L);
        Assertions.assertEquals(build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareFetchMetadata.memberId(), 1)).getClass(), ShareSessionContext.class);
        mockTime.sleep(501L);
        LinkedHashMap linkedHashMap5 = new LinkedHashMap();
        linkedHashMap5.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap5.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata3 = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext3 = build.newContext("grp", linkedHashMap5, EMPTY_PART_LIST, shareFetchMetadata3);
        LinkedHashMap linkedHashMap6 = new LinkedHashMap();
        linkedHashMap6.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap6.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData3 = newContext3.updateAndGenerateResponseData("grp", shareFetchMetadata3.memberId(), linkedHashMap6);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData3.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData3.responseData(hashMap).size());
        ShareSessionKey shareSessionKey3 = new ShareSessionKey("grp", shareFetchMetadata3.memberId());
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey));
        Assertions.assertNull(shareSessionCache.get(shareSessionKey2), "share session 2 should have been evicted by latest share session, as share session 1 was used more recently");
        Assertions.assertNotNull(shareSessionCache.get(shareSessionKey3));
    }

    @Test
    public void testSubsequentShareSession() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        Map singletonMap = Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100));
        ArrayList arrayList = new ArrayList();
        arrayList.add(topicIdPartition);
        ShareSessionContext newContext2 = build.newContext("grp", singletonMap, arrayList, new ShareFetchMetadata(shareFetchMetadata.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        HashSet hashSet = new HashSet();
        hashSet.add(topicIdPartition2);
        hashSet.add(topicIdPartition3);
        HashSet hashSet2 = new HashSet();
        newContext2.session().partitionMap().forEach(cachedSharePartition -> {
            hashSet2.add(new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition())));
        });
        Assertions.assertEquals(hashSet, hashSet2);
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition3.partition()));
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap3);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(1, updateAndGenerateResponseData2.data().responses().size());
        Assertions.assertEquals(randomUuid2, ((ShareFetchResponseData.ShareFetchableTopicResponse) updateAndGenerateResponseData2.data().responses().get(0)).topicId());
        Assertions.assertEquals(1, ((ShareFetchResponseData.ShareFetchableTopicResponse) updateAndGenerateResponseData2.data().responses().get(0)).partitions().size());
        Assertions.assertEquals(0, ((ShareFetchResponseData.PartitionData) ((ShareFetchResponseData.ShareFetchableTopicResponse) updateAndGenerateResponseData2.data().responses().get(0)).partitions().get(0)).partitionIndex());
        Assertions.assertEquals(1, updateAndGenerateResponseData2.responseData(hashMap).size());
    }

    @Test
    public void testZeroSizeShareSession() {
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        ArrayList arrayList = new ArrayList();
        arrayList.add(topicIdPartition);
        arrayList.add(topicIdPartition2);
        ShareFetchContext newContext2 = build.newContext("grp", Collections.emptyMap(), arrayList, new ShareFetchMetadata(shareFetchMetadata.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), new LinkedHashMap()).responseData(hashMap).isEmpty());
        Assertions.assertEquals(1, shareSessionCache.size());
    }

    @Test
    public void testToForgetPartitions() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        assertPartitionsPresent((ShareSessionContext) newContext, Arrays.asList(topicIdPartition, topicIdPartition2));
        mockUpdateAndGenerateResponseData(newContext, "grp", shareFetchMetadata.memberId());
        ShareFetchContext newContext2 = build.newContext("grp", Collections.emptyMap(), Collections.singletonList(topicIdPartition), new ShareFetchMetadata(shareFetchMetadata.memberId(), 1));
        assertPartitionsPresent((ShareSessionContext) newContext2, Collections.singletonList(topicIdPartition2));
        mockUpdateAndGenerateResponseData(newContext2, "grp", shareFetchMetadata.memberId());
        assertPartitionsPresent((ShareSessionContext) build.newContext("grp", Collections.emptyMap(), Collections.singletonList(topicIdPartition2), new ShareFetchMetadata(shareFetchMetadata.memberId(), 2)), Collections.emptyList());
    }

    @Test
    public void testShareSessionUpdateTopicIdsBrokerSide() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition2.partition()));
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(2, updateAndGenerateResponseData.responseData(hashMap).size());
        ShareSessionContext newContext2 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareFetchMetadata.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(topicIdPartition.partition()).setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code()));
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap3);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(Errors.INCONSISTENT_TOPIC_ID.code(), ((ShareFetchResponseData.PartitionData) updateAndGenerateResponseData2.responseData(hashMap).get(topicIdPartition)).errorCode());
    }

    @Test
    public void testGetErroneousAndValidTopicIdPartitions() {
        MockTime mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition((String) null, 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition((String) null, 1));
        String str = "grp";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        linkedHashMap.put(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        assertErroneousAndValidTopicIdPartitions(newContext.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(topicIdPartition3), Arrays.asList(topicIdPartition, topicIdPartition2));
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        linkedHashMap2.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        Assertions.assertEquals(Errors.NONE, newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2).error());
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareFetchMetadata.memberId());
        ShareFetchResponse throttleResponse = newContext.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse.error());
        Assertions.assertEquals(100, throttleResponse.throttleTimeMs());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT));
        });
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(Uuid.randomUuid(), 1));
        });
        ShareSessionContext newContext2 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        assertErroneousAndValidTopicIdPartitions(newContext2.getErroneousAndValidTopicIdPartitions(), Collections.singletonList(topicIdPartition3), Arrays.asList(topicIdPartition, topicIdPartition2));
        Assertions.assertEquals(Errors.NONE, newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2).error());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT));
        });
        ShareFetchContext newContext3 = build.newContext("grp", Collections.singletonMap(topicIdPartition4, new ShareFetchRequest.SharePartitionData(topicIdPartition4.topicId(), 100)), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 2));
        ShareFetchResponse throttleResponse2 = newContext3.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse2.error());
        Assertions.assertEquals(100, throttleResponse2.throttleTimeMs());
        assertErroneousAndValidTopicIdPartitions(newContext3.getErroneousAndValidTopicIdPartitions(), Arrays.asList(topicIdPartition3, topicIdPartition4), Arrays.asList(topicIdPartition, topicIdPartition2));
        ShareFetchContext newContext4 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareFetchMetadata.memberId(), -1));
        Assertions.assertEquals(FinalContext.class, newContext4.getClass());
        Assertions.assertEquals(0, shareSessionCache.size());
        assertErroneousAndValidTopicIdPartitions(newContext4.getErroneousAndValidTopicIdPartitions(), Collections.emptyList(), Collections.emptyList());
        ShareFetchResponse throttleResponse3 = newContext4.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse3.error());
        Assertions.assertEquals(100, throttleResponse3.throttleTimeMs());
    }

    @Test
    public void testShareFetchContextResponseSize() {
        MockTime mockTime = new MockTime();
        ShareSessionCache shareSessionCache = new ShareSessionCache(10, 1000L);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withTime(mockTime).build();
        HashMap hashMap = new HashMap();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        hashMap.put(randomUuid, "foo");
        hashMap.put(randomUuid2, "bar");
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        String str = "grp";
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
        short latestVersion = ApiKeys.SHARE_FETCH.latestVersion();
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(Uuid.randomUuid(), 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int responseSize = newContext.responseSize(linkedHashMap2, latestVersion);
        ShareFetchResponse updateAndGenerateResponseData = newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData.error());
        Assertions.assertEquals(linkedHashMap2, updateAndGenerateResponseData.responseData(hashMap));
        Assertions.assertEquals(4 + updateAndGenerateResponseData.data().size(objectSerializationCache, latestVersion), responseSize);
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareFetchMetadata.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT));
        });
        Uuid randomUuid3 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(randomUuid3, 1));
        });
        ShareSessionContext newContext2 = build.newContext("grp", Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100)), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertTrue(newContext2.isSubsequent());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        int responseSize2 = newContext2.responseSize(linkedHashMap3, latestVersion);
        ShareFetchResponse updateAndGenerateResponseData2 = newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap3);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData2.error());
        Assertions.assertEquals(4 + updateAndGenerateResponseData2.data().size(objectSerializationCache, latestVersion), responseSize2);
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> {
            build.newContext(str, linkedHashMap, EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), MAX_DELIVERY_COUNT));
        });
        ShareFetchContext newContext3 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 2));
        int responseSize3 = newContext3.responseSize(linkedHashMap2, latestVersion);
        ShareFetchResponse throttleResponse = newContext3.throttleResponse(100);
        Assertions.assertEquals(Errors.NONE, throttleResponse.error());
        Assertions.assertEquals(100, throttleResponse.throttleTimeMs());
        Assertions.assertEquals(4 + new ShareFetchResponseData().size(objectSerializationCache, latestVersion), responseSize3);
        ShareFetchContext newContext4 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareFetchMetadata.memberId(), -1));
        Assertions.assertEquals(FinalContext.class, newContext4.getClass());
        Assertions.assertEquals(0, shareSessionCache.size());
        LinkedHashMap linkedHashMap4 = new LinkedHashMap();
        linkedHashMap4.put(topicIdPartition4, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int responseSize4 = newContext4.responseSize(linkedHashMap4, latestVersion);
        ShareFetchResponse updateAndGenerateResponseData3 = newContext4.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap4);
        Assertions.assertEquals(Errors.NONE, updateAndGenerateResponseData3.error());
        Assertions.assertEquals(4 + updateAndGenerateResponseData3.data().size(objectSerializationCache, latestVersion), responseSize4);
    }

    @Test
    public void testCachedTopicPartitionsWithNoTopicPartitions() {
        Assertions.assertTrue(SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build().cachedTopicIdPartitionsInShareSession("grp", Uuid.randomUuid()).isEmpty());
    }

    @Test
    public void testCachedTopicPartitionsForValidShareSessions() {
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withCache(new ShareSessionCache(10, 1000L)).build();
        Uuid randomUuid = Uuid.randomUuid();
        Uuid randomUuid2 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid2, new TopicPartition("bar", 1));
        Uuid randomUuid3 = Uuid.randomUuid();
        Uuid randomUuid4 = Uuid.randomUuid();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(topicIdPartition, new ShareFetchRequest.SharePartitionData(topicIdPartition.topicId(), 100));
        linkedHashMap.put(topicIdPartition2, new ShareFetchRequest.SharePartitionData(topicIdPartition2.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata = new ShareFetchMetadata(randomUuid3, 0);
        ShareSessionContext newContext = build.newContext("grp", linkedHashMap, EMPTY_PART_LIST, shareFetchMetadata);
        Assertions.assertEquals(ShareSessionContext.class, newContext.getClass());
        Assertions.assertFalse(newContext.isSubsequent());
        ShareSessionKey shareSessionKey = new ShareSessionKey("grp", shareFetchMetadata.memberId());
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(topicIdPartition, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        linkedHashMap2.put(topicIdPartition2, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        Assertions.assertEquals(Errors.NONE, newContext.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap2).error());
        Assertions.assertEquals(new HashSet(Arrays.asList(topicIdPartition, topicIdPartition2)), new HashSet(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3)));
        Map singletonMap = Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100));
        ShareFetchMetadata shareFetchMetadata2 = new ShareFetchMetadata(randomUuid4, 0);
        ShareSessionContext newContext2 = build.newContext("grp", singletonMap, EMPTY_PART_LIST, shareFetchMetadata2);
        Assertions.assertEquals(ShareSessionContext.class, newContext2.getClass());
        Assertions.assertFalse(newContext2.isSubsequent());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey("grp", shareFetchMetadata2.memberId());
        LinkedHashMap linkedHashMap3 = new LinkedHashMap();
        linkedHashMap3.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        Assertions.assertEquals(Errors.NONE, newContext2.updateAndGenerateResponseData("grp", shareFetchMetadata2.memberId(), linkedHashMap3).error());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition3), build.cachedTopicIdPartitionsInShareSession("grp", randomUuid4));
        ShareSessionContext newContext3 = build.newContext("grp", Collections.singletonMap(topicIdPartition3, new ShareFetchRequest.SharePartitionData(topicIdPartition3.topicId(), 100)), EMPTY_PART_LIST, new ShareFetchMetadata(shareSessionKey.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext3.getClass());
        Assertions.assertTrue(newContext3.isSubsequent());
        LinkedHashMap linkedHashMap4 = new LinkedHashMap();
        linkedHashMap4.put(topicIdPartition3, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        Assertions.assertEquals(Errors.NONE, newContext3.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), linkedHashMap4).error());
        Assertions.assertEquals(new HashSet(Arrays.asList(topicIdPartition, topicIdPartition2, topicIdPartition3)), new HashSet(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3)));
        ShareSessionContext newContext4 = build.newContext("grp", Collections.singletonMap(topicIdPartition4, new ShareFetchRequest.SharePartitionData(topicIdPartition4.topicId(), 100)), Collections.singletonList(topicIdPartition3), new ShareFetchMetadata(shareSessionKey2.memberId(), 1));
        Assertions.assertEquals(ShareSessionContext.class, newContext4.getClass());
        Assertions.assertTrue(newContext4.isSubsequent());
        LinkedHashMap linkedHashMap5 = new LinkedHashMap();
        linkedHashMap5.put(topicIdPartition4, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        Assertions.assertEquals(Errors.NONE, newContext4.updateAndGenerateResponseData("grp", shareFetchMetadata2.memberId(), linkedHashMap5).error());
        Assertions.assertEquals(Collections.singletonList(topicIdPartition4), build.cachedTopicIdPartitionsInShareSession("grp", randomUuid4));
        ShareFetchContext newContext5 = build.newContext("grp", Collections.emptyMap(), EMPTY_PART_LIST, new ShareFetchMetadata(shareFetchMetadata.memberId(), -1));
        Assertions.assertEquals(FinalContext.class, newContext5.getClass());
        Assertions.assertEquals(Errors.NONE, newContext5.updateAndGenerateResponseData("grp", shareFetchMetadata.memberId(), new LinkedHashMap()).error());
        Assertions.assertTrue(build.cachedTopicIdPartitionsInShareSession("grp", randomUuid3).isEmpty());
        ShareSessionContext newContext6 = build.newContext("grp", Collections.emptyMap(), Collections.singletonList(topicIdPartition4), new ShareFetchMetadata(shareSessionKey2.memberId(), 2));
        Assertions.assertEquals(ShareSessionContext.class, newContext6.getClass());
        Assertions.assertTrue(newContext6.isSubsequent());
        Assertions.assertEquals(Errors.NONE, newContext6.updateAndGenerateResponseData("grp", shareFetchMetadata2.memberId(), new LinkedHashMap()).error());
        Assertions.assertEquals(Collections.emptyList(), build.cachedTopicIdPartitionsInShareSession("grp", randomUuid4));
    }

    @Test
    public void testSharePartitionKey() {
        SharePartitionManager.SharePartitionKey sharePartitionKey = new SharePartitionManager.SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 0)));
        SharePartitionManager.SharePartitionKey sharePartitionKey2 = new SharePartitionManager.SharePartitionKey("mock-group-2", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 0)));
        SharePartitionManager.SharePartitionKey sharePartitionKey3 = new SharePartitionManager.SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(1L, 1L), new TopicPartition("test-1", 0)));
        SharePartitionManager.SharePartitionKey sharePartitionKey4 = new SharePartitionManager.SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 1)));
        SharePartitionManager.SharePartitionKey sharePartitionKey5 = new SharePartitionManager.SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 0L), new TopicPartition("test-2", 0)));
        Assertions.assertEquals(sharePartitionKey, new SharePartitionManager.SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition(RemoteLogReaderTest.TOPIC, 0))));
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey2);
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey3);
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey4);
        Assertions.assertNotEquals(sharePartitionKey, sharePartitionKey5);
        Assertions.assertNotEquals(sharePartitionKey, (Object) null);
    }

    @Test
    public void testMultipleSequentialShareFetches() {
        String str = "grp";
        Uuid randomUuid = Uuid.randomUuid();
        FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        Uuid randomUuid2 = Uuid.randomUuid();
        Uuid randomUuid3 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 1));
        TopicIdPartition topicIdPartition5 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 2));
        TopicIdPartition topicIdPartition6 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 2));
        TopicIdPartition topicIdPartition7 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 3));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition3, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition4, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition5, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition6, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition7, Integer.valueOf(PARTITION_MAX_BYTES));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        Time time = (Time) Mockito.mock(Time.class);
        Mockito.when(Long.valueOf(time.hiResClockMs())).thenReturn(0L).thenReturn(100L);
        Metrics metrics = new Metrics();
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withTime(time).withMetrics(metrics).build();
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            build.releaseFetchQueueAndPartitionsLock(str, hashMap.keySet());
            return null;
        }).when(replicaManager)).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        build.fetchMessages("grp", randomUuid.toString(), fetchParams, Arrays.asList(topicIdPartition, topicIdPartition2, topicIdPartition3, topicIdPartition4), hashMap);
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        build.fetchMessages("grp", randomUuid.toString(), fetchParams, Collections.singletonList(topicIdPartition5), hashMap);
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(2))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        build.fetchMessages("grp", randomUuid.toString(), fetchParams, Arrays.asList(topicIdPartition6, topicIdPartition7), hashMap);
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(3))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(metrics.metricName("partition-load-time-avg", "share-group-metrics"), d -> {
            Assertions.assertEquals(d.intValue(), 14, "partition-load-time-avg");
        });
        hashMap2.put(metrics.metricName("partition-load-time-max", "share-group-metrics"), d2 -> {
            Assertions.assertEquals(d2, 100.0d, "partition-load-time-max");
        });
        hashMap2.forEach((metricName, consumer) -> {
            Assertions.assertTrue(metrics.metrics().containsKey(metricName));
            consumer.accept((Double) ((KafkaMetric) metrics.metrics().get(metricName)).metricValue());
        });
    }

    @Test
    public void testProcessFetchResponse() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartitionKey -> {
            return new SharePartition(str, topicIdPartition, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance());
        });
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartitionKey2 -> {
            return new SharePartition(str, topicIdPartition2, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance());
        });
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(concurrentHashMap).build();
        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty()), "grp", uuid, Arrays.asList(topicIdPartition, topicIdPartition2), new CompletableFuture(), hashMap);
        MemoryRecords withRecords = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord((byte[]) null, "value".getBytes())});
        MemoryRecords withRecords2 = MemoryRecords.withRecords(100L, Compression.NONE, new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord((byte[]) null, "value".getBytes())});
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple2(topicIdPartition, new FetchPartitionData(Errors.NONE, 0L, 0L, withRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        arrayList.add(new Tuple2(topicIdPartition2, new FetchPartitionData(Errors.NONE, 0L, 100L, withRecords2, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        CompletableFuture processFetchResponse = build.processFetchResponse(shareFetchPartitionData, arrayList);
        Assertions.assertTrue(processFetchResponse.isDone());
        Map map = (Map) processFetchResponse.join();
        Assertions.assertEquals(2, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertEquals(0, ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(1, ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short) 1)), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).acquiredRecords());
        Assertions.assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(100L).setLastOffset(103L).setDeliveryCount((short) 1)), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).acquiredRecords());
    }

    @Test
    public void testProcessFetchResponseWithEmptyRecords() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartitionKey -> {
            return new SharePartition(str, topicIdPartition, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance());
        });
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartitionKey2 -> {
            return new SharePartition(str, topicIdPartition2, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance());
        });
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(concurrentHashMap).build();
        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty()), "grp", uuid, Arrays.asList(topicIdPartition, topicIdPartition2), new CompletableFuture(), hashMap);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple2(topicIdPartition, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        arrayList.add(new Tuple2(topicIdPartition2, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        CompletableFuture processFetchResponse = build.processFetchResponse(shareFetchPartitionData, arrayList);
        Assertions.assertTrue(processFetchResponse.isDone());
        Map map = (Map) processFetchResponse.join();
        Assertions.assertEquals(2, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertEquals(0, ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(1, ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertEquals(Collections.emptyList(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).acquiredRecords());
        Assertions.assertEquals(Collections.emptyList(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).acquiredRecords());
    }

    @Test
    public void testMultipleConcurrentShareFetches() throws InterruptedException {
        String str = "grp";
        Uuid randomUuid = Uuid.randomUuid();
        FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        Uuid randomUuid2 = Uuid.randomUuid();
        Uuid randomUuid3 = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid2, new TopicPartition("foo", 1));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 0));
        TopicIdPartition topicIdPartition4 = new TopicIdPartition(randomUuid3, new TopicPartition("bar", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition3, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition4, Integer.valueOf(PARTITION_MAX_BYTES));
        MockTime mockTime = new MockTime(0L, System.currentTimeMillis(), 0L);
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartitionKey -> {
            return new SharePartition(str, topicIdPartition, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, RECORD_LOCK_DURATION_MS, mockTimer, mockTime, NoOpShareStatePersister.getInstance());
        });
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartitionKey2 -> {
            return new SharePartition(str, topicIdPartition2, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, RECORD_LOCK_DURATION_MS, mockTimer, mockTime, NoOpShareStatePersister.getInstance());
        });
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition3), sharePartitionKey3 -> {
            return new SharePartition(str, topicIdPartition3, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, RECORD_LOCK_DURATION_MS, mockTimer, mockTime, NoOpShareStatePersister.getInstance());
        });
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition4), sharePartitionKey4 -> {
            return new SharePartition(str, topicIdPartition4, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, RECORD_LOCK_DURATION_MS, mockTimer, mockTime, NoOpShareStatePersister.getInstance());
        });
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(concurrentHashMap).withTime(mockTime).withReplicaManager(replicaManager).build();
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition3 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition4 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Long.valueOf(sharePartition.nextFetchOffset())).thenReturn(1L, new Long[]{15L, 6L, 30L, 25L});
        Mockito.when(Long.valueOf(sharePartition2.nextFetchOffset())).thenReturn(4L, new Long[]{1L, 18L, 5L});
        Mockito.when(Long.valueOf(sharePartition3.nextFetchOffset())).thenReturn(10L, new Long[]{25L, 26L});
        Mockito.when(Long.valueOf(sharePartition4.nextFetchOffset())).thenReturn(20L, new Long[]{15L, 23L, 16L});
        ((ReplicaManager) Mockito.doAnswer(invocationOnMock -> {
            Assertions.assertEquals(1L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(4L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(10L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(20L, sharePartition4.nextFetchOffset());
            build.releaseFetchQueueAndPartitionsLock(str, hashMap.keySet());
            return null;
        }).doAnswer(invocationOnMock2 -> {
            Assertions.assertEquals(15L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(1L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(25L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(15L, sharePartition4.nextFetchOffset());
            build.releaseFetchQueueAndPartitionsLock(str, hashMap.keySet());
            return null;
        }).doAnswer(invocationOnMock3 -> {
            Assertions.assertEquals(6L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(18L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(26L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(23L, sharePartition4.nextFetchOffset());
            build.releaseFetchQueueAndPartitionsLock(str, hashMap.keySet());
            return null;
        }).doAnswer(invocationOnMock4 -> {
            Assertions.assertEquals(30L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(5L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(26L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(16L, sharePartition4.nextFetchOffset());
            build.releaseFetchQueueAndPartitionsLock(str, hashMap.keySet());
            return null;
        }).doAnswer(invocationOnMock5 -> {
            Assertions.assertEquals(25L, sharePartition.nextFetchOffset());
            Assertions.assertEquals(5L, sharePartition2.nextFetchOffset());
            Assertions.assertEquals(26L, sharePartition3.nextFetchOffset());
            Assertions.assertEquals(16L, sharePartition4.nextFetchOffset());
            build.releaseFetchQueueAndPartitionsLock(str, hashMap.keySet());
            return null;
        }).when(replicaManager)).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(100);
        for (int i = 0; i != 100; i++) {
            try {
                newFixedThreadPool.submit(() -> {
                    build.fetchMessages(str, randomUuid.toString(), fetchParams, Arrays.asList(topicIdPartition, topicIdPartition2, topicIdPartition3, topicIdPartition4), hashMap);
                });
                if (i % 10 == 0) {
                    newFixedThreadPool.awaitTermination(50L, TimeUnit.MILLISECONDS);
                }
            } finally {
                if (!newFixedThreadPool.awaitTermination(50L, TimeUnit.MILLISECONDS)) {
                    newFixedThreadPool.shutdown();
                }
            }
        }
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.atMost(100))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.atLeast(10))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
    }

    @Test
    public void testReplicaManagerFetchShouldNotProceed() {
        Uuid randomUuid = Uuid.randomUuid();
        FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(false);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        CompletableFuture fetchMessages = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withReplicaManager(replicaManager).build().fetchMessages("grp", randomUuid.toString(), fetchParams, Collections.singletonList(topicIdPartition), hashMap);
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(0))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
        Assertions.assertEquals(0, ((Map) fetchMessages.join()).size());
    }

    @Test
    public void testReplicaManagerFetchShouldProceed() {
        Uuid randomUuid = Uuid.randomUuid();
        FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(Boolean.valueOf(sharePartition.maybeAcquireFetchLock())).thenReturn(true);
        Mockito.when(Boolean.valueOf(sharePartition.canAcquireRecords())).thenReturn(true);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap2).withReplicaManager(replicaManager).build().fetchMessages("grp", randomUuid.toString(), fetchParams, Collections.singletonList(topicIdPartition), hashMap);
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
    }

    @Test
    public void testCloseSharePartitionManager() throws Exception {
        Timer timer = (Timer) Mockito.mock(SystemTimerReaper.class);
        Persister persister = (Persister) Mockito.mock(Persister.class);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withTimer(timer).withShareGroupPersister(persister).build();
        ((Timer) Mockito.verify(timer, Mockito.times(0))).close();
        ((Persister) Mockito.verify(persister, Mockito.times(0))).stop();
        build.close();
        ((Timer) Mockito.verify(timer, Mockito.times(1))).close();
        ((Persister) Mockito.verify(persister, Mockito.times(1))).stop();
    }

    @Test
    public void testReleaseAcquiredRecordsSuccess() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("baz", 4));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.releaseAcquiredRecords((String) ArgumentMatchers.eq(randomUuid.toString()))).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        Mockito.when(sharePartition2.releaseAcquiredRecords((String) ArgumentMatchers.eq(randomUuid.toString()))).thenReturn(CompletableFuture.completedFuture(Optional.of(new InvalidRecordStateException("Unable to release acquired records for the batch"))));
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(3);
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition));
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition2));
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition3));
        Mockito.when(shareSession.partitionMap()).thenReturn(implicitLinkedHashCollection);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        Map map = (Map) SharePartitionManagerBuilder.builder().withCache(shareSessionCache).withPartitionCacheMap(hashMap).build().releaseAcquiredRecords("grp", randomUuid.toString()).join();
        Assertions.assertEquals(3, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertTrue(map.containsKey(topicIdPartition3));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(2, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.INVALID_RECORD_STATE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertEquals(4, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).partitionIndex());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).errorCode());
    }

    @Test
    public void testReleaseAcquiredRecordsWithIncorrectGroupId() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", randomUuid))).thenReturn(shareSession);
        ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(3);
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition));
        Mockito.when(shareSession.partitionMap()).thenReturn(implicitLinkedHashCollection);
        Assertions.assertTrue(((Map) SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build().releaseAcquiredRecords("grp-2", randomUuid.toString()).join()).isEmpty());
    }

    @Test
    public void testReleaseAcquiredRecordsWithIncorrectMemberId() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        ShareSessionCache shareSessionCache = (ShareSessionCache) Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession) Mockito.mock(ShareSession.class);
        Mockito.when(shareSessionCache.get(new ShareSessionKey("grp", Uuid.randomUuid()))).thenReturn(shareSession);
        ImplicitLinkedHashCollection implicitLinkedHashCollection = new ImplicitLinkedHashCollection(3);
        implicitLinkedHashCollection.add(new CachedSharePartition(topicIdPartition));
        Mockito.when(shareSession.partitionMap()).thenReturn(implicitLinkedHashCollection);
        Assertions.assertTrue(((Map) SharePartitionManagerBuilder.builder().withCache(shareSessionCache).build().releaseAcquiredRecords("grp", randomUuid.toString()).join()).isEmpty());
    }

    @Test
    public void testReleaseAcquiredRecordsWithEmptyTopicPartitions() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid))).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        Mockito.when(sharePartition2.releaseAcquiredRecords((String) ArgumentMatchers.eq(uuid))).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        Assertions.assertEquals(0, ((Map) SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build().releaseAcquiredRecords("grp", uuid).join()).size());
    }

    @Test
    public void testAcknowledgeSinglePartition() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap2).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
    }

    @Test
    public void testAcknowledgeMultiplePartition() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition topicIdPartition3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition3 = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        Mockito.when(sharePartition2.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        Mockito.when(sharePartition3.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition3), sharePartition3);
        Metrics metrics = new Metrics();
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).withMetrics(metrics).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        hashMap2.put(topicIdPartition2, Arrays.asList(new ShareAcknowledgementBatch(15L, 26L, Collections.singletonList((byte) 2)), new ShareAcknowledgementBatch(34L, 56L, Collections.singletonList((byte) 2))));
        hashMap2.put(topicIdPartition3, Arrays.asList(new ShareAcknowledgementBatch(4L, 15L, Collections.singletonList((byte) 3)), new ShareAcknowledgementBatch(16L, 21L, Collections.singletonList((byte) 3))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap2).join();
        Assertions.assertEquals(3, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertTrue(map.containsKey(topicIdPartition3));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition3)).errorCode());
        HashMap hashMap3 = new HashMap();
        hashMap3.put(metrics.metricName("share-acknowledgement-count", "share-group-metrics"), d -> {
            Assertions.assertEquals(d, 1.0d);
        });
        hashMap3.put(metrics.metricName("share-acknowledgement-rate", "share-group-metrics"), d2 -> {
            Assertions.assertTrue(d2.doubleValue() > 0.0d);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.ACCEPT.toString())), d3 -> {
            Assertions.assertEquals(2.0d, d3);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.RELEASE.toString())), d4 -> {
            Assertions.assertEquals(2.0d, d4);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-count", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.REJECT.toString())), d5 -> {
            Assertions.assertEquals(2.0d, d5);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.ACCEPT.toString())), d6 -> {
            Assertions.assertTrue(d6.doubleValue() > 0.0d);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.RELEASE.toString())), d7 -> {
            Assertions.assertTrue(d7.doubleValue() > 0.0d);
        });
        hashMap3.put(metrics.metricName("record-acknowledgement-rate", "share-group-metrics", Collections.singletonMap("ack-type", AcknowledgeType.REJECT.toString())), d8 -> {
            Assertions.assertTrue(d8.doubleValue() > 0.0d);
        });
        hashMap3.forEach((metricName, consumer) -> {
            Assertions.assertTrue(metrics.metrics().containsKey(metricName));
            consumer.accept((Double) ((KafkaMetric) metrics.metrics().get(metricName)).metricValue());
        });
    }

    @Test
    public void testAcknowledgeIncorrectGroupId() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Map map = (Map) build.acknowledge(uuid, "grp2", hashMap2).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
    }

    @Test
    public void testAcknowledgeIncorrectMemberId() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.acknowledge((String) ArgumentMatchers.eq(uuid), (List) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Optional.of(new InvalidRequestException("Member is not the owner of batch record"))));
        HashMap hashMap = new HashMap();
        hashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        SharePartitionManager build = SharePartitionManagerBuilder.builder().withPartitionCacheMap(hashMap).build();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(12L, 20L, Collections.singletonList((byte) 1)), new ShareAcknowledgementBatch(24L, 56L, Collections.singletonList((byte) 1))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap2).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(0, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.INVALID_REQUEST.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
    }

    @Test
    public void testAcknowledgeEmptyPartitionCacheMap() {
        String uuid = Uuid.randomUuid().toString();
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));
        SharePartitionManager build = SharePartitionManagerBuilder.builder().build();
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Arrays.asList(new ShareAcknowledgementBatch(78L, 90L, Collections.singletonList((byte) 2)), new ShareAcknowledgementBatch(94L, 99L, Collections.singletonList((byte) 2))));
        Map map = (Map) build.acknowledge(uuid, "grp", hashMap).join();
        Assertions.assertEquals(1, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertEquals(3, ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), ((ShareAcknowledgeResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
    }

    @Test
    public void testProcessFetchResponseWithLsoMovementForTopicPartition() {
        Uuid randomUuid = Uuid.randomUuid();
        TopicIdPartition topicIdPartition = new TopicIdPartition(randomUuid, new TopicPartition("foo", 0));
        TopicIdPartition topicIdPartition2 = new TopicIdPartition(randomUuid, new TopicPartition("foo", 1));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        hashMap.put(topicIdPartition2, Integer.valueOf(PARTITION_MAX_BYTES));
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartition sharePartition = (SharePartition) Mockito.mock(SharePartition.class);
        SharePartition sharePartition2 = (SharePartition) Mockito.mock(SharePartition.class);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartition);
        concurrentHashMap.put(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition2), sharePartition2);
        SharePartitionManager sharePartitionManager = (SharePartitionManager) Mockito.spy(SharePartitionManagerBuilder.builder().withPartitionCacheMap(concurrentHashMap).withReplicaManager(replicaManager).build());
        ((SharePartitionManager) Mockito.doReturn(1L).when(sharePartitionManager)).offsetForEarliestTimestamp((TopicIdPartition) ArgumentMatchers.any(TopicIdPartition.class));
        Mockito.when(Long.valueOf(sharePartition.nextFetchOffset())).thenReturn(0L, new Long[]{5L});
        Mockito.when(Long.valueOf(sharePartition2.nextFetchOffset())).thenReturn(4L, new Long[]{4L});
        Mockito.when(sharePartition.acquire((String) ArgumentMatchers.any(), (FetchPartitionData) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()), new CompletableFuture[]{CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short) 1)))});
        Mockito.when(sharePartition2.acquire((String) ArgumentMatchers.any(), (FetchPartitionData) ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(100L).setLastOffset(103L).setDeliveryCount((short) 1))), new CompletableFuture[]{CompletableFuture.completedFuture(Collections.emptyList())});
        ((SharePartition) Mockito.doNothing().when(sharePartition2)).updateCacheAndOffsets(((Long) ArgumentMatchers.any(Long.class)).longValue());
        ((SharePartition) Mockito.doNothing().when(sharePartition)).updateCacheAndOffsets(((Long) ArgumentMatchers.any(Long.class)).longValue());
        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty()), "grp", Uuid.randomUuid().toString(), Arrays.asList(topicIdPartition, topicIdPartition2), new CompletableFuture(), hashMap);
        MemoryRecords withRecords = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord((byte[]) null, "value".getBytes())});
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Tuple2(topicIdPartition, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        arrayList.add(new Tuple2(topicIdPartition2, new FetchPartitionData(Errors.NONE, 0L, 0L, withRecords, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        CompletableFuture processFetchResponse = sharePartitionManager.processFetchResponse(shareFetchPartitionData, arrayList);
        Assertions.assertTrue(processFetchResponse.isDone());
        Map map = (Map) processFetchResponse.join();
        Assertions.assertEquals(2, map.size());
        Assertions.assertTrue(map.containsKey(topicIdPartition));
        Assertions.assertTrue(map.containsKey(topicIdPartition2));
        Assertions.assertEquals(0, ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(1, ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map.get(topicIdPartition2)).errorCode());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(1))).updateCacheAndOffsets(((Long) ArgumentMatchers.any(Long.class)).longValue());
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).updateCacheAndOffsets(((Long) ArgumentMatchers.any(Long.class)).longValue());
        MemoryRecords withRecords2 = MemoryRecords.withRecords(100L, Compression.NONE, new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord((byte[]) null, "value".getBytes())});
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new Tuple2(topicIdPartition, new FetchPartitionData(Errors.NONE, 0L, 0L, withRecords2, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        arrayList2.add(new Tuple2(topicIdPartition2, new FetchPartitionData(Errors.NONE, 0L, 0L, MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)));
        CompletableFuture processFetchResponse2 = sharePartitionManager.processFetchResponse(shareFetchPartitionData, arrayList2);
        Assertions.assertTrue(processFetchResponse2.isDone());
        Map map2 = (Map) processFetchResponse2.join();
        Assertions.assertEquals(2, map2.size());
        Assertions.assertTrue(map2.containsKey(topicIdPartition));
        Assertions.assertTrue(map2.containsKey(topicIdPartition2));
        Assertions.assertEquals(0, ((ShareFetchResponseData.PartitionData) map2.get(topicIdPartition)).partitionIndex());
        Assertions.assertEquals(1, ((ShareFetchResponseData.PartitionData) map2.get(topicIdPartition2)).partitionIndex());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map2.get(topicIdPartition)).errorCode());
        Assertions.assertEquals(Errors.NONE.code(), ((ShareFetchResponseData.PartitionData) map2.get(topicIdPartition2)).errorCode());
        ((SharePartition) Mockito.verify(sharePartition, Mockito.times(1))).updateCacheAndOffsets(((Long) ArgumentMatchers.any(Long.class)).longValue());
        ((SharePartition) Mockito.verify(sharePartition2, Mockito.times(0))).updateCacheAndOffsets(((Long) ArgumentMatchers.any(Long.class)).longValue());
    }

    @Test
    public void testFetchQueueProcessingWhenFrontItemIsEmpty() {
        String str = "grp";
        String uuid = Uuid.randomUuid().toString();
        FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), -1, -1L, 0L, 1, 1048576, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        HashMap hashMap = new HashMap();
        hashMap.put(topicIdPartition, Integer.valueOf(PARTITION_MAX_BYTES));
        MockTime mockTime = new MockTime();
        ReplicaManager replicaManager = (ReplicaManager) Mockito.mock(ReplicaManager.class);
        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData(fetchParams, "grp", uuid, Collections.emptyList(), new CompletableFuture(), hashMap);
        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData(fetchParams, "grp", uuid, Collections.singletonList(topicIdPartition), new CompletableFuture(), hashMap);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentHashMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey("grp", topicIdPartition), sharePartitionKey -> {
            return new SharePartition(str, topicIdPartition, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, RECORD_LOCK_DURATION_MS, mockTimer, mockTime, NoOpShareStatePersister.getInstance());
        });
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(shareFetchPartitionData);
        concurrentLinkedQueue.add(shareFetchPartitionData2);
        SharePartitionManagerBuilder.builder().withPartitionCacheMap(concurrentHashMap).withReplicaManager(replicaManager).withTime(mockTime).withFetchQueue(concurrentLinkedQueue).build().maybeProcessFetchQueue();
        ((ReplicaManager) Mockito.verify(replicaManager, Mockito.times(1))).fetchMessages((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), (ReplicaQuota) ArgumentMatchers.any(ReplicaQuota.class), (Function1) ArgumentMatchers.any());
    }

    private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
    }

    private ShareFetchResponseData.PartitionData errorShareFetchResponse(Short sh) {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0).setErrorCode(sh.shortValue());
    }

    private void mockUpdateAndGenerateResponseData(ShareFetchContext shareFetchContext, String str, Uuid uuid) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (shareFetchContext.getClass() == ShareSessionContext.class) {
            ShareSessionContext shareSessionContext = (ShareSessionContext) shareFetchContext;
            if (shareSessionContext.isSubsequent()) {
                synchronized (shareSessionContext.session()) {
                    shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                        TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                        linkedHashMap.put(topicIdPartition, topicIdPartition.topic() == null ? errorShareFetchResponse(Short.valueOf(Errors.UNKNOWN_TOPIC_ID.code())) : noErrorShareFetchResponse());
                    });
                }
            } else {
                shareSessionContext.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> {
                });
            }
        }
        shareFetchContext.updateAndGenerateResponseData(str, uuid, linkedHashMap);
    }

    private void assertPartitionsPresent(ShareSessionContext shareSessionContext, List<TopicIdPartition> list) {
        HashSet hashSet = new HashSet();
        if (shareSessionContext.isSubsequent()) {
            shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                hashSet.add(new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition())));
            });
        } else {
            shareSessionContext.shareFetchData().forEach((topicIdPartition, sharePartitionData) -> {
                hashSet.add(topicIdPartition);
            });
        }
        Assertions.assertEquals(new HashSet(list), hashSet);
    }

    private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData, List<TopicIdPartition> list, List<TopicIdPartition> list2) {
        HashSet hashSet = new HashSet(list);
        HashSet hashSet2 = new HashSet(list2);
        HashSet hashSet3 = new HashSet();
        HashSet hashSet4 = new HashSet();
        erroneousAndValidPartitionData.erroneous().forEach(tuple2 -> {
            hashSet3.add(tuple2._1);
        });
        erroneousAndValidPartitionData.validTopicIdPartitions().forEach(tuple22 -> {
            hashSet4.add(tuple22._1);
        });
        Assertions.assertEquals(hashSet, hashSet3);
        Assertions.assertEquals(hashSet2, hashSet4);
    }
}
