package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.netflix.genie.common.internal.dtos.DirectoryManifest;
import com.netflix.genie.common.internal.dtos.v4.converters.JobDirectoryManifestProtoConverter;
import com.netflix.genie.common.internal.exceptions.checked.GenieConversionException;
import com.netflix.genie.proto.AgentFileMessage;
import com.netflix.genie.proto.AgentManifestMessage;
import com.netflix.genie.proto.FileStreamServiceGrpc;
import com.netflix.genie.proto.ServerAckMessage;
import com.netflix.genie.proto.ServerControlMessage;
import com.netflix.genie.proto.ServerFileRequestMessage;
import com.netflix.genie.web.agent.resources.AgentFileResourceImpl;
import com.netflix.genie.web.agent.services.AgentFileStreamService;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.properties.AgentFileStreamProperties;
import com.netflix.genie.web.util.StreamBuffer;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.naming.LimitExceededException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpRange;
import org.springframework.scheduling.TaskScheduler;

/* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl.class */
public class GRpcAgentFileStreamServiceImpl extends FileStreamServiceGrpc.FileStreamServiceImplBase implements AgentFileStreamService {
    /** Logger shared by this service and by its nested helper classes. */
    private static final Logger log = LoggerFactory.getLogger(GRpcAgentFileStreamServiceImpl.class);
    // Common prefix of every metric name declared below.
    private static final String METRICS_PREFIX = "genie.agents.fileTransfers";
    // Counter: file requests that found a control stream for the target job.
    private static final String TRANSFER_COUNTER = "genie.agents.fileTransfers.requested.counter";
    // Counter: resource requests rejected because the concurrent transfer limit was reached.
    private static final String TRANSFER_LIMIT_EXCEEDED_COUNTER = "genie.agents.fileTransfers.rejected.counter";
    // Gauge: estimated number of entries in the manifest cache.
    private static final String MANIFEST_CACHE_SIZE_GAUGE = "genie.agents.fileTransfers.manifestCache.size";
    // Gauge: number of registered control streams.
    private static final String CONTROL_STREAMS_GAUGE = "genie.agents.fileTransfers.controlStreams.size";
    // Counter: stalled transfers that were reaped plus transfer streams that were never claimed in time.
    private static final String TRANSFER_TIMEOUT_COUNTER = "genie.agents.fileTransfers.timeout.counter";
    // Distribution summary: length of the byte range requested by each transfer.
    private static final String TRANSFER_SIZE_DISTRIBUTION = "genie.agents.fileTransfers.transferSize.summary";
    // Gauge: number of transfers currently tracked as active.
    private static final String ACTIVE_TRANSFER_GAUGE = "genie.agents.fileTransfers.activeTransfers.size";
    /** Keeps track of agent control streams and of the manifests received through them. */
    private final ControlStreamManager controlStreamsManager;
    /** Keeps track of file transfers and of the streams carrying their data. */
    private final TransferManager transferManager;
    /** Incremented by {@code getResource} when a transfer is refused because of the concurrency limit. */
    private final Counter fileTransferLimitExceededCounter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl$AgentFileChunkObserver.class */
    /**
     * Observer attached to one agent-to-server file transfer stream.
     *
     * <p>Every chunk received is forwarded to the {@link TransferManager}; termination of the
     * stream (normal or with an error) is reported to the manager as well.
     */
    public static final class AgentFileChunkObserver implements StreamObserver<AgentFileMessage> {
        private final TransferManager transferManager;
        private final StreamObserver<ServerAckMessage> responseObserver;

        /**
         * @param manager     manager that receives chunks and termination events
         * @param ackObserver server-to-agent half of the stream, exposed via {@link #getResponseObserver()}
         */
        AgentFileChunkObserver(TransferManager manager, StreamObserver<ServerAckMessage> ackObserver) {
            this.responseObserver = ackObserver;
            this.transferManager = manager;
        }

        /**
         * Forwards the chunk payload to the manager, keyed by the stream id carried in the message.
         *
         * @param chunkMessage message holding the stream id and the chunk data
         */
        public void onNext(AgentFileMessage chunkMessage) {
            final String transferId = chunkMessage.getStreamId();
            GRpcAgentFileStreamServiceImpl.log.debug("Received file chunk of transfer: {}", transferId);
            this.transferManager.handleFileChunk(transferId, this, chunkMessage.getData());
        }

        /**
         * Reports the failed stream to the manager. The response observer is not completed here.
         *
         * @param error cause of the stream failure
         */
        public void onError(Throwable error) {
            this.transferManager.removeTransferStream(this, error);
        }

        /** Reports normal termination to the manager, then completes the response side. */
        public void onCompleted() {
            this.transferManager.removeTransferStream(this, null);
            this.responseObserver.onCompleted();
        }

        /**
         * @return the server-to-agent observer of this stream
         */
        public StreamObserver<ServerAckMessage> getResponseObserver() {
            return this.responseObserver;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl$ControlStreamManager.class */
    /**
     * Tracks the control stream registered for each job and caches the most recent directory
     * manifest received for it. The control stream is also the channel used to ask an agent to
     * start transmitting a file.
     *
     * <p>All methods that read or modify {@code controlStreamMap} are {@code synchronized} on this
     * instance.
     */
    public static final class ControlStreamManager {
        private final Map<String, ControlStreamObserver> controlStreamMap = Maps.newHashMap();
        private final Cache<String, DirectoryManifest> manifestCache;
        private final JobDirectoryManifestProtoConverter converter;
        private final Counter fileTansferCounter;
        // Backing value of the control-streams gauge. It is held in a field (strong reference) and
        // updated in place; a gauge registered over a temporary boxed number cannot follow changes.
        private final AtomicInteger controlStreamCount = new AtomicInteger();

        /**
         * @param jobDirectoryManifestProtoConverter converter used to turn manifest messages into manifests
         * @param agentFileStreamProperties          source of the manifest cache expiration
         * @param meterRegistry                      registry in which the counter and gauges are registered
         */
        private ControlStreamManager(JobDirectoryManifestProtoConverter jobDirectoryManifestProtoConverter, AgentFileStreamProperties agentFileStreamProperties, MeterRegistry meterRegistry) {
            this.converter = jobDirectoryManifestProtoConverter;
            this.manifestCache = Caffeine.newBuilder().expireAfterWrite(agentFileStreamProperties.getManifestCacheExpiration()).build();
            this.fileTansferCounter = meterRegistry.counter(GRpcAgentFileStreamServiceImpl.TRANSFER_COUNTER);
            // Gauges are registered exactly once, over objects this instance keeps alive.
            meterRegistry.gauge(GRpcAgentFileStreamServiceImpl.MANIFEST_CACHE_SIZE_GAUGE, this.manifestCache, Cache::estimatedSize);
            meterRegistry.gauge(GRpcAgentFileStreamServiceImpl.CONTROL_STREAMS_GAUGE, this.controlStreamCount);
        }

        /**
         * Sends a file request to the agent through the control stream registered for the job.
         *
         * @param jobId        id of the job whose agent should send the file
         * @param streamId     id the agent must attach to the chunks it sends back
         * @param relativePath path of the requested file, as sent to the agent
         * @param startOffset  start offset of the requested range
         * @param endOffset    end offset of the requested range
         * @throws NotFoundException         if no control stream is registered for the job
         * @throws IndexOutOfBoundsException if the agent did not advertise large file support and an
         *                                   offset does not fit in an {@code int}
         */
        private synchronized void requestFile(String jobId, String streamId, String relativePath, long startOffset, long endOffset) throws NotFoundException, IndexOutOfBoundsException {
            final ControlStreamObserver controlStream = this.controlStreamMap.get(jobId);
            if (controlStream == null) {
                throw new NotFoundException("No active stream control stream for job: " + jobId);
            }
            this.fileTansferCounter.increment();
            final boolean beyondIntRange = startOffset > Integer.MAX_VALUE || endOffset > Integer.MAX_VALUE;
            if (beyondIntRange && !controlStream.allowLargeFiles.get()) {
                throw new IndexOutOfBoundsException("Outdated agent does not support ranges beyond the 2GB mark");
            }
            // Both the deprecated int offsets and the long offsets are populated. The int casts can
            // only truncate when the agent advertised large file support (checked above).
            final ServerFileRequestMessage fileRequest = ServerFileRequestMessage.newBuilder()
                .setStreamId(streamId)
                .setRelativePath(relativePath)
                .setDeprecatedStartOffset((int) startOffset)
                .setDeprecatedEndOffset((int) endOffset)
                .setStartOffset(startOffset)
                .setEndOffset(endOffset)
                .build();
            controlStream.responseObserver.onNext(ServerControlMessage.newBuilder().setServerFileRequest(fileRequest).build());
        }

        /**
         * Creates the observer for a newly opened control stream. The stream is only registered
         * under a job id once its first manifest arrives.
         *
         * @param streamObserver server-to-agent side of the control stream
         * @return observer that will receive the agent's manifest messages
         */
        private StreamObserver<AgentManifestMessage> handleNewControlStream(StreamObserver<ServerControlMessage> streamObserver) {
            GRpcAgentFileStreamServiceImpl.log.debug("New agent control stream established");
            return new ControlStreamObserver(this, streamObserver);
        }

        /**
         * @param jobId job id
         * @return the cached manifest for the job, or {@code null} if none is cached
         */
        private DirectoryManifest getManifest(String jobId) {
            return (DirectoryManifest) this.manifestCache.getIfPresent(jobId);
        }

        /**
         * Caches the manifest and registers the stream for the job. A different stream previously
         * registered under the same job id is closed with an error.
         *
         * @param controlStreamObserver stream that delivered the manifest
         * @param jobId                 job the manifest belongs to
         * @param directoryManifest     manifest to cache
         */
        private synchronized void updateManifestAndStream(ControlStreamObserver controlStreamObserver, String jobId, DirectoryManifest directoryManifest) {
            this.manifestCache.put(jobId, directoryManifest);
            final ControlStreamObserver previous = this.controlStreamMap.put(jobId, controlStreamObserver);
            if (previous != null && previous != controlStreamObserver) {
                previous.closeStreamWithError(new IllegalStateException("A new stream was registered for the same job id: " + jobId));
            }
            this.controlStreamCount.set(this.controlStreamMap.size());
        }

        /**
         * Unregisters the given stream from every job id it is mapped to.
         *
         * @param controlStreamObserver stream that terminated
         * @param th                    cause of the termination, or {@code null} on normal completion
         */
        private synchronized void removeControlStream(ControlStreamObserver controlStreamObserver, @Nullable Throwable th) {
            GRpcAgentFileStreamServiceImpl.log.debug("Control stream {}", th == null ? "completed" : "error: " + th.getMessage());
            final boolean removed = this.controlStreamMap.entrySet().removeIf(entry -> entry.getValue() == controlStreamObserver);
            if (removed) {
                GRpcAgentFileStreamServiceImpl.log.debug("Removed a control stream due to {}", th == null ? "completion" : "error: " + th.getMessage());
                this.controlStreamCount.set(this.controlStreamMap.size());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl$ControlStreamObserver.class */
    /**
     * Observer attached to one agent control stream. It receives manifest messages from the agent
     * and passes the converted manifests to the {@link ControlStreamManager}.
     */
    public static final class ControlStreamObserver implements StreamObserver<AgentManifestMessage> {
        private final ControlStreamManager controlStreamManager;
        private final StreamObserver<ServerControlMessage> responseObserver;
        // Updated from the "large files supported" flag of every manifest message received.
        private final AtomicBoolean allowLargeFiles = new AtomicBoolean(false);

        /**
         * @param manager         manager this observer reports to
         * @param controlObserver server-to-agent side of the control stream
         */
        private ControlStreamObserver(ControlStreamManager manager, StreamObserver<ServerControlMessage> controlObserver) {
            this.responseObserver = controlObserver;
            this.controlStreamManager = manager;
        }

        /**
         * Records the agent's large file capability, then converts the message and hands the
         * manifest to the manager. A message that fails conversion is logged and dropped.
         *
         * @param manifestMessage manifest message sent by the agent
         */
        public void onNext(AgentManifestMessage manifestMessage) {
            final String jobId = manifestMessage.getJobId();
            this.allowLargeFiles.set(manifestMessage.getLargeFilesSupported());

            final DirectoryManifest manifest;
            try {
                manifest = this.controlStreamManager.converter.toManifest(manifestMessage);
            } catch (GenieConversionException e) {
                GRpcAgentFileStreamServiceImpl.log.warn("Failed to parse manifest for job id: {}", jobId, e);
                return;
            }

            if (manifest == null) {
                return;
            }
            this.controlStreamManager.updateManifestAndStream(this, jobId, manifest);
        }

        /**
         * Unregisters this stream from the manager. The response observer is not completed here.
         *
         * @param error cause of the stream failure
         */
        public void onError(Throwable error) {
            this.controlStreamManager.removeControlStream(this, error);
        }

        /** Unregisters this stream from the manager, then completes the response side. */
        public void onCompleted() {
            this.controlStreamManager.removeControlStream(this, null);
            this.responseObserver.onCompleted();
        }

        /**
         * Terminates the server-to-agent side of the stream with the given error.
         *
         * @param error error sent to the agent
         */
        public void closeStreamWithError(Throwable error) {
            this.responseObserver.onError(error);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl$FileTransfer.class */
    /**
     * State of a single file transfer: its identifiers, the buffer that receives the data, the
     * stream observer delivering the chunks (once claimed) and the time of the last acknowledgement.
     */
    public static final class FileTransfer {
        private final String jobId;
        private final String transferId;
        private final StreamBuffer buffer;
        private final String description;
        // Null until a transfer stream is claimed for this transfer.
        private AgentFileChunkObserver agentFileChunkObserver;
        private State state = State.NEW;
        // Initialized at creation time and refreshed on every acknowledgement sent.
        private Instant lastAckTimestamp = Instant.now();

        /** Lifecycle of a transfer. */
        public enum State {
            NEW,
            IN_PROGRESS,
            COMPLETED,
            FAILED
        }

        /**
         * @param transferId   unique id of this transfer
         * @param jobId        id of the job that owns the file
         * @param relativePath path of the file
         * @param startOffset  start of the requested range
         * @param endOffset    end of the requested range
         * @param fileSize     size of the whole file
         * @param streamBuffer buffer that receives the transferred data
         */
        private FileTransfer(String transferId, String jobId, Path relativePath, long startOffset, long endOffset, long fileSize, StreamBuffer streamBuffer) {
            this.jobId = jobId;
            this.transferId = transferId;
            this.buffer = streamBuffer;
            // Built from the constructor arguments only. Concatenating 'this' here would invoke
            // toString() before 'description' is assigned and embed "NEW null" in the text.
            this.description = "FileTransfer " + transferId
                + ", agent://" + jobId + "/" + relativePath
                + " (range: (" + startOffset + "-" + endOffset + "] file.size: " + fileSize + ")";
        }

        /**
         * @return the current state followed by the transfer description
         */
        public String toString() {
            return this.state + " " + this.description;
        }

        /**
         * Associates the stream delivering the data and marks the transfer in progress.
         *
         * @param agentFileChunkObserver observer of the stream carrying this transfer's chunks
         */
        private void claimStreamObserver(AgentFileChunkObserver agentFileChunkObserver) {
            this.state = State.IN_PROGRESS;
            this.agentFileChunkObserver = agentFileChunkObserver;
        }

        /**
         * @return the input stream exposed by the underlying buffer
         */
        private InputStream getInputStream() {
            return this.buffer.getInputStream();
        }

        /**
         * Attempts to write a chunk into the buffer.
         *
         * @param byteString chunk data
         * @return the result of the buffer's {@code tryWrite}
         */
        private boolean append(ByteString byteString) {
            return this.buffer.tryWrite(byteString);
        }

        /**
         * Marks the transfer failed and closes the buffer with the given error.
         *
         * @param th cause of the failure
         */
        private void closeWithError(Throwable th) {
            this.state = State.FAILED;
            this.buffer.closeForError(th);
        }

        /** Marks the transfer completed and closes the buffer normally. */
        private void close() {
            this.state = State.COMPLETED;
            this.buffer.closeForCompleted();
        }

        /** Sends an acknowledgement on the claimed stream and records when it was sent. */
        private void sendAck() {
            getAgentFileChunkObserver().getResponseObserver().onNext(ServerAckMessage.newBuilder().build());
            this.lastAckTimestamp = Instant.now();
        }

        /**
         * @return the unique id of this transfer
         */
        public String getTransferId() {
            return this.transferId;
        }

        /**
         * @return the observer of the claimed stream, or {@code null} if none was claimed yet
         */
        public AgentFileChunkObserver getAgentFileChunkObserver() {
            return this.agentFileChunkObserver;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/netflix/genie/web/agent/apis/rpc/v4/endpoints/GRpcAgentFileStreamServiceImpl$TransferManager.class */
    /**
     * Coordinates file transfers: starts them by requesting the file over the job's control
     * stream, matches incoming transfer streams to the transfer they carry, writes chunks into the
     * transfer buffer, and shuts down transfers and streams that stop making progress.
     *
     * <p>Methods that read or modify {@code activeTransfers} or {@code unclaimedTransferStreams}
     * are {@code synchronized} on this instance.
     */
    public static final class TransferManager {
        private final Map<String, FileTransfer> activeTransfers = Maps.newHashMap();
        private final Set<AgentFileChunkObserver> unclaimedTransferStreams = Sets.newHashSet();
        // Runtime class of suffix ranges ("last N bytes"), used to tell them apart from other ranges.
        private final Class<? extends HttpRange> suffixRangeClass = HttpRange.createSuffixRange(1).getClass();
        // Backing value of the active-transfers gauge. It is held in a field (strong reference) and
        // updated in place; a gauge registered over a temporary boxed number cannot follow changes.
        private final AtomicInteger activeTransferCount = new AtomicInteger();
        private final ControlStreamManager controlStreamsManager;
        private final TaskScheduler taskScheduler;
        private final AgentFileStreamProperties properties;
        private final Counter transferTimeOutCounter;
        private final DistributionSummary transferSizeDistribution;

        /**
         * Registers the meters and schedules the periodic check for stalled transfers.
         *
         * @param controlStreamManager      manager used to send file requests to agents
         * @param taskScheduler             scheduler for the periodic check, timeouts and buffer writes
         * @param agentFileStreamProperties source of limits, timeouts and delays
         * @param meterRegistry             registry in which the meters are registered
         */
        private TransferManager(ControlStreamManager controlStreamManager, TaskScheduler taskScheduler, AgentFileStreamProperties agentFileStreamProperties, MeterRegistry meterRegistry) {
            this.controlStreamsManager = controlStreamManager;
            this.taskScheduler = taskScheduler;
            this.properties = agentFileStreamProperties;
            this.transferTimeOutCounter = meterRegistry.counter(GRpcAgentFileStreamServiceImpl.TRANSFER_TIMEOUT_COUNTER);
            this.transferSizeDistribution = meterRegistry.summary(GRpcAgentFileStreamServiceImpl.TRANSFER_SIZE_DISTRIBUTION);
            // Registered exactly once, over an object this instance keeps alive.
            meterRegistry.gauge(GRpcAgentFileStreamServiceImpl.ACTIVE_TRANSFER_GAUGE, this.activeTransferCount);
            this.taskScheduler.scheduleAtFixedRate(this::reapStalledTransfers, this.properties.getStalledTransferCheckInterval());
        }

        /**
         * Removes and fails every active transfer whose last acknowledgement is older than the
         * configured stalled-transfer timeout, notifying the agent when a stream was claimed.
         */
        private synchronized void reapStalledTransfers() {
            final AtomicInteger reapedCount = new AtomicInteger();
            final Instant now = Instant.now();
            this.activeTransfers.entrySet().removeIf(entry -> {
                final FileTransfer transfer = entry.getValue();
                final Instant deadline = transfer.lastAckTimestamp.plus(this.properties.getStalledTransferTimeout());
                if (!now.isAfter(deadline)) {
                    return false;
                }
                reapedCount.incrementAndGet();
                GRpcAgentFileStreamServiceImpl.log.warn("Transfer {} is stalled of job {}, shutting it down", entry.getKey(), transfer.jobId);
                final TimeoutException timeout = new TimeoutException("Transfer not making progress");
                // A transfer that was never claimed has no stream to notify.
                final AgentFileChunkObserver observer = transfer.getAgentFileChunkObserver();
                if (observer != null) {
                    observer.getResponseObserver().onError(timeout);
                }
                transfer.closeWithError(timeout);
                return true;
            });
            this.transferTimeOutCounter.increment(reapedCount.get());
            refreshActiveTransferGauge();
        }

        /**
         * Creates a transfer for the requested range and, unless the range is empty, asks the agent
         * to start sending it.
         *
         * @param jobId         id of the job that owns the file
         * @param manifestEntry manifest entry of the file, used for its size
         * @param path          path of the file
         * @param httpRange     requested range, or {@code null} for the whole file
         * @return the new transfer
         * @throws NotFoundException         if no control stream is registered for the job
         * @throws LimitExceededException    if the maximum number of concurrent transfers is reached
         * @throws IndexOutOfBoundsException propagated from the file request when the agent cannot
         *                                   serve the requested offsets
         */
        private synchronized FileTransfer startFileTransfer(String jobId, DirectoryManifest.ManifestEntry manifestEntry, Path path, @Nullable HttpRange httpRange) throws NotFoundException, LimitExceededException {
            final String transferId = UUID.randomUUID().toString();
            GRpcAgentFileStreamServiceImpl.log.debug("Initiating transfer {} for file: {} of job: {}", transferId, path, jobId);
            if (this.activeTransfers.size() >= this.properties.getMaxConcurrentTransfers()) {
                GRpcAgentFileStreamServiceImpl.log.warn("Rejecting request for {}:{}, too many active transfers", jobId, path);
                throw new LimitExceededException("Too many concurrent downloads");
            }

            // Translate the HTTP range into a [start, end) pair of offsets.
            final long fileSize = manifestEntry.getSize();
            final long startOffset;
            final long endOffset;
            if (httpRange == null) {
                startOffset = 0;
                endOffset = fileSize;
            } else if (httpRange.getClass() == this.suffixRangeClass) {
                startOffset = httpRange.getRangeStart(fileSize);
                endOffset = fileSize;
            } else {
                startOffset = Math.min(fileSize, httpRange.getRangeStart(fileSize));
                // getRangeEnd is inclusive; the transfer uses an exclusive end.
                endOffset = 1 + httpRange.getRangeEnd(fileSize);
            }
            GRpcAgentFileStreamServiceImpl.log.debug("Transfer {} effective range {}-{}: of job: {} ", transferId, startOffset, endOffset, jobId);

            final StreamBuffer streamBuffer = new StreamBuffer(startOffset);
            final FileTransfer transfer = new FileTransfer(transferId, jobId, path, startOffset, endOffset, fileSize, streamBuffer);
            this.transferSizeDistribution.record(endOffset - startOffset);

            if (endOffset - startOffset == 0) {
                // Nothing to transfer: complete the buffer right away without involving the agent.
                GRpcAgentFileStreamServiceImpl.log.debug("Transfer {} is empty, completing of job: {}", transferId, jobId);
                streamBuffer.closeForCompleted();
                return transfer;
            }

            GRpcAgentFileStreamServiceImpl.log.debug("Tracking new transfer {} of job: {}", transferId, jobId);
            this.activeTransfers.put(transferId, transfer);
            refreshActiveTransferGauge();
            GRpcAgentFileStreamServiceImpl.log.debug("Requesting start of transfer {} of job: {}", transferId, jobId);
            try {
                this.controlStreamsManager.requestFile(jobId, transferId, path.toString(), startOffset, endOffset);
            } catch (NotFoundException | IndexOutOfBoundsException e) {
                GRpcAgentFileStreamServiceImpl.log.error("Failed to request file {}:{}, terminating transfer {}: {}", jobId, path, transferId, e.getMessage());
                this.activeTransfers.remove(transferId, transfer);
                refreshActiveTransferGauge();
                streamBuffer.closeForError(e);
                throw e;
            }
            return transfer;
        }

        /**
         * Registers a newly opened transfer stream as unclaimed and schedules a timeout that shuts
         * it down if it is still unclaimed when the timeout fires.
         *
         * @param streamObserver server-to-agent side of the transfer stream
         * @return observer that will receive the agent's chunks
         */
        private synchronized StreamObserver<AgentFileMessage> handleNewTransferStream(StreamObserver<ServerAckMessage> streamObserver) {
            GRpcAgentFileStreamServiceImpl.log.info("New file transfer stream established");
            final AgentFileChunkObserver chunkObserver = new AgentFileChunkObserver(this, streamObserver);
            this.unclaimedTransferStreams.add(chunkObserver);
            final Instant deadline = Instant.now().plus(this.properties.getUnclaimedStreamStartTimeout());
            this.taskScheduler.schedule(() -> handleUnclaimedStreamTimeout(chunkObserver), deadline);
            return chunkObserver;
        }

        /**
         * Closes the stream with a timeout error if it is still in the unclaimed set.
         *
         * @param agentFileChunkObserver stream whose start timeout elapsed
         */
        private synchronized void handleUnclaimedStreamTimeout(AgentFileChunkObserver agentFileChunkObserver) {
            if (!this.unclaimedTransferStreams.remove(agentFileChunkObserver)) {
                return;
            }
            GRpcAgentFileStreamServiceImpl.log.warn("Shutting down unclaimed transfer stream");
            agentFileChunkObserver.getResponseObserver().onError(new TimeoutException("No messages received in stream"));
            this.transferTimeOutCounter.increment();
        }

        /**
         * Routes a chunk to its transfer. The first chunk of a stream claims the stream for the
         * transfer. The actual buffer write is scheduled on the task scheduler.
         *
         * @param transferId             stream id carried by the chunk
         * @param agentFileChunkObserver stream that delivered the chunk
         * @param byteString             chunk data
         */
        private synchronized void handleFileChunk(String transferId, AgentFileChunkObserver agentFileChunkObserver, ByteString byteString) {
            final FileTransfer transfer = this.activeTransfers.get(transferId);
            // The stream leaves the unclaimed set on its first chunk, whether or not a transfer matches.
            final boolean wasUnclaimed = this.unclaimedTransferStreams.remove(agentFileChunkObserver);
            if (transfer == null) {
                GRpcAgentFileStreamServiceImpl.log.warn("Received a chunk for a transfer no longer in progress: {}", transferId);
                return;
            }
            if (wasUnclaimed) {
                transfer.claimStreamObserver(agentFileChunkObserver);
            }
            this.taskScheduler.schedule(() -> writeDataAndAck(transfer, byteString), new Date());
        }

        /**
         * Handles termination of a transfer stream: closes the transfer it carried (normally or
         * with the given error), or simply forgets the stream if it was never claimed.
         *
         * @param agentFileChunkObserver stream that terminated
         * @param th                     cause of the termination, or {@code null} on normal completion
         */
        private synchronized void removeTransferStream(AgentFileChunkObserver agentFileChunkObserver, @Nullable Throwable th) {
            GRpcAgentFileStreamServiceImpl.log.info("Removing file transfer: {}", th == null ? "completed" : th.getMessage());
            final FileTransfer transfer = findFileTransfer(agentFileChunkObserver);
            if (transfer == null) {
                this.unclaimedTransferStreams.remove(agentFileChunkObserver);
                return;
            }
            if (!this.activeTransfers.remove(transfer.getTransferId(), transfer)) {
                return;
            }
            refreshActiveTransferGauge();
            if (th == null) {
                transfer.close();
            } else {
                transfer.closeWithError(th);
            }
        }

        /**
         * @param agentFileChunkObserver stream to look up
         * @return the active transfer that claimed the stream, or {@code null} if there is none
         */
        private synchronized FileTransfer findFileTransfer(AgentFileChunkObserver agentFileChunkObserver) {
            for (final FileTransfer candidate : this.activeTransfers.values()) {
                if (candidate.getAgentFileChunkObserver() == agentFileChunkObserver) {
                    return candidate;
                }
            }
            return null;
        }

        /**
         * Writes a chunk into the transfer buffer and acknowledges it. If the buffer does not accept
         * the chunk, the write is rescheduled after the configured retry delay.
         *
         * @param fileTransfer transfer the chunk belongs to
         * @param byteString   chunk data
         */
        private void writeDataAndAck(FileTransfer fileTransfer, ByteString byteString) {
            final String transferId = fileTransfer.getTransferId();
            try {
                if (!fileTransfer.append(byteString)) {
                    final Instant retryAt = Instant.now().plus(this.properties.getWriteRetryDelay());
                    this.taskScheduler.schedule(() -> writeDataAndAck(fileTransfer, byteString), retryAt);
                    return;
                }
                GRpcAgentFileStreamServiceImpl.log.debug("Wrote chunk of transfer {} to buffer. Sending ack", transferId);
                fileTransfer.sendAck();
            } catch (IllegalStateException e) {
                // Logged as a closed buffer; the chunk is dropped and no acknowledgement is sent.
                GRpcAgentFileStreamServiceImpl.log.warn("Buffer of transfer {} of job {} is closed", transferId, fileTransfer.jobId);
            }
        }

        /** Publishes the current number of active transfers to the gauge. Call with the lock held. */
        private void refreshActiveTransferGauge() {
            this.activeTransferCount.set(this.activeTransfers.size());
        }
    }

    public GRpcAgentFileStreamServiceImpl(JobDirectoryManifestProtoConverter jobDirectoryManifestProtoConverter, TaskScheduler taskScheduler, AgentFileStreamProperties agentFileStreamProperties, MeterRegistry meterRegistry) {
        this.fileTransferLimitExceededCounter = meterRegistry.counter(TRANSFER_LIMIT_EXCEEDED_COUNTER, new String[0]);
        this.controlStreamsManager = new ControlStreamManager(jobDirectoryManifestProtoConverter, agentFileStreamProperties, meterRegistry);
        this.transferManager = new TransferManager(this.controlStreamsManager, taskScheduler, agentFileStreamProperties, meterRegistry);
    }

    /**
     * Looks up a file of a running job in the cached manifest and, if present, starts a transfer
     * from the agent whose data backs the returned resource.
     *
     * @param str       id of the job that owns the file
     * @param path      path of the file, looked up in the manifest via {@code path.toString()}
     * @param uri       URI passed through to the created resource
     * @param httpRange requested range, or {@code null} for the whole file
     * @return empty if no manifest is cached for the job or the transfer could not be started (no
     *     control stream, too many concurrent transfers, or offsets the agent cannot serve); a
     *     "non-existing" resource if the manifest has no entry for the path; otherwise a resource
     *     backed by the transfer's input stream
     */
    @Override // com.netflix.genie.web.agent.services.AgentFileStreamService
    public Optional<AgentFileStreamService.AgentFileResource> getResource(String str, Path path, URI uri, @Nullable HttpRange httpRange) {
        if (httpRange == null) {
            log.warn("Attempting to stream file with no range: {} of job: {}", path, str);
        }
        log.debug("Attempting to stream file: {} of job: {}", path, str);

        final Optional<DirectoryManifest> manifest = getManifest(str);
        if (!manifest.isPresent()) {
            // The job id is passed as an argument so that it replaces the {} placeholder.
            log.warn("No manifest found for job: {}", str);
            return Optional.empty();
        }

        final DirectoryManifest.ManifestEntry manifestEntry = (DirectoryManifest.ManifestEntry) manifest.get().getEntry(path.toString()).orElse(null);
        if (manifestEntry == null) {
            log.warn("Requesting a file ({}) that does not exist in the manifest for job id: {}: ", path, str);
            return Optional.of(AgentFileResourceImpl.forNonExistingResource());
        }

        try {
            return Optional.of(AgentFileResourceImpl.forAgentFile(uri, manifestEntry.getSize(), manifestEntry.getLastModifiedTime(), path, str, this.transferManager.startFileTransfer(str, manifestEntry, path, httpRange).getInputStream()));
        } catch (NotFoundException e) {
            log.warn("No available stream to request file {} from agent running job: {}", path, str);
            return Optional.empty();
        } catch (LimitExceededException e2) {
            log.warn("No available slots to request file {} from agent running job: {}", path, str);
            this.fileTransferLimitExceededCounter.increment();
            return Optional.empty();
        } catch (IndexOutOfBoundsException e3) {
            log.warn("Cannot serve large file {} from agent running job: {}", path, str);
            return Optional.empty();
        }
    }

    /**
     * Returns the manifest currently cached for a job.
     *
     * @param str job id
     * @return the cached manifest, or empty if the control stream manager has none for the job
     */
    @Override // com.netflix.genie.web.agent.services.AgentFileStreamService
    public Optional<DirectoryManifest> getManifest(String str) {
        final DirectoryManifest cached = this.controlStreamsManager.getManifest(str);
        return Optional.ofNullable(cached);
    }

    /**
     * Entry point for a new agent control stream; delegates to the control stream manager.
     *
     * @param streamObserver server-to-agent side of the control stream
     * @return observer that receives the agent's manifest messages
     */
    public StreamObserver<AgentManifestMessage> sync(StreamObserver<ServerControlMessage> streamObserver) {
        final StreamObserver<AgentManifestMessage> manifestObserver = this.controlStreamsManager.handleNewControlStream(streamObserver);
        return manifestObserver;
    }

    /**
     * Entry point for a new agent file transfer stream; delegates to the transfer manager.
     *
     * @param streamObserver server-to-agent side of the transfer stream, used for acknowledgements
     * @return observer that receives the agent's file chunks
     */
    public StreamObserver<AgentFileMessage> transmit(StreamObserver<ServerAckMessage> streamObserver) {
        final StreamObserver<AgentFileMessage> chunkObserver = this.transferManager.handleNewTransferStream(streamObserver);
        return chunkObserver;
    }
}
