/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.scheduler;

import com.github.joschi.jadconfig.util.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog.scheduler.JobExecutionEngine;
import org.graylog.scheduler.JobSchedulerConfig;
import org.graylog.scheduler.clock.JobSchedulerClock;
import org.graylog.scheduler.eventbus.JobCompletedEvent;
import org.graylog.scheduler.eventbus.JobSchedulerEventBus;
import org.graylog.scheduler.worker.JobWorkerPool;
import org.graylog2.plugin.ServerStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class JobSchedulerService
extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerService.class);
    private final JobExecutionEngine jobExecutionEngine;
    private final JobSchedulerConfig schedulerConfig;
    private final JobSchedulerClock clock;
    private final JobSchedulerEventBus schedulerEventBus;
    private final ServerStatus serverStatus;
    private final JobWorkerPool workerPool;
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final Duration loopSleepDuration;
    private final InterruptibleSleeper sleeper = new InterruptibleSleeper();

    @Inject
    public JobSchedulerService(JobExecutionEngine.Factory engineFactory, JobWorkerPool.Factory workerPoolFactory, JobSchedulerConfig schedulerConfig, JobSchedulerClock clock, JobSchedulerEventBus schedulerEventBus, ServerStatus serverStatus, @Named(value="job_scheduler_loop_sleep_duration") Duration loopSleepDuration) {
        this.workerPool = workerPoolFactory.create("system", schedulerConfig.numberOfWorkerThreads());
        this.jobExecutionEngine = engineFactory.create(this.workerPool);
        this.schedulerConfig = schedulerConfig;
        this.clock = clock;
        this.schedulerEventBus = schedulerEventBus;
        this.serverStatus = serverStatus;
        this.loopSleepDuration = loopSleepDuration;
    }

    protected void startUp() throws Exception {
        this.schedulerEventBus.register((Object)this);
    }

    protected void run() throws Exception {
        LOG.debug("Waiting for server to enter RUNNING status before starting the scheduler loop");
        this.serverStatus.awaitRunning(() -> LOG.debug("Server entered RUNNING state, starting scheduler loop"));
        if (this.schedulerConfig.canStart()) {
            while (this.isRunning()) {
                if (!this.schedulerConfig.canExecute()) {
                    LOG.info("Couldn't execute next scheduler loop iteration. Waiting and trying again.");
                    this.clock.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                    continue;
                }
                LOG.debug("Starting scheduler loop iteration");
                try {
                    if (!this.jobExecutionEngine.execute() && this.isRunning() && this.sleeper.sleep(this.loopSleepDuration.getQuantity(), this.loopSleepDuration.getUnit())) {
                        LOG.debug("Waited for {} {} because there are either no free worker threads or no runnable triggers", (Object)this.loopSleepDuration.getQuantity(), (Object)this.loopSleepDuration.getUnit());
                    }
                }
                catch (InterruptedException e) {
                    LOG.debug("Received interrupted exception", (Throwable)e);
                }
                catch (Exception e) {
                    LOG.error("Error running job execution engine", (Throwable)e);
                }
                LOG.debug("Ending scheduler loop iteration");
            }
        } else {
            LOG.debug("Scheduler cannot run on this node, waiting for shutdown");
            this.shutdownLatch.await();
        }
    }

    @Subscribe
    public void handleJobCompleted(JobCompletedEvent triggerCompletedEvent) {
        this.sleeper.interrupt();
    }

    protected void triggerShutdown() {
        this.schedulerEventBus.unregister((Object)this);
        this.shutdownLatch.countDown();
        this.jobExecutionEngine.shutdown();
    }

    @VisibleForTesting
    static class InterruptibleSleeper {
        private final Semaphore semaphore;

        InterruptibleSleeper() {
            this(new Semaphore(1));
        }

        @VisibleForTesting
        InterruptibleSleeper(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        public boolean sleep(long duration, TimeUnit unit) throws InterruptedException {
            this.semaphore.drainPermits();
            return !this.semaphore.tryAcquire(duration, unit);
        }

        public void interrupt() {
            this.semaphore.release();
        }
    }
}

