package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.DoFnPTransformRunner;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.SideInputSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PCollectionViewTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;

/* loaded from: input_file:org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.class */
abstract class DoFnPTransformRunnerFactory<TransformInputT, FnInputT, OutputT, RunnerT extends DoFnPTransformRunner<TransformInputT>> implements PTransformRunnerFactory<RunnerT> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/DoFnPTransformRunnerFactory$Context.class */
    public static class Context<InputT, OutputT> {
        final PipelineOptions pipelineOptions;
        final BeamFnStateClient beamFnStateClient;
        final String ptransformId;
        final RunnerApi.PTransform pTransform;
        final Supplier<String> processBundleInstructionId;
        final RehydratedComponents rehydratedComponents;
        final DoFn<InputT, OutputT> doFn;
        final DoFnSignature doFnSignature;
        final TupleTag<OutputT> mainOutputTag;
        final Coder<?> inputCoder;
        final SchemaCoder<InputT> schemaCoder;
        final Coder<?> keyCoder;
        final SchemaCoder<OutputT> mainOutputSchemaCoder;
        final Coder<? extends BoundedWindow> windowCoder;
        final WindowingStrategy<InputT, ?> windowingStrategy;
        final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap;
        Map<TupleTag<?>, Coder<?>> outputCoders;
        final RunnerApi.ParDoPayload parDoPayload;
        final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumer;
        final BundleSplitListener splitListener;

        Context(PipelineOptions pipelineOptions, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, BundleSplitListener bundleSplitListener) {
            this.pipelineOptions = pipelineOptions;
            this.beamFnStateClient = beamFnStateClient;
            this.ptransformId = str;
            this.pTransform = pTransform;
            this.processBundleInstructionId = supplier;
            ImmutableMap.Builder builder = ImmutableMap.builder();
            try {
                this.rehydratedComponents = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(map2).putAllPcollections(map).putAllWindowingStrategies(map3).build()).withPipeline(Pipeline.create());
                this.parDoPayload = RunnerApi.ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
                this.doFn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(this.parDoPayload);
                this.doFnSignature = DoFnSignatures.signatureForDoFn(this.doFn);
                this.mainOutputTag = (TupleTag<OutputT>) ParDoTranslation.getMainOutputTag(this.parDoPayload);
                RunnerApi.PCollection pCollection = map.get(pTransform.getInputsOrThrow((String) Iterables.getOnlyElement(Sets.difference(pTransform.getInputsMap().keySet(), Sets.union(this.parDoPayload.getSideInputsMap().keySet(), this.parDoPayload.getTimerSpecsMap().keySet())))));
                this.inputCoder = this.rehydratedComponents.getCoder(pCollection.getCoderId());
                if ((this.inputCoder instanceof KvCoder) || ((this.inputCoder instanceof WindowedValue.WindowedValueCoder) && (((WindowedValue.WindowedValueCoder) this.inputCoder).getValueCoder() instanceof KvCoder))) {
                    this.keyCoder = this.inputCoder instanceof WindowedValue.WindowedValueCoder ? ((KvCoder) ((WindowedValue.WindowedValueCoder) this.inputCoder).getValueCoder()).getKeyCoder() : ((KvCoder) this.inputCoder).getKeyCoder();
                } else {
                    this.keyCoder = null;
                }
                if ((this.inputCoder instanceof SchemaCoder) || ((this.inputCoder instanceof WindowedValue.WindowedValueCoder) && (((WindowedValue.WindowedValueCoder) this.inputCoder).getValueCoder() instanceof SchemaCoder))) {
                    this.schemaCoder = this.inputCoder instanceof WindowedValue.WindowedValueCoder ? (SchemaCoder) ((WindowedValue.WindowedValueCoder) this.inputCoder).getValueCoder() : (SchemaCoder) this.inputCoder;
                } else {
                    this.schemaCoder = null;
                }
                this.windowingStrategy = (WindowingStrategy<InputT, ?>) this.rehydratedComponents.getWindowingStrategy(pCollection.getWindowingStrategyId());
                this.windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
                this.outputCoders = Maps.newHashMap();
                for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
                    TupleTag<?> tupleTag = new TupleTag<>(entry.getKey());
                    Coder<?> coder = this.rehydratedComponents.getCoder(map.get(entry.getValue()).getCoderId());
                    if (coder instanceof WindowedValue.WindowedValueCoder) {
                        coder = ((WindowedValue.WindowedValueCoder) coder).getValueCoder();
                    }
                    this.outputCoders.put(tupleTag, coder);
                }
                Coder<?> coder2 = this.outputCoders.get(this.mainOutputTag);
                this.mainOutputSchemaCoder = coder2 instanceof SchemaCoder ? (SchemaCoder) coder2 : null;
                for (Map.Entry<String, RunnerApi.SideInput> entry2 : this.parDoPayload.getSideInputsMap().entrySet()) {
                    String key = entry2.getKey();
                    RunnerApi.SideInput value = entry2.getValue();
                    Preconditions.checkArgument(Materializations.MULTIMAP_MATERIALIZATION_URN.equals(value.getAccessPattern().getUrn()), "This SDK is only capable of dealing with %s materializations but was asked to handle %s for PCollectionView with tag %s.", Materializations.MULTIMAP_MATERIALIZATION_URN, value.getAccessPattern().getUrn(), key);
                    RunnerApi.PCollection pCollection2 = map.get(pTransform.getInputsOrThrow(key));
                    builder.put(new TupleTag(entry2.getKey()), SideInputSpec.create(this.rehydratedComponents.getCoder(pCollection2.getCoderId()), this.rehydratedComponents.getWindowingStrategy(pCollection2.getWindowingStrategyId()).getWindowFn().windowCoder(), PCollectionViewTranslation.viewFnFromProto(entry2.getValue().getViewFn()), PCollectionViewTranslation.windowMappingFnFromProto(entry2.getValue().getWindowMappingFn())));
                }
                ImmutableListMultimap.Builder builder2 = ImmutableListMultimap.builder();
                for (Map.Entry<String, String> entry3 : pTransform.getOutputsMap().entrySet()) {
                    builder2.putAll((ImmutableListMultimap.Builder) entry3.getKey(), (Object[]) new FnDataReceiver[]{pCollectionConsumerRegistry.getMultiplexingConsumer(entry3.getValue())});
                }
                this.localNameToConsumer = builder2.build();
                this.tagToSideInputSpecMap = builder.build();
                this.splitListener = bundleSplitListener;
            } catch (IOException e) {
                throw new IllegalArgumentException("Malformed ParDoPayload", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/DoFnPTransformRunnerFactory$DoFnPTransformRunner.class */
    public interface DoFnPTransformRunner<T> {
        void startBundle() throws Exception;

        void processElement(WindowedValue<T> windowedValue) throws Exception;

        void processTimer(String str, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> windowedValue);

        void finishBundle() throws Exception;
    }

    @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
    public final RunnerT createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, BundleSplitListener bundleSplitListener) {
        Context<FnInputT, OutputT> context = new Context<>(pipelineOptions, beamFnStateClient, str, pTransform, supplier, map, map2, map3, pCollectionConsumerRegistry, bundleSplitListener);
        RunnerT createRunner = createRunner(context);
        Objects.requireNonNull(createRunner);
        pTransformFunctionRegistry.register(str, createRunner::startBundle);
        Iterator<E> it = Sets.difference(pTransform.getInputsMap().keySet(), Sets.union(context.parDoPayload.getSideInputsMap().keySet(), context.parDoPayload.getTimerSpecsMap().keySet())).iterator();
        while (it.hasNext()) {
            String inputsOrThrow = pTransform.getInputsOrThrow((String) it.next());
            Objects.requireNonNull(createRunner);
            pCollectionConsumerRegistry.register(inputsOrThrow, str, createRunner::processElement);
        }
        for (String str2 : context.parDoPayload.getTimerSpecsMap().keySet()) {
            TimeDomain timeDomain = DoFnSignatures.getTimerSpecOrThrow(context.doFnSignature.timerDeclarations().get(str2), context.doFn).getTimeDomain();
            pCollectionConsumerRegistry.register(pTransform.getInputsOrThrow(str2), str, obj -> {
                createRunner.processTimer(str2, timeDomain, (WindowedValue) obj);
            });
        }
        Objects.requireNonNull(createRunner);
        pTransformFunctionRegistry2.register(str, createRunner::finishBundle);
        return createRunner;
    }

    abstract RunnerT createRunner(Context<FnInputT, OutputT> context);

    @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
    public /* bridge */ /* synthetic */ Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier supplier, Map map, Map map2, Map map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, BundleSplitListener bundleSplitListener) throws IOException {
        return createRunnerForPTransform(pipelineOptions, beamFnDataClient, beamFnStateClient, str, pTransform, (Supplier<String>) supplier, (Map<String, RunnerApi.PCollection>) map, (Map<String, RunnerApi.Coder>) map2, (Map<String, RunnerApi.WindowingStrategy>) map3, pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, bundleSplitListener);
    }
}
