/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact;

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Strings;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.hash.Hasher;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.hash.Hashing;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.io.ByteStreams;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFileSystemArtifactRetrievalService
extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase
implements ArtifactRetrievalService {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFileSystemArtifactRetrievalService.class);
    private static final int ARTIFACT_CHUNK_SIZE_BYTES = 0x200000;
    private static final LoadingCache<String, ArtifactApi.ProxyManifest> MANIFEST_CACHE = CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).maximumSize(100L).build(new CacheLoader<String, ArtifactApi.ProxyManifest>(){

        @Override
        public ArtifactApi.ProxyManifest load(String retrievalToken) throws Exception {
            return BeamFileSystemArtifactRetrievalService.loadManifest(retrievalToken);
        }
    });

    public static BeamFileSystemArtifactRetrievalService create() {
        return new BeamFileSystemArtifactRetrievalService();
    }

    public void getManifest(ArtifactApi.GetManifestRequest request, StreamObserver<ArtifactApi.GetManifestResponse> responseObserver) {
        String token = request.getRetrievalToken();
        if (Strings.isNullOrEmpty(token)) {
            throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Empty artifact token"));
        }
        LOG.info("GetManifest for {}", (Object)token);
        try {
            ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(token);
            ArtifactApi.GetManifestResponse response = ArtifactApi.GetManifestResponse.newBuilder().setManifest(proxyManifest.getManifest()).build();
            LOG.info("GetManifest for {} -> {} artifacts", (Object)token, (Object)proxyManifest.getManifest().getArtifactCount());
            responseObserver.onNext((Object)response);
            responseObserver.onCompleted();
        }
        catch (Exception e) {
            LOG.info("GetManifest for {} failed", (Object)token, (Object)e);
            responseObserver.onError((Throwable)e);
        }
    }

    public void getArtifact(ArtifactApi.GetArtifactRequest request, StreamObserver<ArtifactApi.ArtifactChunk> responseObserver) {
        LOG.debug("GetArtifact {}", (Object)request);
        String name = request.getName();
        try {
            ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(request.getRetrievalToken());
            ArtifactApi.ProxyManifest.Location location = proxyManifest.getLocationList().stream().filter(loc -> loc.getName().equals(name)).findFirst().orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(String.format("Artifact location not found in manifest: %s", name))));
            List existingArtifacts = proxyManifest.getManifest().getArtifactList();
            ArtifactApi.ArtifactMetadata metadata = existingArtifacts.stream().filter(meta -> meta.getName().equals(name)).findFirst().orElseThrow(() -> new StatusRuntimeException(Status.NOT_FOUND.withDescription(String.format("Artifact metadata not found in manifest: %s", name))));
            ResourceId artifactResourceId = FileSystems.matchNewResource((String)location.getUri(), (boolean)false);
            LOG.debug("Artifact {} located in {}", (Object)name, (Object)artifactResourceId);
            Hasher hasher = Hashing.sha256().newHasher();
            byte[] data = new byte[0x200000];
            try (InputStream stream = Channels.newInputStream(FileSystems.open((ResourceId)artifactResourceId));){
                int len;
                while ((len = stream.read(data)) != -1) {
                    hasher.putBytes(data, 0, len);
                    responseObserver.onNext((Object)ArtifactApi.ArtifactChunk.newBuilder().setData(ByteString.copyFrom((byte[])data, (int)0, (int)len)).build());
                }
            }
            if (metadata.getSha256() != null && !metadata.getSha256().isEmpty()) {
                String expected = metadata.getSha256();
                String actual = hasher.hash().toString();
                if (!actual.equals(expected)) {
                    throw new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Artifact %s is corrupt: expected sha256 %s, actual %s", name, expected, actual)));
                }
            }
            responseObserver.onCompleted();
        }
        catch (IOException | ExecutionException e) {
            LOG.info("GetArtifact {} failed", (Object)request, (Object)e);
            responseObserver.onError((Throwable)e);
        }
    }

    @Override
    public void close() throws Exception {
    }

    @VisibleForTesting
    static ArtifactApi.ProxyManifest loadManifest(String retrievalToken) throws IOException {
        LOG.info("Loading manifest for retrieval token {}", (Object)retrievalToken);
        ResourceId manifestResourceId = BeamFileSystemArtifactRetrievalService.getManifestLocationFromToken(retrievalToken);
        return BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
    }

    static ArtifactApi.ProxyManifest loadManifest(ResourceId manifestResourceId) throws IOException {
        ArtifactApi.ProxyManifest.Builder manifestBuilder = ArtifactApi.ProxyManifest.newBuilder();
        try (InputStream stream = Channels.newInputStream(FileSystems.open((ResourceId)manifestResourceId));){
            String contents = new String(ByteStreams.toByteArray(stream), StandardCharsets.UTF_8);
            JsonFormat.parser().merge(contents, (Message.Builder)manifestBuilder);
        }
        ArtifactApi.ProxyManifest proxyManifest = manifestBuilder.build();
        Preconditions.checkArgument(proxyManifest.hasManifest(), String.format("Invalid ProxyManifest at %s: doesn't have a Manifest", manifestResourceId));
        Preconditions.checkArgument(proxyManifest.getLocationCount() == proxyManifest.getManifest().getArtifactCount(), String.format("Invalid ProxyManifestat %s: %d locations but %d artifacts", manifestResourceId, proxyManifest.getLocationCount(), proxyManifest.getManifest().getArtifactCount()));
        LOG.info("Manifest at {} has {} artifact locations", (Object)manifestResourceId, (Object)proxyManifest.getManifest().getArtifactCount());
        return proxyManifest;
    }

    private static ResourceId getManifestLocationFromToken(String retrievalToken) {
        return FileSystems.matchNewResource((String)retrievalToken, (boolean)false);
    }
}

