package com.espertech.esperio.kafka;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPStatement;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:production/esperio-kafka/com/espertech/esperio/kafka/TestKafkaOutput.class */
/**
 * Integration test for the EsperIO Kafka output adapter.
 *
 * <p>The test configures an engine with the Kafka output plugin, creates statements annotated with
 * {@code @KafkaOutputDefault}, sends {@code SupportBean} events into the engine and uses a plain
 * {@link KafkaConsumer} to check that matching records show up on the output topic.
 *
 * <p>NOTE(review): requires a reachable broker at {@code SupportConstants.DEV_BOOTSTRAP_SERVER};
 * presumably the output topic must already exist with a partition 0 - confirm against the test setup.
 */
public class TestKafkaOutput extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(TestKafkaOutput.class);

    /**
     * Verifies that events selected by an annotated statement are written to the output topic,
     * and that exactly two new records were added by the two events sent through the first statement.
     */
    public void testOutput() {
        Properties outputPluginProps = SupportConstants.getOutputPluginProps();
        outputPluginProps.put(EsperIOKafkaConfig.TOPICS_CONFIG, SupportConstants.DEV_OUTPUT_TOPIC_JSON);
        outputPluginProps.put("key.serializer", StringSerializer.class.getName());
        outputPluginProps.put("value.serializer", StringSerializer.class.getName());
        outputPluginProps.put(EsperIOKafkaConfig.OUTPUT_FLOWCONTROLLER_CONFIG, EsperIOKafkaOutputFlowControllerByAnnotatedStmt.class.getName());
        EPServiceProvider engine = SupportConstants.getEngineWKafkaOutput(getClass().getSimpleName(), outputPluginProps);

        // The consumer is closed by try-with-resources and the engine is destroyed in the finally
        // block, so neither leaks when an assertion or an await fails part-way through the test.
        try (KafkaConsumer<String, String> consumer = initConsumer()) {
            engine.getEPAdministrator().getConfiguration().addEventType("SupportBean", SupportBean.class);
            EPStatement first = engine.getEPAdministrator().createEPL("@name('first') @KafkaOutputDefault select * from SupportBean");

            List<TopicPartition> partitions = Collections.singletonList(new TopicPartition(SupportConstants.DEV_OUTPUT_TOPIC_JSON, 0));
            consumer.assign(partitions);

            // Baseline: number of records already on the topic before this test sends anything.
            int countBefore = countMessages(partitions, consumer);
            // Rewind so that sendAndAwait scans the partition from the beginning again.
            consumer.seek(partitions.get(0), 0L);

            sendAndAwait(engine, consumer, "E1");
            sendAndAwait(engine, consumer, "E2");
            int countAfter = countMessages(partitions, consumer);

            // After the statement is destroyed this event is sent without any active statement.
            // NOTE(review): the count is taken before this event is sent, so the test does not
            // actually assert that "XXX" is absent from the topic.
            first.destroy();
            engine.getEPRuntime().sendEvent(new SupportBean("XXX", -1));
            assertEquals(2, countAfter - countBefore);

            // A second annotated statement selecting a single property must produce output as well.
            engine.getEPAdministrator().createEPL("@name('second') @KafkaOutputDefault select stringProp from SupportBean");
            sendAndAwait(engine, consumer, "E3");
        } finally {
            engine.destroy();
        }
    }

    /**
     * Counts the records readable from the start of the first partition in {@code collection}.
     *
     * <p>Seeks that partition to offset 0 and polls repeatedly, summing the record counts, until a
     * poll returns no records. An empty poll (1000 ms timeout) is treated as the end of the data.
     *
     * @param collection    partitions assigned to the consumer; only the first one is rewound
     * @param kafkaConsumer consumer to read with; its position is advanced by the polling
     * @return total number of records returned by the polls
     */
    private int countMessages(Collection<TopicPartition> collection, KafkaConsumer<String, String> kafkaConsumer) {
        kafkaConsumer.seek(collection.iterator().next(), 0L);
        int total = 0;
        ConsumerRecords<String, String> records;
        do {
            records = kafkaConsumer.poll(1000L);
            total += records.count();
        } while (!records.isEmpty());
        return total;
    }

    /**
     * Sends a {@code SupportBean} with a unique string value into the engine and waits until a
     * record containing that value is polled from Kafka, failing after 10 seconds.
     *
     * @param ePServiceProvider engine to send the event into
     * @param kafkaConsumer     consumer used to poll for the resulting record
     * @param str               prefix of the event's string value; a random UUID is appended so the
     *                          value can be told apart from records of earlier runs
     */
    private void sendAndAwait(EPServiceProvider ePServiceProvider, KafkaConsumer<String, String> kafkaConsumer, String str) {
        String uniqueValue = str + "___" + UUID.randomUUID().toString();
        ePServiceProvider.getEPRuntime().sendEvent(new SupportBean(uniqueValue, 10));
        SupportAwaitUtil.awaitOrFail(10L, TimeUnit.SECONDS, "failed to receive expected event", () -> {
            boolean found = false;
            // Every record of the poll is logged, so the loop does not stop at the first match.
            for (ConsumerRecord<String, String> record : kafkaConsumer.poll(1000L)) {
                log.info("Received: {}", record.value());
                if (record.value().contains(uniqueValue)) {
                    found = true;
                }
            }
            // Returns true on a match and null otherwise; presumably awaitOrFail retries on null - confirm.
            return found ? true : null;
        });
    }

    /**
     * Creates a string-keyed, string-valued consumer for the development broker.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> initConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", SupportConstants.DEV_BOOTSTRAP_SERVER);
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", "esperio_regression_output_t1__mygroup");
        return new KafkaConsumer<>(properties);
    }
}
