package org.apache.inlong.manager.service.repository;

import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterObject;
import org.apache.inlong.common.pojo.dataproxy.CacheClusterSetObject;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
import org.apache.inlong.common.pojo.dataproxy.IRepository;
import org.apache.inlong.common.pojo.dataproxy.InLongIdObject;
import org.apache.inlong.common.pojo.dataproxy.ProxyClusterObject;
import org.apache.inlong.common.pojo.dataproxy.RepositoryTimerTask;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.ClusterSetMapper;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.dataproxy.CacheCluster;
import org.apache.inlong.manager.pojo.dataproxy.InlongGroupId;
import org.apache.inlong.manager.pojo.dataproxy.InlongStreamId;
import org.apache.inlong.manager.pojo.dataproxy.ProxyCluster;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Lazy
@Repository("dataProxyConfigRepository")
/* loaded from: input_file:org/apache/inlong/manager/service/repository/DataProxyConfigRepository.class */
public class DataProxyConfigRepository implements IRepository {
    public static final String KEY_NAMESPACE = "namespace";
    public static final String KEY_BACKUP_CLUSTER_TAG = "backup_cluster_tag";
    public static final String KEY_BACKUP_TOPIC = "backup_topic";
    public static final String KEY_SORT_TASK_NAME = "defaultSortTaskName";
    public static final String KEY_DATA_NODE_NAME = "defaultDataNodeName";
    public static final String KEY_SORT_CONSUMER_GROUP = "defaultSortConsumerGroup";
    public static final String KEY_SINK_NAME = "defaultSinkName";
    public static final String CACHE_CLUSTER_PRODUCER_TAG = "producer";
    public static final String CACHE_CLUSTER_CONSUMER_TAG = "consumer";
    private Map<String, String> proxyConfigJson = new ConcurrentHashMap();
    private Map<String, String> proxyMd5Map = new ConcurrentHashMap();
    private long reloadInterval;

    @Autowired
    private ClusterSetMapper clusterSetMapper;

    @Autowired
    private InlongClusterEntityMapper clusterMapper;

    @Autowired
    private InlongGroupEntityMapper inlongGroupMapper;

    @Autowired
    private StreamSinkEntityMapper streamSinkMapper;

    @Autowired
    private SortConfigLoader sortConfigLoader;
    public static final Logger LOGGER = LoggerFactory.getLogger(DataProxyConfigRepository.class);
    public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on("&").trimResults().withKeyValueSeparator("=");
    private static final Gson GSON = new Gson();

    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for " + DataProxyConfigRepository.class.getSimpleName());
        try {
            this.reloadInterval = 60000L;
            reload();
            setReloadTimer();
        } catch (Throwable th) {
            LOGGER.error("Initialize DataProxyConfigRepository error", th);
        }
    }

    public ClusterSetMapper getClusterSetMapper() {
        return this.clusterSetMapper;
    }

    public void setClusterSetMapper(ClusterSetMapper clusterSetMapper) {
        this.clusterSetMapper = clusterSetMapper;
    }

    public InlongClusterEntityMapper getClusterMapper() {
        return this.clusterMapper;
    }

    public void setClusterMapper(InlongClusterEntityMapper inlongClusterEntityMapper) {
        this.clusterMapper = inlongClusterEntityMapper;
    }

    public InlongGroupEntityMapper getInlongGroupMapper() {
        return this.inlongGroupMapper;
    }

    public void setInlongGroupMapper(InlongGroupEntityMapper inlongGroupEntityMapper) {
        this.inlongGroupMapper = inlongGroupEntityMapper;
    }

    public StreamSinkEntityMapper getStreamSinkMapper() {
        return this.streamSinkMapper;
    }

    public void setStreamSinkMapper(StreamSinkEntityMapper streamSinkEntityMapper) {
        this.streamSinkMapper = streamSinkEntityMapper;
    }

    @Transactional(rollbackFor = {Exception.class})
    public void reload() {
        LOGGER.info("start to reload config:" + getClass().getSimpleName());
        HashMap hashMap = new HashMap();
        reloadProxyCluster(hashMap);
        if (hashMap.size() == 0) {
            return;
        }
        reloadCacheCluster(hashMap);
        reloadInlongId(hashMap);
        generateClusterJson(hashMap);
        LOGGER.info("end to reload config:" + getClass().getSimpleName());
    }

    private void reloadProxyCluster(Map<String, DataProxyCluster> map) {
        for (ProxyCluster proxyCluster : this.clusterSetMapper.selectProxyCluster()) {
            ProxyClusterObject proxyClusterObject = new ProxyClusterObject();
            proxyClusterObject.setName(proxyCluster.getClusterName());
            proxyClusterObject.setSetName(proxyCluster.getClusterTag());
            proxyClusterObject.setZone(proxyCluster.getExtTag());
            DataProxyCluster dataProxyCluster = new DataProxyCluster();
            dataProxyCluster.setProxyCluster(proxyClusterObject);
            map.put(proxyClusterObject.getName(), dataProxyCluster);
        }
    }

    private void reloadCacheCluster(Map<String, DataProxyCluster> map) {
        Map map2;
        HashMap hashMap = new HashMap();
        for (CacheCluster cacheCluster : this.clusterSetMapper.selectCacheCluster()) {
            if (!StringUtils.isEmpty(cacheCluster.getExtTag()) && StringUtils.equalsIgnoreCase((String) MAP_SPLITTER.split(cacheCluster.getExtTag()).getOrDefault(CACHE_CLUSTER_PRODUCER_TAG, Boolean.TRUE.toString()), Boolean.TRUE.toString())) {
                ((List) ((Map) hashMap.computeIfAbsent(cacheCluster.getClusterTags(), str -> {
                    return new HashMap();
                })).computeIfAbsent(cacheCluster.getExtTag(), str2 -> {
                    return new ArrayList();
                })).add(cacheCluster);
            }
        }
        HashMap hashMap2 = new HashMap();
        Iterator<Map.Entry<String, DataProxyCluster>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            DataProxyCluster value = it.next().getValue();
            ProxyClusterObject proxyCluster = value.getProxyCluster();
            String setName = proxyCluster.getSetName();
            String zone = proxyCluster.getZone();
            if (!StringUtils.isEmpty(zone) && (map2 = (Map) hashMap.get(setName)) != null) {
                Map<String, String> map3 = (Map) hashMap2.computeIfAbsent(zone, str3 -> {
                    return MAP_SPLITTER.split(zone);
                });
                for (Map.Entry entry : map2.entrySet()) {
                    if (((List) entry.getValue()).size() != 0 && isSubTag((Map) hashMap2.computeIfAbsent((String) entry.getKey(), str4 -> {
                        return MAP_SPLITTER.split((CharSequence) entry.getKey());
                    }), map3)) {
                        CacheClusterSetObject cacheClusterSet = value.getCacheClusterSet();
                        cacheClusterSet.setSetName(setName);
                        List<CacheCluster> list = (List) entry.getValue();
                        cacheClusterSet.setType(((CacheCluster) list.get(0)).getType());
                        List cacheClusters = cacheClusterSet.getCacheClusters();
                        for (CacheCluster cacheCluster2 : list) {
                            CacheClusterObject cacheClusterObject = new CacheClusterObject();
                            cacheClusterObject.setName(cacheCluster2.getClusterName());
                            cacheClusterObject.setZone(cacheCluster2.getExtTag());
                            cacheClusterObject.setToken(cacheCluster2.getToken());
                            cacheClusterObject.setParams(fromJsonToMap(cacheCluster2.getExtParams()));
                            cacheClusters.add(cacheClusterObject);
                        }
                    }
                }
            }
        }
    }

    private Map<String, String> fromJsonToMap(String str) {
        HashMap hashMap = new HashMap();
        if (StringUtils.isBlank(str)) {
            return hashMap;
        }
        try {
            JsonObject jsonObject = (JsonObject) GSON.fromJson(str, JsonObject.class);
            for (String str2 : jsonObject.keySet()) {
                JsonElement jsonElement = jsonObject.get(str2);
                if (jsonElement.isJsonPrimitive()) {
                    hashMap.put(str2, jsonElement.getAsString());
                } else {
                    hashMap.put(str2, jsonElement.toString());
                }
            }
        } catch (Exception e) {
            LOGGER.error("parse json string to map error", e);
        }
        return hashMap;
    }

    private JsonObject fromJsonToJson(String str) {
        if (StringUtils.isBlank(str)) {
            return new JsonObject();
        }
        try {
            return (JsonObject) GSON.fromJson(str, JsonObject.class);
        } catch (Exception e) {
            LOGGER.error("parse json string to json object error", e);
            return new JsonObject();
        }
    }

    private void reloadInlongId(Map<String, DataProxyCluster> map) {
        HashMap hashMap = new HashMap();
        this.clusterSetMapper.selectInlongGroupId().forEach(inlongGroupId -> {
            hashMap.put(inlongGroupId.getInlongGroupId(), inlongGroupId);
        });
        HashMap hashMap2 = new HashMap();
        hashMap.forEach((str, inlongGroupId2) -> {
            hashMap2.put(str, fromJsonToMap(inlongGroupId2.getExtParams()));
        });
        this.sortConfigLoader.loadGroupBackupInfo(KEY_BACKUP_CLUSTER_TAG).forEach(inlongGroupExtEntity -> {
            ((Map) hashMap2.computeIfAbsent(inlongGroupExtEntity.getInlongGroupId(), str2 -> {
                return new HashMap();
            })).put(KEY_BACKUP_CLUSTER_TAG, inlongGroupExtEntity.getKeyValue());
        });
        HashMap hashMap3 = new HashMap();
        this.clusterSetMapper.selectInlongStreamId().forEach(inlongStreamId -> {
            hashMap3.put(getInlongId(inlongStreamId.getInlongGroupId(), inlongStreamId.getInlongStreamId()), inlongStreamId);
        });
        HashMap hashMap4 = new HashMap();
        hashMap3.forEach((str2, inlongStreamId2) -> {
            hashMap4.put(str2, fromJsonToMap(inlongStreamId2.getExtParams()));
        });
        this.sortConfigLoader.loadStreamBackupInfo("backup_mq_resource").forEach(inlongStreamExtEntity -> {
            ((Map) hashMap4.computeIfAbsent(getInlongId(inlongStreamExtEntity.getInlongGroupId(), inlongStreamExtEntity.getInlongStreamId()), str3 -> {
                return new HashMap();
            })).put("backup_mq_resource", inlongStreamExtEntity.getKeyValue());
        });
        Map<String, List<InLongIdObject>> parseInlongId = parseInlongId(hashMap, hashMap2, hashMap3, hashMap4);
        for (Map.Entry<String, DataProxyCluster> entry : map.entrySet()) {
            List<InLongIdObject> list = parseInlongId.get(entry.getValue().getProxyCluster().getSetName());
            if (list != null) {
                entry.getValue().getProxyCluster().getInlongIds().addAll(list);
            }
        }
    }

    private Map<String, List<InLongIdObject>> parseInlongId(Map<String, InlongGroupId> map, Map<String, Map<String, String>> map2, Map<String, InlongStreamId> map3, Map<String, Map<String, String>> map4) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, InlongStreamId> entry : map3.entrySet()) {
            InlongStreamId value = entry.getValue();
            String inlongGroupId = value.getInlongGroupId();
            InlongGroupId inlongGroupId2 = map.get(inlongGroupId);
            if (inlongGroupId == null || inlongGroupId2 == null) {
                LOGGER.debug("groupId {} or groupIdObj {} is null, ignored", inlongGroupId, inlongGroupId2);
            } else {
                InLongIdObject inLongIdObject = new InLongIdObject();
                String key = entry.getKey();
                inLongIdObject.setInlongId(key);
                Optional.ofNullable(map2.get(inlongGroupId)).ifPresent(map5 -> {
                    inLongIdObject.getParams().putAll(map5);
                });
                Optional.ofNullable(map4.get(key)).ifPresent(map6 -> {
                    inLongIdObject.getParams().putAll(map6);
                });
                if (StringUtils.isBlank(value.getTopic())) {
                    inLongIdObject.setTopic(inlongGroupId2.getTopic());
                } else {
                    inLongIdObject.setTopic(value.getTopic());
                    inLongIdObject.getParams().put(KEY_NAMESPACE, inlongGroupId2.getTopic());
                }
                ((List) hashMap.computeIfAbsent(inlongGroupId2.getClusterTag(), str -> {
                    return new ArrayList();
                })).add(inLongIdObject);
                InLongIdObject inLongIdObject2 = new InLongIdObject();
                inLongIdObject2.setInlongId(key);
                inLongIdObject2.getParams().putAll(inLongIdObject.getParams());
                Map<String, String> map7 = map2.get(inlongGroupId);
                if (map7 != null && map7.containsKey(KEY_BACKUP_CLUSTER_TAG) && map7.containsKey("backup_mq_resource")) {
                    String str2 = map7.get(KEY_BACKUP_CLUSTER_TAG);
                    String str3 = map7.get("backup_mq_resource");
                    Map<String, String> map8 = map4.get(key);
                    if (map8 == null || StringUtils.isBlank(map8.get("backup_mq_resource"))) {
                        inLongIdObject2.setTopic(str3);
                    } else {
                        inLongIdObject2.setTopic(map8.get("backup_mq_resource"));
                        inLongIdObject2.getParams().put(KEY_NAMESPACE, str3);
                    }
                    ((List) hashMap.computeIfAbsent(str2, str4 -> {
                        return new ArrayList();
                    })).add(inLongIdObject2);
                }
            }
        }
        return hashMap;
    }

    private String getInlongId(String str, String str2) {
        return str + "." + str2;
    }

    private void setReloadTimer() {
        new Timer(true).scheduleAtFixedRate((TimerTask) new RepositoryTimerTask(this), this.reloadInterval, this.reloadInterval);
    }

    private void generateClusterJson(Map<String, DataProxyCluster> map) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap();
        for (Map.Entry<String, DataProxyCluster> entry : map.entrySet()) {
            DataProxyCluster value = entry.getValue();
            String md5Hex = DigestUtils.md5Hex(GSON.toJson(value));
            DataProxyConfigResponse dataProxyConfigResponse = new DataProxyConfigResponse();
            dataProxyConfigResponse.setResult(true);
            dataProxyConfigResponse.setErrCode(0);
            dataProxyConfigResponse.setMd5(md5Hex);
            dataProxyConfigResponse.setData(value);
            concurrentHashMap.put(entry.getKey(), GSON.toJson(dataProxyConfigResponse));
            concurrentHashMap2.put(entry.getKey(), md5Hex);
        }
        this.proxyConfigJson = concurrentHashMap;
        this.proxyMd5Map = concurrentHashMap2;
    }

    private boolean isSubTag(Map<String, String> map, Map<String, String> map2) {
        for (Map.Entry<String, String> entry : map2.entrySet()) {
            String str = map.get(entry.getKey());
            if (str == null || !str.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    public String getProxyMd5(String str) {
        return this.proxyMd5Map.get(str);
    }

    public String getProxyConfigJson(String str) {
        return this.proxyConfigJson.get(str);
    }

    public String changeClusterTag(String str, String str2, String str3) {
        try {
            InlongGroupEntity selectByGroupId = this.inlongGroupMapper.selectByGroupId(str);
            if (selectByGroupId == null) {
                throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
            }
            String inlongClusterTag = selectByGroupId.getInlongClusterTag();
            if (StringUtils.equals(inlongClusterTag, str2)) {
                return "Cluster tag is same.";
            }
            InlongGroupEntity prepareClusterTagGroup = prepareClusterTagGroup(selectByGroupId, str2, str3);
            HashMap hashMap = new HashMap();
            List<InlongClusterEntity> selectByCondition = this.clusterMapper.selectByCondition(new ClusterPageRequest());
            selectByCondition.forEach(inlongClusterEntity -> {
                hashMap.put(inlongClusterEntity.getName(), inlongClusterEntity);
            });
            SinkPageRequest sinkPageRequest = new SinkPageRequest();
            sinkPageRequest.setInlongGroupId(str);
            List<StreamSinkEntity> selectByCondition2 = this.streamSinkMapper.selectByCondition(sinkPageRequest);
            ArrayList arrayList = new ArrayList();
            for (StreamSinkEntity streamSinkEntity : selectByCondition2) {
                InlongClusterEntity inlongClusterEntity2 = (InlongClusterEntity) hashMap.get(streamSinkEntity.getInlongClusterName());
                if (inlongClusterEntity2 != null && StringUtils.equals(inlongClusterTag, inlongClusterEntity2.getClusterTags())) {
                    StreamSinkEntity createNewStreamSink = createNewStreamSink(selectByCondition, inlongClusterEntity2.getType(), str2, streamSinkEntity);
                    if (createNewStreamSink != null) {
                        arrayList.add(createNewStreamSink);
                    }
                }
            }
            arrayList.forEach(streamSinkEntity2 -> {
                this.streamSinkMapper.insert(streamSinkEntity2);
            });
            if (this.inlongGroupMapper.updateByIdentifierSelective(prepareClusterTagGroup) == InlongConstants.AFFECTED_ONE_ROW.intValue()) {
                return str;
            }
            LOGGER.error("inlong group has already updated with group id={}, curVersion={}", prepareClusterTagGroup.getInlongGroupId(), prepareClusterTagGroup.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return e.getMessage();
        }
    }

    private StreamSinkEntity createNewStreamSink(List<InlongClusterEntity> list, String str, String str2, StreamSinkEntity streamSinkEntity) {
        for (InlongClusterEntity inlongClusterEntity : list) {
            if (StringUtils.equals(str, inlongClusterEntity.getType()) && StringUtils.equals(str2, inlongClusterEntity.getClusterTags())) {
                JsonObject fromJsonToJson = fromJsonToJson(inlongClusterEntity.getExtParams());
                if (!fromJsonToJson.has(KEY_SINK_NAME) || !fromJsonToJson.has(KEY_SORT_TASK_NAME) || !fromJsonToJson.has(KEY_DATA_NODE_NAME) || !fromJsonToJson.has(KEY_SORT_CONSUMER_GROUP)) {
                    return null;
                }
                String asString = fromJsonToJson.get(KEY_SINK_NAME).getAsString();
                String asString2 = fromJsonToJson.get(KEY_SORT_TASK_NAME).getAsString();
                String asString3 = fromJsonToJson.get(KEY_DATA_NODE_NAME).getAsString();
                String asString4 = fromJsonToJson.get(KEY_SORT_CONSUMER_GROUP).getAsString();
                StreamSinkEntity copyStreamSink = copyStreamSink(streamSinkEntity);
                copyStreamSink.setInlongClusterName(inlongClusterEntity.getName());
                copyStreamSink.setSinkName(asString);
                copyStreamSink.setSortTaskName(asString2);
                copyStreamSink.setDataNodeName(asString3);
                copyStreamSink.setSortConsumerGroup(asString4);
                return copyStreamSink;
            }
        }
        return null;
    }

    private StreamSinkEntity copyStreamSink(StreamSinkEntity streamSinkEntity) {
        StreamSinkEntity streamSinkEntity2 = new StreamSinkEntity();
        CommonBeanUtils.copyProperties(streamSinkEntity, streamSinkEntity2);
        streamSinkEntity2.setId((Integer) null);
        streamSinkEntity2.setModifyTime(new Date());
        return streamSinkEntity2;
    }

    private InlongGroupEntity prepareClusterTagGroup(InlongGroupEntity inlongGroupEntity, String str, String str2) throws IllegalAccessException, InvocationTargetException {
        String extParams = inlongGroupEntity.getExtParams();
        if (StringUtils.isEmpty(extParams)) {
            extParams = "{}";
        }
        JsonObject fromJsonToJson = fromJsonToJson(extParams);
        fromJsonToJson.addProperty(KEY_BACKUP_CLUSTER_TAG, inlongGroupEntity.getInlongClusterTag());
        fromJsonToJson.addProperty(KEY_BACKUP_TOPIC, inlongGroupEntity.getMqResource());
        InlongGroupEntity inlongGroupEntity2 = new InlongGroupEntity();
        BeanUtils.copyProperties(inlongGroupEntity2, inlongGroupEntity);
        inlongGroupEntity2.setId((Integer) null);
        inlongGroupEntity2.setInlongClusterTag(str);
        inlongGroupEntity2.setMqResource(str2);
        inlongGroupEntity2.setExtParams(fromJsonToJson.toString());
        return inlongGroupEntity2;
    }

    public String removeBackupClusterTag(String str) {
        InlongGroupEntity selectByGroupId = this.inlongGroupMapper.selectByGroupId(str);
        if (selectByGroupId == null) {
            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
        }
        String extParams = selectByGroupId.getExtParams();
        if (StringUtils.isEmpty(extParams)) {
            return str;
        }
        JsonObject fromJsonToJson = fromJsonToJson(extParams);
        if (!fromJsonToJson.has(KEY_BACKUP_CLUSTER_TAG)) {
            return str;
        }
        String asString = fromJsonToJson.get(KEY_BACKUP_CLUSTER_TAG).getAsString();
        fromJsonToJson.remove(KEY_BACKUP_CLUSTER_TAG);
        fromJsonToJson.remove(KEY_BACKUP_TOPIC);
        selectByGroupId.setExtParams(fromJsonToJson.toString());
        if (this.inlongGroupMapper.updateByIdentifierSelective(selectByGroupId) != InlongConstants.AFFECTED_ONE_ROW.intValue()) {
            LOGGER.error("inlong group has already updated with group id={}, curVersion={}", selectByGroupId.getInlongGroupId(), selectByGroupId.getVersion());
            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
        }
        HashMap hashMap = new HashMap();
        this.clusterMapper.selectByCondition(new ClusterPageRequest()).forEach(inlongClusterEntity -> {
            hashMap.put(inlongClusterEntity.getName(), inlongClusterEntity);
        });
        SinkPageRequest sinkPageRequest = new SinkPageRequest();
        sinkPageRequest.setInlongGroupId(str);
        List<StreamSinkEntity> selectByCondition = this.streamSinkMapper.selectByCondition(sinkPageRequest);
        ArrayList arrayList = new ArrayList();
        for (StreamSinkEntity streamSinkEntity : selectByCondition) {
            InlongClusterEntity inlongClusterEntity2 = (InlongClusterEntity) hashMap.get(streamSinkEntity.getInlongClusterName());
            if (inlongClusterEntity2 != null && StringUtils.equals(asString, inlongClusterEntity2.getClusterTags())) {
                arrayList.add(streamSinkEntity);
            }
        }
        arrayList.forEach(streamSinkEntity2 -> {
            this.streamSinkMapper.deleteById(streamSinkEntity2.getId());
        });
        return str;
    }
}
