/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.tasks.job;

import com.netflix.genie.common.dto.Job;
import com.netflix.genie.common.dto.JobExecution;
import com.netflix.genie.common.dto.JobStatus;
import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.common.exceptions.GenieServerException;
import com.netflix.genie.core.events.JobFinishedEvent;
import com.netflix.genie.core.events.JobFinishedReason;
import com.netflix.genie.core.events.JobScheduledEvent;
import com.netflix.genie.core.events.JobStartedEvent;
import com.netflix.genie.core.services.JobCountService;
import com.netflix.genie.core.services.JobSearchService;
import com.netflix.genie.web.properties.JobOutputMaxProperties;
import com.netflix.genie.web.tasks.job.JobMonitor;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import org.apache.commons.exec.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Primary;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;

@Component
@Primary
public class JobMonitoringCoordinator
implements JobCountService {
    private static final Logger log = LoggerFactory.getLogger(JobMonitoringCoordinator.class);
    private final Map<String, ScheduledFuture<?>> jobMonitors = Collections.synchronizedMap(new HashMap());
    private final Map<String, Future<?>> scheduledJobs = Collections.synchronizedMap(new HashMap());
    private final String hostName;
    private final JobSearchService jobSearchService;
    private final TaskScheduler scheduler;
    private final ApplicationEventPublisher publisher;
    private final Executor executor;
    private final Registry registry;
    private final File jobsDir;
    private final JobOutputMaxProperties outputMaxProperties;
    private final Counter unableToCancel;
    private final Counter unableToReAttach;

    @Autowired
    public JobMonitoringCoordinator(String hostName, JobSearchService jobSearchService, ApplicationEventPublisher publisher, TaskScheduler scheduler, Executor executor, Registry registry, Resource jobsDir, JobOutputMaxProperties outputMaxProperties) throws IOException {
        this.hostName = hostName;
        this.jobSearchService = jobSearchService;
        this.publisher = publisher;
        this.scheduler = scheduler;
        this.executor = executor;
        this.registry = registry;
        this.jobsDir = jobsDir.getFile();
        this.outputMaxProperties = outputMaxProperties;
        this.registry.mapSize("genie.jobs.running.gauge", this.jobMonitors);
        this.registry.mapSize("genie.jobs.scheduled.gauge", this.scheduledJobs);
        this.registry.methodValue("genie.jobs.active.gauge", (Object)this, "getNumJobs");
        this.unableToCancel = registry.counter("genie.jobs.unableToCancel.rate");
        this.unableToReAttach = registry.counter("genie.jobs.unableToReAttach.rate");
    }

    @EventListener
    public void onStartup(ApplicationReadyEvent event) throws GenieException {
        log.info("Application is ready according to event {}. Attempting to re-attach to any active jobs", (Object)event);
        Set jobs = this.jobSearchService.getAllActiveJobsOnHost(this.hostName);
        if (jobs.isEmpty()) {
            log.info("No jobs currently active on this node.");
            return;
        }
        log.info("{} jobs currently active on this node at startup", (Object)jobs.size());
        for (Job job : jobs) {
            String id = (String)job.getId().orElseThrow(() -> new GenieServerException("Job has no id!"));
            if (this.jobMonitors.containsKey(id) || this.scheduledJobs.containsKey(id)) {
                log.info("Job {} is already being tracked. Ignoring.", (Object)id);
                continue;
            }
            if (job.getStatus() != JobStatus.RUNNING) {
                this.publisher.publishEvent((ApplicationEvent)new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, "System crashed while job starting", (Object)this));
                continue;
            }
            try {
                this.scheduleMonitor(this.jobSearchService.getJobExecution(id));
                log.info("Re-attached a job monitor to job {}", (Object)id);
            }
            catch (GenieException ge) {
                log.error("Unable to re-attach to job {}.", (Object)id);
                this.publisher.publishEvent((ApplicationEvent)new JobFinishedEvent(id, JobFinishedReason.SYSTEM_CRASH, "Unable to re-attach on startup", (Object)this));
                this.unableToReAttach.increment();
            }
        }
    }

    @EventListener
    public synchronized void onJobScheduled(JobScheduledEvent event) {
        this.scheduledJobs.put(event.getId(), event.getTask());
    }

    @EventListener
    public synchronized void onJobStarted(JobStartedEvent event) {
        String jobId = (String)event.getJobExecution().getId().orElseThrow(IllegalArgumentException::new);
        this.scheduledJobs.remove(jobId);
        if (!this.jobMonitors.containsKey(jobId)) {
            this.scheduleMonitor(event.getJobExecution());
        }
    }

    @EventListener
    public synchronized void onJobFinished(JobFinishedEvent event) {
        String jobId = event.getId();
        if (this.jobMonitors.containsKey(jobId)) {
            if (this.jobMonitors.get(jobId).cancel(true)) {
                log.debug("Successfully cancelled task monitoring job {}", (Object)jobId);
            } else {
                log.error("Unable to cancel task monitoring job {}", (Object)jobId);
                this.unableToCancel.increment();
            }
            this.jobMonitors.remove(jobId);
        } else if (this.scheduledJobs.containsKey(jobId)) {
            Future<?> task = this.scheduledJobs.get(jobId);
            if (!task.isDone()) {
                if (task.cancel(true)) {
                    log.debug("Successfully cancelled job init task for job {}", (Object)jobId);
                } else {
                    log.error("Unable to cancel job init task for job {}", (Object)jobId);
                    this.unableToCancel.increment();
                }
            }
            this.scheduledJobs.remove(jobId);
        }
    }

    public int getNumJobs() {
        return this.jobMonitors.size() + this.scheduledJobs.size();
    }

    private void scheduleMonitor(JobExecution jobExecution) {
        ScheduledFuture future;
        String jobId = (String)jobExecution.getId().orElseThrow(IllegalArgumentException::new);
        File stdOut = new File(this.jobsDir, jobId + "/" + "stdout");
        File stdErr = new File(this.jobsDir, jobId + "/" + "stderr");
        JobMonitor monitor = new JobMonitor(jobExecution, stdOut, stdErr, this.executor, this.publisher, this.registry, this.outputMaxProperties);
        switch (monitor.getScheduleType()) {
            case TRIGGER: {
                future = this.scheduler.schedule((Runnable)monitor, monitor.getTrigger());
                break;
            }
            case FIXED_DELAY: {
                future = this.scheduler.scheduleWithFixedDelay((Runnable)monitor, monitor.getFixedDelay());
                break;
            }
            case FIXED_RATE: {
                future = this.scheduler.scheduleAtFixedRate((Runnable)monitor, monitor.getFixedRate());
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown schedule type: " + (Object)((Object)monitor.getScheduleType()));
            }
        }
        this.jobMonitors.put(jobId, future);
        log.info("Scheduled job monitoring for Job {}", (Object)jobExecution.getId());
    }
}

