package com.netflix.conductor.core.execution.tasks;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.core.utils.SemaphoreUtil;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/netflix/conductor/core/execution/tasks/SystemTaskExecutor.class */
public class SystemTaskExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskExecutor.class);
    private final int callbackTime;
    private final QueueDAO queueDAO;
    ExecutionConfig defaultExecutionConfig;
    private final WorkflowExecutor workflowExecutor;
    private final Configuration config;
    ConcurrentHashMap<String, ExecutionConfig> queueExecutionConfigMap = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SystemTaskExecutor(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, Configuration configuration) {
        this.config = configuration;
        int systemTaskWorkerThreadCount = configuration.getSystemTaskWorkerThreadCount();
        this.callbackTime = configuration.getSystemTaskWorkerCallbackSeconds();
        this.defaultExecutionConfig = new ExecutionConfig(systemTaskWorkerThreadCount, "system-task-worker-%d");
        this.workflowExecutor = workflowExecutor;
        this.queueDAO = queueDAO;
        LOGGER.info("Initialized the SystemTaskExecutor with {} threads and callback time: {} seconds", Integer.valueOf(systemTaskWorkerThreadCount), Integer.valueOf(this.callbackTime));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void pollAndExecute(String str) {
        ExecutionConfig executionConfig = getExecutionConfig(str);
        SemaphoreUtil semaphoreUtil = executionConfig.getSemaphoreUtil();
        ExecutorService executorService = executionConfig.getExecutorService();
        String taskType = QueueUtils.getTaskType(str);
        if (!semaphoreUtil.canProcess()) {
            Monitors.recordSystemTaskWorkerPollingLimited(str);
            return;
        }
        try {
            List<String> pop = this.queueDAO.pop(str, 1, 200);
            Monitors.recordTaskPoll(str);
            LOGGER.debug("Polling queue:{}, got {} tasks", str, Integer.valueOf(pop.size()));
            if (pop.size() == 1 && StringUtils.isNotBlank(pop.get(0))) {
                String str2 = pop.get(0);
                LOGGER.debug("Task: {} from queue: {} being sent to the workflow executor", str2, str);
                Monitors.recordTaskPollCount(str, Configuration.SYSTEM_TASK_WORKER_EXECUTION_NAMESPACE_DEFAULT_VALUE, 1);
                WorkflowSystemTask workflowSystemTask = SystemTaskWorkerCoordinator.taskNameWorkflowTaskMapping.get(taskType);
                CompletableFuture.runAsync(() -> {
                    this.workflowExecutor.executeSystemTask(workflowSystemTask, str2, this.callbackTime);
                }, executorService).whenComplete((r3, th) -> {
                    semaphoreUtil.completeProcessing();
                });
            } else {
                semaphoreUtil.completeProcessing();
            }
        } catch (Exception e) {
            semaphoreUtil.completeProcessing();
            Monitors.recordTaskPollError(taskType, Configuration.SYSTEM_TASK_WORKER_EXECUTION_NAMESPACE_DEFAULT_VALUE, e.getClass().getSimpleName());
            LOGGER.error("Error polling system task in queue:{}", str, e);
        }
    }

    @VisibleForTesting
    ExecutionConfig getExecutionConfig(String str) {
        return !QueueUtils.isIsolatedQueue(str) ? this.defaultExecutionConfig : this.queueExecutionConfigMap.computeIfAbsent(str, str2 -> {
            return createExecutionConfig();
        });
    }

    private ExecutionConfig createExecutionConfig() {
        return new ExecutionConfig(this.config.getSystemTaskWorkerIsolatedThreadCount(), "isolated-system-task-worker-%d");
    }
}
