/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.kafka.source;

import io.siddhi.annotation.Example;
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.input.source.Source;
import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.core.stream.input.source.SourceSyncCallback;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.kafka.KafkaIOUtils;
import io.siddhi.extension.io.kafka.source.ConsumerKafkaGroup;
import io.siddhi.extension.io.kafka.source.SequenceKey;
import io.siddhi.query.api.exception.SiddhiAppValidationException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.log4j.Logger;

@Extension(name="kafka", namespace="source", description="A Kafka source receives events to be processed by WSO2 SP from a topic with a partition for a Kafka cluster. The events received can be in the `TEXT` `XML` `JSON` or `Binary` format.\nIf the topic is not already created in the Kafka cluster, the Kafka sink creates the default partition for the given topic.", parameters={@Parameter(name="bootstrap.servers", description="This specifies the list of Kafka servers to which the Kafka source must listen. This list can be provided as a set of comma-separated values.\ne.g., `localhost:9092,localhost:9093`", type={DataType.STRING}), @Parameter(name="topic.list", description="This specifies the list of topics to which the source must listen. This list can be provided as a set of comma-separated values.\ne.g., `topic_one,topic_two`", type={DataType.STRING}), @Parameter(name="group.id", description="This is an ID to identify the Kafka source group. The group ID ensures that sources with the same topic and partition that are in the same group do not receive the same event.", type={DataType.STRING}), @Parameter(name="threading.option", description=" This specifies whether the Kafka source is to be run on a single thread, or in multiple threads based on a condition. Possible values are as follows:\n`single.thread`: To run the Kafka source on a single thread.\n`topic.wise`: To use a separate thread per topic.\n`partition.wise`: To use a separate thread per partition.", type={DataType.STRING}), @Parameter(name="partition.no.list", description="The partition number list for the given topic. This is provided as a list of comma-separated values. e.g., `0,1,2,`.", type={DataType.STRING}, optional=true, defaultValue="0"), @Parameter(name="seq.enabled", description="If this parameter is set to `true`, the sequence of the events received via the source is taken into account. Therefore, each event should contain a sequence number as an attribute value to indicate the sequence.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="is.binary.message", description="In order to receive binary events via the Kafka source,it is required to setthis parameter to 'True'.", type={DataType.BOOL}, optional=true, defaultValue="false"), @Parameter(name="topic.offsets.map", description="This parameter specifies reading offsets for each topic and partition. The value for this parameter is specified in the following format: \n `<topic>=<offset>,<topic>=<offset>,`\n  When an offset is defined for a topic, the Kafka source skips reading the message with the number specified as the offset as well as all the messages sent previous to that message. If the offset is not defined for a specific topic it reads messages from the beginning. \ne.g., `stocks=100,trades=50` reads from the 101th message of the `stocks` topic, and from the 51st message of the `trades` topic.", type={DataType.STRING}, optional=true, defaultValue="null"), @Parameter(name="enable.offsets.commit", description="This parameter specifies whether to commit offsets. \nIf the manual asynchronous offset committing is needed, `enable.offsets.commit` should be `true` and `enable.auto.commit` should be `false`. \nIf periodical committing is needed `enable.offsets.commit` should be `true` and `enable.auto.commit` should be `true`. \nIf committing is not needed, `enable.offsets.commit` should be `false`. \n\nNote: `enable.auto.commit` is an `optional.configuration` property. If it is set to `true`, Source will periodically(default: 1000ms. Configurable with `auto.commit.interval.ms` property as an `optional.configuration`) commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka. \nTo guarantee at-least-once processing, we recommend you to enable Siddhi Periodic State Persistence when `enable.auto.commit` property is set to `true`. \nDuring manual committing, it might introduce a latency during consumption.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="enable.async.commit", description="This parameter will changes the type of the committing offsets returned on the last poll for the subscribed list of topics and partitions.\nWhen `enable.async.commit` is set to true, committing will be an asynchronous call.", type={DataType.BOOL}, optional=true, defaultValue="true"), @Parameter(name="optional.configuration", description="This parameter contains all the other possible configurations that the consumer is created with. \ne.g., `ssl.keystore.type:JKS,batch.size:200`.", type={DataType.STRING}, optional=true, defaultValue="null")}, examples={@Example(syntax="@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(\ntype='kafka', \ntopic.list='kafka_topic,kafka_topic2', \ngroup.id='test', \nthreading.option='partition.wise', \nbootstrap.servers='localhost:9092', \npartition.no.list='0,1', \n@map(type='xml'))\nDefine stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description="This kafka source configuration listens to the `kafka_topic` and `kafka_topic2` topics with `0` and `1` partitions. A thread is created for each topic and partition combination. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named `FooStream`. "), @Example(syntax="@App:name('TestExecutionPlan') \ndefine stream BarStream (symbol string, price float, volume long); \n@info(name = 'query1') \n@source(\ntype='kafka', \ntopic.list='kafka_topic',\ngroup.id='test', \nthreading.option='single.thread',\nbootstrap.servers='localhost:9092',\n@map(type='xml'))\nDefine stream FooStream (symbol string, price float, volume long);\nfrom FooStream select symbol, price, volume insert into BarStream;\n", description="This Kafka source configuration listens to the `kafka_topic` topic for the default partition because no `partition.no.list` is defined. Only one thread is created for the topic. The events are received in the XML format, mapped to a Siddhi event, and sent to a stream named `FooStream`."), @Example(syntax="@App:name('TestExecutionPlan')\n@source(type='kafka',\n        topic.list='trp_topic',\n        partition.no.list='0',\n        threading.option='single.thread',\n        group.id='group',\n        bootstrap.servers='localhost:9092',\n        @map(type='xml', enclosing.element='//events',             @attributes(symbol ='symbol', price = 'price', volume = 'volume',                         partition = 'trp:partition',                         topic = 'trp:topic', key = 'trp:key',                         recordTimestamp = 'trp:record.timestamp',                          eventTimestamp = 'trp:event.timestamp',                         checkSum = 'trp:check.sum', topicOffset = 'trp:offset')))\ndefine stream FooStream (symbol string, price float, volume long,                                      partition string,                                      topic string, key string,                                      recordTimestamp string,                                      eventTimestamp string, checkSum string,                                      topicOffset string);\nfrom FooStream select * insert into BarStream;", description="This Kafka source configuration listens to the `trp_topic` topic for the default partition because no `partition.no.list` is defined. \nSince the custom attribute mapping is enabled with TRP values, the siddhi event will be populated with the relevant trp values as well")})
public class KafkaSource
extends Source<KafkaSourceState>
implements SourceSyncCallback {
    public static final String SINGLE_THREADED = "single.thread";
    static final String TOPIC_WISE = "topic.wise";
    static final String PARTITION_WISE = "partition.wise";
    public static final String ADAPTOR_SUBSCRIBER_TOPIC = "topic.list";
    public static final String ADAPTOR_SUBSCRIBER_GROUP_ID = "group.id";
    public static final String ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS = "bootstrap.servers";
    public static final String ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST = "partition.no.list";
    public static final String ADAPTOR_ENABLE_AUTO_COMMIT = "enable.auto.commit";
    public static final String ADAPTOR_ENABLE_OFFSET_COMMIT = "enable.offsets.commit";
    public static final String ADAPTOR_ENABLE_ASYNC_COMMIT = "enable.async.commit";
    public static final String ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES = "optional.configuration";
    private static final String TOPIC_OFFSET_MAP = "topic.offsets.map";
    public static final String THREADING_OPTION = "threading.option";
    public static final String SEQ_ENABLED = "seq.enabled";
    private static final String LAST_RECEIVED_SEQ_NO_KEY = "lastReceivedSeqNo";
    public static final String IS_BINARY_MESSAGE = "is.binary.message";
    private static final Logger LOG = Logger.getLogger(KafkaSource.class);
    private static final String TOPIC = "topic";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offSet";
    private OptionHolder optionHolder;
    private ConsumerKafkaGroup consumerKafkaGroup;
    private String bootstrapServers;
    private String groupID;
    private String[] partitions;
    private String[] topics;
    private String optionalConfigs;
    private boolean seqEnabled = false;
    private boolean isBinaryMessage;
    private boolean enableOffsetCommit;
    private boolean enableAsyncCommit;
    private String topicOffsetMapConfig;
    private SiddhiAppContext siddhiAppContext;
    private KafkaSourceState kafkaSourceState;
    private String threadingOption;
    private SourceEventListener sourceEventListener;
    private String[] requiredProperties;

    public StateFactory<KafkaSourceState> init(SourceEventListener sourceEventListener, OptionHolder optionHolder, String[] requiredProperties, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
        this.siddhiAppContext = siddhiAppContext;
        this.optionHolder = optionHolder;
        this.requiredProperties = (String[])requiredProperties.clone();
        this.sourceEventListener = sourceEventListener;
        this.bootstrapServers = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS);
        this.groupID = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_GROUP_ID);
        this.threadingOption = optionHolder.validateAndGetStaticValue(THREADING_OPTION);
        String partitionList = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_PARTITION_NO_LIST, null);
        this.partitions = partitionList != null ? partitionList.split(",") : null;
        String topicList = optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC);
        this.topics = topicList.split(",");
        this.seqEnabled = optionHolder.validateAndGetStaticValue(SEQ_ENABLED, "false").equalsIgnoreCase("true");
        this.optionalConfigs = optionHolder.validateAndGetStaticValue(ADAPTOR_OPTIONAL_CONFIGURATION_PROPERTIES, null);
        this.isBinaryMessage = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(IS_BINARY_MESSAGE, "false"));
        this.enableOffsetCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(ADAPTOR_ENABLE_OFFSET_COMMIT, "true"));
        this.enableAsyncCommit = Boolean.parseBoolean(optionHolder.validateAndGetStaticValue(ADAPTOR_ENABLE_ASYNC_COMMIT, "true"));
        this.topicOffsetMapConfig = optionHolder.validateAndGetStaticValue(TOPIC_OFFSET_MAP, null);
        if (PARTITION_WISE.equals(this.threadingOption) && null == this.partitions) {
            throw new SiddhiAppValidationException("Threading option is selected as 'partition.wise' but there are no partitions given");
        }
        return () -> new KafkaSourceState(this.seqEnabled);
    }

    public Class[] getOutputEventClasses() {
        return new Class[]{String.class, ByteBuffer.class};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void connect(Source.ConnectionCallback connectionCallback, KafkaSourceState kafkaSourceState) throws ConnectionUnavailableException {
        try {
            ExecutorService executorService = this.siddhiAppContext.getExecutorService();
            this.consumerKafkaGroup = new ConsumerKafkaGroup(this.topics, this.partitions, KafkaSource.createConsumerConfig(this.bootstrapServers, this.groupID, this.optionalConfigs, this.isBinaryMessage, this.enableOffsetCommit), this.threadingOption, executorService, this.isBinaryMessage, this.enableOffsetCommit, this.enableAsyncCommit, this.sourceEventListener, this.requiredProperties);
            this.checkTopicsAvailableInCluster();
            this.checkPartitionsAvailableForTheTopicsInCluster();
            this.kafkaSourceState = kafkaSourceState;
            if (!kafkaSourceState.isRestored && this.topicOffsetMapConfig != null) {
                KafkaSourceState kafkaSourceState2 = kafkaSourceState;
                synchronized (kafkaSourceState2) {
                    kafkaSourceState.topicOffsetMap = this.readTopicOffsetsConfig(this.topicOffsetMapConfig);
                }
                this.consumerKafkaGroup.setKafkaSourceState(kafkaSourceState);
                this.consumerKafkaGroup.restoreState();
            } else {
                this.consumerKafkaGroup.setKafkaSourceState(kafkaSourceState);
            }
            this.consumerKafkaGroup.run();
        }
        catch (SiddhiAppRuntimeException e) {
            throw e;
        }
        catch (Throwable e) {
            throw new ConnectionUnavailableException("Error when initiating connection with Kafka server: " + this.bootstrapServers + " in Siddhi App: " + this.siddhiAppContext.getName(), e);
        }
    }

    public void disconnect() {
        this.kafkaSourceState = null;
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.setKafkaSourceState(null);
            this.consumerKafkaGroup.shutdown();
            LOG.info((Object)("Kafka Adapter disconnected for topic(s): " + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC)));
        }
    }

    public void destroy() {
        this.consumerKafkaGroup = null;
    }

    public void pause() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.pause();
            LOG.info((Object)("Kafka Adapter paused for topic(s): " + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC)));
        }
    }

    public void resume() {
        if (this.consumerKafkaGroup != null) {
            this.consumerKafkaGroup.resume();
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("Kafka Adapter resumed for topic(s): " + this.optionHolder.validateAndGetStaticValue(ADAPTOR_SUBSCRIBER_TOPIC)));
            }
        }
    }

    public void update(String[] transportSyncProperties) {
        for (String propertiesStr : transportSyncProperties) {
            String[] properties = propertiesStr.split(",");
            String topic = "";
            Integer partition = 0;
            for (String property : properties) {
                Map partitionOffsetMap;
                String[] keyValues = property.split(":");
                if (keyValues[0].equals(TOPIC)) {
                    topic = keyValues[1];
                    this.kafkaSourceState.topicOffsetMap.computeIfAbsent(keyValues[1], k -> new HashMap());
                    continue;
                }
                if (keyValues[0].equals(PARTITION)) {
                    partitionOffsetMap = (Map)this.kafkaSourceState.topicOffsetMap.get(topic);
                    if (null != partitionOffsetMap.get(Integer.valueOf(keyValues[1]))) continue;
                    partition = Integer.valueOf(keyValues[1]);
                    partitionOffsetMap.put(partition, 0L);
                    continue;
                }
                if (!keyValues[0].equals(OFFSET)) continue;
                partitionOffsetMap = (Map)this.kafkaSourceState.topicOffsetMap.get(topic);
                long savedOffsetValue = (Long)partitionOffsetMap.get(partition);
                Long offsetValue = Long.valueOf(keyValues[1]);
                if (offsetValue <= savedOffsetValue) continue;
                partitionOffsetMap.put(partition, offsetValue);
            }
        }
    }

    protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
        return null;
    }

    private Map<String, Map<Integer, Long>> readTopicOffsetsConfig(String topicOffsetsConfig) {
        String[] topicOffsets;
        HashMap<String, Map<Integer, Long>> perTopicPerPartitionOffset = new HashMap<String, Map<Integer, Long>>();
        for (String entry : topicOffsets = topicOffsetsConfig.split(",")) {
            String[] topicOffset = entry.split("=");
            if (topicOffset.length != 2) {
                LOG.error((Object)"Topic offset should be given in <topic>=<offset>,.. format. ");
                return null;
            }
            boolean isTopicListed = Arrays.stream(this.topics).anyMatch(topic -> topic.equals(topicOffset[0]));
            if (!isTopicListed) {
                LOG.error((Object)("Topic " + topicOffset[0] + " not listed in topic.list config"));
                return null;
            }
            HashMap partitionOffset = new HashMap();
            Arrays.stream(this.partitions).forEach(partition -> partitionOffset.put(Integer.parseInt(partition), Long.parseLong(topicOffset[1])));
            perTopicPerPartitionOffset.put(topicOffset[0], partitionOffset);
        }
        return perTopicPerPartitionOffset;
    }

    private void checkTopicsAvailableInCluster() {
        Properties props = KafkaSource.createConsumerConfig(this.bootstrapServers, this.groupID, this.optionalConfigs, this.isBinaryMessage, this.enableOffsetCommit);
        props.put(ADAPTOR_SUBSCRIBER_GROUP_ID, "test-consumer-group");
        KafkaConsumer consumer = new KafkaConsumer(props);
        Map testTopicList = consumer.listTopics();
        boolean topicsAvailable = true;
        StringBuilder invalidTopics = new StringBuilder("");
        for (String topic : this.topics) {
            boolean topicAvailable = false;
            for (Map.Entry entry : testTopicList.entrySet()) {
                if (!((String)entry.getKey()).equals(topic)) continue;
                topicAvailable = true;
            }
            if (topicAvailable) continue;
            topicsAvailable = false;
            if ("".equals(invalidTopics.toString())) {
                invalidTopics.append(topic);
            } else {
                invalidTopics.append(',').append(topic);
            }
            LOG.warn((Object)("Topic, " + topic + " is not available."));
        }
        if (!(null == this.partitions || this.partitions.length == 1 && this.partitions[0].equals("0") || topicsAvailable)) {
            String errorMessage = "Topic(s) " + invalidTopics + " aren't available. Topics won't be created since there are partition numbers defined in the query.";
            LOG.error((Object)errorMessage);
            throw new SiddhiAppRuntimeException("Topic(s) " + invalidTopics + " aren't available. Topics won't be created since there are partition numbers defined in the query.");
        }
        if (!topicsAvailable) {
            if (this.siddhiAppContext.isTransportChannelCreationEnabled()) {
                LOG.warn((Object)("Topic(s) " + invalidTopics + " aren't available. These Topics will be created with the default partition."));
            } else {
                throw new SiddhiAppRuntimeException("Topic(s) " + invalidTopics + " creation failed. User has disabled topic creation by setting " + "transportChannelCreationEnabled" + " property to false. Hence Siddhi App deployment will be aborted.");
            }
        }
    }

    private void checkPartitionsAvailableForTheTopicsInCluster() {
        Properties configProperties = KafkaSource.createProducerConfig(this.bootstrapServers, this.optionalConfigs, this.isBinaryMessage);
        KafkaProducer producer = new KafkaProducer(configProperties);
        boolean partitionsAvailable = true;
        StringBuilder invalidPartitions = new StringBuilder("");
        for (String topic : this.topics) {
            List partitionInfos = producer.partitionsFor(topic);
            if (null == this.partitions || this.partitions.length == 1 && this.partitions[0].equals("0")) continue;
            for (String partition : this.partitions) {
                boolean partitonAvailable = false;
                for (PartitionInfo partitionInfo : partitionInfos) {
                    if (Integer.parseInt(partition) != partitionInfo.partition()) continue;
                    partitonAvailable = true;
                }
                if (partitonAvailable) continue;
                partitionsAvailable = false;
                if ("".equals(invalidPartitions.toString())) {
                    invalidPartitions.append(partition);
                } else {
                    invalidPartitions.append(',').append(partition);
                }
                LOG.error((Object)("Partition number, " + partition + " in 'partition.id' is not available in topic partitions"));
            }
            if (partitionsAvailable) continue;
            throw new SiddhiAppRuntimeException("Partition number(s) " + invalidPartitions + " aren't available for the topic: " + topic);
        }
    }

    private static Properties createConsumerConfig(String zkServerList, String groupId, String optionalConfigs, boolean isBinaryMessage, boolean enableOffsetCommit) {
        Properties props = new Properties();
        props.put(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, zkServerList);
        props.put(ADAPTOR_SUBSCRIBER_GROUP_ID, groupId);
        props.put("session.timeout.ms", "30000");
        if (!enableOffsetCommit) {
            props.put(ADAPTOR_ENABLE_AUTO_COMMIT, "false");
        }
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        if (!isBinaryMessage) {
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        } else {
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
        KafkaIOUtils.splitHeaderValues(optionalConfigs, props);
        return props;
    }

    private static Properties createProducerConfig(String zkServerList, String optionalConfigs, boolean isBinaryMessage) {
        Properties configProperties = new Properties();
        configProperties.put(ADAPTOR_SUBSCRIBER_ZOOKEEPER_CONNECT_SERVERS, zkServerList);
        configProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaIOUtils.splitHeaderValues(optionalConfigs, configProperties);
        if (!isBinaryMessage) {
            configProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        } else {
            configProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        return configProperties;
    }

    public class KafkaSourceState
    extends State {
        private Map<String, Map<Integer, Long>> topicOffsetMap = new HashMap<String, Map<Integer, Long>>();
        private Map<String, Map<SequenceKey, Integer>> consumerLastReceivedSeqNoMap = null;
        private boolean isRestored = false;

        public Map<String, Map<Integer, Long>> getTopicOffsetMap() {
            return this.topicOffsetMap;
        }

        public Map<String, Map<SequenceKey, Integer>> getConsumerLastReceivedSeqNoMap() {
            return this.consumerLastReceivedSeqNoMap;
        }

        public KafkaSourceState(boolean seqEnabled) {
            if (seqEnabled) {
                this.consumerLastReceivedSeqNoMap = new HashMap<String, Map<SequenceKey, Integer>>();
            }
        }

        public Map<String, Object> snapshot() {
            HashMap<String, Object> currentState = new HashMap<String, Object>();
            currentState.put(KafkaSource.TOPIC_OFFSET_MAP, this.topicOffsetMap);
            if (KafkaSource.this.seqEnabled) {
                currentState.put(KafkaSource.LAST_RECEIVED_SEQ_NO_KEY, this.consumerLastReceivedSeqNoMap);
            }
            return currentState;
        }

        public void restore(Map<String, Object> state) {
            this.isRestored = true;
            this.topicOffsetMap = (Map)state.get(KafkaSource.TOPIC_OFFSET_MAP);
            if (KafkaSource.this.consumerKafkaGroup != null) {
                KafkaSource.this.consumerKafkaGroup.restoreState();
            }
            if (KafkaSource.this.seqEnabled) {
                this.consumerLastReceivedSeqNoMap = (Map)state.get(KafkaSource.LAST_RECEIVED_SEQ_NO_KEY);
            }
        }

        public boolean canDestroy() {
            return false;
        }
    }
}

