/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.kafka.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.kafka.source.KafkaConsumerThread;
import io.siddhi.extension.io.kafka.source.KafkaSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.log4j.Logger;

/**
 * Holds the {@link KafkaConsumerThread} instances created for one set of topics and partitions
 * and forwards lifecycle calls (pause, resume, restore, shutdown, run) to each of them.
 *
 * <p>How many consumer threads are created depends on the threading option passed to the
 * constructor: one for everything, one per topic, or one per topic/partition pair.
 */
public class ConsumerKafkaGroup {
    private static final Logger LOG = Logger.getLogger(ConsumerKafkaGroup.class);

    /** Threading option value: a single consumer handles every topic and partition. */
    private static final String SINGLE_THREADED = "single.thread";
    /** Threading option value: one consumer per topic. */
    private static final String TOPIC_WISE = "topic.wise";
    /** Threading option value: one consumer per topic/partition pair. */
    private static final String PARTITION_WISE = "partition.wise";

    private final String[] topics;
    private final String[] partitions;
    private final Properties props;
    private final List<KafkaConsumerThread> kafkaConsumerThreadList;
    private final ScheduledExecutorService executorService;
    private final String threadingOption;
    private final boolean isBinaryMessage;
    private KafkaSource.KafkaSourceState kafkaSourceState;

    /**
     * Creates the consumer threads for the given topics/partitions according to
     * {@code threadingOption}. The threads are only constructed here; they are handed to the
     * executor when {@link #run()} is called.
     *
     * @param topics              topics to consume from
     * @param partitions          partitions to consume from
     * @param props               properties passed through to every {@link KafkaConsumerThread}
     * @param threadingOption     {@code "single.thread"}, {@code "topic.wise"} or
     *                            {@code "partition.wise"}; any other value creates no consumers
     *                            and logs a warning
     * @param executorService     executor the consumer threads are submitted to by {@link #run()}
     * @param isBinaryMessage     passed through to every {@link KafkaConsumerThread}
     * @param enableOffsetCommit  passed through to every {@link KafkaConsumerThread}
     * @param sourceEventListener passed through to every {@link KafkaConsumerThread}
     */
    ConsumerKafkaGroup(String[] topics, String[] partitions, Properties props, String threadingOption, ScheduledExecutorService executorService, boolean isBinaryMessage, boolean enableOffsetCommit, SourceEventListener sourceEventListener) {
        this.kafkaConsumerThreadList = new ArrayList<KafkaConsumerThread>();
        this.threadingOption = threadingOption;
        this.topics = topics;
        this.partitions = partitions;
        this.props = props;
        this.executorService = executorService;
        this.isBinaryMessage = isBinaryMessage;

        // NOTE(review): the fifth KafkaConsumerThread argument is false for the first two modes
        // and true for partition-wise; presumably it flags partition-wise threading - confirm
        // against KafkaConsumerThread.
        if (SINGLE_THREADED.equals(threadingOption)) {
            // One consumer covering every topic and partition.
            this.kafkaConsumerThreadList.add(new KafkaConsumerThread(sourceEventListener, topics, partitions, props, false, isBinaryMessage, enableOffsetCommit));
            LOG.info("Kafka Consumer thread starting to listen on topic(s): " + Arrays.toString(topics) + " with partition/s: " + Arrays.toString(partitions));
        } else if (TOPIC_WISE.equals(threadingOption)) {
            // One consumer per topic, each given the full partition list.
            for (String topic : topics) {
                this.kafkaConsumerThreadList.add(new KafkaConsumerThread(sourceEventListener, new String[]{topic}, partitions, props, false, isBinaryMessage, enableOffsetCommit));
                LOG.info("Kafka Consumer thread starting to listen on topic: " + topic + " with partition/s: " + Arrays.toString(partitions));
            }
        } else if (PARTITION_WISE.equals(threadingOption)) {
            // One consumer per (topic, partition) pair.
            // NOTE(review): a null partitions array throws NullPointerException here, exactly as
            // before; confirm callers always supply partitions for this mode.
            for (String topic : topics) {
                for (String partition : partitions) {
                    this.kafkaConsumerThreadList.add(new KafkaConsumerThread(sourceEventListener, new String[]{topic}, new String[]{partition}, props, true, isBinaryMessage, enableOffsetCommit));
                    LOG.info("Kafka Consumer thread starting to listen on topic: " + topic + " with partition: " + partition);
                }
            }
        } else {
            // Previously this case was silent: the group ended up with no consumers and
            // nothing in the log explained why no events were being received.
            LOG.warn("Unsupported threading option '" + threadingOption + "'; no Kafka consumer threads were created for topic(s): " + Arrays.toString(topics));
        }
    }

    /** Calls {@code pause()} on every consumer thread in this group. */
    void pause() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::pause);
    }

    /** Calls {@code resume()} on every consumer thread in this group. */
    void resume() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::resume);
    }

    /** Calls {@code restore()} on every consumer thread in this group. */
    void restoreState() {
        this.kafkaConsumerThreadList.forEach(consumer -> consumer.restore());
    }

    /** Calls {@code shutdownConsumer()} on every consumer thread in this group. */
    void shutdown() {
        this.kafkaConsumerThreadList.forEach(KafkaConsumerThread::shutdownConsumer);
    }

    /**
     * Submits every consumer thread to the executor service.
     *
     * <p>Any failure during submission is logged and not rethrown; consumers that were
     * submitted before the failure stay submitted, the remaining ones are skipped.
     */
    void run() {
        try {
            for (KafkaConsumerThread consumerThread : this.kafkaConsumerThreadList) {
                this.executorService.submit(consumerThread);
            }
        } catch (Throwable t) {
            // Deliberately broad best-effort catch kept from the original behavior.
            LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(this.topics), t);
        }
    }

    /**
     * Stores the given source state and passes it on to every consumer thread in this group.
     *
     * @param kafkaSourceState state object to share with the consumer threads
     */
    public void setKafkaSourceState(KafkaSource.KafkaSourceState kafkaSourceState) {
        this.kafkaSourceState = kafkaSourceState;
        for (KafkaConsumerThread consumer : this.kafkaConsumerThreadList) {
            consumer.setKafkaSourceState(kafkaSourceState);
        }
    }
}

