package io.confluent.parallelconsumer;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.KafkaUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

@Timeout(value = 3, unit = TimeUnit.MINUTES)
/* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.class */
public class ParallelEoSStreamProcessorTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessorTest.class);

    /* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest$MyAction.class */
    public static class MyAction implements Function<ConsumerRecord<String, String>, String> {
        @Override // java.util.function.Function
        public String apply(ConsumerRecord<String, String> consumerRecord) {
            ParallelEoSStreamProcessorTest.log.info("User client function - consuming a record... {}", consumerRecord.key());
            return "my-result";
        }
    }

    @BeforeEach
    public void setupData() {
        primeFirstRecord();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void failingActionNothingCommitted(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        this.parallelConsumer.poll(pollContext -> {
            throw new FakeRuntimeException("My user's function error");
        });
        awaitForSomeLoopCycles(3);
        this.parallelConsumer.close();
        assertCommits(UniLists.of(), "All erroring, so nothing committed except initial");
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void executorThreadsInterruptedOnShutdownTimeout(ParallelConsumerOptions.CommitMode commitMode) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        setupParallelConsumerInstance(getBaseOptionsKeyOrdered(commitMode, Duration.ofSeconds(1L)));
        primeFirstRecord();
        this.parallelConsumer.poll(pollContext -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                atomicBoolean.set(true);
                Thread.interrupted();
                throw new RuntimeException(e);
            }
        });
        awaitForSomeLoopCycles(2);
        this.parallelConsumer.close();
        assertCommits(UniLists.of(), "All erroring, so nothing committed except initial");
        Assertions.assertThat(atomicBoolean).isTrue();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void inFlightMessagesCommittedIfProcessedDuringShutdown(ParallelConsumerOptions.CommitMode commitMode) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        setupParallelConsumerInstance(getBaseOptionsKeyOrdered(commitMode, Duration.ofSeconds(1L)));
        primeFirstRecord();
        this.parallelConsumer.poll(pollContext -> {
            try {
                countDownLatch.await();
                ThreadUtils.sleepQuietly(100);
            } catch (InterruptedException e) {
                atomicBoolean.set(true);
                Thread.interrupted();
            }
        });
        awaitForSomeLoopCycles(2);
        countDownLatch.countDown();
        this.parallelConsumer.close();
        assertCommits(UniLists.of(1), "1 record completed during shutdown");
        Assertions.assertThat(atomicBoolean).isFalse();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void queuedMessagesNotProcessedOrCommittedIfSubmittedDuringShutdown(ParallelConsumerOptions.CommitMode commitMode) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        setupParallelConsumerInstance(getBaseOptionsKeyOrdered(commitMode, Duration.ofSeconds(1L)));
        primeFirstRecord();
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v1"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v2"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("1", "v3"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v4"));
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean3 = new AtomicBoolean(false);
        this.parallelConsumer.poll(pollContext -> {
            if (!((String) pollContext.getSingleConsumerRecord().value()).equals("v1")) {
                if (((String) pollContext.getSingleConsumerRecord().key()).equals("1")) {
                    atomicBoolean3.set(true);
                }
            } else {
                atomicBoolean2.set(true);
                try {
                    countDownLatch.await();
                    ThreadUtils.sleepQuietly(100);
                } catch (InterruptedException e) {
                    atomicBoolean.set(true);
                    Thread.interrupted();
                }
            }
        });
        while (!atomicBoolean2.get() && !atomicBoolean3.get()) {
            awaitForSomeLoopCycles(1);
        }
        awaitForSomeLoopCycles(2);
        countDownLatch.countDown();
        this.parallelConsumer.close();
        assertCommits(UniLists.of(1, 2), "primed record and first key=0 record completed only, followup key 0 records skipped");
        assertCommits().encodedIncomplete(2);
        Assertions.assertThat(atomicBoolean).isFalse();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void offsetsAreNeverCommittedForMessagesStillInFlightSimplest(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(getBaseOptions(commitMode).toBuilder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
        this.parallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(1L));
        primeFirstRecord();
        sendSecondRecord(this.consumerSpy);
        Assertions.assertThat(this.parallelConsumer.getWm().getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.UNORDERED);
        List<CountDownLatch> constructLatches = LatchTestUtils.constructLatches(2);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.parallelConsumer.poll(pollContext -> {
            log.debug("msg: {}", pollContext);
            countDownLatch.countDown();
            int offset = (int) pollContext.offset();
            LatchTestUtils.awaitLatch((List<CountDownLatch>) constructLatches, offset);
            linkedHashMap.put(Integer.valueOf(offset), true);
        });
        LatchTestUtils.awaitLatch(countDownLatch);
        Assertions.assertThat(this.parallelConsumer.getWm().getNumberOfWorkQueuedInShardsAwaitingSelection()).isZero();
        Assertions.assertThat(this.parallelConsumer.getWm().getNumberRecordsOutForProcessing()).isEqualTo(2);
        releaseAndWait(constructLatches, 1);
        this.parallelConsumer.requestCommitAsap();
        awaitForCommitExact(0);
        assertCommits(UniLists.of(), "Partition is blocked");
        releaseAndWait(constructLatches, 0);
        this.parallelConsumer.requestCommitAsap();
        awaitForCommitExact(2);
        log.debug("Closing...");
        this.parallelConsumer.closeDrainFirst();
        Assertions.assertThat(linkedHashMap).as("sanity - all expected messages are processed", new Object[0]).containsValues(new Boolean[]{true, true});
    }

    private void setupParallelConsumerInstance(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(getBaseOptions(commitMode));
        primeFirstRecord();
    }

    private ParallelConsumerOptions getBaseOptions(ParallelConsumerOptions.CommitMode commitMode) {
        return ParallelConsumerOptions.builder().commitMode(commitMode).consumer(this.consumerSpy).producer(this.producerSpy).build();
    }

    private ParallelConsumerOptions getBaseOptionsKeyOrdered(ParallelConsumerOptions.CommitMode commitMode, Duration duration) {
        return ParallelConsumerOptions.builder().commitMode(commitMode).consumer(this.consumerSpy).producer(this.producerSpy).shutdownTimeout(duration).ordering(ParallelConsumerOptions.ProcessingOrder.KEY).build();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void offsetsAreNeverCommittedForMessagesStillInFlightShort(ParallelConsumerOptions.CommitMode commitMode) {
        offsetsAreNeverCommittedForMessagesStillInFlightSimplest(commitMode);
        log.info("Test start");
        Awaitility.await().untilAsserted(() -> {
            assertCommits(UniLists.of(2), "Only one of the two offsets committed, as they were coalesced for efficiency");
        });
    }

    @Disabled
    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void offsetsAreNeverCommittedForMessagesStillInFlightLong(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        sendSecondRecord(this.consumerSpy);
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v2"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v3"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v4"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("0", "v5"));
        List<CountDownLatch> constructLatches = LatchTestUtils.constructLatches(6);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.parallelConsumer.poll(pollContext -> {
            CountDownLatch countDownLatch2 = (CountDownLatch) constructLatches.get((int) pollContext.offset());
            try {
                countDownLatch.countDown();
                countDownLatch2.await();
            } catch (InterruptedException e) {
            }
        });
        countDownLatch.countDown();
        releaseAndWait(constructLatches, 1);
        awaitForSomeLoopCycles(1);
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.after(this.verificationWaitDelay).never())).commitTransaction();
        releaseAndWait(constructLatches, 2);
        awaitForSomeLoopCycles(1);
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.after(this.verificationWaitDelay).never())).commitTransaction();
        releaseAndWait(constructLatches, 0);
        awaitForOneLoopCycle();
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.after(this.verificationWaitDelay).times(1))).commitTransaction();
        List consumerGroupOffsetsHistory = this.producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat(consumerGroupOffsetsHistory).hasSize(1);
        Assertions.assertThat(((OffsetAndMetadata) ((Map) ((Map) consumerGroupOffsetsHistory.get(0)).get(this.CONSUMER_GROUP_ID)).get(KafkaUtils.toTopicPartition(this.firstRecord))).offset()).isEqualTo(2L);
        releaseAndWait(constructLatches, 3);
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.after(this.verificationWaitDelay).times(2))).commitTransaction();
        List consumerGroupOffsetsHistory2 = this.producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat(consumerGroupOffsetsHistory2).hasSize(2);
        Assertions.assertThat(((OffsetAndMetadata) ((Map) ((Map) consumerGroupOffsetsHistory2.get(1)).get(this.CONSUMER_GROUP_ID)).get(KafkaUtils.toTopicPartition(this.firstRecord))).offset()).isEqualTo(3L);
        releaseAndWait(constructLatches, UniLists.of(4, 5));
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.after(this.verificationWaitDelay).atLeast(3))).commitTransaction();
        List consumerGroupOffsetsHistory3 = this.producerSpy.consumerGroupOffsetsHistory();
        Assertions.assertThat(consumerGroupOffsetsHistory3).hasSizeGreaterThanOrEqualTo(3);
        Assertions.assertThat(((OffsetAndMetadata) ((Map) ((Map) consumerGroupOffsetsHistory3.get(2)).get(this.CONSUMER_GROUP_ID)).get(KafkaUtils.toTopicPartition(this.firstRecord))).offset()).isEqualTo(5L);
        assertCommits(UniLists.of(2, 3, 5));
        this.parallelConsumer.close();
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void offsetCommitsAreIsolatedPerPartition(ParallelConsumerOptions.CommitMode commitMode) {
        Assumptions.assumeThat(this.parallelConsumer).as("Should only test on core PC - this test is very complicated to get to work with vert.x thread system, as the event and locking system needed is quite different", new Object[0]).isExactlyInstanceOf(AbstractParallelEoSStreamProcessor.class);
        setupParallelConsumerInstance(getBaseOptions(commitMode).toBuilder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build());
        primeFirstRecord();
        sendSecondRecord(this.consumerSpy);
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "0", "v2"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "0", "v3"));
        List<CountDownLatch> of = UniLists.of(new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1));
        this.parallelConsumer.poll(pollContext -> {
            try {
                ((CountDownLatch) of.get((int) pollContext.offset())).await();
            } catch (InterruptedException e) {
                log.error(e.toString());
            }
        });
        releaseAndWait(of, 1);
        this.parallelConsumer.requestCommitAsap();
        awaitForSomeLoopCycles(50);
        assertCommitLists(UniLists.of(UniLists.of(), UniLists.of(2)));
        releaseAndWait(of, 2);
        this.parallelConsumer.requestCommitAsap();
        Awaitility.await().untilAsserted(() -> {
            assertCommitLists(UniLists.of(UniLists.of(), UniLists.of(2, 3)));
        });
        releaseAndWait(of, 0);
        this.parallelConsumer.requestCommitAsap();
        awaitForOneLoopCycle();
        if (isUsingAsyncCommits()) {
            awaitForSomeLoopCycles(3);
        }
        assertCommitLists(UniLists.of(UniLists.of(2), UniLists.of(2, 3)));
        releaseAndWait(of, 3);
        if (isUsingAsyncCommits()) {
            awaitForSomeLoopCycles(3);
        }
        Awaitility.await().untilAsserted(() -> {
            assertCommitLists(UniLists.of(UniLists.of(2), UniLists.of(2, 3, 4)));
        });
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void controlFlowException(ParallelConsumerOptions.CommitMode commitMode) {
        instantiateConsumerProducer();
        this.parentParallelConsumer = initPollingAsyncConsumer(getBaseOptions(commitMode));
        subscribeParallelConsumerAndMockConsumerTo(this.INPUT_TOPIC);
        setupData();
        this.parallelConsumer.addLoopEndCallBack(() -> {
            throw new FakeRuntimeException("My fake control loop error");
        });
        this.parallelConsumer.poll(pollContext -> {
            log.info("Ignoring {}", pollContext);
        });
        Assertions.assertThatThrownBy(() -> {
            this.parallelConsumer.closeDrainFirst(Duration.ofSeconds(10L));
        }).hasMessageContainingAll(new CharSequence[]{"Error", "poll", "thread", "fake control"});
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void testVoidPollMethod(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.parallelConsumer.poll(pollContext -> {
            log.debug("Processing test context...");
            this.myRecordProcessingAction.apply(pollContext.getSingleConsumerRecord());
            countDownLatch.countDown();
        });
        LatchTestUtils.awaitLatch(countDownLatch);
        awaitForSomeLoopCycles(2);
        this.parallelConsumer.close();
        assertCommits(UniLists.of(1));
        ((MyAction) Mockito.verify(this.myRecordProcessingAction, VerificationModeFactory.times(1))).apply((ConsumerRecord<String, String>) ArgumentMatchers.any());
        if (commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)) {
            ((MockProducer) Mockito.verify(this.producerSpy, Mockito.atLeastOnce())).commitTransaction();
            ((MockProducer) Mockito.verify(this.producerSpy, Mockito.atLeastOnce())).sendOffsetsToTransaction(Mockito.anyMap(), (ConsumerGroupMetadata) ArgumentMatchers.any());
        }
    }

    @Disabled
    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void processInKeyOrder(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder().commitMode(commitMode).ordering(ParallelConsumerOptions.ProcessingOrder.KEY).build());
        primeFirstRecord();
        Assertions.assertThat(this.parallelConsumer.getWm().getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.KEY);
        sendSecondRecord(this.consumerSpy);
        this.consumerSpy.addRecord(this.ktu.makeRecord("key-1", "v2"));
        this.consumerSpy.addRecord(this.ktu.makeRecord("key-1", "v3"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "key-2", "v4"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "key-3", "v5"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "key-3", "v6"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "key-3", "v7"));
        this.consumerSpy.addRecord(this.ktu.makeRecord(1, "key-4", "v8"));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        CountDownLatch countDownLatch4 = new CountDownLatch(1);
        CountDownLatch countDownLatch5 = new CountDownLatch(1);
        CountDownLatch countDownLatch6 = new CountDownLatch(1);
        CountDownLatch countDownLatch7 = new CountDownLatch(1);
        CountDownLatch countDownLatch8 = new CountDownLatch(1);
        CountDownLatch countDownLatch9 = new CountDownLatch(1);
        HashMap hashMap = new HashMap();
        Iterator it = Range.range(8L).iterator();
        while (it.hasNext()) {
            hashMap.put(Integer.valueOf(((Long) it.next()).intValue()), false);
        }
        List<CountDownLatch> of = UniLists.of(countDownLatch, countDownLatch2, countDownLatch3, countDownLatch4, countDownLatch5, countDownLatch6, countDownLatch7, countDownLatch8, countDownLatch9);
        ArrayList arrayList = new ArrayList();
        ((LongPollingMockConsumer) Mockito.doAnswer(invocationOnMock -> {
            ConsumerRecords consumerRecords = (ConsumerRecords) invocationOnMock.callRealMethod();
            Iterator it2 = consumerRecords.iterator();
            while (it2.hasNext()) {
                arrayList.add(it2.next());
            }
            return consumerRecords;
        }).when(this.consumerSpy)).poll((Duration) ArgumentMatchers.any());
        this.parallelConsumer.poll(pollContext -> {
            int offset = (int) pollContext.offset();
            CountDownLatch countDownLatch10 = (CountDownLatch) of.get(offset);
            try {
                log.debug("Started msg {} processing, locking on latch to simulate long process times...", Integer.valueOf(offset));
                countDownLatch10.await();
            } catch (InterruptedException e) {
            }
            log.debug("Finished msg {} processing after waking...", Integer.valueOf(offset));
            hashMap.put(Integer.valueOf(offset), true);
        });
        countDownLatch7.countDown();
        countDownLatch9.countDown();
        log.debug("Unlocking 1...");
        countDownLatch2.countDown();
        awaitForOneLoopCycle();
        Assertions.assertThat(arrayList).as("sanity check input data", new Object[0]).hasSameSizeAs(of);
        ((AbstractBooleanAssert) Assertions.assertThat((Boolean) hashMap.get(1)).as("blocked by 0 (1 shouldn't be run until 0 is complete, due to key order processing)", new Object[0])).isFalse();
        assertCommits(UniLists.of());
        log.debug("Unlocking 2...");
        countDownLatch3.countDown();
        awaitForSomeLoopCycles(2);
        Assertions.assertThat((Boolean) hashMap.get(2)).isTrue();
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.after(this.verificationWaitDelay).never())).commitTransaction();
        awaitForOneLoopCycle();
        assertCommits(UniLists.of());
        log.debug("Unlocking 0...");
        countDownLatch.countDown();
        awaitForCommitExact(0, 0);
        awaitForCommitExact(0, 2);
        assertCommits(UniLists.of(0, 2));
        log.debug("Unlocking 3...");
        countDownLatch4.countDown();
        log.debug("Unlocking 5...");
        countDownLatch6.countDown();
        awaitUntilTrue(() -> {
            return (Boolean) hashMap.get(5);
        });
        ((AbstractBooleanAssert) Assertions.assertThat((Boolean) hashMap.get(5)).as("5 should processed", new Object[0])).isTrue();
        awaitForCommitExact(0, 3);
        assertCommits(UniLists.of(0, 2, 3));
        log.debug("Unlocking 4...");
        countDownLatch5.countDown();
        awaitUntilTrue(() -> {
            return (Boolean) hashMap.get(6);
        });
        ((AbstractBooleanAssert) Assertions.assertThat((Boolean) hashMap.get(6)).as("6 should processed", new Object[0])).isTrue();
        awaitForSomeLoopCycles(1);
        awaitForCommitExact(1, 6);
        assertCommits(UniLists.of(0, 2, 3, 6));
        Assertions.assertThat((Boolean) hashMap.get(7)).isFalse();
        Assertions.assertThat((Boolean) hashMap.get(8)).isTrue();
        releaseAndWait(of, 7);
        awaitForCommitExact(1, 8);
        assertCommits(UniLists.of(0, 2, 3, 6, 8));
    }

    @Test
    void processInKeyOrderWorkNotReturnedDoesntBreakCommits() {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder().commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC).ordering(ParallelConsumerOptions.ProcessingOrder.KEY).build());
        primeFirstRecord();
        sendSecondRecord(this.consumerSpy);
        Assertions.assertThat(this.parallelConsumer.getWm().getOptions().getOrdering()).isEqualTo(ParallelConsumerOptions.ProcessingOrder.KEY);
        this.consumerSpy.addRecord(this.ktu.makeRecord("key-1", "v2"));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        HashMap hashMap = new HashMap();
        hashMap.put(1, countDownLatch);
        CountDownLatch countDownLatch2 = new CountDownLatch(2);
        CountDownLatch countDownLatch3 = new CountDownLatch(4);
        this.parallelConsumer.addLoopEndCallBack(() -> {
            log.trace("Control loop cycle - {}, {}", Long.valueOf(countDownLatch2.getCount()), Long.valueOf(countDownLatch3.getCount()));
            countDownLatch2.countDown();
            countDownLatch3.countDown();
        });
        ArrayList arrayList = new ArrayList();
        ((LongPollingMockConsumer) Mockito.doAnswer(invocationOnMock -> {
            ConsumerRecords consumerRecords = (ConsumerRecords) invocationOnMock.callRealMethod();
            Iterator it = consumerRecords.iterator();
            while (it.hasNext()) {
                arrayList.add((ConsumerRecord) it.next());
            }
            return consumerRecords;
        }).when(this.consumerSpy)).poll((Duration) ArgumentMatchers.any());
        this.parallelConsumer.poll(pollContext -> {
            int offset = (int) pollContext.offset();
            CountDownLatch countDownLatch4 = (CountDownLatch) hashMap.get(Integer.valueOf(offset));
            if (countDownLatch4 != null) {
                try {
                    countDownLatch4.await();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
            log.debug("Message offset {} processed...", Integer.valueOf(offset));
        });
        Awaitility.await().untilAsserted(() -> {
            Assertions.assertThat(arrayList).as("sanity check - the records have been polled", new Object[0]).hasSize(3);
        });
        LatchTestUtils.awaitLatch(countDownLatch2);
        awaitForOneLoopCycle();
        Awaitility.await().untilAsserted(() -> {
            try {
                assertCommits(UniLists.of(0, 1), "Only 0 should be committed, as even though 2 is also finished, 1 should be blocking the partition");
            } catch (AssertionError e) {
                assertCommits(UniLists.of(1), "Bootstrap commit is optional. See msg in code above");
            }
        });
        countDownLatch.countDown();
        LatchTestUtils.awaitLatch(countDownLatch3);
        awaitForOneLoopCycle();
        Awaitility.await().untilAsserted(() -> {
            try {
                assertCommits(UniLists.of(0, 1, 3), "Remaining two records should be committed as a single offset");
            } catch (AssertionError e) {
                assertCommits(UniLists.of(1, 3), "Bootstrap commit is optional. See msg in code above");
            }
        });
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void closeAfterSingleMessageShouldBeEventBasedFast(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        Duration timeBetweenCommits = this.parallelConsumer.getTimeBetweenCommits();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.parallelConsumer.poll(pollContext -> {
            log.info("Message processed: {} - noop", Long.valueOf(pollContext.offset()));
            countDownLatch.countDown();
        });
        LatchTestUtils.awaitLatch(countDownLatch);
        awaitForOneLoopCycle();
        this.parallelConsumer.requestCommitAsap();
        awaitForOneLoopCycle();
        Awaitility.await().untilAsserted(() -> {
            assertCommits(UniLists.of(1));
        });
        Duration time = GeneralTestUtils.time(() -> {
            this.parallelConsumer.close();
        });
        Assertions.assertThat(time).as("Should be fast", new Object[0]).isLessThan(JavaUtils.max(timeBetweenCommits, Duration.ofSeconds(2L)));
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    public void closeWithoutRunningShouldBeEventBasedFast(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(getBaseOptions(commitMode));
        this.parallelConsumer.closeDontDrainFirst();
    }

    @Test
    public void ensureLibraryCantBeUsedTwice() {
        this.parallelConsumer.poll(pollContext -> {
        });
        Assertions.assertThatIllegalStateException().isThrownBy(() -> {
            this.parallelConsumer.poll(pollContext2 -> {
            });
        });
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void consumeFlowDoesntRequireProducer(ParallelConsumerOptions.CommitMode commitMode) {
        setupClients();
        ParallelConsumerOptions<String, String> build = ParallelConsumerOptions.builder().consumer(this.consumerSpy).commitMode(commitMode).build();
        if (commitMode.equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)) {
            Assertions.assertThatThrownBy(() -> {
                this.parallelConsumer = initPollingAsyncConsumer(build);
            }).isInstanceOf(IllegalArgumentException.class).hasMessageContainingAll(new CharSequence[]{"Producer", "Transaction"});
            return;
        }
        this.parallelConsumer = initPollingAsyncConsumer(build);
        attachLoopCounter(this.parallelConsumer);
        subscribeParallelConsumerAndMockConsumerTo(this.INPUT_TOPIC);
        setupData();
        this.parallelConsumer.poll(pollContext -> {
            log.debug("Test record processor - rec: {}", pollContext);
        });
        this.parallelConsumer.requestCommitAsap();
        awaitForCommitExact(1);
        this.parallelConsumer.closeDrainFirst();
        assertCommits(UniLists.of(1));
    }

    @Test
    void optionsProduceMessageFlowRequiresProducer() {
        setupClients();
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().consumer(this.consumerSpy).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).build();
        Assertions.assertThatThrownBy(() -> {
            this.parallelConsumer = initPollingAsyncConsumer(build);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContainingAll(new CharSequence[]{"Producer", "Transaction"});
    }

    @Test
    void optionsGroupIdRequiredAndAutoCommitDisabled() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        Deserializer deserializer = Serdes.String().deserializer();
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder commitMode = ParallelConsumerOptions.builder().consumer(new KafkaConsumer(properties, deserializer, deserializer)).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS);
        ParallelConsumerOptions build = commitMode.build();
        Assertions.assertThatThrownBy(() -> {
            this.parallelConsumer = initPollingAsyncConsumer(build);
        }).as("Should error on missing group id", new Object[0]).isInstanceOf(IllegalArgumentException.class).hasMessageContainingAll(new CharSequence[]{"Consumer", "GroupId"});
        properties.setProperty("group.id", "dummy-group");
        commitMode.consumer(new KafkaConsumer(properties, deserializer, deserializer));
        Assertions.assertThat(Assertions.catchThrowable(() -> {
            this.parallelConsumer = initPollingAsyncConsumer(commitMode.build());
        })).as("Should error on auto commit enabled by default", new Object[0]).isInstanceOf(ParallelConsumerException.class).hasMessageContainingAll(new CharSequence[]{"auto", "commit", "disabled"});
        properties.setProperty("enable.auto.commit", "true");
        commitMode.consumer(new KafkaConsumer(properties, deserializer, deserializer));
        Assertions.assertThat(Assertions.catchThrowable(() -> {
            this.parallelConsumer = initPollingAsyncConsumer(commitMode.build());
        })).as("Should error on auto commit enabled", new Object[0]).isInstanceOf(ParallelConsumerException.class).hasMessageContainingAll(new CharSequence[]{"auto", "commit", "disabled"});
        properties.setProperty("enable.auto.commit", "false");
        commitMode.consumer(new KafkaConsumer(properties, deserializer, deserializer));
        Assertions.assertThatNoException().isThrownBy(() -> {
            this.parallelConsumer = initPollingAsyncConsumer(commitMode.build());
        });
    }

    @Test
    void cantUseProduceFlowWithWrongOptions() throws InterruptedException {
        setupClients();
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().consumer(this.consumerSpy).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS).build();
        setupParallelConsumerInstance(build);
        subscribeParallelConsumerAndMockConsumerTo(this.INPUT_TOPIC);
        setupData();
        ParallelEoSStreamProcessor<String, String> initPollingAsyncConsumer = initPollingAsyncConsumer(build);
        Assertions.assertThatThrownBy(() -> {
            initPollingAsyncConsumer.pollAndProduce(pollContext -> {
                return new ProducerRecord(this.INPUT_TOPIC, "hi there");
            });
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContainingAll(new CharSequence[]{"Producer", "options"});
    }

    @EnumSource(ParallelConsumerOptions.CommitMode.class)
    @ParameterizedTest
    void produceMessageFlow(ParallelConsumerOptions.CommitMode commitMode) {
        setupParallelConsumerInstance(commitMode);
        this.parallelConsumer.pollAndProduce(pollContext -> {
            return new ProducerRecord("Hello", "there");
        });
        awaitForSomeLoopCycles(2);
        this.parallelConsumer.requestCommitAsap();
        Awaitility.await().untilAsserted(() -> {
            assertCommits(UniLists.of(1));
        });
        this.parallelConsumer.closeDrainFirst();
        Assertions.assertThat(this.producerSpy.history()).hasSize(1);
    }

    @Test
    void lessKeysThanThreads() {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.KEY).maxConcurrency(100).build());
        List<Integer> listAsIntegers = Range.range(4L).listAsIntegers();
        log.debug("Generating {} records against {} keys...", 100000, 4);
        HashMap<Integer, List<ConsumerRecord<String, String>>> generateRecords = this.ktu.generateRecords(listAsIntegers, 100000);
        generateRecords.entrySet().forEach(entry -> {
            log.debug("Key {} has {} records", entry.getKey(), Integer.valueOf(((List) entry.getValue()).size()));
        });
        log.debug("Sending...");
        this.ktu.send(this.consumerSpy, generateRecords);
        ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, 100000L);
        log.debug("Consuming...");
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        AtomicLong atomicLong = new AtomicLong();
        this.parallelConsumer.poll(pollContext -> {
            atomicLong.incrementAndGet();
            newMessagesBar.step();
            log.trace("Consumed {}", pollContext);
            ((Queue) concurrentHashMap.computeIfAbsent((String) pollContext.key(), str -> {
                return new ConcurrentLinkedQueue();
            })).add(pollContext);
        });
        Awaitility.await().atMost(5L, TimeUnit.MINUTES).untilAsserted(() -> {
            Assertions.assertThat(atomicLong.get()).isEqualTo(100000L);
        });
        this.parallelConsumer.closeDrainFirst();
        newMessagesBar.close();
        Math.max(25000, 1);
        log.debug("Testing...");
        KafkaTestUtils.checkExactOrdering(concurrentHashMap, generateRecords);
    }
}
