/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.agent.launchers.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.netflix.genie.common.external.dtos.v4.JobMetadata;
import com.netflix.genie.web.agent.launchers.AgentLauncher;
import com.netflix.genie.web.data.services.DataServices;
import com.netflix.genie.web.data.services.PersistenceService;
import com.netflix.genie.web.data.services.impl.jpa.queries.aggregates.JobInfoAggregate;
import com.netflix.genie.web.dtos.ResolvedJob;
import com.netflix.genie.web.exceptions.checked.AgentLaunchException;
import com.netflix.genie.web.introspection.GenieWebHostInfo;
import com.netflix.genie.web.introspection.GenieWebRpcInfo;
import com.netflix.genie.web.properties.LocalAgentLauncherProperties;
import com.netflix.genie.web.util.ExecutorFactory;
import com.netflix.genie.web.util.UNIXUtils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.validation.Valid;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecuteResultHandler;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteStreamHandler;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;

public class LocalAgentLauncherImpl
implements AgentLauncher {
    private static final Logger log = LoggerFactory.getLogger(LocalAgentLauncherImpl.class);
    private static final String NUMBER_ACTIVE_JOBS_KEY = "numActiveJobs";
    private static final String ALLOCATED_MEMORY_KEY = "allocatedMemory";
    private static final String USED_MEMORY_KEY = "usedMemory";
    private static final String AVAILABLE_MEMORY_KEY = "availableMemory";
    private static final String AVAILABLE_MAX_JOB_CAPACITY_KEY = "availableMaxJobCapacity";
    private static final Map<String, String> INFO_UNAVAILABLE_DETAILS = ImmutableMap.of((Object)"jobInfoUnavailable", (Object)"Unable to retrieve host job information. State unknown.");
    private static final String RUN_USER_PLACEHOLDER = "<GENIE_USER>";
    private static final String SETS_ID = "setsid";
    private static final Object MEMORY_CHECK_LOCK = new Object();
    private final String hostname;
    private final PersistenceService persistenceService;
    private final LocalAgentLauncherProperties launcherProperties;
    private final ExecutorFactory executorFactory;
    private final MeterRegistry registry;
    private final Executor sharedExecutor;
    private final int rpcPort;
    private final LoadingCache<String, JobInfoAggregate> jobInfoCache;
    private final AtomicLong numActiveJobs;
    private final AtomicLong usedMemory;

    public LocalAgentLauncherImpl(GenieWebHostInfo hostInfo, GenieWebRpcInfo rpcInfo, DataServices dataServices, LocalAgentLauncherProperties launcherProperties, ExecutorFactory executorFactory, MeterRegistry registry) {
        this.hostname = hostInfo.getHostname();
        this.rpcPort = rpcInfo.getRpcPort();
        this.persistenceService = dataServices.getPersistenceService();
        this.launcherProperties = launcherProperties;
        this.executorFactory = executorFactory;
        this.registry = registry;
        this.sharedExecutor = this.executorFactory.newInstance(false);
        this.numActiveJobs = new AtomicLong(0L);
        this.usedMemory = new AtomicLong(0L);
        HashSet tags = Sets.newHashSet((Object[])new Tag[]{Tag.of((String)"launcherClass", (String)this.getClass().getSimpleName())});
        this.registry.gauge("genie.jobs.active.gauge", (Iterable)tags, (Number)this.numActiveJobs);
        this.registry.gauge("genie.jobs.memory.used.gauge", (Iterable)tags, (Number)this.usedMemory);
        this.jobInfoCache = Caffeine.newBuilder().expireAfterWrite(this.launcherProperties.getHostInfoExpireAfter()).refreshAfterWrite(this.launcherProperties.getHostInfoRefreshAfter()).initialCapacity(1).build(host -> {
            JobInfoAggregate info = this.persistenceService.getHostJobInformation((String)host);
            if (info != null) {
                this.numActiveJobs.set(info.getNumberOfActiveJobs());
                this.usedMemory.set(info.getTotalMemoryAllocated());
            }
            return info;
        });
        try {
            this.jobInfoCache.get((Object)this.hostname);
        }
        catch (Exception e) {
            log.error("Unable to fetch initial job information", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Optional<JsonNode> launchAgent(@Valid ResolvedJob resolvedJob, @Nullable JsonNode requestedLauncherExt) throws AgentLaunchException {
        log.debug("Received request to launch local agent to run job: {}", (Object)resolvedJob);
        JobMetadata jobMetadata = resolvedJob.getJobMetadata();
        String user = jobMetadata.getUser();
        if (this.launcherProperties.isRunAsUserEnabled()) {
            String group = jobMetadata.getGroup().orElse(null);
            try {
                UNIXUtils.createUser(user, group, this.sharedExecutor);
            }
            catch (IOException e) {
                log.error("Failed to create user {}: {}", new Object[]{jobMetadata.getUser(), e.getMessage(), e});
                throw new AgentLaunchException(e);
            }
        }
        int jobMemory = resolvedJob.getJobEnvironment().getMemory();
        String jobId = resolvedJob.getJobSpecification().getJob().getId();
        if (jobMemory > this.launcherProperties.getMaxJobMemory()) {
            throw new AgentLaunchException("Unable to launch job as the requested job memory (" + jobMemory + "MB) exceeds the maximum allowed by the configuration of the system (" + this.launcherProperties.getMaxJobMemory() + "MB)");
        }
        CommandLine commandLine = this.createCommandLine((Map<String, String>)ImmutableMap.of((Object)"<SERVER_PORT_PLACEHOLDER>", (Object)Integer.toString(this.rpcPort), (Object)"<JOB_ID_PLACEHOLDER>", (Object)jobId, (Object)RUN_USER_PLACEHOLDER, (Object)user, (Object)"<AGENT_JAR_PLACEHOLDER>", (Object)this.launcherProperties.getAgentJarPath()));
        Object object = MEMORY_CHECK_LOCK;
        synchronized (object) {
            long usedMemoryOnHost = this.persistenceService.getUsedMemoryOnHost(this.hostname);
            long expectedUsedMemoryOnHost = usedMemoryOnHost + (long)jobMemory;
            if (expectedUsedMemoryOnHost > this.launcherProperties.getMaxTotalJobMemory()) {
                throw new AgentLaunchException("Running job " + jobId + " with " + jobMemory + "MB of memory would cause there to be more memory used than the configured amount of " + this.launcherProperties.getMaxTotalJobMemory() + "MB. " + usedMemoryOnHost + "MB worth of jobs are currently running on this node.");
            }
        }
        HashMap environment = Maps.newHashMap(System.getenv());
        environment.putAll(this.launcherProperties.getAdditionalEnvironment());
        log.debug("Launching agent: {}, env: {}", (Object)commandLine, (Object)environment);
        Executor executor = this.executorFactory.newInstance(true);
        if (this.launcherProperties.isProcessOutputCaptureEnabled()) {
            String debugOutputPath = System.getProperty(SystemUtils.JAVA_IO_TMPDIR, "/tmp") + "/agent-job-" + jobId + ".txt";
            try {
                FileOutputStream fileOutput = new FileOutputStream(debugOutputPath, false);
                executor.setStreamHandler((ExecuteStreamHandler)new PumpStreamHandler((OutputStream)fileOutput));
            }
            catch (FileNotFoundException e) {
                log.error("Failed to create agent process output file", (Throwable)e);
            }
        }
        log.info("Launching agent for job {}", (Object)jobId);
        AgentResultHandler resultHandler = new AgentResultHandler(jobId);
        try {
            executor.execute(commandLine, (Map)environment, (ExecuteResultHandler)resultHandler);
        }
        catch (IOException ioe) {
            throw new AgentLaunchException("Unable to launch agent using command: " + commandLine.toString(), ioe);
        }
        return Optional.empty();
    }

    public Health health() {
        int maxJobMemory;
        JobInfoAggregate jobInfo;
        try {
            jobInfo = (JobInfoAggregate)this.jobInfoCache.get((Object)this.hostname);
        }
        catch (Exception e) {
            log.error("Computing host info threw exception", (Throwable)e);
            return Health.down((Exception)e).build();
        }
        if (jobInfo == null) {
            log.error("Unable to retrieve host info from cache");
            return Health.unknown().withDetails(INFO_UNAVAILABLE_DETAILS).build();
        }
        long memoryAllocated = jobInfo.getTotalMemoryAllocated();
        long availableMemory = this.launcherProperties.getMaxTotalJobMemory() - memoryAllocated;
        Health.Builder builder = availableMemory >= (long)(maxJobMemory = this.launcherProperties.getMaxJobMemory()) ? Health.up() : Health.down();
        return builder.withDetail(NUMBER_ACTIVE_JOBS_KEY, (Object)jobInfo.getNumberOfActiveJobs()).withDetail(ALLOCATED_MEMORY_KEY, (Object)memoryAllocated).withDetail(AVAILABLE_MEMORY_KEY, (Object)availableMemory).withDetail(USED_MEMORY_KEY, (Object)jobInfo.getTotalMemoryUsed()).withDetail(AVAILABLE_MAX_JOB_CAPACITY_KEY, (Object)(availableMemory >= 0L && maxJobMemory > 0 ? availableMemory / (long)maxJobMemory : 0L)).build();
    }

    private CommandLine createCommandLine(Map<String, String> argumentValueReplacements) {
        ArrayList commandLineTemplate = Lists.newArrayList();
        if (SystemUtils.IS_OS_LINUX) {
            commandLineTemplate.add(SETS_ID);
        }
        if (this.launcherProperties.isRunAsUserEnabled()) {
            commandLineTemplate.addAll(Lists.newArrayList((Object[])new String[]{"sudo", "-E", "-u", RUN_USER_PLACEHOLDER}));
        }
        commandLineTemplate.addAll(this.launcherProperties.getLaunchCommandTemplate());
        CommandLine commandLine = new CommandLine((String)commandLineTemplate.get(0));
        for (int i = 1; i < commandLineTemplate.size(); ++i) {
            String argument = (String)commandLineTemplate.get(i);
            commandLine.addArgument(argumentValueReplacements.getOrDefault(argument, argument));
        }
        return commandLine;
    }

    @VisibleForTesting
    static class AgentResultHandler
    extends DefaultExecuteResultHandler {
        private static final Logger log = LoggerFactory.getLogger(AgentResultHandler.class);
        private final String jobId;

        AgentResultHandler(String jobId) {
            this.jobId = jobId;
        }

        public void onProcessComplete(int exitValue) {
            super.onProcessComplete(exitValue);
            log.info("Agent process for job {} completed with exit value {}", (Object)this.jobId, (Object)exitValue);
        }

        public void onProcessFailed(ExecuteException e) {
            super.onProcessFailed(e);
            log.error("Agent process failed for job {} due to {}", new Object[]{this.jobId, e.getMessage(), e});
        }
    }
}

