/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.tasks.leader;

import com.google.common.base.Splitter;
import com.netflix.genie.common.dto.Job;
import com.netflix.genie.common.dto.JobStatus;
import com.netflix.genie.common.exceptions.GenieException;
import com.netflix.genie.common.external.util.GenieObjectMapper;
import com.netflix.genie.common.internal.util.GenieHostInfo;
import com.netflix.genie.web.data.services.AgentConnectionPersistenceService;
import com.netflix.genie.web.data.services.JobPersistenceService;
import com.netflix.genie.web.data.services.JobSearchService;
import com.netflix.genie.web.properties.ClusterCheckerProperties;
import com.netflix.genie.web.tasks.GenieTaskScheduleType;
import com.netflix.genie.web.tasks.leader.LeadershipTask;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointProperties;
import org.springframework.boot.actuate.health.Status;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;

/**
 * Leadership task that polls the health endpoint of every other host that currently has active jobs.
 *
 * <p>Each host that fails a check has its entry in {@code errorCounts} incremented; a healthy check removes
 * the entry. Once a host's count reaches {@link ClusterCheckerProperties#getLostThreshold()}, its active jobs
 * are marked {@link JobStatus#FAILED} and its agent connections are removed.
 */
public class ClusterCheckerTask extends LeadershipTask {
    private static final Logger log = LoggerFactory.getLogger(ClusterCheckerTask.class);
    private static final String UNHEALTHY_HOSTS_GAUGE_METRIC_NAME = "genie.tasks.clusterChecker.unhealthyHosts.gauge";
    private static final String BAD_HEALTH_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.failedHealthcheck.counter";
    private static final String BAD_RESPONSE_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.invalidResponse.counter";
    private static final String BAD_HOST_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.unreachableHost.counter";
    private static final String FAILED_JOBS_COUNT_METRIC_NAME = "genie.tasks.clusterChecker.jobsMarkedFailed.counter";
    private static final String REAPED_CONNECTIONS_METRIC_NAME = "genie.tasks.clusterChecker.connectionsReaped.counter";
    /** Exit code recorded on jobs that are failed because their host is presumed lost. */
    private static final int LOST_JOB_EXIT_CODE = 666;

    private final String hostname;
    private final ClusterCheckerProperties properties;
    private final JobSearchService jobSearchService;
    private final JobPersistenceService jobPersistenceService;
    private final AgentConnectionPersistenceService agentConnectionPersistenceService;
    private final RestTemplate restTemplate;
    private final MeterRegistry registry;
    private final String scheme;
    private final String healthEndpoint;
    private final List<String> healthIndicatorsToIgnore;
    /** Host name mapped to the number of unhealthy checks seen since the host was last healthy. */
    private final Map<String, Integer> errorCounts = new HashMap<String, Integer>();

    /**
     * Creates the task and registers a gauge that reports the number of hosts currently tracked as unhealthy.
     *
     * @param genieHostInfo                     supplies the local host name, which is excluded from checks
     * @param properties                        scheme, port, rate, lost threshold and ignored indicators
     * @param jobSearchService                  used to find hosts with active jobs and the jobs on a host
     * @param jobPersistenceService             used to mark jobs as failed
     * @param agentConnectionPersistenceService used to remove agent connections to a lost host
     * @param restTemplate                      client used to call the remote health endpoint
     * @param webEndpointProperties             supplies the base path under which {@code /health} is requested
     * @param registry                          metrics registry for the gauge and counters
     */
    public ClusterCheckerTask(@NotNull GenieHostInfo genieHostInfo, @NotNull ClusterCheckerProperties properties, @NotNull JobSearchService jobSearchService, @NotNull JobPersistenceService jobPersistenceService, @NotNull AgentConnectionPersistenceService agentConnectionPersistenceService, @NotNull RestTemplate restTemplate, @NotNull WebEndpointProperties webEndpointProperties, @NotNull MeterRegistry registry) {
        this.hostname = genieHostInfo.getHostname();
        this.properties = properties;
        this.jobSearchService = jobSearchService;
        this.jobPersistenceService = jobPersistenceService;
        this.agentConnectionPersistenceService = agentConnectionPersistenceService;
        this.restTemplate = restTemplate;
        this.registry = registry;
        // The health URL is assembled per host as: scheme + host + healthEndpoint.
        this.scheme = this.properties.getScheme() + "://";
        this.healthEndpoint = ":" + this.properties.getPort() + webEndpointProperties.getBasePath() + "/health";
        this.healthIndicatorsToIgnore = Splitter.on(",")
            .omitEmptyStrings()
            .trimResults()
            .splitToList(properties.getHealthIndicatorsToIgnore());
        Gauge.builder(UNHEALTHY_HOSTS_GAUGE_METRIC_NAME, this.errorCounts, Map::size).register(registry);
    }

    /**
     * Checks every other host with active jobs, then processes hosts whose error count reached the threshold.
     *
     * <p>An entry is dropped from {@code errorCounts} only when the threshold was reached and both the job
     * update and the agent-connection cleanup succeeded; otherwise it is kept so the next run tries again.
     */
    @Override
    public void run() {
        log.info("Checking for cluster node health...");
        this.jobSearchService.getAllHostsWithActiveJobs()
            .stream()
            .filter(host -> !this.hostname.equals(host))
            .forEach(this::validateHostAndUpdateErrorCount);
        this.errorCounts.entrySet().removeIf(entry -> this.reapHostIfLost(entry.getKey(), entry.getValue()));
        log.info("Finished checking for cluster node health.");
    }

    /**
     * Fails the jobs and drops the agent connections of a host whose error count reached the lost threshold.
     *
     * @param host       the host to evaluate
     * @param errorCount the host's current error count
     * @return {@code true} when the threshold was reached and both cleanup steps completed without exception
     */
    private boolean reapHostIfLost(String host, int errorCount) {
        if (errorCount < this.properties.getLostThreshold()) {
            return false;
        }
        boolean fullyCleaned = true;
        try {
            this.updateJobsToFailedOnHost(host);
        } catch (Exception e) {
            log.error("Unable to update jobs on host {} due to exception", host, e);
            fullyCleaned = false;
        }
        // Connection cleanup is attempted even when the job update failed.
        try {
            this.cleanupAgentConnectionsToHost(host);
        } catch (RuntimeException e) {
            log.error("Unable to drop agent connections to host {} due to exception", host, e);
            fullyCleaned = false;
        }
        return fullyCleaned;
    }

    /**
     * Marks every active job on the given host as failed, incrementing the jobs-marked-failed counter once
     * per job with success or failure tags.
     *
     * @param host the host whose active jobs should be failed
     * @throws RuntimeException if updating a job fails; jobs after the failing one are not processed
     */
    private void updateJobsToFailedOnHost(String host) {
        Set<Job> jobs = this.jobSearchService.getAllActiveJobsOnHost(host);
        jobs.forEach(job -> {
            Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
            tags.add(Tag.of("host", host));
            try {
                this.jobPersistenceService.setJobCompletionInformation(
                    job.getId().orElseThrow(IllegalArgumentException::new),
                    LOST_JOB_EXIT_CODE,
                    JobStatus.FAILED,
                    "Genie leader can't reach node running job. Assuming node and job are lost.",
                    null,
                    null
                );
            } catch (GenieException ge) {
                MetricsUtils.addFailureTagsWithException(tags, ge);
                log.error("Unable to update job {} to failed due to exception", job.getId(), ge);
                throw new RuntimeException("Failed to update job", ge);
            } catch (RuntimeException re) {
                // Unchecked failures (e.g. a job without an id) must not be counted with success tags.
                MetricsUtils.addFailureTagsWithException(tags, re);
                throw re;
            } finally {
                this.registry.counter(FAILED_JOBS_COUNT_METRIC_NAME, tags).increment();
            }
        });
    }

    /**
     * Removes all agent connections to the given host and adds the number removed to the reaped-connections
     * counter.
     *
     * @param host the host whose agent connections should be dropped
     * @throws RuntimeException rethrown from the persistence service after being logged and tagged
     */
    private void cleanupAgentConnectionsToHost(String host) {
        Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
        tags.add(Tag.of("host", host));
        // Start at zero so a failed removal does not report a reaped connection.
        int reapedConnectionsCount = 0;
        try {
            reapedConnectionsCount = this.agentConnectionPersistenceService.removeAllAgentConnectionToServer(host);
            log.info("Dropped {} agent connections to host {}", reapedConnectionsCount, host);
        } catch (RuntimeException e) {
            MetricsUtils.addFailureTagsWithException(tags, e);
            log.error("Unable to drop agent connections to host {}", host, e);
            throw e;
        } finally {
            this.registry.counter(REAPED_CONNECTIONS_METRIC_NAME, tags).increment(reapedConnectionsCount);
        }
    }

    /**
     * Checks one host and updates its entry in {@code errorCounts}: removed when healthy, otherwise
     * created at 1 or incremented.
     *
     * @param host the host to check
     */
    private void validateHostAndUpdateErrorCount(String host) {
        if (this.isNodeHealthy(host)) {
            if (this.errorCounts.remove(host) != null) {
                log.info("Host {} is no longer unhealthy", host);
            }
        } else if (this.errorCounts.containsKey(host)) {
            int currentCount = this.errorCounts.get(host) + 1;
            log.info("Host still unhealthy (check #{}): {}", currentCount, host);
            this.errorCounts.put(host, currentCount);
        } else {
            log.info("Marking host unhealthy: {}", host);
            this.errorCounts.put(host, 1);
        }
    }

    /**
     * Requests the host's health endpoint and evaluates each reported component.
     *
     * <p>The top-level status is not consulted. A host is healthy when every component not listed in
     * {@code healthIndicatorsToIgnore} reports {@link Status#UP}. An unreachable host or an unparseable
     * response counts as unhealthy.
     *
     * @param host the host to query
     * @return {@code true} if the host is considered healthy
     */
    private boolean isNodeHealthy(String host) {
        String responseContent;
        try {
            responseContent = this.restTemplate.getForObject(this.scheme + host + this.healthEndpoint, String.class);
            log.debug("Healtcheck retrieved successfully from: {}", host);
        } catch (HttpStatusCodeException e) {
            // An error status code may still carry a health body, so it is parsed below.
            log.warn("Host {} healthcheck returned code: {}", host, e.getStatusCode(), e);
            responseContent = e.getResponseBodyAsString();
        } catch (RestClientException e) {
            log.warn("Unable to request healtcheck response from host: {}", host, e);
            this.registry.counter(BAD_HOST_COUNT_METRIC_NAME, "host", host).increment();
            return false;
        }

        HealthEndpointResponse healthEndpointResponse;
        try {
            healthEndpointResponse = GenieObjectMapper.getMapper().readValue(responseContent, HealthEndpointResponse.class);
        } catch (IOException ex) {
            log.warn("Failed to parse healthcheck response from host: {}: {}", host, ex.getMessage());
            this.registry.counter(BAD_RESPONSE_COUNT_METRIC_NAME, "host", host).increment();
            return false;
        }

        // NOTE(review): getComponents() is presumably null when the payload carries no "components" field,
        // which would throw NullPointerException here - confirm the endpoint always exposes component details.
        boolean hostHealthy = true;
        for (Map.Entry<String, HealthIndicatorDetails> entry : healthEndpointResponse.getComponents().entrySet()) {
            String healthIndicatorName = entry.getKey();
            HealthIndicatorDetails healthIndicator = entry.getValue();
            if (this.healthIndicatorsToIgnore.contains(healthIndicatorName)) {
                log.debug("Ignoring indicator: {}", healthIndicatorName);
                continue;
            }
            if (Status.UP.getCode().equals(healthIndicator.getStatus().getCode())) {
                log.debug("Indicator {} is UP", healthIndicatorName);
                continue;
            }
            // Keep iterating so every failing indicator is counted, not just the first.
            hostHealthy = false;
            this.registry.counter(
                BAD_HEALTH_COUNT_METRIC_NAME,
                "host", host,
                "healthIndicator", healthIndicatorName,
                "healthStatus", healthIndicator.getStatus().getCode()
            ).increment();
        }
        return hostHealthy;
    }

    /**
     * @return {@link GenieTaskScheduleType#FIXED_RATE}
     */
    @Override
    public GenieTaskScheduleType getScheduleType() {
        return GenieTaskScheduleType.FIXED_RATE;
    }

    /**
     * @return the rate configured in {@link ClusterCheckerProperties#getRate()}
     */
    @Override
    public long getFixedRate() {
        return this.properties.getRate();
    }

    /**
     * Discards all tracked error counts.
     */
    @Override
    public void cleanup() {
        this.errorCounts.clear();
    }

    /**
     * @return the number of hosts currently tracked as unhealthy
     */
    int getErrorCountsSize() {
        return this.errorCounts.size();
    }

    /**
     * Deserialization target for the health endpoint payload: an overall status plus per-component details.
     */
    private static class HealthEndpointResponse {
        private Status status;
        private Map<String, HealthIndicatorDetails> components;

        public Status getStatus() {
            return this.status;
        }

        public Map<String, HealthIndicatorDetails> getComponents() {
            return this.components;
        }

        public void setStatus(Status status) {
            this.status = status;
        }

        public void setComponents(Map<String, HealthIndicatorDetails> components) {
            this.components = components;
        }
    }

    /**
     * Deserialization target for a single component of the health payload: its status and free-form details.
     */
    private static class HealthIndicatorDetails {
        private Status status;
        private Map<String, Object> details;

        public Status getStatus() {
            return this.status;
        }

        public Map<String, Object> getDetails() {
            return this.details;
        }

        public void setStatus(Status status) {
            this.status = status;
        }

        public void setDetails(Map<String, Object> details) {
            this.details = details;
        }
    }
}

