package org.wso2.siddhi.extension.input.transport.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.stream.input.source.SourceEventListener;

/* loaded from: input_file:org/wso2/siddhi/extension/input/transport/kafka/ConsumerKafkaGroup.class */
/**
 * Holds the {@link KafkaConsumerThread}s created for one set of topics and partitions and
 * forwards lifecycle calls (pause, resume, restore, shutdown) to every thread in the group.
 *
 * <p>Consumers are only created when {@link #run(SourceEventListener)} is called; how many are
 * created depends on the threading option given to the constructor.
 */
public class ConsumerKafkaGroup {
    private static final Logger log = Logger.getLogger(ConsumerKafkaGroup.class);

    private final String[] topics;
    private final String[] partitions;
    private final Properties props;
    private final List<KafkaConsumerThread> kafkaConsumerThreadList = new ArrayList<>();
    private final Map<String, Map<Integer, Long>> topicOffsetMap;
    private final ScheduledExecutorService executorService;
    private final String threadingOption;

    /**
     * Creates a group; no consumer is created until {@link #run(SourceEventListener)} is called.
     *
     * @param strArr topics handed to the consumers
     * @param strArr2 partitions handed to the consumers
     * @param properties properties handed to every consumer
     * @param map topic -> (partition -> offset) map handed to every consumer; stored by reference,
     *     not copied
     * @param str threading option; {@code run} recognises {@code "single.thread"},
     *     {@code "topic.wise"} and {@code "partition.wise"}
     * @param scheduledExecutorService executor the topic-wise and partition-wise consumers are
     *     submitted to
     */
    public ConsumerKafkaGroup(String[] strArr, String[] strArr2, Properties properties, Map<String, Map<Integer, Long>> map, String str, ScheduledExecutorService scheduledExecutorService) {
        this.threadingOption = str;
        this.topicOffsetMap = map;
        this.topics = strArr;
        this.partitions = strArr2;
        this.props = properties;
        this.executorService = scheduledExecutorService;
    }

    /** Calls {@code pause()} on every consumer thread created so far. */
    public void pause() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::pause);
    }

    /** Calls {@code resume()} on every consumer thread created so far. */
    public void resume() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::resume);
    }

    /**
     * Passes the given offset map to {@code restore} on every consumer thread created so far.
     *
     * @param map topic -> (partition -> offset) map forwarded unchanged to each thread
     */
    public void restore(Map<String, Map<Integer, Long>> map) {
        this.kafkaConsumerThreadList.forEach(consumerThread -> consumerThread.restore(map));
    }

    /** Calls {@code shutdownConsumer()} on every consumer thread created so far. */
    public void shutdown() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::shutdownConsumer);
    }

    /**
     * Creates the consumer threads according to the threading option and starts them.
     *
     * <ul>
     *   <li>{@code "single.thread"}: one consumer for all topics and partitions; its
     *       {@code run()} is invoked directly on the calling thread.</li>
     *   <li>{@code "topic.wise"}: one consumer per topic (each with all partitions), submitted to
     *       the executor.</li>
     *   <li>{@code "partition.wise"}: one consumer per topic/partition pair, submitted to the
     *       executor.</li>
     * </ul>
     *
     * <p>Any other threading option creates no consumer. Anything thrown inside this method is
     * logged and not rethrown.
     *
     * @param sourceEventListener listener handed to every consumer that is created
     */
    public void run(SourceEventListener sourceEventListener) {
        try {
            if ("single.thread".equals(this.threadingOption)) {
                KafkaConsumerThread consumerThread = new KafkaConsumerThread(sourceEventListener, this.topics, this.partitions, this.props, this.topicOffsetMap);
                this.kafkaConsumerThreadList.add(consumerThread);
                log.info("Kafka Consumer thread starting to listen on topic/s: " + Arrays.toString(this.topics) + " with partition/s: " + Arrays.toString(this.partitions));
                // NOTE(review): unlike the other two modes this is executed synchronously instead
                // of being submitted to executorService, so anything run() throws is reported by
                // the catch below — confirm that running on the caller's thread is intended.
                consumerThread.run();
            } else if ("topic.wise".equals(this.threadingOption)) {
                for (String topic : this.topics) {
                    KafkaConsumerThread consumerThread = new KafkaConsumerThread(sourceEventListener, new String[]{topic}, this.partitions, this.props, this.topicOffsetMap);
                    this.kafkaConsumerThreadList.add(consumerThread);
                    this.executorService.submit(consumerThread);
                    log.info("Kafka Consumer thread starting to listen on topic: " + topic + " with partition/s: " + Arrays.toString(this.partitions));
                }
            } else if ("partition.wise".equals(this.threadingOption)) {
                for (String topic : this.topics) {
                    for (String partition : this.partitions) {
                        KafkaConsumerThread consumerThread = new KafkaConsumerThread(sourceEventListener, new String[]{topic}, new String[]{partition}, this.props, this.topicOffsetMap);
                        this.kafkaConsumerThreadList.add(consumerThread);
                        this.executorService.submit(consumerThread);
                        log.info("Kafka Consumer thread starting to listen on topic: " + topic + " with partition: " + partition);
                    }
                }
            }
        } catch (Throwable th) {
            log.error("Error while creating KafkaConsumerThread for topic/s: " + Arrays.toString(this.topics), th);
        }
    }

    /**
     * Collects the offsets reported by every consumer thread into one new map.
     *
     * <p>Several threads may report the same topic (for example one thread per partition), so
     * the partition offsets are merged per topic instead of letting the last thread's map
     * replace the earlier ones. If two threads report the same topic and partition, the thread
     * later in the list wins. The returned outer and inner maps are new {@link HashMap}s; a
     * topic reported with a {@code null} partition map appears with an empty one.
     *
     * @return topic -> (partition -> offset) for all consumer threads; empty if none exist
     */
    public Map<String, Map<Integer, Long>> getTopicOffsetMap() {
        Map<String, Map<Integer, Long>> merged = new HashMap<>();
        for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
            for (Map.Entry<String, Map<Integer, Long>> entry : consumerThread.getTopicOffsetMap().entrySet()) {
                Map<Integer, Long> partitionOffsets = merged.computeIfAbsent(entry.getKey(), topic -> new HashMap<>());
                if (entry.getValue() != null) {
                    partitionOffsets.putAll(entry.getValue());
                }
            }
        }
        return merged;
    }
}
