/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.netflix.genie.common.internal.dtos.DirectoryManifest;
import com.netflix.genie.common.internal.dtos.v4.converters.JobDirectoryManifestProtoConverter;
import com.netflix.genie.common.internal.exceptions.checked.GenieConversionException;
import com.netflix.genie.proto.AgentFileMessage;
import com.netflix.genie.proto.AgentManifestMessage;
import com.netflix.genie.proto.FileStreamServiceGrpc;
import com.netflix.genie.proto.ServerAckMessage;
import com.netflix.genie.proto.ServerControlMessage;
import com.netflix.genie.proto.ServerFileRequestMessage;
import com.netflix.genie.web.agent.resources.AgentFileResourceImpl;
import com.netflix.genie.web.agent.services.AgentFileStreamService;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.properties.AgentFileStreamProperties;
import com.netflix.genie.web.util.StreamBuffer;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import javax.naming.LimitExceededException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpRange;
import org.springframework.scheduling.TaskScheduler;

public class GRpcAgentFileStreamServiceImpl
extends FileStreamServiceGrpc.FileStreamServiceImplBase
implements AgentFileStreamService {
    private static final Logger log = LoggerFactory.getLogger(GRpcAgentFileStreamServiceImpl.class);
    private static final String METRICS_PREFIX = "genie.agents.fileTransfers";
    private static final String TRANSFER_COUNTER = "genie.agents.fileTransfers.requested.counter";
    private static final String TRANSFER_LIMIT_EXCEEDED_COUNTER = "genie.agents.fileTransfers.rejected.counter";
    private static final String MANIFEST_CACHE_SIZE_GAUGE = "genie.agents.fileTransfers.manifestCache.size";
    private static final String CONTROL_STREAMS_GAUGE = "genie.agents.fileTransfers.controlStreams.size";
    private static final String TRANSFER_TIMEOUT_COUNTER = "genie.agents.fileTransfers.timeout.counter";
    private static final String TRANSFER_SIZE_DISTRIBUTION = "genie.agents.fileTransfers.transferSize.summary";
    private static final String ACTIVE_TRANSFER_GAUGE = "genie.agents.fileTransfers.activeTransfers.size";
    private final ControlStreamManager controlStreamsManager;
    private final TransferManager transferManager;
    private final Counter fileTransferLimitExceededCounter;

    public GRpcAgentFileStreamServiceImpl(JobDirectoryManifestProtoConverter converter, TaskScheduler taskScheduler, AgentFileStreamProperties properties, MeterRegistry registry) {
        this.fileTransferLimitExceededCounter = registry.counter(TRANSFER_LIMIT_EXCEEDED_COUNTER, new String[0]);
        this.controlStreamsManager = new ControlStreamManager(converter, properties, registry);
        this.transferManager = new TransferManager(this.controlStreamsManager, taskScheduler, properties, registry);
    }

    @Override
    public Optional<AgentFileStreamService.AgentFileResource> getResource(String jobId, Path relativePath, URI uri, @Nullable HttpRange range) {
        FileTransfer fileTransfer;
        log.debug("Attempting to stream file: {} of job: {}", (Object)relativePath, (Object)jobId);
        Optional<DirectoryManifest> optionalManifest = this.getManifest(jobId);
        if (!optionalManifest.isPresent()) {
            log.warn("No manifest found for job: {}" + jobId);
            return Optional.empty();
        }
        DirectoryManifest manifest = optionalManifest.get();
        DirectoryManifest.ManifestEntry manifestEntry = manifest.getEntry(relativePath.toString()).orElse(null);
        if (manifestEntry == null) {
            log.warn("Requesting a file ({}) that does not exist in the manifest for job id: {}: ", (Object)relativePath, (Object)jobId);
            return Optional.of(AgentFileResourceImpl.forNonExistingResource());
        }
        try {
            fileTransfer = this.transferManager.startFileTransfer(jobId, manifestEntry, relativePath, range);
        }
        catch (NotFoundException e) {
            log.warn("No available stream to request file {} from agent running job: {}", (Object)relativePath, (Object)jobId);
            return Optional.empty();
        }
        catch (LimitExceededException e) {
            log.warn("No available slots to request file {} from agent running job: {}", (Object)relativePath, (Object)jobId);
            this.fileTransferLimitExceededCounter.increment();
            return Optional.empty();
        }
        return Optional.of(AgentFileResourceImpl.forAgentFile(uri, manifestEntry.getSize(), manifestEntry.getLastModifiedTime(), relativePath, jobId, fileTransfer.getInputStream()));
    }

    @Override
    public Optional<DirectoryManifest> getManifest(String jobId) {
        return Optional.ofNullable(this.controlStreamsManager.getManifest(jobId));
    }

    public StreamObserver<AgentManifestMessage> sync(StreamObserver<ServerControlMessage> responseObserver) {
        return this.controlStreamsManager.handleNewControlStream((StreamObserver<ServerControlMessage>)responseObserver);
    }

    public StreamObserver<AgentFileMessage> transmit(StreamObserver<ServerAckMessage> responseObserver) {
        return this.transferManager.handleNewTransferStream((StreamObserver<ServerAckMessage>)responseObserver);
    }

    private static final class AgentFileChunkObserver
    implements StreamObserver<AgentFileMessage> {
        private final TransferManager transferManager;
        private final StreamObserver<ServerAckMessage> responseObserver;

        AgentFileChunkObserver(TransferManager transferManager, StreamObserver<ServerAckMessage> responseObserver) {
            this.transferManager = transferManager;
            this.responseObserver = responseObserver;
        }

        public void onNext(AgentFileMessage value) {
            String transferStreamId = value.getStreamId();
            log.debug("Received file chunk of transfer: {}", (Object)transferStreamId);
            this.transferManager.handleFileChunk(transferStreamId, this, value.getData());
        }

        public void onError(Throwable t) {
            this.transferManager.removeTransferStream(this, t);
            this.responseObserver.onCompleted();
        }

        public void onCompleted() {
            this.transferManager.removeTransferStream(this, null);
            this.responseObserver.onCompleted();
        }

        public StreamObserver<ServerAckMessage> getResponseObserver() {
            return this.responseObserver;
        }
    }

    private static final class FileTransfer {
        private final String transferId;
        private AgentFileChunkObserver agentFileChunkObserver;
        private final StreamBuffer buffer;
        private final String description;
        private State state = State.NEW;
        private Instant lastAckTimestamp;

        private FileTransfer(String transferId, String jobId, Path relativePath, int startOffset, int endOffset, int fileSize, StreamBuffer buffer) {
            this.transferId = transferId;
            this.buffer = buffer;
            this.lastAckTimestamp = Instant.now();
            this.description = "FileTransfer " + transferId + ", agent://" + jobId + "/" + relativePath + " (range: (" + startOffset + "-" + endOffset + "] file.size: " + fileSize + ")";
        }

        public String toString() {
            return "" + (Object)((Object)this.state) + " " + this.description;
        }

        private void claimStreamObserver(AgentFileChunkObserver observer) {
            this.state = State.IN_PROGRESS;
            this.agentFileChunkObserver = observer;
        }

        private InputStream getInputStream() {
            return this.buffer.getInputStream();
        }

        private boolean append(ByteString data) {
            return this.buffer.tryWrite(data);
        }

        private void closeWithError(Throwable t) {
            this.state = State.FAILED;
            this.buffer.closeForError(t);
        }

        private void close() {
            this.state = State.COMPLETED;
            this.buffer.closeForCompleted();
        }

        private void sendAck() {
            this.getAgentFileChunkObserver().getResponseObserver().onNext((Object)ServerAckMessage.newBuilder().build());
            this.lastAckTimestamp = Instant.now();
        }

        public String getTransferId() {
            return this.transferId;
        }

        public AgentFileChunkObserver getAgentFileChunkObserver() {
            return this.agentFileChunkObserver;
        }

        private static enum State {
            NEW,
            IN_PROGRESS,
            COMPLETED,
            FAILED;

        }
    }

    private static final class TransferManager {
        private final Map<String, FileTransfer> activeTransfers = Maps.newHashMap();
        private final Set<AgentFileChunkObserver> unclaimedTransferStreams = Sets.newHashSet();
        private final Class<? extends HttpRange> suffixRangeClass = HttpRange.createSuffixRange((long)1L).getClass();
        private final ControlStreamManager controlStreamsManager;
        private final TaskScheduler taskScheduler;
        private final AgentFileStreamProperties properties;
        private final Counter transferTimeOutCounter;
        private final DistributionSummary transferSizeDistribution;
        private final MeterRegistry registry;

        private TransferManager(ControlStreamManager controlStreamsManager, TaskScheduler taskScheduler, AgentFileStreamProperties properties, MeterRegistry registry) {
            this.controlStreamsManager = controlStreamsManager;
            this.taskScheduler = taskScheduler;
            this.properties = properties;
            this.registry = registry;
            this.transferTimeOutCounter = registry.counter(GRpcAgentFileStreamServiceImpl.TRANSFER_TIMEOUT_COUNTER, new String[0]);
            this.transferSizeDistribution = registry.summary(GRpcAgentFileStreamServiceImpl.TRANSFER_SIZE_DISTRIBUTION, new String[0]);
            this.taskScheduler.scheduleAtFixedRate(this::reapStalledTransfers, this.properties.getStalledTransferCheckInterval());
        }

        private synchronized void reapStalledTransfers() {
            AtomicInteger stalledTrasfersCounter = new AtomicInteger();
            Instant now = Instant.now();
            this.activeTransfers.entrySet().removeIf(entry -> {
                String transferId = (String)entry.getKey();
                FileTransfer transfer = (FileTransfer)entry.getValue();
                Instant deadline = transfer.lastAckTimestamp.plus(this.properties.getStalledTransferTimeout());
                if (now.isAfter(deadline)) {
                    stalledTrasfersCounter.incrementAndGet();
                    log.warn("Transfer {} is stalled, shutting it down", (Object)transferId);
                    TimeoutException exception = new TimeoutException("Transfer not making progress");
                    AgentFileChunkObserver observer = transfer.getAgentFileChunkObserver();
                    if (observer != null) {
                        observer.getResponseObserver().onError((Throwable)exception);
                    }
                    transfer.closeWithError(exception);
                    return true;
                }
                return false;
            });
            this.transferTimeOutCounter.increment((double)stalledTrasfersCounter.get());
            this.registry.gauge(GRpcAgentFileStreamServiceImpl.ACTIVE_TRANSFER_GAUGE, (Number)this.activeTransfers.size());
        }

        private synchronized FileTransfer startFileTransfer(String jobId, DirectoryManifest.ManifestEntry manifestEntry, Path relativePath, @Nullable HttpRange range) throws NotFoundException, LimitExceededException {
            int endOffset;
            int startOffset;
            String fileTransferId = UUID.randomUUID().toString();
            log.debug("Initiating transfer {} for file: {} of job: {}", new Object[]{fileTransferId, relativePath, jobId});
            if (this.activeTransfers.size() >= this.properties.getMaxConcurrentTransfers()) {
                log.warn("Rejecting request for {}:{}, too many active transfers", (Object)jobId, (Object)relativePath);
                throw new LimitExceededException("Too many concurrent downloads");
            }
            int fileSize = Math.toIntExact(manifestEntry.getSize());
            if (range == null) {
                startOffset = 0;
                endOffset = fileSize;
            } else if (range.getClass() == this.suffixRangeClass) {
                startOffset = Math.toIntExact(range.getRangeStart((long)fileSize));
                endOffset = fileSize;
            } else {
                startOffset = Math.min(fileSize, Math.toIntExact(range.getRangeStart((long)fileSize)));
                endOffset = 1 + Math.toIntExact(range.getRangeEnd((long)fileSize));
            }
            log.debug("Transfer {} effective range {}-{}: ", new Object[]{fileTransferId, startOffset, endOffset});
            StreamBuffer buffer = new StreamBuffer(startOffset);
            FileTransfer fileTransfer = new FileTransfer(fileTransferId, jobId, relativePath, startOffset, endOffset, fileSize, buffer);
            this.transferSizeDistribution.record((double)(endOffset - startOffset));
            if (endOffset - startOffset == 0) {
                log.debug("Transfer {} is empty, completing", (Object)fileTransferId);
                buffer.closeForCompleted();
            } else {
                log.debug("Tracking new transfer {}", (Object)fileTransferId);
                this.activeTransfers.put(fileTransferId, fileTransfer);
                log.debug("Requesting start of transfer {}", (Object)fileTransferId);
                try {
                    this.controlStreamsManager.requestFile(jobId, fileTransferId, relativePath.toString(), startOffset, endOffset);
                }
                catch (NotFoundException e) {
                    log.error("Failed to request file {}:{}, terminating transfer {}: {}", new Object[]{jobId, relativePath, fileTransferId, e.getMessage()});
                    this.activeTransfers.remove(fileTransferId, fileTransfer);
                    buffer.closeForError((Throwable)((Object)e));
                    throw e;
                }
            }
            return fileTransfer;
        }

        private synchronized StreamObserver<AgentFileMessage> handleNewTransferStream(StreamObserver<ServerAckMessage> responseObserver) {
            log.info("New file transfer stream established");
            AgentFileChunkObserver agentFileChunkObserver = new AgentFileChunkObserver(this, responseObserver);
            this.unclaimedTransferStreams.add(agentFileChunkObserver);
            this.taskScheduler.schedule(() -> this.handleUnclaimedStreamTimeout(agentFileChunkObserver), Instant.now().plus(this.properties.getUnclaimedStreamStartTimeout()));
            return agentFileChunkObserver;
        }

        private synchronized void handleUnclaimedStreamTimeout(AgentFileChunkObserver agentFileChunkObserver) {
            boolean streamUnclaimed = this.unclaimedTransferStreams.remove(agentFileChunkObserver);
            if (streamUnclaimed) {
                log.warn("Shutting down unclaimed transfer stream");
                agentFileChunkObserver.getResponseObserver().onError((Throwable)new TimeoutException("No messages received in stream"));
                this.transferTimeOutCounter.increment();
            }
        }

        private synchronized void handleFileChunk(String transferStreamId, AgentFileChunkObserver agentFileChunkObserver, ByteString data) {
            FileTransfer fileTransfer = this.activeTransfers.get(transferStreamId);
            boolean unclaimedStream = this.unclaimedTransferStreams.remove(agentFileChunkObserver);
            if (fileTransfer != null) {
                if (unclaimedStream) {
                    fileTransfer.claimStreamObserver(agentFileChunkObserver);
                }
                this.taskScheduler.schedule(() -> this.writeDataAndAck(fileTransfer, data), new Date());
            } else {
                log.warn("Received a chunk for a transfer no longer in progress: {}", (Object)transferStreamId);
            }
        }

        private synchronized void removeTransferStream(AgentFileChunkObserver agentFileChunkObserver, @Nullable Throwable t) {
            log.info("Removing file transfer: {}", (Object)(t == null ? "completed" : t.getMessage()));
            FileTransfer fileTransfer = this.findFileTransfer(agentFileChunkObserver);
            if (fileTransfer != null) {
                boolean removed = this.activeTransfers.remove(fileTransfer.getTransferId(), fileTransfer);
                if (removed && t == null) {
                    fileTransfer.close();
                } else if (removed) {
                    fileTransfer.closeWithError(t);
                }
            } else {
                this.unclaimedTransferStreams.remove(agentFileChunkObserver);
            }
        }

        private synchronized FileTransfer findFileTransfer(AgentFileChunkObserver agentFileChunkObserver) {
            for (FileTransfer fileTransfer : this.activeTransfers.values()) {
                if (fileTransfer.getAgentFileChunkObserver() != agentFileChunkObserver) continue;
                return fileTransfer;
            }
            return null;
        }

        private void writeDataAndAck(FileTransfer fileTransfer, ByteString data) {
            String fileTransferId = fileTransfer.getTransferId();
            try {
                if (fileTransfer.append(data)) {
                    log.debug("Wrote chunk of transfer {} to buffer. Sending ack", (Object)fileTransferId);
                    fileTransfer.sendAck();
                } else {
                    this.taskScheduler.schedule(() -> this.writeDataAndAck(fileTransfer, data), Instant.now().plus(this.properties.getWriteRetryDelay()));
                }
            }
            catch (IllegalStateException e) {
                log.warn("Buffer of transfer {} is closed", (Object)fileTransferId);
            }
        }
    }

    private static final class ControlStreamObserver
    implements StreamObserver<AgentManifestMessage> {
        private final ControlStreamManager controlStreamManager;
        private final StreamObserver<ServerControlMessage> responseObserver;

        private ControlStreamObserver(ControlStreamManager controlStreamManager, StreamObserver<ServerControlMessage> responseObserver) {
            this.controlStreamManager = controlStreamManager;
            this.responseObserver = responseObserver;
        }

        public void onNext(AgentManifestMessage value) {
            String jobId = value.getJobId();
            DirectoryManifest manifest = null;
            try {
                manifest = this.controlStreamManager.converter.toManifest(value);
            }
            catch (GenieConversionException e) {
                log.warn("Failed to parse manifest for job id: {}", (Object)jobId, (Object)e);
            }
            if (manifest != null) {
                this.controlStreamManager.updateManifestAndStream(this, jobId, manifest);
            }
        }

        public void onError(Throwable t) {
            this.controlStreamManager.removeControlStream(this, t);
            this.responseObserver.onCompleted();
        }

        public void onCompleted() {
            this.controlStreamManager.removeControlStream(this, null);
            this.responseObserver.onCompleted();
        }

        public void closeStreamWithError(Throwable e) {
            this.responseObserver.onError(e);
        }
    }

    private static final class ControlStreamManager {
        private final Map<String, ControlStreamObserver> controlStreamMap = Maps.newHashMap();
        private final Cache<String, DirectoryManifest> manifestCache;
        private final JobDirectoryManifestProtoConverter converter;
        private final Counter fileTansferCounter;
        private final MeterRegistry registry;

        private ControlStreamManager(JobDirectoryManifestProtoConverter converter, AgentFileStreamProperties properties, MeterRegistry registry) {
            this.converter = converter;
            this.manifestCache = Caffeine.newBuilder().expireAfterWrite(properties.getManifestCacheExpiration()).build();
            this.fileTansferCounter = registry.counter(GRpcAgentFileStreamServiceImpl.TRANSFER_COUNTER, new String[0]);
            this.registry = registry;
        }

        private synchronized void requestFile(String jobId, String fileTransferId, String relativePath, int startOffset, int endOffset) throws NotFoundException {
            ControlStreamObserver controlStreamObserver = this.controlStreamMap.get(jobId);
            if (controlStreamObserver == null) {
                throw new NotFoundException("No active stream control stream for job: " + jobId);
            }
            this.fileTansferCounter.increment();
            controlStreamObserver.responseObserver.onNext((Object)ServerControlMessage.newBuilder().setServerFileRequest(ServerFileRequestMessage.newBuilder().setStreamId(fileTransferId).setRelativePath(relativePath).setStartOffset(startOffset).setEndOffset(endOffset).build()).build());
        }

        private StreamObserver<AgentManifestMessage> handleNewControlStream(StreamObserver<ServerControlMessage> responseObserver) {
            log.info("New agent control stream established");
            return new ControlStreamObserver(this, responseObserver);
        }

        private DirectoryManifest getManifest(String jobId) {
            return (DirectoryManifest)this.manifestCache.getIfPresent((Object)jobId);
        }

        private synchronized void updateManifestAndStream(ControlStreamObserver controlStreamObserver, String jobId, DirectoryManifest manifest) {
            this.manifestCache.put((Object)jobId, (Object)manifest);
            ControlStreamObserver previousObserver = this.controlStreamMap.put(jobId, controlStreamObserver);
            if (previousObserver != null && previousObserver != controlStreamObserver) {
                previousObserver.closeStreamWithError(new IllegalStateException("A new stream was registered for the same job id: " + jobId));
            }
            this.registry.gauge(GRpcAgentFileStreamServiceImpl.MANIFEST_CACHE_SIZE_GAUGE, (Number)this.manifestCache.estimatedSize());
            this.registry.gauge(GRpcAgentFileStreamServiceImpl.CONTROL_STREAMS_GAUGE, (Number)this.controlStreamMap.size());
        }

        private synchronized void removeControlStream(ControlStreamObserver controlStreamObserver, @Nullable Throwable t) {
            log.debug("Control stream {}", (Object)(t == null ? "completed" : "error: " + t.getMessage()));
            boolean foundAndRemoved = this.controlStreamMap.entrySet().removeIf(entry -> entry.getValue() == controlStreamObserver);
            if (foundAndRemoved) {
                log.debug("Removed a control stream due to {}", (Object)(t == null ? "completion" : "error: " + t.getMessage()));
            }
        }
    }
}

