package org.apache.inlong.manager.service.operationlog;

import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import javax.annotation.PostConstruct;
import org.apache.inlong.manager.dao.entity.OperationLogEntity;
import org.apache.inlong.manager.dao.mapper.OperationLogEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/inlong/manager/service/operationlog/OperationLogPool.class */
public class OperationLogPool {
    private static final int TIMEOUT_SECOND = 30;
    private static final int BUFFER_SIZE = 500;

    @Autowired
    private OperationLogEntityMapper operationLogMapper;
    private static final Logger log = LoggerFactory.getLogger(OperationLogPool.class);
    private static final ArrayBlockingQueue<OperationLogEntity> OPERATION_POOL = new ArrayBlockingQueue<>(10000);
    private static final int THREAD_NUM = 3;
    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10000), new ThreadFactoryBuilder().setNameFormat("inlong-operation-log-%s").build(), new ThreadPoolExecutor.CallerRunsPolicy());

    public static void publish(OperationLogEntity operationLogEntity) {
        if (OPERATION_POOL.offer(operationLogEntity)) {
            return;
        }
        log.info("discard operation log: {}", operationLogEntity);
    }

    @PostConstruct
    public void init() {
        IntStream.range(0, THREAD_NUM).forEach(i -> {
            EXECUTOR_SERVICE.submit(this::saveOperationLog);
        });
    }

    private void saveOperationLog() {
        ArrayList arrayList = new ArrayList(BUFFER_SIZE);
        while (true) {
            arrayList.clear();
            try {
                int drain = Queues.drain(OPERATION_POOL, arrayList, BUFFER_SIZE, 30L, TimeUnit.SECONDS);
                if (!arrayList.isEmpty()) {
                    long currentTimeMillis = System.currentTimeMillis();
                    this.operationLogMapper.insertBatch(arrayList);
                    log.info("receive {} logs and saved cost {} ms", Integer.valueOf(drain), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                }
            } catch (InterruptedException e) {
                log.error("save operation log interrupted", e);
                return;
            } catch (Exception e2) {
                log.error("save operation log error: ", e2);
            }
        }
    }
}
