/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataWriteRunner<InputT> {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataWriteRunner.class);
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final BeamFnApi.Target outputTarget;
    private final Coder<WindowedValue<InputT>> coder;
    private final BeamFnDataClient beamFnDataClientFactory;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private CloseableFnDataReceiver<WindowedValue<InputT>> consumer;

    BeamFnDataWriteRunner(RunnerApi.PTransform remoteWriteNode, Supplier<String> processBundleInstructionIdSupplier, BeamFnApi.Target outputTarget, RunnerApi.Coder coderSpec, Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClientFactory) throws IOException {
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.beamFnDataClientFactory = beamFnDataClientFactory;
        this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
        this.outputTarget = outputTarget;
        RehydratedComponents components = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(coders).build());
        Coder<?> coder = !port.getCoderId().isEmpty() ? CoderTranslation.fromProto(coders.get(port.getCoderId()), components) : CoderTranslation.fromProto(coderSpec, components);
        this.coder = coder;
    }

    public void registerForOutput() {
        this.consumer = this.beamFnDataClientFactory.send(this.apiServiceDescriptor, LogicalEndpoint.of(this.processBundleInstructionIdSupplier.get(), this.outputTarget), this.coder);
    }

    public void close() throws Exception {
        this.consumer.close();
    }

    public void consume(WindowedValue<InputT> value) throws Exception {
        this.consumer.accept(value);
    }

    static class Factory<InputT>
    implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
        Factory() {
        }

        @Override
        public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, BundleSplitListener splitListener) throws IOException {
            RunnerApi.Coder coderSpec;
            BeamFnApi.Target target = BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(pTransformId).setName(Iterables.getOnlyElement(pTransform.getInputsMap().keySet())).build();
            if (RemoteGrpcPortWrite.fromPTransform(pTransform).getPort().getCoderId().isEmpty()) {
                LOG.error("Missing required coder_id on grpc_port for %s; using deprecated fallback.", (Object)pTransformId);
                coderSpec = coders.get(pCollections.get(Iterables.getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
            } else {
                coderSpec = null;
            }
            BeamFnDataWriteRunner runner = new BeamFnDataWriteRunner(pTransform, processBundleInstructionId, target, coderSpec, coders, beamFnDataClient);
            startFunctionRegistry.register(pTransformId, runner::registerForOutput);
            pCollectionConsumerRegistry.register(Iterables.getOnlyElement(pTransform.getInputsMap().values()), pTransformId, runner::consume);
            finishFunctionRegistry.register(pTransformId, runner::close);
            return runner;
        }
    }

    @AutoService(value=PTransformRunnerFactory.Registrar.class)
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of("urn:org.apache.beam:sink:runner:0.1", new Factory());
        }
    }
}

