/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.core.impl;

import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortFieldInfo;
import org.apache.inlong.manager.pojo.sort.standalone.SortTaskInfo;
import org.apache.inlong.manager.service.core.SortClusterService;
import org.apache.inlong.manager.service.core.SortConfigLoader;
import org.apache.inlong.manager.service.node.DataNodeOperator;
import org.apache.inlong.manager.service.node.DataNodeOperatorFactory;
import org.apache.inlong.manager.service.sink.SinkOperatorFactory;
import org.apache.inlong.manager.service.sink.StreamSinkOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * {@link SortClusterService} implementation that keeps an in-memory copy of the
 * sort cluster configurations and serves them to callers.
 *
 * <p>The configuration is rebuilt from {@link SortConfigLoader} once at start-up and
 * then periodically on a background thread. Each rebuild produces a complete,
 * immutable {@link ConfigSnapshot} that is published through a single volatile
 * reference, so {@link #getClusterConfig(String, String)} always answers from one
 * consistent generation of data.
 */
@Lazy
@Service
public class SortClusterServiceImpl implements SortClusterService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SortClusterServiceImpl.class);
    private static final Gson GSON = new Gson();
    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 60000L;
    private static final int RESPONSE_CODE_SUCCESS = 0;
    private static final int RESPONSE_CODE_NO_UPDATE = 1;
    private static final int RESPONSE_CODE_FAIL = -1;
    private static final int RESPONSE_CODE_REQ_PARAMS_ERROR = -101;
    private static final String KEY_GROUP_ID = "inlongGroupId";
    private static final String KEY_STREAM_ID = "inlongStreamId";

    /** Field names grouped by inlong group id; rebuilt at the start of every reload. */
    private Map<String, List<String>> fieldMap;

    /** Latest successfully built generation of configs, md5 digests and per-cluster errors. */
    private volatile ConfigSnapshot snapshot = new ConfigSnapshot(
            new ConcurrentHashMap<>(), new ConcurrentHashMap<>(), new ConcurrentHashMap<>());

    private long reloadInterval;
    @Autowired
    private SortConfigLoader sortConfigLoader;
    @Autowired
    private SinkOperatorFactory sinkOperatorFactory;
    @Autowired
    private DataNodeOperatorFactory dataNodeOperatorFactory;

    /**
     * Loads the configuration once and starts the periodic reload timer.
     * Any failure is logged and swallowed so that bean creation does not fail.
     */
    @PostConstruct
    public void initialize() {
        LOGGER.info("create repository for {}", SortClusterServiceImpl.class.getSimpleName());
        try {
            this.reloadInterval = DEFAULT_HEARTBEAT_INTERVAL_MS;
            this.reload();
            this.setReloadTimer();
        }
        catch (Throwable t) {
            LOGGER.error("Initialize SortClusterConfigRepository error", t);
        }
    }

    /**
     * Rebuilds the configuration of all sort clusters. Never throws: every failure is
     * logged and the previously published snapshot stays in effect.
     *
     * <p>NOTE(review): every call visible in this class goes through {@code this}
     * (from {@link #initialize()} and the reload timer), and all throwables are caught
     * here, so the {@code @Transactional} annotation likely has no effect in Spring's
     * default proxy mode - confirm whether a transaction is actually required.
     */
    @Transactional(rollbackFor={Exception.class})
    public void reload() {
        LOGGER.debug("start to reload sort config");
        try {
            this.reloadAllClusterConfigV2();
        }
        catch (Throwable t) {
            LOGGER.error(t.getMessage(), t);
        }
        LOGGER.debug("end to reload config");
    }

    /**
     * Returns the configuration of one sort cluster.
     *
     * <p>Response codes: {@code -101} for a blank cluster name, {@code -1} when the last
     * reload failed for this cluster (the message carries the error), {@code 0} without
     * data when no config exists for the cluster, {@code 1} when {@code md5} equals the
     * digest of the current config, and {@code 0} with data and digest otherwise.
     *
     * @param clusterName name of the sort cluster
     * @param md5 digest of the config the caller already holds, may be {@code null}
     * @return the response describing the current config state, never {@code null}
     */
    @Override
    public SortClusterResponse getClusterConfig(String clusterName, String md5) {
        if (StringUtils.isBlank(clusterName)) {
            String errMsg = "Blank cluster name, return nothing";
            LOGGER.info(errMsg);
            return SortClusterResponse.builder().msg(errMsg).code(RESPONSE_CODE_REQ_PARAMS_ERROR).build();
        }
        // Read the volatile reference once so that all lookups below see the same generation.
        ConfigSnapshot current = this.snapshot;
        String errorLog = current.errors.get(clusterName);
        if (errorLog != null) {
            return SortClusterResponse.builder().msg(errorLog).code(RESPONSE_CODE_FAIL).build();
        }
        SortClusterConfig config = current.configs.get(clusterName);
        if (config == null) {
            String errMsg = "There is not config for cluster " + clusterName;
            LOGGER.info(errMsg);
            return SortClusterResponse.builder().msg(errMsg).code(RESPONSE_CODE_SUCCESS).build();
        }
        String currentMd5 = current.md5s.get(clusterName);
        if (currentMd5 != null && currentMd5.equals(md5)) {
            return SortClusterResponse.builder().msg("No update").code(RESPONSE_CODE_NO_UPDATE).md5(md5).build();
        }
        return SortClusterResponse.builder().msg("Success").code(RESPONSE_CODE_SUCCESS).data(config).md5(currentMd5).build();
    }

    /**
     * Loads fields, sinks, tasks and data nodes, builds the config of every cluster and
     * publishes the result as a new snapshot. A failure while building one cluster is
     * recorded as that cluster's error and does not affect the other clusters.
     */
    private void reloadAllClusterConfigV2() {
        List<SortFieldInfo> fieldInfos = this.sortConfigLoader.loadAllFields();
        // Populate a local map first so the field never refers to a half-filled map.
        Map<String, List<String>> newFieldMap = new HashMap<>();
        fieldInfos.forEach(info -> newFieldMap
                .computeIfAbsent(info.getInlongGroupId(), k -> new ArrayList<>())
                .add(info.getFieldName()));
        this.fieldMap = newFieldMap;

        List<StreamSinkEntity> sinkEntities = this.sortConfigLoader.loadAllStreamSinkEntity();
        List<SortTaskInfo> tasks = this.sortConfigLoader.loadAllTask();
        Map<String, List<SortTaskInfo>> clusterTaskMap = tasks.stream()
                .filter(dto -> dto.getSortClusterName() != null)
                .collect(Collectors.groupingBy(SortTaskInfo::getSortClusterName));
        Map<String, List<StreamSinkEntity>> task2AllStreams = sinkEntities.stream()
                .filter(entity -> StringUtils.isNotBlank(entity.getInlongClusterName()))
                .collect(Collectors.groupingBy(StreamSinkEntity::getSinkName));
        List<DataNodeEntity> dataNodeEntities = this.sortConfigLoader.loadAllDataNodeEntity();
        Map<String, DataNodeInfo> task2DataNodeMap = dataNodeEntities.stream()
                .filter(entity -> StringUtils.isNotBlank(entity.getName()))
                .map(entity -> {
                    DataNodeOperator operator = this.dataNodeOperatorFactory.getInstance(entity.getType());
                    return operator.getFromEntity(entity);
                })
                .collect(Collectors.toMap(DataNodeInfo::getName, info -> info));

        Map<String, SortClusterConfig> newConfigMap = new ConcurrentHashMap<>();
        Map<String, String> newMd5Map = new ConcurrentHashMap<>();
        Map<String, String> newErrorLogMap = new ConcurrentHashMap<>();
        clusterTaskMap.forEach((clusterName, taskList) -> {
            try {
                SortClusterConfig config = this.getConfigByClusterNameV2(clusterName, taskList, task2AllStreams, task2DataNodeMap);
                String jsonStr = GSON.toJson(config);
                String md5 = DigestUtils.md5Hex(jsonStr);
                newConfigMap.put(clusterName, config);
                newMd5Map.put(clusterName, md5);
            }
            catch (Throwable e) {
                // ConcurrentHashMap rejects null values, and getMessage() may be null
                // (e.g. for a NullPointerException), so always store a non-null text.
                String errMsg = SortClusterServiceImpl.errorMessageOf(e);
                newErrorLogMap.put(clusterName, errMsg);
                LOGGER.error("Failed to update cluster config={}, error={}", clusterName, errMsg, e);
            }
        });
        // Single volatile write: readers see either the old or the new generation, never a mix.
        this.snapshot = new ConfigSnapshot(newConfigMap, newMd5Map, newErrorLogMap);
    }

    /**
     * Builds the config of one cluster from its tasks.
     *
     * @param clusterName name of the cluster
     * @param tasks tasks that belong to the cluster
     * @param task2AllStreams stream sinks grouped by sink name, looked up by sort task name
     * @param task2DataNodeMap data node infos keyed by data node name
     * @return the cluster config
     * @throws IllegalStateException if a task refers to an unknown data node or has no stream sinks
     */
    private SortClusterConfig getConfigByClusterNameV2(String clusterName, List<SortTaskInfo> tasks, Map<String, List<StreamSinkEntity>> task2AllStreams, Map<String, DataNodeInfo> task2DataNodeMap) {
        List<SortTaskConfig> taskConfigs = tasks.stream().map(task -> {
            String taskName = task.getSortTaskName();
            String type = task.getSinkType();
            String dataNodeName = task.getDataNodeName();
            DataNodeInfo nodeInfo = task2DataNodeMap.get(dataNodeName);
            if (nodeInfo == null) {
                throw new IllegalStateException("No data node '" + dataNodeName + "' found for sort task '" + taskName + "'");
            }
            List<StreamSinkEntity> streams = task2AllStreams.get(taskName);
            if (streams == null) {
                throw new IllegalStateException("No stream sink found for sort task '" + taskName + "'");
            }
            return SortTaskConfig.builder()
                    .name(taskName)
                    .type(type)
                    .idParams(this.parseIdParamsV2(streams))
                    .sinkParams(this.parseSinkParamsV2(nodeInfo))
                    .build();
        }).collect(Collectors.toList());
        return SortClusterConfig.builder().clusterName(clusterName).sortTasks(taskConfigs).build();
    }

    /**
     * Converts stream sinks to id parameter maps using the operator registered for each
     * sink type; sinks for which the operator returns {@code null} are skipped.
     */
    private List<Map<String, String>> parseIdParamsV2(List<StreamSinkEntity> streams) {
        return streams.stream().map(streamSink -> {
            StreamSinkOperator operator = this.sinkOperatorFactory.getInstance(streamSink.getSinkType());
            List<String> fields = this.fieldMap.get(streamSink.getInlongGroupId());
            return operator.parse2IdParams(streamSink, fields);
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

    /** Converts a data node to sink parameters using the operator registered for its type. */
    private Map<String, String> parseSinkParamsV2(DataNodeInfo nodeInfo) {
        DataNodeOperator operator = this.dataNodeOperatorFactory.getInstance(nodeInfo.getType());
        return operator.parse2SinkParams(nodeInfo);
    }

    /**
     * Schedules {@link #reload()} at a fixed rate of {@code reloadInterval} milliseconds.
     * The worker is a named daemon thread so that it cannot keep the JVM alive on shutdown.
     */
    private void setReloadTimer() {
        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "sort-cluster-config-reloader");
            thread.setDaemon(true);
            return thread;
        });
        executorService.scheduleAtFixedRate(this::reload, this.reloadInterval, this.reloadInterval, TimeUnit.MILLISECONDS);
    }

    /** Returns the throwable's message, or its {@code toString()} when the message is null. */
    private static String errorMessageOf(Throwable t) {
        String message = t.getMessage();
        return message != null ? message : t.toString();
    }

    /** One generation of reload results; the maps are not modified after construction. */
    private static final class ConfigSnapshot {
        private final Map<String, SortClusterConfig> configs;
        private final Map<String, String> md5s;
        private final Map<String, String> errors;

        private ConfigSnapshot(Map<String, SortClusterConfig> configs, Map<String, String> md5s, Map<String, String> errors) {
            this.configs = configs;
            this.md5s = md5s;
            this.errors = errors;
        }
    }
}

