/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sdk.CacheZone;
import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceClusterInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceGroupInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamSinkInfo;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.core.SortSourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Builds and serves the source configuration consumed by sort tasks.
 *
 * <p>All clusters, groups, streams and stream sinks are loaded through the
 * {@link SortConfigLoader}, turned into one {@link CacheZoneConfig} per
 * (sort cluster, sort task) pair, and published together with the MD5 of
 * the JSON form of each config. The published maps are rebuilt by
 * {@link #reload()}, which runs once at start-up and then on a fixed-rate timer.
 *
 * <p>Threading: {@link #getSourceConfig(String, String, String)} may run while the
 * timer thread executes {@link #reload()}. The two published maps are therefore
 * {@code volatile} and are always replaced as a whole, never mutated in place.
 * All other map fields are scratch data that only the reloading thread touches.
 */
@Lazy
@Service
public class SortSourceServiceImpl
implements SortSourceService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceServiceImpl.class);
    private static final Gson GSON = new Gson();
    /** Cluster types that are considered as MQ clusters when loading. */
    private static final Set<String> SUPPORTED_MQ_TYPE = new HashSet<String>(){
        {
            this.add("KAFKA");
            this.add("TUBEMQ");
            this.add("PULSAR");
        }
    };
    private static final String KEY_AUTH = "authentication";
    private static final String KEY_TENANT = "tenant";
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;

    /** Published result: sort cluster name -> sort task name -> MD5 of the task config JSON. */
    private volatile Map<String, Map<String, String>> sortSourceMd5Map = new ConcurrentHashMap<>();
    /** Published result: sort cluster name -> sort task name -> task config. */
    private volatile Map<String, Map<String, CacheZoneConfig>> sortSourceConfigMap = new ConcurrentHashMap<>();

    // Scratch data: filled by reloadAllConfigs(), read by the parse* methods and
    // reset to null at the end of parseAll().
    /** Every loaded cluster, keyed by cluster name. */
    private Map<String, SortSourceClusterInfo> sortClusters;
    /** Consumable clusters of a supported MQ type, grouped by their cluster tags value. */
    private Map<String, List<SortSourceClusterInfo>> mqClusters;
    /** Group info keyed by group id. */
    private Map<String, SortSourceGroupInfo> groupInfos;
    /** Group id -> stream id -> stream info. */
    private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
    /** Group id -> value of the {@code backup_cluster_tag} extension. */
    private Map<String, String> backupClusterTag;
    /** Group id -> value of the group level {@code backup_mq_resource} extension. */
    private Map<String, String> backupGroupMqResource;
    /** Group id -> stream id -> value of the stream level {@code backup_mq_resource} extension. */
    private Map<String, Map<String, String>> backupStreamMqResource;
    /** Sort cluster name -> sort task name -> stream sinks of that task. */
    private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap;

    @Autowired
    private SortConfigLoader configLoader;

    /**
     * Performs the first load and starts the periodic reload timer.
     * Any failure is logged and swallowed so that bean creation does not fail.
     */
    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for {}", SortSourceServiceImpl.class.getSimpleName());
        try {
            this.reload();
            this.setReloadTimer();
        }
        catch (Throwable t) {
            LOGGER.error("initialize SortSourceConfigRepository error", t);
        }
    }

    /**
     * Reloads everything from the config loader and rebuilds the published maps.
     * Never throws: a failure is logged and the previously published maps stay in place.
     *
     * <p>NOTE(review): within this class the method is only called on {@code this}
     * (from {@link #initialize()} and from the timer), so a proxy-based
     * {@code @Transactional} interceptor would not be applied to those calls —
     * confirm whether the annotation is actually needed.
     */
    @Transactional(rollbackFor={Exception.class})
    public void reload() {
        LOGGER.debug("start to reload sort config.");
        try {
            this.reloadAllConfigs();
            this.parseAll();
        }
        catch (Throwable t) {
            LOGGER.error("fail to reload all source config", t);
        }
        LOGGER.debug("end to reload config");
    }

    /**
     * Returns the source config of one sort task.
     *
     * @param cluster sort cluster name, must not be blank
     * @param task sort task name, must not be blank
     * @param md5 MD5 of the config the caller already holds, may be {@code null}
     * @return a response whose code is
     *         {@code RESPONSE_CODE_REQ_PARAMS_ERROR} for a blank cluster or task,
     *         {@code RESPONSE_CODE_SUCCESS} without data when no config is known for the task,
     *         {@code RESPONSE_CODE_NO_UPDATE} when {@code md5} equals the current MD5,
     *         {@code RESPONSE_CODE_FAIL} when the config has no cache zones, and
     *         {@code RESPONSE_CODE_SUCCESS} with data and MD5 otherwise
     */
    @Override
    public SortSourceConfigResponse getSourceConfig(String cluster, String task, String md5) {
        if (StringUtils.isBlank(cluster) || StringUtils.isBlank(task)) {
            String errMsg = "blank cluster name or task name, return nothing";
            LOGGER.debug(errMsg);
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_REQ_PARAMS_ERROR).msg(errMsg).build();
        }

        // Read each published field exactly once: the reload thread may replace them
        // at any time, and re-reading the fields between the checks below could mix
        // two generations of data or hit a missing entry.
        Map<String, Map<String, CacheZoneConfig>> configSnapshot = this.sortSourceConfigMap;
        Map<String, Map<String, String>> md5Snapshot = this.sortSourceMd5Map;

        Map<String, CacheZoneConfig> taskConfigs = configSnapshot.get(cluster);
        CacheZoneConfig config = taskConfigs == null ? null : taskConfigs.get(task);
        Map<String, String> taskMd5s = md5Snapshot.get(cluster);
        String currentMd5 = taskMd5s == null ? null : taskMd5s.get(task);

        // An unknown task is reported as success without data. A missing MD5 can only
        // happen in the short window between the two field assignments of a reload;
        // it is treated the same way instead of failing with a NullPointerException.
        if (config == null || currentMd5 == null) {
            String errMsg = String.format("there is no valid source config of cluster %s, task %s", cluster, task);
            LOGGER.debug(errMsg);
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_SUCCESS).msg(errMsg).build();
        }
        if (currentMd5.equals(md5)) {
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_NO_UPDATE).msg("No update").md5(md5).build();
        }
        if (config.getCacheZones().isEmpty()) {
            String errMsg = String.format("find empty cache zones of cluster %s, task %s, please check the manager log", cluster, task);
            LOGGER.debug(errMsg);
            return SortSourceConfigResponse.builder().code(RESPONSE_CODE_FAIL).msg(errMsg).build();
        }
        return SortSourceConfigResponse.builder()
                .code(RESPONSE_CODE_SUCCESS)
                .msg("Success")
                .data(config)
                .md5(currentMd5)
                .build();
    }

    /**
     * Loads clusters, stream sinks, groups, streams and backup extensions through
     * the config loader and indexes them into the scratch maps.
     */
    private void reloadAllConfigs() {
        List<SortSourceClusterInfo> allClusters = this.configLoader.loadAllClusters();
        this.sortClusters = allClusters.stream()
                .collect(Collectors.toMap(SortSourceClusterInfo::getName, v -> v));
        this.mqClusters = allClusters.stream()
                .filter(cluster -> SUPPORTED_MQ_TYPE.contains(cluster.getType()))
                .filter(SortSourceClusterInfo::isConsumable)
                .collect(Collectors.groupingBy(SortSourceClusterInfo::getClusterTags));

        // Sinks without a sort cluster name or sort task name cannot be assigned to a task.
        List<SortSourceStreamSinkInfo> allStreamSinks = this.configLoader.loadAllStreamSinks();
        this.streamSinkMap = new HashMap<>();
        allStreamSinks.stream()
                .filter(sink -> sink.getSortClusterName() != null)
                .filter(sink -> sink.getSortTaskName() != null)
                .forEach(sink -> {
                    Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap =
                            this.streamSinkMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
                    List<SortSourceStreamSinkInfo> sinkInfoList =
                            task2groupsMap.computeIfAbsent(sink.getSortTaskName(), k -> new ArrayList<>());
                    sinkInfoList.add(sink);
                });

        this.groupInfos = this.configLoader.loadAllGroup().stream()
                .collect(Collectors.toMap(SortSourceGroupInfo::getGroupId, info -> info));
        this.backupClusterTag = this.configLoader.loadGroupBackupInfo("backup_cluster_tag").stream()
                .collect(Collectors.toMap(InlongGroupExtEntity::getInlongGroupId, InlongGroupExtEntity::getKeyValue));
        this.backupGroupMqResource = this.configLoader.loadGroupBackupInfo("backup_mq_resource").stream()
                .collect(Collectors.toMap(InlongGroupExtEntity::getInlongGroupId, InlongGroupExtEntity::getKeyValue));
        this.allStreams = this.configLoader.loadAllStreams().stream()
                .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
                        Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
        this.backupStreamMqResource = this.configLoader.loadStreamBackupInfo("backup_mq_resource").stream()
                .collect(Collectors.groupingBy(InlongStreamExtEntity::getInlongGroupId,
                        Collectors.toMap(InlongStreamExtEntity::getInlongStreamId, InlongStreamExtEntity::getKeyValue)));
    }

    /**
     * Builds a config and its MD5 for every (sort cluster, sort task) pair found in
     * the scratch data, publishes the two new maps and releases the scratch data.
     * A task whose parsing fails is logged and left out of the published maps.
     */
    private void parseAll() {
        Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
        Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
        this.streamSinkMap.forEach((sortClusterName, task2SinkList) -> {
            Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
            Map<String, String> task2Md5 = new ConcurrentHashMap<>();
            task2SinkList.forEach((taskName, sinkList) -> {
                try {
                    CacheZoneConfig cacheZoneConfig = CacheZoneConfig.builder()
                            .sortClusterName(sortClusterName)
                            .sortTaskId(taskName)
                            .build();
                    Map<String, CacheZone> cacheZoneMap = this.parseCacheZones(sortClusterName, sinkList);
                    cacheZoneConfig.setCacheZones(cacheZoneMap);
                    String jsonStr = GSON.toJson(cacheZoneConfig);
                    String md5 = DigestUtils.md5Hex(jsonStr);
                    task2Config.put(taskName, cacheZoneConfig);
                    task2Md5.put(taskName, md5);
                }
                catch (Throwable t) {
                    // The trailing throwable is logged as the exception, not as a placeholder value.
                    LOGGER.error("failed to parse sort source config of sortCluster={}, task={}", sortClusterName, taskName, t);
                }
            });
            newConfigMap.put(sortClusterName, task2Config);
            newMd5Map.put(sortClusterName, task2Md5);
        });

        // Publish by replacing the references; the maps are not modified afterwards.
        this.sortSourceConfigMap = newConfigMap;
        this.sortSourceMd5Map = newMd5Map;

        // Release the scratch data until the next reload.
        this.sortClusters = null;
        this.mqClusters = null;
        this.groupInfos = null;
        this.allStreams = null;
        this.backupClusterTag = null;
        this.backupGroupMqResource = null;
        this.backupStreamMqResource = null;
        this.streamSinkMap = null;
    }

    /**
     * Builds the cache zones of one sort task, keyed by zone name.
     *
     * <p>Only sinks whose group and stream are both known are used. They are grouped
     * once by the cluster tag of their group and once by the backup cluster tag of
     * their group; when the sort cluster has a non-blank tag, only sinks carrying
     * exactly that tag are kept. Zones with the same name coming from the normal
     * and the backup grouping are merged by concatenating their topics.
     *
     * @param clusterName name of the sort cluster, must be a loaded cluster
     * @param sinkList stream sinks of the sort task
     * @return zone name -> cache zone
     */
    private Map<String, CacheZone> parseCacheZones(String clusterName, List<SortSourceStreamSinkInfo> sinkList) {
        Preconditions.expectNotNull(this.sortClusters.get(clusterName), "sort cluster should not be NULL");
        String sortClusterTag = this.sortClusters.get(clusterName).getClusterTags();

        List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
                .filter(sinkInfo -> this.groupInfos.containsKey(sinkInfo.getGroupId())
                        && this.allStreams.containsKey(sinkInfo.getGroupId())
                        && this.allStreams.get(sinkInfo.getGroupId()).containsKey(sinkInfo.getStreamId()))
                .collect(Collectors.toList());

        Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
                .filter(sink -> Objects.nonNull(this.groupInfos.get(sink.getGroupId())))
                .filter(sink -> StringUtils.isBlank(sortClusterTag)
                        || sortClusterTag.equals(this.groupInfos.get(sink.getGroupId()).getClusterTag()))
                .collect(Collectors.groupingBy(sink -> this.groupInfos.get(sink.getGroupId()).getClusterTag()));

        Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
                .filter(sink -> this.backupClusterTag.containsKey(sink.getGroupId()))
                .filter(sink -> StringUtils.isBlank(sortClusterTag)
                        || sortClusterTag.equals(this.backupClusterTag.get(sink.getGroupId())))
                .collect(Collectors.groupingBy(info -> this.backupClusterTag.get(info.getGroupId())));

        List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
        List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2SinkInfos, true);
        return Stream.of(cacheZones, backupCacheZones)
                .flatMap(Collection::stream)
                .collect(Collectors.toMap(CacheZone::getZoneName, cacheZone -> cacheZone, (zone1, zone2) -> {
                    zone1.getTopics().addAll(zone2.getTopics());
                    return zone1;
                }));
    }

    /**
     * Builds one cache zone per MQ cluster registered under each of the given tags.
     * Tags without any MQ cluster are skipped.
     *
     * @param tag2Sinks cluster tag -> sinks that belong to that tag
     * @param isBackup whether the tags are backup cluster tags
     * @return the zones that could be built; never contains {@code null}
     */
    private List<CacheZone> parseCacheZonesByTag(Map<String, List<SortSourceStreamSinkInfo>> tag2Sinks, boolean isBackup) {
        return tag2Sinks.keySet().stream()
                .filter(this.mqClusters::containsKey)
                .flatMap(tag -> {
                    List<SortSourceStreamSinkInfo> sinks = tag2Sinks.get(tag);
                    List<SortSourceClusterInfo> clusters = this.mqClusters.get(tag);
                    return clusters.stream().map(cluster -> {
                        CacheZone zone = null;
                        try {
                            zone = this.parseCacheZone(sinks, cluster, isBackup);
                        }
                        // NOTE(review): parseCacheZone reports an unsupported cluster type
                        // with BusinessException; whether this catch covers it depends on
                        // that class's hierarchy, which is not visible here — confirm.
                        catch (IllegalStateException e) {
                            LOGGER.error("fail to init cache zone for cluster {}", cluster, e);
                        }
                        return zone;
                    });
                })
                // A zone that failed to initialise is null; drop it here, otherwise the
                // caller would dereference it when keying the zones by name.
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Builds the cache zone of one MQ cluster according to the cluster type.
     *
     * @throws BusinessException if the cluster type is anything other than {@code PULSAR}
     */
    private CacheZone parseCacheZone(List<SortSourceStreamSinkInfo> sinks, SortSourceClusterInfo cluster, boolean isBackupTag) {
        switch (cluster.getType()) {
            case "PULSAR": {
                return this.parsePulsarZone(sinks, cluster, isBackupTag);
            }
        }
        throw new BusinessException(String.format("do not support cluster type=%s of cluster=%s", cluster.getType(), cluster));
    }

    /**
     * Builds a Pulsar cache zone. Every sink becomes one topic named
     * {@code tenant/namespace/topic}, where the tenant comes from the cluster's
     * extension parameters, the namespace from the group's MQ resource and the topic
     * from the stream's MQ resource. For a backup tag the group and stream level
     * {@code backup_mq_resource} values replace namespace and topic when present.
     * A sink whose topic cannot be built is logged and skipped.
     */
    private CacheZone parsePulsarZone(List<SortSourceStreamSinkInfo> sinks, SortSourceClusterInfo cluster, boolean isBackupTag) {
        Map<String, String> param = cluster.getExtParamsMap();
        String tenant = param.get(KEY_TENANT);
        String auth = param.get(KEY_AUTH);
        List<Topic> sdkTopics = sinks.stream().map(sink -> {
            String groupId = sink.getGroupId();
            String streamId = sink.getStreamId();
            SortSourceGroupInfo groupInfo = this.groupInfos.get(groupId);
            SortSourceStreamInfo streamInfo = this.allStreams.get(groupId).get(streamId);
            try {
                String namespace = groupInfo.getMqResource();
                String topic = streamInfo.getMqResource();
                if (isBackupTag) {
                    if (this.backupGroupMqResource.containsKey(groupId)) {
                        namespace = this.backupGroupMqResource.get(groupId);
                    }
                    if (this.backupStreamMqResource.containsKey(groupId)
                            && this.backupStreamMqResource.get(groupId).containsKey(streamId)) {
                        topic = this.backupStreamMqResource.get(groupId).get(streamId);
                    }
                }
                // A null tenant, namespace or topic fails here and is handled per sink below.
                String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic);
                return Topic.builder().topic(fullTopic).topicProperties(sink.getExtParamsMap()).build();
            }
            catch (Exception e) {
                LOGGER.error("fail to parse topic of groupId={}, streamId={}", groupId, streamId, e);
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
        return CacheZone.builder()
                .zoneName(cluster.getName())
                .serviceUrl(cluster.getUrl())
                .topics(sdkTopics)
                .authentication(auth)
                .cacheZoneProperties(cluster.getExtParamsMap())
                .zoneType("PULSAR")
                .build();
    }

    /**
     * Schedules {@link #reload()} on a dedicated single-thread executor, first after
     * 60 seconds and then every 60 seconds.
     */
    private void setReloadTimer() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
        long reloadInterval = 60000L;
        executorService.scheduleAtFixedRate(this::reload, reloadInterval, reloadInterval, TimeUnit.MILLISECONDS);
    }
}

