package org.wso2.carbon.inbound.endpoint.protocol.kafka;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseException;
import org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener;

/* loaded from: input_file:org/wso2/carbon/inbound/endpoint/protocol/kafka/KAFKAPollingConsumer.class */
/**
 * Polling consumer for the Kafka inbound endpoint.
 *
 * <p>An instance is configured from a {@link Properties} object, creates a
 * message listener on demand ({@link #startsMessageListener()}) and, each time
 * {@link #execute()} is invoked, polls that listener provided the configured
 * scan interval has elapsed since the previous poll.
 */
public class KAFKAPollingConsumer {
    private static final Log log = LogFactory.getLog(KAFKAPollingConsumer.class.getName());

    /** Thread count used when the property is absent, empty or not positive. */
    private static final int DEFAULT_THREAD_COUNT = 1;

    private InjectHandler injectHandler;
    private final Properties kafkaProperties;
    private final int threadCount;
    /** Topic names read from the configuration; {@code null} when the property is absent. */
    private final List<String> topics;
    protected AbstractKafkaMessageListener messageListener;
    private final long scanInterval;
    /** Timestamp (epoch millis) of the last poll; {@code null} until the first poll. */
    private Long lastRanTime;
    private final String name;

    /**
     * Creates a polling consumer from the given configuration.
     *
     * @param properties   Kafka inbound endpoint properties; the thread count and
     *                     topic list are read from it
     * @param scanInterval minimum number of milliseconds between two polls
     * @param name         name passed to the message listener when messages are consumed
     * @throws SynapseException if the thread count property is not a valid integer
     * @throws Exception        declared for compatibility with existing callers
     */
    public KAFKAPollingConsumer(Properties properties, long scanInterval, String name) throws Exception {
        this.kafkaProperties = properties;
        this.scanInterval = scanInterval;
        this.name = name;
        this.threadCount = parseThreadCount(properties);
        this.topics = parseTopics(properties);
    }

    /**
     * Reads the thread count property, falling back to {@link #DEFAULT_THREAD_COUNT}
     * when it is missing, empty, zero or negative.
     */
    private static int parseThreadCount(Properties properties) {
        String rawCount = properties.getProperty(KAFKAConstants.THREAD_COUNT);
        if (rawCount == null || rawCount.isEmpty()) {
            return DEFAULT_THREAD_COUNT;
        }
        try {
            // Parse once instead of re-reading and re-parsing the property.
            int parsed = Integer.parseInt(rawCount);
            return parsed > 0 ? parsed : DEFAULT_THREAD_COUNT;
        } catch (NumberFormatException e) {
            log.error("Invalid numeric value for thread count." + e.getMessage(), e);
            throw new SynapseException("Invalid numeric value for thread count.", e);
        }
    }

    /**
     * Splits the comma-separated topics property into a list, ignoring whitespace
     * around each entry; returns {@code null} when the property is not set.
     */
    private static List<String> parseTopics(Properties properties) {
        String rawTopics = properties.getProperty(KAFKAConstants.TOPICS);
        if (rawTopics == null) {
            return null;
        }
        // "a, b" must yield "a" and "b", not "a" and " b".
        return Arrays.asList(rawTopics.trim().split("\\s*,\\s*"));
    }

    /**
     * Creates the message listener if none exists yet.
     *
     * <p>A missing, empty or high-level consumer type selects
     * {@code KAFKAMessageListener}; the simple consumer type selects
     * {@code SimpleKafkaMessageListener}. Any other value is rejected.
     *
     * @throws SynapseException if the consumer type is not recognised or the
     *                          listener cannot be constructed
     * @throws Exception        declared for compatibility with existing callers
     */
    public void startsMessageListener() throws Exception {
        log.debug("Create the Kafka message listener");
        if (this.messageListener != null) {
            return;
        }

        String consumerType = this.kafkaProperties.getProperty(KAFKAConstants.CONSUMER_TYPE);
        boolean highLevel = consumerType == null
                || consumerType.isEmpty()
                || consumerType.equalsIgnoreCase(AbstractKafkaMessageListener.CONSUMER_TYPE.HIGHLEVEL.getName());
        boolean simple = !highLevel
                && consumerType.equalsIgnoreCase(AbstractKafkaMessageListener.CONSUMER_TYPE.SIMPLE.getName());

        if (!highLevel && !simple) {
            // Previously an unknown type silently left the listener null, which
            // surfaced later as a NullPointerException on every poll.
            String message = "The consumer type should be high level or simple, but was '" + consumerType + "'";
            log.error(message);
            throw new SynapseException(message);
        }

        try {
            if (highLevel) {
                this.messageListener = new KAFKAMessageListener(this.threadCount, this.topics, this.kafkaProperties, this.injectHandler);
            } else {
                this.messageListener = new SimpleKafkaMessageListener(this.kafkaProperties, this.injectHandler);
            }
        } catch (Exception e) {
            // Message kept as-is for callers/log consumers that already rely on it.
            log.error("The consumer type should be high level or simple." + e.getMessage(), e);
            throw new SynapseException("The consumer type should be high level or simple", e);
        }
    }

    /**
     * Runs one polling cycle: polls the listener if this is the first run or the
     * scan interval has elapsed since the last poll, otherwise skips the cycle.
     * Exceptions are logged and never propagated.
     */
    public void execute() {
        try {
            log.debug("Executing : KAFKA Inbound EP : ");
            long now = System.currentTimeMillis();
            if (this.lastRanTime == null || this.lastRanTime.longValue() + this.scanInterval <= now) {
                this.lastRanTime = Long.valueOf(now);
                poll();
            } else if (log.isDebugEnabled()) {
                log.debug("Skip cycle since concurrent rate is higher than the scan interval : KAFKA Inbound EP ");
            }
            if (log.isDebugEnabled()) {
                log.debug("End : KAFKA Inbound EP : ");
            }
        } catch (Exception e) {
            log.error("Error while retrieving or injecting KAFKA message." + e.getMessage(), e);
        }
    }

    /**
     * Registers the handler that is passed to the message listener when it is created.
     *
     * @param injectHandler handler to use; polling does nothing while it is {@code null}
     */
    public void registerHandler(InjectHandler injectHandler) {
        this.injectHandler = injectHandler;
    }

    /**
     * Asks the message listener to consume available messages.
     *
     * <p>Nothing is consumed when the listener has not been created, the consumer
     * connector cannot be created, or no inject handler is registered. Failures
     * are logged and not propagated.
     *
     * @return always {@code null}; the return type is kept for compatibility
     */
    public Object poll() {
        log.debug("Run to poll messages and inject to the sequence");

        if (this.messageListener == null) {
            // Explicit check instead of relying on a caught NullPointerException.
            log.error("KAFKA message listener has not been created; skipping poll for " + this.name);
            return null;
        }

        try {
            if (!this.messageListener.createKafkaConsumerConnector()) {
                return null;
            }
        } catch (Exception e) {
            // Log the throwable too: getMessage() alone may be null and hides the stack trace.
            log.error("Error while creating the KAFKA consumer connector." + e.getMessage(), e);
            return null;
        }

        try {
            if (this.messageListener.hasMultipleTopicsToConsume()) {
                if (this.injectHandler != null) {
                    this.messageListener.consumeMultipleTopics(this.name);
                }
            } else if (this.injectHandler != null && this.messageListener.hasNext()) {
                this.messageListener.injectMessageToESB(this.name);
            }
        } catch (Exception e) {
            log.error("Error while receiving KAFKA message." + e.getMessage(), e);
        }
        return null;
    }
}
