/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteOutputReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.FnDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateDelegator;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.InboundDataClient;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client that registers {@link BeamFnApi.ProcessBundleDescriptor}s and starts bundles by sending
 * {@link BeamFnApi.InstructionRequest}s through an {@link InstructionRequestHandler}, and that
 * wires bundle inputs and outputs through an {@link FnDataService}.
 *
 * <p>{@link BundleProcessor}s are cached per descriptor id in a {@link ConcurrentHashMap}, so a
 * descriptor is registered at most once per client instance.
 */
public class SdkHarnessClient
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SdkHarnessClient.class);
    private final IdGenerator idGenerator;
    private final InstructionRequestHandler fnApiControlClient;
    private final FnDataService fnApiDataService;
    /** Processors created so far, keyed by {@code ProcessBundleDescriptor} id. */
    private final ConcurrentHashMap<String, BundleProcessor<?>> clientProcessors;

    private SdkHarnessClient(InstructionRequestHandler fnApiControlClient, FnDataService fnApiDataService, IdGenerator idGenerator) {
        this.fnApiDataService = fnApiDataService;
        this.idGenerator = idGenerator;
        this.fnApiControlClient = fnApiControlClient;
        this.clientProcessors = new ConcurrentHashMap<>();
    }

    /**
     * Creates a client over the given control handler and data service, generating instruction
     * ids with {@link IdGenerators#incrementingLongs()}.
     *
     * @param fnApiControlClient handler that instruction requests are sent to
     * @param fnApiDataService service used to send bundle input and receive bundle output
     * @return a new client with an empty processor cache
     */
    public static SdkHarnessClient usingFnApiClient(InstructionRequestHandler fnApiControlClient, FnDataService fnApiDataService) {
        return new SdkHarnessClient(fnApiControlClient, fnApiDataService, IdGenerators.incrementingLongs());
    }

    /**
     * Returns a new client that uses the same control handler and data service as this one but
     * takes its ids from {@code idGenerator}.
     *
     * <p>The returned client starts with its own empty processor cache; processors already
     * created by this client are not carried over.
     *
     * @param idGenerator source of instruction ids for the new client
     * @return the new client
     */
    public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
        return new SdkHarnessClient(this.fnApiControlClient, this.fnApiDataService, idGenerator);
    }

    /**
     * Returns the processor for {@code descriptor}, using a no-op {@link StateDelegator}.
     *
     * @param descriptor descriptor to register; must not carry a state API service descriptor
     * @param remoteInputDesination target and coder used to send the bundle's input elements
     * @return the cached or newly created processor for the descriptor's id
     * @throws IllegalStateException if {@code descriptor} has a state API service descriptor
     * @throws IllegalArgumentException if a different descriptor is already cached under the same id
     */
    public <T> BundleProcessor<T> getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDesination) {
        Preconditions.checkState(
            !descriptor.hasStateApiServiceDescriptor(),
            "The %s cannot support a %s containing a state %s.",
            BundleProcessor.class.getSimpleName(),
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
            Endpoints.ApiServiceDescriptor.class.getSimpleName());
        return this.getProcessor(descriptor, remoteInputDesination, NoOpStateDelegator.INSTANCE);
    }

    /**
     * Returns the processor for {@code descriptor}, creating and registering it on first use.
     *
     * <p>If a processor is already cached under the descriptor's id, that instance is returned and
     * the supplied {@code remoteInputDesination} and {@code stateDelegator} are not used.
     *
     * @param descriptor descriptor to register
     * @param remoteInputDesination target and coder used to send the bundle's input elements
     * @param stateDelegator delegator that state request handlers are registered with per bundle
     * @return the cached or newly created processor for the descriptor's id
     * @throws IllegalArgumentException if the cached processor was created from a descriptor that
     *     is not equal to {@code descriptor}
     */
    public <T> BundleProcessor<T> getProcessor(BeamFnApi.ProcessBundleDescriptor descriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDesination, StateDelegator stateDelegator) {
        // The cache is keyed only by descriptor id, so the element type of a cached processor
        // cannot be verified here; the descriptor equality check below is the only guard.
        @SuppressWarnings("unchecked")
        BundleProcessor<T> bundleProcessor = (BundleProcessor<T>) this.clientProcessors.computeIfAbsent(
            descriptor.getId(), s -> this.create(descriptor, remoteInputDesination, stateDelegator));
        Preconditions.checkArgument(
            bundleProcessor.processBundleDescriptor.equals(descriptor),
            "The provided %s with id %s collides with an existing %s with the same id but containing different contents.",
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
            descriptor.getId(),
            BeamFnApi.ProcessBundleDescriptor.class.getSimpleName());
        return bundleProcessor;
    }

    /**
     * Sends a register request for {@code processBundleDescriptor} and wraps the pending response
     * in a new {@link BundleProcessor}. Does not wait for the registration to complete.
     */
    private <T> BundleProcessor<T> create(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, RemoteInputDestination<WindowedValue<T>> remoteInputDestination, StateDelegator stateDelegator) {
        LOG.debug("Registering {}", processBundleDescriptor);
        BeamFnApi.InstructionRequest registerRequest = BeamFnApi.InstructionRequest.newBuilder()
            .setInstructionId(this.idGenerator.getId())
            .setRegister(BeamFnApi.RegisterRequest.newBuilder().addProcessBundleDescriptor(processBundleDescriptor).build())
            .build();
        CompletionStage<BeamFnApi.InstructionResponse> genericResponse = this.fnApiControlClient.handle(registerRequest);
        CompletionStage<BeamFnApi.RegisterResponse> registerResponseFuture = genericResponse.thenApply(BeamFnApi.InstructionResponse::getRegister);
        return new BundleProcessor<>(processBundleDescriptor, registerResponseFuture, remoteInputDestination, stateDelegator);
    }

    /** Currently a no-op. */
    @Override
    public void close() {
    }

    /** A {@link StateDelegator} whose registrations do nothing. */
    private static class NoOpStateDelegator
    implements StateDelegator {
        private static final NoOpStateDelegator INSTANCE = new NoOpStateDelegator();

        private NoOpStateDelegator() {
        }

        /** Ignores both arguments and returns the shared no-op {@link Registration}. */
        @Override
        public Registration registerForProcessBundleInstructionId(String processBundleInstructionId, StateRequestHandler handler) {
            return Registration.INSTANCE;
        }

        /** Registration whose {@link #deregister()} and {@link #abort()} do nothing. */
        private static class Registration
        implements StateDelegator.Registration {
            private static final Registration INSTANCE = new Registration();

            private Registration() {
            }

            @Override
            public void deregister() {
            }

            @Override
            public void abort() {
            }
        }
    }

    /**
     * A bundle for which a process-bundle request has already been sent.
     *
     * <p>Input elements are pushed through {@link #getInputReceiver()}; {@link #close()} finishes
     * the bundle.
     */
    public static class ActiveBundle<InputT>
    implements RemoteBundle<InputT> {
        private final String bundleId;
        private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
        private final CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver;
        private final Map<BeamFnApi.Target, InboundDataClient> outputClients;
        private final StateDelegator.Registration stateRegistration;
        private final BundleProgressHandler progressHandler;

        private ActiveBundle(String bundleId, CompletionStage<BeamFnApi.ProcessBundleResponse> response, CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver, Map<BeamFnApi.Target, InboundDataClient> outputClients, StateDelegator.Registration stateRegistration, BundleProgressHandler progressHandler) {
            this.bundleId = bundleId;
            this.response = response;
            this.inputReceiver = inputReceiver;
            this.outputClients = outputClients;
            this.stateRegistration = stateRegistration;
            this.progressHandler = progressHandler;
        }

        /** Returns the instruction id under which this bundle's request was sent. */
        @Override
        public String getId() {
            return this.bundleId;
        }

        /** Returns the receiver that input elements for this bundle are written to. */
        @Override
        public FnDataReceiver<WindowedValue<InputT>> getInputReceiver() {
            return this.inputReceiver;
        }

        /**
         * Finishes the bundle, running every cleanup step even if an earlier one fails.
         *
         * <p>Steps, in order: close the input receiver; wait for the bundle response and hand it
         * to the progress handler; deregister the state registration; await every output client.
         * Once any step has failed, the remaining steps take their failure path instead: the
         * response is not awaited, the state registration is aborted, and output clients are
         * cancelled.
         *
         * @throws Exception the first failure encountered, with all later failures attached as
         *     suppressed exceptions
         */
        @Override
        public void close() throws Exception {
            Exception exception = null;
            try {
                this.inputReceiver.close();
            }
            catch (Exception e) {
                exception = e;
            }
            try {
                if (exception != null) {
                    throw new IllegalStateException("Processing bundle failed, TODO: [BEAM-3962] abort bundle.");
                }
                this.progressHandler.onCompleted(MoreFutures.get(this.response));
            }
            catch (Exception e) {
                exception = ActiveBundle.accumulate(exception, e);
            }
            try {
                if (exception == null) {
                    this.stateRegistration.deregister();
                } else {
                    this.stateRegistration.abort();
                }
            }
            catch (Exception e) {
                exception = ActiveBundle.accumulate(exception, e);
            }
            for (InboundDataClient outputClient : this.outputClients.values()) {
                try {
                    if (exception == null) {
                        outputClient.awaitCompletion();
                    } else {
                        outputClient.cancel();
                    }
                }
                catch (Exception e) {
                    exception = ActiveBundle.accumulate(exception, e);
                }
            }
            if (exception != null) {
                throw exception;
            }
        }

        /**
         * Folds {@code next} into the failure collected so far.
         *
         * <p>The first failure becomes the primary exception; later ones are attached to it as
         * suppressed. A throwable must never be added to itself, because
         * {@link Throwable#addSuppressed(Throwable)} rejects self-suppression with an
         * {@link IllegalArgumentException}, which would hide the real failure.
         *
         * @param primary the failure collected so far, or {@code null} if there is none
         * @param next the failure that just occurred
         * @return the exception to keep as primary
         */
        private static Exception accumulate(Exception primary, Exception next) {
            if (primary == null) {
                return next;
            }
            primary.addSuppressed(next);
            return primary;
        }
    }

    /**
     * Starts bundles for one registered {@link BeamFnApi.ProcessBundleDescriptor}.
     *
     * @param <T> element type of the bundle's input
     */
    public class BundleProcessor<T> {
        private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
        private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
        private final RemoteInputDestination<WindowedValue<T>> remoteInput;
        private final StateDelegator stateDelegator;

        private BundleProcessor(BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, CompletionStage<BeamFnApi.RegisterResponse> registrationFuture, RemoteInputDestination<WindowedValue<T>> remoteInput, StateDelegator stateDelegator) {
            this.processBundleDescriptor = processBundleDescriptor;
            this.registrationFuture = registrationFuture;
            this.remoteInput = remoteInput;
            this.stateDelegator = stateDelegator;
        }

        /** Returns the pending response to the register request sent for this descriptor. */
        public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
            return this.registrationFuture;
        }

        /**
         * Starts a bundle whose state request handler rejects every request with an
         * {@link UnsupportedOperationException}.
         *
         * @param outputReceivers receiver for each output target of the bundle
         * @param progressHandler handler given the bundle response when the bundle is closed
         * @return the started bundle
         */
        public ActiveBundle<T> newBundle(Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers, BundleProgressHandler progressHandler) {
            return this.newBundle(outputReceivers, request -> {
                throw new UnsupportedOperationException(String.format("The %s does not have a registered state handler.", ActiveBundle.class.getSimpleName()));
            }, progressHandler);
        }

        /**
         * Starts a bundle: sends the process-bundle request, attaches every output receiver,
         * opens the input channel and registers the state request handler, all under one freshly
         * generated instruction id.
         *
         * @param outputReceivers receiver for each output target of the bundle
         * @param stateRequestHandler handler registered with the state delegator for this bundle
         * @param progressHandler handler given the bundle response when the bundle is closed
         * @return the started bundle
         */
        public ActiveBundle<T> newBundle(Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers, StateRequestHandler stateRequestHandler, BundleProgressHandler progressHandler) {
            String bundleId = SdkHarnessClient.this.idGenerator.getId();
            BeamFnApi.InstructionRequest processRequest = BeamFnApi.InstructionRequest.newBuilder()
                .setInstructionId(bundleId)
                .setProcessBundle(BeamFnApi.ProcessBundleRequest.newBuilder().setProcessBundleDescriptorReference(this.processBundleDescriptor.getId()))
                .build();
            CompletionStage<BeamFnApi.InstructionResponse> genericResponse = SdkHarnessClient.this.fnApiControlClient.handle(processRequest);
            LOG.debug(
                "Sent {} with ID {} for {} with ID {}",
                BeamFnApi.ProcessBundleRequest.class.getSimpleName(),
                bundleId,
                BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
                this.processBundleDescriptor.getId());
            CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse = genericResponse.thenApply(BeamFnApi.InstructionResponse::getProcessBundle);
            Map<BeamFnApi.Target, InboundDataClient> outputClients = new HashMap<>();
            for (Map.Entry<BeamFnApi.Target, RemoteOutputReceiver<?>> targetReceiver : outputReceivers.entrySet()) {
                // The map erases each receiver's element type, so a raw cast is the only way to
                // hand it to the generic attachReceiver; the receiver carries its own coder.
                @SuppressWarnings({"unchecked", "rawtypes"})
                InboundDataClient outputClient = this.attachReceiver(bundleId, targetReceiver.getKey(), (RemoteOutputReceiver) targetReceiver.getValue());
                outputClients.put(targetReceiver.getKey(), outputClient);
            }
            CloseableFnDataReceiver<WindowedValue<T>> dataReceiver = SdkHarnessClient.this.fnApiDataService.send(LogicalEndpoint.of(bundleId, this.remoteInput.getTarget()), this.remoteInput.getCoder());
            StateDelegator.Registration stateRegistration = this.stateDelegator.registerForProcessBundleInstructionId(bundleId, stateRequestHandler);
            return new ActiveBundle<>(bundleId, specificResponse, dataReceiver, outputClients, stateRegistration, progressHandler);
        }

        /** Subscribes {@code receiver} to the output of {@code target} for the given bundle. */
        private <OutputT> InboundDataClient attachReceiver(String bundleId, BeamFnApi.Target target, RemoteOutputReceiver<WindowedValue<OutputT>> receiver) {
            return SdkHarnessClient.this.fnApiDataService.receive(LogicalEndpoint.of(bundleId, target), receiver.getCoder(), receiver.getReceiver());
        }
    }
}

