/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.repository;

import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.IRepository;
import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Repository(value="dataProxyConfigRepository")
public class DataProxyConfigRepository
implements IRepository {
    public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
    public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
    public static final String KEY_BACKUP_TOPIC = "backup_topic";
    public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
    public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
    public static final String KEY_SORT_CONSUEMER_GROUP = "defaultSortConsumerGroup";
    public static final String KEY_SINK_NAME = "defaultSinkName";
    public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on((String)"&").trimResults().withKeyValueSeparator("=");
    public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
    public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
    private static final Gson gson = new Gson();
    private Map<String, String> proxyConfigJson = new ConcurrentHashMap<String, String>();
    private Map<String, String> proxyMd5Map = new ConcurrentHashMap<String, String>();
    private long reloadInterval;
    @Autowired
    private ClusterSetMapper clusterSetMapper;
    @Autowired
    private InlongClusterEntityMapper clusterMapper;
    @Autowired
    private InlongGroupEntityMapper inlongGroupMapper;
    @Autowired
    private StreamSinkEntityMapper streamSinkMapper;

    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for " + DataProxyConfigRepository.class.getSimpleName());
        try {
            this.reloadInterval = 60000L;
            this.reload();
            this.setReloadTimer();
        }
        catch (Throwable t) {
            LOGGER.error("Initialize DataProxyConfigRepository error", t);
        }
    }

    public ClusterSetMapper getClusterSetMapper() {
        return this.clusterSetMapper;
    }

    public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
        this.clusterSetMapper = clusterSetMapper;
    }

    public InlongClusterEntityMapper getClusterMapper() {
        return this.clusterMapper;
    }

    public void setClusterMapper(InlongClusterEntityMapper clusterMapper) {
        this.clusterMapper = clusterMapper;
    }

    public InlongGroupEntityMapper getInlongGroupMapper() {
        return this.inlongGroupMapper;
    }

    public void setInlongGroupMapper(InlongGroupEntityMapper inlongGroupMapper) {
        this.inlongGroupMapper = inlongGroupMapper;
    }

    public StreamSinkEntityMapper getStreamSinkMapper() {
        return this.streamSinkMapper;
    }

    public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkMapper) {
        this.streamSinkMapper = streamSinkMapper;
    }

    @Transactional(rollbackFor={Exception.class})
    public void reload() {
        LOGGER.info("start to reload config.");
        Map<String, ProxyClusterObject> proxyClusterMap = this.reloadProxyCluster();
        if (proxyClusterMap.size() == 0) {
            return;
        }
        Map<String, Map<String, List<CacheCluster>>> cacheClusterMap = this.reloadCacheCluster();
        Map<String, List<InLongIdObject>> inlongIdMap = this.reloadInlongId();
        for (Map.Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
            String clusterTag = entry.getValue().getSetName();
            List<InLongIdObject> inlongIds = inlongIdMap.get(clusterTag);
            if (inlongIds == null) continue;
            entry.getValue().setInlongIds(inlongIds);
        }
        this.generateClusterJson(proxyClusterMap, cacheClusterMap);
        LOGGER.info("end to reload config.");
    }

    private Map<String, ProxyClusterObject> reloadProxyCluster() {
        HashMap<String, ProxyClusterObject> proxyClusterMap = new HashMap<String, ProxyClusterObject>();
        for (ProxyCluster proxyCluster : this.clusterSetMapper.selectProxyCluster()) {
            ProxyClusterObject obj = new ProxyClusterObject();
            obj.setName(proxyCluster.getClusterName());
            obj.setSetName(proxyCluster.getClusterTag());
            obj.setZone(proxyCluster.getExtTag());
            proxyClusterMap.put(obj.getName(), obj);
        }
        return proxyClusterMap;
    }

    private Map<String, Map<String, List<CacheCluster>>> reloadCacheCluster() {
        HashMap<String, Map<String, List<CacheCluster>>> cacheClusterMap = new HashMap<String, Map<String, List<CacheCluster>>>();
        for (CacheCluster cacheCluster : this.clusterSetMapper.selectCacheCluster()) {
            Map tagMap;
            String producerTag;
            if (StringUtils.isEmpty((CharSequence)cacheCluster.getExtTag()) || !StringUtils.equalsIgnoreCase((CharSequence)(producerTag = (tagMap = MAP_SPLITTER.split((CharSequence)cacheCluster.getExtTag())).getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString())), (CharSequence)Boolean.TRUE.toString())) continue;
            cacheClusterMap.computeIfAbsent(cacheCluster.getClusterTags(), k -> new HashMap()).computeIfAbsent(cacheCluster.getExtTag(), k -> new ArrayList()).add(cacheCluster);
        }
        return cacheClusterMap;
    }

    private Map<String, List<InLongIdObject>> reloadInlongId() {
        HashMap groupIdMap = new HashMap();
        this.clusterSetMapper.selectInlongGroupId().forEach(value -> groupIdMap.put(value.getInlongGroupId(), value));
        HashMap<String, List<InLongIdObject>> inlongIdMap = new HashMap<String, List<InLongIdObject>>();
        for (InlongStreamId streamIdObj : this.clusterSetMapper.selectInlongStreamId()) {
            String groupId = streamIdObj.getInlongGroupId();
            InlongGroupId groupIdObj = (InlongGroupId)groupIdMap.get(groupId);
            if (groupId == null) continue;
            Map<String, String> groupParams = this.getExtParams(groupIdObj.getExtParams());
            Map<String, String> streamParams = this.getExtParams(streamIdObj.getExtParams());
            this.parseMasterTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap);
            this.parseBackupTopic(groupIdObj, streamIdObj, groupParams, streamParams, inlongIdMap);
        }
        return inlongIdMap;
    }

    private Map<String, String> getExtParams(String extParams) {
        if (!StringUtils.isEmpty((CharSequence)extParams)) {
            try {
                Map groupParams = (Map)gson.fromJson(extParams, HashMap.class);
                return groupParams;
            }
            catch (Exception e) {
                LOGGER.error("Fail to parse ext error:{},params:{}", new Object[]{e.getMessage(), extParams, e});
            }
        }
        return new HashMap<String, String>();
    }

    private void parseMasterTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj, Map<String, String> groupParams, Map<String, String> streamParams, Map<String, List<InLongIdObject>> inlongIdMap) {
        String groupTopic = groupIdObj.getTopic();
        String streamTopic = streamIdObj.getTopic();
        String finalTopic = null;
        if (StringUtils.isEmpty((CharSequence)groupTopic)) {
            if (StringUtils.isEmpty((CharSequence)streamTopic)) {
                return;
            }
            finalTopic = streamTopic;
        } else {
            finalTopic = StringUtils.isEmpty((CharSequence)streamTopic) ? groupTopic : groupTopic + "/" + streamTopic;
        }
        InLongIdObject obj = new InLongIdObject();
        String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId();
        obj.setInlongId(inlongId);
        obj.setTopic(finalTopic);
        HashMap<String, String> params = new HashMap<String, String>();
        params.putAll(groupParams);
        params.putAll(streamParams);
        obj.setParams(params);
        inlongIdMap.computeIfAbsent(groupIdObj.getClusterTag(), k -> new ArrayList()).add(obj);
    }

    private void parseBackupTopic(InlongGroupId groupIdObj, InlongStreamId streamIdObj, Map<String, String> groupParams, Map<String, String> streamParams, Map<String, List<InLongIdObject>> inlongIdMap) {
        HashMap<String, String> params = new HashMap<String, String>();
        params.putAll(groupParams);
        params.putAll(streamParams);
        String clusterTag = (String)params.get(KEY_BACKUP_CLUSTER_TAG);
        if (StringUtils.isEmpty((CharSequence)clusterTag)) {
            return;
        }
        String groupTopic = groupParams.get(KEY_BACKUP_TOPIC);
        String streamTopic = streamParams.get(KEY_BACKUP_TOPIC);
        String finalTopic = null;
        if (StringUtils.isEmpty((CharSequence)groupTopic)) {
            if (StringUtils.isEmpty((CharSequence)streamTopic)) {
                return;
            }
            finalTopic = streamTopic;
        } else {
            finalTopic = StringUtils.isEmpty((CharSequence)streamTopic) ? groupTopic : groupTopic + "/" + streamTopic;
        }
        InLongIdObject obj = new InLongIdObject();
        String inlongId = streamIdObj.getInlongGroupId() + "." + streamIdObj.getInlongStreamId();
        obj.setInlongId(inlongId);
        obj.setTopic(finalTopic);
        obj.setParams(params);
        inlongIdMap.computeIfAbsent(clusterTag, k -> new ArrayList()).add(obj);
    }

    private void setReloadTimer() {
        Timer reloadTimer = new Timer(true);
        RepositoryTimerTask task = new RepositoryTimerTask((IRepository)this);
        reloadTimer.scheduleAtFixedRate((TimerTask)task, this.reloadInterval, this.reloadInterval);
    }

    private void generateClusterJson(Map<String, ProxyClusterObject> proxyClusterMap, Map<String, Map<String, List<CacheCluster>>> cacheClusterMap) {
        ConcurrentHashMap<String, String> newProxyConfigJson = new ConcurrentHashMap<String, String>();
        ConcurrentHashMap<String, String> newProxyMd5Map = new ConcurrentHashMap<String, String>();
        HashMap<String, Map> tagCache = new HashMap<String, Map>();
        for (Map.Entry<String, ProxyClusterObject> entry : proxyClusterMap.entrySet()) {
            ProxyClusterObject proxyObj = entry.getValue();
            DataProxyCluster clusterObj = new DataProxyCluster();
            clusterObj.setProxyCluster(proxyObj);
            String clusterTag = proxyObj.getSetName();
            String extTag = proxyObj.getZone();
            Map<String, List<CacheCluster>> cacheClusterZoneMap = cacheClusterMap.get(clusterTag);
            if (cacheClusterZoneMap != null) {
                Map subTagMap = tagCache.computeIfAbsent(extTag, k -> MAP_SPLITTER.split((CharSequence)extTag));
                for (Map.Entry<String, List<CacheCluster>> cacheEntry : cacheClusterZoneMap.entrySet()) {
                    Map wholeTagMap;
                    if (cacheEntry.getValue().size() == 0 || !this.isSubTag(wholeTagMap = tagCache.computeIfAbsent(cacheEntry.getKey(), k -> MAP_SPLITTER.split((CharSequence)cacheEntry.getKey())), subTagMap)) continue;
                    CacheClusterSetObject cacheSet = clusterObj.getCacheClusterSet();
                    cacheSet.setSetName(clusterTag);
                    List<CacheCluster> cacheClusterList = cacheEntry.getValue();
                    cacheSet.setType(cacheClusterList.get(0).getType());
                    ArrayList<CacheClusterObject> cacheClusters = new ArrayList<CacheClusterObject>(cacheClusterList.size());
                    cacheSet.setCacheClusters(cacheClusters);
                    for (CacheCluster cacheCluster : cacheClusterList) {
                        CacheClusterObject obj = new CacheClusterObject();
                        obj.setName(cacheCluster.getClusterName());
                        obj.setZone(cacheCluster.getExtTag());
                        try {
                            Map params = (Map)gson.fromJson(cacheCluster.getExtParams(), Map.class);
                            obj.setParams(params);
                        }
                        catch (Exception e) {
                            LOGGER.error(e.getMessage(), (Throwable)e);
                        }
                        cacheClusters.add(obj);
                    }
                }
            }
            String jsonDataProxyCluster = gson.toJson((Object)clusterObj);
            String md5 = DigestUtils.md5Hex((String)jsonDataProxyCluster);
            DataProxyConfigResponse response = new DataProxyConfigResponse();
            response.setResult(Boolean.valueOf(true));
            response.setErrCode(Integer.valueOf(0));
            response.setMd5(md5);
            response.setData(clusterObj);
            String jsonResponse = gson.toJson((Object)response);
            newProxyConfigJson.put(proxyObj.getName(), jsonResponse);
            newProxyMd5Map.put(proxyObj.getName(), md5);
        }
        this.proxyConfigJson = newProxyConfigJson;
        this.proxyMd5Map = newProxyMd5Map;
    }

    private boolean isSubTag(Map<String, String> wholeTagMap, Map<String, String> subTagMap) {
        for (Map.Entry<String, String> entry : subTagMap.entrySet()) {
            String value = wholeTagMap.get(entry.getKey());
            if (value != null && value.equals(entry.getValue())) continue;
            return false;
        }
        return true;
    }

    public String getProxyMd5(String clusterName) {
        return this.proxyMd5Map.get(clusterName);
    }

    public String getProxyConfigJson(String clusterName) {
        return this.proxyConfigJson.get(clusterName);
    }

    public String changeClusterTag(String inlongGroupId, String clusterTag, String topic) {
        try {
            InlongGroupEntity oldGroup = this.inlongGroupMapper.selectByGroupId(inlongGroupId);
            if (oldGroup == null) {
                throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
            }
            String oldClusterTag = oldGroup.getInlongClusterTag();
            if (StringUtils.equals((CharSequence)oldClusterTag, (CharSequence)clusterTag)) {
                return "Cluster tag is same.";
            }
            InlongGroupEntity newGroup = this.prepareClusterTagGroup(oldGroup, clusterTag, topic);
            HashMap clusterMap = new HashMap();
            ClusterPageRequest clusterRequest = new ClusterPageRequest();
            List clusters = this.clusterMapper.selectByCondition(clusterRequest);
            clusters.forEach(v -> clusterMap.put(v.getName(), v));
            SinkPageRequest request = new SinkPageRequest();
            request.setInlongGroupId(inlongGroupId);
            List streamSinks = this.streamSinkMapper.selectByCondition(request);
            ArrayList<StreamSinkEntity> newStreamSinks = new ArrayList<StreamSinkEntity>();
            for (StreamSinkEntity streamSink : streamSinks) {
                String clusterType;
                StreamSinkEntity newStreamSink;
                String clusterName = streamSink.getInlongClusterName();
                InlongClusterEntity cluster = (InlongClusterEntity)clusterMap.get(clusterName);
                if (cluster == null || !StringUtils.equals((CharSequence)oldClusterTag, (CharSequence)cluster.getClusterTags()) || (newStreamSink = this.createNewStreamSink(clusters, clusterType = cluster.getType(), clusterTag, streamSink)) == null) continue;
                newStreamSinks.add(newStreamSink);
            }
            newStreamSinks.forEach(v -> this.streamSinkMapper.insert(v));
            int rowCount = this.inlongGroupMapper.updateByIdentifierSelective(newGroup);
            if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
                LOGGER.error("inlong group has already updated with group id={}, curVersion={}", (Object)newGroup.getInlongGroupId(), (Object)newGroup.getVersion());
                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
            }
            return inlongGroupId;
        }
        catch (Exception e) {
            LOGGER.error(e.getMessage(), (Throwable)e);
            return e.getMessage();
        }
    }

    private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity> clusters, String clusterType, String clusterTag, StreamSinkEntity srcStreamSink) {
        for (InlongClusterEntity v : clusters) {
            if (!StringUtils.equals((CharSequence)clusterType, (CharSequence)v.getType()) || !StringUtils.equals((CharSequence)clusterTag, (CharSequence)v.getClusterTags())) continue;
            Gson gson = new Gson();
            String newExtParams = v.getExtParams();
            JsonObject extParams = (JsonObject)gson.fromJson(newExtParams, JsonObject.class);
            if (extParams.has(KEY_SINK_NAME) && extParams.has(KEY_SORT_TASK_NAME) && extParams.has(KEY_DATA_NODE_NAME) && extParams.has(KEY_SORT_CONSUEMER_GROUP)) {
                String sinkName = extParams.get(KEY_SINK_NAME).getAsString();
                String sortTaskName = extParams.get(KEY_SORT_TASK_NAME).getAsString();
                String dataNodeName = extParams.get(KEY_DATA_NODE_NAME).getAsString();
                String sortConsumerGroup = extParams.get(KEY_SORT_CONSUEMER_GROUP).getAsString();
                StreamSinkEntity newStreamSink = this.copyStreamSink(srcStreamSink);
                newStreamSink.setInlongClusterName(v.getName());
                newStreamSink.setSinkName(sinkName);
                newStreamSink.setSortTaskName(sortTaskName);
                newStreamSink.setDataNodeName(dataNodeName);
                newStreamSink.setSortConsumerGroup(sortConsumerGroup);
                return newStreamSink;
            }
            return null;
        }
        return null;
    }

    private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSink) {
        StreamSinkEntity streamSinkDest = new StreamSinkEntity();
        CommonBeanUtils.copyProperties((Object)streamSink, (Object)streamSinkDest);
        streamSinkDest.setId(null);
        streamSinkDest.setModifyTime(new Date());
        return streamSinkDest;
    }

    private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity oldGroup, String clusterTag, String topic) throws IllegalAccessException, InvocationTargetException {
        String extParams = oldGroup.getExtParams();
        if (StringUtils.isEmpty((CharSequence)extParams)) {
            extParams = "{}";
        }
        Gson gson = new Gson();
        JsonObject extParamsObj = (JsonObject)gson.fromJson(extParams, JsonObject.class);
        extParamsObj.addProperty(KEY_BACKUP_CLUSTER_TAG, oldGroup.getInlongClusterTag());
        extParamsObj.addProperty(KEY_BACKUP_TOPIC, oldGroup.getMqResource());
        InlongGroupEntity newGroup = new InlongGroupEntity();
        BeanUtils.copyProperties((Object)newGroup, (Object)oldGroup);
        newGroup.setId(null);
        newGroup.setInlongClusterTag(clusterTag);
        newGroup.setMqResource(topic);
        String newExtParams = extParamsObj.toString();
        newGroup.setExtParams(newExtParams);
        return newGroup;
    }

    public String removeBackupClusterTag(String inlongGroupId) {
        InlongGroupEntity oldGroup = this.inlongGroupMapper.selectByGroupId(inlongGroupId);
        if (oldGroup == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        String extParams = oldGroup.getExtParams();
        if (StringUtils.isEmpty((CharSequence)extParams)) {
            return inlongGroupId;
        }
        Gson gson = new Gson();
        JsonObject extParamsObj = (JsonObject)gson.fromJson(extParams, JsonObject.class);
        if (!extParamsObj.has(KEY_BACKUP_CLUSTER_TAG)) {
            return inlongGroupId;
        }
        String oldClusterTag = extParamsObj.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
        extParamsObj.remove(KEY_BACKUP_CLUSTER_TAG);
        extParamsObj.remove(KEY_BACKUP_TOPIC);
        String newExtParams = extParamsObj.toString();
        oldGroup.setExtParams(newExtParams);
        int rowCount = this.inlongGroupMapper.updateByIdentifierSelective(oldGroup);
        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
            LOGGER.error("inlong group has already updated with group id={}, curVersion={}", (Object)oldGroup.getInlongGroupId(), (Object)oldGroup.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        HashMap clusterMap = new HashMap();
        ClusterPageRequest clusterRequest = new ClusterPageRequest();
        List clusters = this.clusterMapper.selectByCondition(clusterRequest);
        clusters.forEach(v -> clusterMap.put(v.getName(), v));
        SinkPageRequest request = new SinkPageRequest();
        request.setInlongGroupId(inlongGroupId);
        List streamSinks = this.streamSinkMapper.selectByCondition(request);
        ArrayList<StreamSinkEntity> deleteStreamSinks = new ArrayList<StreamSinkEntity>();
        for (StreamSinkEntity streamSink : streamSinks) {
            String clusterName = streamSink.getInlongClusterName();
            InlongClusterEntity cluster = (InlongClusterEntity)clusterMap.get(clusterName);
            if (cluster == null || !StringUtils.equals((CharSequence)oldClusterTag, (CharSequence)cluster.getClusterTags())) continue;
            deleteStreamSinks.add(streamSink);
        }
        deleteStreamSinks.forEach(v -> this.streamSinkMapper.deleteByPrimaryKey(v.getId()));
        return inlongGroupId;
    }
}

