package org.apache.inlong.manager.service.source;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:org/apache/inlong/manager/service/source/SourceSnapshotOperator.class */
public class SourceSnapshotOperator implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperator.class);

    @Autowired
    private StreamSourceEntityMapper sourceMapper;
    private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(100), new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(), new ThreadPoolExecutor.CallerRunsPolicy());
    private Cache<String, ConcurrentHashMap<Integer, Integer>> agentTaskCache = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(30, TimeUnit.SECONDS).build(new CacheLoader<String, ConcurrentHashMap<Integer, Integer>>() { // from class: org.apache.inlong.manager.service.source.SourceSnapshotOperator.1
        public ConcurrentHashMap<Integer, Integer> load(String str) {
            List<StreamSourceEntity> selectByAgentIp = SourceSnapshotOperator.this.sourceMapper.selectByAgentIp(str);
            if (CollectionUtils.isEmpty(selectByAgentIp)) {
                return null;
            }
            ConcurrentHashMap<Integer, Integer> concurrentHashMap = new ConcurrentHashMap<>();
            for (StreamSourceEntity streamSourceEntity : selectByAgentIp) {
                concurrentHashMap.put(streamSourceEntity.getId(), streamSourceEntity.getStatus());
            }
            return concurrentHashMap;
        }
    });
    private LinkedBlockingQueue<TaskSnapshotRequest> snapshotQueue = null;

    @Value("${stream.source.snapshot.queue.size:10000}")
    private int queueSize = 10000;
    private volatile boolean isClose = false;

    /* loaded from: input_file:org/apache/inlong/manager/service/source/SourceSnapshotOperator$SaveSnapshotTaskRunnable.class */
    private class SaveSnapshotTaskRunnable implements Runnable {
        private SaveSnapshotTaskRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!SourceSnapshotOperator.this.isClose) {
                try {
                    TaskSnapshotRequest taskSnapshotRequest = (TaskSnapshotRequest) SourceSnapshotOperator.this.snapshotQueue.poll(1L, TimeUnit.SECONDS);
                    if (taskSnapshotRequest != null && !CollectionUtils.isEmpty(taskSnapshotRequest.getSnapshotList())) {
                        for (TaskSnapshotMessage taskSnapshotMessage : taskSnapshotRequest.getSnapshotList()) {
                            Integer jobId = taskSnapshotMessage.getJobId();
                            StreamSourceEntity streamSourceEntity = new StreamSourceEntity();
                            streamSourceEntity.setId(jobId);
                            streamSourceEntity.setSnapshot(taskSnapshotMessage.getSnapshot());
                            streamSourceEntity.setReportTime(taskSnapshotRequest.getReportTime());
                            SourceSnapshotOperator.this.sourceMapper.updateSnapshot(streamSourceEntity);
                        }
                    }
                } catch (Throwable th) {
                    SourceSnapshotOperator.LOGGER.error("source snapshot task runnable error", th);
                }
            }
        }
    }

    @PostConstruct
    private void startSaveSnapshotTask() {
        if (this.snapshotQueue == null) {
            this.snapshotQueue = new LinkedBlockingQueue<>(this.queueSize);
        }
        this.executorService.execute(new SaveSnapshotTaskRunnable());
        LOGGER.info("source snapshot operate thread started successfully");
    }

    public Boolean snapshot(TaskSnapshotRequest taskSnapshotRequest) {
        if (taskSnapshotRequest == null) {
            return true;
        }
        String agentIp = taskSnapshotRequest.getAgentIp();
        List snapshotList = taskSnapshotRequest.getSnapshotList();
        if (CollectionUtils.isEmpty(snapshotList)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("receive snapshot from ip={}, but snapshot list is empty", agentIp);
            }
            return true;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("receive snapshot from ip={}, msg size={}", agentIp, Integer.valueOf(snapshotList.size()));
        }
        try {
            this.snapshotQueue.offer(taskSnapshotRequest);
            ConcurrentHashMap concurrentHashMap = (ConcurrentHashMap) this.agentTaskCache.getIfPresent(agentIp);
            if (MapUtils.isEmpty(concurrentHashMap)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("success report snapshot for ip={}, task status cache is null", agentIp);
                }
                return true;
            }
            boolean z = false;
            Iterator it = snapshotList.iterator();
            while (it.hasNext()) {
                Integer jobId = ((TaskSnapshotMessage) it.next()).getJobId();
                if (jobId != null) {
                    if (SourceStatus.TEMP_TO_NORMAL.contains((Integer) concurrentHashMap.get(jobId))) {
                        z = true;
                        this.sourceMapper.updateStatus(jobId, SourceStatus.SOURCE_NORMAL.getCode(), false);
                    }
                }
            }
            if (z) {
                this.agentTaskCache.invalidate(agentIp);
            }
            return true;
        } catch (Throwable th) {
            LOGGER.error("put source snapshot error", th);
            return false;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.isClose = true;
    }
}
