package org.apache.eagle.metric.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;

/* loaded from: input_file:org/apache/eagle/metric/kafka/KafkaLatestOffsetFetcher.class */
/**
 * Looks up the latest offset of each partition of a Kafka topic through the legacy
 * {@code SimpleConsumer} API.
 *
 * <p>The fetcher is configured with a comma-separated {@code host:port} broker list. Only the
 * port of the FIRST entry is used; every broker (including partition leaders discovered at
 * runtime) is contacted on that same port.
 */
public class KafkaLatestOffsetFetcher {
    /** Socket timeout, in milliseconds, passed to every {@code SimpleConsumer}. */
    private static final int SOCKET_TIMEOUT_MS = 100000;
    /** Receive buffer size, in bytes, passed to every {@code SimpleConsumer}. */
    private static final int BUFFER_SIZE_BYTES = 65536;
    /** Offset reported for a partition whose metadata or leader could not be found. */
    private static final long UNKNOWN_OFFSET = -1L;

    private final List<String> brokerList = new ArrayList<String>();
    private final int port;

    /**
     * Creates a fetcher from a comma-separated broker list such as
     * {@code "host1:9092,host2:9092"}.
     *
     * @param str comma-separated {@code host:port} entries; surrounding whitespace is ignored and
     *            blank entries are skipped
     * @throws IllegalArgumentException if the first entry has no {@code :port} part, or (as
     *                                  {@link NumberFormatException}) if that port is not a number
     */
    public KafkaLatestOffsetFetcher(String str) {
        String[] brokers = str.split(",");
        for (String broker : brokers) {
            String host = broker.split(":")[0].trim();
            // A blank host (e.g. from "a:1,,b:1") can never be connected to, so drop it.
            if (!host.isEmpty()) {
                this.brokerList.add(host);
            }
        }
        this.port = parsePort(brokers[0], str);
    }

    /** Extracts the port from the first broker entry, failing with a descriptive message. */
    private static int parsePort(String firstBroker, String brokerString) {
        String[] hostAndPort = firstBroker.trim().split(":");
        if (hostAndPort.length < 2) {
            throw new IllegalArgumentException("Expected host:port for the first broker but got ["
                    + firstBroker + "] in broker list [" + brokerString + "]");
        }
        return Integer.parseInt(hostAndPort[1].trim());
    }

    /**
     * Fetches the latest offset of partitions {@code 0 .. i-1} of a topic.
     *
     * @param str topic name
     * @param i   number of partitions to query
     * @return map from partition id to its latest offset; a partition with no metadata or no
     *         leader is mapped to {@code -1}
     * @throws RuntimeException if a broker cannot be queried for metadata or offsets
     */
    public Map<Integer, Long> fetch(String str, int i) {
        Map<Integer, PartitionMetadata> metadataByPartition =
                fetchPartitionMetadata(this.brokerList, this.port, str, i);
        Map<Integer, Long> latestOffsets = new HashMap<Integer, Long>();
        for (int partition = 0; partition < i; partition++) {
            PartitionMetadata metadata = metadataByPartition.get(Integer.valueOf(partition));
            if (metadata == null || metadata.leader() == null) {
                latestOffsets.put(Integer.valueOf(partition), Long.valueOf(UNKNOWN_OFFSET));
                // Without a leader there is nobody to ask; move on instead of dereferencing null.
                continue;
            }
            String leaderHost = metadata.leader().host();
            String clientName = "Client_" + str + "_" + partition;
            SimpleConsumer consumer = new SimpleConsumer(
                    leaderHost, this.port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, clientName);
            try {
                long latestOffset = getLatestOffset(consumer, str, partition, clientName);
                latestOffsets.put(Integer.valueOf(partition), Long.valueOf(latestOffset));
            } finally {
                // Release the connection even when the offset request fails.
                consumer.close();
            }
        }
        return latestOffsets;
    }

    /**
     * Requests the single latest offset of one topic partition from the given consumer.
     *
     * @param simpleConsumer consumer to send the request through; it is not closed here
     * @param str            topic name
     * @param i              partition id
     * @param str2           client id sent with the request
     * @return the first offset returned by the broker for the partition
     * @throws RuntimeException if the response reports an error or contains no offset
     */
    public long getLatestOffset(SimpleConsumer simpleConsumer, String str, int i, String str2) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Ask for at most one offset at "latest time".
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
        OffsetResponse response = simpleConsumer.getOffsetsBefore(
                new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), str2));
        if (response.hasError()) {
            throw new RuntimeException("Error fetching data offset from the broker. Reason: "
                    + ((int) response.errorCode(str, i)));
        }
        long[] offsets = response.offsets(str, i);
        if (offsets.length == 0) {
            throw new RuntimeException("Broker returned no offset for topic [" + str
                    + "] partition [" + i + "]");
        }
        return offsets[0];
    }

    /**
     * Collects partition metadata for a topic by querying the brokers in order, stopping as soon
     * as metadata for the expected number of partitions has been gathered.
     *
     * @param list brokers (host names) to query
     * @param i    port used for every broker
     * @param str  topic name
     * @param i2   expected number of partitions
     * @return map from partition id to its metadata; may hold fewer than {@code i2} entries
     * @throws RuntimeException if communicating with a queried broker fails
     */
    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> list, int i, String str, int i2) {
        Map<Integer, PartitionMetadata> metadataByPartition = new HashMap<Integer, PartitionMetadata>();
        for (String broker : list) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(broker, i, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, "leaderLookup");
                TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(str));
                for (TopicMetadata topicMetadata : consumer.send(request).topicsMetadata()) {
                    for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
                        metadataByPartition.put(Integer.valueOf(partitionMetadata.partitionId()), partitionMetadata);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Error communicating with Broker [" + broker
                        + "] to find Leader for [" + str + "] Reason: ", e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
            // Every expected partition is known: asking the remaining brokers is redundant.
            if (metadataByPartition.size() == i2) {
                break;
            }
        }
        return metadataByPartition;
    }
}
