/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.common.reporpter;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.common.reporpter.ConfigLogTypeEnum;
import org.apache.inlong.common.reporpter.StreamConfigLogReporter;
import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamConfigLogMetric
implements Runnable {
    public static final Logger LOGGER = LoggerFactory.getLogger(StreamConfigLogMetric.class);
    public static final String CONFIG_LOG_REPORT_ENABLE = "report.config.log.enable";
    public static final String CONFIG_LOG_REPORT_SERVER_URL = "report.config.log.server.url";
    public static final String CONFIG_LOG_REPORT_INTERVAL = "report.config.log.interval";
    public static final String CONFIG_LOG_REPORT_CLIENT_VERSION = "report.config.log.client.version";
    public static final String CONFIG_LOG_PULSAR_PRODUCER = "pulsar-producer";
    public static final String CONFIG_LOG_PULSAR_CLIENT = "pulsar-client";
    private StreamConfigLogReporter streamConfigLogReporter;
    private String moduleName;
    private String clientVersion;
    private String localIp;
    private long reportInterval;
    public ConcurrentHashMap<String, StreamConfigLogInfo> dataCacheMap = new ConcurrentHashMap();
    private static ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("StreamConfigLogMetric-Report").setUncaughtExceptionHandler((t, e) -> LOGGER.error(t.getName() + " has an uncaught exception: ", e)).build());

    public StreamConfigLogMetric(String moduleName, String serverUrl, long reportInterval, String localIp, String clientVersion) {
        this.streamConfigLogReporter = new StreamConfigLogReporter(serverUrl);
        this.reportInterval = reportInterval;
        this.moduleName = moduleName;
        this.localIp = localIp;
        this.clientVersion = clientVersion;
        statExecutor.scheduleWithFixedDelay(this, reportInterval, reportInterval, TimeUnit.MILLISECONDS);
    }

    public void updateConfigLog(String inlongGroupId, String inlongStreamId, String configName, ConfigLogTypeEnum configLogTypeEnum, String log) {
        String key = this.moduleName + "-" + inlongGroupId + "-" + inlongStreamId + "-" + configName;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("updateConfigLog key = {}", (Object)key);
        }
        this.dataCacheMap.compute(key, (k, v) -> {
            if (v == null) {
                v = new StreamConfigLogInfo();
            }
            this.updateDataValue((StreamConfigLogInfo)v, inlongGroupId, inlongStreamId, configName, configLogTypeEnum, log);
            return v;
        });
    }

    private void updateDataValue(StreamConfigLogInfo streamConfigLogInfo, String inlongGroupId, String inlongStreamId, String configName, ConfigLogTypeEnum configLogTypeEnum, String log) {
        streamConfigLogInfo.setComponentName(this.moduleName);
        streamConfigLogInfo.setConfigName(configName);
        streamConfigLogInfo.setInlongGroupId(inlongGroupId);
        streamConfigLogInfo.setInlongStreamId(inlongStreamId);
        streamConfigLogInfo.setIp(this.localIp);
        streamConfigLogInfo.setVersion(this.clientVersion);
        streamConfigLogInfo.setLogInfo(log);
        streamConfigLogInfo.setReportTime(Instant.now().toEpochMilli());
        streamConfigLogInfo.setLogType(configLogTypeEnum.getType());
    }

    @Override
    public void run() {
        try {
            Set<Map.Entry<String, StreamConfigLogInfo>> set = this.dataCacheMap.entrySet();
            long currentTimeMills = Instant.now().toEpochMilli();
            for (Map.Entry<String, StreamConfigLogInfo> entry : set) {
                StreamConfigLogInfo streamConfigLogInfo = entry.getValue();
                if (currentTimeMills - streamConfigLogInfo.getReportTime() < this.reportInterval) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Report metric data config key = {}!", (Object)streamConfigLogInfo.getConfigName());
                    }
                    this.streamConfigLogReporter.asyncReportData(streamConfigLogInfo);
                    continue;
                }
                this.dataCacheMap.remove(entry.getKey());
                LOGGER.info("Remove expired config key {}", (Object)entry.getKey());
            }
        }
        catch (Exception e) {
            LOGGER.error("Report streamConfigLogMetric has exception = {}", (Throwable)e);
        }
    }
}

