package com.netflix.conductor.core.execution.tasks;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coordinates polling and execution of asynchronous {@link WorkflowSystemTask}s.
 *
 * <p>System tasks register themselves through {@link #add(WorkflowSystemTask)}. A listener
 * thread drains that registration queue and, for every asynchronous task not seen before,
 * schedules a recurring poll of the task's queue via {@link QueueDAO}. Polled task ids are
 * handed to a bounded worker pool which calls
 * {@code WorkflowExecutor.executeSystemTask} for each of them.
 *
 * <p>When {@code workflow.system.task.worker.thread.count} is zero or negative the worker pool
 * and the listener thread are not created at all.
 */
@Singleton
public class SystemTaskWorkerCoordinator {
    private static final Logger logger = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class);

    /** Registration queue: tasks added here are picked up by the listener thread. */
    private static final BlockingQueue<WorkflowSystemTask> queue = new LinkedBlockingQueue<>();

    /** Tasks for which a recurring poll has already been scheduled. */
    private static final Set<WorkflowSystemTask> listeningTasks = new HashSet<>();

    private static final String className = SystemTaskWorkerCoordinator.class.getName();

    private final QueueDAO queueDAO;
    private final WorkflowExecutor workflowExecutor;
    private final Configuration config;

    /** Worker pool; {@code null} when the configured thread count is not positive. */
    private final ExecutorService executorService;

    /** Bounded backlog of the worker pool; its remaining capacity throttles polling. */
    private final LinkedBlockingQueue<Runnable> workerQueue;

    private final int workerQueueSize;
    private final int pollCount;
    private final int pollInterval;
    private final int unackTimeout;

    /**
     * Reads the worker settings from {@code configuration} and, when the configured thread count
     * is positive, creates the worker pool and starts the listener thread.
     *
     * @param queueDAO queue access used to pop pending task ids
     * @param workflowExecutor executor that runs each popped system task
     * @param configuration source of the {@code workflow.system.task.worker.*} properties
     */
    @Inject
    public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, Configuration configuration) {
        this.queueDAO = queueDAO;
        this.workflowExecutor = workflowExecutor;
        this.config = configuration;
        this.unackTimeout = configuration.getIntProperty("workflow.system.task.worker.callback.seconds", 30);
        int workerCount = configuration.getIntProperty("workflow.system.task.worker.thread.count", 10);
        this.pollCount = configuration.getIntProperty("workflow.system.task.worker.poll.count", 10);
        this.pollInterval = configuration.getIntProperty("workflow.system.task.worker.poll.interval", 50);
        this.workerQueueSize = configuration.getIntProperty("workflow.system.task.worker.queue.size", 100);
        this.workerQueue = new LinkedBlockingQueue<>(this.workerQueueSize);

        if (workerCount > 0) {
            this.executorService = new ThreadPoolExecutor(
                    workerCount,
                    workerCount,
                    0L,
                    TimeUnit.MILLISECONDS,
                    this.workerQueue,
                    new ThreadFactoryBuilder().setNameFormat("system-task-worker-%d").build());
            // Every field is assigned before the listener thread starts, so it never
            // observes a partially constructed coordinator.
            new Thread(this::listen, "system-task-listener").start();
            logger.info("System Task Worker initialized with {} threads and a callback time of {} seconds and queue size: {} with pollCount: {} and poll interval: {}",
                    workerCount, this.unackTimeout, this.workerQueueSize, this.pollCount, this.pollInterval);
        } else {
            this.executorService = null;
            logger.info("System Task Worker DISABLED");
        }
    }

    /**
     * Registers a system task so that the listener thread can start polling for it.
     *
     * @param workflowSystemTask the task to register
     */
    public static synchronized void add(WorkflowSystemTask workflowSystemTask) {
        logger.info("Adding the queue for system task: {}", workflowSystemTask.getName());
        queue.add(workflowSystemTask);
    }

    /**
     * Listener loop: waits for registered tasks and schedules polling for each asynchronous task
     * exactly once. The loop ends when the thread is interrupted.
     */
    private void listen() {
        while (true) {
            try {
                WorkflowSystemTask systemTask = queue.poll(60L, TimeUnit.SECONDS);
                if (systemTask != null && systemTask.isAsync() && !listeningTasks.contains(systemTask)) {
                    listen(systemTask);
                    listeningTasks.add(systemTask);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still see it.
                Thread.currentThread().interrupt();
                Monitors.error(className, "listen");
                logger.warn("Error listening for workflow system tasks", e);
                return;
            }
        }
    }

    /**
     * Schedules a recurring {@link #pollAndExecute(WorkflowSystemTask)} for the given task on its
     * own single-threaded scheduler, first after 1000 ms and then with {@code pollInterval}
     * milliseconds between the end of one run and the start of the next.
     *
     * @param workflowSystemTask the task whose queue should be polled
     */
    private void listen(WorkflowSystemTask workflowSystemTask) {
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
                () -> pollAndExecute(workflowSystemTask), 1000L, this.pollInterval, TimeUnit.MILLISECONDS);
        logger.info("Started listening for system task: {}", workflowSystemTask.getName());
    }

    /**
     * Pops up to {@code min(free worker-queue slots, pollCount)} task ids for the given system
     * task and submits each of them to the worker pool.
     *
     * <p>All exceptions are caught and logged here: an exception escaping a task scheduled with
     * {@code scheduleWithFixedDelay} would suppress every subsequent run.
     *
     * @param workflowSystemTask the system task whose queue is polled
     */
    private void pollAndExecute(WorkflowSystemTask workflowSystemTask) {
        String taskName = workflowSystemTask.getName();
        try {
            if (this.config.disableAsyncWorkers()) {
                logger.warn("System Task Worker is DISABLED.  Not polling for system task: {}", taskName);
                return;
            }
            // Never pop more ids than the worker backlog can currently accept.
            int batchSize = Math.min(this.workerQueue.remainingCapacity(), this.pollCount);
            if (batchSize <= 0) {
                logger.warn("All workers are busy, not polling. queue size: {}, max: {}, task:{}",
                        this.workerQueue.size(), this.workerQueueSize, taskName);
                return;
            }
            // NOTE(review): the third argument is presumably a pop timeout in ms - confirm against QueueDAO.
            List<String> polledTaskIds = this.queueDAO.pop(taskName, batchSize, 200);
            Monitors.recordTaskPoll(taskName);
            logger.debug("Polling for {}, got {} tasks", taskName, polledTaskIds.size());
            for (String taskId : polledTaskIds) {
                logger.debug("Task: {} of type: {} being sent to the workflow executor", taskId, taskName);
                try {
                    this.executorService.submit(() -> {
                        try {
                            this.workflowExecutor.executeSystemTask(workflowSystemTask, taskId, this.unackTimeout);
                        } catch (Exception e) {
                            // submit() stores failures in a Future nobody reads; log them here
                            // so they are not lost silently.
                            logger.error("Error executing system task - {} with id: {}", taskName, taskId, e);
                        }
                    });
                } catch (RejectedExecutionException e) {
                    logger.warn("Queue full for workers. Size: {}, task:{}", this.workerQueue.size(), taskName);
                }
            }
        } catch (Exception e) {
            Monitors.error(className, "pollAndExecute");
            logger.error("Error executing system task:{}", taskName, e);
        }
    }
}
