package com.espertech.esperio.kafka;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.time.CurrentTimeSpanEvent;
import com.espertech.esper.util.JavaClassHelper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:production/esperio-kafka/com/espertech/esperio/kafka/EsperIOKafkaInputProcessorDefault.class */
/**
 * Default {@link EsperIOKafkaInputProcessor}: for every Kafka record it sends the record's
 * value into the engine runtime as an event. When a timestamp extractor has been configured,
 * a {@link CurrentTimeSpanEvent} carrying the extracted timestamp is sent before the value.
 */
public class EsperIOKafkaInputProcessorDefault implements EsperIOKafkaInputProcessor {
    private static final Logger log = LoggerFactory.getLogger(EsperIOKafkaInputProcessorDefault.class);

    /** Engine that receives the events; assigned in {@link #init}. */
    private EPServiceProvider engine;

    /** Optional timestamp extractor; stays {@code null} unless one is configured in {@link #init}. */
    private EsperIOKafkaInputTimestampExtractor timestampExtractor;

    /**
     * Captures the engine from the context and, if the property
     * {@link EsperIOKafkaConfig#INPUT_TIMESTAMPEXTRACTOR_CONFIG} is set, instantiates the
     * timestamp extractor class it names.
     *
     * @param esperIOKafkaInputProcessorContext supplies the engine and the adapter properties
     */
    @Override
    public void init(EsperIOKafkaInputProcessorContext esperIOKafkaInputProcessorContext) {
        this.engine = esperIOKafkaInputProcessorContext.getEngine();
        String extractorClassName = esperIOKafkaInputProcessorContext.getProperties()
                .getProperty(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG);
        if (extractorClassName == null) {
            // No extractor configured: process() will then send record values only.
            return;
        }
        this.timestampExtractor = (EsperIOKafkaInputTimestampExtractor) JavaClassHelper.instantiate(
                EsperIOKafkaInputTimestampExtractor.class,
                extractorClassName,
                esperIOKafkaInputProcessorContext.getEngine().getEngineImportService().getClassForNameProvider());
    }

    /**
     * Sends each record of the batch to the engine, in iteration order. Per record: first the
     * time-span event (only when an extractor is configured), then the record value (only when
     * it is non-null).
     *
     * @param consumerRecords batch of records to forward to the engine
     */
    @Override
    public void process(ConsumerRecords<Object, Object> consumerRecords) {
        for (ConsumerRecord<Object, Object> record : consumerRecords) {
            if (this.timestampExtractor != null) {
                long timestamp = this.timestampExtractor.extract(record);
                if (log.isDebugEnabled()) {
                    log.debug("Sending time span {}", timestamp);
                }
                this.engine.getEPRuntime().sendEvent(new CurrentTimeSpanEvent(timestamp));
            }

            Object value = record.value();
            if (value == null) {
                // Records without a value produce no event.
                continue;
            }
            if (log.isDebugEnabled()) {
                // Explicit toString() keeps the logged text independent of the logger's own
                // argument formatting.
                log.debug("Sending event {}", value.toString());
            }
            this.engine.getEPRuntime().sendEvent(value);
        }
    }

    /** No-op: this processor does nothing on close. */
    @Override
    public void close() {
    }
}
