package org.wso2.carbon.inbound.endpoint.protocol.kafka;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.synapse.SynapseException;

/* loaded from: input_file:org/wso2/carbon/inbound/endpoint/protocol/kafka/SimpleKafkaMessageListener.class */
public class SimpleKafkaMessageListener extends AbstractKafkaMessageListener {
    private List<String> replicaBrokers;
    private boolean init;
    private String topic;
    private long maxReads = Long.MAX_VALUE;
    private int partition;
    private List<String> seedBrokers;
    private int port;
    private String leadBroker;
    private String clientName;
    private SimpleConsumer consumer;
    private long readOffset;

    public SimpleKafkaMessageListener(Properties properties, InjectHandler injectHandler) throws Exception {
        this.kafkaProperties = properties;
        this.injectHandler = injectHandler;
        this.replicaBrokers = new ArrayList();
        validateInputParameters();
    }

    private void validateInputParameters() throws Exception {
        if (this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_TOPIC) == null) {
            log.error("simple consumer topic is invalid");
            throw new SynapseException("simple consumer topic is invalid");
        }
        this.topic = this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_TOPIC);
        if (this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_BROKERS) == null) {
            log.error("simple consumer brokers is invalid");
            throw new SynapseException("simple consumer brokers is invalid");
        }
        this.seedBrokers = getSeedBrokers(this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_BROKERS));
        if (this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_PORT) == null) {
            log.error("simple consumer port is invalid");
            throw new SynapseException("simple consumer port is invalid");
        }
        try {
            this.port = Integer.parseInt(this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_PORT));
            if (this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_PARTITION) == null) {
                log.error("simple consumer partition is invalid");
                throw new SynapseException("simple consumer partition is invalid");
            }
            try {
                this.partition = Integer.parseInt(this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_PARTITION));
                if (this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_MAX_MSG_TO_READ) == null) {
                    log.error("simple consumer maximum messages to read is invalid");
                    throw new SynapseException("simple consumer maximum messages to read is invalid");
                }
                try {
                    this.maxReads = Long.parseLong(this.kafkaProperties.getProperty(KAFKAConstants.SIMPLE_MAX_MSG_TO_READ));
                } catch (NumberFormatException e) {
                    log.error("maximum messages should be a number " + e.getMessage(), e);
                    throw new SynapseException("maximum messages should be a number", e);
                }
            } catch (NumberFormatException e2) {
                log.error("simple partition should be a number " + e2.getMessage(), e2);
                throw new SynapseException("simple partition should be a number", e2);
            }
        } catch (NumberFormatException e3) {
            log.error("simple consumer port should be number." + e3.getMessage(), e3);
            throw new SynapseException("simple consumer port should be number.", e3);
        }
    }

    private List<String> getSeedBrokers(String str) {
        try {
            return Arrays.asList(str.split(","));
        } catch (Exception e) {
            log.error("Error to split the brokers from broker list" + e.getMessage(), e);
            throw new SynapseException("Error to split the brokers from broker list", e);
        }
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public boolean createKafkaConsumerConnector() throws Exception {
        return run();
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public void start() throws Exception {
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public void injectMessageToESB(String str) {
        log.debug("Fetch the messages until maximum message is zero");
        if (this.maxReads > 0) {
            if (this.consumer == null) {
                this.consumer = new SimpleConsumer(this.leadBroker, this.port, KAFKAConstants.SO_TIMEOUT, KAFKAConstants.BUFFER_SIZE, this.clientName);
            }
            FetchResponse fetch = this.consumer.fetch(new FetchRequestBuilder().clientId(this.clientName).addFetch(this.topic, this.partition, this.readOffset, KAFKAConstants.SO_TIMEOUT).build());
            if (fetch.hasError()) {
                short errorCode = fetch.errorCode(this.topic, this.partition);
                log.error("Error fetching data from the Broker:" + this.leadBroker + " Reason: " + ((int) errorCode));
                if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                    this.readOffset = getLastOffset(this.consumer, this.topic, this.partition, OffsetRequest.LatestTime(), this.clientName);
                }
                this.consumer.close();
                this.consumer = null;
                try {
                    this.leadBroker = findNewLeader(this.leadBroker, this.topic, this.partition, this.port);
                } catch (Exception e) {
                    log.error("Error to find the new leader " + e.getMessage(), e);
                }
            }
            try {
                try {
                    Iterator it = fetch.messageSet(this.topic, this.partition).iterator();
                    while (it.hasNext()) {
                        MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                        long offset = messageAndOffset.offset();
                        if (offset < this.readOffset) {
                            log.info("Found an old offset: " + offset + " Expecting: " + this.readOffset);
                        } else {
                            this.readOffset = messageAndOffset.nextOffset();
                            ByteBuffer payload = messageAndOffset.message().payload();
                            byte[] bArr = new byte[payload.limit()];
                            payload.get(bArr);
                            try {
                                if (log.isDebugEnabled()) {
                                    log.debug("Start : Add to injectHandler to invoke");
                                }
                                this.injectHandler.invoke(bArr, str);
                                if (log.isDebugEnabled()) {
                                    log.debug("End : Add the injectHandler to invoke");
                                }
                            } catch (Exception e2) {
                                log.error("Error while invoking the bytes " + e2.getMessage(), e2);
                            }
                            if (log.isDebugEnabled()) {
                                log.debug("0 - added to queue!");
                            }
                            if (log.isDebugEnabled()) {
                                log.debug("Reduce the maximum message by 1");
                            }
                            this.maxReads--;
                            if (this.maxReads < 1) {
                                break;
                            }
                        }
                    }
                    if (this.consumer != null) {
                        this.consumer.close();
                    }
                } catch (Throwable th) {
                    if (this.consumer != null) {
                        this.consumer.close();
                    }
                    throw th;
                }
            } catch (Exception e3) {
                log.error("Error while fetching the responses" + e3.getMessage(), e3);
                log.debug("Error to fetch the responses");
                if (this.consumer != null) {
                    this.consumer.close();
                }
            }
        }
    }

    @Override // org.wso2.carbon.inbound.endpoint.protocol.kafka.AbstractKafkaMessageListener
    public boolean hasNext() {
        return this.maxReads == Long.MAX_VALUE || this.maxReads > 0;
    }

    public boolean run() throws Exception {
        if (this.init) {
            return this.init;
        }
        PartitionMetadata findLeader = findLeader(this.seedBrokers, this.port, this.topic, this.partition);
        if (findLeader == null) {
            throw new SynapseException("Can't find metadata for Topic and Partition. Exiting");
        }
        if (findLeader.leader() == null) {
            throw new SynapseException("Can't find Leader for Topic and Partition. Exiting");
        }
        this.leadBroker = findLeader.leader().host();
        this.clientName = "Client_" + this.topic + "_" + this.partition;
        this.consumer = new SimpleConsumer(this.leadBroker, this.port, KAFKAConstants.BUFFER_SIZE, KAFKAConstants.SO_TIMEOUT, this.clientName);
        this.readOffset = getLastOffset(this.consumer, this.topic, this.partition, OffsetRequest.EarliestTime(), this.clientName);
        this.init = true;
        return this.init;
    }

    public static long getLastOffset(SimpleConsumer simpleConsumer, String str, int i, long j, String str2) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), str2));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        log.error("Error fetching data Offset Data the Broker. Reason: " + ((int) offsetsBefore.errorCode(str, i)));
        return 0L;
    }

    private String findNewLeader(String str, String str2, int i, int i2) throws Exception {
        boolean z;
        for (int i3 = 0; i3 < 3; i3++) {
            PartitionMetadata findLeader = findLeader(this.replicaBrokers, i2, str2, i);
            if (findLeader == null) {
                z = true;
            } else if (findLeader.leader() == null) {
                z = true;
            } else {
                if (!str.equalsIgnoreCase(findLeader.leader().host()) || i3 != 0) {
                    return findLeader.leader().host();
                }
                z = true;
            }
            if (z) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }
        throw new SynapseException("Unable to find new leader after Broker failure. Exiting");
    }

    /* JADX WARN: Finally extract failed */
    private PartitionMetadata findLeader(List<String> list, int i, String str, int i2) throws Exception {
        PartitionMetadata partitionMetadata = null;
        Iterator<String> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String next = it.next();
            SimpleConsumer simpleConsumer = null;
            try {
                try {
                    simpleConsumer = new SimpleConsumer(next, i, KAFKAConstants.SO_TIMEOUT, KAFKAConstants.BUFFER_SIZE, "leaderLookup");
                    Iterator it2 = simpleConsumer.send(new TopicMetadataRequest(Collections.singletonList(str))).topicsMetadata().iterator();
                    while (it2.hasNext()) {
                        for (PartitionMetadata partitionMetadata2 : ((TopicMetadata) it2.next()).partitionsMetadata()) {
                            if (partitionMetadata2.partitionId() == i2) {
                                partitionMetadata = partitionMetadata2;
                                if (simpleConsumer != null) {
                                    simpleConsumer.close();
                                }
                            }
                        }
                    }
                    if (simpleConsumer != null) {
                        simpleConsumer.close();
                    }
                } catch (Exception e) {
                    throw new SynapseException("Error communicating with Broker [" + next + "] to find Leader for [" + str + ", " + i2 + "] Reason: ", e);
                }
            } catch (Throwable th) {
                if (simpleConsumer != null) {
                    simpleConsumer.close();
                }
                throw th;
            }
        }
        if (partitionMetadata != null) {
            this.replicaBrokers.clear();
            Iterator it3 = partitionMetadata.replicas().iterator();
            while (it3.hasNext()) {
                this.replicaBrokers.add(((Broker) it3.next()).host());
            }
        }
        return partitionMetadata;
    }
}
