package org.apache.beam.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient.class */
public class SdkHarnessClient implements AutoCloseable {
    /** Logger used for debug messages about registration and bundle requests. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SdkHarnessClient.class);
    /** Source of the instruction ids placed on register and process-bundle requests. */
    private final IdGenerator idGenerator;
    /** Handler that every {@link BeamFnApi.InstructionRequest} built by this client is passed to. */
    private final InstructionRequestHandler fnApiControlClient;
    /** Data service used to open the input sender and the output receivers of each bundle. */
    private final FnDataService fnApiDataService;
    /** Processors created so far, keyed by the id of their {@link BeamFnApi.ProcessBundleDescriptor}. */
    private final ConcurrentHashMap<String, BundleProcessor<?>> clientProcessors = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$ActiveBundle.class */
    /**
     * A bundle that has been requested from the SDK harness and is still open for input.
     *
     * <p>Elements are pushed through {@link #getInputReceiver()}; {@link #close()} finishes the
     * bundle and releases everything that was set up for it.
     *
     * @param <InputT> element type of the windowed values fed into the bundle
     */
    public static class ActiveBundle<InputT> implements RemoteBundle<InputT> {
        private final String bundleId;
        private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
        private final CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver;
        private final Map<BeamFnApi.Target, InboundDataClient> outputClients;
        private final StateDelegator.Registration stateRegistration;
        private final BundleProgressHandler progressHandler;

        private ActiveBundle(
                String bundleId,
                CompletionStage<BeamFnApi.ProcessBundleResponse> response,
                CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver,
                Map<BeamFnApi.Target, InboundDataClient> outputClients,
                StateDelegator.Registration stateRegistration,
                BundleProgressHandler progressHandler) {
            this.bundleId = bundleId;
            this.response = response;
            this.inputReceiver = inputReceiver;
            this.outputClients = outputClients;
            this.stateRegistration = stateRegistration;
            this.progressHandler = progressHandler;
        }

        /** Returns the instruction id this bundle was created with. */
        @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
        public String getId() {
            return this.bundleId;
        }

        /** Returns the receiver through which input elements are sent to the bundle. */
        @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle
        public FnDataReceiver<WindowedValue<InputT>> getInputReceiver() {
            return this.inputReceiver;
        }

        /**
         * Closes the input, waits for the bundle response and releases the state registration and
         * the output clients.
         *
         * <p>Every cleanup step is attempted even when an earlier one fails. On the success path
         * the state registration is deregistered and each output client is awaited; once any
         * step has failed, the registration is aborted and the remaining output clients are
         * cancelled instead. The first failure is rethrown with later failures attached as
         * suppressed exceptions.
         *
         * @throws Exception the first failure encountered while closing
         */
        @Override // org.apache.beam.runners.fnexecution.control.RemoteBundle, java.lang.AutoCloseable
        public void close() throws Exception {
            Exception failure = null;

            try {
                this.inputReceiver.close();
            } catch (Exception e) {
                failure = e;
            }

            if (failure == null) {
                try {
                    BeamFnApi.ProcessBundleResponse completed = MoreFutures.get(this.response);
                    this.progressHandler.onCompleted(completed);
                } catch (Exception e) {
                    failure = e;
                }
            } else {
                // The bundle cannot be aborted yet; record that on the original failure rather
                // than replacing it, so the root cause is not lost.
                failure.addSuppressed(
                        new IllegalStateException("Processing bundle failed, TODO: [BEAM-3962] abort bundle."));
            }

            try {
                if (failure == null) {
                    this.stateRegistration.deregister();
                } else {
                    this.stateRegistration.abort();
                }
            } catch (Exception e) {
                failure = accumulate(failure, e);
            }

            for (InboundDataClient outputClient : this.outputClients.values()) {
                // Both branches are guarded so one misbehaving client cannot stop the others
                // from being released.
                try {
                    if (failure == null) {
                        outputClient.awaitCompletion();
                    } else {
                        outputClient.cancel();
                    }
                } catch (Exception e) {
                    failure = accumulate(failure, e);
                }
            }

            if (failure != null) {
                throw failure;
            }
        }

        /** Returns {@code next} if there is no primary failure yet, otherwise attaches it as suppressed. */
        private static Exception accumulate(Exception primary, Exception next) {
            if (primary == null) {
                return next;
            }
            // Throwable#addSuppressed rejects self-suppression.
            if (primary != next) {
                primary.addSuppressed(next);
            }
            return primary;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$BundleProcessor.class */
    public class BundleProcessor<T> {
        private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
        private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
        private final RemoteInputDestination<WindowedValue<T>> remoteInput;
        private final StateDelegator stateDelegator;

        private BundleProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, CompletionStage<BeamFnApi.RegisterResponse> completionStage, RemoteInputDestination<WindowedValue<T>> remoteInputDestination, StateDelegator stateDelegator) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.registrationFuture = completionStage;
            this.remoteInput = remoteInputDestination;
            this.stateDelegator = stateDelegator;
        }

        public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
            return this.registrationFuture;
        }

        public ActiveBundle<T> newBundle(Map<BeamFnApi.Target, RemoteOutputReceiver<?>> map, BundleProgressHandler bundleProgressHandler) {
            return newBundle(map, stateRequest -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered state handler.", ActiveBundle.class.getSimpleName()));
            }, bundleProgressHandler);
        }

        public ActiveBundle<T> newBundle(Map<BeamFnApi.Target, RemoteOutputReceiver<?>> map, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler) {
            String id = SdkHarnessClient.this.idGenerator.getId();
            CompletionStage<BeamFnApi.InstructionResponse> handle = SdkHarnessClient.this.fnApiControlClient.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(this.processBundleDescriptor.getId())).build());
            SdkHarnessClient.LOG.debug("Sent {} with ID {} for {} with ID {}", BeamFnApi.ProcessBundleRequest.class.getSimpleName(), id, BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(), this.processBundleDescriptor.getId());
            CompletionStage<U> thenApply = handle.thenApply((v0) -> {
                return v0.getProcessBundle();
            });
            HashMap hashMap = new HashMap();
            for (Map.Entry<BeamFnApi.Target, RemoteOutputReceiver<?>> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), attachReceiver(id, entry.getKey(), (RemoteOutputReceiver) entry.getValue()));
            }
            return new ActiveBundle<>(id, thenApply, SdkHarnessClient.this.fnApiDataService.send(LogicalEndpoint.of(id, this.remoteInput.getTarget()), this.remoteInput.getCoder()), hashMap, this.stateDelegator.registerForProcessBundleInstructionId(id, stateRequestHandler), bundleProgressHandler);
        }

        private <OutputT> InboundDataClient attachReceiver(String str, BeamFnApi.Target target, RemoteOutputReceiver<WindowedValue<OutputT>> remoteOutputReceiver) {
            return SdkHarnessClient.this.fnApiDataService.receive(LogicalEndpoint.of(str, target), remoteOutputReceiver.getCoder(), remoteOutputReceiver.getReceiver());
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/fnexecution/control/SdkHarnessClient$NoOpStateDelegator.class */
    /** A {@link StateDelegator} whose registrations do nothing; shared as a single instance. */
    private static class NoOpStateDelegator implements StateDelegator {
        private static final NoOpStateDelegator INSTANCE = new NoOpStateDelegator();

        private NoOpStateDelegator() {
            // Only the shared INSTANCE is ever created.
        }

        /** Ignores both arguments and hands back the shared no-op registration. */
        @Override // org.apache.beam.runners.fnexecution.state.StateDelegator
        public Registration registerForProcessBundleInstructionId(String str, StateRequestHandler stateRequestHandler) {
            return Registration.INSTANCE;
        }

        /** Registration whose {@code deregister} and {@code abort} are both empty. */
        public static class Registration implements StateDelegator.Registration {
            private static final Registration INSTANCE = new Registration();

            private Registration() {
                // Only the shared INSTANCE is ever created.
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateDelegator.Registration
            public void abort() {
                // Nothing was registered, so there is nothing to abort.
            }

            @Override // org.apache.beam.runners.fnexecution.state.StateDelegator.Registration
            public void deregister() {
                // Nothing was registered, so there is nothing to remove.
            }
        }
    }

    /**
     * Stores the three collaborators; instances are obtained through
     * {@link #usingFnApiClient} and {@link #withIdGenerator}.
     */
    private SdkHarnessClient(InstructionRequestHandler instructionRequestHandler, FnDataService fnDataService, IdGenerator idGenerator) {
        this.fnApiControlClient = instructionRequestHandler;
        this.fnApiDataService = fnDataService;
        this.idGenerator = idGenerator;
    }

    /**
     * Creates a client over the given control handler and data service, using
     * {@link IdGenerators#incrementingLongs()} for instruction ids.
     *
     * @param instructionRequestHandler handler that instruction requests are passed to
     * @param fnDataService data service used for bundle input and output
     * @return a new client
     */
    public static SdkHarnessClient usingFnApiClient(InstructionRequestHandler instructionRequestHandler, FnDataService fnDataService) {
        IdGenerator defaultIds = IdGenerators.incrementingLongs();
        return new SdkHarnessClient(instructionRequestHandler, fnDataService, defaultIds);
    }

    /**
     * Returns a new client with the same control handler and data service but the given id
     * generator. The new client starts with its own empty processor cache; this instance is
     * left unchanged.
     *
     * @param idGenerator generator the new client uses for instruction ids
     * @return a new client
     */
    public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
        InstructionRequestHandler controlClient = this.fnApiControlClient;
        FnDataService dataService = this.fnApiDataService;
        return new SdkHarnessClient(controlClient, dataService, idGenerator);
    }

    /**
     * Returns the processor for a descriptor that declares no state API service descriptor,
     * backed by the no-op state delegator.
     *
     * @param processBundleDescriptor descriptor to register; must not declare a state endpoint
     * @param remoteInputDestination where and how bundle input is sent
     * @return the processor for the descriptor
     * @throws IllegalStateException if the descriptor has a state API service descriptor
     */
    public <T> BundleProcessor<T> getProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDestination) {
        boolean declaresStateEndpoint = processBundleDescriptor.hasStateApiServiceDescriptor();
        Preconditions.checkState(
                !declaresStateEndpoint,
                "The %s cannot support a %s containing a state %s.",
                BundleProcessor.class.getSimpleName(),
                BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
                Endpoints.ApiServiceDescriptor.class.getSimpleName());
        StateDelegator noStateSupport = NoOpStateDelegator.INSTANCE;
        return getProcessor(processBundleDescriptor, remoteInputDestination, noStateSupport);
    }

    /**
     * Returns the processor for the descriptor, creating and registering it the first time its
     * id is seen and reusing the cached one afterwards.
     *
     * @param processBundleDescriptor descriptor to register
     * @param remoteInputDestination where and how bundle input is sent
     * @param stateDelegator delegator that state handlers of new bundles are registered with
     * @return the processor cached under the descriptor's id
     * @throws IllegalArgumentException if a different descriptor is already cached under the same id
     */
    public <T> BundleProcessor<T> getProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDestination, StateDelegator stateDelegator) {
        // The cache stores BundleProcessor<?>, so the element type has to be re-asserted here;
        // the suppression is limited to this one declaration.
        @SuppressWarnings("unchecked")
        BundleProcessor<T> bundleProcessor =
                (BundleProcessor<T>)
                        this.clientProcessors.computeIfAbsent(
                                processBundleDescriptor.getId(),
                                unusedId -> create(processBundleDescriptor, remoteInputDestination, stateDelegator));
        Preconditions.checkArgument(
                bundleProcessor.processBundleDescriptor.equals(processBundleDescriptor),
                "The provided %s with id %s collides with an existing %s with the same id but containing different contents.",
                BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
                processBundleDescriptor.getId(),
                BeamFnApi.ProcessBundleDescriptor.class.getSimpleName());
        return bundleProcessor;
    }

    /**
     * Sends a register request for the descriptor under a fresh instruction id and wraps the
     * pending response in a new {@link BundleProcessor}.
     */
    private <T> BundleProcessor<T> create(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDestination, StateDelegator stateDelegator) {
        LOG.debug("Registering {}", processBundleDescriptor);

        BeamFnApi.RegisterRequest registerRequest =
                BeamFnApi.RegisterRequest.newBuilder()
                        .addProcessBundleDescriptor(processBundleDescriptor)
                        .build();
        BeamFnApi.InstructionRequest instruction =
                BeamFnApi.InstructionRequest.newBuilder()
                        .setInstructionId(this.idGenerator.getId())
                        .setRegister(registerRequest)
                        .build();
        CompletionStage<BeamFnApi.RegisterResponse> registration =
                this.fnApiControlClient.handle(instruction).thenApply(BeamFnApi.InstructionResponse::getRegister);

        return new BundleProcessor<>(processBundleDescriptor, registration, remoteInputDestination, stateDelegator);
    }

    /** Does nothing; this method body is empty. */
    @Override // java.lang.AutoCloseable
    public void close() {
        // Intentionally empty.
    }
}
