/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sdk.CacheZone;
import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Service
public class SortSourceServiceImpl
implements SortSourceService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceServiceImpl.class);
    private static final Gson GSON = new Gson();
    private static final Set<String> SUPPORTED_MQ_TYPE = new HashSet<String>(){
        {
            this.add("KAFKA");
            this.add("TUBEMQ");
            this.add("PULSAR");
        }
    };
    private static final String KEY_SERVICE_URL = "serviceUrl";
    private static final String KEY_AUTH = "authentication";
    private static final String KEY_TENANT = "tenant";
    private static final String KEY_NAME_SPACE = "namespace";
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
    private Map<String, Map<String, String>> sortSourceMd5Map = new ConcurrentHashMap<String, Map<String, String>>();
    private Map<String, Map<String, CacheZoneConfig>> sortSourceConfigMap = new ConcurrentHashMap<String, Map<String, CacheZoneConfig>>();
    @Autowired
    private InlongClusterEntityMapper clusterEntityMapper;
    @Autowired
    private StreamSinkEntityMapper streamSinkEntityMapper;
    @Autowired
    private InlongGroupEntityMapper inlongGroupEntityMapper;

    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for " + SortSourceServiceImpl.class.getSimpleName());
        try {
            this.reload();
            this.setReloadTimer();
        }
        catch (Throwable t) {
            LOGGER.error("initialize SortSourceConfigRepository error", t);
        }
    }

    @Transactional(rollbackFor={Exception.class})
    public void reload() {
        LOGGER.debug("start to reload sort config.");
        try {
            this.reloadAllSourceConfig();
        }
        catch (Throwable t) {
            LOGGER.error("fail to reload all source config", t);
        }
        LOGGER.debug("end to reload config");
    }

    @Override
    public SortSourceConfigResponse getSourceConfig(String cluster, String task, String md5) {
        if (StringUtils.isBlank((CharSequence)cluster) || StringUtils.isBlank((CharSequence)task)) {
            String errMsg = "blank cluster name or task name, return nothing";
            LOGGER.error(errMsg);
            return SortSourceConfigResponse.builder().code(-101).msg(errMsg).build();
        }
        if (!this.sortSourceConfigMap.containsKey(cluster) || !this.sortSourceConfigMap.get(cluster).containsKey(task)) {
            String errMsg = String.format("there is no valid source config of cluster %s, task %s", cluster, task);
            LOGGER.error(errMsg);
            return SortSourceConfigResponse.builder().code(0).msg(errMsg).build();
        }
        if (this.sortSourceMd5Map.get(cluster).get(task).equals(md5)) {
            return SortSourceConfigResponse.builder().code(1).msg("No update").md5(md5).build();
        }
        if (this.sortSourceConfigMap.get(cluster).get(task).getCacheZones().isEmpty()) {
            String errMsg = String.format("find empty cache zones of cluster %s, task %s, please check the manager log", cluster, task);
            LOGGER.error(errMsg);
            return SortSourceConfigResponse.builder().code(-1).msg(errMsg).build();
        }
        return SortSourceConfigResponse.builder().code(0).msg("Success").data(this.sortSourceConfigMap.get(cluster).get(task)).md5(this.sortSourceMd5Map.get(cluster).get(task)).build();
    }

    private void reloadAllSourceConfig() {
        List<SortSourceStreamInfo> allStreamInfos = this.streamSinkEntityMapper.selectAllStreams().stream().filter(dto -> dto.getSortClusterName() != null && dto.getSortTaskName() != null).collect(Collectors.toList());
        ConcurrentHashMap<String, Map> groupMap = new ConcurrentHashMap<String, Map>();
        allStreamInfos.forEach(stream -> {
            Map task2groupsMap = groupMap.computeIfAbsent(stream.getSortClusterName(), k -> new ConcurrentHashMap());
            List groupIdList = task2groupsMap.computeIfAbsent(stream.getSortTaskName(), k -> new ArrayList());
            groupIdList.add(stream.getGroupId());
        });
        List groupInfos = this.inlongGroupEntityMapper.selectAllGroups();
        Map<String, SortSourceGroupInfo> allId2GroupInfos = groupInfos.stream().filter(dto -> dto.getGroupId() != null).filter(group -> SUPPORTED_MQ_TYPE.contains(group.getMqType())).collect(Collectors.toMap(SortSourceGroupInfo::getGroupId, dto -> dto, (g1, g2) -> g1));
        List clusterInfos = this.clusterEntityMapper.selectAllClusters();
        Map<String, List<SortSourceClusterInfo>> allTag2ClusterInfos = clusterInfos.stream().filter(dto -> dto.getClusterTags() != null).filter(SortSourceClusterInfo::isConsumable).filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType())).collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));
        Map<String, SortSourceClusterInfo> name2ClusterInfos = clusterInfos.stream().collect(Collectors.toMap(SortSourceClusterInfo::getName, info -> info, (g1, g2) -> g1));
        ConcurrentHashMap<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<String, Map<String, String>>();
        ConcurrentHashMap<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<String, Map<String, CacheZoneConfig>>();
        groupMap.forEach((clusterName, task2Group) -> {
            if (!name2ClusterInfos.containsKey(clusterName)) {
                return;
            }
            String clusterTag = ((SortSourceClusterInfo)name2ClusterInfos.get(clusterName)).getClusterTags();
            ConcurrentHashMap validClusterInfos = new ConcurrentHashMap();
            if (allTag2ClusterInfos.containsKey(clusterTag)) {
                validClusterInfos.put(clusterTag, allTag2ClusterInfos.get(clusterTag));
            } else {
                validClusterInfos.putAll(allTag2ClusterInfos);
            }
            ConcurrentHashMap task2Config = new ConcurrentHashMap();
            ConcurrentHashMap task2Md5 = new ConcurrentHashMap();
            task2Group.forEach((task, groupList) -> {
                Map<String, CacheZone> cacheZones;
                HashMap<String, Map<String, String>> group2topicProp = new HashMap<String, Map<String, String>>();
                allStreamInfos.stream().filter(stream -> stream.getSortTaskName().equals(task) && stream.getSortClusterName().equals(clusterName)).forEach(sortSourceStreamInfo -> group2topicProp.put(sortSourceStreamInfo.getGroupId(), sortSourceStreamInfo.getExtParamsMap()));
                try {
                    cacheZones = this.getCacheZones((List<String>)groupList, allId2GroupInfos, validClusterInfos, (Map<String, Map<String, String>>)group2topicProp);
                }
                catch (Throwable t) {
                    LOGGER.error("fail to get cacheZones of clusterName {}, task {}", clusterName, task);
                    return;
                }
                CacheZoneConfig config = CacheZoneConfig.builder().cacheZones(cacheZones).sortClusterName(clusterName).sortTaskId(task).build();
                String jsonStr = GSON.toJson((Object)config);
                String md5 = DigestUtils.md5Hex((String)jsonStr);
                task2Config.put(task, config);
                task2Md5.put(task, md5);
            });
            newConfigMap.put((String)clusterName, task2Config);
            newMd5Map.put((String)clusterName, task2Md5);
        });
        this.sortSourceConfigMap = newConfigMap;
        this.sortSourceMd5Map = newMd5Map;
    }

    private Map<String, CacheZone> getCacheZones(List<String> groupIdList, Map<String, SortSourceGroupInfo> allId2GroupInfos, Map<String, List<SortSourceClusterInfo>> allTag2ClusterInfos, Map<String, Map<String, String>> group2topicProp) {
        List groupInfoStream = groupIdList.stream().filter(allId2GroupInfos::containsKey).map(allId2GroupInfos::get).collect(Collectors.toList());
        Map<String, List<SortSourceGroupInfo>> tag2GroupInfos = groupInfoStream.stream().collect(Collectors.groupingBy(SortSourceGroupInfo::getClusterTag));
        Map<String, List<SortSourceGroupInfo>> backupTag2GroupInfos = groupInfoStream.stream().filter(group -> group.getBackupClusterTag() != null && group.getBackupTopic() != null).collect(Collectors.groupingBy(SortSourceGroupInfo::getBackupClusterTag));
        List<CacheZone> firstTagCacheZoneList = this.getCacheZoneListByTag(tag2GroupInfos, allTag2ClusterInfos, group2topicProp, false);
        List<CacheZone> backupTagCacheZoneList = this.getCacheZoneListByTag(backupTag2GroupInfos, allTag2ClusterInfos, group2topicProp, true);
        return Stream.of(firstTagCacheZoneList, backupTagCacheZoneList).flatMap(Collection::stream).collect(Collectors.toMap(CacheZone::getZoneName, cacheZone -> cacheZone, (zone1, zone2) -> {
            zone1.getTopics().addAll(zone2.getTopics());
            return zone1;
        }));
    }

    private List<CacheZone> getCacheZoneListByTag(Map<String, List<SortSourceGroupInfo>> tag2GroupInfos, Map<String, List<SortSourceClusterInfo>> allTag2ClusterInfos, Map<String, Map<String, String>> group2topicProp, boolean isBackupTag) {
        ArrayList<String> tags = new ArrayList<String>(tag2GroupInfos.keySet());
        HashMap tag2ClusterInfos = new HashMap();
        allTag2ClusterInfos.entrySet().stream().filter(entry -> tag2GroupInfos.containsKey(entry.getKey())).forEach(entry -> {
            List cfr_ignored_0 = (List)tag2ClusterInfos.put(entry.getKey(), entry.getValue());
        });
        return tags.stream().filter(tag2ClusterInfos::containsKey).flatMap(tag -> {
            List groups = (List)tag2GroupInfos.get(tag);
            List clusters = (List)tag2ClusterInfos.get(tag);
            return clusters.stream().map(cluster -> {
                CacheZone zone = null;
                try {
                    zone = this.getCacheZone(groups, (SortSourceClusterInfo)cluster, group2topicProp, isBackupTag);
                }
                catch (IllegalStateException e) {
                    LOGGER.error("fail to init cache zone for cluster " + cluster, (Throwable)e);
                }
                return zone;
            });
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

    private CacheZone getCacheZone(List<SortSourceGroupInfo> groups, SortSourceClusterInfo cluster, Map<String, Map<String, String>> group2topicProp, boolean isBackupTag) {
        Map param = cluster.getExtParamsMap();
        String serviceUrl = (String)Optional.ofNullable(param.get(KEY_SERVICE_URL)).orElseThrow(() -> new IllegalStateException("there is no serviceUrl for cluster " + cluster.getName()));
        String tenant = (String)param.get(KEY_TENANT);
        String namespace = (String)param.get(KEY_NAME_SPACE);
        String authentication = Optional.ofNullable(param.get(KEY_AUTH)).orElse("");
        List topics = groups.stream().map(groupInfo -> this.getTopic((SortSourceGroupInfo)groupInfo, tenant, namespace, (Map)group2topicProp.get(groupInfo.getGroupId()), isBackupTag)).collect(Collectors.toList());
        return CacheZone.builder().serviceUrl(serviceUrl).authentication(authentication).cacheZoneProperties(param).zoneName(cluster.getName()).zoneType(cluster.getType()).topics(topics).build();
    }

    private Topic getTopic(SortSourceGroupInfo groupInfo, String tenant, String namespace, Map<String, String> topicProperties, boolean isBackupTag) {
        String topic = isBackupTag ? groupInfo.getBackupTopic() : groupInfo.getTopic();
        StringBuilder fullTopic = new StringBuilder();
        Optional.ofNullable(tenant).ifPresent(t -> fullTopic.append((String)t).append("/"));
        Optional.ofNullable(namespace).ifPresent(n -> fullTopic.append((String)n).append("/"));
        fullTopic.append(topic);
        return Topic.builder().topic(fullTopic.toString()).topicProperties(topicProperties).build();
    }

    private void setReloadTimer() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        long reloadInterval = 60000L;
        executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
    }
}

