package com.espertech.esperio.kafka;

import com.espertech.esper.client.ConfigurationException;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.core.service.EPServiceProviderSPI;
import com.espertech.esper.util.JavaClassHelper;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:production/esperio-kafka/com/espertech/esperio/kafka/EsperIOKafkaInputAdapter.class */
public class EsperIOKafkaInputAdapter {
    private static final Logger log = LoggerFactory.getLogger(EsperIOKafkaInputAdapter.class);
    private final Properties properties;
    private final String engineURI;
    private KafkaConsumer consumer;
    private ExecutorService executorService;
    private EsperIOKafkaInputRunnable runnable;
    private EsperIOKafkaInputProcessor processor;

    public EsperIOKafkaInputAdapter(Properties properties, String str) {
        this.properties = properties;
        this.engineURI = str;
    }

    public void start() {
        if (log.isInfoEnabled()) {
            log.info("Starting EsperIO Kafka Input Adapter for engine URI '{}'", this.engineURI);
        }
        Properties properties = new Properties();
        for (String str : this.properties.stringPropertyNames()) {
            if (!str.startsWith("esperio")) {
                properties.put(str, this.properties.getProperty(str));
            }
        }
        try {
            this.consumer = new KafkaConsumer(properties);
        } catch (Throwable th) {
            log.error("Error obtaining Kafka consumer for URI '{}': {}", new Object[]{this.engineURI, th.getMessage(), th});
        }
        EPServiceProviderSPI ePServiceProviderSPI = (EPServiceProviderSPI) EPServiceProviderManager.getProvider(this.engineURI);
        String requiredProperty = getRequiredProperty(this.properties, EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG);
        try {
            ((EsperIOKafkaInputSubscriber) JavaClassHelper.instantiate(EsperIOKafkaInputSubscriber.class, requiredProperty, ePServiceProviderSPI.getEngineImportService().getClassForNameProvider())).subscribe(new EsperIOKafkaInputSubscriberContext(this.consumer, ePServiceProviderSPI, this.properties));
            String requiredProperty2 = getRequiredProperty(this.properties, EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG);
            try {
                this.processor = (EsperIOKafkaInputProcessor) JavaClassHelper.instantiate(EsperIOKafkaInputProcessor.class, requiredProperty2, ePServiceProviderSPI.getEngineImportService().getClassForNameProvider());
                this.processor.init(new EsperIOKafkaInputProcessorContext(this.consumer, ePServiceProviderSPI, this.properties, this));
                this.executorService = Executors.newFixedThreadPool(1, new EsperIOKafkaInputThreadFactory(this.engineURI));
                this.runnable = new EsperIOKafkaInputRunnable(this.consumer, this.processor);
                this.executorService.submit(this.runnable);
                if (log.isInfoEnabled()) {
                    log.info("Completed starting EsperIO Kafka Input Adapter for engine URI '{}'", this.engineURI);
                }
            } catch (Throwable th2) {
                throw new ConfigurationException("Unexpected exception invoking processor init method on class " + requiredProperty2 + " for engine URI '" + this.engineURI + "': " + th2.getMessage(), th2);
            }
        } catch (Throwable th3) {
            throw new ConfigurationException("Unexpected exception invoking subscriber subscribe method on class " + requiredProperty + " for engine URI '" + this.engineURI + "': " + th3.getMessage(), th3);
        }
    }

    public void destroy() {
        if (log.isDebugEnabled()) {
            log.debug("Destroying Esper Kafka Input Adapter for engine URI '{}'", this.engineURI);
        }
        this.runnable.setShutdown(true);
        this.executorService.shutdown();
        try {
            this.executorService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.processor.close();
        this.consumer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getRequiredProperty(Properties properties, String str) {
        String property = properties.getProperty(str);
        if (property == null) {
            throw new ConfigurationException("Property '" + str + "' not provided");
        }
        return property;
    }
}
