package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sdk.CacheZone;
import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Service
/* loaded from: input_file:org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.class */
public class SortSourceServiceImpl implements SortSourceService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceServiceImpl.class);
    private static final Gson GSON = new Gson();
    private static final Set<String> SUPPORTED_MQ_TYPE = new HashSet<String>() { // from class: org.apache.inlong.manager.service.core.impl.SortSourceServiceImpl.1
        {
            add("KAFKA");
            add("TUBEMQ");
            add("PULSAR");
        }
    };
    private static final String KEY_AUTH = "authentication";
    private static final String KEY_OLD_TENANT = "tenant";
    private static final String KEY_NEW_TENANT = "pulsarTenant";
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
    private Map<String, Map<String, String>> sortSourceMd5Map = new ConcurrentHashMap();
    private Map<String, Map<String, CacheZoneConfig>> sortSourceConfigMap = new ConcurrentHashMap();
    private Map<String, SortSourceClusterInfo> sortClusters;
    private Map<String, List<SortSourceClusterInfo>> mqClusters;
    private Map<String, SortSourceGroupInfo> groupInfos;
    private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
    private Map<String, String> backupClusterTag;
    private Map<String, String> backupGroupMqResource;
    private Map<String, Map<String, String>> backupStreamMqResource;
    private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap;

    @Autowired
    private SortConfigLoader configLoader;

    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for " + SortSourceServiceImpl.class.getSimpleName());
        try {
            reload();
            setReloadTimer();
        } catch (Throwable th) {
            LOGGER.error("initialize SortSourceConfigRepository error", th);
        }
    }

    @Transactional(rollbackFor = {Exception.class})
    public void reload() {
        LOGGER.debug("start to reload sort config.");
        try {
            reloadAllConfigs();
            parseAll();
        } catch (Throwable th) {
            LOGGER.error("fail to reload all source config", th);
        }
        LOGGER.debug("end to reload config");
    }

    @Override // org.apache.inlong.manager.service.core.SortSourceService
    public SortSourceConfigResponse getSourceConfig(String str, String str2, String str3) {
        if (StringUtils.isBlank(str) || StringUtils.isBlank(str2)) {
            LOGGER.debug("blank cluster name or task name, return nothing");
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_REQ_PARAMS_ERROR).msg("blank cluster name or task name, return nothing").build();
        }
        if (!this.sortSourceConfigMap.containsKey(str) || !this.sortSourceConfigMap.get(str).containsKey(str2)) {
            String format = String.format("there is no valid source config of cluster %s, task %s", str, str2);
            LOGGER.debug(format);
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_SUCCESS).msg(format).build();
        }
        if (this.sortSourceMd5Map.get(str).get(str2).equals(str3)) {
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_NO_UPDATE).msg("No update").md5(str3).build();
        }
        if (!this.sortSourceConfigMap.get(str).get(str2).getCacheZones().isEmpty()) {
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_SUCCESS).msg("Success").data(this.sortSourceConfigMap.get(str).get(str2)).md5(this.sortSourceMd5Map.get(str).get(str2)).build();
        }
        String format2 = String.format("find empty cache zones of cluster %s, task %s, please check the manager log", str, str2);
        LOGGER.debug(format2);
        return SortSourceConfigResponse.builder().code(RESPONSE_CODE_FAIL).msg(format2).build();
    }

    private void reloadAllConfigs() {
        List<SortSourceClusterInfo> loadAllClusters = this.configLoader.loadAllClusters();
        this.sortClusters = (Map) loadAllClusters.stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, sortSourceClusterInfo -> {
            return sortSourceClusterInfo;
        }));
        this.mqClusters = new HashMap();
        loadAllClusters.stream().filter(sortSourceClusterInfo2 -> {
            return SUPPORTED_MQ_TYPE.contains(sortSourceClusterInfo2.getType());
        }).filter((v0) -> {
            return v0.isConsumable();
        }).forEach(sortSourceClusterInfo3 -> {
            sortSourceClusterInfo3.getClusterTagsSet().forEach(str -> {
                this.mqClusters.computeIfAbsent(str, str -> {
                    return new ArrayList();
                }).add(sortSourceClusterInfo3);
            });
        });
        List<SortSourceStreamSinkInfo> loadAllStreamSinks = this.configLoader.loadAllStreamSinks();
        this.streamSinkMap = new HashMap();
        loadAllStreamSinks.stream().filter(sortSourceStreamSinkInfo -> {
            return StringUtils.isNotBlank(sortSourceStreamSinkInfo.getSortClusterName());
        }).filter(sortSourceStreamSinkInfo2 -> {
            return Objects.nonNull(this.sortClusters.get(sortSourceStreamSinkInfo2.getSortClusterName()));
        }).filter(sortSourceStreamSinkInfo3 -> {
            return StringUtils.isNotBlank(sortSourceStreamSinkInfo3.getSortTaskName());
        }).forEach(sortSourceStreamSinkInfo4 -> {
            this.streamSinkMap.computeIfAbsent(sortSourceStreamSinkInfo4.getSortClusterName(), str -> {
                return new ConcurrentHashMap();
            }).computeIfAbsent(sortSourceStreamSinkInfo4.getSortTaskName(), str2 -> {
                return new ArrayList();
            }).add(sortSourceStreamSinkInfo4);
        });
        this.groupInfos = (Map) this.configLoader.loadAllGroup().stream().filter(sortSourceGroupInfo -> {
            return StringUtils.isNotBlank(sortSourceGroupInfo.getMqResource());
        }).filter(sortSourceGroupInfo2 -> {
            return StringUtils.isNotBlank(sortSourceGroupInfo2.getClusterTag());
        }).collect(Collectors.toMap((v0) -> {
            return v0.getGroupId();
        }, sortSourceGroupInfo3 -> {
            return sortSourceGroupInfo3;
        }));
        this.backupClusterTag = (Map) this.configLoader.loadGroupBackupInfo("backup_cluster_tag").stream().collect(Collectors.toMap((v0) -> {
            return v0.getInlongGroupId();
        }, (v0) -> {
            return v0.getKeyValue();
        }));
        this.backupGroupMqResource = (Map) this.configLoader.loadGroupBackupInfo("backup_mq_resource").stream().collect(Collectors.toMap((v0) -> {
            return v0.getInlongGroupId();
        }, (v0) -> {
            return v0.getKeyValue();
        }));
        this.allStreams = (Map) this.configLoader.loadAllStreams().stream().filter(sortSourceStreamInfo -> {
            return StringUtils.isNotBlank(sortSourceStreamInfo.getMqResource());
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.getInlongGroupId();
        }, Collectors.toMap((v0) -> {
            return v0.getInlongStreamId();
        }, sortSourceStreamInfo2 -> {
            return sortSourceStreamInfo2;
        })));
        this.backupStreamMqResource = (Map) this.configLoader.loadStreamBackupInfo("backup_mq_resource").stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getInlongGroupId();
        }, Collectors.toMap((v0) -> {
            return v0.getInlongStreamId();
        }, (v0) -> {
            return v0.getKeyValue();
        })));
    }

    private void parseAll() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        this.streamSinkMap.forEach((str, map) -> {
            ConcurrentHashMap concurrentHashMap3 = new ConcurrentHashMap();
            ConcurrentHashMap concurrentHashMap4 = new ConcurrentHashMap();
            map.forEach((str, list) -> {
                try {
                    CacheZoneConfig build = CacheZoneConfig.builder().sortClusterName(str).sortTaskId(str).build();
                    build.setCacheZones(parseCacheZones(str, list));
                    String md5Hex = DigestUtils.md5Hex(GSON.toJson(build));
                    concurrentHashMap3.put(str, build);
                    concurrentHashMap4.put(str, md5Hex);
                } catch (Throwable th) {
                    LOGGER.warn("failed to parse sort source config of sortCluster={}, task={}", new Object[]{str, str, th});
                }
            });
            concurrentHashMap2.put(str, concurrentHashMap3);
            concurrentHashMap.put(str, concurrentHashMap4);
        });
        this.sortSourceConfigMap = concurrentHashMap2;
        this.sortSourceMd5Map = concurrentHashMap;
        this.sortClusters = null;
        this.mqClusters = null;
        this.groupInfos = null;
        this.allStreams = null;
        this.backupClusterTag = null;
        this.backupGroupMqResource = null;
        this.backupStreamMqResource = null;
        this.streamSinkMap = null;
    }

    private Map<String, CacheZone> parseCacheZones(String str, List<SortSourceStreamSinkInfo> list) {
        Preconditions.expectNotNull(this.sortClusters.get(str), "sort cluster should not be NULL");
        Set clusterTagsSet = this.sortClusters.get(str).getClusterTagsSet();
        List list2 = (List) list.stream().filter(sortSourceStreamSinkInfo -> {
            return this.groupInfos.containsKey(sortSourceStreamSinkInfo.getGroupId()) && this.allStreams.containsKey(sortSourceStreamSinkInfo.getGroupId()) && this.allStreams.get(sortSourceStreamSinkInfo.getGroupId()).containsKey(sortSourceStreamSinkInfo.getStreamId());
        }).collect(Collectors.toList());
        return (Map) Stream.of((Object[]) new List[]{parseCacheZonesByTag((Map) list2.stream().filter(sortSourceStreamSinkInfo2 -> {
            return Objects.nonNull(this.groupInfos.get(sortSourceStreamSinkInfo2.getGroupId()));
        }).filter(sortSourceStreamSinkInfo3 -> {
            if (CollectionUtils.isEmpty(clusterTagsSet)) {
                return true;
            }
            return clusterTagsSet.contains(this.groupInfos.get(sortSourceStreamSinkInfo3.getGroupId()).getClusterTag());
        }).collect(Collectors.groupingBy(sortSourceStreamSinkInfo4 -> {
            return this.groupInfos.get(sortSourceStreamSinkInfo4.getGroupId()).getClusterTag();
        })), false), parseCacheZonesByTag((Map) list2.stream().filter(sortSourceStreamSinkInfo5 -> {
            return this.backupClusterTag.containsKey(sortSourceStreamSinkInfo5.getGroupId());
        }).filter(sortSourceStreamSinkInfo6 -> {
            if (CollectionUtils.isEmpty(clusterTagsSet)) {
                return true;
            }
            return clusterTagsSet.contains(this.backupClusterTag.get(sortSourceStreamSinkInfo6.getGroupId()));
        }).collect(Collectors.groupingBy(sortSourceStreamSinkInfo7 -> {
            return this.backupClusterTag.get(sortSourceStreamSinkInfo7.getGroupId());
        })), true)}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getZoneName();
        }, cacheZone -> {
            return cacheZone;
        }, (cacheZone2, cacheZone3) -> {
            cacheZone2.getTopics().addAll(cacheZone3.getTopics());
            return cacheZone2;
        }));
    }

    private List<CacheZone> parseCacheZonesByTag(Map<String, List<SortSourceStreamSinkInfo>> map, boolean z) {
        Stream<String> stream = map.keySet().stream();
        Map<String, List<SortSourceClusterInfo>> map2 = this.mqClusters;
        map2.getClass();
        return (List) stream.filter((v1) -> {
            return r1.containsKey(v1);
        }).flatMap(str -> {
            List list = (List) map.get(str);
            return this.mqClusters.get(str).stream().map(sortSourceClusterInfo -> {
                CacheZone cacheZone = RESPONSE_CODE_SUCCESS;
                try {
                    cacheZone = parseCacheZone(list, sortSourceClusterInfo, z);
                } catch (IllegalStateException e) {
                    LOGGER.error("fail to init cache zone for cluster " + sortSourceClusterInfo, e);
                }
                return cacheZone;
            });
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
    }

    private CacheZone parseCacheZone(List<SortSourceStreamSinkInfo> list, SortSourceClusterInfo sortSourceClusterInfo, boolean z) {
        String type = sortSourceClusterInfo.getType();
        boolean z2 = RESPONSE_CODE_FAIL;
        switch (type.hashCode()) {
            case -1923789955:
                if (type.equals("PULSAR")) {
                    z2 = RESPONSE_CODE_SUCCESS;
                    break;
                }
                break;
        }
        switch (z2) {
            case RESPONSE_CODE_SUCCESS /* 0 */:
                return parsePulsarZone(list, sortSourceClusterInfo, z);
            default:
                throw new BusinessException(String.format("do not support cluster type=%s of cluster=%s", sortSourceClusterInfo.getType(), sortSourceClusterInfo));
        }
    }

    private CacheZone parsePulsarZone(List<SortSourceStreamSinkInfo> list, SortSourceClusterInfo sortSourceClusterInfo, boolean z) {
        Map extParamsMap = sortSourceClusterInfo.getExtParamsMap();
        String str = (String) Optional.ofNullable(extParamsMap.get("pulsarTenant")).orElse(extParamsMap.get("tenant"));
        return CacheZone.builder().zoneName(sortSourceClusterInfo.getName()).serviceUrl(sortSourceClusterInfo.getUrl()).topics((List) list.stream().map(sortSourceStreamSinkInfo -> {
            String groupId = sortSourceStreamSinkInfo.getGroupId();
            String streamId = sortSourceStreamSinkInfo.getStreamId();
            SortSourceGroupInfo sortSourceGroupInfo = this.groupInfos.get(groupId);
            SortSourceStreamInfo sortSourceStreamInfo = this.allStreams.get(groupId).get(streamId);
            try {
                String mqResource = sortSourceGroupInfo.getMqResource();
                String mqResource2 = sortSourceStreamInfo.getMqResource();
                if (z) {
                    if (this.backupGroupMqResource.containsKey(groupId)) {
                        mqResource = this.backupGroupMqResource.get(groupId);
                    }
                    if (this.backupStreamMqResource.containsKey(groupId) && this.backupStreamMqResource.get(groupId).containsKey(streamId)) {
                        mqResource2 = this.backupStreamMqResource.get(groupId).get(streamId);
                    }
                }
                return Topic.builder().topic(str.concat("/").concat(mqResource).concat("/").concat(mqResource2)).topicProperties(sortSourceStreamSinkInfo.getExtParamsMap()).build();
            } catch (Exception e) {
                LOGGER.error("fail to parse topic of groupId={}, streamId={}", new Object[]{groupId, streamId, e});
                return null;
            }
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList())).authentication((String) extParamsMap.getOrDefault(KEY_AUTH, "")).cacheZoneProperties(sortSourceClusterInfo.getExtParamsMap()).zoneType("PULSAR").build();
    }

    private void setReloadTimer() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::reload, 60000L, 60000L, TimeUnit.MILLISECONDS);
    }
}
