/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.GroupState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

public class CommitRequestManagerTest {
    private SubscriptionState subscriptionState;
    private GroupState groupState;
    private LogContext logContext;
    private MockTime time;
    private CoordinatorRequestManager coordinatorRequestManager;
    private Properties props;

    @BeforeEach
    public void setup() {
        this.logContext = new LogContext();
        this.time = new MockTime(0L);
        this.subscriptionState = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        this.coordinatorRequestManager = (CoordinatorRequestManager)Mockito.mock(CoordinatorRequestManager.class);
        this.groupState = new GroupState("group-1", Optional.empty());
        this.props = new Properties();
        this.props.put("auto.commit.interval.ms", (Object)100);
        this.props.put("key.deserializer", StringDeserializer.class);
        this.props.put("value.deserializer", StringDeserializer.class);
    }

    @Test
    public void testPoll_EnsureManualCommitSent() {
        CommitRequestManager commitRequestManger = this.create(false, 0L);
        this.assertPoll(0, commitRequestManger);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManger.addOffsetCommitRequest(offsets);
        this.assertPoll(1, commitRequestManger);
    }

    @Test
    public void testPoll_EnsureAutocommitSent() {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        this.assertPoll(0, commitRequestManger);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0L));
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        Mockito.when((Object)this.subscriptionState.allConsumed()).thenReturn(offsets);
        this.time.sleep(100L);
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        this.assertPoll(1, commitRequestManger);
    }

    @Test
    public void testPoll_EnsureCorrectInflightRequestBufferSize() {
        CommitRequestManager commitManager = this.create(false, 100L);
        HashMap<TopicPartition, OffsetAndMetadata> offsets1 = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets1.put(new TopicPartition("test", 0), new OffsetAndMetadata(10L));
        offsets1.put(new TopicPartition("test", 1), new OffsetAndMetadata(20L));
        HashMap<TopicPartition, OffsetAndMetadata> offsets2 = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets2.put(new TopicPartition("test", 3), new OffsetAndMetadata(20L));
        offsets2.put(new TopicPartition("test", 4), new OffsetAndMetadata(20L));
        ArrayList<CompletableFuture> commitFutures = new ArrayList<CompletableFuture>();
        ArrayList<CompletableFuture> fetchFutures = new ArrayList<CompletableFuture>();
        commitFutures.add(commitManager.addOffsetCommitRequest(offsets1));
        fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 0))));
        commitFutures.add(commitManager.addOffsetCommitRequest(offsets2));
        fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new TopicPartition("test", 1))));
        NetworkClientDelegate.PollResult result = commitManager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)4, (int)result.unsentRequests.size());
        Assertions.assertTrue((boolean)result.unsentRequests.stream().anyMatch(r -> r.requestBuilder() instanceof OffsetCommitRequest.Builder));
        Assertions.assertTrue((boolean)result.unsentRequests.stream().anyMatch(r -> r.requestBuilder() instanceof OffsetFetchRequest.Builder));
        Assertions.assertFalse((boolean)commitManager.pendingRequests.hasUnsentRequests());
        Assertions.assertEquals((int)2, (int)commitManager.pendingRequests.inflightOffsetFetches.size());
        commitFutures.forEach(f -> f.complete(null));
        fetchFutures.forEach(f -> f.complete(null));
        Assertions.assertEquals((int)0, (int)commitManager.pendingRequests.inflightOffsetFetches.size());
    }

    @Test
    public void testPoll_EnsureEmptyPendingRequestAfterPoll() {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        commitRequestManger.addOffsetCommitRequest(new HashMap());
        Assertions.assertEquals((int)1, (int)commitRequestManger.unsentOffsetCommitRequests().size());
        commitRequestManger.poll(this.time.milliseconds());
        Assertions.assertTrue((boolean)commitRequestManger.unsentOffsetCommitRequests().isEmpty());
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManger);
    }

    @Test
    public void testAutocommit_ResendAutocommitAfterException() {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        List<CompletableFuture<ClientResponse>> futures = this.assertPoll(1, commitRequestManger);
        this.time.sleep(99L);
        futures.get(0).completeExceptionally(new KafkaException("test exception"));
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        this.assertPoll(0, commitRequestManger);
        this.time.sleep(1L);
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        this.assertPoll(1, commitRequestManger);
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManger);
    }

    @Test
    public void testAutocommit_EnsureOnlyOneInflightRequest() {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        this.time.sleep(100L);
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        List<CompletableFuture<ClientResponse>> futures = this.assertPoll(1, commitRequestManger);
        this.time.sleep(100L);
        commitRequestManger.updateAutoCommitTimer(this.time.milliseconds());
        this.assertPoll(0, commitRequestManger);
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManger);
        futures.get(0).complete(null);
        this.assertPoll(1, commitRequestManger);
    }

    @Test
    public void testOffsetFetchRequest_EnsureDuplicatedRequestSucceed() {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = this.sendAndVerifyDuplicatedRequests(commitRequestManger, partitions, 2, Errors.NONE);
        futures.forEach(f -> {
            Assertions.assertTrue((boolean)f.isDone());
            Assertions.assertFalse((boolean)f.isCompletedExceptionally());
        });
        commitRequestManger.poll(0L);
        CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManger);
    }

    @ParameterizedTest
    @MethodSource(value={"exceptionSupplier"})
    public void testOffsetFetchRequest_ErroredRequests(Errors error, boolean isRetriable) {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(new TopicPartition("t1", 0));
        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = this.sendAndVerifyDuplicatedRequests(commitRequestManger, partitions, 5, error);
        if (isRetriable) {
            this.testRetriable(commitRequestManger, futures);
        } else {
            this.testNonRetriable(futures);
            CommitRequestManagerTest.assertEmptyPendingRequests(commitRequestManger);
        }
    }

    private void testRetriable(CommitRequestManager commitRequestManger, List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        futures.forEach(f -> Assertions.assertFalse((boolean)f.isDone()));
        this.time.sleep(500L);
        commitRequestManger.poll(this.time.milliseconds());
        futures.forEach(f -> Assertions.assertFalse((boolean)f.isDone()));
    }

    private void testNonRetriable(List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
        futures.forEach(f -> Assertions.assertTrue((boolean)f.isCompletedExceptionally()));
    }

    private static Stream<Arguments> exceptionSupplier() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.NOT_COORDINATOR, true}), Arguments.of((Object[])new Object[]{Errors.COORDINATOR_LOAD_IN_PROGRESS, true}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_SERVER_ERROR, false}), Arguments.of((Object[])new Object[]{Errors.GROUP_AUTHORIZATION_FAILED, false}), Arguments.of((Object[])new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED, false}));
    }

    @ParameterizedTest
    @MethodSource(value={"partitionDataErrorSupplier"})
    public void testOffsetFetchRequest_PartitionDataError(Errors error, boolean isRetriable) {
        CommitRequestManager commitRequestManger = this.create(true, 100L);
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        TopicPartition tp1 = new TopicPartition("t1", 2);
        TopicPartition tp2 = new TopicPartition("t2", 3);
        partitions.add(tp1);
        partitions.add(tp2);
        CompletableFuture future = commitRequestManger.addOffsetFetchRequest(partitions);
        NetworkClientDelegate.PollResult res = commitRequestManger.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        HashMap<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
        topicPartitionData.put(tp1, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", error));
        topicPartitionData.put(tp2, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE));
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).future().complete(this.buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0), topicPartitionData, Errors.NONE));
        if (isRetriable) {
            this.testRetriable(commitRequestManger, Collections.singletonList(future));
        } else {
            this.testNonRetriable(Collections.singletonList(future));
        }
    }

    private static void assertEmptyPendingRequests(CommitRequestManager commitRequestManger) {
        Assertions.assertTrue((boolean)commitRequestManger.pendingRequests.inflightOffsetFetches.isEmpty());
        Assertions.assertTrue((boolean)commitRequestManger.pendingRequests.unsentOffsetFetches.isEmpty());
        Assertions.assertTrue((boolean)commitRequestManger.pendingRequests.unsentOffsetCommits.isEmpty());
    }

    private static Stream<Arguments> partitionDataErrorSupplier() {
        return Stream.of(Arguments.of((Object[])new Object[]{Errors.UNSTABLE_OFFSET_COMMIT, true}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_TOPIC_OR_PARTITION, false}), Arguments.of((Object[])new Object[]{Errors.TOPIC_AUTHORIZATION_FAILED, false}), Arguments.of((Object[])new Object[]{Errors.UNKNOWN_SERVER_ERROR, false}));
    }

    private List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> sendAndVerifyDuplicatedRequests(CommitRequestManager commitRequestManger, Set<TopicPartition> partitions, int numRequest, Errors error) {
        ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> futures = new ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>>();
        for (int i = 0; i < numRequest; ++i) {
            futures.add(commitRequestManger.addOffsetFetchRequest(partitions));
        }
        NetworkClientDelegate.PollResult res = commitRequestManger.poll(this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)res.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0)).future().complete(this.buildOffsetFetchClientResponse((NetworkClientDelegate.UnsentRequest)res.unsentRequests.get(0), partitions, error));
        res = commitRequestManger.poll(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)res.unsentRequests.size());
        return futures;
    }

    private List<CompletableFuture<ClientResponse>> assertPoll(int numRes, CommitRequestManager manager) {
        NetworkClientDelegate.PollResult res = manager.poll(this.time.milliseconds());
        Assertions.assertEquals((int)numRes, (int)res.unsentRequests.size());
        return res.unsentRequests.stream().map(r -> r.future()).collect(Collectors.toList());
    }

    private CommitRequestManager create(boolean autoCommitEnabled, long autoCommitInterval) {
        this.props.setProperty("auto.commit.interval.ms", String.valueOf(autoCommitInterval));
        this.props.setProperty("enable.auto.commit", String.valueOf(autoCommitEnabled));
        return new CommitRequestManager((Time)this.time, this.logContext, this.subscriptionState, new ConsumerConfig(this.props), this.coordinatorRequestManager, this.groupState);
    }

    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest request, Set<TopicPartition> topicPartitions, Errors error) {
        HashMap<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
        topicPartitions.forEach(tp -> topicPartitionData.put((TopicPartition)tp, new OffsetFetchResponse.PartitionData(100L, Optional.of(1), "metadata", Errors.NONE)));
        return this.buildOffsetFetchClientResponse(request, topicPartitionData, error);
    }

    private ClientResponse buildOffsetFetchClientResponse(NetworkClientDelegate.UnsentRequest request, HashMap<TopicPartition, OffsetFetchResponse.PartitionData> topicPartitionData, Errors error) {
        AbstractRequest abstractRequest = request.requestBuilder().build();
        Assertions.assertTrue((boolean)(abstractRequest instanceof OffsetFetchRequest));
        OffsetFetchRequest offsetFetchRequest = (OffsetFetchRequest)abstractRequest;
        OffsetFetchResponse response = new OffsetFetchResponse(error, topicPartitionData);
        return new ClientResponse(new RequestHeader(ApiKeys.OFFSET_FETCH, offsetFetchRequest.version(), "", 1), request.callback(), "-1", this.time.milliseconds(), this.time.milliseconds(), false, null, null, (AbstractResponse)response);
    }
}

