package org.apache.beam.runners.flink;

import java.util.Iterator;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.class */
public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
    private final FlinkBatchTranslationContext batchContext;
    private int depth = 0;

    /* loaded from: input_file:org/apache/beam/runners/flink/FlinkBatchPipelineTranslator$BatchTransformTranslator.class */
    public interface BatchTransformTranslator<TransformT extends PTransform> {
        void translateNode(TransformT transformt, FlinkBatchTranslationContext flinkBatchTranslationContext);
    }

    public FlinkBatchPipelineTranslator(ExecutionEnvironment executionEnvironment, PipelineOptions pipelineOptions) {
        this.batchContext = new FlinkBatchTranslationContext(executionEnvironment, pipelineOptions);
    }

    @Override // org.apache.beam.runners.flink.FlinkPipelineTranslator
    public void translate(Pipeline pipeline) {
        super.translate(pipeline);
        Iterator<DataSet<?>> it = this.batchContext.getDanglingDataSets().values().iterator();
        while (it.hasNext()) {
            it.next().output(new DiscardingOutputFormat());
        }
    }

    public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
        this.depth++;
        BatchTransformTranslator<?> translator = getTranslator(node);
        if (translator == null) {
            return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
        }
        applyBatchTransform(node.getTransform(), node, translator);
        LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
        return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
    }

    public void leaveCompositeTransform(TransformHierarchy.Node node) {
        this.depth--;
        LOG.info("{} leaveCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        LOG.info("{} visitPrimitiveTransform- {}", genSpaces(this.depth), node.getFullName());
        PTransform<?, ?> transform = node.getTransform();
        BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
        if (translator == null) {
            throw new UnsupportedOperationException("The transform " + PTransformTranslation.urnForTransform(transform) + " is currently not supported.");
        }
        applyBatchTransform(transform, node, translator);
    }

    private <T extends PTransform<?, ?>> void applyBatchTransform(PTransform<?, ?> pTransform, TransformHierarchy.Node node, BatchTransformTranslator<?> batchTransformTranslator) {
        this.batchContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
        batchTransformTranslator.translateNode(pTransform, this.batchContext);
    }

    private static BatchTransformTranslator<?> getTranslator(TransformHierarchy.Node node) {
        PTransform transform = node.getTransform();
        if (transform == null) {
            return null;
        }
        return FlinkBatchTransformTranslators.getTranslator(transform);
    }
}
