package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.Iterator;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.hash.Hasher;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.hash.Hashing;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.vendor.grpc.v1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.util.JsonFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.class */
public class BeamFileSystemArtifactStagingService extends ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFileSystemArtifactStagingService.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    public static final String MANIFEST = "MANIFEST";
    public static final String ARTIFACTS = "artifacts";

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService$PutArtifactStreamObserver.class */
    private class PutArtifactStreamObserver implements StreamObserver<ArtifactApi.PutArtifactRequest> {
        private final StreamObserver<ArtifactApi.PutArtifactResponse> outboundObserver;
        private ArtifactApi.PutArtifactMetadata metadata;
        private ResourceId artifactId;
        private WritableByteChannel artifactWritableByteChannel;
        private Hasher hasher;

        PutArtifactStreamObserver(StreamObserver<ArtifactApi.PutArtifactResponse> streamObserver) {
            this.outboundObserver = streamObserver;
        }

        public void onNext(ArtifactApi.PutArtifactRequest putArtifactRequest) {
            if (this.metadata != null) {
                try {
                    ByteString data = putArtifactRequest.getData().getData();
                    this.artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());
                    this.hasher.putBytes(data.toByteArray());
                    return;
                } catch (IOException e) {
                    this.outboundObserver.onError(new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to write chunk of artifact %s to %s", this.metadata.getMetadata().getName(), this.artifactId)).withCause(e)));
                    return;
                }
            }
            Preconditions.checkNotNull(putArtifactRequest);
            Preconditions.checkNotNull(putArtifactRequest.getMetadata());
            this.metadata = putArtifactRequest.getMetadata();
            BeamFileSystemArtifactStagingService.LOG.info("stored metadata: {}", this.metadata);
            try {
                this.artifactId = BeamFileSystemArtifactStagingService.this.getArtifactDirResourceId(StagingSessionToken.decode(putArtifactRequest.getMetadata().getStagingSessionToken())).resolve(BeamFileSystemArtifactStagingService.this.encodedFileName(this.metadata.getMetadata()), ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
                BeamFileSystemArtifactStagingService.LOG.info("Going to stage artifact {} to {}.", this.metadata.getMetadata().getName(), this.artifactId);
                this.artifactWritableByteChannel = FileSystems.create(this.artifactId, "application/octet-stream");
                this.hasher = Hashing.md5().newHasher();
            } catch (Exception e2) {
                this.outboundObserver.onError(new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to begin staging artifact %s", this.metadata.getMetadata().getName())).withCause(e2)));
            }
        }

        public void onError(Throwable th) {
            BeamFileSystemArtifactStagingService.LOG.error("Staging artifact failed for " + this.artifactId, th);
            try {
                if (this.artifactWritableByteChannel != null) {
                    this.artifactWritableByteChannel.close();
                }
                if (this.artifactId != null) {
                    FileSystems.delete(Collections.singletonList(this.artifactId), new MoveOptions[]{MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES});
                }
                this.outboundObserver.onError(new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to stage artifact %s", this.artifactId)).withCause(th)));
            } catch (IOException e) {
                this.outboundObserver.onError(new StatusRuntimeException(Status.DATA_LOSS.withDescription(String.format("Failed to clean up artifact file %s", this.artifactId))));
            }
        }

        public void onCompleted() {
            BeamFileSystemArtifactStagingService.LOG.info("Staging artifact completed for " + this.artifactId);
            if (this.artifactWritableByteChannel != null) {
                try {
                    this.artifactWritableByteChannel.close();
                } catch (IOException e) {
                    onError(e);
                    return;
                }
            }
            String md5 = this.metadata.getMetadata().getMd5();
            if (md5 != null && !md5.isEmpty()) {
                String encodeToString = Base64.getEncoder().encodeToString(this.hasher.hash().asBytes());
                if (!encodeToString.equals(md5)) {
                    this.outboundObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(String.format("Artifact %s is corrupt: expected md5 %s, but has md5 %s", this.metadata.getMetadata().getName(), md5, encodeToString))));
                    return;
                }
            }
            this.outboundObserver.onNext(ArtifactApi.PutArtifactResponse.newBuilder().build());
            this.outboundObserver.onCompleted();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService$StagingSessionToken.class */
    public static class StagingSessionToken implements Serializable {
        private String sessionId;
        private String basePath;

        private StagingSessionToken() {
        }

        public String getSessionId() {
            return this.sessionId;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setSessionId(String str) {
            this.sessionId = str;
        }

        public String getBasePath() {
            return this.basePath;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setBasePath(String str) {
            this.basePath = str;
        }

        public String encode() {
            try {
                return BeamFileSystemArtifactStagingService.MAPPER.writeValueAsString(this);
            } catch (JsonProcessingException e) {
                throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(String.format("Error %s occurred while serializing %s", e.getMessage(), this)));
            }
        }

        public static StagingSessionToken decode(String str) throws Exception {
            try {
                return (StagingSessionToken) BeamFileSystemArtifactStagingService.MAPPER.readValue(str, StagingSessionToken.class);
            } catch (JsonProcessingException e) {
                throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(String.format("Unable to deserialize staging token %s. Expected format: %s. Error: %s", str, "{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}", e.getMessage())));
            }
        }

        public String toString() {
            return "StagingSessionToken{sessionId='" + this.sessionId + "', basePath='" + this.basePath + "'}";
        }
    }

    public StreamObserver<ArtifactApi.PutArtifactRequest> putArtifact(StreamObserver<ArtifactApi.PutArtifactResponse> streamObserver) {
        return new PutArtifactStreamObserver(streamObserver);
    }

    public void commitManifest(ArtifactApi.CommitManifestRequest commitManifestRequest, StreamObserver<ArtifactApi.CommitManifestResponse> streamObserver) {
        try {
            StagingSessionToken decode = StagingSessionToken.decode(commitManifestRequest.getStagingSessionToken());
            ResourceId manifestFileResourceId = getManifestFileResourceId(decode);
            ResourceId artifactDirResourceId = getArtifactDirResourceId(decode);
            ArtifactApi.ProxyManifest.Builder manifest = ArtifactApi.ProxyManifest.newBuilder().setManifest(commitManifestRequest.getManifest());
            for (ArtifactApi.ArtifactMetadata artifactMetadata : commitManifestRequest.getManifest().getArtifactList()) {
                manifest.addLocation(ArtifactApi.ProxyManifest.Location.newBuilder().setName(artifactMetadata.getName()).setUri(artifactDirResourceId.resolve(encodedFileName(artifactMetadata), ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString()).build());
            }
            WritableByteChannel create = FileSystems.create(manifestFileResourceId, "text/plain");
            Throwable th = null;
            try {
                try {
                    create.write(CHARSET.encode(JsonFormat.printer().print(manifest.build())));
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    streamObserver.onNext(ArtifactApi.CommitManifestResponse.newBuilder().setRetrievalToken(manifestFileResourceId.toString()).build());
                    streamObserver.onCompleted();
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.error("Unable to commit manifest.", e);
            streamObserver.onError(e);
        }
    }

    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
    public void close() throws Exception {
    }

    public static String generateStagingSessionToken(String str, String str2) throws Exception {
        StagingSessionToken stagingSessionToken = new StagingSessionToken();
        stagingSessionToken.setSessionId(str);
        stagingSessionToken.setBasePath(str2);
        return stagingSessionToken.encode();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String encodedFileName(ArtifactApi.ArtifactMetadata artifactMetadata) {
        return "artifact_" + Hashing.sha256().hashString(artifactMetadata.getName(), CHARSET).toString();
    }

    public void removeArtifacts(String str) throws Exception {
        ResourceId jobDirResourceId = getJobDirResourceId(StagingSessionToken.decode(str));
        ResourceId resolve = jobDirResourceId.resolve(MANIFEST, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        LOG.debug("Removing dir {}", jobDirResourceId);
        Iterator it = BeamFileSystemArtifactRetrievalService.loadManifest(resolve).getLocationList().iterator();
        while (it.hasNext()) {
            String uri = ((ArtifactApi.ProxyManifest.Location) it.next()).getUri();
            LOG.debug("Removing artifact: {}", uri);
            FileSystems.delete(Collections.singletonList(FileSystems.matchNewResource(uri, false)), new MoveOptions[0]);
        }
        ResourceId resolve2 = jobDirResourceId.resolve(ARTIFACTS, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        LOG.debug("Removing artifacts: {}", resolve2);
        FileSystems.delete(Collections.singletonList(resolve2), new MoveOptions[0]);
        LOG.debug("Removing manifest: {}", resolve);
        FileSystems.delete(Collections.singletonList(resolve), new MoveOptions[0]);
        LOG.debug("Removing empty dir: {}", jobDirResourceId);
        FileSystems.delete(Collections.singletonList(jobDirResourceId), new MoveOptions[0]);
        LOG.info("Removed dir {}", jobDirResourceId);
    }

    private ResourceId getJobDirResourceId(StagingSessionToken stagingSessionToken) {
        return FileSystems.matchNewResource(stagingSessionToken.getBasePath(), true).resolve(stagingSessionToken.getSessionId(), ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }

    private ResourceId getManifestFileResourceId(StagingSessionToken stagingSessionToken) {
        return getJobDirResourceId(stagingSessionToken).resolve(MANIFEST, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ResourceId getArtifactDirResourceId(StagingSessionToken stagingSessionToken) {
        return getJobDirResourceId(stagingSessionToken).resolve(ARTIFACTS, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    }
}
