package com.netflix.conductor.core.execution.tasks;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls the queues of registered system tasks and hands the polled task ids to worker thread
 * pools for execution.
 *
 * <p>System tasks register themselves through {@link #add(WorkflowSystemTask)}. A background
 * thread started by the constructor drains those registrations and, for every queue that belongs
 * to this coordinator's execution namespace and maps to an async system task, schedules a
 * fixed-delay poller ({@link #pollAndExecute(String)}).
 *
 * <p>Non-isolated queues share {@link #defaultExecutionConfig}; each isolated queue (as decided by
 * {@code QueueUtils.isIsolatedQueue}) lazily gets its own executor and bounded work queue.
 */
@Singleton
public class SystemTaskWorkerCoordinator {
    private final QueueDAO queueDAO;
    private final WorkflowExecutor workflowExecutor;
    // Only assigned when the configured worker thread count is positive.
    private ExecutorService executorService;
    private final int workerQueueSize;
    private final int pollCount;
    private final int pollInterval;
    private final LinkedBlockingQueue<Runnable> workerQueue;
    private final int unackTimeout;
    private final Configuration config;
    private final String executionNameSpace;
    // Remains null when the coordinator is disabled (thread count <= 0).
    ExecutionConfig defaultExecutionConfig;
    ConcurrentHashMap<String, ExecutionConfig> queueExecutionConfigMap = new ConcurrentHashMap<>();
    private static final Logger logger = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class);
    // Names handed to add(...) that the listener thread has not yet processed.
    static BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Queues for which a poller has already been scheduled; only touched by the listener thread.
    private static final Set<String> listeningTaskQueues = new HashSet<>();
    public static Map<String, WorkflowSystemTask> taskNameWorkFlowTaskMapping = new ConcurrentHashMap<>();
    private static final String className = SystemTaskWorkerCoordinator.class.getName();

    /**
     * Reads the worker settings from {@code configuration} and, unless the configured thread count
     * is zero or negative, creates the shared worker pool and starts the listener thread.
     *
     * <p>Properties read (with defaults): {@code workflow.system.task.worker.callback.seconds}
     * (30), {@code workflow.system.task.worker.thread.count} (10),
     * {@code workflow.system.task.worker.poll.count} (10),
     * {@code workflow.system.task.worker.poll.interval} (50),
     * {@code workflow.system.task.worker.queue.size} (100) and
     * {@code workflow.system.task.worker.executionNameSpace} (empty string).
     *
     * @param queueDAO queue access used to pop task ids
     * @param workflowExecutor executor that runs each popped system task
     * @param configuration source of the properties listed above
     */
    @Inject
    public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, Configuration configuration) {
        this.queueDAO = queueDAO;
        this.workflowExecutor = workflowExecutor;
        this.config = configuration;
        this.unackTimeout = configuration.getIntProperty("workflow.system.task.worker.callback.seconds", 30);
        int threadCount = configuration.getIntProperty("workflow.system.task.worker.thread.count", 10);
        this.pollCount = configuration.getIntProperty("workflow.system.task.worker.poll.count", 10);
        this.pollInterval = configuration.getIntProperty("workflow.system.task.worker.poll.interval", 50);
        this.workerQueueSize = configuration.getIntProperty("workflow.system.task.worker.queue.size", 100);
        this.workerQueue = new LinkedBlockingQueue<>(this.workerQueueSize);
        this.executionNameSpace = configuration.getProperty("workflow.system.task.worker.executionNameSpace", "");
        if (threadCount <= 0) {
            logger.info("System Task Worker DISABLED");
            return;
        }
        // Fixed-size pool backed by the bounded queue; submissions beyond the queue capacity are rejected.
        this.executorService = new ThreadPoolExecutor(
                threadCount,
                threadCount,
                0L,
                TimeUnit.MILLISECONDS,
                this.workerQueue,
                new ThreadFactoryBuilder().setNameFormat("system-task-worker-%d").build());
        this.defaultExecutionConfig = new ExecutionConfig(this.executorService, this.workerQueue);
        new Thread(this::listen).start();
        logger.info(
                "System Task Worker initialized with {} threads and a callback time of {} seconds and queue size: {} with pollCount: {} and poll interval: {}",
                threadCount, this.unackTimeout, this.workerQueueSize, this.pollCount, this.pollInterval);
    }

    /**
     * Registers a system task: records it under its name in {@link #taskNameWorkFlowTaskMapping}
     * and enqueues the name so the listener thread can consider polling for it.
     *
     * <p>NOTE: a decompiler annotation on the original indicated this method was package-private
     * before its access modifier was changed.
     *
     * @param workflowSystemTask the system task to register
     */
    public static synchronized void add(WorkflowSystemTask workflowSystemTask) {
        String taskName = workflowSystemTask.getName();
        logger.info("Adding the queue for system task: {}", taskName);
        taskNameWorkFlowTaskMapping.put(taskName, workflowSystemTask);
        queue.add(taskName);
    }

    /**
     * Listener loop: waits (up to 60 seconds per attempt) for a registered name and starts a
     * poller for it the first time it qualifies. The loop ends when the thread is interrupted.
     */
    private void listen() {
        while (true) {
            try {
                String taskQueue = queue.poll(60L, TimeUnit.SECONDS);
                if (taskQueue == null || listeningTaskQueues.contains(taskQueue) || !shouldListen(taskQueue)) {
                    continue;
                }
                listen(taskQueue);
                listeningTaskQueues.add(taskQueue);
            } catch (InterruptedException e) {
                Monitors.error(className, "listen");
                logger.warn("Error listening for workflow system tasks", e);
                // Restore the interrupt flag so code further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Schedules {@link #pollAndExecute(String)} for the given queue on a dedicated single-thread
     * scheduler: first run after 1000 ms, then with a fixed delay of {@code pollInterval} ms.
     */
    private void listen(String taskQueue) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleWithFixedDelay(() -> pollAndExecute(taskQueue), 1000L, this.pollInterval, TimeUnit.MILLISECONDS);
        logger.info("Started listening for queue: {}", taskQueue);
    }

    /**
     * Pops up to {@code pollCount} task ids from the queue (bounded by the free capacity of the
     * target work queue) and submits each one for execution through the workflow executor.
     *
     * <p>Does nothing when async workers are disabled in the configuration or when the work queue
     * has no remaining capacity. Any exception is recorded and logged rather than propagated, so a
     * failure does not stop the fixed-delay schedule that invokes this method.
     *
     * @param str name of the queue to poll
     */
    @VisibleForTesting
    void pollAndExecute(String str) {
        try {
            if (this.config.disableAsyncWorkers()) {
                logger.warn("System Task Worker is DISABLED.  Not polling for system task in queue : {}", str);
                return;
            }
            ExecutionConfig executionConfig = getExecutionConfig(str);
            LinkedBlockingQueue<Runnable> pendingWork = executionConfig.workerQueue;
            // Never pop more task ids than the work queue can currently accept.
            int batchSize = Math.min(pendingWork.remainingCapacity(), this.pollCount);
            if (batchSize <= 0) {
                logger.debug("All workers are busy, not polling. queue size: {}, max: {}, task:{}",
                        pendingWork.size(), this.workflowQueueSizeForLog(), str);
                return;
            }
            List<String> polledTaskIds = this.queueDAO.pop(str, batchSize, 200);
            Monitors.recordTaskPoll(str);
            logger.debug("Polling for {}, got {} tasks", str, polledTaskIds.size());
            // The task definition depends only on the queue name, so resolve it once per poll.
            WorkflowSystemTask systemTask = taskNameWorkFlowTaskMapping.get(QueueUtils.getTaskType(str));
            for (String taskId : polledTaskIds) {
                logger.debug("Task: {} from queue: {} being sent to the workflow executor", taskId, str);
                try {
                    executionConfig.service.submit(() -> {
                        this.workflowExecutor.executeSystemTask(systemTask, taskId, this.unackTimeout);
                    });
                } catch (RejectedExecutionException e) {
                    // The rejected task id is only logged here.
                    // NOTE(review): presumably it becomes pollable again via the queue's unack handling - confirm.
                    logger.warn("Queue full for workers. Size: {}, queue:{}", pendingWork.size(), str);
                }
            }
        } catch (Exception e) {
            Monitors.error(className, "pollAndExecute");
            logger.error("Error executing system task in queue:{}", str, e);
        }
    }

    /** Returns the configured size of the shared work queue, as reported in the "busy" debug log. */
    private int workflowQueueSizeForLog() {
        return this.workerQueueSize;
    }

    /**
     * Tells whether the queue's execution namespace equals the one this coordinator is configured
     * with.
     *
     * @param str queue name
     * @return {@code true} when the namespaces are equal (null-safe comparison)
     */
    public boolean isFromCoordinatorExecutionNameSpace(String str) {
        String queueNameSpace = QueueUtils.getExecutionNameSpace(str);
        return StringUtils.equals(queueNameSpace, this.executionNameSpace);
    }

    /** A queue is polled only if it is in this coordinator's namespace and is an async system task. */
    private boolean shouldListen(String taskQueue) {
        return isFromCoordinatorExecutionNameSpace(taskQueue) && isSystemTask(taskQueue);
    }

    /**
     * Tells whether the queue's task type is registered as an async system task.
     *
     * @param str queue name
     * @return {@code false} when the task type is blank or unregistered, otherwise the registered
     *     task's {@code isAsync()} value
     */
    public static boolean isSystemTask(String str) {
        String taskType = QueueUtils.getTaskType(str);
        if (StringUtils.isBlank(taskType)) {
            return false;
        }
        WorkflowSystemTask registered = taskNameWorkFlowTaskMapping.get(taskType);
        return registered != null && registered.isAsync();
    }

    /**
     * Returns the executor configuration for a queue: the shared default for non-isolated queues,
     * or a per-queue configuration (created on first request) for isolated ones.
     *
     * @param str queue name
     * @return the execution configuration to use for the queue
     */
    public ExecutionConfig getExecutionConfig(String str) {
        if (!QueueUtils.isIsolatedQueue(str)) {
            return this.defaultExecutionConfig;
        }
        return this.queueExecutionConfigMap.computeIfAbsent(str, isolatedQueueName -> createExecutionConfig());
    }

    /**
     * Builds a dedicated fixed-size pool and bounded work queue for an isolated queue, sized by
     * {@code workflow.isolated.system.task.worker.thread.count} (default 1) and
     * {@code workflow.isolated.system.task.worker.queue.size} (default 100).
     */
    private ExecutionConfig createExecutionConfig() {
        int queueCapacity = this.config.getIntProperty("workflow.isolated.system.task.worker.queue.size", 100);
        LinkedBlockingQueue<Runnable> isolatedQueue = new LinkedBlockingQueue<>(queueCapacity);
        int threadCount = this.config.getIntProperty("workflow.isolated.system.task.worker.thread.count", 1);
        ThreadPoolExecutor isolatedPool = new ThreadPoolExecutor(
                threadCount,
                threadCount,
                0L,
                TimeUnit.MILLISECONDS,
                isolatedQueue,
                new ThreadFactoryBuilder().setNameFormat("isolated-system-task-worker-%d").build());
        return new ExecutionConfig(isolatedPool, isolatedQueue);
    }
}
