package org.graylog.scheduler;

import com.github.joschi.jadconfig.util.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog.scheduler.JobExecutionEngine;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog.scheduler.eventbus.JobCompletedEvent;
import org.graylog.scheduler.eventbus.JobSchedulerEventBus;
import org.graylog.scheduler.worker.JobWorkerPool;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.Tools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Service that runs the job scheduler loop on its own execution thread.
 *
 * <p>After the server has reached the RUNNING state, the loop repeatedly asks the
 * {@link JobExecutionEngine} to execute work. Whenever the engine reports that it did not
 * execute anything, the loop pauses for the configured sleep duration or until a
 * {@link JobCompletedEvent} arrives on the scheduler event bus, whichever comes first.
 *
 * <p>A separate single-threaded daemon executor periodically calls
 * {@link JobExecutionEngine#updateLockedJobs()} for as long as it has not been shut down.
 */
@Singleton
public class JobSchedulerService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerService.class);

    /** Name passed to the worker pool factory for the pool used by this service. */
    private static final String WORKER_POOL_NAME = "system";
    /** Delay before the first locked-jobs update is run. */
    private static final long HEARTBEAT_INITIAL_DELAY_SECONDS = 0L;
    /** Period between two locked-jobs updates. */
    private static final long HEARTBEAT_PERIOD_SECONDS = 15L;
    /** How long to wait before re-checking the config while execution is disabled. */
    private static final long EXECUTION_DISABLED_WAIT_SECONDS = 1L;

    private final JobExecutionEngine jobExecutionEngine;
    private final JobSchedulerConfig schedulerConfig;
    private final JobSchedulerClock clock;
    private final JobSchedulerEventBus schedulerEventBus;
    private final ServerStatus serverStatus;
    private final JobWorkerPool workerPool;
    private final Duration loopSleepDuration;
    private final InterruptibleSleeper sleeper = new InterruptibleSleeper();
    private final ScheduledExecutorService jobHeartbeatExecutor = createJobHeartbeatExecutor();
    /** Thread captured in {@link #startUp()}; interrupted by {@link #triggerShutdown()}. */
    private Thread executionThread;

    /**
     * Semaphore-backed timed sleep that another thread can cut short via {@link #interrupt()}.
     */
    @VisibleForTesting
    static class InterruptibleSleeper {
        private final Semaphore semaphore;

        InterruptibleSleeper() {
            this(new Semaphore(1));
        }

        @VisibleForTesting
        InterruptibleSleeper(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        /**
         * Blocks for up to the given duration, returning early when {@link #interrupt()} is called.
         *
         * @param duration maximum time to wait
         * @param unit     unit of {@code duration}
         * @return {@code true} if the full duration elapsed, {@code false} if the sleep was cut short
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public boolean sleep(long duration, TimeUnit unit) throws InterruptedException {
            // Discard any permits released before this call so that only a release
            // happening from now on can end the wait early.
            semaphore.drainPermits();
            final boolean wokenUp = semaphore.tryAcquire(duration, unit);
            return !wokenUp;
        }

        /**
         * Releases a permit, which ends a {@link #sleep(long, TimeUnit)} call that is currently waiting.
         */
        public void interrupt() {
            semaphore.release();
        }
    }

    /**
     * Creates the service together with its worker pool and execution engine.
     *
     * @param engineFactory     factory for the execution engine, which is bound to the created worker pool
     * @param workerPoolFactory factory for the worker pool
     * @param schedulerConfig   supplies the number of worker threads and whether execution is allowed
     * @param clock             clock used to wait while execution is disabled
     * @param schedulerEventBus event bus this service registers on for job completion events
     * @param serverStatus      used to wait for the server to enter the RUNNING state
     * @param loopSleepDuration pause between loop iterations when the engine executed nothing
     */
    @Inject
    public JobSchedulerService(JobExecutionEngine.Factory engineFactory,
                               JobWorkerPool.Factory workerPoolFactory,
                               JobSchedulerConfig schedulerConfig,
                               JobSchedulerClock clock,
                               JobSchedulerEventBus schedulerEventBus,
                               ServerStatus serverStatus,
                               @Named("job_scheduler_loop_sleep_duration") Duration loopSleepDuration) {
        this.schedulerConfig = schedulerConfig;
        this.clock = clock;
        this.schedulerEventBus = schedulerEventBus;
        this.serverStatus = serverStatus;
        this.loopSleepDuration = loopSleepDuration;

        // The pool receives a callback that shuts down the heartbeat executor
        // (presumably invoked by the pool on its own shutdown -- confirm in JobWorkerPool).
        this.workerPool = workerPoolFactory.create(
                WORKER_POOL_NAME,
                schedulerConfig.numberOfWorkerThreads(),
                this::shutdownJobHeartbeatExecutor);
        this.jobExecutionEngine = engineFactory.create(this.workerPool);
    }

    /**
     * Builds the single-threaded daemon executor used for the periodic locked-jobs update.
     */
    private ScheduledExecutorService createJobHeartbeatExecutor() {
        return Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                        .setNameFormat("job-heartbeat-%d")
                        .setDaemon(true)
                        .setUncaughtExceptionHandler(new Tools.LogUncaughtExceptionHandler(LOG))
                        .build());
    }

    /**
     * Shuts down the heartbeat executor; handed to the worker pool as a callback.
     */
    private void shutdownJobHeartbeatExecutor() {
        jobHeartbeatExecutor.shutdown();
        LOG.info("Shutdown of job heartbeat executor");
    }

    /**
     * Schedules the periodic locked-jobs update, registers this service on the scheduler
     * event bus and remembers the current thread so it can be interrupted on shutdown.
     */
    protected void startUp() throws Exception {
        jobHeartbeatExecutor.scheduleAtFixedRate(
                this::updateLockedJobs,
                HEARTBEAT_INITIAL_DELAY_SECONDS,
                HEARTBEAT_PERIOD_SECONDS,
                TimeUnit.SECONDS);
        schedulerEventBus.register(this);
        executionThread = Thread.currentThread();
    }

    /**
     * Waits for the server to enter the RUNNING state and then runs the scheduler loop
     * until the service is no longer running.
     *
     * <p>While the scheduler config does not allow execution, the loop only re-checks the
     * config once per second.
     */
    protected void run() throws Exception {
        LOG.debug("Waiting for server to enter RUNNING status before starting the scheduler loop");
        try {
            serverStatus.awaitRunning();
            LOG.debug("Server entered RUNNING state, starting scheduler loop");

            // Tracks the last observed config state so that changes are logged only once.
            boolean executionEnabled = true;
            while (isRunning()) {
                if (!schedulerConfig.canExecute()) {
                    executionEnabled = logExecutionConfigState(executionEnabled, false);
                    clock.sleepUninterruptibly(EXECUTION_DISABLED_WAIT_SECONDS, TimeUnit.SECONDS);
                    continue;
                }
                executionEnabled = logExecutionConfigState(executionEnabled, true);
                runLoopIteration();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for whoever owns this thread.
            Thread.currentThread().interrupt();
            LOG.debug("Was interrupted while waiting for server to enter RUNNING state. Aborting.");
        }
    }

    /**
     * Runs a single scheduler loop iteration: lets the engine execute and, if it executed
     * nothing while the service is still running, sleeps until the loop sleep duration has
     * elapsed or the sleeper is interrupted. Exceptions are logged and never propagated.
     */
    private void runLoopIteration() {
        LOG.debug("Starting scheduler loop iteration");
        try {
            final boolean executed = jobExecutionEngine.execute();
            if (!executed && isRunning()) {
                final boolean sleptFullDuration =
                        sleeper.sleep(loopSleepDuration.getQuantity(), loopSleepDuration.getUnit());
                if (sleptFullDuration) {
                    LOG.debug("Waited for {} {} because there are either no free worker threads or no runnable triggers",
                            loopSleepDuration.getQuantity(), loopSleepDuration.getUnit());
                }
            }
        } catch (InterruptedException e) {
            // Not re-interrupting here: the caller's isRunning() check decides whether to go on.
            LOG.debug("Received interrupted exception", e);
        } catch (Exception e) {
            LOG.error("Error running job execution engine", e);
        }
        LOG.debug("Ending scheduler loop iteration");
    }

    /**
     * Wakes up the scheduler loop if it is currently sleeping.
     *
     * @param event the completion event; its content is not inspected
     */
    @Subscribe
    public void handleJobCompleted(JobCompletedEvent event) {
        sleeper.interrupt();
    }

    /**
     * Unregisters from the scheduler event bus, shuts down the execution engine and
     * interrupts the thread captured in {@link #startUp()}.
     */
    protected void triggerShutdown() {
        // Unregister first, before the engine is shut down and the loop thread is interrupted.
        schedulerEventBus.unregister(this);
        jobExecutionEngine.shutdown();
        executionThread.interrupt();
    }

    /**
     * Periodic heartbeat task; delegates to the execution engine.
     */
    private void updateLockedJobs() {
        jobExecutionEngine.updateLockedJobs();
    }

    /**
     * Logs a message when the "execution enabled" state changed since the last check.
     *
     * @param previouslyEnabled state observed by the previous check
     * @param currentlyEnabled  state observed now
     * @return {@code currentlyEnabled}, to be stored by the caller for the next check
     */
    private boolean logExecutionConfigState(boolean previouslyEnabled, boolean currentlyEnabled) {
        if (previouslyEnabled != currentlyEnabled) {
            if (currentlyEnabled) {
                LOG.info("Job scheduler execution is now enabled. Proceeding.");
            } else {
                LOG.info("Job scheduler execution is disabled. Waiting and trying again until enabled.");
            }
        }
        return currentlyEnabled;
    }
}
