package com.netflix.genie.web.agent.services.impl;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.netflix.genie.common.internal.util.GenieHostInfo;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import com.netflix.genie.web.properties.AgentRoutingServiceProperties;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotBlank;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

/* loaded from: input_file:com/netflix/genie/web/agent/services/impl/AgentRoutingServiceCuratorDiscoveryImpl.class */
public class AgentRoutingServiceCuratorDiscoveryImpl implements AgentRoutingService {
    private static final Logger log = LoggerFactory.getLogger(AgentRoutingServiceCuratorDiscoveryImpl.class);

    /** Service name under which agent connections are registered and looked up in service discovery. */
    private static final String SERVICE_NAME = "agent_connections";

    /** Common prefix shared by every metric name published by this class. */
    private static final String METRICS_PREFIX = "genie.agents.connections.";
    private static final String CONNECTED_AGENTS_GAUGE_NAME = METRICS_PREFIX + "connected.gauge";
    private static final String REGISTERED_AGENTS_GAUGE_NAME = METRICS_PREFIX + "registered.gauge";
    private static final String ZOOKEEPER_SESSION_STATE_COUNTER_NAME = METRICS_PREFIX + "zookeeperSessionState.counter";
    private static final String AGENT_REGISTERED_TIMER_NAME = METRICS_PREFIX + "registered.timer";
    private static final String AGENT_UNREGISTERED_TIMER_NAME = METRICS_PREFIX + "unregistered.timer";
    private static final String AGENT_REFRESH_TIMER_NAME = METRICS_PREFIX + "refreshed.timer";
    private static final String AGENT_CONNECTED_COUNTER_NAME = METRICS_PREFIX + "connected.counter";
    private static final String AGENT_DISCONNECTED_COUNTER_NAME = METRICS_PREFIX + "disconnected.counter";
    private static final String AGENT_LOOKUP_TIMER_NAME = METRICS_PREFIX + "lookup.timer";

    /** Tag key carrying the Curator connection state name on the session-state counter. */
    private static final String ZK_CONNECTION_STATE_TAG_NAME = "connectionState";

    /** Tag key recording whether a lookup produced an address. */
    private static final String ROUTE_FOUND_TAG_NAME = "found";

    /** Shared empty tag set passed when registering the gauges. */
    private static final Set<Tag> EMPTY_TAG_SET = ImmutableSet.of();

    /** Hostname of this node; used as the address of registered instances and returned for local agents. */
    private final String localHostname;

    /** Discovery client used to register, update, unregister and query agent connections. */
    private final ServiceDiscovery<Agent> serviceDiscovery;

    /** Scheduler used to enqueue the periodic refresh of a registered connection. */
    private final TaskScheduler taskScheduler;

    /** Registry receiving the gauges, counters and timers of this service. */
    private final MeterRegistry registry;

    /** Configuration; supplies the refresh interval. */
    private final AgentRoutingServiceProperties properties;

    /** Job ids of agents currently connected to this node. */
    private final Set<String> connectedAgentsSet;

    /** Job id to the service instance that was registered for it. */
    private final Map<String, ServiceInstance<Agent>> registeredAgentsMap;

    /** Pending registration changes, consumed by the registration thread. */
    private final PriorityBlockingQueue<RegisterMutation> registrationQueue;

    /** The registration thread currently in charge, or {@code null} after it has been stopped. */
    private final AtomicReference<Thread> registrationTaskThread;

    /** Factory producing registration threads. */
    private final ThreadFactory threadFactory;

    /* renamed from: com.netflix.genie.web.agent.services.impl.AgentRoutingServiceCuratorDiscoveryImpl$2, reason: invalid class name */
    /* loaded from: input_file:com/netflix/genie/web/agent/services/impl/AgentRoutingServiceCuratorDiscoveryImpl$2.class */
    /**
     * Synthetic lookup table, recovered by the decompiler, that translates a
     * {@link ConnectionState} ordinal into a small integer label:
     * CONNECTED = 1, RECONNECTED = 2, LOST = 3, SUSPENDED = 4.
     * Any state not listed here keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass2 {
        // One slot per ConnectionState constant, indexed by ordinal().
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$state$ConnectionState = new int[ConnectionState.values().length];

        static {
            // Each constant is resolved in its own try so that a constant missing
            // from the ConnectionState class found at run time only skips its own entry.
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.CONNECTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.RECONNECTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.LOST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$curator$framework$state$ConnectionState[ConnectionState.SUSPENDED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: the slot stays 0.
            }
        }
    }

    /**
     * Payload stored with each registered service instance; it carries only the
     * id of the job the agent is executing. Two agents are equal when their job
     * ids are equal (two {@code null} ids also compare equal).
     */
    public static final class Agent {
        private final String jobId;

        /**
         * Creates an agent without a job id. Private; presumably kept for
         * reflective instantiation by a deserializer (not visible in this file).
         */
        private Agent() {
            this.jobId = null;
        }

        /**
         * Creates an agent payload for the given job.
         *
         * @param str the job id, bound to the required JSON property {@code jobId}
         */
        public Agent(@JsonProperty(value = "jobId", required = true) String str) {
            this.jobId = str;
        }

        /**
         * @return the job id, possibly {@code null}
         */
        public String getJobId() {
            return this.jobId;
        }

        /**
         * Compares by job id only.
         *
         * @param obj the object to compare with
         * @return {@code true} when {@code obj} is an {@code Agent} with the same job id
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Agent)) {
                return false;
            }
            final String mine = getJobId();
            final String theirs = ((Agent) obj).getJobId();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /**
         * @return a hash derived from the job id, consistent with {@link #equals(Object)}
         */
        @Override
        public int hashCode() {
            final String id = getJobId();
            final int idHash = (id == null) ? 43 : id.hashCode();
            return 59 + idHash;
        }
    }

    /**
     * A pending change to the registration of one job's agent connection, ordered
     * for use in a priority queue.
     *
     * <p>Ordering: non-refresh ("update") mutations sort before refresh mutations;
     * within the same kind, the mutation created earlier sorts first; mutations
     * created at the same instant are ordered by job id.
     */
    public static final class RegisterMutation implements Comparable<RegisterMutation> {
        private final String jobId;
        private final boolean refresh;
        // Creation time from System.nanoTime(); only differences between two values are meaningful.
        private final long timestamp = System.nanoTime();

        private RegisterMutation(String str, boolean z) {
            this.jobId = str;
            this.refresh = z;
        }

        /**
         * Creates a low-priority mutation that refreshes an existing registration.
         *
         * @param str the job id
         * @return a new refresh mutation
         */
        static RegisterMutation refresh(String str) {
            return new RegisterMutation(str, true);
        }

        /**
         * Creates a high-priority mutation triggered by a connect or disconnect.
         *
         * @param str the job id
         * @return a new update mutation
         */
        static RegisterMutation update(String str) {
            return new RegisterMutation(str, false);
        }

        /**
         * Orders by kind (updates first), then by creation time (oldest first),
         * then by job id.
         *
         * @param registerMutation the mutation to compare against
         * @return negative, zero or positive as this mutation sorts before, with or after the other
         */
        @Override // java.lang.Comparable
        public int compareTo(RegisterMutation registerMutation) {
            if (this.refresh != registerMutation.isRefresh()) {
                return this.refresh ? 1 : -1;
            }
            // nanoTime values may wrap, so compare through the sign of the difference
            // rather than comparing the raw values (or a single raw value against zero).
            final long elapsed = this.timestamp - registerMutation.getTimestamp();
            if (elapsed != 0) {
                return elapsed > 0 ? 1 : -1;
            }
            return this.jobId.compareTo(registerMutation.getJobId());
        }

        /**
         * @return the id of the job this mutation applies to
         */
        public String getJobId() {
            return this.jobId;
        }

        /**
         * @return {@code true} for a periodic refresh, {@code false} for a connect/disconnect update
         */
        public boolean isRefresh() {
            return this.refresh;
        }

        /**
         * @return the {@link System#nanoTime()} reading taken when this mutation was created
         */
        public long getTimestamp() {
            return this.timestamp;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof RegisterMutation)) {
                return false;
            }
            final RegisterMutation other = (RegisterMutation) obj;
            if (isRefresh() != other.isRefresh() || getTimestamp() != other.getTimestamp()) {
                return false;
            }
            final String mine = getJobId();
            final String theirs = other.getJobId();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        @Override
        public int hashCode() {
            int result = 59 + (isRefresh() ? 79 : 97);
            final long ts = getTimestamp();
            result = (result * 59) + ((int) ((ts >>> 32) ^ ts));
            final String id = getJobId();
            return (result * 59) + (id == null ? 43 : id.hashCode());
        }
    }

    /**
     * Creates the routing service with a default thread factory whose threads are
     * named {@code AgentRoutingServiceCuratorDiscoveryImpl-registration-N}.
     *
     * @param genieHostInfo                 supplies the local hostname
     * @param serviceDiscovery              discovery client used to publish and query agent connections
     * @param taskScheduler                 scheduler used to enqueue periodic refreshes
     * @param listenable                    source of Curator connection-state notifications
     * @param meterRegistry                 metrics registry
     * @param agentRoutingServiceProperties service configuration
     */
    public AgentRoutingServiceCuratorDiscoveryImpl(GenieHostInfo genieHostInfo, ServiceDiscovery<Agent> serviceDiscovery, TaskScheduler taskScheduler, Listenable<ConnectionStateListener> listenable, MeterRegistry meterRegistry, AgentRoutingServiceProperties agentRoutingServiceProperties) {
        this(genieHostInfo, serviceDiscovery, taskScheduler, listenable, meterRegistry, agentRoutingServiceProperties, new ThreadFactory() { // from class: com.netflix.genie.web.agent.services.impl.AgentRoutingServiceCuratorDiscoveryImpl.1
            private final AtomicLong threadCounter = new AtomicLong();

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                // Name the thread after the enclosing service: getClass() here would be the
                // anonymous factory class, whose simple name is the empty string.
                return new Thread(runnable, AgentRoutingServiceCuratorDiscoveryImpl.class.getSimpleName() + "-registration-" + this.threadCounter.incrementAndGet());
            }
        });
    }

    /**
     * Full constructor, package-private so a custom thread factory can be supplied.
     * Besides storing its collaborators it registers the two gauges, subscribes to
     * connection-state changes and starts the first registration thread.
     *
     * @param genieHostInfo                 supplies the local hostname
     * @param serviceDiscovery              discovery client used to publish and query agent connections
     * @param taskScheduler                 scheduler used to enqueue periodic refreshes
     * @param listenable                    source of Curator connection-state notifications
     * @param meterRegistry                 metrics registry
     * @param agentRoutingServiceProperties service configuration
     * @param threadFactory                 factory for registration threads
     */
    @VisibleForTesting
    AgentRoutingServiceCuratorDiscoveryImpl(GenieHostInfo genieHostInfo, ServiceDiscovery<Agent> serviceDiscovery, TaskScheduler taskScheduler, Listenable<ConnectionStateListener> listenable, MeterRegistry meterRegistry, AgentRoutingServiceProperties agentRoutingServiceProperties, ThreadFactory threadFactory) {
        // Collaborators handed in by the caller.
        this.localHostname = genieHostInfo.getHostname();
        this.serviceDiscovery = serviceDiscovery;
        this.taskScheduler = taskScheduler;
        this.registry = meterRegistry;
        this.properties = agentRoutingServiceProperties;
        this.threadFactory = threadFactory;

        // Internal state shared between callers and the registration thread.
        this.connectedAgentsSet = Sets.newConcurrentHashSet();
        this.registeredAgentsMap = Maps.newConcurrentMap();
        this.registrationQueue = new PriorityBlockingQueue<>();
        this.registrationTaskThread = new AtomicReference<>();

        // Gauges follow the live size of the two collections.
        meterRegistry.gauge(CONNECTED_AGENTS_GAUGE_NAME, EMPTY_TAG_SET, this.connectedAgentsSet, Set::size);
        meterRegistry.gaugeMapSize(REGISTERED_AGENTS_GAUGE_NAME, EMPTY_TAG_SET, this.registeredAgentsMap);

        listenable.addListener(this::handleConnectionStateChange);
        startRegistrationThread();
    }

    /**
     * Installs a fresh registration thread as the current one, interrupts the
     * thread it replaces (if any), and then starts the new thread.
     */
    private void startRegistrationThread() {
        final Thread replacement = this.threadFactory.newThread(this::registrationTask);
        Optional.ofNullable(this.registrationTaskThread.getAndSet(replacement)).ifPresent(Thread::interrupt);
        replacement.start();
    }

    /**
     * Clears the current registration thread reference and interrupts the thread
     * that was held there, if there was one.
     */
    private void stopRegistrationThread() {
        Optional.ofNullable(this.registrationTaskThread.getAndSet(null)).ifPresent(Thread::interrupt);
    }

    /**
     * Body of the registration thread: processes queued mutations one after the
     * other until the thread is interrupted.
     */
    private void registrationTask() {
        for (;;) {
            try {
                processNextRegistrationMutation();
            } catch (InterruptedException interrupted) {
                log.debug("Registration thread terminating");
                break;
            }
        }
    }

    /**
     * Blocks until a mutation is available and applies it.
     *
     * <p>If the job's agent is still connected locally, the route is registered
     * (or refreshed, for a refresh mutation) and the next refresh is scheduled
     * after the configured refresh interval. Otherwise the route is unregistered.
     *
     * <p>If the thread is interrupted after a mutation has been taken but before
     * it has been fully applied, the mutation is put back on the queue so that it
     * is not lost, and the interruption is propagated.
     *
     * @throws InterruptedException if interrupted while waiting for or applying a mutation
     */
    private void processNextRegistrationMutation() throws InterruptedException {
        // Declared outside the try so the catch block can tell whether a mutation was taken.
        RegisterMutation mutation = null;
        try {
            mutation = this.registrationQueue.take();
            final String jobId = mutation.getJobId();
            if (!this.connectedAgentsSet.contains(jobId)) {
                unregisterAgentConnection(jobId);
                return;
            }
            if (mutation.isRefresh()) {
                refreshAgentConnection(jobId);
            } else {
                registerAgentConnection(jobId);
            }
            // Keep the registration alive: enqueue a refresh once the interval has elapsed.
            this.taskScheduler.schedule(() -> {
                this.registrationQueue.add(RegisterMutation.refresh(jobId));
            }, Instant.now().plus((TemporalAmount) this.properties.getRefreshInterval()));
        } catch (InterruptedException e) {
            log.warn("Registration task interrupted", e);
            // Re-enqueue the unfinished mutation before terminating.
            if (mutation != null) {
                this.registrationQueue.add(mutation);
            }
            throw e;
        }
    }

    /**
     * Registers a route for the given job pointing at this node and remembers the
     * registered instance. The elapsed time is recorded exactly once, tagged with
     * the outcome.
     *
     * <p>Failures other than interruption are logged and not propagated.
     *
     * @param jobId the id of the job whose agent is connected to this node
     * @throws InterruptedException if interrupted while registering
     */
    private void registerAgentConnection(String jobId) throws InterruptedException {
        log.debug("Registering route for job: {}", jobId);
        final ServiceInstance<Agent> serviceInstance = new ServiceInstance<>(SERVICE_NAME, jobId, this.localHostname, (Integer) null, (Integer) null, new Agent(jobId), Instant.now().getEpochSecond(), ServiceType.DYNAMIC, (UriSpec) null);
        // Assume success; replaced by failure tags in the catch blocks.
        Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
        final long start = System.nanoTime();
        try {
            this.serviceDiscovery.registerService(serviceInstance);
            this.registeredAgentsMap.put(jobId, serviceInstance);
        } catch (InterruptedException e) {
            log.debug("Interrupted while registering {}", jobId);
            tags = MetricsUtils.newFailureTagsSetForException(e);
            throw e;
        } catch (Exception e) {
            log.error("Failed to register agent executing job: {}", jobId, e);
            tags = MetricsUtils.newFailureTagsSetForException(e);
        } finally {
            this.registry.timer(AGENT_REGISTERED_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Refreshes the existing registration of the given job.
     *
     * <p>If no registered instance is remembered for the job, or if the update
     * fails because the node no longer exists, the route is registered again from
     * scratch. Other failures (except interruption) are logged and not propagated.
     * The elapsed time is recorded exactly once, tagged with the outcome.
     *
     * @param jobId the id of the job whose registration should be refreshed
     * @throws InterruptedException if interrupted while updating or re-registering
     */
    private void refreshAgentConnection(String jobId) throws InterruptedException {
        log.debug("Refreshing route for job: {}", jobId);
        final ServiceInstance<Agent> serviceInstance = this.registeredAgentsMap.get(jobId);
        if (serviceInstance == null) {
            log.warn("Instance record not found for job {}", jobId);
            registerAgentConnection(jobId);
            return;
        }
        // Assume success; replaced by failure tags in the catch blocks.
        Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
        final long start = System.nanoTime();
        try {
            this.serviceDiscovery.updateService(serviceInstance);
        } catch (KeeperException.NoNodeException e) {
            // Must be caught before the generic Exception handler, otherwise the
            // re-registration fallback can never run.
            log.warn("Failed to update registration of agent executing job id: {}", jobId);
            registerAgentConnection(jobId);
        } catch (InterruptedException e) {
            log.debug("Interrupted while refreshing {}", jobId);
            tags = MetricsUtils.newFailureTagsSetForException(e);
            throw e;
        } catch (Exception e) {
            log.error("Failed to refresh agent executing job id: {}", jobId, e);
            tags = MetricsUtils.newFailureTagsSetForException(e);
        } finally {
            this.registry.timer(AGENT_REFRESH_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Removes the route of the given job, if one is currently remembered as
     * registered. The elapsed time is recorded exactly once, tagged with the
     * outcome.
     *
     * <p>Failures other than interruption are logged and not propagated; in that
     * case the instance stays in the registered map.
     *
     * @param jobId the id of the job whose route should be removed
     * @throws InterruptedException if interrupted while unregistering
     */
    private void unregisterAgentConnection(String jobId) throws InterruptedException {
        final ServiceInstance<Agent> serviceInstance = this.registeredAgentsMap.get(jobId);
        if (serviceInstance == null) {
            log.debug("Skipping unregistration, already removed");
            return;
        }
        log.debug("Unregistering route for job: {}", jobId);
        // Assume success; replaced by failure tags in the catch blocks.
        Set<Tag> tags = MetricsUtils.newSuccessTagsSet();
        final long start = System.nanoTime();
        try {
            this.serviceDiscovery.unregisterService(serviceInstance);
            this.registeredAgentsMap.remove(jobId);
        } catch (InterruptedException e) {
            log.debug("Interrupted while unregistering {}", jobId);
            tags = MetricsUtils.newFailureTagsSetForException(e);
            throw e;
        } catch (Exception e) {
            log.error("Failed to unregister agent executing job id: {}", jobId, e);
            tags = MetricsUtils.newFailureTagsSetForException(e);
        } finally {
            this.registry.timer(AGENT_UNREGISTERED_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Reacts to a change of the Curator connection state: counts the event, then
     * (re)starts the registration thread when the connection is established and
     * stops it when the connection is lost or suspended. Other states are only
     * logged.
     *
     * @param curatorFramework the client reporting the change (unused; required by the listener signature)
     * @param connectionState  the new connection state
     */
    private void handleConnectionStateChange(CuratorFramework curatorFramework, ConnectionState connectionState) {
        this.registry.counter(ZOOKEEPER_SESSION_STATE_COUNTER_NAME, Sets.newHashSet(Tag.of(ZK_CONNECTION_STATE_TAG_NAME, connectionState.name()))).increment();
        log.info("Zookeeper/Curator client: {}", connectionState);
        switch (connectionState) {
            case CONNECTED:
            case RECONNECTED:
                startRegistrationThread();
                break;
            case LOST:
            case SUSPENDED:
                stopRegistrationThread();
                break;
            default:
                log.warn("Zookeeper/Curator unhandled connection state: {}", connectionState);
                break;
        }
    }

    /**
     * Finds the hostname of the node that the agent running the given job is
     * connected to.
     *
     * <p>A locally connected agent is answered immediately with the local
     * hostname. Otherwise service discovery is queried; the lookup time is
     * recorded exactly once, tagged with the outcome and with whether a route was
     * found. Lookup errors are logged and reported as "not found".
     *
     * @param str the job id
     * @return the hostname owning the agent connection, or empty if none is known or the lookup failed
     */
    @Override // com.netflix.genie.web.agent.services.AgentRoutingService
    public Optional<String> getHostnameForAgentConnection(@NotBlank String str) {
        log.debug("Looking up agent executing job: {}", str);
        if (isAgentConnectionLocal(str)) {
            return Optional.of(this.localHostname);
        }
        final long start = System.nanoTime();
        final Set<Tag> tags = Sets.newHashSet();
        String address = null;
        try {
            final ServiceInstance<Agent> instance = this.serviceDiscovery.queryForInstance(SERVICE_NAME, str);
            if (instance == null) {
                log.debug("Could not find agent connection for job {}", str);
            } else {
                address = instance.getAddress();
            }
            MetricsUtils.addSuccessTags(tags);
        } catch (Exception e) {
            log.error("Error looking up agent connection for job {}", str, e);
            address = null;
            MetricsUtils.addFailureTagsWithException(tags, e);
        } finally {
            tags.add(Tag.of(ROUTE_FOUND_TAG_NAME, String.valueOf(address != null)));
            this.registry.timer(AGENT_LOOKUP_TIMER_NAME, tags).record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
        return Optional.ofNullable(address);
    }

    /**
     * Tells whether the agent running the given job is connected to this node.
     *
     * @param str the job id
     * @return {@code true} if the job id is in the set of locally connected agents
     */
    @Override // com.netflix.genie.web.agent.services.AgentRoutingService
    public boolean isAgentConnectionLocal(@NotBlank String str) {
        final boolean connectedHere = this.connectedAgentsSet.contains(str);
        return connectedHere;
    }

    /**
     * Records that the agent running the given job has connected to this node and
     * queues the registration of its route. The connected counter is incremented
     * only when the job was not already in the connected set.
     *
     * @param str the job id
     */
    @Override // com.netflix.genie.web.agent.services.AgentRoutingService
    public void handleClientConnected(@NotBlank String str) {
        log.debug("Adding to routing table (pending registration): {}", str);
        final boolean newlyConnected = this.connectedAgentsSet.add(str);
        // Always enqueue, even for a repeated connect.
        this.registrationQueue.add(RegisterMutation.update(str));
        if (!newlyConnected) {
            return;
        }
        this.registry.counter(AGENT_CONNECTED_COUNTER_NAME).increment();
    }

    /**
     * Records that the agent running the given job has disconnected from this
     * node and queues the removal of its route. The disconnected counter is
     * incremented only when the job was actually in the connected set.
     *
     * @param str the job id
     */
    @Override // com.netflix.genie.web.agent.services.AgentRoutingService
    public void handleClientDisconnected(@NotBlank String str) {
        log.debug("Removing from routing table (pending un-registration): {}", str);
        final boolean wasConnected = this.connectedAgentsSet.remove(str);
        // Always enqueue, even if the job was not known as connected.
        this.registrationQueue.add(RegisterMutation.update(str));
        if (!wasConnected) {
            return;
        }
        this.registry.counter(AGENT_DISCONNECTED_COUNTER_NAME).increment();
    }

    /**
     * Tells whether a route exists for the agent running the given job, on this
     * node or on any other.
     *
     * @param str the job id
     * @return {@code true} if a hostname can be resolved for the agent connection
     */
    @Override // com.netflix.genie.web.agent.services.AgentRoutingService
    public boolean isAgentConnected(String str) {
        final Optional<String> owner = getHostnameForAgentConnection(str);
        return owner.isPresent();
    }
}
