package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.Consumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/PartitionOrderProcessingTest.class */
class PartitionOrderProcessingTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(PartitionOrderProcessingTest.class);
    Consumer<String, String> consumer;
    ParallelEoSStreamProcessor<String, String> pc;

    PartitionOrderProcessingTest() {
        this.numPartitions = 5;
    }

    @BeforeEach
    void setup() {
        setupTopic();
        this.consumer = getKcu().createNewConsumer(true, consumerProps());
    }

    @AfterEach
    void cleanup() {
        this.pc.close();
    }

    private ParallelEoSStreamProcessor<String, String> setupPC() {
        return setupPC(null);
    }

    private ParallelEoSStreamProcessor<String, String> setupPC(Function<ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String>, ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String>> function) {
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> maxConcurrency = ParallelConsumerOptions.builder().consumer(this.consumer).ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).maxConcurrency(5);
        if (function != null) {
            maxConcurrency = function.apply(maxConcurrency);
        }
        return new ParallelEoSStreamProcessor<>(maxConcurrency.build());
    }

    @Test
    void allPartitionsAreProcessedInParallel() {
        HashMap hashMap = new HashMap();
        IntStream.range(0, 5).forEach(i -> {
            hashMap.put(Integer.valueOf(i), new AtomicInteger(0));
        });
        this.pc = setupPC(parallelConsumerOptionsBuilder -> {
            return parallelConsumerOptionsBuilder.messageBufferSize(5000);
        });
        this.pc.subscribe(UniSets.of(this.topic));
        getKcu().produceMessages(this.topic, 10000L);
        this.pc.poll(pollContext -> {
            ((AtomicInteger) hashMap.get(Integer.valueOf(pollContext.getSingleConsumerRecord().partition()))).getAndIncrement();
            ThreadUtils.sleepQuietly(10);
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(hashMap.values().stream().mapToInt((v0) -> {
                return v0.get();
            }).sum() > 500);
        });
        Assertions.assertTrue(hashMap.values().stream().allMatch(atomicInteger -> {
            return atomicInteger.get() > 0;
        }), "Expect all partitions to have some messages processed, actual partitionCounts:" + hashMap);
    }

    @Test
    void allPartitionsAreNotProcessedInParallel() {
        HashMap hashMap = new HashMap();
        IntStream.range(0, 5).forEach(i -> {
            hashMap.put(Integer.valueOf(i), new AtomicInteger(0));
        });
        this.pc = setupPC();
        this.pc.subscribe(UniSets.of(this.topic));
        getKcu().produceMessages(this.topic, 10000L);
        this.pc.poll(pollContext -> {
            ((AtomicInteger) hashMap.get(Integer.valueOf(pollContext.getSingleConsumerRecord().partition()))).getAndIncrement();
            ThreadUtils.sleepQuietly(10);
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(hashMap.values().stream().mapToInt((v0) -> {
                return v0.get();
            }).sum() > 500);
        });
        Assertions.assertFalse(hashMap.values().stream().allMatch(atomicInteger -> {
            return atomicInteger.get() > 0;
        }), "Expect some processing thread starving and not all partition counts to have some messages processed, actual partitionCounts:" + hashMap);
    }

    private Properties consumerProps() {
        Properties properties = new Properties();
        properties.put("max.partition.fetch.bytes", 50000);
        properties.put("max.poll.records", 500);
        return properties;
    }
}
