/*
 * Decompiled with CFR 0.152.
 */
package org.apache.inlong.manager.service.source;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.common.pojo.agent.TaskSnapshotMessage;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class SourceSnapshotOperation
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SourceSnapshotOperation.class);
    private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryBuilder().setNameFormat("stream-source-snapshot-%s").build(), new ThreadPoolExecutor.CallerRunsPolicy());
    @Autowired
    private StreamSourceEntityMapper sourceMapper;
    private Cache<String, ConcurrentHashMap<Integer, Integer>> agentTaskCache = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterWrite(30L, TimeUnit.SECONDS).build((CacheLoader)new CacheLoader<String, ConcurrentHashMap<Integer, Integer>>(){

        public ConcurrentHashMap<Integer, Integer> load(String agentIp) {
            List sourceEntities = SourceSnapshotOperation.this.sourceMapper.selectByAgentIp(agentIp);
            if (CollectionUtils.isEmpty((Collection)sourceEntities)) {
                return null;
            }
            ConcurrentHashMap<Integer, Integer> tmpMap = new ConcurrentHashMap<Integer, Integer>();
            for (StreamSourceEntity entity : sourceEntities) {
                tmpMap.put(entity.getId(), entity.getStatus());
            }
            return tmpMap;
        }
    });
    private LinkedBlockingQueue<TaskSnapshotRequest> snapshotQueue = null;
    @Value(value="${stream.source.snapshot.queue.size:10000}")
    private int queueSize = 10000;
    private volatile boolean isClose = false;

    @PostConstruct
    private void startSaveSnapshotTask() {
        if (this.snapshotQueue == null) {
            this.snapshotQueue = new LinkedBlockingQueue(this.queueSize);
        }
        SaveSnapshotTaskRunnable taskRunnable = new SaveSnapshotTaskRunnable();
        this.executorService.execute(taskRunnable);
        LOGGER.info("source snapshot operate thread started successfully");
    }

    public Boolean snapshot(TaskSnapshotRequest request) {
        if (request == null) {
            return true;
        }
        String agentIp = request.getAgentIp();
        List snapshotList = request.getSnapshotList();
        if (CollectionUtils.isEmpty((Collection)snapshotList)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("receive snapshot from ip={}, but snapshot list is empty", (Object)agentIp);
            }
            return true;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("receive snapshot from ip={}, msg size={}", (Object)agentIp, (Object)snapshotList.size());
        }
        try {
            this.snapshotQueue.offer(request);
            ConcurrentHashMap idStatusMap = (ConcurrentHashMap)this.agentTaskCache.getIfPresent((Object)agentIp);
            if (MapUtils.isEmpty((Map)idStatusMap)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("success report snapshot for ip={}, task status cache is null", (Object)agentIp);
                }
                return true;
            }
            boolean isInvalid = false;
            for (TaskSnapshotMessage snapshot : snapshotList) {
                Integer status;
                Integer id = snapshot.getJobId();
                if (id == null || !SourceState.TEMP_TO_NORMAL.contains(status = (Integer)idStatusMap.get(id))) continue;
                isInvalid = true;
                this.sourceMapper.updateStatus(id, SourceState.SOURCE_NORMAL.getCode(), Boolean.valueOf(false));
            }
            if (isInvalid) {
                this.agentTaskCache.invalidate((Object)agentIp);
            }
            return true;
        }
        catch (Throwable t) {
            LOGGER.error("put source snapshot error", t);
            return false;
        }
    }

    @Override
    public void close() {
        this.isClose = true;
    }

    private class SaveSnapshotTaskRunnable
    implements Runnable {
        private SaveSnapshotTaskRunnable() {
        }

        @Override
        public void run() {
            while (!SourceSnapshotOperation.this.isClose) {
                try {
                    TaskSnapshotRequest request = (TaskSnapshotRequest)SourceSnapshotOperation.this.snapshotQueue.poll(1L, TimeUnit.SECONDS);
                    if (request == null || CollectionUtils.isEmpty((Collection)request.getSnapshotList())) continue;
                    List requestList = request.getSnapshotList();
                    for (TaskSnapshotMessage message : requestList) {
                        Integer id = message.getJobId();
                        StreamSourceEntity entity = new StreamSourceEntity();
                        entity.setId(id);
                        entity.setSnapshot(message.getSnapshot());
                        entity.setReportTime(request.getReportTime());
                        SourceSnapshotOperation.this.sourceMapper.updateSnapshot(entity);
                    }
                }
                catch (Throwable t) {
                    LOGGER.error("source snapshot task runnable error", t);
                }
            }
        }
    }
}

