package org.apache.pinot.controller.util;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.BiMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/controller/util/ConsumingSegmentInfoReader.class */
public class ConsumingSegmentInfoReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReader.class);
    private final Executor _executor;
    private final HttpConnectionManager _connectionManager;
    private final PinotHelixResourceManager _pinotHelixResourceManager;

    @JsonIgnoreProperties(ignoreUnknown = true)
    /* loaded from: input_file:org/apache/pinot/controller/util/ConsumingSegmentInfoReader$ConsumingSegmentInfo.class */
    public static class ConsumingSegmentInfo {

        @JsonProperty("serverName")
        public String _serverName;

        @JsonProperty("consumerState")
        public String _consumerState;

        @JsonProperty("lastConsumedTimestamp")
        public long _lastConsumedTimestamp;

        @JsonProperty("partitionToOffsetMap")
        public Map<String, String> _partitionToOffsetMap;

        public ConsumingSegmentInfo(@JsonProperty("serverName") String str, @JsonProperty("consumerState") String str2, @JsonProperty("lastConsumedTimestamp") long j, @JsonProperty("partitionToOffsetMap") Map<String, String> map) {
            this._serverName = str;
            this._consumerState = str2;
            this._lastConsumedTimestamp = j;
            this._partitionToOffsetMap = map;
        }
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    /* loaded from: input_file:org/apache/pinot/controller/util/ConsumingSegmentInfoReader$ConsumingSegmentsInfoMap.class */
    public static class ConsumingSegmentsInfoMap {
        public TreeMap<String, List<ConsumingSegmentInfo>> _segmentToConsumingInfoMap;

        public ConsumingSegmentsInfoMap(@JsonProperty("segmentToConsumingInfoMap") TreeMap<String, List<ConsumingSegmentInfo>> treeMap) {
            this._segmentToConsumingInfoMap = treeMap;
        }
    }

    public ConsumingSegmentInfoReader(Executor executor, HttpConnectionManager httpConnectionManager, PinotHelixResourceManager pinotHelixResourceManager) {
        this._executor = executor;
        this._connectionManager = httpConnectionManager;
        this._pinotHelixResourceManager = pinotHelixResourceManager;
    }

    public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String str, int i) throws InvalidConfigException {
        Map<String, List<SegmentConsumerInfo>> consumingSegmentsInfoFromServers = getConsumingSegmentsInfoFromServers(str, this._pinotHelixResourceManager.getDataInstanceAdminEndpoints(this._pinotHelixResourceManager.getServerToSegmentsMap(str).keySet()), i);
        TreeMap treeMap = new TreeMap();
        for (Map.Entry<String, List<SegmentConsumerInfo>> entry : consumingSegmentsInfoFromServers.entrySet()) {
            String key = entry.getKey();
            for (SegmentConsumerInfo segmentConsumerInfo : entry.getValue()) {
                ((List) treeMap.computeIfAbsent(segmentConsumerInfo.getSegmentName(), str2 -> {
                    return new ArrayList();
                })).add(new ConsumingSegmentInfo(key, segmentConsumerInfo.getConsumerState(), segmentConsumerInfo.getLastConsumedTimestamp(), segmentConsumerInfo.getPartitionToOffsetMap()));
            }
        }
        this._pinotHelixResourceManager.getConsumingSegments(str).forEach(str3 -> {
            treeMap.putIfAbsent(str3, Collections.emptyList());
        });
        return new ConsumingSegmentsInfoMap(treeMap);
    }

    private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServers(String str, BiMap<String, String> biMap, int i) {
        LOGGER.info("Reading consuming segment info from servers: {} for table: {}", biMap.keySet(), str);
        ArrayList arrayList = new ArrayList(biMap.size());
        BiMap inverse = biMap.inverse();
        Iterator it = inverse.keySet().iterator();
        while (it.hasNext()) {
            arrayList.add(generateServerURL(str, (String) it.next()));
        }
        CompletionServiceHelper.CompletionServiceResponse doMultiGetRequest = new CompletionServiceHelper(this._executor, this._connectionManager, inverse).doMultiGetRequest(arrayList, str, false, i);
        HashMap hashMap = new HashMap();
        int i2 = 0;
        for (Map.Entry<String, String> entry : doMultiGetRequest._httpResponses.entrySet()) {
            try {
                hashMap.put(entry.getKey(), (List) JsonUtils.stringToObject(entry.getValue(), new TypeReference<List<SegmentConsumerInfo>>() { // from class: org.apache.pinot.controller.util.ConsumingSegmentInfoReader.1
                }));
            } catch (IOException e) {
                i2++;
                LOGGER.error("Unable to parse server {} response due to an error: ", entry.getKey(), e);
            }
        }
        if (i2 != 0) {
            LOGGER.warn("Failed to parse {} / {} segment size info responses from servers.", Integer.valueOf(i2), Integer.valueOf(arrayList.size()));
        }
        return hashMap;
    }

    private String generateServerURL(String str, String str2) {
        return String.format("%s/tables/%s/consumingSegmentsInfo", str2, str);
    }

    public TableStatus.IngestionStatus getIngestionStatus(String str, int i) {
        try {
            for (Map.Entry<String, List<ConsumingSegmentInfo>> entry : getConsumingSegmentsInfo(str, i)._segmentToConsumingInfoMap.entrySet()) {
                String key = entry.getKey();
                List<ConsumingSegmentInfo> value = entry.getValue();
                if (value == null || value.isEmpty()) {
                    return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY, "Did not get any response from servers for segment: " + key);
                }
                Set<String> serversForSegment = this._pinotHelixResourceManager.getServersForSegment(str, key);
                if (serversForSegment.size() != value.size()) {
                    serversForSegment.removeAll((Set) value.stream().map(consumingSegmentInfo -> {
                        return consumingSegmentInfo._serverName;
                    }).collect(Collectors.toSet()));
                    return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY, "Not all servers responded for segment: " + key + " Missing servers : " + serversForSegment);
                }
                for (ConsumingSegmentInfo consumingSegmentInfo2 : value) {
                    if (consumingSegmentInfo2._consumerState.equals(CommonConstants.ConsumerState.NOT_CONSUMING.toString())) {
                        return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY, "Segment: " + key + " is not being consumed on server: " + consumingSegmentInfo2._serverName);
                    }
                }
            }
            return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.HEALTHY, "");
        } catch (Exception e) {
            String str2 = "Unable to get consuming segments info from all the servers. Reason: " + e.getMessage();
            LOGGER.error("Unable to get consuming segments info from all the servers", e);
            return TableStatus.IngestionStatus.newIngestionStatus(TableStatus.IngestionState.UNHEALTHY, str2);
        }
    }
}
