package org.graylog.scheduler;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.inject.assistedinject.Assisted;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import org.graylog.scheduler.Job;
import org.graylog.scheduler.JobTriggerUpdates;
import org.graylog.scheduler.eventbus.JobCompletedEvent;
import org.graylog.scheduler.eventbus.JobSchedulerEventBus;
import org.graylog.scheduler.worker.JobWorkerPool;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog/scheduler/JobExecutionEngine.class */
/**
 * Fetches runnable job triggers from the {@link DBJobTriggerService} and runs the matching
 * {@link Job} on the {@link JobWorkerPool} this engine was created with.
 *
 * <p>Execution outcomes are recorded in three metrics registered under this class' name:
 * {@code executions.successful}, {@code executions.failed} and {@code executions.time}.
 * A {@link JobCompletedEvent} is posted to the event bus after every handled trigger.
 */
public class JobExecutionEngine {
    private static final Logger LOG = LoggerFactory.getLogger(JobExecutionEngine.class);

    /** Delay before a trigger is retried after a non-permanent error while handling it. */
    private static final int RETRY_DELAY_SECONDS = 5;

    private final DBJobTriggerService jobTriggerService;
    private final DBJobDefinitionService jobDefinitionService;
    private final JobSchedulerEventBus eventBus;
    private final JobScheduleStrategies scheduleStrategies;
    private final JobTriggerUpdates.Factory jobTriggerUpdatesFactory;
    /** Job factories keyed by the job definition config type. */
    private final Map<String, Job.Factory> jobFactory;
    private final JobWorkerPool workerPool;
    private final Counter executionSuccessful;
    private final Counter executionFailed;
    private final Timer executionTime;
    /** Cleared by {@link #shutdown()}; also handed to every job via its execution context. */
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    /** True until the one-time stale trigger cleanup has been performed. */
    private final AtomicBoolean shouldCleanup = new AtomicBoolean(true);

    /** Assisted-inject factory that binds an engine to a specific worker pool. */
    public interface Factory {
        JobExecutionEngine create(JobWorkerPool jobWorkerPool);
    }

    /**
     * Creates the engine and registers its execution metrics.
     *
     * @param dBJobTriggerService    service used to fetch, release and flag job triggers
     * @param dBJobDefinitionService service used to look up the job definition of a trigger
     * @param jobSchedulerEventBus   event bus that receives a {@link JobCompletedEvent} per handled trigger
     * @param jobScheduleStrategies  used to compute the next trigger time after an unhandled job error
     * @param factory                creates the {@link JobTriggerUpdates} passed to each job execution
     * @param map                    job factories keyed by job definition config type
     * @param jobWorkerPool          worker pool the jobs are executed on
     * @param metricRegistry         registry for the execution counters and timer
     */
    @Inject
    public JobExecutionEngine(DBJobTriggerService dBJobTriggerService, DBJobDefinitionService dBJobDefinitionService, JobSchedulerEventBus jobSchedulerEventBus, JobScheduleStrategies jobScheduleStrategies, JobTriggerUpdates.Factory factory, Map<String, Job.Factory> map, @Assisted JobWorkerPool jobWorkerPool, MetricRegistry metricRegistry) {
        this.jobTriggerService = dBJobTriggerService;
        this.jobDefinitionService = dBJobDefinitionService;
        this.eventBus = jobSchedulerEventBus;
        this.scheduleStrategies = jobScheduleStrategies;
        this.jobTriggerUpdatesFactory = factory;
        this.jobFactory = map;
        this.workerPool = jobWorkerPool;
        this.executionSuccessful = metricRegistry.counter(MetricRegistry.name(getClass(), "executions", "successful"));
        this.executionFailed = metricRegistry.counter(MetricRegistry.name(getClass(), "executions", "failed"));
        this.executionTime = metricRegistry.timer(MetricRegistry.name(getClass(), "executions", "time"));
    }

    /**
     * Stops the engine from picking up new triggers. Subsequent {@link #execute()} calls return
     * {@code false} without fetching a trigger.
     */
    public void shutdown() {
        this.isRunning.set(false);
    }

    /**
     * Force-releases the triggers reported by {@code forceReleaseOwnedTriggers()} and logs a
     * warning when at least one was released. Runs at most once per engine instance.
     */
    private void cleanup() {
        // getAndSet guarantees that only the first caller performs the release.
        if (!this.shouldCleanup.getAndSet(false)) {
            return;
        }
        final int releasedTriggers = this.jobTriggerService.forceReleaseOwnedTriggers();
        if (releasedTriggers > 0) {
            LOG.warn("Force-released {} stale job triggers after an unclean job scheduler shutdown", releasedTriggers);
        }
    }

    /**
     * Tries to fetch the next runnable trigger and submit it to the worker pool.
     *
     * @return {@code true} if a trigger was handed over to the worker pool; {@code false} if the
     *         engine has been shut down, the pool has no free slots, no trigger is runnable, or
     *         the pool rejected the task
     */
    public boolean execute() {
        if (this.shouldCleanup.get()) {
            cleanup();
        }
        if (!this.isRunning.get() || !this.workerPool.hasFreeSlots()) {
            return false;
        }

        final Optional<JobTriggerDto> nextTrigger = this.jobTriggerService.nextRunnableTrigger();
        if (!nextTrigger.isPresent()) {
            return false;
        }

        final JobTriggerDto trigger = nextTrigger.get();
        if (!this.workerPool.execute(() -> handleTrigger(trigger))) {
            // The pool did not accept the task: give the trigger back with its next time unchanged.
            this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(trigger.nextTime()));
            return false;
        }
        return true;
    }

    /**
     * Asks the trigger service to update the locked job triggers, but only while at least one
     * worker pool slot is in use.
     */
    public void updateLockedJobs() {
        if (this.workerPool.anySlotsUsed()) {
            this.jobTriggerService.updateLockedJobTriggers();
        }
    }

    /**
     * Resolves the job definition and job instance for the given trigger and runs the job inside
     * the execution timer.
     *
     * <p>An {@link IllegalStateException} (missing job definition, missing job factory, factory
     * returned no job) is treated as permanent: the trigger is flagged as failed and not retried.
     * Any other exception releases the trigger for a retry {@value #RETRY_DELAY_SECONDS} seconds
     * later. A {@link JobCompletedEvent} is posted in every case.
     *
     * @param trigger the trigger to handle
     */
    private void handleTrigger(JobTriggerDto trigger) {
        LOG.trace("Locked trigger {} (owner={})", trigger.id(), trigger.lock().owner());
        try {
            final JobDefinitionDto jobDefinition = this.jobDefinitionService.get(trigger.jobDefinitionId())
                    .orElseThrow(() -> new IllegalStateException("Couldn't find job definition " + trigger.jobDefinitionId()));

            // Check the map lookup before dereferencing it. Otherwise an unregistered job type
            // surfaces as a NullPointerException and is retried forever instead of being
            // reported as the permanent error it is.
            final Job.Factory factory = this.jobFactory.get(jobDefinition.config().type());
            if (factory == null) {
                throw new IllegalStateException("Couldn't find job factory for type " + jobDefinition.config().type());
            }
            final Job job = factory.create(jobDefinition);
            if (job == null) {
                throw new IllegalStateException("Couldn't create job instance for type " + jobDefinition.config().type());
            }

            this.executionTime.time(() -> executeJob(trigger, jobDefinition, job));
        } catch (IllegalStateException e) {
            LOG.error("Couldn't handle trigger due to a permanent error {} - trigger won't be retried", trigger.id(), e);
            this.jobTriggerService.setTriggerError(trigger);
        } catch (Exception e) {
            final DateTime retryTime = DateTime.now(DateTimeZone.UTC).plusSeconds(RETRY_DELAY_SECONDS);
            LOG.error("Couldn't handle trigger {} - retrying at {}", trigger.id(), retryTime, e);
            this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(retryTime));
        } finally {
            this.eventBus.post(JobCompletedEvent.INSTANCE);
        }
    }

    /**
     * Executes the job and releases the trigger with the update the job produced.
     *
     * <p>On a {@link JobExecutionException} the trigger and update carried by the exception are
     * used for the release. On any other exception (including a {@code null} result from the
     * job) the trigger is released with the next future time computed by the schedule
     * strategies. Each failed execution increments the failure counter exactly once.
     *
     * @param trigger       the trigger being executed
     * @param jobDefinition the job definition the trigger belongs to
     * @param job           the job instance to run
     */
    private void executeJob(JobTriggerDto trigger, JobDefinitionDto jobDefinition, Job job) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Execute job: {}/{}/{} (job-class={} trigger={} config={})",
                        jobDefinition.title(), jobDefinition.id(), jobDefinition.config().type(),
                        job.getClass().getSimpleName(), trigger.id(), jobDefinition.config());
            }

            final JobTriggerUpdate triggerUpdate = job.execute(JobExecutionContext.create(
                    trigger, jobDefinition, this.jobTriggerUpdatesFactory.create(trigger), this.isRunning, this.jobTriggerService));
            if (triggerUpdate == null) {
                // The generic handler below counts this failure; incrementing here as well
                // would record a single failed execution twice.
                throw new IllegalStateException("Job#execute() must not return null - this is a bug in the job class");
            }

            this.executionSuccessful.inc();
            LOG.trace("Update trigger: trigger={} update={}", trigger.id(), triggerUpdate);
            this.jobTriggerService.releaseTrigger(trigger, triggerUpdate);
        } catch (JobExecutionException e) {
            LOG.error("Job execution error - trigger={} job={}", trigger.id(), jobDefinition.id(), e);
            this.executionFailed.inc();
            // The job told us how to proceed: use the trigger and update it attached to the exception.
            this.jobTriggerService.releaseTrigger(e.getTrigger(), e.getUpdate());
        } catch (Exception e) {
            this.executionFailed.inc();
            LOG.error("Unhandled job execution error - trigger={} job={}", trigger.id(), jobDefinition.id(), e);
            // No instructions from the job, so fall back to the next future time of the trigger schedule.
            this.jobTriggerService.releaseTrigger(trigger, JobTriggerUpdate.withNextTime(this.scheduleStrategies.nextFutureTime(trigger).orElse(null)));
        }
    }
}
