package org.wso2.carbon.inbound.endpoint.protocol.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Blacklist;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.Whitelist;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
import org.apache.synapse.SynapseException;

/* loaded from: input_file:org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAMessageListener.class */
public class KAFKAMessageListener extends AbstractKafkaMessageListener {
    public KAFKAMessageListener(int i, List<String> list, Properties properties, InjectHandler injectHandler) throws Exception {
        this.threadCount = i;
        this.topics = list;
        this.kafkaProperties = properties;
        this.injectHandler = injectHandler;
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public boolean createKafkaConsumerConnector() throws Exception {
        log.debug("Create the connection and start to consume the streams");
        try {
            if (this.consumerConnector == null) {
                log.info("Creating Kafka Consumer Connector...");
                if (!this.kafkaProperties.containsKey(KAFKAConstants.CONSUMER_TIMEOUT)) {
                    this.kafkaProperties.put(KAFKAConstants.CONSUMER_TIMEOUT, "3000");
                }
                this.consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.kafkaProperties));
                log.info("Kafka Consumer Connector is created");
                start();
            }
            return true;
        } catch (ZkTimeoutException e) {
            log.error(" Error in Creating Kafka Consumer Connector | ZkTimeout" + e.getMessage());
            throw new SynapseException(" Error in Creating Kafka Consumer Connector| ZkTimeout");
        } catch (Exception e2) {
            log.error(" Error in Creating Kafka Consumer Connector." + e2.getMessage(), e2);
            throw new SynapseException(" Error in Creating Kafka Consumer Connector ", e2);
        }
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public void start() throws Exception {
        log.debug("Start to consume the streams");
        try {
            log.info("Starting KAFKA consumer...");
            HashMap hashMap = new HashMap();
            if (this.topics != null && this.topics.size() > 0) {
                Iterator<String> it = this.topics.iterator();
                while (it.hasNext()) {
                    hashMap.put(it.next(), Integer.valueOf(this.threadCount));
                }
                Map createMessageStreams = this.consumerConnector.createMessageStreams(hashMap);
                this.consumerIte = new ArrayList<>();
                Iterator<String> it2 = this.topics.iterator();
                while (it2.hasNext()) {
                    startConsumers((List) createMessageStreams.get(it2.next()));
                }
            } else if (this.kafkaProperties.getProperty(KAFKAConstants.TOPIC_FILTER) != null) {
                startConsumers((this.kafkaProperties.getProperty(KAFKAConstants.FILTER_FROM_WHITE_LIST) == null || this.kafkaProperties.getProperty(KAFKAConstants.FILTER_FROM_WHITE_LIST).isEmpty()) ? Boolean.TRUE.booleanValue() : Boolean.parseBoolean(this.kafkaProperties.getProperty(KAFKAConstants.FILTER_FROM_WHITE_LIST)) ? this.consumerConnector.createMessageStreamsByFilter(new Whitelist(this.kafkaProperties.getProperty(KAFKAConstants.TOPIC_FILTER)), this.threadCount) : this.consumerConnector.createMessageStreamsByFilter(new Blacklist(this.kafkaProperties.getProperty(KAFKAConstants.TOPIC_FILTER)), this.threadCount));
            }
        } catch (Exception e) {
            log.error("Error while Starting KAFKA consumer." + e.getMessage(), e);
            throw new SynapseException("Error while Starting KAFKA consumer.", e);
        }
    }

    protected void startConsumers(List<KafkaStream<byte[], byte[]>> list) {
        if (list.size() >= 1) {
            this.consumerIte.add(list.get(0).iterator());
        }
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public void injectMessageToESB(String str) {
        if (this.consumerIte.size() == 1) {
            injectMessageToESB(str, this.consumerIte.get(0));
        } else {
            log.debug("There are multiple topics to consume from not a single topic");
        }
    }

    public void injectMessageToESB(String str, ConsumerIterator<byte[], byte[]> consumerIterator) {
        this.injectHandler.invoke((byte[]) consumerIterator.next().message(), str);
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public boolean hasNext() {
        if (this.consumerIte.size() == 1) {
            return hasNext(this.consumerIte.get(0));
        }
        log.debug("There are multiple topics to consume from not a single topic,");
        return false;
    }

    public boolean hasNext(ConsumerIterator<byte[], byte[]> consumerIterator) {
        try {
            return consumerIterator.hasNext();
        } catch (Exception e) {
            if (!log.isDebugEnabled()) {
                return false;
            }
            log.debug("Kafka listener is interrupted by server shutdown.", e);
            return false;
        } catch (ConsumerTimeoutException e2) {
            if (!log.isDebugEnabled()) {
                return false;
            }
            log.debug("Topic has no new messages to consume.");
            return false;
        }
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public boolean hasMultipleTopicsToConsume() {
        return this.consumerIte.size() > 1;
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public void consumeMultipleTopics(String str) {
        Iterator<ConsumerIterator<byte[], byte[]>> it = this.consumerIte.iterator();
        while (it.hasNext()) {
            ConsumerIterator<byte[], byte[]> next = it.next();
            if (hasNext(next)) {
                injectMessageToESB(str, next);
            }
        }
    }
}
