package org.wso2.siddhi.extension.input.transport.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.log4j.Logger;
import org.wso2.siddhi.annotation.Example;
import org.wso2.siddhi.annotation.Extension;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.source.Source;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.core.util.transport.OptionHolder;

/**
 * Siddhi {@code source} extension registered under the name {@code kafka}.
 *
 * <p>The class reads its settings from the {@link OptionHolder} supplied to
 * {@link #init}, builds a Kafka consumer {@link Properties} object from them and
 * delegates all consumption work to a {@code ConsumerKafkaGroup}. The per-topic,
 * per-partition offsets are kept in {@code topicOffsetMap} and exposed through
 * {@link #currentState()} / {@link #restoreState(Map)} for snapshotting.
 */
@Extension(name = "kafka", namespace = "source", description = "TBD", examples = {@Example(description = "TBD", syntax = "TBD")})
/* loaded from: input_file:org/wso2/siddhi/extension/input/transport/kafka/KafkaSource.class */
public class KafkaSource extends Source {
    static final String SINGLE_THREADED = "single.thread";
    static final String TOPIC_WISE = "topic.wise";
    static final String PARTITION_WISE = "partition.wise";
    private static final Logger log = Logger.getLogger(KafkaSource.class);
    private static final String ADAPTOR_SUBSCRIBER_TOPIC = "topic";
    private static final String ADAPTOR_SUBSCRIBER_GROUP_ID = "group.id";
    // Despite the constant's name, the option key is the Kafka "bootstrap.servers" setting.
    private static final String ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS = "bootstrap.servers";
    private static final String ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST = "partition.no.list";
    private static final String ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES = "optional.configuration";
    private static final String TOPIC_OFFSET_MAP = "topic.offset.map";
    private static final String THREADING_OPTION = "threading.option";
    // Separates list items (topics, partitions, optional "key:value" entries).
    private static final String HEADER_SEPARATOR = ",";
    // Separates the key from the value inside one optional configuration entry.
    private static final String ENTRY_SEPARATOR = ":";
    private SourceEventListener sourceEventListener;
    private ScheduledExecutorService executorService;
    private OptionHolder optionHolder;
    private ConsumerKafkaGroup consumerKafkaGroup;
    // topic -> (partition -> offset); shared with the consumer group and snapshotted.
    private Map<String, Map<Integer, Long>> topicOffsetMap = new HashMap<String, Map<Integer, Long>>();

    /**
     * Builds the Kafka consumer configuration.
     *
     * <p>Fixed defaults are applied first; entries from {@code optionalConfigs} are
     * applied afterwards and therefore override the defaults when keys collide.
     *
     * @param bootstrapServers value stored under {@code bootstrap.servers}
     * @param groupId          value stored under {@code group.id}; skipped when {@code null}
     * @param optionalConfigs  comma separated {@code key:value} pairs, or {@code null}/empty
     *                         for none. Only the first {@code ':'} of an entry splits key
     *                         from value. Malformed entries are logged and ignored.
     * @return the populated consumer properties
     */
    private static Properties createConsumerConfig(String bootstrapServers, String groupId, String optionalConfigs) {
        Properties properties = new Properties();
        properties.put(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, bootstrapServers);
        if (groupId != null) {
            properties.put(ADAPTOR_SUBSCRIBER_GROUP_ID, groupId);
        }
        properties.put("session.timeout.ms", "30000");
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        if (optionalConfigs == null || optionalConfigs.isEmpty()) {
            return properties;
        }
        for (String entry : optionalConfigs.split(HEADER_SEPARATOR)) {
            // Limit of 2 keeps any further ':' characters inside the value.
            String[] keyValue = entry.split(ENTRY_SEPARATOR, 2);
            if (keyValue.length == 2) {
                properties.put(keyValue[0], keyValue[1]);
            } else {
                // Validate explicitly instead of relying on an ArrayIndexOutOfBoundsException.
                log.warn("Optional property '" + entry + "' is not defined in the correct format.");
            }
        }
        return properties;
    }

    /**
     * Stores the collaborators needed by {@link #connect()} and registers this
     * instance with the application's snapshot service.
     *
     * @param sourceEventListener listener later handed to the consumer group
     * @param optionHolder        holder of the source options read in {@link #connect()}
     * @param configReader        not used by this implementation
     * @param siddhiAppContext    provides the scheduled executor and the snapshot service
     */
    public void init(SourceEventListener sourceEventListener, OptionHolder optionHolder, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.sourceEventListener = sourceEventListener;
        this.optionHolder = optionHolder;
        this.executorService = siddhiAppContext.getScheduledExecutorService();
        // NOTE(review): the snapshot id says "sink" although this is a source; left unchanged
        // because renaming it may invalidate previously persisted state - confirm before fixing.
        siddhiAppContext.getSnapshotService().addSnapshotable("kafka-sink", this);
    }

    /**
     * Reads the source options, creates the {@code ConsumerKafkaGroup} and starts it
     * with the listener received in {@link #init}.
     *
     * @throws ConnectionUnavailableException declared by the method signature; nothing
     *                                        in this body throws it directly
     */
    public void connect() throws ConnectionUnavailableException {
        // Option lookups are kept in their original order.
        String bootstrapServers = this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS);
        String groupId = this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_GROUP_ID, (String) null);
        String threadingOption = this.optionHolder.validateAndGetStaticValue(THREADING_OPTION);
        String partitionList = this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST, (String) null);

        String[] topics = this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC).split(HEADER_SEPARATOR);
        String[] partitions = partitionList != null ? partitionList.split(HEADER_SEPARATOR) : null;
        String optionalConfigs = this.optionHolder.validateAndGetStaticValue(ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, (String) null);
        Properties consumerConfig = createConsumerConfig(bootstrapServers, groupId, optionalConfigs);

        this.consumerKafkaGroup = new ConsumerKafkaGroup(topics, partitions, consumerConfig, this.topicOffsetMap, threadingOption, this.executorService);
        this.consumerKafkaGroup.run(this.sourceEventListener);
    }

    /** Shuts the consumer group down, if one has been created. */
    public void disconnect() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.shutdown();
            // Guarded like pause()/resume() so the message is only built when it will be logged.
            if (log.isDebugEnabled()) {
                log.debug("Kafka Adapter disconnected for topic/s" + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC));
            }
        }
    }

    /** Drops the reference to the consumer group; it does not shut the group down. */
    public void destroy() {
        this.consumerKafkaGroup = null;
    }

    /** Pauses the consumer group, if one has been created. */
    public void pause() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.pause();
            if (log.isDebugEnabled()) {
                log.debug("Kafka Adapter paused for topic/s" + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC));
            }
        }
    }

    /** Resumes the consumer group, if one has been created. */
    public void resume() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.resume();
            if (log.isDebugEnabled()) {
                log.debug("Kafka Adapter resumed for topic/s" + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC));
            }
        }
    }

    /**
     * Returns the state to snapshot.
     *
     * @return a new map holding the live (not copied) offset map under
     *         {@code topic.offset.map}
     */
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<String, Object>();
        state.put(TOPIC_OFFSET_MAP, this.topicOffsetMap);
        return state;
    }

    /**
     * Restores the offsets from a state map produced by {@link #currentState()}.
     *
     * <p>A state without a {@code topic.offset.map} entry leaves the current offsets
     * untouched. The restored offsets are pushed to the consumer group only when one
     * exists; otherwise they are picked up by the next {@link #connect()}.
     *
     * @param map state map previously returned by {@link #currentState()}
     */
    @SuppressWarnings("unchecked") // the entry is only ever written by currentState()
    public void restoreState(Map<String, Object> map) {
        Object restored = map.get(TOPIC_OFFSET_MAP);
        if (restored == null) {
            // Never replace the offset map with null: connect() and currentState() rely on it.
            return;
        }
        this.topicOffsetMap = (Map<String, Map<Integer, Long>>) restored;
        // The group is null before connect() and after destroy(); avoid an NPE in that window.
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.restore(this.topicOffsetMap);
        }
    }
}
