package org.wso2.siddhi.extension.input.transport.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

/* loaded from: input_file:org/wso2/siddhi/extension/input/transport/kafka/KafkaConsumerThread.class */
/**
 * Runnable that polls a {@link KafkaConsumer} and forwards every record value to a
 * {@link SourceEventListener}, remembering the last offset seen per topic and partition.
 *
 * <p>The worker can be paused, resumed and shut down from other threads through the
 * {@code volatile} flags. Every call on the underlying consumer made by this class
 * (poll, seek, commit, close) is performed while holding {@code consumerLock}, so the
 * polling loop and {@link #restore(Map)} never touch the consumer at the same time.
 */
public class KafkaConsumerThread implements Runnable {
    private static final Logger log = Logger.getLogger(KafkaConsumerThread.class);

    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** How long the worker parks between flag checks while it is paused. */
    private static final long PAUSE_PARK_NANOS = 100L * 1000L * 1000L;

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final SourceEventListener sourceEventListener;
    private final String[] topics;
    /** topic -> (partition -> offset of the last record handed to the listener). */
    private final Map<String, Map<Integer, Long>> topicOffsetMap;
    private volatile boolean paused;
    private volatile boolean inactive;
    private final Lock consumerLock = new ReentrantLock();
    /** Explicitly assigned partitions; stays empty when the consumer subscribes by topic. */
    private final List<TopicPartition> partitionsList = new ArrayList<>();

    /**
     * Creates the consumer and either assigns explicit partitions or subscribes to the topics.
     *
     * @param sourceEventListener listener that receives each record value
     * @param strArr              topics to consume
     * @param strArr2             partition ids (decimal strings) to assign for every topic, or
     *                            {@code null} to subscribe to the topics instead
     * @param properties          configuration passed to the {@link KafkaConsumer} constructor
     * @param map                 previously stored offsets per topic and partition; the same
     *                            instance is kept and updated by this thread. {@code null} is
     *                            treated as "no stored offsets".
     * @throws NumberFormatException if a partition id is not a valid integer
     */
    public KafkaConsumerThread(SourceEventListener sourceEventListener, String[] strArr, String[] strArr2, Properties properties, Map<String, Map<Integer, Long>> map) {
        this.consumer = new KafkaConsumer<>(properties);
        this.sourceEventListener = sourceEventListener;
        // Fall back to a fresh map instead of failing on a null argument.
        this.topicOffsetMap = (null != map) ? map : new HashMap<String, Map<Integer, Long>>();
        this.topics = strArr;
        if (null != strArr2) {
            for (String topic : strArr) {
                ensureOffsetEntry(topic);
                for (String partition : strArr2) {
                    this.partitionsList.add(new TopicPartition(topic, Integer.parseInt(partition)));
                }
                log.info("Adding partitions " + Arrays.toString(strArr2) + " for topic: " + topic);
            }
            // Assign once with the complete list rather than re-assigning after every topic.
            if (!this.partitionsList.isEmpty()) {
                this.consumer.assign(this.partitionsList);
            }
            restore(this.topicOffsetMap);
        } else {
            for (String topic : strArr) {
                ensureOffsetEntry(topic);
            }
            this.consumer.subscribe(Arrays.asList(strArr));
        }
        log.info("Subscribed for topics: " + Arrays.toString(strArr));
    }

    /** Makes sure the offset map has a (possibly empty) partition map for the topic. */
    private void ensureOffsetEntry(String topic) {
        if (null == this.topicOffsetMap.get(topic)) {
            this.topicOffsetMap.put(topic, new HashMap<Integer, Long>());
        }
    }

    /** Asks the polling loop to stop fetching records until {@link #resume()} is called. */
    public void pause() {
        this.paused = true;
    }

    /** Seeks back to the recorded offsets and lets the polling loop continue. */
    public void resume() {
        restore(this.topicOffsetMap);
        this.paused = false;
    }

    /**
     * Seeks every explicitly assigned partition to the offset following the one stored in
     * the given map. Partitions that are not in the assigned list are ignored, so this is a
     * no-op when the consumer was created in subscribe mode.
     *
     * @param map stored offsets per topic and partition; {@code null} does nothing
     */
    public void restore(Map<String, Map<Integer, Long>> map) {
        if (null == map) {
            return;
        }
        for (String topic : this.topics) {
            Map<Integer, Long> partitionOffsets = map.get(topic);
            if (null == partitionOffsets) {
                continue;
            }
            for (Map.Entry<Integer, Long> entry : partitionOffsets.entrySet()) {
                TopicPartition topicPartition = new TopicPartition(topic, entry.getKey().intValue());
                if (!this.partitionsList.contains(topicPartition)) {
                    continue;
                }
                // The stored value is the last delivered offset, so resume one past it.
                long nextOffset = entry.getValue().longValue() + 1;
                log.info("Seeking partition: " + topicPartition + " for topic: " + topic + " offset: " + nextOffset);
                // Acquire outside the try so unlock() only runs when the lock is really held.
                this.consumerLock.lock();
                try {
                    this.consumer.seek(topicPartition, nextOffset);
                } finally {
                    this.consumerLock.unlock();
                }
            }
        }
    }

    /**
     * Polls and dispatches records until {@link #shutdownConsumer()} is called, then closes
     * the consumer. While paused the thread parks briefly between flag checks instead of
     * spinning, so a resume or shutdown is noticed within roughly one park interval.
     * The consumer is closed even when the loop terminates with an exception.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!this.inactive) {
                if (this.paused) {
                    // parkNanos may return early; the loop simply re-checks the flags.
                    LockSupport.parkNanos(PAUSE_PARK_NANOS);
                    continue;
                }
                ConsumerRecords<byte[], byte[]> records = pollRecords();
                // Wildcard element type: no cast to byte[] is forced on key/value, matching
                // the original untyped access.
                for (ConsumerRecord<?, ?> record : records) {
                    dispatch(record);
                }
                commitIfNotEmpty(records);
            }
        } finally {
            closeConsumer();
        }
    }

    /** Polls the consumer once while holding the consumer lock. */
    private ConsumerRecords<byte[], byte[]> pollRecords() {
        this.consumerLock.lock();
        try {
            return this.consumer.poll(POLL_TIMEOUT_MS);
        } finally {
            this.consumerLock.unlock();
        }
    }

    /** Records the offset of one record and hands its value to the listener. */
    private void dispatch(ConsumerRecord<?, ?> record) {
        // NOTE(review): toString() on a byte[] value yields the array identity string, and a
        // null value throws here; presumably a String deserializer is configured - confirm.
        String event = record.value().toString();
        if (log.isDebugEnabled()) {
            log.debug("Event received in Kafka Event Adaptor: " + event + ", offSet: " + record.offset() + ", key: " + record.key() + ", topic: " + record.topic() + ", partition: " + record.partition());
        }
        this.topicOffsetMap.get(record.topic()).put(Integer.valueOf(record.partition()), Long.valueOf(record.offset()));
        this.sourceEventListener.onEvent(event, (String[]) null);
    }

    /** Issues an asynchronous commit when the last poll returned at least one record. */
    private void commitIfNotEmpty(ConsumerRecords<byte[], byte[]> records) {
        this.consumerLock.lock();
        try {
            if (!records.isEmpty()) {
                this.consumer.commitAsync();
            }
        } catch (CommitFailedException e) {
            log.error("Kafka commit failed for topics: " + Arrays.toString(this.topics), e);
        } finally {
            this.consumerLock.unlock();
        }
    }

    /** Closes the consumer under the lock and releases the lock afterwards. */
    private void closeConsumer() {
        this.consumerLock.lock();
        try {
            this.consumer.close();
        } finally {
            this.consumerLock.unlock();
        }
    }

    /** Asks the polling loop to finish its current iteration, close the consumer and exit. */
    public void shutdownConsumer() {
        this.inactive = true;
    }

    /**
     * Returns the live offset map updated by the polling loop (not a copy).
     *
     * @return topic -> (partition -> last delivered offset)
     */
    public Map<String, Map<Integer, Long>> getTopicOffsetMap() {
        return this.topicOffsetMap;
    }
}
