/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.SyntheticComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.SideInputReference;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.UserStateReference;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_BagUserStateSpec;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_SideInputSpec;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_TargetEncoding;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableTable;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;

public class ProcessBundleDescriptors {
    /**
     * Builds an {@link ExecutableProcessBundleDescriptor} for {@code stage} that also carries a
     * state API endpoint.
     *
     * @param id identifier set on the resulting process bundle descriptor
     * @param stage the executable stage to describe
     * @param dataEndpoint endpoint attached to the synthetic gRPC read/write ports
     * @param stateEndpoint endpoint recorded as the descriptor's state API service descriptor
     * @return the descriptor bundled with its input destination, output coders, side input specs
     *     and bag user state specs
     * @throws IllegalStateException if any argument is {@code null}
     * @throws IOException propagated from the internal descriptor construction
     */
    public static ExecutableProcessBundleDescriptor fromExecutableStage(
            String id,
            ExecutableStage stage,
            Endpoints.ApiServiceDescriptor dataEndpoint,
            Endpoints.ApiServiceDescriptor stateEndpoint)
            throws IOException {
        // Checked in declaration order, so the first missing argument is the one reported.
        Preconditions.checkState(id != null, "id must be specified.");
        Preconditions.checkState(stage != null, "stage must be specified.");
        Preconditions.checkState(dataEndpoint != null, "dataEndpoint must be specified.");
        Preconditions.checkState(stateEndpoint != null, "stateEndpoint must be specified.");

        return fromExecutableStageInternal(id, stage, dataEndpoint, stateEndpoint);
    }

    /**
     * Builds an {@link ExecutableProcessBundleDescriptor} for {@code stage} without a state API
     * endpoint: {@code null} is forwarded as the state endpoint, so the resulting descriptor has
     * no state API service descriptor set.
     *
     * @param id identifier set on the resulting process bundle descriptor
     * @param stage the executable stage to describe
     * @param dataEndpoint endpoint attached to the synthetic gRPC read/write ports
     * @return the descriptor bundled with its input destination, output coders, side input specs
     *     and bag user state specs
     * @throws IllegalStateException if any argument is {@code null}
     * @throws IOException propagated from the internal descriptor construction
     */
    public static ExecutableProcessBundleDescriptor fromExecutableStage(
            String id, ExecutableStage stage, Endpoints.ApiServiceDescriptor dataEndpoint)
            throws IOException {
        Preconditions.checkState(id != null, "id must be specified.");
        Preconditions.checkState(stage != null, "stage must be specified.");
        // Message previously read "dateEndpoint", which does not match the parameter name.
        Preconditions.checkState(dataEndpoint != null, "dataEndpoint must be specified.");
        return fromExecutableStageInternal(id, stage, dataEndpoint, null);
    }

    /**
     * Shared implementation behind both {@code fromExecutableStage} overloads.
     *
     * <p>Starts from the stage's components restricted to the stage's own transforms, lets the
     * input/output/side-input helpers add to that builder, and only then copies the coders,
     * environments, PCollections, windowing strategies and transforms into the process bundle
     * descriptor.
     *
     * @param id identifier set on the process bundle descriptor
     * @param stage the executable stage to describe
     * @param dataEndpoint endpoint attached to the synthetic gRPC read/write ports
     * @param stateEndpoint state API endpoint, or {@code null} to leave it unset on the descriptor
     * @return the assembled {@link ExecutableProcessBundleDescriptor}
     * @throws IOException propagated from the coder/port helpers
     */
    private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(
            String id,
            ExecutableStage stage,
            Endpoints.ApiServiceDescriptor dataEndpoint,
            @Nullable Endpoints.ApiServiceDescriptor stateEndpoint)
            throws IOException {
        // Replace whatever transforms the stage's components hold with exactly the stage's own.
        Map<String, RunnerApi.PTransform> transformsById =
                stage.getTransforms().stream()
                        .collect(
                                Collectors.toMap(
                                        PipelineNode.PTransformNode::getId,
                                        PipelineNode.PTransformNode::getTransform));
        RunnerApi.Components.Builder components = stage.getComponents().toBuilder();
        components.clearTransforms();
        components.putAllTransforms(transformsById);

        // These helpers write into `components`; they must all run before the maps are copied out.
        RemoteInputDestination<WindowedValue<?>> input =
                addStageInput(dataEndpoint, stage.getInputPCollection(), components);
        Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputCoders =
                addStageOutputs(dataEndpoint, stage.getOutputPCollections(), components);
        Map<String, Map<String, SideInputSpec>> sideInputs = addSideInputs(stage, components);

        BeamFnApi.ProcessBundleDescriptor.Builder descriptor =
                BeamFnApi.ProcessBundleDescriptor.newBuilder();
        descriptor.setId(id);
        if (stateEndpoint != null) {
            descriptor.setStateApiServiceDescriptor(stateEndpoint);
        }
        descriptor.putAllCoders(components.getCodersMap());
        descriptor.putAllEnvironments(components.getEnvironmentsMap());
        descriptor.putAllPcollections(components.getPcollectionsMap());
        descriptor.putAllWindowingStrategies(components.getWindowingStrategiesMap());
        descriptor.putAllTransforms(components.getTransformsMap());

        Map<String, Map<String, BagUserStateSpec>> bagUserStates =
                forBagUserStates(stage, components.build());
        return ExecutableProcessBundleDescriptor.of(
                descriptor.build(), input, outputCoders, sideInputs, bagUserStates);
    }

    /**
     * Adds one synthetic write transform per output PCollection to {@code components} and collects
     * the resulting targets with their coders.
     *
     * @param dataEndpoint endpoint attached to each output port
     * @param outputPCollections the stage's output PCollections
     * @param components builder that receives the synthetic write transforms
     * @return map from output target to its coder, in the iteration order of
     *     {@code outputPCollections}
     * @throws IOException propagated from {@code addStageOutput}
     */
    private static Map<BeamFnApi.Target, Coder<WindowedValue<?>>> addStageOutputs(
            Endpoints.ApiServiceDescriptor dataEndpoint,
            Collection<PipelineNode.PCollectionNode> outputPCollections,
            RunnerApi.Components.Builder components)
            throws IOException {
        // Parameterized (previously a raw LinkedHashMap) so the return needs no unchecked conversion.
        Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders = new LinkedHashMap<>();
        for (PipelineNode.PCollectionNode outputPCollection : outputPCollections) {
            TargetEncoding targetEncoding =
                    addStageOutput(dataEndpoint, components, outputPCollection);
            outputTargetCoders.put(targetEncoding.getTarget(), targetEncoding.getCoder());
        }
        return outputTargetCoders;
    }

    /**
     * Adds a synthetic gRPC read transform for the stage's input PCollection to
     * {@code components} and returns where the runner should send input elements.
     *
     * @param dataEndpoint endpoint attached to the input port
     * @param inputPCollection the stage's input PCollection
     * @param components builder that receives the wire coder and the synthetic read transform
     * @return destination pairing the runner-side wire coder with the target that names the read
     *     transform and its single output
     * @throws IOException propagated from the {@code WireCoders} calls
     */
    private static RemoteInputDestination<WindowedValue<?>> addStageInput(
            Endpoints.ApiServiceDescriptor dataEndpoint,
            PipelineNode.PCollectionNode inputPCollection,
            RunnerApi.Components.Builder components)
            throws IOException {
        // The builder is handed to addSdkWireCoder before a snapshot of it is built for the
        // runner-side coder, so the order of these two calls is kept.
        String sdkCoderId = WireCoders.addSdkWireCoder(inputPCollection, components);
        Coder runnerCoder =
                WireCoders.instantiateRunnerWireCoder(inputPCollection, components.build());

        BeamFnApi.RemoteGrpcPort inputPort =
                BeamFnApi.RemoteGrpcPort.newBuilder()
                        .setApiServiceDescriptor(dataEndpoint)
                        .setCoderId(sdkCoderId)
                        .build();

        // The id is derived while the read transform is not yet registered in `components`.
        String readTransformId =
                SyntheticComponents.uniqueId(
                        String.format("fn/read/%s", inputPCollection.getId()),
                        components::containsTransforms);
        RunnerApi.PTransform readTransform =
                RemoteGrpcPortRead.readFromPort(inputPort, inputPCollection.getId()).toPTransform();
        components.putTransforms(readTransformId, readTransform);

        // getOnlyElement fails unless the read transform has exactly one output.
        String outputName = Iterables.getOnlyElement(readTransform.getOutputsMap().keySet());
        BeamFnApi.Target target =
                BeamFnApi.Target.newBuilder()
                        .setPrimitiveTransformReference(readTransformId)
                        .setName(outputName)
                        .build();
        return RemoteInputDestination.of(runnerCoder, target);
    }

    /**
     * Adds a synthetic gRPC write transform for one output PCollection to {@code components}.
     *
     * @param dataEndpoint endpoint attached to the output port
     * @param components builder that receives the wire coder and the synthetic write transform
     * @param outputPCollection the output PCollection to expose
     * @return the target naming the write transform and its single input, paired with the
     *     runner-side wire coder
     * @throws IOException propagated from the {@code WireCoders} calls
     */
    private static TargetEncoding addStageOutput(
            Endpoints.ApiServiceDescriptor dataEndpoint,
            RunnerApi.Components.Builder components,
            PipelineNode.PCollectionNode outputPCollection)
            throws IOException {
        // The builder is handed to addSdkWireCoder before a snapshot of it is built for the
        // runner-side coder, so the order of these two calls is kept.
        String sdkCoderId = WireCoders.addSdkWireCoder(outputPCollection, components);
        Coder runnerCoder =
                WireCoders.instantiateRunnerWireCoder(outputPCollection, components.build());

        BeamFnApi.RemoteGrpcPort outputPort =
                BeamFnApi.RemoteGrpcPort.newBuilder()
                        .setApiServiceDescriptor(dataEndpoint)
                        .setCoderId(sdkCoderId)
                        .build();
        RemoteGrpcPortWrite portWrite =
                RemoteGrpcPortWrite.writeToPort(outputPCollection.getId(), outputPort);

        // The id is derived while the write transform is not yet registered in `components`.
        String writeTransformId =
                SyntheticComponents.uniqueId(
                        String.format("fn/write/%s", outputPCollection.getId()),
                        components::containsTransforms);
        RunnerApi.PTransform writeTransform = portWrite.toPTransform();
        components.putTransforms(writeTransformId, writeTransform);

        // getOnlyElement fails unless the write transform has exactly one input.
        String inputName = Iterables.getOnlyElement(writeTransform.getInputsMap().keySet());
        BeamFnApi.Target target =
                BeamFnApi.Target.newBuilder()
                        .setPrimitiveTransformReference(writeTransformId)
                        .setName(inputName)
                        .build();
        return new AutoValue_ProcessBundleDescriptors_TargetEncoding(target, runnerCoder);
    }

    /**
     * Computes the side input specs of {@code stage}, keyed by transform id and then by the side
     * input's local name.
     *
     * <p>The work is done against a fresh builder derived from the stage's components; that
     * builder is not returned, so any changes made to it are discarded.
     *
     * @param stage the executable stage whose side inputs are described
     * @return nested map of transform id to local name to {@link SideInputSpec}
     * @throws IOException propagated from the coder helpers
     */
    public static Map<String, Map<String, SideInputSpec>> getSideInputs(ExecutableStage stage)
            throws IOException {
        RunnerApi.Components.Builder scratchComponents = stage.getComponents().toBuilder();
        return addSideInputs(stage, scratchComponents);
    }

    /**
     * Rewrites each side input PCollection in {@code components} to use a length-prefixed coder
     * and builds a {@link SideInputSpec} for it.
     *
     * @param stage the executable stage whose side inputs are processed
     * @param components builder that receives the length-prefixed coders and updated PCollections
     * @return nested map of transform id to side input local name to {@link SideInputSpec}
     * @throws IOException propagated from the coder helpers
     */
    private static Map<String, Map<String, SideInputSpec>> addSideInputs(
            ExecutableStage stage, RunnerApi.Components.Builder components) throws IOException {
        ImmutableTable.Builder<String, String, SideInputSpec> specsByTransformAndName =
                ImmutableTable.builder();
        for (SideInputReference reference : stage.getSideInputs()) {
            PipelineNode.PCollectionNode collectionNode = reference.collection();
            RunnerApi.PCollection collection = collectionNode.getPCollection();

            // NOTE(review): the meaning of the trailing `false` flag is defined by
            // LengthPrefixUnknownCoders.addLengthPrefixedCoder - confirm there.
            String prefixedCoderId =
                    LengthPrefixUnknownCoders.addLengthPrefixedCoder(
                            collection.getCoderId(), components, false);
            RunnerApi.PCollection rewritten =
                    collection.toBuilder().setCoderId(prefixedCoderId).build();
            components.putPcollections(collectionNode.getId(), rewritten);

            // Instantiated from a snapshot taken after the PCollection was re-registered above.
            WindowedValue.FullWindowedValueCoder windowedCoder =
                    (WindowedValue.FullWindowedValueCoder)
                            WireCoders.instantiateRunnerWireCoder(collectionNode, components.build());

            String transformId = reference.transform().getId();
            String localName = reference.localName();
            SideInputSpec spec =
                    SideInputSpec.of(
                            transformId,
                            localName,
                            getAccessPattern(reference),
                            windowedCoder.getValueCoder(),
                            windowedCoder.getWindowCoder());
            specsByTransformAndName.put(transformId, localName, spec);
        }
        return specsByTransformAndName.build().rowMap();
    }

    /**
     * Looks up the access pattern of a side input by parsing the referencing transform's spec
     * payload as a {@code ParDoPayload} and reading the entry stored under the reference's local
     * name.
     *
     * @param sideInputReference the side input whose access pattern is wanted
     * @return the access pattern function spec of that side input
     * @throws RuntimeException wrapping {@link InvalidProtocolBufferException} if the payload
     *     cannot be parsed
     */
    private static RunnerApi.FunctionSpec getAccessPattern(SideInputReference sideInputReference) {
        ByteString payloadBytes =
                sideInputReference.transform().getTransform().getSpec().getPayload();

        RunnerApi.ParDoPayload parDoPayload;
        try {
            // Parsing is the only step here that can raise the checked protobuf exception.
            parDoPayload = RunnerApi.ParDoPayload.parseFrom(payloadBytes);
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }

        RunnerApi.SideInput sideInput =
                parDoPayload.getSideInputsMap().get(sideInputReference.localName());
        return sideInput.getAccessPattern();
    }

    /**
     * Builds a {@link BagUserStateSpec} for every user state referenced by {@code stage}.
     *
     * <p>Each spec uses {@code ByteStringCoder} for both key and value; only the window coder is
     * taken from the runner wire coder of the user state's collection.
     *
     * @param stage the executable stage whose user states are described
     * @param components components used to instantiate the runner wire coders
     * @return nested map of transform id to user state local name to {@link BagUserStateSpec}
     * @throws IOException propagated from {@code WireCoders.instantiateRunnerWireCoder}
     */
    private static Map<String, Map<String, BagUserStateSpec>> forBagUserStates(
            ExecutableStage stage, RunnerApi.Components components) throws IOException {
        // Parameterized (previously a raw builder) to match the builder used for side inputs and
        // to avoid an unchecked conversion on return.
        ImmutableTable.Builder<String, String, BagUserStateSpec> idsToSpec =
                ImmutableTable.builder();
        for (UserStateReference userStateReference : stage.getUserStates()) {
            WindowedValue.FullWindowedValueCoder coder =
                    (WindowedValue.FullWindowedValueCoder)
                            WireCoders.instantiateRunnerWireCoder(
                                    userStateReference.collection(), components);
            idsToSpec.put(
                    userStateReference.transform().getId(),
                    userStateReference.localName(),
                    BagUserStateSpec.of(
                            userStateReference.transform().getId(),
                            userStateReference.localName(),
                            ByteStringCoder.of(),
                            ByteStringCoder.of(),
                            coder.getWindowCoder()));
        }
        return idsToSpec.build().rowMap();
    }

    /**
     * A process bundle descriptor together with the runner-side information needed to execute it:
     * the remote input destination, the coders of the output targets, and the side input and bag
     * user state specs.
     */
    @AutoValue
    public abstract static class ExecutableProcessBundleDescriptor {
        /**
         * Creates a descriptor value.
         *
         * <p>The side input and bag user state maps are copied into immutable tables;
         * {@code outputTargetCoders} is wrapped in an unmodifiable view rather than copied.
         *
         * @param descriptor the process bundle descriptor proto
         * @param inputDestination where input elements are sent
         * @param outputTargetCoders coder for each output target
         * @param sideInputSpecs side input specs keyed by transform id, then local name
         * @param bagUserStateSpecs bag user state specs keyed by transform id, then local name
         * @return the new {@link ExecutableProcessBundleDescriptor}
         */
        public static ExecutableProcessBundleDescriptor of(
                BeamFnApi.ProcessBundleDescriptor descriptor,
                RemoteInputDestination<WindowedValue<?>> inputDestination,
                Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders,
                Map<String, Map<String, SideInputSpec>> sideInputSpecs,
                Map<String, Map<String, BagUserStateSpec>> bagUserStateSpecs) {
            Map<String, Map<String, SideInputSpec>> sideInputsCopy = immutableCopy(sideInputSpecs);
            Map<String, Map<String, BagUserStateSpec>> bagUserStatesCopy =
                    immutableCopy(bagUserStateSpecs);
            return new AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(
                    descriptor,
                    inputDestination,
                    Collections.unmodifiableMap(outputTargetCoders),
                    sideInputsCopy,
                    bagUserStatesCopy);
        }

        /** Copies a two-level map into an immutable table and returns that table's row map. */
        private static <V> Map<String, Map<String, V>> immutableCopy(
                Map<String, Map<String, V>> nested) {
            ImmutableTable.Builder<String, String, V> table = ImmutableTable.builder();
            for (Map.Entry<String, Map<String, V>> row : nested.entrySet()) {
                for (Map.Entry<String, V> cell : row.getValue().entrySet()) {
                    table.put(row.getKey(), cell.getKey(), cell.getValue());
                }
            }
            return table.build().rowMap();
        }

        /** Returns the process bundle descriptor proto. */
        public abstract BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        /** Returns the destination to which input elements are sent. */
        public abstract RemoteInputDestination<WindowedValue<?>> getRemoteInputDestination();

        /** Returns the coder for each output target. */
        public abstract Map<BeamFnApi.Target, Coder<WindowedValue<?>>> getOutputTargetCoders();

        /** Returns the side input specs keyed by transform id, then side input local name. */
        public abstract Map<String, Map<String, SideInputSpec>> getSideInputSpecs();

        /** Returns the bag user state specs keyed by transform id, then user state local name. */
        public abstract Map<String, Map<String, BagUserStateSpec>> getBagUserStateSpecs();
    }

    /**
     * Describes one bag user state: the transform and user state it belongs to, plus the coders
     * for its key, value and window.
     *
     * @param <K> key type
     * @param <V> value type
     * @param <W> window type
     */
    @AutoValue
    public abstract static class BagUserStateSpec<K, V, W extends BoundedWindow> {
        /** Creates a spec from its five components; see the accessors for their meaning. */
        static <K, V, W extends BoundedWindow> BagUserStateSpec<K, V, W> of(
                String transformId,
                String userStateId,
                Coder<K> keyCoder,
                Coder<V> valueCoder,
                Coder<W> windowCoder) {
            return new AutoValue_ProcessBundleDescriptors_BagUserStateSpec<>(
                    transformId, userStateId, keyCoder, valueCoder, windowCoder);
        }

        /** Returns the id of the transform that declares the user state. */
        public abstract String transformId();

        /** Returns the id of the user state. */
        public abstract String userStateId();

        /** Returns the coder for keys. */
        public abstract Coder<K> keyCoder();

        /** Returns the coder for values. */
        public abstract Coder<V> valueCoder();

        /** Returns the coder for windows. */
        public abstract Coder<W> windowCoder();
    }

    /**
     * Describes one side input: the transform and side input it belongs to, its access pattern,
     * and the coders for its elements and windows.
     *
     * @param <K> declared on the class but not referenced by any member visible here
     * @param <T> element type
     * @param <W> window type
     */
    @AutoValue
    public abstract static class SideInputSpec<K, T, W extends BoundedWindow> {
        /**
         * Creates a spec from its five components; see the accessors for their meaning.
         *
         * <p>The declared return type is the raw {@code SideInputSpec}.
         */
        public static <T, W extends BoundedWindow> SideInputSpec of(
                String transformId,
                String sideInputId,
                RunnerApi.FunctionSpec accessPattern,
                Coder<T> elementCoder,
                Coder<W> windowCoder) {
            return new AutoValue_ProcessBundleDescriptors_SideInputSpec(
                    transformId, sideInputId, accessPattern, elementCoder, windowCoder);
        }

        /** Returns the id of the transform that consumes the side input. */
        public abstract String transformId();

        /** Returns the id of the side input. */
        public abstract String sideInputId();

        /** Returns the function spec describing the side input's access pattern. */
        public abstract RunnerApi.FunctionSpec accessPattern();

        /** Returns the coder for elements. */
        public abstract Coder<T> elementCoder();

        /** Returns the coder for windows. */
        public abstract Coder<W> windowCoder();
    }

    /** Pairs an output target with the coder used for the elements sent to it. */
    @AutoValue
    abstract static class TargetEncoding {
        /** Returns the output target. */
        abstract BeamFnApi.Target getTarget();

        /** Returns the coder for the target's windowed values. */
        abstract Coder<WindowedValue<?>> getCoder();
    }
}

