package com.netflix.genie.web.rpc.grpc.services.impl.v4;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.genie.common.internal.dto.v4.files.JobFileState;
import com.netflix.genie.proto.BeginAcknowledgement;
import com.netflix.genie.proto.BeginSync;
import com.netflix.genie.proto.DataUpload;
import com.netflix.genie.proto.DeleteFile;
import com.netflix.genie.proto.JobDirectoryState;
import com.netflix.genie.proto.JobFileSyncServiceGrpc;
import com.netflix.genie.proto.ResetSync;
import com.netflix.genie.proto.SyncAcknowledgement;
import com.netflix.genie.proto.SyncComplete;
import com.netflix.genie.proto.SyncRequest;
import com.netflix.genie.proto.SyncRequestResult;
import com.netflix.genie.proto.SyncResponse;
import com.netflix.genie.web.properties.GRpcServerProperties;
import com.netflix.genie.web.properties.JobFileSyncRpcProperties;
import com.netflix.genie.web.services.JobFileService;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import net.devh.springboot.autoconfigure.grpc.server.GrpcService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.TaskScheduler;

@GrpcService(JobFileSyncServiceGrpc.class)
@ConditionalOnProperty(value = {GRpcServerProperties.ENABLED_PROPERTY}, havingValue = "true")
/* loaded from: input_file:com/netflix/genie/web/rpc/grpc/services/impl/v4/GRpcJobFileSyncServiceImpl.class */
public class GRpcJobFileSyncServiceImpl extends JobFileSyncServiceGrpc.JobFileSyncServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(GRpcJobFileSyncServiceImpl.class);
    private final JobFileSyncRpcProperties jobFileSyncRpcProperties;
    private final JobFileService jobFileService;
    private final ScheduledFuture<?> ackFuture;
    private final ConcurrentMap<String, JobFileSyncObserver> jobSyncRequestObservers = Maps.newConcurrentMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/netflix/genie/web/rpc/grpc/services/impl/v4/GRpcJobFileSyncServiceImpl$JobFileSyncObserver.class */
    public interface JobFileSyncObserver {
        String getId();

        Optional<String> getJobId();

        void cleanup();

        void sendSyncAckMessageIfNecessary();
    }

    /* loaded from: input_file:com/netflix/genie/web/rpc/grpc/services/impl/v4/GRpcJobFileSyncServiceImpl$JobFileSyncObserverImpl.class */
    private static class JobFileSyncObserverImpl implements StreamObserver<SyncRequest>, JobFileSyncObserver {
        private static final Logger log = LoggerFactory.getLogger(JobFileSyncObserverImpl.class);
        private final String id;
        private final Object messagesLock;
        private final List<SyncRequestResult> requestResults;
        private final StreamObserver<SyncResponse> responseObserver;
        private final JobFileService jobFileService;
        private final Consumer<JobFileSyncObserver> jobIdPopulatedCallback;
        private final Consumer<JobFileSyncObserver> completionCallback;
        private final AtomicBoolean cleanedUp;
        private final int maxSyncMessages;
        private boolean waitingForBeginMessage;
        private boolean sentResetMessage;
        private String jobId;

        private JobFileSyncObserverImpl(JobFileSyncRpcProperties jobFileSyncRpcProperties, StreamObserver<SyncResponse> streamObserver, JobFileService jobFileService, Consumer<JobFileSyncObserver> consumer, Consumer<JobFileSyncObserver> consumer2) {
            this.id = UUID.randomUUID().toString();
            this.messagesLock = new Object();
            this.requestResults = Lists.newArrayList();
            this.cleanedUp = new AtomicBoolean(false);
            this.waitingForBeginMessage = true;
            this.responseObserver = streamObserver;
            this.jobFileService = jobFileService;
            this.jobIdPopulatedCallback = consumer;
            this.completionCallback = consumer2;
            this.maxSyncMessages = jobFileSyncRpcProperties.getMaxSyncMessages();
        }

        public void onNext(SyncRequest syncRequest) {
            try {
                if (syncRequest.hasBeginSync()) {
                    handleBeginSync(syncRequest.getBeginSync());
                } else if (syncRequest.hasDataUpload()) {
                    handleDataUpload(syncRequest.getDataUpload());
                } else if (syncRequest.hasDeleteFile()) {
                    handleDeleteFile(syncRequest.getDeleteFile());
                } else if (syncRequest.hasSyncComplete()) {
                    handleSyncComplete(syncRequest.getSyncComplete());
                } else {
                    log.error("Received unknown message type {}", syncRequest);
                }
            } catch (IOException e) {
                log.error("Error for upload request {}", syncRequest, e);
            }
        }

        public void onError(Throwable th) {
            cleanup();
        }

        public void onCompleted() {
            cleanup();
        }

        @Override // com.netflix.genie.web.rpc.grpc.services.impl.v4.GRpcJobFileSyncServiceImpl.JobFileSyncObserver
        public Optional<String> getJobId() {
            return Optional.ofNullable(this.jobId);
        }

        @Override // com.netflix.genie.web.rpc.grpc.services.impl.v4.GRpcJobFileSyncServiceImpl.JobFileSyncObserver
        public void cleanup() {
            if (this.cleanedUp.getAndSet(true)) {
                return;
            }
            log.debug("Cleaning up");
            this.completionCallback.accept(this);
            log.debug("Cleaned up");
        }

        @Override // com.netflix.genie.web.rpc.grpc.services.impl.v4.GRpcJobFileSyncServiceImpl.JobFileSyncObserver
        public void sendSyncAckMessageIfNecessary() {
            synchronized (this.messagesLock) {
                if (!this.requestResults.isEmpty()) {
                    log.debug("Sending sync acknowledgment for messages {}", this.requestResults);
                    this.responseObserver.onNext(SyncResponse.newBuilder().setSyncAck(SyncAcknowledgement.newBuilder().addAllResults(this.requestResults).build()).build());
                    this.requestResults.clear();
                }
            }
        }

        private void handleBeginSync(BeginSync beginSync) throws IOException {
            if (!this.waitingForBeginMessage) {
                log.warn("Received a {} message after one had already been received", BeginSync.class.getCanonicalName());
                return;
            }
            this.jobId = beginSync.getJobId();
            if (StringUtils.isBlank(this.jobId)) {
                throw new IllegalArgumentException("No job id provided to sync service. Unable to continue");
            }
            log.debug("Beginning to sync job files for job {}", this.jobId);
            this.jobIdPopulatedCallback.accept(this);
            Set<JobFileState> jobDirectoryFileState = this.jobFileService.getJobDirectoryFileState(this.jobId, false);
            this.waitingForBeginMessage = false;
            this.responseObserver.onNext(SyncResponse.newBuilder().setBeginAck(BeginAcknowledgement.newBuilder().setServerDirectoryState(JobDirectoryState.newBuilder().setIncludesMd5(false).addAllFiles((Iterable) jobDirectoryFileState.stream().map(jobFileState -> {
                return com.netflix.genie.proto.JobFileState.newBuilder().setPath(jobFileState.getPath()).setSize(jobFileState.getSize()).build();
            }).collect(Collectors.toSet())).build()).build()).build());
        }

        private void handleDataUpload(DataUpload dataUpload) {
            String id = dataUpload.getId();
            log.debug("Received data upload message with id {} for job {}", id, this.jobId);
            if (this.waitingForBeginMessage) {
                log.debug("Haven't received a {} message. Ignoring data upload message {}.", BeginSync.class.getCanonicalName(), id);
                sendResetMessageIfNecessary();
                return;
            }
            try {
                this.jobFileService.updateFile(this.jobId, dataUpload.getPath(), dataUpload.getStartByte(), dataUpload.getData().toByteArray());
                synchronized (this.messagesLock) {
                    this.requestResults.add(createRequestResult(id, true));
                }
            } catch (Exception e) {
                log.error("Unable to save data for job {} from message {} due to {}", new Object[]{this.jobId, id, e.getMessage(), e});
                synchronized (this.messagesLock) {
                    this.requestResults.add(createRequestResult(id, false));
                }
            }
            checkIfShouldSendAck();
        }

        private void handleDeleteFile(DeleteFile deleteFile) {
            String id = deleteFile.getId();
            log.debug("Received file delete message {} in job {}", id, this.jobId);
            if (this.waitingForBeginMessage) {
                log.debug("Haven't received a {} message. Ignoring file delete message {}.", BeginSync.class.getCanonicalName(), id);
                sendResetMessageIfNecessary();
            } else {
                try {
                    this.jobFileService.deleteJobFile(this.jobId, deleteFile.getPath());
                    synchronized (this.messagesLock) {
                        this.requestResults.add(createRequestResult(id, true));
                    }
                } catch (Exception e) {
                    log.error("Deleting {} for job {} failed due to {}", new Object[]{deleteFile.getPath(), this.jobId, e.getMessage(), e});
                    synchronized (this.messagesLock) {
                        this.requestResults.add(createRequestResult(id, false));
                    }
                }
            }
            checkIfShouldSendAck();
        }

        private void handleSyncComplete(SyncComplete syncComplete) throws IOException {
            if (this.waitingForBeginMessage) {
                log.debug("Haven't received a {} message. Ignoring file sync complete message {}.", BeginSync.class.getCanonicalName(), syncComplete);
                sendResetMessageIfNecessary();
                return;
            }
            log.debug("Job file synchronization from agent for job {} is complete.", this.jobId);
            sendSyncAckMessageIfNecessary();
            cleanup();
            JobDirectoryState finalAgentDirectoryState = syncComplete.getFinalAgentDirectoryState();
            boolean includesMd5 = finalAgentDirectoryState.getIncludesMd5();
            Set set = (Set) finalAgentDirectoryState.getFilesList().stream().map(jobFileState -> {
                return new JobFileState(jobFileState.getPath(), jobFileState.getSize(), includesMd5 ? jobFileState.getMd5() : null);
            }).collect(Collectors.toSet());
            Set<JobFileState> jobDirectoryFileState = this.jobFileService.getJobDirectoryFileState(this.jobId, includesMd5);
            if (set.equals(jobDirectoryFileState)) {
                return;
            }
            log.warn("After the agent finished syncing job files for job {} the state of the files on the server {} is different than the supplied state of the files on the agent {}", new Object[]{this.jobId, jobDirectoryFileState, set});
        }

        private void sendResetMessageIfNecessary() {
            if (this.sentResetMessage) {
                return;
            }
            log.debug("Sending job file sync reset message to agent");
            this.responseObserver.onNext(SyncResponse.newBuilder().setReset(ResetSync.newBuilder().build()).build());
            this.sentResetMessage = true;
        }

        private void checkIfShouldSendAck() {
            boolean z = false;
            synchronized (this.messagesLock) {
                if (this.requestResults.size() == this.maxSyncMessages) {
                    z = true;
                }
            }
            if (z) {
                sendSyncAckMessageIfNecessary();
            }
        }

        private SyncRequestResult createRequestResult(String str, boolean z) {
            return SyncRequestResult.newBuilder().setId(str).setSuccessful(z).build();
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof JobFileSyncObserverImpl)) {
                return false;
            }
            JobFileSyncObserverImpl jobFileSyncObserverImpl = (JobFileSyncObserverImpl) obj;
            if (!jobFileSyncObserverImpl.canEqual(this)) {
                return false;
            }
            String str = this.id;
            String str2 = jobFileSyncObserverImpl.id;
            return str == null ? str2 == null : str.equals(str2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof JobFileSyncObserverImpl;
        }

        public int hashCode() {
            String str = this.id;
            return (1 * 59) + (str == null ? 43 : str.hashCode());
        }

        @Override // com.netflix.genie.web.rpc.grpc.services.impl.v4.GRpcJobFileSyncServiceImpl.JobFileSyncObserver
        public String getId() {
            return this.id;
        }
    }

    public GRpcJobFileSyncServiceImpl(JobFileSyncRpcProperties jobFileSyncRpcProperties, JobFileService jobFileService, TaskScheduler taskScheduler) {
        this.jobFileSyncRpcProperties = jobFileSyncRpcProperties;
        this.jobFileService = jobFileService;
        this.ackFuture = taskScheduler.scheduleWithFixedDelay(this::executeObserverAcknowledgements, this.jobFileSyncRpcProperties.getAckIntervalMilliseconds());
    }

    public StreamObserver<SyncRequest> sync(StreamObserver<SyncResponse> streamObserver) {
        return new JobFileSyncObserverImpl(this.jobFileSyncRpcProperties, streamObserver, this.jobFileService, this::addJobFileSyncObserver, this::removeJobFileSyncObserver);
    }

    @PreDestroy
    public void cleanup() {
        if (this.ackFuture == null || this.ackFuture.isDone()) {
            return;
        }
        log.debug("Attempting to cancel the job file sync acknowledgement thread");
        if (this.ackFuture.cancel(false)) {
            log.debug("Cancelled the job file sync acknowledgement thread");
        } else {
            log.error("Unable to cancel the job file sync acknowledgement thread");
        }
    }

    public void onAgentDetached(String str) {
        JobFileSyncObserver jobFileSyncObserver = this.jobSyncRequestObservers.get(str);
        if (jobFileSyncObserver != null) {
            jobFileSyncObserver.cleanup();
        }
    }

    private void addJobFileSyncObserver(JobFileSyncObserver jobFileSyncObserver) {
        this.jobSyncRequestObservers.put(jobFileSyncObserver.getJobId().orElseThrow(() -> {
            return new IllegalArgumentException("Job id not yet set in observer");
        }), jobFileSyncObserver);
    }

    private void removeJobFileSyncObserver(JobFileSyncObserver jobFileSyncObserver) {
        jobFileSyncObserver.getJobId().ifPresent(str -> {
            if (this.jobSyncRequestObservers.remove(str, jobFileSyncObserver)) {
                log.debug("Successfully removed observer with id {} for job {}", jobFileSyncObserver.getId(), str);
            } else {
                log.debug("Failed to remove observer with id {} for job {}", jobFileSyncObserver.getId(), str);
            }
        });
    }

    private void executeObserverAcknowledgements() {
        log.debug("Invoking job file sync request observers send acknowledgement methods");
        this.jobSyncRequestObservers.values().parallelStream().forEach((v0) -> {
            v0.sendSyncAckMessageIfNecessary();
        });
    }
}
