/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wires a gRPC read {@link RunnerApi.PTransform} to a downstream consumer.
 *
 * <p>{@link #registerInputLocation()} asks the {@link BeamFnDataClient} to deliver data for the
 * current process bundle instruction and this transform to the configured consumer, and
 * {@link #blockTillReadFinishes()} waits on the handle returned by that registration.
 *
 * @param <OutputT> element type carried inside the {@link WindowedValue}s handed to the consumer
 */
public class BeamFnDataReadRunner<OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class);
    private final String pTransformId;
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final FnDataReceiver<WindowedValue<OutputT>> consumer;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private final BeamFnDataClient beamFnDataClient;
    private final Coder<WindowedValue<OutputT>> coder;
    /** Handle returned by the data client; assigned by {@link #registerInputLocation()}. */
    private InboundDataClient readFuture;

    /**
     * Creates a runner for the given gRPC read transform.
     *
     * <p>The coder is resolved from the port's {@code coder_id} when that id is non-empty;
     * otherwise {@code coderSpec} is used as the fallback.
     *
     * @param pTransformId id of the transform this runner reads for
     * @param grpcReadNode transform proto from which the remote gRPC port is extracted
     * @param processBundleInstructionIdSupplier supplies the current process bundle instruction id
     * @param coderSpec fallback coder proto, consulted only when the port carries no coder id
     * @param coders coder protos keyed by id, also used to rehydrate coder components
     * @param beamFnDataClient client used to register for inbound data
     * @param consumer receiver that is handed to the data client for decoded elements
     * @throws IOException declared for coder translation / port parsing failures
     */
    BeamFnDataReadRunner(String pTransformId, RunnerApi.PTransform grpcReadNode, Supplier<String> processBundleInstructionIdSupplier, RunnerApi.Coder coderSpec, Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClient, FnDataReceiver<WindowedValue<OutputT>> consumer) throws IOException {
        this.pTransformId = pTransformId;
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortRead.fromPTransform(grpcReadNode).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
        this.beamFnDataClient = beamFnDataClient;
        this.consumer = consumer;
        RehydratedComponents components = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(coders).build());
        // Prefer the coder named on the port; fall back to the caller-provided spec otherwise.
        RunnerApi.Coder effectiveSpec = port.getCoderId().isEmpty() ? coderSpec : coders.get(port.getCoderId());
        // CoderTranslation yields a wildcard coder; the element type is only known by contract.
        @SuppressWarnings("unchecked")
        Coder<WindowedValue<OutputT>> translatedCoder = (Coder<WindowedValue<OutputT>>) CoderTranslation.fromProto(effectiveSpec, components);
        this.coder = translatedCoder;
    }

    /**
     * Registers this transform with the data client for the current process bundle instruction
     * and remembers the returned handle so that {@link #blockTillReadFinishes()} can wait on it.
     */
    public void registerInputLocation() {
        this.readFuture = this.beamFnDataClient.receive(this.apiServiceDescriptor, LogicalEndpoint.of(this.processBundleInstructionIdSupplier.get(), this.pTransformId), this.coder, this.consumer);
    }

    /**
     * Waits for the read registered by {@link #registerInputLocation()} to complete.
     *
     * <p>{@link #registerInputLocation()} must have been called first; otherwise no handle has
     * been assigned yet.
     *
     * @throws Exception whatever {@link InboundDataClient#awaitCompletion()} propagates
     */
    public void blockTillReadFinishes() throws Exception {
        LOG.debug("Waiting for process bundle instruction {} and transform {} to close.", (Object)this.processBundleInstructionIdSupplier.get(), (Object)this.pTransformId);
        this.readFuture.awaitCompletion();
    }

    /**
     * Builds {@link BeamFnDataReadRunner}s and hooks them into the start and finish function
     * registries.
     *
     * @param <OutputT> element type produced by the created runner
     */
    static class Factory<OutputT>
    implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
        Factory() {
        }

        /**
         * Creates the runner for {@code pTransform}, registering
         * {@link BeamFnDataReadRunner#registerInputLocation()} as a start function and
         * {@link BeamFnDataReadRunner#blockTillReadFinishes()} as a finish function.
         *
         * <p>When the transform's gRPC port has no {@code coder_id}, the coder of the transform's
         * only output PCollection is used as a deprecated fallback and an error is logged.
         *
         * @return the newly created runner
         * @throws IOException if the runner cannot be constructed
         */
        @Override
        public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, BundleSplitListener splitListener) throws IOException {
            // A read transform is expected to have exactly one output PCollection.
            String outputPCollectionId = Iterables.getOnlyElement(pTransform.getOutputsMap().values());

            RunnerApi.Coder coderSpec;
            if (RemoteGrpcPortRead.fromPTransform(pTransform).getPort().getCoderId().isEmpty()) {
                // SLF4J substitutes "{}" placeholders only; "%s" would be emitted verbatim.
                LOG.error("Missing required coder_id on grpc_port for {}; using deprecated fallback.", pTransformId);
                coderSpec = coders.get(pCollections.get(outputPCollectionId).getCoderId());
            } else {
                // The constructor resolves the coder from the port's coder_id in this case.
                coderSpec = null;
            }

            // The registry does not expose the element type; narrow it via a raw cast.
            @SuppressWarnings({"unchecked", "rawtypes"})
            FnDataReceiver<WindowedValue<OutputT>> consumer = (FnDataReceiver<WindowedValue<OutputT>>) (FnDataReceiver) pCollectionConsumerRegistry.getMultiplexingConsumer(outputPCollectionId);

            BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>(pTransformId, pTransform, processBundleInstructionId, coderSpec, coders, beamFnDataClient, consumer);
            startFunctionRegistry.register(pTransformId, runner::registerInputLocation);
            finishFunctionRegistry.register(pTransformId, runner::blockTillReadFinishes);
            return runner;
        }
    }

    /** Registers the {@link Factory} under the {@code beam:source:runner:0.1} URN. */
    @AutoService(value=PTransformRunnerFactory.Registrar.class)
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        /**
         * @return a single-entry map from the gRPC read URN to a new {@link Factory}
         */
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of("beam:source:runner:0.1", new Factory<>());
        }
    }
}

