/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.sort.standalone.SortIdInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortSinkInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * In-memory cache of sort-standalone cluster configurations.
 *
 * <p>The configuration of every sort cluster is rebuilt from the database on startup and then
 * periodically on a background thread. {@link #getClusterConfig(String, String)} answers purely
 * from the cached state and never touches the database.
 *
 * <p>Thread-safety: each reload builds a fresh, never-again-mutated {@link ConfigSnapshot} and
 * publishes it through a single {@code volatile} field, so readers always observe the config,
 * md5 and error-log maps of the same reload.
 */
@Lazy
@Service
public class SortClusterServiceImpl
implements SortClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterServiceImpl.class);
    private static final Gson GSON = new Gson();
    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000L;
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
    private static final String KEY_GROUP_ID = "inlongGroupId";
    private static final String KEY_STREAM_ID = "inlongStreamId";

    /** Result of the latest reload; replaced as a whole, never mutated after publication. */
    private volatile ConfigSnapshot snapshot = new ConfigSnapshot(
            new ConcurrentHashMap<String, SortClusterConfig>(),
            new ConcurrentHashMap<String, String>(),
            new ConcurrentHashMap<String, String>());
    /** Delay between two reloads, in milliseconds. */
    private long reloadInterval;
    @Autowired
    private StreamSinkEntityMapper streamSinkEntityMapper;
    @Autowired
    private DataNodeEntityMapper dataNodeEntityMapper;

    /**
     * Loads the configuration once and starts the periodic reload timer.
     *
     * <p>Failures are logged and swallowed on purpose so that a broken configuration does not
     * prevent the bean from being created.
     */
    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for {}", SortClusterServiceImpl.class.getSimpleName());
        try {
            this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
            this.reload();
            this.setReloadTimer();
        }
        catch (Throwable t) {
            LOGGER.error("Initialize SortClusterConfigRepository error", t);
        }
    }

    /**
     * Rebuilds the cached configuration of all clusters.
     *
     * <p>Never throws: any failure is logged and the previously published snapshot stays in
     * place. This also keeps the scheduled task alive, because a task that throws is not
     * rescheduled by {@link ScheduledExecutorService#scheduleAtFixedRate}.
     */
    @Transactional(rollbackFor={Exception.class})
    public void reload() {
        LOGGER.debug("start to reload sort config");
        try {
            this.reloadAllClusterConfig();
        }
        catch (Throwable t) {
            LOGGER.error(t.getMessage(), t);
        }
        LOGGER.debug("end to reload config");
    }

    /**
     * Returns the cached configuration of one sort cluster.
     *
     * @param clusterName name of the sort cluster; must not be blank
     * @param md5 md5 of the configuration the caller already holds, may be {@code null}
     * @return a response whose code is
     *         {@code -101} for a blank cluster name,
     *         {@code -1} (with the error text as message) if the last reload failed for this cluster,
     *         {@code 1} if {@code md5} matches the cached configuration,
     *         {@code 0} otherwise, carrying the configuration and its md5, or no data at all if
     *         the cluster is unknown
     */
    @Override
    public SortClusterResponse getClusterConfig(String clusterName, String md5) {
        if (StringUtils.isBlank(clusterName)) {
            String errMsg = "Blank cluster name, return nothing";
            LOGGER.info(errMsg);
            return SortClusterResponse.builder().msg(errMsg).code(RESPONSE_CODE_REQ_PARAMS_ERROR).build();
        }
        // Read the volatile field once so that all lookups below hit the same reload result.
        ConfigSnapshot current = this.snapshot;
        String errorLog = current.errorLogs.get(clusterName);
        if (errorLog != null) {
            return SortClusterResponse.builder().msg(errorLog).code(RESPONSE_CODE_FAIL).build();
        }
        SortClusterConfig clusterConfig = current.configs.get(clusterName);
        if (clusterConfig == null) {
            String errMsg = "There is not config for cluster " + clusterName;
            LOGGER.info(errMsg);
            return SortClusterResponse.builder().msg(errMsg).code(RESPONSE_CODE_SUCCESS).build();
        }
        String currentMd5 = current.md5s.get(clusterName);
        if (Objects.equals(currentMd5, md5)) {
            return SortClusterResponse.builder().msg("No update").code(RESPONSE_CODE_NO_UPDATE).md5(md5).build();
        }
        return SortClusterResponse.builder().msg("Success").code(RESPONSE_CODE_SUCCESS).data(clusterConfig).md5(currentMd5).build();
    }

    /**
     * Reads all tasks, id params and sink params from the database, builds one configuration
     * per cluster and publishes the result as a new snapshot.
     *
     * <p>A failure while building one cluster is recorded in the error log of that cluster only;
     * the other clusters are still updated.
     */
    private void reloadAllClusterConfig() {
        List<SortTaskInfo> tasks = this.streamSinkEntityMapper.selectAllTasks();
        Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
                .filter(dto -> dto.getSortClusterName() != null)
                .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
        List<SortIdInfo> idParams = this.streamSinkEntityMapper.selectAllIdParams();
        Map<String, List<SortIdInfo>> taskIdParamMap = idParams.stream()
                .filter(dto -> dto.getSortTaskName() != null)
                .collect(Collectors.groupingBy(SortIdInfo::getSortTaskName));
        List<SortSinkInfo> sinkParams = this.dataNodeEntityMapper.selectAllSinkParams();
        // NOTE(review): toMap without a merge function throws on duplicate names, which fails the
        // whole reload. Confirm that the query guarantees unique data node names.
        Map<String, SortSinkInfo> taskSinkParamMap = sinkParams.stream()
                .filter(dto -> dto.getName() != null)
                .collect(Collectors.toMap(SortSinkInfo::getName, param -> param));

        Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<String, SortClusterConfig>();
        Map<String, String> newMd5Map = new ConcurrentHashMap<String, String>();
        Map<String, String> newErrorLogMap = new ConcurrentHashMap<String, String>();
        clusterTaskMap.forEach((clusterName, taskList) -> {
            try {
                SortClusterConfig clusterConfig =
                        this.getConfigByClusterName(clusterName, taskList, taskIdParamMap, taskSinkParamMap);
                String md5 = DigestUtils.md5Hex(GSON.toJson(clusterConfig));
                newConfigMap.put(clusterName, clusterConfig);
                newMd5Map.put(clusterName, md5);
            }
            catch (Throwable e) {
                // ConcurrentHashMap rejects null values and exceptions such as NPE carry no
                // message, so fall back to toString() to keep the error entry for this cluster.
                String errMsg = e.getMessage() != null ? e.getMessage() : e.toString();
                newErrorLogMap.put(clusterName, errMsg);
                LOGGER.error("Failed to update cluster config of {}, error is {}", clusterName, errMsg, e);
            }
        });
        // Single volatile write: readers see either the old or the new state, never a mix.
        this.snapshot = new ConfigSnapshot(newConfigMap, newMd5Map, newErrorLogMap);
    }

    /**
     * Builds the configuration of one cluster from its tasks.
     *
     * @param clusterName name of the cluster
     * @param tasks tasks that belong to the cluster
     * @param taskIdParamMap id params grouped by sort task name
     * @param taskSinkParamMap sink params keyed by data node name
     * @return the cluster configuration; tasks without id params or sink params are left out
     * @throws IllegalArgumentException if a task type differs from the type of its sink
     */
    private SortClusterConfig getConfigByClusterName(String clusterName, List<SortTaskInfo> tasks, Map<String, List<SortIdInfo>> taskIdParamMap, Map<String, SortSinkInfo> taskSinkParamMap) {
        List<SortTaskConfig> taskConfigs = tasks.stream()
                .map(task -> {
                    String taskName = task.getSortTaskName();
                    List<SortIdInfo> idParams = taskIdParamMap.get(taskName);
                    SortSinkInfo sinkParams = taskSinkParamMap.get(task.getDataNodeName());
                    return this.getTaskConfig(taskName, task.getSinkType(), idParams, sinkParams);
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
        return SortClusterConfig.builder().clusterName(clusterName).sortTasks(taskConfigs).build();
    }

    /**
     * Builds the configuration of one task.
     *
     * @param taskName name of the sort task
     * @param type sink type declared by the task
     * @param idParams id params of the task, may be {@code null}
     * @param sinkParams sink params of the task, may be {@code null}
     * @return the task configuration, or {@code null} if id params or sink params are missing
     * @throws IllegalArgumentException if {@code type} is {@code null} or differs (ignoring case)
     *         from the sink type
     */
    private SortTaskConfig getTaskConfig(String taskName, String type, List<SortIdInfo> idParams, SortSinkInfo sinkParams) {
        if (idParams == null || sinkParams == null) {
            return null;
        }
        if (type == null || !type.equalsIgnoreCase(sinkParams.getType())) {
            throw new IllegalArgumentException(String.format("task type %s and sink type %s are not identical for task name %s", type, sinkParams.getType(), taskName));
        }
        return SortTaskConfig.builder().name(taskName).type(type).idParams(this.parseIdParams(idParams)).sinkParams(this.parseSinkParams(sinkParams)).build();
    }

    /**
     * Converts id param rows to maps: the parsed ext params of each row plus its group id and
     * stream id.
     */
    private List<Map<String, String>> parseIdParams(List<SortIdInfo> rowIdParams) {
        return rowIdParams.stream().map(row -> {
            Map<String, String> param = parseExtParams(row.getExtParams());
            param.put(KEY_GROUP_ID, row.getInlongGroupId());
            param.put(KEY_STREAM_ID, row.getInlongStreamId());
            return param;
        }).collect(Collectors.toList());
    }

    /** Converts the ext params of a sink row to a map; never returns {@code null}. */
    private Map<String, String> parseSinkParams(SortSinkInfo rowSinkParams) {
        return parseExtParams(rowSinkParams.getExtParams());
    }

    /**
     * Parses a JSON object into a mutable map.
     *
     * <p>Gson returns {@code null} for {@code null} or empty input; that case is mapped to an
     * empty map so callers can add entries without a null check. Values are whatever Gson
     * produces for an untyped {@code HashMap}; the {@code String} value type is unchecked.
     */
    @SuppressWarnings("unchecked")
    private static Map<String, String> parseExtParams(String extParams) {
        Map<String, String> parsed = GSON.fromJson(extParams, HashMap.class);
        return parsed != null ? parsed : new HashMap<String, String>();
    }

    /**
     * Schedules {@link #reload()} at a fixed rate of {@code reloadInterval} milliseconds.
     *
     * <p>The worker is a daemon thread so that this never-shut-down executor cannot keep the
     * JVM alive on its own. The task invokes {@code reload()} directly on this instance.
     */
    private void setReloadTimer() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "sort-cluster-config-reloader");
            thread.setDaemon(true);
            return thread;
        });
        executorService.scheduleAtFixedRate(this::reload, this.reloadInterval, this.reloadInterval, TimeUnit.MILLISECONDS);
    }

    /** Immutable holder of the three maps produced by one reload. */
    private static final class ConfigSnapshot {
        /** Cluster name to configuration. */
        private final Map<String, SortClusterConfig> configs;
        /** Cluster name to md5 of the JSON form of its configuration. */
        private final Map<String, String> md5s;
        /** Cluster name to the error message of a failed build. */
        private final Map<String, String> errorLogs;

        private ConfigSnapshot(Map<String, SortClusterConfig> configs, Map<String, String> md5s, Map<String, String> errorLogs) {
            this.configs = configs;
            this.md5s = md5s;
            this.errorLogs = errorLogs;
        }
    }
}

