/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.google.common.collect.Maps;
import com.netflix.genie.proto.AgentHeartBeat;
import com.netflix.genie.proto.HeartBeatServiceGrpc;
import com.netflix.genie.proto.ServerHeartBeat;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

public class GRpcHeartBeatServiceImpl
extends HeartBeatServiceGrpc.HeartBeatServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(GRpcHeartBeatServiceImpl.class);
    private static final long HEART_BEAT_PERIOD_MILLIS = 5000L;
    private final Map<String, AgentStreamRecord> activeStreamsMap = Maps.newHashMap();
    private final ScheduledFuture<?> sendHeartbeatsFuture;
    private final AgentRoutingService agentRoutingService;

    public GRpcHeartBeatServiceImpl(AgentRoutingService agentRoutingService, TaskScheduler taskScheduler) {
        this.agentRoutingService = agentRoutingService;
        this.sendHeartbeatsFuture = taskScheduler.scheduleWithFixedDelay(this::sendHeartbeats, 5000L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @PreDestroy
    public synchronized void shutdown() {
        if (this.sendHeartbeatsFuture != null) {
            this.sendHeartbeatsFuture.cancel(false);
        }
        Map<String, AgentStreamRecord> map = this.activeStreamsMap;
        synchronized (map) {
            for (AgentStreamRecord agentStreamRecord : this.activeStreamsMap.values()) {
                agentStreamRecord.responseObserver.onCompleted();
                if (!agentStreamRecord.hasJobId()) continue;
                this.notifyAgentDisconnected(agentStreamRecord.getJobId());
            }
            this.activeStreamsMap.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendHeartbeats() {
        Map<String, AgentStreamRecord> map = this.activeStreamsMap;
        synchronized (map) {
            for (AgentStreamRecord agentStreamRecord : this.activeStreamsMap.values()) {
                agentStreamRecord.responseObserver.onNext((Object)ServerHeartBeat.getDefaultInstance());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public StreamObserver<AgentHeartBeat> heartbeat(StreamObserver<ServerHeartBeat> responseObserver) {
        String streamId = UUID.randomUUID().toString();
        RequestObserver requestObserver = new RequestObserver(this, streamId);
        Map<String, AgentStreamRecord> map = this.activeStreamsMap;
        synchronized (map) {
            this.activeStreamsMap.put(streamId, new AgentStreamRecord(responseObserver));
        }
        return requestObserver;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleAgentHeartBeat(String streamId, AgentHeartBeat agentHeartBeat) {
        AgentStreamRecord agentStreamRecord;
        Map<String, AgentStreamRecord> map = this.activeStreamsMap;
        synchronized (map) {
            agentStreamRecord = this.activeStreamsMap.get(streamId);
        }
        String claimedJobId = agentHeartBeat.getClaimedJobId();
        if (agentStreamRecord == null) {
            log.warn("Received heartbeat from an unknown stream");
        } else if (StringUtils.isBlank((CharSequence)claimedJobId)) {
            log.warn("Ignoring heartbeat lacking job id");
        } else {
            log.debug("Received heartbeat from agent that claimed job: {}", (Object)claimedJobId);
            boolean isFirstHeartBeat = agentStreamRecord.updateRecord(claimedJobId);
            if (isFirstHeartBeat) {
                this.notifyAgentConnected(agentStreamRecord.getJobId());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleStreamCompletion(String streamId) {
        AgentStreamRecord agentStreamRecord;
        Map<String, AgentStreamRecord> map = this.activeStreamsMap;
        synchronized (map) {
            agentStreamRecord = this.activeStreamsMap.remove(streamId);
        }
        if (agentStreamRecord == null) {
            log.warn("Received completion from an unknown stream");
        } else {
            agentStreamRecord.responseObserver.onCompleted();
            if (agentStreamRecord.hasJobId()) {
                this.notifyAgentDisconnected(agentStreamRecord.getJobId());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleStreamError(String streamId, Throwable t) {
        AgentStreamRecord agentStreamRecord;
        Map<String, AgentStreamRecord> map = this.activeStreamsMap;
        synchronized (map) {
            agentStreamRecord = this.activeStreamsMap.remove(streamId);
        }
        if (agentStreamRecord == null) {
            log.warn("Received error from an unknown stream");
        } else {
            agentStreamRecord.responseObserver.onError(t);
            if (agentStreamRecord.hasJobId()) {
                this.notifyAgentDisconnected(agentStreamRecord.getJobId());
            }
        }
    }

    private void notifyAgentConnected(String jobId) {
        this.agentRoutingService.handleClientConnected(jobId);
    }

    private void notifyAgentDisconnected(String jobId) {
        this.agentRoutingService.handleClientDisconnected(jobId);
    }

    private static class RequestObserver
    implements StreamObserver<AgentHeartBeat> {
        private final GRpcHeartBeatServiceImpl grpcHeartBeatService;
        private final String streamId;

        RequestObserver(GRpcHeartBeatServiceImpl grpcHeartBeatService, String streamId) {
            this.grpcHeartBeatService = grpcHeartBeatService;
            this.streamId = streamId;
        }

        public void onNext(AgentHeartBeat agentHeartBeat) {
            this.grpcHeartBeatService.handleAgentHeartBeat(this.streamId, agentHeartBeat);
        }

        public void onError(Throwable t) {
            this.grpcHeartBeatService.handleStreamError(this.streamId, t);
        }

        public void onCompleted() {
            this.grpcHeartBeatService.handleStreamCompletion(this.streamId);
        }
    }

    private static class AgentStreamRecord {
        private final StreamObserver<ServerHeartBeat> responseObserver;
        private String claimedJobId;

        AgentStreamRecord(StreamObserver<ServerHeartBeat> responseObserver) {
            this.responseObserver = responseObserver;
        }

        synchronized boolean updateRecord(String jobId) {
            if (this.hasJobId() || StringUtils.isBlank((CharSequence)jobId)) {
                return false;
            }
            this.claimedJobId = jobId;
            return true;
        }

        String getJobId() {
            return this.claimedJobId;
        }

        boolean hasJobId() {
            return !StringUtils.isBlank((CharSequence)this.claimedJobId);
        }
    }
}

