/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.util;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.BiMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.SegmentConsumerInfo;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.spi.config.table.TableStatus;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumingSegmentInfoReader {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumingSegmentInfoReader.class);
    private final Executor _executor;
    private final HttpConnectionManager _connectionManager;
    private final PinotHelixResourceManager _pinotHelixResourceManager;

    public ConsumingSegmentInfoReader(Executor executor, HttpConnectionManager connectionManager, PinotHelixResourceManager helixResourceManager) {
        this._executor = executor;
        this._connectionManager = connectionManager;
        this._pinotHelixResourceManager = helixResourceManager;
    }

    public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithType, int timeoutMs) throws InvalidConfigException {
        Map<String, List<String>> serverToSegments = this._pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
        BiMap<String, String> serverToEndpoints = this._pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
        Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap = this.getConsumingSegmentsInfoFromServers(tableNameWithType, serverToEndpoints, timeoutMs);
        TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new TreeMap<String, List<ConsumingSegmentInfo>>();
        for (Map.Entry<String, List<SegmentConsumerInfo>> entry : serverToSegmentConsumerInfoMap.entrySet()) {
            String serverName = entry.getKey();
            for (SegmentConsumerInfo info : entry.getValue()) {
                consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList()).add(new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(), info.getPartitionToOffsetMap()));
            }
        }
        Set<String> consumingSegments = this._pinotHelixResourceManager.getConsumingSegments(tableNameWithType);
        consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent((String)c, Collections.emptyList()));
        return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap);
    }

    private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServers(String tableNameWithType, BiMap<String, String> serverToEndpoints, int timeoutMs) {
        LOGGER.info("Reading consuming segment info from servers: {} for table: {}", (Object)serverToEndpoints.keySet(), (Object)tableNameWithType);
        ArrayList<String> serverUrls = new ArrayList<String>(serverToEndpoints.size());
        BiMap endpointsToServers = serverToEndpoints.inverse();
        for (String endpoint : endpointsToServers.keySet()) {
            String consumingSegmentInfoURI = this.generateServerURL(tableNameWithType, endpoint);
            serverUrls.add(consumingSegmentInfoURI);
        }
        CompletionServiceHelper completionServiceHelper = new CompletionServiceHelper(this._executor, this._connectionManager, (BiMap<String, String>)endpointsToServers);
        CompletionServiceHelper.CompletionServiceResponse serviceResponse = completionServiceHelper.doMultiGetRequest(serverUrls, tableNameWithType, false, timeoutMs);
        HashMap<String, List<SegmentConsumerInfo>> serverToConsumingSegmentInfoList = new HashMap<String, List<SegmentConsumerInfo>>();
        int failedParses = 0;
        for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
            try {
                List segmentConsumerInfos = (List)JsonUtils.stringToObject((String)streamResponse.getValue(), (TypeReference)new TypeReference<List<SegmentConsumerInfo>>(){});
                serverToConsumingSegmentInfoList.put(streamResponse.getKey(), segmentConsumerInfos);
            }
            catch (IOException e) {
                ++failedParses;
                LOGGER.error("Unable to parse server {} response due to an error: ", (Object)streamResponse.getKey(), (Object)e);
            }
        }
        if (failedParses != 0) {
            LOGGER.warn("Failed to parse {} / {} segment size info responses from servers.", (Object)failedParses, (Object)serverUrls.size());
        }
        return serverToConsumingSegmentInfoList;
    }

    private String generateServerURL(String tableNameWithType, String endpoint) {
        return String.format("%s/tables/%s/consumingSegmentsInfo", endpoint, tableNameWithType);
    }

    public TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType, int timeoutMs) {
        try {
            ConsumingSegmentsInfoMap consumingSegmentsInfoMap = this.getConsumingSegmentsInfo(tableNameWithType, timeoutMs);
            for (Map.Entry<String, List<ConsumingSegmentInfo>> consumingSegmentInfoEntry : consumingSegmentsInfoMap._segmentToConsumingInfoMap.entrySet()) {
                String segmentName = consumingSegmentInfoEntry.getKey();
                List<ConsumingSegmentInfo> consumingSegmentInfoList = consumingSegmentInfoEntry.getValue();
                if (consumingSegmentInfoList == null || consumingSegmentInfoList.isEmpty()) {
                    String errorMessage = "Did not get any response from servers for segment: " + segmentName;
                    return TableStatus.IngestionStatus.newIngestionStatus((TableStatus.IngestionState)TableStatus.IngestionState.UNHEALTHY, (String)errorMessage);
                }
                Set<String> serversForSegment = this._pinotHelixResourceManager.getServersForSegment(tableNameWithType, segmentName);
                if (serversForSegment.size() != consumingSegmentInfoList.size()) {
                    Set serversResponded = consumingSegmentInfoList.stream().map(c -> c._serverName).collect(Collectors.toSet());
                    serversForSegment.removeAll(serversResponded);
                    String errorMessage = "Not all servers responded for segment: " + segmentName + " Missing servers : " + serversForSegment;
                    return TableStatus.IngestionStatus.newIngestionStatus((TableStatus.IngestionState)TableStatus.IngestionState.UNHEALTHY, (String)errorMessage);
                }
                for (ConsumingSegmentInfo consumingSegmentInfo : consumingSegmentInfoList) {
                    if (!consumingSegmentInfo._consumerState.equals(CommonConstants.ConsumerState.NOT_CONSUMING.toString())) continue;
                    String errorMessage = "Segment: " + segmentName + " is not being consumed on server: " + consumingSegmentInfo._serverName;
                    return TableStatus.IngestionStatus.newIngestionStatus((TableStatus.IngestionState)TableStatus.IngestionState.UNHEALTHY, (String)errorMessage);
                }
            }
            return TableStatus.IngestionStatus.newIngestionStatus((TableStatus.IngestionState)TableStatus.IngestionState.HEALTHY, (String)"");
        }
        catch (Exception e) {
            String errorMessage = "Unable to get consuming segments info from all the servers. Reason: " + e.getMessage();
            LOGGER.error("Unable to get consuming segments info from all the servers", (Throwable)e);
            return TableStatus.IngestionStatus.newIngestionStatus((TableStatus.IngestionState)TableStatus.IngestionState.UNHEALTHY, (String)errorMessage);
        }
    }

    @JsonIgnoreProperties(ignoreUnknown=true)
    public static class ConsumingSegmentInfo {
        @JsonProperty(value="serverName")
        public String _serverName;
        @JsonProperty(value="consumerState")
        public String _consumerState;
        @JsonProperty(value="lastConsumedTimestamp")
        public long _lastConsumedTimestamp;
        @JsonProperty(value="partitionToOffsetMap")
        public Map<String, String> _partitionToOffsetMap;

        public ConsumingSegmentInfo(@JsonProperty(value="serverName") String serverName, @JsonProperty(value="consumerState") String consumerState, @JsonProperty(value="lastConsumedTimestamp") long lastConsumedTimestamp, @JsonProperty(value="partitionToOffsetMap") Map<String, String> partitionToOffsetMap) {
            this._serverName = serverName;
            this._consumerState = consumerState;
            this._lastConsumedTimestamp = lastConsumedTimestamp;
            this._partitionToOffsetMap = partitionToOffsetMap;
        }
    }

    @JsonIgnoreProperties(ignoreUnknown=true)
    public static class ConsumingSegmentsInfoMap {
        public TreeMap<String, List<ConsumingSegmentInfo>> _segmentToConsumingInfoMap;

        public ConsumingSegmentsInfoMap(@JsonProperty(value="segmentToConsumingInfoMap") TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap) {
            this._segmentToConsumingInfoMap = segmentToConsumingInfoMap;
        }
    }
}

