package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.internal.PCModule;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.awaitility.Awaitility;
import org.hamcrest.CoreMatchers;
import org.hamcrest.number.OrderingComparison;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/RebalanceEoSDeadlockTest.class */
class RebalanceEoSDeadlockTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(RebalanceEoSDeadlockTest.class);
    private static final String PC_CONTROL = "pc-control";
    public static final String PC_BROKER_POLL = "pc-broker-poll";
    Consumer<String, String> consumer;
    Producer<String, String> producer;
    CountDownLatch rebalanceLatch;
    private long sleepTimeMs = 0;
    ParallelEoSStreamProcessor<String, String> pc;
    private String outputTopic;
    static final long SLEEP_TIME_MS = 3000;

    RebalanceEoSDeadlockTest() {
        this.numPartitions = 2;
    }

    @BeforeEach
    void setup() {
        this.rebalanceLatch = new CountDownLatch(1);
        setupTopic();
        this.outputTopic = setupTopic("output-topic");
        this.producer = getKcu().createNewProducer(KafkaClientUtils.ProducerMode.TRANSACTIONAL);
        this.consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP);
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).consumer(this.consumer).produceLockAcquisitionTimeout(Duration.ofMinutes(2L)).producer(this.producer).ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION).build();
        this.pc = new ParallelEoSStreamProcessor<String, String>(build, new PCModule(build)) { // from class: io.confluent.parallelconsumer.integrationTests.RebalanceEoSDeadlockTest.1
            protected void commitOffsetsThatAreReady() throws TimeoutException, InterruptedException {
                String name = Thread.currentThread().getName();
                if (name.contains(RebalanceEoSDeadlockTest.PC_CONTROL)) {
                    RebalanceEoSDeadlockTest.log.info("Delaying pc-control thread {}ms to force the potential deadlock on rebalance", Long.valueOf(RebalanceEoSDeadlockTest.this.sleepTimeMs));
                    ThreadUtils.sleepQuietly(RebalanceEoSDeadlockTest.this.sleepTimeMs);
                }
                super.commitOffsetsThatAreReady();
                if (name.contains(RebalanceEoSDeadlockTest.PC_BROKER_POLL)) {
                    RebalanceEoSDeadlockTest.this.rebalanceLatch.countDown();
                }
            }

            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                super.onPartitionsRevoked(collection);
            }
        };
        this.pc.subscribe(UniSets.of(this.topic));
    }

    @AfterEach
    void cleanup() {
        this.pc.close();
    }

    @RepeatedTest(5)
    void noDeadlockOnRevoke() {
        this.sleepTimeMs = (long) (3000.0d + (Math.random() * 1000.0d));
        AtomicLong atomicLong = new AtomicLong();
        getKcu().produceMessages(this.topic, 100L);
        this.pc.setTimeBetweenCommits(Duration.ofSeconds(1L));
        this.pc.pollAndProduce(pollContext -> {
            atomicLong.getAndIncrement();
            log.debug("Processed record, count now {} - offset: {}", atomicLong, Long.valueOf(pollContext.offset()));
            return new ProducerRecord(this.outputTopic, (String) pollContext.key(), (String) pollContext.value());
        });
        Awaitility.await().timeout(Duration.ofSeconds(30L)).untilAtomic(atomicLong, CoreMatchers.is(OrderingComparison.greaterThan(5L)));
        log.debug("Records are getting consumed");
        Duration ofSeconds = Duration.ofSeconds(5L);
        log.debug("Creating new consumer in same group and subscribing to same topic set with a no record timeout of {}, expect this phase to take entire timeout...", ofSeconds);
        KafkaConsumer createNewConsumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.REUSE_GROUP);
        try {
            createNewConsumer.subscribe(UniLists.of(this.topic));
            createNewConsumer.poll(ofSeconds);
            if (!this.rebalanceLatch.await(30L, TimeUnit.SECONDS)) {
                Assertions.fail("Rebalance did not finished");
            }
            log.debug("Test finished");
            if (createNewConsumer != null) {
                createNewConsumer.close();
            }
        } finally {
        }
    }
}
