package org.apache.flink.streaming.runtime.translators;

import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.datastream.impl.ExecutionEnvironmentImpl;
import org.apache.flink.datastream.impl.stream.AbstractDataStream;
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.datastream.impl.utils.StreamUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator.class */
public class DataStreamV2SinkTransformationTranslator<Input, Output> implements TransformationTranslator<Output, DataStreamV2SinkTransformation<Input, Output>> {
    private static final String COMMITTER_NAME = "Committer";
    private static final String WRITER_NAME = "Writer";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/translators/DataStreamV2SinkTransformationTranslator$SinkExpander.class */
    public static class SinkExpander<T> {
        private final DataStreamV2SinkTransformation<T, ?> transformation;
        private final Sink<T> sink;
        private final TransformationTranslator.Context context;
        private final AbstractDataStream<T> inputStream;
        private final ExecutionEnvironmentImpl executionEnvironment;
        private final boolean isCheckpointingEnabled;
        private final boolean isBatchMode;

        /**
         * Creates an expander for one sink transformation.
         *
         * <p>Besides storing its arguments, the constructor captures the environment of the input
         * stream and reads that environment's checkpoint configuration once to remember whether
         * checkpointing is enabled.
         *
         * @param abstractDataStream stream that feeds the sink
         * @param sink sink to expand
         * @param dataStreamV2SinkTransformation sink transformation being translated
         * @param context translation context
         * @param z stored as the batch-mode flag
         */
        public SinkExpander(AbstractDataStream<T> abstractDataStream, Sink<T> sink, DataStreamV2SinkTransformation<T, ?> dataStreamV2SinkTransformation, TransformationTranslator.Context context, boolean z) {
            // Plain argument capture first.
            this.sink = sink;
            this.context = context;
            this.transformation = dataStreamV2SinkTransformation;
            this.isBatchMode = z;
            this.inputStream = abstractDataStream;

            // Values derived from the input stream's environment.
            this.executionEnvironment = this.inputStream.getEnvironment();
            this.isCheckpointingEnabled = this.executionEnvironment.getCheckpointCfg().isCheckpointingEnabled();
        }

        /**
         * Expands the sink into its writer (and, when the sink supports committing, committer)
         * sub-transformations and passes each of them to the translation context.
         *
         * <p>The sub-transformations are appended to the environment's transformation list while
         * the topology is built. After they have been translated they are removed again, so the
         * list is trimmed back to the size it had on entry.
         *
         * @throws UnsupportedOperationException if the sink implements a pre-write, pre-commit or
         *     post-commit topology interface
         */
        public void expand() {
            final int sizeOnEntry = this.executionEnvironment.getTransformations().size();

            // Custom topologies are rejected before anything is added to the environment.
            if (this.sink instanceof SupportsPreWriteTopology) {
                throw new UnsupportedOperationException("Sink with pre-write topology is not supported for DataStream v2 atm.");
            }
            if (this.sink instanceof SupportsPreCommitTopology) {
                throw new UnsupportedOperationException("Sink with pre-commit topology is not supported for DataStream v2 atm.");
            }
            if (this.sink instanceof SupportsPostCommitTopology) {
                throw new UnsupportedOperationException("Sink with post-commit topology is not supported for DataStream v2 atm.");
            }

            if (!(this.sink instanceof SupportsCommitter)) {
                // Writer-only sink: a single writer operator without committable output.
                adjustTransformations(
                        this.inputStream,
                        stream -> StreamUtils.transformOneInputOperator(DataStreamV2SinkTransformationTranslator.WRITER_NAME, stream, CommittableMessageTypeInfo.noOutput(), new SinkWriterOperatorFactory(this.sink)),
                        this.sink instanceof SupportsConcurrentExecutionAttempts);
            } else {
                addCommittingTopology(this.sink, this.inputStream);
            }

            // Translate everything that was appended since entry.
            List<Transformation<?>> added = this.executionEnvironment.getTransformations().subList(sizeOnEntry, this.executionEnvironment.getTransformations().size());
            added.forEach(this.context::transform);

            // Drop the appended sub-transformations again, last one first.
            List<Transformation<?>> registered = this.executionEnvironment.getTransformations();
            while (registered.size() > sizeOnEntry) {
                registered.remove(registered.size() - 1);
            }
        }

        /**
         * Builds the writer-then-committer topology for a sink that supports committing.
         *
         * <p>The writer is added through {@code addWriter}; the committer operator is then attached
         * to the writer's output and post-processed by {@code adjustTransformations} with the
         * concurrent-execution-attempts flag set to {@code false}.
         *
         * @param sink sink to expand; it is cast to {@code SupportsCommitter} here
         * @param abstractDataStream stream feeding the writer
         * @param <CommT> committable type exchanged between writer and committer
         */
        private <CommT> void addCommittingTopology(Sink<T> sink, AbstractDataStream<T> abstractDataStream) {
            // The committable type is erased at runtime, so this cast cannot be checked.
            @SuppressWarnings("unchecked")
            SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;

            // Evaluating the method reference also fails fast on a null sink.
            TypeInformation<CommittableMessage<CommT>> committableTypeInfo = CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);

            AbstractDataStream<CommittableMessage<CommT>> written = addWriter(sink, abstractDataStream, committableTypeInfo);
            adjustTransformations(
                    written,
                    stream -> StreamUtils.transformOneInputOperator(DataStreamV2SinkTransformationTranslator.COMMITTER_NAME, stream, committableTypeInfo, new CommitterOperatorFactory(committingSink, this.isBatchMode, this.isCheckpointingEnabled)),
                    false);
        }

        /**
         * Adds the writer operator for {@code sink} and wraps its output with
         * {@code addFailOverRegion}.
         *
         * @param sink sink whose writer is created
         * @param abstractDataStream input of the writer
         * @param typeInformation type information used for the writer's output
         * @param <WriteResultT> payload type of the emitted committable messages
         * @return the stream returned by {@code addFailOverRegion} for the writer output
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private <WriteResultT> AbstractDataStream<CommittableMessage<WriteResultT>> addWriter(Sink<T> sink, AbstractDataStream<T> abstractDataStream, TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
            boolean concurrentAttempts = sink instanceof SupportsConcurrentExecutionAttempts;

            // Step 1: create the writer and let adjustTransformations post-process it.
            AbstractDataStream writerOutput = (AbstractDataStream) adjustTransformations(
                    abstractDataStream,
                    stream -> StreamUtils.transformOneInputOperator(DataStreamV2SinkTransformationTranslator.WRITER_NAME, stream, typeInformation, new SinkWriterOperatorFactory(sink)),
                    concurrentAttempts);

            // Step 2: put the partitioning step behind the writer.
            return (AbstractDataStream<CommittableMessage<WriteResultT>>) addFailOverRegion(writerOutput);
        }

        /**
         * Wraps the given stream in a partition transformation that uses a forward partitioner and
         * the {@code BATCH} exchange mode.
         *
         * @param abstractDataStream stream to wrap
         * @param <I> element type of the stream
         * @return a new non-keyed stream in the same environment, backed by the partition
         *     transformation
         */
        private <I> AbstractDataStream<I> addFailOverRegion(AbstractDataStream<I> abstractDataStream) {
            PartitionTransformation batchForward = new PartitionTransformation(
                    abstractDataStream.getTransformation(),
                    new ForwardPartitioner(),
                    StreamExchangeMode.BATCH);
            return new NonKeyedPartitionStreamImpl(abstractDataStream.getEnvironment(), batchForward);
        }

        /**
         * Applies {@code function} to {@code abstractDataStream} and then aligns every
         * transformation that the call appended to the environment's transformation list with the
         * sink transformation being expanded.
         *
         * <p>For each appended transformation this method:
         *
         * <ul>
         *   <li>sets a uid hash from {@code CustomSinkOperatorUidHashes.DEFAULT} when the
         *       transformation is named "Writer", "Committer" or "Global Committer" and the
         *       matching hash is non-null;
         *   <li>derives the uid, co-location group key, name and description from the sink
         *       transformation's values (see {@code concatUid} / {@code concatProperty});
         *   <li>copies the sink's slot sharing group if the transformation has none;
         *   <li>copies the sink's parallelism, and its max parallelism when the transformation's own
         *       value is negative and the sink's is positive;
         *   <li>for {@code PhysicalTransformation}s, copies a non-null chaining strategy and sets
         *       the concurrent-execution-attempts flag to {@code z}.
         * </ul>
         *
         * @param abstractDataStream input handed to {@code function}
         * @param function action that creates the sub-transformations
         * @param z whether the created physical transformations support concurrent execution
         *     attempts
         * @param <I> element type of the input stream
         * @param <R> result type of {@code function}
         * @return whatever {@code function} returned
         */
        private <I, R> R adjustTransformations(AbstractDataStream<I> abstractDataStream, Function<AbstractDataStream<I>, R> function, boolean z) {
            // Remember the list size so the transformations added by `function` can be identified.
            int sizeBefore = this.executionEnvironment.getTransformations().size();
            R result = function.apply(abstractDataStream);

            List<Transformation<?>> transformations = this.executionEnvironment.getTransformations();
            List<Transformation<?>> expanded = transformations.subList(sizeBefore, transformations.size());
            CustomSinkOperatorUidHashes uidHashes = CustomSinkOperatorUidHashes.DEFAULT;

            for (Transformation<?> subTransformation : expanded) {
                // Each call is a no-op unless the name matches and the hash is non-null.
                setOperatorUidHashIfPossible(subTransformation, DataStreamV2SinkTransformationTranslator.WRITER_NAME, uidHashes.getWriterUidHash());
                setOperatorUidHashIfPossible(subTransformation, DataStreamV2SinkTransformationTranslator.COMMITTER_NAME, uidHashes.getCommitterUidHash());
                setOperatorUidHashIfPossible(subTransformation, "Global Committer", uidHashes.getGlobalCommitterUidHash());

                // The name is read before concatProperty rewrites it further down.
                concatUid(subTransformation, Transformation::getUid, Transformation::setUid, subTransformation.getName());
                concatProperty(subTransformation, Transformation::getCoLocationGroupKey, Transformation::setCoLocationGroupKey);
                concatProperty(subTransformation, Transformation::getName, Transformation::setName);
                concatProperty(subTransformation, Transformation::getDescription, Transformation::setDescription);

                Optional<SlotSharingGroup> sinkSlotSharingGroup = this.transformation.getSlotSharingGroup();
                if (sinkSlotSharingGroup.isPresent() && !subTransformation.getSlotSharingGroup().isPresent()) {
                    subTransformation.setSlotSharingGroup(sinkSlotSharingGroup.get());
                }

                subTransformation.setParallelism(this.transformation.getParallelism());
                if (subTransformation.getMaxParallelism() < 0 && this.transformation.getMaxParallelism() > 0) {
                    subTransformation.setMaxParallelism(this.transformation.getMaxParallelism());
                }

                // Only physical transformations carry a chaining strategy and the attempts flag.
                if (subTransformation instanceof PhysicalTransformation) {
                    PhysicalTransformation<?> physicalSubTransformation = (PhysicalTransformation<?>) subTransformation;
                    if (this.transformation.getChainingStrategy() != null) {
                        physicalSubTransformation.setChainingStrategy(this.transformation.getChainingStrategy());
                    }
                    physicalSubTransformation.setSupportsConcurrentExecutionAttempts(z);
                }
            }
            return result;
        }

        /**
         * Sets {@code str2} as the uid hash of {@code transformation}, provided a hash was supplied
         * and the transformation's name equals {@code str}; otherwise does nothing.
         *
         * @param transformation transformation to update
         * @param str operator name the transformation must carry
         * @param str2 uid hash to apply, or {@code null} to skip
         */
        private void setOperatorUidHashIfPossible(Transformation<?> transformation, String str, @Nullable String str2) {
            boolean hashSupplied = str2 != null;
            // The name is only inspected when there is a hash to apply.
            if (hashSupplied && transformation.getName().equals(str)) {
                transformation.setUidHash(str2);
            }
        }

        /**
         * Derives the uid of a sub-transformation from the sink transformation's uid.
         *
         * <p>When {@code str} is non-null and the sink transformation yields a non-null value
         * through {@code function}, the well-known operator names get a fixed scheme: "Committer"
         * becomes {@code "Sink Committer: <value>"}, "Writer" receives the value unchanged, and
         * "Global Committer" becomes {@code "Sink <value> Global Committer"}. In every other case
         * the work is delegated to {@code concatProperty}.
         *
         * @param transformation sub-transformation to update
         * @param function getter for the property
         * @param biConsumer setter for the property
         * @param str name of the sub-transformation, may be {@code null}
         */
        private void concatUid(Transformation<?> transformation, Function<Transformation<?>, String> function, BiConsumer<Transformation<?>, String> biConsumer, @Nullable String str) {
            if (str != null && function.apply(this.transformation) != null) {
                switch (str) {
                    case DataStreamV2SinkTransformationTranslator.COMMITTER_NAME:
                        biConsumer.accept(transformation, String.format("Sink Committer: %s", function.apply(this.transformation)));
                        return;
                    case DataStreamV2SinkTransformationTranslator.WRITER_NAME:
                        biConsumer.accept(transformation, function.apply(this.transformation));
                        return;
                    case "Global Committer":
                        biConsumer.accept(transformation, String.format("Sink %s Global Committer", function.apply(this.transformation)));
                        return;
                    default:
                        // Not a well-known operator name: use the generic scheme below.
                        break;
                }
            }
            concatProperty(transformation, function, biConsumer);
        }

        /**
         * Prefixes a property of {@code transformation} with the sink transformation's value of the
         * same property, producing {@code "<sink value>: <own value>"}.
         *
         * <p>Nothing is changed when either of the two values is {@code null}.
         *
         * @param transformation sub-transformation to update
         * @param function getter for the property
         * @param biConsumer setter for the property
         */
        private void concatProperty(Transformation<?> transformation, Function<Transformation<?>, String> function, BiConsumer<Transformation<?>, String> biConsumer) {
            boolean bothPresent = function.apply(this.transformation) != null && function.apply(transformation) != null;
            if (bothPresent) {
                String combined = function.apply(this.transformation) + ": " + function.apply(transformation);
                biConsumer.accept(transformation, combined);
            }
        }

        /**
         * Re-creates the serialized {@code SupportsCommitter::getCommittableSerializer} method
         * reference from its {@link SerializedLambda} form.
         *
         * <p>NOTE(review): the {@code synthetic} marker indicates this method was emitted by the
         * compiler rather than written by hand. When the class is rebuilt from source the compiler
         * will presumably generate its own copy, in which case this one should be deleted - confirm
         * before keeping it.
         *
         * @param serializedLambda serialized form of the lambda to restore
         * @return the restored method reference
         * @throws IllegalArgumentException if the serialized form does not describe the expected
         *     method reference
         */
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            // 9 is the method-handle kind recorded for the reference (REF_invokeInterface).
            boolean isCommittableSerializerReference =
                    serializedLambda.getImplMethodName().equals("getCommittableSerializer")
                            && serializedLambda.getImplMethodKind() == 9
                            && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier")
                            && serializedLambda.getFunctionalInterfaceMethodName().equals("get")
                            && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;")
                            && serializedLambda.getImplClass().equals("org/apache/flink/api/connector/sink2/SupportsCommitter")
                            && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;");
            if (isCommittableSerializerReference) {
                SupportsCommitter supportsCommitter = (SupportsCommitter) serializedLambda.getCapturedArg(0);
                // NOTE(review): a method reference needs a functional-interface target type; this
                // return is kept as found and likely needs a cast to the SerializableSupplier
                // interface named above if the method is retained.
                return supportsCommitter::getCommittableSerializer;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Translates the sink transformation with the batch-mode flag set to {@code true}.
     *
     * @param dataStreamV2SinkTransformation sink transformation to translate
     * @param context translation context
     * @return the result of {@code translateInternal}
     */
    public Collection<Integer> translateForBatch(DataStreamV2SinkTransformation<Input, Output> dataStreamV2SinkTransformation, TransformationTranslator.Context context) {
        final boolean batchMode = true;
        return translateInternal(dataStreamV2SinkTransformation, context, batchMode);
    }

    /**
     * Translates the sink transformation with the batch-mode flag set to {@code false}.
     *
     * @param dataStreamV2SinkTransformation sink transformation to translate
     * @param context translation context
     * @return the result of {@code translateInternal}
     */
    public Collection<Integer> translateForStreaming(DataStreamV2SinkTransformation<Input, Output> dataStreamV2SinkTransformation, TransformationTranslator.Context context) {
        final boolean batchMode = false;
        return translateInternal(dataStreamV2SinkTransformation, context, batchMode);
    }

    /**
     * Expands the sink transformation through a {@code SinkExpander}.
     *
     * <p>The expander passes the generated sub-transformations to {@code context} itself, so this
     * method always returns an empty collection.
     *
     * @param dataStreamV2SinkTransformation sink transformation to translate
     * @param context translation context
     * @param z batch-mode flag forwarded to the expander
     * @return an empty collection
     */
    private Collection<Integer> translateInternal(DataStreamV2SinkTransformation<Input, Output> dataStreamV2SinkTransformation, TransformationTranslator.Context context, boolean z) {
        SinkExpander expander = new SinkExpander(
                dataStreamV2SinkTransformation.getInputStream(),
                dataStreamV2SinkTransformation.getSink(),
                dataStreamV2SinkTransformation,
                context,
                z);
        expander.expand();
        return Collections.emptyList();
    }

    /**
     * Registers this translator for {@code DataStreamV2SinkTransformation} by reflectively
     * modifying the static {@code translatorMap} field of {@code StreamGraphGenerator}.
     *
     * <p>The value of {@code translatorMap} is itself opened reflectively through a field named
     * {@code m}, and the entry is put into the map found there. NOTE(review): presumably
     * {@code translatorMap} is an unmodifiable wrapper whose delegate field is called {@code m};
     * this depends on the wrapper's internals - confirm against the targeted JDK.
     *
     * @throws Exception if either field cannot be found or accessed
     */
    public static void registerSinkTransformationTranslator() throws Exception {
        Field translatorMapField = StreamGraphGenerator.class.getDeclaredField("translatorMap");
        translatorMapField.setAccessible(true);
        // Static field, hence the null receiver.
        Map exposedMap = (Map) translatorMapField.get(null);

        Field delegateField = exposedMap.getClass().getDeclaredField("m");
        delegateField.setAccessible(true);
        Map delegateMap = (Map) delegateField.get(exposedMap);

        delegateMap.put(DataStreamV2SinkTransformation.class, new DataStreamV2SinkTransformationTranslator());
    }
}
