package com.netflix.conductor.core.reconciliation;

import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Scheduled component that pops entries from the decider queue and hands each one to the
 * {@link WorkflowSweeper}, then publishes the current decider queue size as a gauge.
 *
 * <p>The bean is registered unless {@code conductor.workflow-reconciler.enabled} is set to a
 * value other than {@code true}; a missing property counts as enabled.
 */
@ConditionalOnProperty(name = {"conductor.workflow-reconciler.enabled"}, havingValue = "true", matchIfMissing = true)
@Component
public class WorkflowReconciler extends LifecycleAwareComponent {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowReconciler.class);

    /**
     * Timeout argument passed to {@link QueueDAO#pop}.
     * NOTE(review): presumably milliseconds - confirm against the QueueDAO contract.
     */
    private static final int DECIDER_QUEUE_POP_TIMEOUT = 2000;

    private final WorkflowSweeper workflowSweeper;
    private final QueueDAO queueDAO;
    /** Maximum number of entries requested from the decider queue per sweep cycle. */
    private final int sweeperThreadCount;

    /**
     * Creates the reconciler.
     *
     * @param workflowSweeper sweeper invoked asynchronously for every entry popped from the queue
     * @param queueDAO queue access used to pop entries and to read the queue size
     * @param conductorProperties source of the sweeper thread count, which is used as the pop batch size
     */
    public WorkflowReconciler(WorkflowSweeper workflowSweeper, QueueDAO queueDAO, ConductorProperties conductorProperties) {
        this.workflowSweeper = workflowSweeper;
        this.queueDAO = queueDAO;
        this.sweeperThreadCount = conductorProperties.getSweeperThreadCount();
        LOGGER.info("WorkflowReconciler initialized with {} sweeper threads", this.sweeperThreadCount);
    }

    /**
     * Runs one sweep cycle: pops up to {@code sweeperThreadCount} entries from the decider queue,
     * triggers {@link WorkflowSweeper#sweepAsync} for each of them, waits for all of them to
     * complete and finally records the queue depth.
     *
     * <p>Scheduled with a fixed delay (and the same initial delay) taken from
     * {@code conductor.sweep-frequency.millis}, defaulting to 500. Nothing is done while the
     * component is not running. Any exception is counted via {@link Monitors#error} and logged
     * instead of being propagated; the queue depth is not recorded for a cycle that failed.
     */
    @Scheduled(fixedDelayString = "${conductor.sweep-frequency.millis:500}", initialDelayString = "${conductor.sweep-frequency.millis:500}")
    public void pollAndSweep() {
        try {
            if (!isRunning()) {
                LOGGER.debug("Component stopped, skip workflow sweep");
                return;
            }

            // Entries are presumably workflow ids (the log messages refer to workflows) - confirm.
            List<String> workflowIds =
                    this.queueDAO.pop(WorkflowExecutor.DECIDER_QUEUE, this.sweeperThreadCount, DECIDER_QUEUE_POP_TIMEOUT);

            // Nothing popped: skip the wait and the (otherwise empty) debug line.
            if (workflowIds != null && !workflowIds.isEmpty()) {
                // Block until every sweep started in this cycle has completed.
                CompletableFuture.allOf(
                                workflowIds.stream()
                                        .map(this.workflowSweeper::sweepAsync)
                                        .toArray(CompletableFuture<?>[]::new))
                        .get();

                // Only build the joined id string when it will actually be logged.
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Sweeper processed {} from the decider queue", String.join(",", workflowIds));
                }
            }

            recordQueueDepth();
        } catch (Exception e) {
            Monitors.error(WorkflowReconciler.class.getSimpleName(), "poll");
            LOGGER.error("Error when polling for workflows", e);
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so the scheduler thread can observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Publishes the current size of the decider queue as a gauge named after the queue. */
    private void recordQueueDepth() {
        Monitors.recordGauge(WorkflowExecutor.DECIDER_QUEUE, this.queueDAO.getSize(WorkflowExecutor.DECIDER_QUEUE));
    }
}
