package org.wso2.carbon.event.input.adapter.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterListener;

/* loaded from: input_file:org/wso2/carbon/event/input/adapter/kafka/ConsumerKafkaAdaptor.class */
public class ConsumerKafkaAdaptor {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private Log log = LogFactory.getLog(ConsumerKafkaAdaptor.class);
    private int tenantId;

    public ConsumerKafkaAdaptor(String str, int i, ConsumerConfig consumerConfig) {
        this.consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        this.topic = str;
        this.tenantId = i;
    }

    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    public void run(int i, InputEventAdapterListener inputEventAdapterListener) {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put(this.topic, Integer.valueOf(i));
            List list = (List) this.consumer.createMessageStreams(hashMap).get(this.topic);
            this.executor = Executors.newFixedThreadPool(i);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                this.executor.submit(new KafkaConsumer((KafkaStream) it.next(), inputEventAdapterListener, this.tenantId));
            }
        } catch (Throwable th) {
            this.log.error("Error while creating KafkaConsumer ", th);
        }
    }
}
