package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.genie.common.exceptions.GenieServerException;
import com.netflix.genie.common.internal.dtos.JobStatus;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieInvalidStatusException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobNotFoundException;
import com.netflix.genie.proto.JobKillRegistrationRequest;
import com.netflix.genie.proto.JobKillRegistrationResponse;
import com.netflix.genie.proto.JobKillServiceGrpc;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import com.netflix.genie.web.data.services.DataServices;
import com.netflix.genie.web.data.services.PersistenceService;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.services.JobKillService;
import com.netflix.genie.web.services.RequestForwardingService;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Scheduled;

/* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcJobKillServiceImpl.class */
/**
 * gRPC-backed implementation of {@link JobKillService}.
 *
 * <p>Callers of {@link #registerForKillNotification} have their response observer "parked" in an in-memory map
 * keyed by job id. When {@link #killJob} is invoked for a job whose agent connection is local, the parked
 * observer is removed, sent a single (empty) response, and completed. Kill requests for agents connected
 * elsewhere are forwarded via the {@link RequestForwardingService}.
 *
 * <p>The parked-observer map is a {@link ConcurrentHashMap} because it is touched by RPC handling,
 * {@link #killJob} and the scheduled {@link #cleanupOrphanedObservers()} task.
 */
public class GRpcJobKillServiceImpl extends JobKillServiceGrpc.JobKillServiceImplBase implements JobKillService {
    private static final Logger log = LoggerFactory.getLogger(GRpcJobKillServiceImpl.class);

    /** Response observers waiting for a kill notification, keyed by job id. */
    @VisibleForTesting
    private final Map<String, StreamObserver<JobKillRegistrationResponse>> parkedJobKillResponseObservers = new ConcurrentHashMap<>();
    private final PersistenceService persistenceService;
    private final AgentRoutingService agentRoutingService;
    private final RequestForwardingService requestForwardingService;

    /**
     * Creates the service.
     *
     * @param dataServices             source of the {@link PersistenceService} used to read and update job status
     * @param agentRoutingService      used to determine where the agent for a job is connected
     * @param requestForwardingService used to forward kill requests to the host holding the agent connection
     */
    public GRpcJobKillServiceImpl(DataServices dataServices, AgentRoutingService agentRoutingService, RequestForwardingService requestForwardingService) {
        this.persistenceService = dataServices.getPersistenceService();
        this.agentRoutingService = agentRoutingService;
        this.requestForwardingService = requestForwardingService;
    }

    /**
     * Parks the given response observer under the job id carried by the request so that a later
     * {@link #killJob} call can notify it.
     *
     * <p>If an observer was already parked for the same job id it is replaced, and the previous observer is
     * completed without receiving a response.
     *
     * @param jobKillRegistrationRequest request carrying the job id to register for
     * @param streamObserver             observer to notify when the job is killed
     */
    public void registerForKillNotification(JobKillRegistrationRequest jobKillRegistrationRequest, StreamObserver<JobKillRegistrationResponse> streamObserver) {
        StreamObserver<JobKillRegistrationResponse> previousObserver =
            this.parkedJobKillResponseObservers.put(jobKillRegistrationRequest.getJobId(), streamObserver);
        if (previousObserver != null) {
            // Only one parked observer per job: close out the one that was just displaced.
            previousObserver.onCompleted();
        }
    }

    /**
     * Kills the job with the given id.
     *
     * <ul>
     *   <li>Finished job: nothing to do.</li>
     *   <li>Job in one of {@code JobStatus.getStatusesBeforeClaimed()}: the status is updated directly to
     *       {@code KILLED}.</li>
     *   <li>Active job whose agent connection is not local: the request is forwarded to the host the agent is
     *       connected to.</li>
     *   <li>Active job whose agent connection is local: the parked observer is removed, sent an empty response
     *       and completed.</li>
     * </ul>
     *
     * <p>The method is annotated to be retried (1000 ms backoff) on {@link GenieInvalidStatusException} and
     * {@link GenieServerException}.
     *
     * @param str                id of the job to kill
     * @param str2               message passed along with the {@code KILLED} status update
     * @param httpServletRequest originating HTTP request, handed to the forwarding service; may be {@code null}
     * @throws GenieJobNotFoundException if the persistence layer reports the job as not found
     * @throws GenieServerException      if the job is in an unhandled state, the agent host cannot be located,
     *                                   or no parked observer exists for a supposedly local agent connection
     */
    @Override // com.netflix.genie.web.services.JobKillService
    @Retryable(value = {GenieInvalidStatusException.class, GenieServerException.class}, backoff = @Backoff(delay = 1000))
    public void killJob(String str, String str2, @Nullable HttpServletRequest httpServletRequest) throws GenieJobNotFoundException, GenieServerException {
        try {
            JobStatus jobStatus = this.persistenceService.getJobStatus(str);

            if (jobStatus.isFinished()) {
                log.info("Job {} was already finished when the kill request arrived. Nothing to do.", str);
                return;
            }

            if (JobStatus.getStatusesBeforeClaimed().contains(jobStatus)) {
                try {
                    // The status read above is passed as the expected current status of the update.
                    this.persistenceService.updateJobStatus(str, jobStatus, JobStatus.KILLED, str2);
                } catch (GenieInvalidStatusException e) {
                    log.error("Unable to set job status for {} to {} due to current status not being expected {}", str, JobStatus.KILLED, jobStatus);
                    throw e;
                }
                return;
            }

            if (!jobStatus.isActive()) {
                log.error("{} is an unhandled state for job {}", jobStatus, str);
                throw new GenieServerException("Job " + str + " is currently in " + jobStatus + " status, which isn't currently handled");
            }

            if (!this.agentRoutingService.isAgentConnectionLocal(str)) {
                String agentHostname = this.agentRoutingService.getHostnameForAgentConnection(str).orElseThrow(
                    () -> new GenieServerException("Unable to locate host where agent is connected for job " + str)
                );
                log.info("Agent for job {} currently connected to {}. Attempting to forward kill request", str, agentHostname);
                this.requestForwardingService.kill(agentHostname, str, httpServletRequest);
                return;
            }

            // Remove rather than get: the observer is single-use and is completed right after the notification.
            StreamObserver<JobKillRegistrationResponse> parkedObserver = this.parkedJobKillResponseObservers.remove(str);
            if (parkedObserver == null) {
                log.error("Job {} not killed. Expected local agent connection not found", str);
                throw new GenieServerException("Job " + str + " not killed. Expected local agent connection not found.");
            }
            parkedObserver.onNext(JobKillRegistrationResponse.newBuilder().build());
            parkedObserver.onCompleted();
            log.info("Agent notified for killing job {}", str);
        } catch (NotFoundException e) {
            throw new GenieJobNotFoundException(e);
        }
    }

    /**
     * Scheduled task (30 s initial delay, 30 s fixed delay) that removes parked observers whose job no longer
     * has a local agent connection, completing each removed observer unless it is already cancelled.
     *
     * <p>Failures for one job id are logged and do not stop processing of the remaining ids.
     */
    @Scheduled(fixedDelay = 30000, initialDelay = 30000)
    public void cleanupOrphanedObservers() {
        for (String jobId : this.parkedJobKillResponseObservers.keySet()) {
            try {
                if (!this.agentRoutingService.isAgentConnectionLocal(jobId)) {
                    cancelObserverIfNecessary(jobId, this.parkedJobKillResponseObservers.remove(jobId));
                }
            } catch (Exception e) {
                // The exception is passed as the trailing argument so the logger records it with its stack trace.
                log.error("Got unexpected exception while trying to cleanup jobID {}. Moving on.", jobId, e);
            }
        }
    }

    /**
     * Reports whether the given observer's call has been cancelled.
     *
     * @param streamObserver observer to inspect
     * @return the observer's {@code isCancelled()} value when it is a {@link ServerCallStreamObserver};
     *         {@code false} for any other observer type, since cancellation cannot be determined
     */
    @VisibleForTesting
    protected boolean isStreamObserverCancelled(StreamObserver<JobKillRegistrationResponse> streamObserver) {
        if (streamObserver instanceof ServerCallStreamObserver) {
            return ((ServerCallStreamObserver<?>) streamObserver).isCancelled();
        }
        return false;
    }

    /**
     * Completes the given observer unless it is {@code null} or already cancelled. Exceptions raised while
     * completing are logged and swallowed so that cleanup of other observers can continue.
     *
     * @param jobId          id of the job the observer was parked under; used for logging only
     * @param streamObserver observer to complete; may be {@code null}
     */
    private void cancelObserverIfNecessary(String jobId, @Nullable StreamObserver<JobKillRegistrationResponse> streamObserver) {
        if (streamObserver == null || isStreamObserverCancelled(streamObserver)) {
            return;
        }
        try {
            streamObserver.onCompleted();
        } catch (Exception e) {
            log.error("Got exception while trying to complete streamObserver during cleanup for jobID {}.", jobId, e);
        }
    }

    /**
     * Package-private accessor for the live map of parked observers.
     *
     * @return the internal (mutable) map of parked observers, keyed by job id
     */
    Map<String, StreamObserver<JobKillRegistrationResponse>> getParkedJobKillResponseObservers() {
        return this.parkedJobKillResponseObservers;
    }
}
