package org.apache.beam.runners.flink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.collect.BiMap;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.collect.HashMultiset;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.collect.Lists;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.collect.Maps;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.flink.FlinkPortablePipelineTranslator;
import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.class */
public class FlinkStreamingPortablePipelineTranslator implements FlinkPortablePipelineTranslator<StreamingTranslationContext> {
    /**
     * Dispatch table from a transform URN to the translator that handles it. It is assigned once in
     * the constructor and looked up for every transform visited by {@code translate}.
     */
    private final Map<String, PTransformTranslator<StreamingTranslationContext>> urnToTransformTranslator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator$PTransformTranslator.class */
    /**
     * Translates a single transform of a portable pipeline into a translation context.
     *
     * <p>The interface has exactly one abstract method and is only ever implemented through method
     * references and lambdas in this file, so it is declared as a functional interface to let the
     * compiler enforce that shape.
     *
     * @param <T> type of the translation context the translator writes into
     */
    @FunctionalInterface
    public interface PTransformTranslator<T> {
        /**
         * Translates the transform with the given id.
         *
         * @param str id of the transform to translate, as found in the pipeline's components
         * @param pipeline pipeline proto that contains the transform
         * @param t translation context that receives the result of the translation
         */
        void translate(String str, RunnerApi.Pipeline pipeline, T t);
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.class */
    /**
     * Translation context for streaming pipelines. It bundles the job info, the pipeline options and
     * the Flink {@link StreamExecutionEnvironment}, and keeps a registry of the data streams produced
     * so far, keyed by the id under which they were added.
     */
    public static class StreamingTranslationContext implements FlinkPortablePipelineTranslator.TranslationContext {
        private final JobInfo jobInfo;
        private final PipelineOptions options;
        private final StreamExecutionEnvironment executionEnvironment;
        /** Streams registered through {@link #addDataStream}, keyed by the id they were added under. */
        private final Map<String, DataStream<?>> dataStreams;

        private StreamingTranslationContext(JobInfo jobInfo, PipelineOptions pipelineOptions, StreamExecutionEnvironment streamExecutionEnvironment) {
            this.jobInfo = jobInfo;
            this.options = pipelineOptions;
            this.executionEnvironment = streamExecutionEnvironment;
            this.dataStreams = new HashMap<>();
        }

        @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator.TranslationContext
        public JobInfo getJobInfo() {
            return this.jobInfo;
        }

        /** Returns the pipeline options this context was created with. */
        public PipelineOptions getPipelineOptions() {
            return this.options;
        }

        /** Returns the Flink execution environment that translated streams are attached to. */
        public StreamExecutionEnvironment getExecutionEnvironment() {
            return this.executionEnvironment;
        }

        /**
         * Registers a data stream under the given id, replacing any stream previously stored for it.
         *
         * @param str id to register the stream under
         * @param dataStream stream to register
         */
        public <T> void addDataStream(String str, DataStream<T> dataStream) {
            this.dataStreams.put(str, dataStream);
        }

        /**
         * Looks up a previously registered data stream.
         *
         * @param str id the stream was registered under
         * @return the registered stream, viewed with the element type chosen by the caller
         * @throws IllegalArgumentException if no stream is registered for the id
         */
        public <T> DataStream<T> getDataStreamOrThrow(String str) {
            // The registry stores DataStream<?>; the element type is chosen by the caller, so this
            // cast cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            DataStream<T> dataStream = (DataStream<T>) this.dataStreams.get(str);
            if (dataStream == null) {
                throw new IllegalArgumentException(String.format("Unknown datastream for id %s.", str));
            }
            return dataStream;
        }
    }

    /**
     * Creates a streaming translation context for a job.
     *
     * <p>The pipeline options are rebuilt from the proto carried by {@code jobInfo}; the Flink
     * execution environment is created from those options viewed as {@link FlinkPipelineOptions}.
     *
     * @param jobInfo job description whose pipeline options proto is decoded
     * @param list passed through unchanged to
     *     {@code FlinkExecutionEnvironments.createStreamExecutionEnvironment}
     * @return a context holding the job info, the decoded options and the new environment
     * @throws RuntimeException wrapping any {@link IOException} raised while building the context
     */
    public static StreamingTranslationContext createTranslationContext(JobInfo jobInfo, List<String> list) {
        final PipelineOptions pipelineOptions;
        final StreamExecutionEnvironment environment;
        try {
            pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
            FlinkPipelineOptions flinkOptions = pipelineOptions.as(FlinkPipelineOptions.class);
            environment = FlinkExecutionEnvironments.createStreamExecutionEnvironment(flinkOptions, list);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new StreamingTranslationContext(jobInfo, pipelineOptions, environment);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a translator and registers one handler per supported transform URN: flatten,
     * group-by-key, impulse, assign-windows, executable stage and reshuffle.
     */
    public FlinkStreamingPortablePipelineTranslator() {
        // The builder has to be parameterized: with a raw builder the value parameter erases to
        // Object, which is not a functional interface, so the method references below would have no
        // target type.
        ImmutableMap.Builder<String, PTransformTranslator<StreamingTranslationContext>> builder = ImmutableMap.builder();
        builder.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten);
        builder.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey);
        builder.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse);
        builder.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, this::translateAssignWindows);
        builder.put(ExecutableStage.URN, this::translateExecutableStage);
        builder.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle);
        this.urnToTransformTranslator = builder.build();
    }

    /**
     * Translates every transform of the pipeline, in topological order, by dispatching on the
     * transform's URN. A URN without a registered translator is routed to {@code urnNotFound},
     * which throws.
     *
     * @param streamingTranslationContext context that receives the translated streams
     * @param pipeline pipeline proto to translate
     */
    @Override // org.apache.beam.runners.flink.FlinkPortablePipelineTranslator
    public void translate(StreamingTranslationContext streamingTranslationContext, RunnerApi.Pipeline pipeline) {
        QueryablePipeline queryablePipeline =
            QueryablePipeline.forTransforms(pipeline.getRootTransformIdsList(), pipeline.getComponents());
        for (PipelineNode.PTransformNode node : queryablePipeline.getTopologicallyOrderedTransforms()) {
            String urn = node.getTransform().getSpec().getUrn();
            PTransformTranslator<StreamingTranslationContext> translator =
                this.urnToTransformTranslator.getOrDefault(urn, this::urnNotFound);
            translator.translate(node.getId(), pipeline, streamingTranslationContext);
        }
    }

    /**
     * Fallback translator for transforms whose URN has no registered handler; always throws.
     *
     * @param str id of the transform that could not be translated
     * @param pipeline pipeline proto used to look up the transform's URN for the message
     * @param translationContext unused; present so the method matches the translator signature
     * @throws IllegalArgumentException always, naming the unknown URN and the transform id
     */
    private void urnNotFound(String str, RunnerApi.Pipeline pipeline, FlinkPortablePipelineTranslator.TranslationContext translationContext) {
        String urn = pipeline.getComponents().getTransformsOrThrow(str).getSpec().getUrn();
        // Message previously misspelled "PTransform" as "PTrasnform".
        throw new IllegalArgumentException(String.format("Unknown type of URN %s for PTransform with id %s.", urn, str));
    }

    /**
     * Translates a reshuffle transform by rebalancing its single input stream and registering the
     * result under its single output id.
     *
     * @param str id of the reshuffle transform
     * @param pipeline pipeline proto containing the transform
     * @param streamingTranslationContext context holding the input stream and receiving the output
     */
    private <K, V> void translateReshuffle(String str, RunnerApi.Pipeline pipeline, StreamingTranslationContext streamingTranslationContext) {
        RunnerApi.PTransform reshuffle = pipeline.getComponents().getTransformsOrThrow(str);
        // The output id is resolved before the input, matching the original evaluation order.
        String outputId = Iterables.getOnlyElement(reshuffle.getOutputsMap().values());
        String inputId = Iterables.getOnlyElement(reshuffle.getInputsMap().values());
        DataStream<Object> input = streamingTranslationContext.getDataStreamOrThrow(inputId);
        streamingTranslationContext.addDataStream(outputId, input.rebalance());
    }

    /**
     * Translates a flatten transform into a union of its input streams.
     *
     * <p>With no inputs, the output is a stream built from a single placeholder element whose
     * flat-map emits nothing. Otherwise all inputs are unioned; an input stream that appears more
     * than once among the inputs is first passed through an identity flat-map before being unioned.
     *
     * @param str id of the flatten transform
     * @param pipeline pipeline proto containing the transform
     * @param streamingTranslationContext context holding the input streams and receiving the output
     */
    private <T> void translateFlatten(String str, RunnerApi.Pipeline pipeline, StreamingTranslationContext streamingTranslationContext) {
        Map<String, String> inputIds = pipeline.getComponents().getTransformsOrThrow(str).getInputsMap();

        if (inputIds.isEmpty()) {
            String emptyOutputId =
                Iterables.getOnlyElement(pipeline.getComponents().getTransformsOrThrow(str).getOutputsMap().values());
            streamingTranslationContext.addDataStream(
                emptyOutputId,
                streamingTranslationContext
                    .getExecutionEnvironment()
                    .fromElements("dummy")
                    // Drops the placeholder element, so the resulting stream carries no data.
                    .flatMap((ignoredElement, ignoredCollector) -> {
                    })
                    .returns(new CoderTypeInformation(WindowedValue.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE))));
            return;
        }

        // First pass: count how often each stream instance occurs among the inputs.
        HashMultiset<DataStream<T>> occurrences = HashMultiset.create();
        for (String inputId : inputIds.values()) {
            DataStream<T> counted = streamingTranslationContext.getDataStreamOrThrow(inputId);
            occurrences.add(counted, 1);
        }

        // Second pass: union the inputs, wrapping repeated streams in an identity flat-map.
        DataStream<T> flattened = null;
        for (String inputId : inputIds.values()) {
            DataStream<T> input = streamingTranslationContext.getDataStreamOrThrow(inputId);
            if (occurrences.count(input) > 1) {
                input = input.flatMap(new FlatMapFunction<T, T>() { // from class: org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.1
                    private static final long serialVersionUID = 1;

                    @Override // org.apache.flink.api.common.functions.FlatMapFunction
                    public void flatMap(T element, Collector<T> out) {
                        out.collect(element);
                    }
                });
            }
            if (flattened == null) {
                flattened = input;
            } else {
                flattened = flattened.union(input);
            }
        }

        String outputId =
            Iterables.getOnlyElement(pipeline.getComponents().getTransformsOrThrow(str).getOutputsMap().values());
        streamingTranslationContext.addDataStream(outputId, flattened);
    }

    /**
     * Translates a group-by-key transform.
     *
     * <p>The single input stream is converted to keyed work items, keyed by the input's key coder,
     * and fed to a {@link WindowDoFnOperator} driven by a buffering {@link SystemReduceFn}. The
     * operator's output is registered under the transform's single output id.
     *
     * @param str id of the group-by-key transform
     * @param pipeline pipeline proto containing the transform
     * @param streamingTranslationContext context holding the input stream and receiving the output
     * @throws IllegalStateException if an {@link InvalidProtocolBufferException} is raised while
     *     translating, e.g. when the windowing strategy cannot be rebuilt from its proto
     */
    private <K, V> void translateGroupByKey(String str, RunnerApi.Pipeline pipeline, StreamingTranslationContext streamingTranslationContext) {
        RunnerApi.PTransform groupByKey = pipeline.getComponents().getTransformsOrThrow(str);
        String inputId = Iterables.getOnlyElement(groupByKey.getInputsMap().values());
        String windowingStrategyId = pipeline.getComponents().getPcollectionsOrThrow(inputId).getWindowingStrategyId();
        RunnerApi.WindowingStrategy strategyProto = pipeline.getComponents().getWindowingStrategiesOrThrow(windowingStrategyId);
        DataStream input = streamingTranslationContext.getDataStreamOrThrow(inputId);
        try {
            WindowingStrategy<?, ?> windowingStrategy =
                WindowingStrategyTranslation.fromProto(strategyProto, RehydratedComponents.forComponents(pipeline.getComponents()));

            // The input's wire coder wraps a KvCoder; unwrap it to reach the key and value coders.
            WindowedValue.WindowedValueCoder windowedInputCoder =
                (WindowedValue.WindowedValueCoder) instantiateCoder(inputId, pipeline.getComponents());
            KvCoder inputKvCoder = (KvCoder) windowedInputCoder.getValueCoder();

            WindowedValue.FullWindowedValueCoder workItemCoder =
                WindowedValue.getFullCoder(
                    SingletonKeyedWorkItemCoder.of(
                        inputKvCoder.getKeyCoder(),
                        inputKvCoder.getValueCoder(),
                        windowingStrategy.getWindowFn().windowCoder()),
                    windowingStrategy.getWindowFn().windowCoder());

            SingleOutputStreamOperator workItems =
                input
                    .flatMap(new FlinkStreamingTransformTranslators.ToKeyedWorkItem())
                    .returns(new CoderTypeInformation(workItemCoder))
                    .name("ToKeyedWorkItem");

            WorkItemKeySelector keySelector = new WorkItemKeySelector(inputKvCoder.getKeyCoder());
            DataStream keyedWorkItems = workItems.keyBy(keySelector);

            SystemReduceFn reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
            WindowedValue.FullWindowedValueCoder outputCoder =
                WindowedValue.getFullCoder(
                    KvCoder.of(inputKvCoder.getKeyCoder(), IterableCoder.of(inputKvCoder.getValueCoder())),
                    windowingStrategy.getWindowFn().windowCoder());
            CoderTypeInformation outputTypeInfo = new CoderTypeInformation(outputCoder);
            TupleTag mainTag = new TupleTag("main output");

            String outputId = Iterables.getOnlyElement(groupByKey.getOutputsMap().values());
            WindowDoFnOperator doFnOperator =
                new WindowDoFnOperator(
                    reduceFn,
                    groupByKey.getUniqueName(),
                    workItemCoder,
                    mainTag,
                    Collections.emptyList(),
                    new DoFnOperator.MultiOutputOutputManagerFactory(mainTag, outputCoder),
                    windowingStrategy,
                    new HashMap(),
                    Collections.emptyList(),
                    streamingTranslationContext.getPipelineOptions(),
                    inputKvCoder.getKeyCoder(),
                    keySelector);
            DataStream grouped = keyedWorkItems.transform(groupByKey.getUniqueName(), outputTypeInfo, doFnOperator);
            streamingTranslationContext.addDataStream(outputId, grouped);
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalStateException(String.format("Unable to hydrate GroupByKey windowing strategy %s.", strategyProto), e);
        }
    }

    /**
     * Translates an impulse transform into a stream containing a single empty byte array in the
     * global window, registered under the transform's single output id.
     *
     * @param str id of the impulse transform
     * @param pipeline pipeline proto containing the transform
     * @param streamingTranslationContext context that receives the output stream
     */
    private void translateImpulse(String str, RunnerApi.Pipeline pipeline, StreamingTranslationContext streamingTranslationContext) {
        RunnerApi.PTransform impulse = pipeline.getComponents().getTransformsOrThrow(str);
        String outputId = Iterables.getOnlyElement(impulse.getOutputsMap().values());

        StreamExecutionEnvironment environment = streamingTranslationContext.getExecutionEnvironment();
        WindowedValue<byte[]> emptyElement = WindowedValue.valueInGlobalWindow(new byte[0]);
        CoderTypeInformation<WindowedValue<byte[]>> typeInfo =
            new CoderTypeInformation<>(WindowedValue.getFullCoder(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE));
        DataStream<WindowedValue<byte[]>> source =
            environment.fromCollection(Collections.singleton(emptyElement), typeInfo);

        streamingTranslationContext.addDataStream(outputId, source);
    }

    /**
     * Translates an assign-windows transform: the window function is decoded from the transform's
     * payload and applied to the single input stream through {@link FlinkAssignWindows}. The result
     * is typed with the output collection's wire coder and registered under the single output id.
     *
     * @param str id of the assign-windows transform
     * @param pipeline pipeline proto containing the transform
     * @param streamingTranslationContext context holding the input stream and receiving the output
     * @throws IllegalArgumentException wrapping an {@link InvalidProtocolBufferException} raised
     *     during translation, e.g. when the payload cannot be parsed
     */
    private <T> void translateAssignWindows(String str, RunnerApi.Pipeline pipeline, StreamingTranslationContext streamingTranslationContext) {
        RunnerApi.Components components = pipeline.getComponents();
        RunnerApi.PTransform windowInto = components.getTransformsOrThrow(str);
        try {
            RunnerApi.WindowIntoPayload payload = RunnerApi.WindowIntoPayload.parseFrom(windowInto.getSpec().getPayload());
            WindowFn<?, ?> windowFn = WindowingStrategyTranslation.windowFnFromProto(payload.getWindowFn());

            String inputId = Iterables.getOnlyElement(windowInto.getInputsMap().values());
            String outputId = Iterables.getOnlyElement(windowInto.getOutputsMap().values());

            streamingTranslationContext.addDataStream(
                outputId,
                streamingTranslationContext
                    .getDataStreamOrThrow(inputId)
                    .flatMap(new FlinkAssignWindows(windowFn))
                    .name(windowInto.getUniqueName())
                    .returns(new CoderTypeInformation(instantiateCoder(outputId, components))));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Translates an executable stage into an {@link ExecutableStageDoFnOperator} applied to the
     * stage's single input stream, and registers one data stream per stage output.
     *
     * <p>The first key returned by the outputs map's iterator is treated as the main output; every
     * other output is exposed as a Flink side output of the operator. When the transform has no
     * outputs, the main tag and the operator's output type information are both {@code null} and no
     * stream is registered.
     *
     * @param str id of the executable-stage transform
     * @param pipeline pipeline proto containing the transform
     * @param streamingTranslationContext context holding the input stream and receiving the outputs
     * @throws RuntimeException wrapping any {@link IOException} raised inside the translation, e.g.
     *     while parsing the stage payload
     */
    private <InputT, OutputT> void translateExecutableStage(String str, RunnerApi.Pipeline pipeline, StreamingTranslationContext streamingTranslationContext) {
        RunnerApi.Components components = pipeline.getComponents();
        RunnerApi.PTransform transformsOrThrow = components.getTransformsOrThrow(str);
        Map<String, String> outputsMap = transformsOrThrow.getOutputsMap();
        try {
            RunnerApi.ExecutableStagePayload parseFrom = RunnerApi.ExecutableStagePayload.parseFrom(transformsOrThrow.getSpec().getPayload());
            // The stage must have exactly one input; getOnlyElement throws otherwise.
            String str2 = (String) Iterables.getOnlyElement(transformsOrThrow.getInputsMap().values());
            // TupleTag -> Flink OutputTag, in insertion order.
            LinkedHashMap newLinkedHashMap = Maps.newLinkedHashMap();
            // TupleTag -> coder of that output, in insertion order.
            LinkedHashMap newLinkedHashMap2 = Maps.newLinkedHashMap();
            // Main output tag: first key of the outputs map, or null when there are no outputs.
            TupleTag tupleTag = !outputsMap.isEmpty() ? new TupleTag(outputsMap.keySet().iterator().next()) : null;
            BiMap<String, Integer> createOutputMap = FlinkPipelineTranslatorUtils.createOutputMap(outputsMap.keySet());
            // Outputs-map key -> coder.
            HashMap newHashMap = Maps.newHashMap();
            // TupleTag -> integer assigned to that output by createOutputMap.
            HashMap newHashMap2 = Maps.newHashMap();
            // Output PCollection id -> TupleTag.
            HashMap newHashMap3 = Maps.newHashMap();
            // Iterate output keys in sorted order (TreeMap), so the linked maps above are filled
            // in that order.
            for (String str3 : new TreeMap(createOutputMap).keySet()) {
                String str4 = outputsMap.get(str3);
                Coder instantiateCoder = instantiateCoder(str4, components);
                newHashMap.put(str3, instantiateCoder);
                TupleTag tupleTag2 = new TupleTag(str3);
                newLinkedHashMap.put(tupleTag2, new OutputTag(str3, new CoderTypeInformation(instantiateCoder)));
                newLinkedHashMap2.put(tupleTag2, instantiateCoder);
                newHashMap2.put(tupleTag2, createOutputMap.get(str3));
                newHashMap3.put(str4, tupleTag2);
            }
            DataStream dataStreamOrThrow = streamingTranslationContext.getDataStreamOrThrow(str2);
            // Type information of the main output; null when the stage has no outputs.
            CoderTypeInformation coderTypeInformation = !outputsMap.isEmpty() ? new CoderTypeInformation((Coder) newHashMap.get(tupleTag.getId())) : null;
            // Collect every tag except the main one. The loop body only runs when there is at least
            // one output, in which case tupleTag is non-null.
            ArrayList newArrayList = Lists.newArrayList();
            for (TupleTag tupleTag3 : newLinkedHashMap2.keySet()) {
                if (!tupleTag.getId().equals(tupleTag3.getId())) {
                    newArrayList.add(tupleTag3);
                }
            }
            // NOTE(review): the second constructor argument is passed as null and the stage context
            // comes from FlinkExecutableStageContext.batchFactory() although this is the streaming
            // translator; neither choice is explained by the code visible here — confirm intent.
            SingleOutputStreamOperator transform = dataStreamOrThrow.transform(transformsOrThrow.getUniqueName(), coderTypeInformation, new ExecutableStageDoFnOperator(transformsOrThrow.getUniqueName(), null, tupleTag, newArrayList, new DoFnOperator.MultiOutputOutputManagerFactory(tupleTag, newLinkedHashMap, newLinkedHashMap2, newHashMap2), Collections.emptyMap(), Collections.emptyList(), streamingTranslationContext.getPipelineOptions(), parseFrom, streamingTranslationContext.getJobInfo(), FlinkExecutableStageContext.batchFactory(), newHashMap3));
            // The operator's own output stream is the main output.
            if (tupleTag != null) {
                streamingTranslationContext.addDataStream(outputsMap.get(tupleTag.getId()), transform);
            }
            // Every additional output is read back as a side output of the operator.
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                TupleTag tupleTag4 = (TupleTag) it.next();
                streamingTranslationContext.addDataStream(outputsMap.get(tupleTag4.getId()), transform.getSideOutput((OutputTag) newLinkedHashMap.get(tupleTag4)));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Instantiates the runner-side wire coder for a PCollection.
     *
     * @param str id of the PCollection; it must exist in {@code components}
     * @param components pipeline components used to look up the PCollection and build the coder
     * @return the coder produced by {@code WireCoders.instantiateRunnerWireCoder}
     * @throws RuntimeException wrapping any {@link IOException} raised while building the coder
     */
    static <T> Coder<WindowedValue<T>> instantiateCoder(String str, RunnerApi.Components components) {
        try {
            RunnerApi.PCollection collection = components.getPcollectionsOrThrow(str);
            PipelineNode.PCollectionNode collectionNode = PipelineNode.pCollection(str, collection);
            return WireCoders.instantiateRunnerWireCoder(collectionNode, components);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lambda deserialization hook, marked synthetic, as rendered by the decompiler.
     *
     * <p>It recognizes one serialized lambda by its implementation method name
     * ({@code lambda$translateFlatten$9263493$1}) — presumably the no-op flat-map lambda in
     * {@code translateFlatten} — and, when the recorded functional interface, method names and
     * signatures all match, returns an equivalent no-op lambda.
     *
     * <p>NOTE(review): this is not valid Java source as written. {@code boolean z = -1},
     * {@code switch (z)} on a boolean and {@code case false} are decompiler artifacts, and the
     * returned lambda has no functional-interface target type. The code is kept unchanged here.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the serialized lambda is not the one recognized above
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler stand-in for a "no match yet" index; see the NOTE above.
        boolean z = -1;
        // First switch: map the implementation method name (via its hash, then equals) to an index.
        switch (implMethodName.hashCode()) {
            case 2102878772:
                if (implMethodName.equals("lambda$translateFlatten$9263493$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the remaining recorded properties before recreating the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/apache/flink/util/Collector;)V")) {
                    return (str2, collector) -> {
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
