/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.services.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.netflix.genie.common.dto.Job;
import com.netflix.genie.common.dto.JobExecution;
import com.netflix.genie.common.dto.JobMetadata;
import com.netflix.genie.common.dto.JobRequest;
import com.netflix.genie.common.dto.JobStatus;
import com.netflix.genie.common.exceptions.GenieConflictException;
import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.common.exceptions.GeniePreconditionException;
import com.netflix.genie.common.exceptions.GenieServerException;
import com.netflix.genie.common.exceptions.GenieServerUnavailableException;
import com.netflix.genie.common.exceptions.GenieUserLimitExceededException;
import com.netflix.genie.common.external.dtos.v4.Application;
import com.netflix.genie.common.external.dtos.v4.Cluster;
import com.netflix.genie.common.external.dtos.v4.Command;
import com.netflix.genie.common.external.dtos.v4.JobSpecification;
import com.netflix.genie.common.internal.dtos.v4.converters.DtoConverters;
import com.netflix.genie.common.internal.exceptions.checked.GenieJobResolutionException;
import com.netflix.genie.web.data.services.ApplicationPersistenceService;
import com.netflix.genie.web.data.services.ClusterPersistenceService;
import com.netflix.genie.web.data.services.CommandPersistenceService;
import com.netflix.genie.web.data.services.DataServices;
import com.netflix.genie.web.data.services.JobPersistenceService;
import com.netflix.genie.web.data.services.JobSearchService;
import com.netflix.genie.web.properties.JobsActiveLimitProperties;
import com.netflix.genie.web.properties.JobsProperties;
import com.netflix.genie.web.services.JobCoordinatorService;
import com.netflix.genie.web.services.JobKillService;
import com.netflix.genie.web.services.JobResolverService;
import com.netflix.genie.web.services.JobStateService;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.validation.Valid;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobCoordinatorServiceImpl
implements JobCoordinatorService {
    private static final Logger log = LoggerFactory.getLogger(JobCoordinatorServiceImpl.class);
    static final String OVERALL_COORDINATION_TIMER_NAME = "genie.jobs.coordination.timer";
    static final String SET_JOB_ENVIRONMENT_TIMER_NAME = "genie.jobs.submit.localRunner.setJobEnvironment.timer";
    static final String USER_JOB_LIMIT_EXCEEDED_COUNTER_NAME = "genie.jobs.submit.rejected.jobs-limit.counter";
    private static final String NO_ID_FOUND = "No id found";
    private final JobPersistenceService jobPersistenceService;
    private final JobKillService jobKillService;
    private final JobStateService jobStateService;
    private final ApplicationPersistenceService applicationPersistenceService;
    private final JobSearchService jobSearchService;
    private final ClusterPersistenceService clusterPersistenceService;
    private final CommandPersistenceService commandPersistenceService;
    private final JobResolverService jobResolverService;
    private final JobsProperties jobsProperties;
    private final String hostname;
    private final MeterRegistry registry;

    public JobCoordinatorServiceImpl(@NotNull DataServices dataServices, @NotNull JobKillService jobKillService, @NotNull JobStateService jobStateService, @NotNull JobsProperties jobsProperties, @NotNull JobResolverService jobResolverService, @NotNull MeterRegistry registry, @NotBlank String hostname) {
        this.jobPersistenceService = dataServices.getJobPersistenceService();
        this.jobKillService = jobKillService;
        this.jobStateService = jobStateService;
        this.applicationPersistenceService = dataServices.getApplicationPersistenceService();
        this.jobSearchService = dataServices.getJobSearchService();
        this.clusterPersistenceService = dataServices.getClusterPersistenceService();
        this.commandPersistenceService = dataServices.getCommandPersistenceService();
        this.jobResolverService = jobResolverService;
        this.jobsProperties = jobsProperties;
        this.hostname = hostname;
        this.registry = registry;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    @Override
    public String coordinateJob(@Valid @NotNull(message="No job request provided. Unable to execute.") @Valid @NotNull(message="No job request provided. Unable to execute.") JobRequest jobRequest, @Valid @NotNull(message="No job metadata provided. Unable to execute.") @Valid @NotNull(message="No job metadata provided. Unable to execute.") JobMetadata jobMetadata) throws GenieException {
        long coordinationStart = System.nanoTime();
        HashSet tags = Sets.newHashSet();
        String jobId = (String)jobRequest.getId().orElseThrow(() -> new GenieServerException("Id of the jobRequest cannot be null"));
        JobStatus jobStatus = JobStatus.FAILED;
        try {
            JobSpecification jobSpecification;
            log.info("Called to schedule job launch for job {}", (Object)jobId);
            Job.Builder jobBuilder = ((Job.Builder)((Job.Builder)new Job.Builder(jobRequest.getName(), jobRequest.getUser(), jobRequest.getVersion()).withId(jobId)).withTags(jobRequest.getTags())).withStatus(JobStatus.INIT).withStatusMsg("Job Accepted and in initialization phase.");
            jobRequest.getCommandArgs().ifPresent(arg_0 -> ((Job.Builder)jobBuilder).withCommandArgs(arg_0));
            jobRequest.getDescription().ifPresent(arg_0 -> ((Job.Builder)jobBuilder).withDescription(arg_0));
            String archiveRoot = this.jobsProperties.getLocations().getArchives().toString();
            if (!archiveRoot.endsWith("/")) {
                archiveRoot = archiveRoot + "/";
            }
            jobBuilder.withArchiveLocation(archiveRoot + jobId);
            JobExecution jobExecution = ((JobExecution.Builder)new JobExecution.Builder(this.hostname).withId(jobId)).build();
            this.jobPersistenceService.createJob(jobRequest, jobMetadata, jobBuilder.build(), jobExecution);
            this.jobStateService.init(jobId);
            log.info("Finding possible clusters and commands for job {}", (Object)jobRequest.getId().orElse(NO_ID_FOUND));
            try {
                jobSpecification = this.jobResolverService.resolveJob(jobId, DtoConverters.toV4JobRequest((JobRequest)jobRequest), true).getJobSpecification();
            }
            catch (GenieJobResolutionException e) {
                throw new GeniePreconditionException(e.getMessage(), (Throwable)e);
            }
            Cluster cluster = this.clusterPersistenceService.getCluster(jobSpecification.getCluster().getId());
            Command command = this.commandPersistenceService.getCommand(jobSpecification.getCommand().getId());
            int memory = jobRequest.getMemory().orElse(command.getMemory().orElse(this.jobsProperties.getMemory().getDefaultJobMemory()));
            ImmutableList.Builder applicationsBuilder = ImmutableList.builder();
            for (JobSpecification.ExecutionResource applicationResource : jobSpecification.getApplications()) {
                applicationsBuilder.add((Object)this.applicationPersistenceService.getApplication(applicationResource.getId()));
            }
            ImmutableList applications = applicationsBuilder.build();
            this.setRuntimeEnvironment(jobId, cluster, command, (List<Application>)applications, memory);
            int maxJobMemory = this.jobsProperties.getMemory().getMaxJobMemory();
            if (memory > maxJobMemory) {
                jobStatus = JobStatus.INVALID;
                throw new GeniePreconditionException("Requested " + memory + " MB to run job which is more than the " + maxJobMemory + " MB allowed");
            }
            log.info("Checking if can run job {} from user {}", (Object)jobRequest.getId(), (Object)jobRequest.getUser());
            JobsActiveLimitProperties activeLimit = this.jobsProperties.getActiveLimit();
            if (activeLimit.isEnabled()) {
                long activeJobsLimit = activeLimit.getUserLimit(jobRequest.getUser());
                long activeJobsCount = this.jobSearchService.getActiveJobCountForUser(jobRequest.getUser());
                if (activeJobsCount >= activeJobsLimit) {
                    this.registry.counter(USER_JOB_LIMIT_EXCEEDED_COUNTER_NAME, new String[]{"user", jobRequest.getUser(), "jobsUserLimit", String.valueOf(activeJobsLimit)}).increment();
                    throw GenieUserLimitExceededException.createForActiveJobsLimit((String)jobRequest.getUser(), (long)activeJobsCount, (long)activeJobsLimit);
                }
            }
            JobCoordinatorServiceImpl jobCoordinatorServiceImpl = this;
            // MONITORENTER : jobCoordinatorServiceImpl
            log.info("Checking if can run job {} on this node", (Object)jobRequest.getId());
            int maxSystemMemory = this.jobsProperties.getMemory().getMaxSystemMemory();
            int usedMemory = this.jobStateService.getUsedMemory();
            if (usedMemory + memory <= maxSystemMemory) {
                log.info("Job {} can run on this node as only {}/{} MB are used and requested {} MB", new Object[]{jobId, usedMemory, maxSystemMemory, memory});
                log.info("Publishing job scheduled event for job {}", (Object)jobId);
                this.jobStateService.schedule(jobId, jobRequest, cluster, command, (List<Application>)applications, memory);
                MetricsUtils.addSuccessTags(tags);
                String string = jobId;
                // MONITOREXIT : jobCoordinatorServiceImpl
                return string;
            }
            throw new GenieServerUnavailableException("Job " + jobId + " can't run on this node " + usedMemory + "/" + maxSystemMemory + " MB are used and requested " + memory + " MB");
        }
        catch (GenieConflictException e) {
            MetricsUtils.addFailureTagsWithException(tags, e);
            throw e;
        }
        catch (GenieException e) {
            MetricsUtils.addFailureTagsWithException(tags, e);
            if (!this.jobStateService.jobExists(jobId)) throw e;
            this.jobStateService.done(jobId);
            this.jobPersistenceService.updateJobStatus(jobId, jobStatus, e.getMessage());
            throw e;
        }
        catch (Exception e) {
            MetricsUtils.addFailureTagsWithException(tags, e);
            if (!this.jobStateService.jobExists(jobId)) throw new GenieServerException("Failed to coordinate job launch", (Throwable)e);
            this.jobStateService.done(jobId);
            this.jobPersistenceService.updateJobStatus(jobId, jobStatus, e.getMessage());
            throw new GenieServerException("Failed to coordinate job launch", (Throwable)e);
        }
        catch (Throwable t) {
            MetricsUtils.addFailureTagsWithException(tags, t);
            throw t;
        }
        finally {
            this.registry.timer(OVERALL_COORDINATION_TIMER_NAME, (Iterable)tags).record(System.nanoTime() - coordinationStart, TimeUnit.NANOSECONDS);
        }
    }

    @Override
    public void killJob(@NotBlank String jobId, @NotBlank String reason) throws GenieException {
        this.jobKillService.killJob(jobId, reason);
    }

    private void setRuntimeEnvironment(String jobId, Cluster cluster, Command command, List<Application> applications, int memory) throws GenieException {
        long jobEnvironmentStart = System.nanoTime();
        HashSet tags = Sets.newHashSet();
        try {
            String clusterId = cluster.getId();
            String commandId = command.getId();
            this.jobPersistenceService.updateJobWithRuntimeEnvironment(jobId, clusterId, commandId, applications.stream().map(rec$ -> ((Application)rec$).getId()).collect(Collectors.toList()), memory);
            MetricsUtils.addSuccessTags(tags);
        }
        catch (Throwable t) {
            MetricsUtils.addFailureTagsWithException(tags, t);
            throw t;
        }
        finally {
            this.registry.timer(SET_JOB_ENVIRONMENT_TIMER_NAME, (Iterable)tags).record(System.nanoTime() - jobEnvironmentStart, TimeUnit.NANOSECONDS);
        }
    }
}

