package org.apache.beam.runners.dataflow;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.base.Preconditions;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.dataflow.BatchViewOverrides;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.class */
public class BatchStatefulParDoOverrides {

    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$BatchStatefulDoFn.class */
    public static class BatchStatefulDoFn<K, V, OutputT> extends DoFn<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>, OutputT> {
        private final DoFn<KV<K, V>, OutputT> underlyingDoFn;

        BatchStatefulDoFn(DoFn<KV<K, V>, OutputT> doFn) {
            this.underlyingDoFn = doFn;
        }

        public DoFn<KV<K, V>, OutputT> getUnderlyingDoFn() {
            return this.underlyingDoFn;
        }

        @DoFn.Setup
        public void setup() {
            DoFnInvokers.invokerFor(this.underlyingDoFn).invokeSetup();
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>, OutputT>.ProcessContext processContext, BoundedWindow boundedWindow) {
            throw new UnsupportedOperationException("BatchStatefulDoFn.ProcessElement should never be invoked");
        }

        @DoFn.Teardown
        public void teardown() {
            DoFnInvokers.invokerFor(this.underlyingDoFn).invokeTeardown();
        }

        public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
            return this.underlyingDoFn.getOutputTypeDescriptor();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$ExpandGbkFn.class */
    public static class ExpandGbkFn<K, V> extends DoFn<KV<K, Iterable<V>>, KV<K, V>> {
        ExpandGbkFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<V>>, KV<K, V>>.ProcessContext processContext) {
            Object key = ((KV) processContext.element()).getKey();
            Iterator it = ((Iterable) ((KV) processContext.element()).getValue()).iterator();
            while (it.hasNext()) {
                processContext.output(KV.of(key, it.next()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$GbkBeforeStatefulParDo.class */
    public static class GbkBeforeStatefulParDo<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>>> {
        GbkBeforeStatefulParDo() {
        }

        public PCollection<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>> expand(PCollection<KV<K, V>> pCollection) {
            WindowingStrategy windowingStrategy = pCollection.getWindowingStrategy();
            Preconditions.checkState(pCollection.getCoder() instanceof KvCoder, "Input to a %s using state requires a %s, but the coder was %s", ParDo.class.getSimpleName(), KvCoder.class.getSimpleName(), pCollection.getCoder());
            KvCoder coder = pCollection.getCoder();
            return pCollection.apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn())).setCoder(KvCoder.of(coder.getKeyCoder(), KvCoder.of(InstantCoder.of(), WindowedValue.getFullCoder(coder, windowingStrategy.getWindowFn().windowCoder())))).apply("PartitionKeys", new BatchViewOverrides.GroupByKeyAndSortValuesOnly()).setWindowingStrategyInternal(windowingStrategy);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$MultiOutputOverrideFactory.class */
    public static class MultiOutputOverrideFactory<K, InputT, OutputT> implements PTransformOverrideFactory<PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.MultiOutput<KV<K, InputT>, OutputT>> {
        private final boolean isFnApi;

        private MultiOutputOverrideFactory(boolean z) {
            this.isFnApi = z;
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, InputT>>, PCollectionTuple> getReplacementTransform(AppliedPTransform<PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.MultiOutput<KV<K, InputT>, OutputT>> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), new StatefulMultiOutputParDo(appliedPTransform.getTransform(), this.isFnApi));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollectionTuple pCollectionTuple) {
            return ReplacementOutputs.tagged(map, pCollectionTuple);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PValue>) map, (PCollectionTuple) pOutput);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$ReifyWindowedValueFn.class */
    public static class ReifyWindowedValueFn<K, V> extends DoFn<KV<K, V>, KV<K, KV<Instant, WindowedValue<KV<K, V>>>>> {
        ReifyWindowedValueFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, KV<K, KV<Instant, WindowedValue<KV<K, V>>>>>.ProcessContext processContext, BoundedWindow boundedWindow) {
            processContext.output(KV.of(((KV) processContext.element()).getKey(), KV.of(processContext.timestamp(), WindowedValue.of((KV) processContext.element(), processContext.timestamp(), boundedWindow, processContext.pane()))));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$SingleOutputOverrideFactory.class */
    public static class SingleOutputOverrideFactory<K, InputT, OutputT> implements PTransformOverrideFactory<PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.SingleOutput<KV<K, InputT>, OutputT>> {
        private final boolean isFnApi;

        private SingleOutputOverrideFactory(boolean z) {
            this.isFnApi = z;
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<KV<K, InputT>>, PCollection<OutputT>> getReplacementTransform(AppliedPTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.SingleOutput<KV<K, InputT>, OutputT>> appliedPTransform) {
            return PTransformOverrideFactory.PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(appliedPTransform), new StatefulSingleOutputParDo(appliedPTransform.getTransform(), this.isFnApi));
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> map, PCollection<OutputT> pCollection) {
            return ReplacementOutputs.singleton(map, pCollection);
        }

        public /* bridge */ /* synthetic */ Map mapOutputs(Map map, POutput pOutput) {
            return mapOutputs((Map<TupleTag<?>, PValue>) map, (PCollection) pOutput);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$StatefulMultiOutputParDo.class */
    static class StatefulMultiOutputParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
        private final ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo;
        private final boolean isFnApi;

        StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> multiOutput, boolean z) {
            this.originalParDo = multiOutput;
            this.isFnApi = z;
        }

        public PCollectionTuple expand(PCollection<KV<K, InputT>> pCollection) {
            DoFn fn = this.originalParDo.getFn();
            BatchStatefulParDoOverrides.verifyFnIsStateful(fn);
            DataflowRunner.verifyStateSupported(fn);
            DataflowRunner.verifyStateSupportForWindowingStrategy(pCollection.getWindowingStrategy());
            if (this.isFnApi) {
                return pCollection.apply(GroupByKey.create()).apply(ParDo.of(new ExpandGbkFn())).apply(this.originalParDo);
            }
            return pCollection.apply(new GbkBeforeStatefulParDo()).apply(ParDo.of(new BatchStatefulDoFn(fn)).withSideInputs(this.originalParDo.getSideInputs()).withOutputTags(this.originalParDo.getMainOutputTag(), this.originalParDo.getAdditionalOutputTags()));
        }

        public ParDo.MultiOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
            return this.originalParDo;
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides$StatefulSingleOutputParDo.class */
    static class StatefulSingleOutputParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> {
        private final ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo;
        private final boolean isFnApi;

        StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT> singleOutput, boolean z) {
            this.originalParDo = singleOutput;
            this.isFnApi = z;
        }

        ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
            return this.originalParDo;
        }

        public PCollection<OutputT> expand(PCollection<KV<K, InputT>> pCollection) {
            DoFn fn = this.originalParDo.getFn();
            BatchStatefulParDoOverrides.verifyFnIsStateful(fn);
            DataflowRunner.verifyStateSupported(fn);
            DataflowRunner.verifyStateSupportForWindowingStrategy(pCollection.getWindowingStrategy());
            if (this.isFnApi) {
                return pCollection.apply(GroupByKey.create()).apply(ParDo.of(new ExpandGbkFn())).apply(this.originalParDo);
            }
            return pCollection.apply(new GbkBeforeStatefulParDo()).apply(ParDo.of(new BatchStatefulDoFn(fn)).withSideInputs(this.originalParDo.getSideInputs()));
        }
    }

    public static <K, InputT, OutputT> PTransformOverrideFactory<PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.SingleOutput<KV<K, InputT>, OutputT>> singleOutputOverrideFactory(DataflowPipelineOptions dataflowPipelineOptions) {
        return new SingleOutputOverrideFactory(isFnApi(dataflowPipelineOptions));
    }

    public static <K, InputT, OutputT> PTransformOverrideFactory<PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.MultiOutput<KV<K, InputT>, OutputT>> multiOutputOverrideFactory(DataflowPipelineOptions dataflowPipelineOptions) {
        return new MultiOutputOverrideFactory(isFnApi(dataflowPipelineOptions));
    }

    private static boolean isFnApi(DataflowPipelineOptions dataflowPipelineOptions) {
        List experiments = dataflowPipelineOptions.getExperiments();
        return experiments != null && experiments.contains("beam_fn_api");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <InputT, OutputT> void verifyFnIsStateful(DoFn<InputT, OutputT> doFn) {
        DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
        Preconditions.checkState(signature.usesState() || signature.usesTimers(), "%s used for %s that does not use state or timers.", BatchStatefulParDoOverrides.class.getSimpleName(), ParDo.class.getSimpleName());
    }
}
