package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.FailureHandler;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/kagkarlsson/scheduler/Scheduler.class */
/**
 * Polls a {@link TaskRepository} for due executions and runs them on an {@link ExecutorService},
 * while three single-threaded background loops fetch due executions, handle dead executions and
 * update heartbeats for executions currently being processed.
 *
 * <p>All {@link SchedulerClient} operations are forwarded to an internal
 * {@link SchedulerClient.StandardSchedulerClient}.
 */
public class Scheduler implements SchedulerClient {
    /**
     * Not referenced inside this class.
     * NOTE(review): presumably consumed by {@link DueExecutionsBatch} to decide when to trigger the
     * next fetch - confirm.
     */
    public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5d;
    /** Common prefix for the names of the threads backing the three background loops. */
    public static final String THREAD_PREFIX = "db-scheduler";
    /** Upper bound on how long {@link #stop()} waits for running executions to finish. */
    public static final Duration SHUTDOWN_WAIT = Duration.ofMinutes(30);
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

    private final SchedulerClient delegate;
    private final Clock clock;
    private final TaskRepository taskRepository;
    private final TaskResolver taskResolver;
    private final int threadpoolSize;
    private final ExecutorService executorService;
    private final Waiter executeDueWaiter;
    protected final List<OnStartup> onStartup;
    private final Waiter detectDeadWaiter;
    private final Duration heartbeatInterval;
    private final StatsRegistry statsRegistry;
    private final int pollingLimit;
    private final Waiter heartbeatWaiter;
    /** Executions picked by this instance that have not yet finished; shared between worker and heartbeat threads. */
    private final Map<Execution, CurrentlyExecuting> currentlyProcessing = Collections.synchronizedMap(new HashMap<>());
    private final SchedulerState.SettableSchedulerState schedulerState = new SchedulerState.SettableSchedulerState();
    /**
     * Generation of the most recently submitted batch of due executions. Written by
     * {@link #executeDue()} and read by {@link PickAndExecute} tasks on {@code executorService}
     * threads, hence {@code volatile} so that workers observe the latest value.
     */
    private volatile int currentGenerationNumber = 1;
    private final ExecutorService dueExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-execute-due-"));
    private final ExecutorService detectDeadExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-detect-dead-"));
    private final ExecutorService updateHeartbeatExecutor = Executors.newSingleThreadExecutor(ExecutorUtils.defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-update-heartbeat-"));

    /**
     * Unit of work submitted to {@code executorService} for one due execution: tries to pick the
     * candidate in the repository and, if that succeeds, runs the task and its completion or
     * failure handler.
     */
    private class PickAndExecute implements Runnable {
        private final Execution candidate;
        private final DueExecutionsBatch addedDueExecutionsBatch;

        /**
         * @param execution the due execution to try to pick
         * @param dueExecutionsBatch the batch this candidate was fetched as part of
         */
        public PickAndExecute(Execution execution, DueExecutionsBatch dueExecutionsBatch) {
            this.candidate = execution;
            this.addedDueExecutionsBatch = dueExecutionsBatch;
        }

        /**
         * Skips the candidate if the scheduler is shutting down or the batch is stale; otherwise
         * picks it and executes it, tracking it in {@code currentlyProcessing} for the duration.
         */
        @Override
        public void run() {
            if (schedulerState.isShuttingDown()) {
                LOG.info("Scheduler has been shutdown. Skipping fetched due execution: {}", candidate.taskInstance.getTaskAndInstance());
                return;
            }

            // Read the shared generation once so the staleness check and the log line agree.
            int currentGeneration = currentGenerationNumber;
            if (addedDueExecutionsBatch.isOlderGenerationThan(currentGeneration)) {
                addedDueExecutionsBatch.markBatchAsStale();
                statsRegistry.register(StatsRegistry.CandidateStatsEvent.STALE);
                LOG.trace("Skipping queued execution (current generationNumber: {}, execution generationNumber: {})", currentGeneration, addedDueExecutionsBatch.getGenerationNumber());
                return;
            }

            Optional<Execution> pickResult = taskRepository.pick(candidate, clock.now());
            if (!pickResult.isPresent()) {
                LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
                statsRegistry.register(StatsRegistry.CandidateStatsEvent.ALREADY_PICKED);
                return;
            }

            Execution picked = pickResult.get();
            currentlyProcessing.put(picked, new CurrentlyExecuting(picked, clock));
            try {
                statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
                executePickedExecution(picked);
            } finally {
                if (currentlyProcessing.remove(picked) == null) {
                    LOG.error("Released execution was not found in collection of executions currently being processed. Should never happen.");
                    statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                }
                // Always report completion to the batch, handing it a way to wake the due-executions loop.
                addedDueExecutionsBatch.oneExecutionDone(Scheduler.this::triggerCheckForDueExecutions);
            }
        }

        /**
         * Resolves the task implementation for a picked execution and runs it. A normal return is
         * passed to the {@link CompletionHandler}; anything thrown is passed to the task's
         * {@link FailureHandler}. An unresolvable task name is logged and the execution is left as is.
         */
        private void executePickedExecution(Execution execution) {
            // Raw Task is intentional: the concrete data type of the task is not known here.
            Optional<Task> resolved = taskResolver.resolve(execution.taskInstance.getTaskName());
            if (!resolved.isPresent()) {
                LOG.error("Failed to find implementation for task with name '{}'. If there are a high number of executions, this may block other executions and must be fixed.", execution.taskInstance.getTaskName());
                return;
            }

            Task task = resolved.get();
            Instant executionStarted = clock.now();
            try {
                LOG.debug("Executing {}", execution);
                CompletionHandler completion = task.execute(execution.taskInstance, new ExecutionContext(schedulerState, execution, Scheduler.this));
                LOG.debug("Execution done");
                complete(completion, execution, executionStarted);
                statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);
            } catch (RuntimeException e) {
                LOG.error("Unhandled exception during execution. Treating as failure.", e);
                failure(task.getFailureHandler(), execution, e, executionStarted);
                statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
            } catch (Throwable th) {
                LOG.error("Error during execution. Treating as failure.", th);
                failure(task.getFailureHandler(), execution, th, executionStarted);
                statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
            }
        }

        /**
         * Invokes the completion handler for a successful execution. Anything the handler throws is
         * logged and counted in the stats registry; it is not rethrown.
         *
         * @param instant the time the execution started
         */
        private void complete(CompletionHandler completionHandler, Execution execution, Instant instant) {
            ExecutionComplete outcome = ExecutionComplete.success(execution, instant, clock.now());
            try {
                completionHandler.complete(outcome, new ExecutionOperations(taskRepository, execution));
                statsRegistry.registerSingleCompletedExecution(outcome);
            } catch (Throwable th) {
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. The execution should be detected as dead in {}, and handled according to the tasks DeadExecutionHandler.", execution, getMaxAgeBeforeConsideredDead(), th);
            }
        }

        /**
         * Invokes the failure handler for a failed execution. Anything the handler throws is logged
         * and counted in the stats registry; it is not rethrown.
         *
         * @param th the cause of the failed execution
         * @param instant the time the execution started
         */
        private void failure(FailureHandler failureHandler, Execution execution, Throwable th, Instant instant) {
            ExecutionComplete outcome = ExecutionComplete.failure(execution, instant, clock.now(), th);
            try {
                failureHandler.onFailure(outcome, new ExecutionOperations(taskRepository, execution));
                statsRegistry.registerSingleCompletedExecution(outcome);
            } catch (Throwable handlerError) {
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
                LOG.error("Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. The execution should be detected as dead in {}, and handled according to the tasks DeadExecutionHandler.", execution, getMaxAgeBeforeConsideredDead(), handlerError);
            }
        }
    }

    /**
     * Creates a scheduler. Decompiler note: this constructor was originally declared
     * {@code protected}; see {@link #create(DataSource, List)} for the builder entry point.
     *
     * @param clock source of the current time
     * @param taskRepository storage used to fetch, pick and update executions
     * @param taskResolver maps task names to task implementations
     * @param threadpoolSize value passed on to each {@link DueExecutionsBatch}
     * @param executorService executor on which picked executions are run
     * @param schedulerName not used by this constructor
     * @param executeDueWaiter waiter pacing the due-executions loop; woken by
     *     {@link #triggerCheckForDueExecutions()}
     * @param heartbeatInterval interval of the heartbeat loop; the dead-execution loop uses twice
     *     this value
     * @param enableImmediateExecution when {@code true} the internal client is given a
     *     {@link TriggerCheckForDueExecutions} listener, otherwise a no-op listener
     * @param statsRegistry sink for scheduler statistics events
     * @param pollingLimit maximum number of due executions requested per fetch
     * @param onStartup hooks run by {@link #executeOnStartup()}
     */
    public Scheduler(Clock clock, TaskRepository taskRepository, TaskResolver taskResolver, int threadpoolSize, ExecutorService executorService, SchedulerName schedulerName, Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
        this.clock = clock;
        this.taskRepository = taskRepository;
        this.taskResolver = taskResolver;
        this.threadpoolSize = threadpoolSize;
        this.executorService = executorService;
        this.executeDueWaiter = executeDueWaiter;
        this.onStartup = onStartup;
        this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2L), clock);
        this.heartbeatInterval = heartbeatInterval;
        this.heartbeatWaiter = new Waiter(heartbeatInterval, clock);
        this.statsRegistry = statsRegistry;
        this.pollingLimit = pollingLimit;
        SchedulerClientEventListener clientListener = enableImmediateExecution
                ? new TriggerCheckForDueExecutions(this.schedulerState, clock, executeDueWaiter)
                : SchedulerClientEventListener.NOOP;
        this.delegate = new SchedulerClient.StandardSchedulerClient(taskRepository, clientListener);
    }

    /**
     * Runs the {@link OnStartup} hooks, submits the due-executions, dead-execution and heartbeat
     * loops to their dedicated executors, and then marks the scheduler as started.
     */
    public void start() {
        LOG.info("Starting scheduler");
        executeOnStartup();
        dueExecutor.submit(new RunUntilShutdown(this::executeDue, executeDueWaiter, schedulerState, statsRegistry));
        detectDeadExecutor.submit(new RunUntilShutdown(this::detectDeadExecutions, detectDeadWaiter, schedulerState, statsRegistry));
        updateHeartbeatExecutor.submit(new RunUntilShutdown(this::updateHeartbeats, heartbeatWaiter, schedulerState, statsRegistry));
        schedulerState.setStarted();
    }

    /**
     * Invokes every registered {@link OnStartup} hook in order. An exception from one hook is
     * logged and counted, and the remaining hooks still run.
     * Decompiler note: originally declared {@code protected}.
     */
    public void executeOnStartup() {
        for (OnStartup hook : onStartup) {
            try {
                hook.onStartup(this);
            } catch (Exception e) {
                LOG.error("Unexpected error while executing OnStartup tasks. Continuing.", e);
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        }
    }

    /**
     * Marks the scheduler as shutting down, stops the three background loops, and then waits up to
     * {@link #SHUTDOWN_WAIT} for running executions to finish. A second call while already
     * shutting down only logs a warning.
     */
    public void stop() {
        if (schedulerState.isShuttingDown()) {
            LOG.warn("Multiple calls to 'stop()'. Scheduler is already stopping.");
            return;
        }
        schedulerState.setIsShuttingDown();
        LOG.info("Shutting down Scheduler.");

        shutdownNowOrWarn(dueExecutor, "due-executor");
        shutdownNowOrWarn(detectDeadExecutor, "detect-dead-executor");
        shutdownNowOrWarn(updateHeartbeatExecutor, "update-heartbeat-executor");

        LOG.info("Letting running executions finish. Will wait up to {}.", SHUTDOWN_WAIT);
        if (ExecutorUtils.shutdownAndAwaitTermination(executorService, SHUTDOWN_WAIT)) {
            LOG.info("Scheduler stopped.");
            return;
        }
        // Copy the key set first so the listing is built from a stable snapshot.
        String stillRunning = new ArrayList<>(currentlyProcessing.keySet()).stream()
                .map(Object::toString)
                .collect(Collectors.joining("\n"));
        LOG.warn("Scheduler stopped, but some tasks did not complete. Was currently running the following executions:\n{}", stillRunning);
    }

    /** Shuts down one background-loop executor, waiting up to five seconds, and warns if that fails. */
    private static void shutdownNowOrWarn(ExecutorService executor, String executorName) {
        if (!ExecutorUtils.shutdownNowAndAwaitTermination(executor, Duration.ofSeconds(5L))) {
            LOG.warn("Failed to shutdown {} properly.", executorName);
        }
    }

    /** Delegates to the internal {@link SchedulerClient}. */
    @Override
    public <T> void schedule(TaskInstance<T> taskInstance, Instant instant) {
        delegate.schedule(taskInstance, instant);
    }

    /** Delegates to the internal {@link SchedulerClient}. */
    @Override
    public void reschedule(TaskInstanceId taskInstanceId, Instant instant) {
        delegate.reschedule(taskInstanceId, instant);
    }

    /** Delegates to the internal {@link SchedulerClient}. */
    @Override
    public void cancel(TaskInstanceId taskInstanceId) {
        delegate.cancel(taskInstanceId);
    }

    /** Delegates to the internal {@link SchedulerClient}. */
    @Override
    public void getScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
        delegate.getScheduledExecutions(consumer);
    }

    /** Delegates to the internal {@link SchedulerClient}. */
    @Override
    public <T> void getScheduledExecutionsForTask(String str, Class<T> cls, Consumer<ScheduledExecution<T>> consumer) {
        delegate.getScheduledExecutionsForTask(str, cls, consumer);
    }

    /** Delegates to the internal {@link SchedulerClient}. */
    @Override
    public Optional<ScheduledExecution<Object>> getScheduledExecution(TaskInstanceId taskInstanceId) {
        return delegate.getScheduledExecution(taskInstanceId);
    }

    /**
     * Returns the result of {@code TaskRepository.getExecutionsFailingLongerThan} for the given
     * duration.
     */
    public List<Execution> getFailingExecutions(Duration duration) {
        return taskRepository.getExecutionsFailingLongerThan(duration);
    }

    /**
     * Wakes the waiter that paces the due-executions loop.
     *
     * @return whatever {@code Waiter.wake()} returns
     */
    public boolean triggerCheckForDueExecutions() {
        return executeDueWaiter.wake();
    }

    /** Returns a snapshot copy of the executions this instance is currently processing. */
    public List<CurrentlyExecuting> getCurrentlyExecuting() {
        return new ArrayList<>(currentlyProcessing.values());
    }

    /**
     * Fetches up to {@code pollingLimit} due executions and submits one {@link PickAndExecute} per
     * execution to {@code executorService}, all sharing a new {@link DueExecutionsBatch}. The
     * generation number is advanced only after every execution has been submitted.
     * Decompiler note: originally declared {@code protected}.
     */
    public void executeDue() {
        List<Execution> dueExecutions = taskRepository.getDue(clock.now(), pollingLimit);
        LOG.trace("Found {} taskinstances due for execution", dueExecutions.size());

        int thisGenerationNumber = currentGenerationNumber + 1;
        boolean fetchedFullBatch = pollingLimit == dueExecutions.size();
        DueExecutionsBatch newBatch = new DueExecutionsBatch(threadpoolSize, thisGenerationNumber, dueExecutions.size(), fetchedFullBatch);

        for (Execution due : dueExecutions) {
            executorService.execute(new PickAndExecute(due, newBatch));
        }
        currentGenerationNumber = thisGenerationNumber;
        statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_EXECUTE_DUE);
    }

    /**
     * Asks the repository for executions older than {@link #getMaxAgeBeforeConsideredDead()} and
     * hands each one to the dead-execution handler of its task.
     * Decompiler note: originally declared {@code protected}.
     */
    public void detectDeadExecutions() {
        LOG.debug("Checking for dead executions.");
        Instant oldAgeLimit = clock.now().minus(getMaxAgeBeforeConsideredDead());
        List<Execution> oldExecutions = taskRepository.getOldExecutions(oldAgeLimit);

        if (oldExecutions.isEmpty()) {
            LOG.trace("No dead executions found.");
        } else {
            for (Execution dead : oldExecutions) {
                handleDeadExecution(dead);
            }
        }
        statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_DETECT_DEAD);
    }

    /**
     * Delegates one dead execution to its task's dead-execution handler. Failures are logged and
     * counted so that the remaining dead executions are still handled.
     */
    private void handleDeadExecution(Execution execution) {
        LOG.info("Found dead execution. Delegating handling to task. Execution: {}", execution);
        try {
            // Raw Task is intentional: the concrete data type of the task is not known here.
            Optional<Task> resolved = taskResolver.resolve(execution.taskInstance.getTaskName());
            if (resolved.isPresent()) {
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.DEAD_EXECUTION);
                resolved.get().getDeadExecutionHandler().deadExecution(execution, new ExecutionOperations(taskRepository, execution));
            } else {
                LOG.error("Failed to find implementation for task with name '{}' for detected dead execution. Either delete the execution from the databaser, or add an implementation for it.", execution.taskInstance.getTaskName());
            }
        } catch (Throwable th) {
            LOG.error("Failed while handling dead execution {}. Will be tried again later.", execution, th);
            statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
        }
    }

    /**
     * Updates the heartbeat of every execution currently being processed, all with the same
     * timestamp. A failure for one execution is logged and counted without stopping the others.
     */
    void updateHeartbeats() {
        if (currentlyProcessing.isEmpty()) {
            LOG.trace("No executions to update heartbeats for. Skipping.");
            return;
        }

        LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size());
        Instant now = clock.now();
        // Iterate over a copy: worker threads add and remove entries concurrently.
        for (Execution execution : new ArrayList<>(currentlyProcessing.keySet())) {
            LOG.trace("Updating heartbeat for execution: {}", execution);
            try {
                taskRepository.updateHeartbeat(execution, now);
            } catch (Throwable th) {
                LOG.error("Failed while updating heartbeat for execution {}. Will try again later.", execution, th);
                statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
            }
        }
        statsRegistry.register(StatsRegistry.SchedulerStatsEvent.RAN_UPDATE_HEARTBEATS);
    }

    /**
     * Returns four times the heartbeat interval, the age used as the cut-off when looking for dead
     * executions. Decompiler note: originally declared {@code private}.
     */
    public Duration getMaxAgeBeforeConsideredDead() {
        return heartbeatInterval.multipliedBy(4L);
    }

    /** Varargs convenience for {@link #create(DataSource, List)}. */
    public static SchedulerBuilder create(DataSource dataSource, Task<?>... taskArr) {
        return create(dataSource, Arrays.asList(taskArr));
    }

    /** Returns a new {@link SchedulerBuilder} for the given data source and tasks. */
    public static SchedulerBuilder create(DataSource dataSource, List<Task<?>> list) {
        return new SchedulerBuilder(dataSource, list);
    }
}
