package com.netflix.conductor.core.execution.tasks;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.AsyncSystemTaskExecutor;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.service.ExecutionService;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
 * Starts one fixed-delay poller per system task queue that this instance should serve.
 *
 * <p>Once the application is ready, every injected {@link WorkflowSystemTask} is registered in
 * {@link #taskNameWorkflowTaskMapping} and its task type is pushed onto the static {@link #queue}.
 * A dedicated listener thread drains that queue and, for each queue name that belongs to this
 * coordinator's execution namespace and maps to an asynchronous system task, schedules a
 * single-threaded executor that repeatedly delegates to {@link SystemTaskWorker}.
 *
 * <p>The bean is only created when {@code conductor.system-task-workers.enabled} is {@code true}
 * or absent.
 */
@ConditionalOnProperty(name = {"conductor.system-task-workers.enabled"}, havingValue = "true", matchIfMissing = true)
@Component
public class SystemTaskWorkerCoordinator extends LifecycleAwareComponent {
    private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class);
    private static final String CLASS_NAME = SystemTaskWorkerCoordinator.class.getName();

    /** Queue names waiting to be picked up by the listener thread. */
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Registered system tasks, keyed by the value of {@link WorkflowSystemTask#getTaskType()}. */
    public static Map<String, WorkflowSystemTask> taskNameWorkflowTaskMapping = new ConcurrentHashMap<>();

    private SystemTaskWorker systemTaskWorker;
    private final ConductorProperties properties;
    private final long pollInterval;
    private final String executionNameSpace;

    /**
     * Queue names that already have a poller. Only read and written inside {@link #listen()},
     * i.e. on the single listener thread, hence the plain {@link HashSet}.
     */
    private final Set<String> listeningTaskQueues = new HashSet<>();

    private final Set<WorkflowSystemTask> workflowSystemTasks;
    private final QueueDAO queueDAO;
    private final AsyncSystemTaskExecutor asyncSystemTaskExecutor;
    private final ExecutionService executionService;

    /**
     * Captures collaborators and reads the execution namespace and poll interval from the
     * supplied properties. No threads are started here; see {@link #initSystemTaskExecutor()}.
     *
     * @param queueDAO queue access handed to the {@link SystemTaskWorker}
     * @param asyncSystemTaskExecutor executor handed to the {@link SystemTaskWorker}
     * @param conductorProperties source of the namespace, poll interval and worker thread count
     * @param executionService execution service handed to the {@link SystemTaskWorker}
     * @param set the system tasks to register when the application becomes ready
     */
    public SystemTaskWorkerCoordinator(QueueDAO queueDAO, AsyncSystemTaskExecutor asyncSystemTaskExecutor, ConductorProperties conductorProperties, ExecutionService executionService, Set<WorkflowSystemTask> set) {
        this.properties = conductorProperties;
        this.workflowSystemTasks = set;
        this.executionNameSpace = conductorProperties.getSystemTaskWorkerExecutionNamespace();
        this.pollInterval = conductorProperties.getSystemTaskWorkerPollInterval().toMillis();
        this.queueDAO = queueDAO;
        this.asyncSystemTaskExecutor = asyncSystemTaskExecutor;
        this.executionService = executionService;
    }

    /**
     * Registers all injected system tasks, creates the {@link SystemTaskWorker} and starts the
     * listener thread. Invoked on {@link ApplicationReadyEvent}.
     *
     * @throws IllegalStateException if the configured system task worker thread count is not
     *     positive
     */
    @EventListener({ApplicationReadyEvent.class})
    public void initSystemTaskExecutor() {
        if (this.properties.getSystemTaskWorkerThreadCount() <= 0) {
            throw new IllegalStateException("Cannot set system task worker thread count to <=0. To disable system task workers, set conductor.system-task-workers.enabled=false.");
        }
        this.workflowSystemTasks.forEach(this::add);
        // The worker must be assigned before the listener thread starts, because pollers created
        // by that thread dereference it.
        this.systemTaskWorker = new SystemTaskWorker(this.queueDAO, this.asyncSystemTaskExecutor, this.properties, this.executionService);
        // A named thread is much easier to spot in thread dumps.
        new Thread(this::listen, "system-task-worker-coordinator").start();
        LOGGER.info("System Task Worker Coordinator initialized with poll interval: {}", this.pollInterval);
    }

    /** Records the task under its task type and enqueues that type for the listener thread. */
    private void add(WorkflowSystemTask workflowSystemTask) {
        String taskType = workflowSystemTask.getTaskType();
        LOGGER.info("Adding the queue for system task: {}", taskType);
        taskNameWorkflowTaskMapping.put(taskType, workflowSystemTask);
        queue.add(taskType);
    }

    /**
     * Listener loop: waits up to 60 seconds for a queue name and starts a poller for it unless
     * one already exists or the queue is not eligible. Exits when the thread is interrupted.
     */
    private void listen() {
        while (true) {
            try {
                String poll = queue.poll(60L, TimeUnit.SECONDS);
                if (poll != null && !this.listeningTaskQueues.contains(poll) && shouldListen(poll)) {
                    listen(poll);
                    this.listeningTaskQueues.add(poll);
                }
            } catch (InterruptedException e) {
                Monitors.error(CLASS_NAME, "listen");
                LOGGER.warn("Error listening for workflow system tasks", e);
                // Restore the interrupt flag that was cleared when the exception was thrown so
                // that code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Schedules a dedicated single-threaded poller for the given queue: first run after one
     * second, then with {@code pollInterval} milliseconds between the end of one run and the
     * start of the next.
     */
    private void listen(String str) {
        // NOTE(review): per the ScheduledExecutorService contract an exception escaping the task
        // suppresses all later runs; this relies on SystemTaskWorker#pollAndExecute not throwing
        // -- confirm. The executor is also never shut down by this class.
        Executors.newSingleThreadScheduledExecutor()
                .scheduleWithFixedDelay(() -> pollAndExecute(str), 1000L, this.pollInterval, TimeUnit.MILLISECONDS);
        LOGGER.info("Started listening for queue: {}", str);
    }

    /** Delegates one poll cycle to the worker, but only while this component reports running. */
    private void pollAndExecute(String str) {
        if (isRunning()) {
            this.systemTaskWorker.pollAndExecute(str);
        } else {
            LOGGER.debug("Component stopped. Not polling for system task in queue : {}", str);
        }
    }

    /**
     * @param str queue name
     * @return {@code true} if the execution namespace extracted from the queue name equals this
     *     coordinator's namespace (two {@code null}s compare equal)
     */
    @VisibleForTesting
    boolean isFromCoordinatorExecutionNameSpace(String str) {
        return StringUtils.equals(QueueUtils.getExecutionNameSpace(str), this.executionNameSpace);
    }

    /** A queue is served when it is in this coordinator's namespace and its task is async. */
    private boolean shouldListen(String str) {
        return isFromCoordinatorExecutionNameSpace(str) && isAsyncSystemTask(str);
    }

    /**
     * @param str queue name
     * @return {@code true} if the task type extracted from the queue name is non-blank, is
     *     registered in {@link #taskNameWorkflowTaskMapping} and that task reports itself async
     */
    @VisibleForTesting
    boolean isAsyncSystemTask(String str) {
        String taskType = QueueUtils.getTaskType(str);
        if (StringUtils.isBlank(taskType)) {
            return false;
        }
        WorkflowSystemTask workflowSystemTask = taskNameWorkflowTaskMapping.get(taskType);
        return Objects.nonNull(workflowSystemTask) && workflowSystemTask.isAsync();
    }
}
