/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.genie.web.agent.apis.rpc.v4.endpoints;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.netflix.genie.common.exceptions.GenieTimeoutException;
import com.netflix.genie.common.internal.dto.DirectoryManifest;
import com.netflix.genie.common.internal.dto.v4.converters.JobDirectoryManifestProtoConverter;
import com.netflix.genie.common.internal.exceptions.GenieConversionException;
import com.netflix.genie.proto.AgentFileMessage;
import com.netflix.genie.proto.AgentManifestMessage;
import com.netflix.genie.proto.FileStreamServiceGrpc;
import com.netflix.genie.proto.ServerAckMessage;
import com.netflix.genie.proto.ServerControlMessage;
import com.netflix.genie.proto.ServerFileRequestMessage;
import com.netflix.genie.web.agent.resources.AgentFileResourceImpl;
import com.netflix.genie.web.agent.services.AgentFileStreamService;
import com.netflix.genie.web.util.StreamBuffer;
import io.grpc.stub.StreamObserver;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

/**
 * gRPC-backed implementation of {@link AgentFileStreamService}.
 *
 * <p>Two kinds of streams are handled:
 * <ul>
 *   <li><b>Control streams</b> (see {@link #sync(StreamObserver)}): the remote side sends manifests; the most
 *       recent successfully converted manifest is kept per job id, and this server uses the same stream to send
 *       file requests.</li>
 *   <li><b>Transfer streams</b> (see {@link #transmit(StreamObserver)}): the remote side sends file chunks tagged
 *       with a stream id; chunks are written into the {@link StreamBuffer} that was registered under that id by
 *       {@link #getResource(String, Path, URI)}.</li>
 * </ul>
 *
 * <p>A requested transfer whose first chunk does not arrive within
 * {@value #FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS} ms is failed, and so is a transfer stream that is opened but
 * delivers no accepted chunk within the same interval.
 */
public class GRpcAgentFileStreamServiceImpl
    extends FileStreamServiceGrpc.FileStreamServiceImplBase
    implements AgentFileStreamService {

    private static final Logger log = LoggerFactory.getLogger(GRpcAgentFileStreamServiceImpl.class);

    /** How long to wait for a transfer to start before giving up on it. */
    private static final long FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS = 3000L;

    /** Control stream currently registered for each job id. */
    private final Map<String, ControlStreamObserver> jobIdControlStreamMap = Maps.newConcurrentMap();
    /** Buffers of requested transfers for which no chunk has been received yet, keyed by stream id. */
    private final Map<String, StreamBuffer> pendingTransferBuffersMap = Maps.newConcurrentMap();
    /** Transfer streams that were opened but have not yet delivered an accepted chunk. */
    private final Set<FileTransferStreamObserver> pendingTransferObserversSet = Sets.newConcurrentHashSet();
    /** Buffers of transfers that have received at least one chunk, keyed by stream id. */
    private final Map<String, StreamBuffer> inProgressTransferBuffersMap = Maps.newConcurrentMap();

    private final JobDirectoryManifestProtoConverter converter;
    private final TaskScheduler taskScheduler;

    /**
     * Constructor.
     *
     * @param converter     converter used to turn incoming manifest messages into {@link DirectoryManifest}
     * @param taskScheduler scheduler used to run the transfer-start timeouts
     */
    public GRpcAgentFileStreamServiceImpl(JobDirectoryManifestProtoConverter converter, TaskScheduler taskScheduler) {
        this.converter = converter;
        this.taskScheduler = taskScheduler;
    }

    /**
     * Builds a resource for a file of the given job, requesting its content over the job's control stream.
     *
     * @param jobId        id of the job that owns the file
     * @param relativePath path of the file, looked up in the job's latest manifest
     * @param uri          URI passed through to the created resource
     * @return empty if no control stream or no manifest is available for the job; a "non-existing" resource if
     *         the manifest has no entry for the path; otherwise a resource backed by a stream buffer
     * @throws ArithmeticException if the manifest entry size does not fit in an {@code int}
     */
    @Override
    public Optional<AgentFileStreamService.AgentFileResource> getResource(String jobId, Path relativePath, URI uri) {
        final ControlStreamObserver streamObserver = this.jobIdControlStreamMap.get(jobId);
        if (streamObserver == null) {
            log.warn("Stream Record not found for job id: {}", jobId);
            return Optional.empty();
        }

        final DirectoryManifest manifest = streamObserver.manifestRef.get();
        if (manifest == null) {
            log.warn("Stream record for job id: {} does not have a manifest", jobId);
            return Optional.empty();
        }

        final DirectoryManifest.ManifestEntry manifestEntry = manifest.getEntry(relativePath.toString()).orElse(null);
        if (manifestEntry == null) {
            log.warn("Requesting a file ({}) that does not exist in the manifest for job id: {}: ", relativePath, jobId);
            return Optional.of(AgentFileResourceImpl.forNonExistingResource());
        }

        final String fileTransferId = UUID.randomUUID().toString();
        // The whole file is always requested: [0, size).
        final int startOffset = 0;
        final int endOffset = Math.toIntExact(manifestEntry.getSize());
        final StreamBuffer buffer = new StreamBuffer();

        if (endOffset - startOffset == 0) {
            // Nothing to transfer for an empty file: complete the buffer right away.
            buffer.closeForCompleted();
        } else {
            this.pendingTransferBuffersMap.put(fileTransferId, buffer);
            try {
                streamObserver.send(
                    ServerControlMessage.newBuilder()
                        .setServerFileRequest(
                            ServerFileRequestMessage.newBuilder()
                                .setStreamId(fileTransferId)
                                .setRelativePath(relativePath.toString())
                                .setStartOffset(startOffset)
                                .setEndOffset(endOffset)
                                .build()
                        )
                        .build()
                );
                // Fail the transfer if it is still pending (no chunk received) when the timeout fires.
                this.taskScheduler.schedule(
                    () -> {
                        final StreamBuffer pendingBuffer = this.pendingTransferBuffersMap.remove(fileTransferId);
                        if (pendingBuffer != null) {
                            pendingBuffer.closeForError(new TimeoutException("Timeout waiting for transfer to start"));
                        }
                    },
                    Instant.now().plusMillis(FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS)
                );
            } catch (RuntimeException e) {
                // Without a scheduled timeout nothing would ever remove this entry; drop it before propagating.
                this.pendingTransferBuffersMap.remove(fileTransferId);
                throw e;
            }
        }

        final AgentFileStreamService.AgentFileResource resource = AgentFileResourceImpl.forAgentFile(
            uri,
            manifestEntry.getSize(),
            manifestEntry.getLastModifiedTime(),
            Paths.get(manifestEntry.getPath()),
            jobId,
            buffer.getInputStream()
        );
        return Optional.of(resource);
    }

    /**
     * Returns the latest manifest received for a job.
     *
     * @param jobId id of the job
     * @return the manifest, or empty if the job has no registered control stream or no manifest yet
     */
    @Override
    public Optional<DirectoryManifest> getManifest(String jobId) {
        final ControlStreamObserver streamObserver = this.jobIdControlStreamMap.get(jobId);
        if (streamObserver != null) {
            return Optional.ofNullable(streamObserver.manifestRef.get());
        }
        log.warn("Stream Record not found for job id: {}", jobId);
        return Optional.empty();
    }

    /**
     * Opens a new control stream.
     *
     * @param responseObserver observer used to send control messages (file requests) back on this stream
     * @return observer that consumes the incoming manifest messages
     */
    public StreamObserver<AgentManifestMessage> sync(StreamObserver<ServerControlMessage> responseObserver) {
        log.info("New agent control stream established");
        return new ControlStreamObserver(this, responseObserver);
    }

    /**
     * Opens a new file transfer stream. The stream is failed with a {@link GenieTimeoutException} if it is still
     * in the pending set when the start timeout fires.
     *
     * @param responseObserver observer used to acknowledge received chunks
     * @return observer that consumes the incoming file chunks
     */
    public StreamObserver<AgentFileMessage> transmit(StreamObserver<ServerAckMessage> responseObserver) {
        log.info("New file transfer stream established");
        final FileTransferStreamObserver fileTransferStreamObserver =
            new FileTransferStreamObserver(this, responseObserver);
        this.pendingTransferObserversSet.add(fileTransferStreamObserver);
        this.taskScheduler.schedule(
            () -> {
                final boolean removed = this.pendingTransferObserversSet.remove(fileTransferStreamObserver);
                if (removed) {
                    fileTransferStreamObserver.responseObserver.onError(
                        new GenieTimeoutException("Timeout waiting for transfer to begin")
                    );
                }
            },
            Instant.now().plusMillis(FILE_TRANSFER_BEGIN_TIMEOUT_MILLIS)
        );
        return fileTransferStreamObserver;
    }

    /** Registers the control stream for a job, replacing (and logging) any previously registered one. */
    private void registerControlStream(String jobId, ControlStreamObserver controlStreamObserver) {
        final ControlStreamObserver previousObserver = this.jobIdControlStreamMap.put(jobId, controlStreamObserver);
        if (previousObserver != null) {
            log.warn("Found an previous observer for job id: {}", jobId);
        }
    }

    /** Removes the control stream for a job, but only if the given observer is the one currently registered. */
    private void unregisterControlStream(String jobId, ControlStreamObserver controlStreamObserver) {
        final boolean removed = this.jobIdControlStreamMap.remove(jobId, controlStreamObserver);
        if (!removed) {
            log.warn("Could not remove observer for job: {}, not found in map", jobId);
        }
    }

    /**
     * Routes a chunk to the buffer of its transfer, promoting the transfer from "pending" to "in progress" on
     * the first chunk. Chunks for which no buffer is registered are dropped.
     */
    private void handleFileTransferChunk(
        FileTransferStreamObserver fileTransferStreamObserver,
        String streamId,
        ByteString data
    ) {
        final boolean removed = this.pendingTransferObserversSet.remove(fileTransferStreamObserver);
        if (removed) {
            log.debug("Removed observer for file stream: {} from 'pending' set", streamId);
        }

        final StreamBuffer streamBufferFromPending = this.pendingTransferBuffersMap.remove(streamId);
        if (streamBufferFromPending != null) {
            log.debug("Moving buffer for file stream {} from 'pending' to 'in progress'", streamId);
            this.inProgressTransferBuffersMap.put(streamId, streamBufferFromPending);
        }

        final StreamBuffer streamBuffer = this.inProgressTransferBuffersMap.get(streamId);
        if (streamBuffer != null) {
            streamBuffer.write(data);
        }
    }

    /**
     * Cleans up after a failed transfer stream and propagates the error to the associated buffer, if any.
     *
     * @param streamId id of the transfer, or {@code null} if the stream failed before any chunk was accepted
     */
    private void handleFileTransferError(
        FileTransferStreamObserver fileTransferStreamObserver,
        @Nullable String streamId,
        Throwable t
    ) {
        log.error("Error in file transfer stream: {}: {}", streamId, t.getMessage(), t);
        this.pendingTransferObserversSet.remove(fileTransferStreamObserver);
        if (streamId != null) {
            final StreamBuffer pendingTransferBuffer = this.pendingTransferBuffersMap.remove(streamId);
            if (pendingTransferBuffer != null) {
                pendingTransferBuffer.closeForError(t);
            }
            final StreamBuffer inProgressTransferBuffer = this.inProgressTransferBuffersMap.remove(streamId);
            if (inProgressTransferBuffer != null) {
                inProgressTransferBuffer.closeForError(t);
            }
        }
    }

    /**
     * Cleans up after a completed transfer stream and marks the associated buffer, if any, as completed.
     *
     * @param streamId id of the transfer, or {@code null} if the stream completed before any chunk was accepted
     */
    private void handleFileTransferCompletion(
        FileTransferStreamObserver fileTransferStreamObserver,
        @Nullable String streamId
    ) {
        this.pendingTransferObserversSet.remove(fileTransferStreamObserver);
        // The concurrent maps reject null keys, so a stream that never identified itself has no buffer to close.
        if (streamId != null) {
            final StreamBuffer pendingTransferBuffer = this.pendingTransferBuffersMap.remove(streamId);
            if (pendingTransferBuffer != null) {
                pendingTransferBuffer.closeForCompleted();
            }
            final StreamBuffer inProgressTransferBuffer = this.inProgressTransferBuffersMap.remove(streamId);
            if (inProgressTransferBuffer != null) {
                inProgressTransferBuffer.closeForCompleted();
            }
        }
    }

    /**
     * Observer of one file transfer stream. It binds itself to the stream id of the first non-blank chunk and
     * ignores chunks carrying any other id.
     */
    private static class FileTransferStreamObserver implements StreamObserver<AgentFileMessage> {
        private final GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamService;
        private final StreamObserver<ServerAckMessage> responseObserver;
        private final AtomicReference<String> streamId = new AtomicReference<>();

        FileTransferStreamObserver(
            GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamService,
            StreamObserver<ServerAckMessage> responseObserver
        ) {
            this.gRpcAgentFileStreamService = gRpcAgentFileStreamService;
            this.responseObserver = responseObserver;
        }

        /** Forwards an accepted chunk to the service and acknowledges it; rejected chunks are not acknowledged. */
        @Override
        public void onNext(AgentFileMessage value) {
            final String messageStreamId = value.getStreamId();
            if (StringUtils.isBlank(messageStreamId)) {
                log.warn("Received file chunk with empty stream identifier");
                return;
            }

            if (this.streamId.compareAndSet(null, messageStreamId)) {
                log.debug("Received first chunk for transfer: {}", messageStreamId);
            }

            if (!messageStreamId.equals(this.streamId.get())) {
                log.warn(
                    "Received chunk with id: {}, but this stream was previously used by stream: {}",
                    messageStreamId,
                    this.streamId.get()
                );
                return;
            }

            this.gRpcAgentFileStreamService.handleFileTransferChunk(this, messageStreamId, value.getData());
            this.responseObserver.onNext(ServerAckMessage.newBuilder().build());
        }

        @Override
        public void onError(Throwable t) {
            this.gRpcAgentFileStreamService.handleFileTransferError(this, this.streamId.get(), t);
        }

        @Override
        public void onCompleted() {
            this.gRpcAgentFileStreamService.handleFileTransferCompletion(this, this.streamId.get());
        }
    }

    /**
     * Observer of one control stream. It registers itself with the service under the job id of the first
     * message received and keeps the most recent successfully converted manifest.
     */
    private static class ControlStreamObserver implements StreamObserver<AgentManifestMessage> {
        private final GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamService;
        private final StreamObserver<ServerControlMessage> responseObserver;
        private final AtomicReference<DirectoryManifest> manifestRef = new AtomicReference<>();
        private final AtomicReference<String> jobIdRef = new AtomicReference<>();

        ControlStreamObserver(
            GRpcAgentFileStreamServiceImpl gRpcAgentFileStreamService,
            StreamObserver<ServerControlMessage> responseObserver
        ) {
            this.gRpcAgentFileStreamService = gRpcAgentFileStreamService;
            this.responseObserver = responseObserver;
        }

        /**
         * Sends a control message on this stream. Calls are serialized because several threads may request
         * files for the same job at once, and a single gRPC {@link StreamObserver} must not be written to
         * concurrently.
         */
        private synchronized void send(ServerControlMessage message) {
            this.responseObserver.onNext(message);
        }

        @Override
        public void onNext(AgentManifestMessage value) {
            log.debug("Received a manifest");
            final String jobId = value.getJobId();

            // Only the first message on the stream registers it; the job id of that message is the one retained.
            final boolean isFirstMessage = this.jobIdRef.compareAndSet(null, jobId);
            if (isFirstMessage) {
                this.gRpcAgentFileStreamService.registerControlStream(jobId, this);
            }

            try {
                this.manifestRef.set(this.gRpcAgentFileStreamService.converter.toManifest(value));
            } catch (GenieConversionException e) {
                // Keep the previously stored manifest (if any) when the new one cannot be converted.
                log.warn("Failed to parse manifest for job id: {}", jobId, e);
            }
        }

        @Override
        public void onError(Throwable t) {
            log.warn("Manifest stream error", t);
            this.unregisterStream();
        }

        @Override
        public void onCompleted() {
            log.debug("Manifest stream completed");
            this.unregisterStream();
        }

        /** Unregisters this stream from the service, if it ever registered (i.e. received a message). */
        private void unregisterStream() {
            final String jobId = this.jobIdRef.get();
            if (jobId != null) {
                this.gRpcAgentFileStreamService.unregisterControlStream(jobId, this);
            }
        }
    }
}

