package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.netflix.genie.common.exceptions.GenieTimeoutException;
import com.netflix.genie.common.internal.dtos.DirectoryManifest;
import com.netflix.genie.common.internal.dtos.v4.converters.JobDirectoryManifestProtoConverter;
import com.netflix.genie.common.internal.exceptions.checked.GenieConversionException;
import com.netflix.genie.proto.AgentFileMessage;
import com.netflix.genie.proto.AgentManifestMessage;
import com.netflix.genie.proto.FileStreamServiceGrpc;
import com.netflix.genie.proto.ServerAckMessage;
import com.netflix.genie.proto.ServerControlMessage;
import com.netflix.genie.proto.ServerFileRequestMessage;
import com.netflix.genie.web.agent.resources.AgentFileResourceImpl;
import com.netflix.genie.web.agent.services.AgentFileStreamService;
import com.netflix.genie.web.util.StreamBuffer;
import io.grpc.stub.StreamObserver;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

/* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl.class */
/**
 * gRPC endpoint through which agents publish their job directory manifest and stream job files
 * back to the server on request.
 *
 * <p>The class plays two roles:
 * <ul>
 *   <li>as a {@code FileStreamService} gRPC implementation it accepts the {@link #sync} (control /
 *       manifest) and {@link #transmit} (file chunks) streams opened by agents;</li>
 *   <li>as an {@link AgentFileStreamService} it lets server-side callers look up the most recent
 *       manifest of a job ({@link #getManifest}) and request the content of a file listed in it
 *       ({@link #getResource}).</li>
 * </ul>
 *
 * <p>Bookkeeping is held in concurrent collections because the gRPC callbacks, the service methods
 * and the scheduled timeout tasks all touch the same state.
 */
public class GRpcAgentFileStreamServiceImpl extends FileStreamServiceGrpc.FileStreamServiceImplBase implements AgentFileStreamService {
    private static final Logger log = LoggerFactory.getLogger(GRpcAgentFileStreamServiceImpl.class);

    /** Delay after which a requested transfer, or a newly opened transfer stream, is given up on if no chunk arrived. */
    private static final long FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS = 3000;

    /** Control stream currently registered for each job id. */
    private final Map<String, ControlStreamObserver> jobIdControlStreamMap = Maps.newConcurrentMap();

    /** Buffers of transfers that were requested but for which no chunk has been received yet, keyed by stream id. */
    private final Map<String, StreamBuffer> pendingTransferBuffersMap = Maps.newConcurrentMap();

    /** Transfer streams that were opened by an agent but have not delivered a chunk yet. */
    private final Set<FileTransferStreamObserver> pendingTransferObserversSet = Sets.newConcurrentHashSet();

    /** Buffers of transfers for which at least one chunk has been received, keyed by stream id. */
    private final Map<String, StreamBuffer> inProgressTransferBuffersMap = Maps.newConcurrentMap();

    private final JobDirectoryManifestProtoConverter converter;
    private final TaskScheduler taskScheduler;

    /**
     * Observer of the manifest messages an agent sends over its control stream.
     *
     * <p>The first message determines the job id under which the stream is registered with the
     * enclosing service; every successfully converted message replaces the stored manifest.
     * (Flagged by the decompiler as originally {@code private}.)
     */
    public static class ControlStreamObserver implements StreamObserver<AgentManifestMessage> {
        private final GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamService;
        private final StreamObserver<ServerControlMessage> responseObserver;
        private final AtomicReference<DirectoryManifest> manifestRef = new AtomicReference<>();
        private final AtomicReference<String> jobIdRef = new AtomicReference<>();

        ControlStreamObserver(GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamServiceImpl, StreamObserver<ServerControlMessage> streamObserver) {
            this.gRpcAgentFileStreamService = gRpcAgentFileStreamServiceImpl;
            this.responseObserver = streamObserver;
        }

        /**
         * Registers this stream under the job id of the first message and stores the converted manifest.
         *
         * @param agentManifestMessage the manifest message received from the agent
         */
        @Override
        public void onNext(AgentManifestMessage agentManifestMessage) {
            log.debug("Received a manifest");
            String jobId = agentManifestMessage.getJobId();
            // Only the first message wins the CAS, so the stream is registered exactly once.
            if (this.jobIdRef.compareAndSet(null, jobId)) {
                this.gRpcAgentFileStreamService.registerControlStream(jobId, this);
            }
            try {
                this.manifestRef.set(this.gRpcAgentFileStreamService.converter.toManifest(agentManifestMessage));
            } catch (GenieConversionException e) {
                // Keep the previously stored manifest (if any) when the new one cannot be parsed.
                log.warn("Failed to parse manifest for job id: {}", jobId, e);
            }
        }

        /**
         * Logs the error and unregisters this stream.
         *
         * @param th the error reported on the stream
         */
        @Override
        public void onError(Throwable th) {
            log.warn("Manifest stream error", th);
            unregisterStream();
        }

        /** Unregisters this stream once the agent closes it. */
        @Override
        public void onCompleted() {
            log.debug("Manifest stream completed");
            unregisterStream();
        }

        /** Removes this stream from the enclosing service, if it was ever registered under a job id. */
        private void unregisterStream() {
            String jobId = this.jobIdRef.get();
            if (jobId != null) {
                this.gRpcAgentFileStreamService.unregisterControlStream(jobId, this);
            }
        }
    }

    /**
     * Observer of the file chunks an agent sends over one transfer stream.
     *
     * <p>A stream is bound to the stream id carried by its first non-blank chunk; chunks carrying a
     * different id are rejected. (Flagged by the decompiler as originally {@code private}.)
     */
    public static class FileTransferStreamObserver implements StreamObserver<AgentFileMessage> {
        private final GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamService;
        private final StreamObserver<ServerAckMessage> responseObserver;
        private final AtomicReference<String> streamId = new AtomicReference<>();

        FileTransferStreamObserver(GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamServiceImpl, StreamObserver<ServerAckMessage> streamObserver) {
            this.gRpcAgentFileStreamService = gRpcAgentFileStreamServiceImpl;
            this.responseObserver = streamObserver;
        }

        /**
         * Hands the chunk to the enclosing service and acknowledges it to the agent.
         *
         * <p>Chunks with a blank stream id, or with an id different from the one this stream is
         * bound to, are logged and dropped without acknowledgement.
         *
         * @param agentFileMessage the chunk received from the agent
         */
        @Override
        public void onNext(AgentFileMessage agentFileMessage) {
            String chunkStreamId = agentFileMessage.getStreamId();
            if (StringUtils.isBlank(chunkStreamId)) {
                log.warn("Received file chunk with empty stream identifier");
                return;
            }
            // Bind this stream to the id of the first valid chunk.
            if (this.streamId.compareAndSet(null, chunkStreamId)) {
                log.debug("Received first chunk for transfer: {}", chunkStreamId);
            }
            if (!chunkStreamId.equals(this.streamId.get())) {
                log.warn("Received chunk with id: {}, but this stream was previously used by stream: {}", chunkStreamId, this.streamId.get());
                return;
            }
            this.gRpcAgentFileStreamService.handleFileTransferChunk(this, chunkStreamId, agentFileMessage.getData());
            this.responseObserver.onNext(ServerAckMessage.newBuilder().build());
        }

        /**
         * Forwards the stream error to the enclosing service.
         *
         * @param th the error reported on the stream
         */
        @Override
        public void onError(Throwable th) {
            this.gRpcAgentFileStreamService.handleFileTransferError(this, this.streamId.get(), th);
        }

        /**
         * Forwards the stream completion to the enclosing service.
         *
         * <p>NOTE(review): the response stream is not completed here; confirm that the agent side
         * is responsible for terminating the call.
         */
        @Override
        public void onCompleted() {
            this.gRpcAgentFileStreamService.handleFileTransferCompletion(this, this.streamId.get());
        }
    }

    /**
     * Creates the service.
     *
     * @param jobDirectoryManifestProtoConverter converter used to turn manifest messages into {@link DirectoryManifest}
     * @param taskScheduler                      scheduler used to run the transfer-start timeouts
     */
    public GRpcAgentFileStreamServiceImpl(JobDirectoryManifestProtoConverter jobDirectoryManifestProtoConverter, TaskScheduler taskScheduler) {
        this.converter = jobDirectoryManifestProtoConverter;
        this.taskScheduler = taskScheduler;
    }

    /**
     * Looks up a file in the latest manifest of a job and, if present, asks the agent to stream it.
     *
     * @param jobId        id of the job whose file is requested
     * @param relativePath path of the file, looked up in the manifest via {@code toString()}
     * @param baseUri      URI passed through to the returned resource
     * @return empty if the job has no registered control stream or no manifest yet; a
     *     "non existing" resource if the manifest has no entry for the path; otherwise a resource
     *     whose content is read from the buffer that incoming chunks are written to
     * @throws ArithmeticException if the size recorded in the manifest does not fit in an {@code int}
     */
    @Override // com.netflix.genie.web.agent.services.AgentFileStreamService
    public Optional<AgentFileStreamService.AgentFileResource> getResource(String jobId, Path relativePath, URI baseUri) {
        ControlStreamObserver controlStreamObserver = this.jobIdControlStreamMap.get(jobId);
        if (controlStreamObserver == null) {
            log.warn("Stream Record not found for job id: {}", jobId);
            return Optional.empty();
        }
        DirectoryManifest directoryManifest = controlStreamObserver.manifestRef.get();
        if (directoryManifest == null) {
            log.warn("Stream record for job id: {} does not have a manifest", jobId);
            return Optional.empty();
        }
        DirectoryManifest.ManifestEntry manifestEntry = (DirectoryManifest.ManifestEntry) directoryManifest.getEntry(relativePath.toString()).orElse(null);
        if (manifestEntry == null) {
            log.warn("Requesting a file ({}) that does not exist in the manifest for job id: {}: ", relativePath, jobId);
            return Optional.of(AgentFileResourceImpl.forNonExistingResource());
        }
        String transferStreamId = UUID.randomUUID().toString();
        // The request message carries the end offset as an int, hence the exact narrowing.
        int fileSize = Math.toIntExact(manifestEntry.getSize());
        StreamBuffer streamBuffer = new StreamBuffer();
        if (fileSize == 0) {
            // Nothing to transfer: hand out an already-completed buffer without contacting the agent.
            streamBuffer.closeForCompleted();
        } else {
            requestFileTransfer(controlStreamObserver, transferStreamId, relativePath, fileSize, streamBuffer);
        }
        return Optional.of(AgentFileResourceImpl.forAgentFile(baseUri, manifestEntry.getSize(), manifestEntry.getLastModifiedTime(), Paths.get(manifestEntry.getPath()), jobId, streamBuffer.getInputStream()));
    }

    /**
     * Registers the buffer as pending, sends the file request to the agent and arms the timeout
     * that fails the buffer if no chunk arrives in time.
     *
     * <p>If sending the request or scheduling the timeout fails, the buffer is removed from the
     * pending map again so that it cannot linger there without a timeout, and the failure is rethrown.
     */
    private void requestFileTransfer(ControlStreamObserver controlStream, String transferStreamId, Path relativePath, int fileSize, StreamBuffer streamBuffer) {
        this.pendingTransferBuffersMap.put(transferStreamId, streamBuffer);
        try {
            ServerControlMessage request = ServerControlMessage.newBuilder()
                .setServerFileRequest(
                    ServerFileRequestMessage.newBuilder()
                        .setStreamId(transferStreamId)
                        .setRelativePath(relativePath.toString())
                        .setStartOffset(0)
                        .setEndOffset(fileSize)
                        .build())
                .build();
            // StreamObserver is not thread-safe and several requests for the same job may be
            // served concurrently, so writes to the shared control stream are serialized.
            synchronized (controlStream.responseObserver) {
                controlStream.responseObserver.onNext(request);
            }
            this.taskScheduler.schedule(() -> {
                // Still pending means no chunk was received: fail whoever is reading the buffer.
                StreamBuffer timedOut = this.pendingTransferBuffersMap.remove(transferStreamId);
                if (timedOut != null) {
                    timedOut.closeForError(new TimeoutException("Timeout waiting for transfer to start"));
                }
            }, Instant.now().plusMillis(FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS));
        } catch (RuntimeException e) {
            if (this.pendingTransferBuffersMap.remove(transferStreamId, streamBuffer)) {
                streamBuffer.closeForError(e);
            }
            throw e;
        }
    }

    /**
     * Returns the most recent manifest received for a job.
     *
     * @param jobId id of the job
     * @return the manifest, or empty if the job has no registered control stream or has not
     *     delivered a parsable manifest yet
     */
    @Override // com.netflix.genie.web.agent.services.AgentFileStreamService
    public Optional<DirectoryManifest> getManifest(String jobId) {
        ControlStreamObserver controlStreamObserver = this.jobIdControlStreamMap.get(jobId);
        if (controlStreamObserver == null) {
            log.warn("Stream Record not found for job id: {}", jobId);
            return Optional.empty();
        }
        return Optional.ofNullable(controlStreamObserver.manifestRef.get());
    }

    /**
     * Accepts a new control stream from an agent.
     *
     * @param streamObserver observer used to send control messages (file requests) to the agent
     * @return the observer that will receive the agent's manifest messages
     */
    @Override
    public StreamObserver<AgentManifestMessage> sync(StreamObserver<ServerControlMessage> streamObserver) {
        log.info("New agent control stream established");
        return new ControlStreamObserver(this, streamObserver);
    }

    /**
     * Accepts a new file transfer stream from an agent.
     *
     * <p>The stream is tracked as pending and is failed with a {@link GenieTimeoutException} if it
     * is still pending when the begin timeout elapses.
     *
     * @param streamObserver observer used to acknowledge chunks to the agent
     * @return the observer that will receive the agent's file chunks
     */
    @Override
    public StreamObserver<AgentFileMessage> transmit(StreamObserver<ServerAckMessage> streamObserver) {
        log.info("New file transfer stream established");
        FileTransferStreamObserver fileTransferStreamObserver = new FileTransferStreamObserver(this, streamObserver);
        this.pendingTransferObserversSet.add(fileTransferStreamObserver);
        this.taskScheduler.schedule(() -> {
            // remove() returning true means no chunk, error or completion was seen in the meantime.
            if (this.pendingTransferObserversSet.remove(fileTransferStreamObserver)) {
                fileTransferStreamObserver.responseObserver.onError(new GenieTimeoutException("Timeout waiting for transfer to begin"));
            }
        }, Instant.now().plusMillis(FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS));
        return fileTransferStreamObserver;
    }

    /**
     * Registers a control stream under a job id, replacing (and logging) any previous one.
     * (Flagged by the decompiler as originally {@code private}.)
     *
     * @param jobId                 id of the job the stream belongs to
     * @param controlStreamObserver the stream to register
     */
    public void registerControlStream(String jobId, ControlStreamObserver controlStreamObserver) {
        ControlStreamObserver previous = this.jobIdControlStreamMap.put(jobId, controlStreamObserver);
        if (previous != null) {
            log.warn("Found an previous observer for job id: {}", jobId);
        }
    }

    /**
     * Removes a control stream, but only if it is still the one registered for the job id.
     * (Flagged by the decompiler as originally {@code private}.)
     *
     * @param jobId                 id of the job the stream belongs to
     * @param controlStreamObserver the stream to unregister
     */
    public void unregisterControlStream(String jobId, ControlStreamObserver controlStreamObserver) {
        boolean removed = this.jobIdControlStreamMap.remove(jobId, controlStreamObserver);
        if (!removed) {
            log.warn("Could not remove observer for job: {}, not found in map", jobId);
        }
    }

    /**
     * Routes a received chunk to the buffer of its transfer.
     *
     * <p>The first chunk moves both the stream and the buffer out of their "pending" collections,
     * which also disarms the corresponding timeouts. Chunks for which no buffer is known are dropped.
     * (Flagged by the decompiler as originally {@code private}.)
     *
     * @param fileTransferStreamObserver the stream the chunk arrived on
     * @param streamId                   id of the transfer the chunk belongs to
     * @param byteString                 the chunk payload
     */
    public void handleFileTransferChunk(FileTransferStreamObserver fileTransferStreamObserver, String streamId, ByteString byteString) {
        if (this.pendingTransferObserversSet.remove(fileTransferStreamObserver)) {
            log.debug("Removed observer for file stream: {} from 'pending' set", streamId);
        }
        StreamBuffer pendingBuffer = this.pendingTransferBuffersMap.remove(streamId);
        if (pendingBuffer != null) {
            log.debug("Moving buffer for file stream {} from 'pending' to 'in progress'", streamId);
            this.inProgressTransferBuffersMap.put(streamId, pendingBuffer);
        }
        StreamBuffer activeBuffer = this.inProgressTransferBuffersMap.get(streamId);
        if (activeBuffer != null) {
            activeBuffer.write(byteString);
        }
    }

    /**
     * Cleans up after a failed transfer stream and propagates the error to the transfer's buffer.
     * (Flagged by the decompiler as originally {@code private}.)
     *
     * @param fileTransferStreamObserver the stream that failed
     * @param streamId                   id of the transfer, or {@code null} if the stream never delivered a valid chunk
     * @param th                         the error reported on the stream
     */
    public void handleFileTransferError(FileTransferStreamObserver fileTransferStreamObserver, @Nullable String streamId, Throwable th) {
        log.error("Error in file transfer stream: {}: {}", streamId, th.getMessage(), th);
        this.pendingTransferObserversSet.remove(fileTransferStreamObserver);
        if (streamId == null) {
            return;
        }
        StreamBuffer pendingBuffer = this.pendingTransferBuffersMap.remove(streamId);
        if (pendingBuffer != null) {
            pendingBuffer.closeForError(th);
        }
        StreamBuffer activeBuffer = this.inProgressTransferBuffersMap.remove(streamId);
        if (activeBuffer != null) {
            activeBuffer.closeForError(th);
        }
    }

    /**
     * Cleans up after a completed transfer stream and marks the transfer's buffer as completed.
     * (Flagged by the decompiler as originally {@code private}.)
     *
     * @param fileTransferStreamObserver the stream that completed
     * @param streamId                   id of the transfer, or {@code null} if the stream never delivered a valid chunk
     */
    public void handleFileTransferCompletion(FileTransferStreamObserver fileTransferStreamObserver, @Nullable String streamId) {
        this.pendingTransferObserversSet.remove(fileTransferStreamObserver);
        if (streamId == null) {
            // The stream closed before being bound to a transfer; the concurrent maps reject
            // null keys, and there is no buffer to close anyway.
            return;
        }
        StreamBuffer pendingBuffer = this.pendingTransferBuffersMap.remove(streamId);
        if (pendingBuffer != null) {
            pendingBuffer.closeForCompleted();
        }
        StreamBuffer activeBuffer = this.inProgressTransferBuffersMap.remove(streamId);
        if (activeBuffer != null) {
            activeBuffer.closeForCompleted();
        }
    }
}
