/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.tasks.leader;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Splitter;
import com.netflix.genie.common.dto.JobStatus;
import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.core.services.JobPersistenceService;
import com.netflix.genie.core.services.JobSearchService;
import com.netflix.genie.web.properties.ClusterCheckerProperties;
import com.netflix.genie.web.tasks.GenieTaskScheduleType;
import com.netflix.genie.web.tasks.leader.LeadershipTask;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.ManagementServerProperties;
import org.springframework.boot.actuate.health.Status;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate;

@Component
public class ClusterCheckerTask
extends LeadershipTask {
    private static final Logger log = LoggerFactory.getLogger(ClusterCheckerTask.class);
    private static final String PROPERTY_STATUS = "status";
    private final String hostName;
    private final ClusterCheckerProperties properties;
    private final JobSearchService jobSearchService;
    private final JobPersistenceService jobPersistenceService;
    private final RestTemplate restTemplate;
    private final String scheme;
    private final String healthEndpoint;
    private final ObjectMapper mapper = new ObjectMapper();
    private final List<String> healthIndicatorsToIgnore;
    private final Map<String, Integer> errorCounts = new HashMap<String, Integer>();
    private final Counter lostJobsCounter;
    private final Counter unableToUpdateJobCounter;

    @Autowired
    public ClusterCheckerTask(@NotNull String hostName, @NotNull ClusterCheckerProperties properties, @NotNull JobSearchService jobSearchService, @NotNull JobPersistenceService jobPersistenceService, @Qualifier(value="genieRestTemplate") @NotNull RestTemplate restTemplate, @NotNull ManagementServerProperties managementServerProperties, @Value(value="${genie.tasks.clusterChecker.healthIndicatorsToIgnore:memory,genie,discoveryComposite}") String healthIndicatorsToIgnore, @NotNull Registry registry) {
        this.hostName = hostName;
        this.properties = properties;
        this.jobSearchService = jobSearchService;
        this.jobPersistenceService = jobPersistenceService;
        this.restTemplate = restTemplate;
        this.scheme = this.properties.getScheme() + "://";
        this.healthEndpoint = ":" + this.properties.getPort() + managementServerProperties.getContextPath() + "/health";
        this.healthIndicatorsToIgnore = Splitter.on((String)",").omitEmptyStrings().trimResults().splitToList((CharSequence)healthIndicatorsToIgnore);
        registry.mapSize("genie.tasks.clusterChecker.errorCounts.gauge", this.errorCounts);
        this.lostJobsCounter = registry.counter("genie.tasks.clusterChecker.lostJobs.rate");
        this.unableToUpdateJobCounter = registry.counter("genie.tasks.clusterChecker.unableToUpdateJob.rate");
    }

    @Override
    public void run() {
        log.info("Checking for cluster node health...");
        this.jobSearchService.getAllHostsWithActiveJobs().stream().filter(host -> !this.hostName.equals(host)).forEach(this::validateHostAndUpdateErrorCount);
        this.errorCounts.entrySet().removeIf(entry -> {
            String host = (String)entry.getKey();
            boolean result = true;
            if ((Integer)entry.getValue() >= this.properties.getLostThreshold()) {
                try {
                    this.updateJobsToFailedOnHost(host);
                }
                catch (Exception e) {
                    log.error("Unable to update jobs on host {} due to exception", (Object)host, (Object)e);
                    this.unableToUpdateJobCounter.increment();
                    result = false;
                }
            } else {
                result = false;
            }
            return result;
        });
        log.info("Finished checking for cluster node health.");
    }

    private void updateJobsToFailedOnHost(String host) {
        Set jobs = this.jobSearchService.getAllActiveJobsOnHost(host);
        jobs.forEach(job -> {
            try {
                this.jobPersistenceService.setJobCompletionInformation((String)job.getId().orElseThrow(IllegalArgumentException::new), 666, JobStatus.FAILED, "Genie leader can't reach node running job. Assuming node and job are lost.", null, null);
                this.lostJobsCounter.increment();
            }
            catch (GenieException ge) {
                log.error("Unable to update job {} to failed due to exception", (Object)job.getId(), (Object)ge);
                this.unableToUpdateJobCounter.increment();
            }
        });
    }

    private void validateHostAndUpdateErrorCount(String host) {
        if (this.isNodeHealthy(host)) {
            if (this.errorCounts.containsKey(host)) {
                this.errorCounts.remove(host);
            }
        } else if (this.errorCounts.containsKey(host)) {
            this.errorCounts.put(host, this.errorCounts.get(host) + 1);
        } else {
            this.errorCounts.put(host, 1);
        }
    }

    private boolean isNodeHealthy(String host) {
        boolean result = true;
        try {
            this.restTemplate.getForObject(this.scheme + host + this.healthEndpoint, String.class, new Object[0]);
        }
        catch (HttpStatusCodeException e) {
            log.error("Failed validating host {}", (Object)host, (Object)e);
            try {
                Map responseMap = (Map)this.mapper.readValue(e.getResponseBodyAsByteArray(), (JavaType)TypeFactory.defaultInstance().constructMapType(Map.class, String.class, Object.class));
                for (Map.Entry responseEntry : responseMap.entrySet()) {
                    if (!(responseEntry.getValue() instanceof Map) || this.healthIndicatorsToIgnore.contains(responseEntry.getKey()) || Status.UP.getCode().equals(((Map)responseEntry.getValue()).get(PROPERTY_STATUS))) continue;
                    result = false;
                }
            }
            catch (Exception ex) {
                log.error("Failed reading the error response when validating host {}", (Object)host, (Object)ex);
                result = false;
            }
        }
        catch (Exception e) {
            log.error("Unable to reach {}", (Object)host, (Object)e);
            result = false;
        }
        return result;
    }

    @Override
    public GenieTaskScheduleType getScheduleType() {
        return GenieTaskScheduleType.FIXED_RATE;
    }

    @Override
    public long getFixedRate() {
        return this.properties.getRate();
    }

    protected int getErrorCountsSize() {
        return this.errorCounts.size();
    }
}

