package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.google.common.collect.Maps;
import com.netflix.genie.proto.AgentHeartBeat;
import com.netflix.genie.proto.HeartBeatServiceGrpc;
import com.netflix.genie.proto.ServerHeartBeat;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import io.grpc.stub.StreamObserver;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

/* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcHeartBeatServiceImpl.class */
public class GRpcHeartBeatServiceImpl extends HeartBeatServiceGrpc.HeartBeatServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(GRpcHeartBeatServiceImpl.class);
    private static final long HEART_BEAT_PERIOD_MILLIS = 5000;
    private final Map<String, AgentStreamRecord> activeStreamsMap = Maps.newHashMap();
    private final ScheduledFuture<?> sendHeartbeatsFuture;
    private final AgentRoutingService agentRoutingService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcHeartBeatServiceImpl$AgentStreamRecord.class */
    public static class AgentStreamRecord {
        private final StreamObserver<ServerHeartBeat> responseObserver;
        private String claimedJobId;

        AgentStreamRecord(StreamObserver<ServerHeartBeat> streamObserver) {
            this.responseObserver = streamObserver;
        }

        synchronized boolean updateRecord(String str) {
            if (hasJobId() || StringUtils.isBlank(str)) {
                return false;
            }
            this.claimedJobId = str;
            return true;
        }

        String getJobId() {
            return this.claimedJobId;
        }

        boolean hasJobId() {
            return !StringUtils.isBlank(this.claimedJobId);
        }
    }

    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcHeartBeatServiceImpl$RequestObserver.class */
    private static class RequestObserver implements StreamObserver<AgentHeartBeat> {
        private final GRpcHeartBeatServiceImpl grpcHeartBeatService;
        private final String streamId;

        RequestObserver(GRpcHeartBeatServiceImpl gRpcHeartBeatServiceImpl, String str) {
            this.grpcHeartBeatService = gRpcHeartBeatServiceImpl;
            this.streamId = str;
        }

        public void onNext(AgentHeartBeat agentHeartBeat) {
            this.grpcHeartBeatService.handleAgentHeartBeat(this.streamId, agentHeartBeat);
        }

        public void onError(Throwable th) {
            this.grpcHeartBeatService.handleStreamError(this.streamId, th);
        }

        public void onCompleted() {
            this.grpcHeartBeatService.handleStreamCompletion(this.streamId);
        }
    }

    public GRpcHeartBeatServiceImpl(AgentRoutingService agentRoutingService, TaskScheduler taskScheduler) {
        this.agentRoutingService = agentRoutingService;
        this.sendHeartbeatsFuture = taskScheduler.scheduleWithFixedDelay(this::sendHeartbeats, HEART_BEAT_PERIOD_MILLIS);
    }

    @PreDestroy
    public synchronized void shutdown() {
        if (this.sendHeartbeatsFuture != null) {
            this.sendHeartbeatsFuture.cancel(false);
        }
        synchronized (this.activeStreamsMap) {
            for (AgentStreamRecord agentStreamRecord : this.activeStreamsMap.values()) {
                agentStreamRecord.responseObserver.onCompleted();
                if (agentStreamRecord.hasJobId()) {
                    notifyAgentDisconnected(agentStreamRecord.getJobId());
                }
            }
            this.activeStreamsMap.clear();
        }
    }

    private void sendHeartbeats() {
        synchronized (this.activeStreamsMap) {
            Iterator<AgentStreamRecord> it = this.activeStreamsMap.values().iterator();
            while (it.hasNext()) {
                it.next().responseObserver.onNext(ServerHeartBeat.getDefaultInstance());
            }
        }
    }

    public StreamObserver<AgentHeartBeat> heartbeat(StreamObserver<ServerHeartBeat> streamObserver) {
        String uuid = UUID.randomUUID().toString();
        RequestObserver requestObserver = new RequestObserver(this, uuid);
        synchronized (this.activeStreamsMap) {
            this.activeStreamsMap.put(uuid, new AgentStreamRecord(streamObserver));
        }
        return requestObserver;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleAgentHeartBeat(String str, AgentHeartBeat agentHeartBeat) {
        AgentStreamRecord agentStreamRecord;
        synchronized (this.activeStreamsMap) {
            agentStreamRecord = this.activeStreamsMap.get(str);
        }
        String claimedJobId = agentHeartBeat.getClaimedJobId();
        if (agentStreamRecord == null) {
            log.warn("Received heartbeat from an unknown stream");
            return;
        }
        if (StringUtils.isBlank(claimedJobId)) {
            log.warn("Ignoring heartbeat lacking job id");
            return;
        }
        log.info("Received heartbeat from agent that claimed job: {}", claimedJobId);
        if (agentStreamRecord.updateRecord(claimedJobId)) {
            notifyAgentConnected(agentStreamRecord.getJobId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleStreamCompletion(String str) {
        AgentStreamRecord remove;
        synchronized (this.activeStreamsMap) {
            remove = this.activeStreamsMap.remove(str);
        }
        if (remove == null) {
            log.warn("Received completion from an unknown stream");
            return;
        }
        remove.responseObserver.onCompleted();
        if (remove.hasJobId()) {
            notifyAgentDisconnected(remove.getJobId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleStreamError(String str, Throwable th) {
        AgentStreamRecord remove;
        synchronized (this.activeStreamsMap) {
            remove = this.activeStreamsMap.remove(str);
        }
        if (remove == null) {
            log.warn("Received error from an unknown stream");
            return;
        }
        remove.responseObserver.onError(th);
        if (remove.hasJobId()) {
            notifyAgentDisconnected(remove.getJobId());
        }
    }

    private void notifyAgentConnected(String str) {
        this.agentRoutingService.handleClientConnected(str);
    }

    private void notifyAgentDisconnected(String str) {
        this.agentRoutingService.handleClientDisconnected(str);
    }
}
