package org.apache.beam.runners.direct.portable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.splittabledofn.SDFFeederViaStateAndTimers;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.runners.direct.portable.StepStateAndTimers;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.class */
/**
 * A {@link TransformEvaluatorFactory} whose evaluators feed the elements of an
 * {@link ExecutableStage} into a {@link RemoteBundle} obtained from a {@link JobBundleFactory},
 * using an {@link SDFFeederViaStateAndTimers} to seed and resume the element/restriction pairs.
 *
 * <p>The factory owns the supplied {@link JobBundleFactory}: {@link #cleanup()} closes it.
 */
public class SplittableRemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
    /** URN string for the splittable remote stage transform (its consumers are outside this file). */
    public static final String URN = "beam:directrunner:transforms:splittable_remote_stage:v1";

    /** URN string for the feed-SDF transform (its consumers are outside this file). */
    public static final String FEED_SDF_URN = "beam:directrunner:transforms:feed_sdf:v1";

    private final BundleFactory bundleFactory;
    private final JobBundleFactory jobBundleFactory;
    private final StepStateAndTimers.Provider stp;

    /**
     * Evaluator for a single input bundle. Each processed work item carries either exactly one
     * element (a fresh element/restriction pair) or exactly one timer (a resumption of a pair that
     * the feeder previously stored).
     *
     * @param <InputT> element type of the splittable stage's input
     * @param <RestrictionT> restriction type paired with each element
     */
    private static class SplittableRemoteStageEvaluator<InputT, RestrictionT> implements TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> {
        private final PipelineNode.PTransformNode transform;
        private final ExecutableStage stage;
        private final CopyOnAccessInMemoryStateInternals<byte[]> stateInternals;
        private final DirectTimerInternals timerInternals;
        private final RemoteBundle bundle;
        private final FnDataReceiver<WindowedValue<?>> mainInput;
        private final Collection<UncommittedBundle<?>> outputs;
        private final SDFFeederViaStateAndTimers<InputT, RestrictionT> feeder;

        /**
         * Parses the executable stage out of the transform payload, builds the feeder from the
         * stage's input wire coder, and opens the bundle that elements will be sent to.
         *
         * @param bundleFactory factory handed to the output receiver factory for creating output bundles
         * @param jobBundleFactory source of the {@link StageBundleFactory} for the parsed stage
         * @param stepStateAndTimers state and timer internals for this step and key
         * @param pTransformNode the transform whose spec payload encodes the executable stage
         * @throws Exception if the payload cannot be parsed or the coder/bundle cannot be created
         */
        // The wire coder is created from pipeline protos, so its element type is only known at
        // runtime; the downcasts and raw coder arguments below cannot be statically checked.
        @SuppressWarnings({"unchecked", "rawtypes"})
        private SplittableRemoteStageEvaluator(BundleFactory bundleFactory, JobBundleFactory jobBundleFactory, StepStateAndTimers<byte[]> stepStateAndTimers, PipelineNode.PTransformNode pTransformNode) throws Exception {
            this.stateInternals = stepStateAndTimers.stateInternals();
            this.timerInternals = stepStateAndTimers.timerInternals();
            this.transform = pTransformNode;
            this.stage = ExecutableStage.fromPayload(RunnerApi.ExecutableStagePayload.parseFrom(pTransformNode.getTransform().getSpec().getPayload()));
            this.outputs = new ArrayList<>();

            // The stage input is a windowed KV of (element, restriction); unpack its component coders.
            WindowedValue.FullWindowedValueCoder wireCoder = (WindowedValue.FullWindowedValueCoder) WireCoders.instantiateRunnerWireCoder(this.stage.getInputPCollection(), this.stage.getComponents());
            KvCoder kvCoder = (KvCoder) wireCoder.getValueCoder();
            this.feeder = new SDFFeederViaStateAndTimers<>(this.stateInternals, this.timerInternals, kvCoder.getKeyCoder(), kvCoder.getValueCoder(), wireCoder.getWindowCoder());

            StageBundleFactory stageBundleFactory = jobBundleFactory.forStage(this.stage);
            this.bundle = stageBundleFactory.getBundle(
                    // Every output bundle created by the receiver factory is recorded in `outputs`
                    // so finishBundle() can attach it to the result.
                    BundleFactoryOutputReceiverFactory.create(bundleFactory, this.stage.getComponents(), this.outputs::add),
                    StateRequestHandler.unsupported(),
                    // Progress and completion responses are deliberately ignored.
                    new BundleProgressHandler() {
                        @Override
                        public void onProgress(BeamFnApi.ProcessBundleProgressResponse processBundleProgressResponse) {
                        }

                        @Override
                        public void onCompleted(BeamFnApi.ProcessBundleResponse processBundleResponse) {
                        }
                    });
            // getOnlyElement throws unless the bundle exposes exactly one input receiver.
            this.mainInput = (FnDataReceiver) Iterables.getOnlyElement(this.bundle.getInputReceivers().values());
        }

        /**
         * Sends one element/restriction pair to the bundle's single input receiver.
         *
         * <p>If the work item holds an element, the feeder is seeded with it; otherwise the work
         * item must hold exactly one timer, and the feeder is asked to resume from that timer.
         *
         * @param windowedValue the keyed work item to process
         * @throws Exception if seeding, resuming, or sending to the receiver fails
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> windowedValue) throws Exception {
            KeyedWorkItem<byte[], KV<InputT, RestrictionT>> workItem = windowedValue.getValue();
            // A null default distinguishes "no element" (timer firing) from "one element".
            WindowedValue<KV<InputT, RestrictionT>> elementAndRestriction = Iterables.getOnlyElement(workItem.elementsIterable(), null);
            if (elementAndRestriction != null) {
                this.feeder.seed(elementAndRestriction);
            } else {
                elementAndRestriction = this.feeder.resume(Iterables.getOnlyElement(workItem.timersIterable()));
            }
            this.mainInput.accept(elementAndRestriction);
        }

        /**
         * Closes the bundle, commits the feeder and the state internals, and builds the result.
         *
         * @return a result held at the committed state's earliest watermark hold, carrying the
         *     collected output bundles, the committed state, and the timer update
         * @throws Exception if closing the bundle or committing fails
         */
        @Override
        public TransformResult<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> finishBundle() throws Exception {
            // Order matters: the bundle is closed before the feeder and state are committed.
            this.bundle.close();
            this.feeder.commit();
            CopyOnAccessInMemoryStateInternals<byte[]> committedState = this.stateInternals.commit();
            return StepTransformResult.withHold(this.transform, committedState.getEarliestWatermarkHold()).addOutput(this.outputs).withState(committedState).withTimerUpdate(this.timerInternals.getTimerUpdate()).build();
        }
    }

    /**
     * Creates a factory.
     *
     * @param bundleFactory factory passed to each evaluator for creating output bundles
     * @param jobBundleFactory source of stage bundles; closed by {@link #cleanup()}
     * @param provider supplies the per-step, per-key state and timers for each evaluator
     */
    public SplittableRemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory jobBundleFactory, StepStateAndTimers.Provider provider) {
        this.bundleFactory = bundleFactory;
        this.jobBundleFactory = jobBundleFactory;
        this.stp = provider;
    }

    /**
     * Creates a new evaluator for the given transform, using state and timers scoped to the
     * transform and the key of the input bundle.
     *
     * @param pTransformNode the transform to evaluate
     * @param committedBundle the input bundle; only its key is read here
     * @return a new evaluator (this implementation never returns {@code null})
     * @throws Exception if the evaluator cannot be constructed
     */
    // The evaluator's element and restriction types are not known statically at this call site,
    // so it is instantiated raw and returned as TransformEvaluator<InputT>.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(PipelineNode.PTransformNode pTransformNode, CommittedBundle<?> committedBundle) throws Exception {
        return new SplittableRemoteStageEvaluator(this.bundleFactory, this.jobBundleFactory, this.stp.forStepAndKey(pTransformNode, committedBundle.getKey()), pTransformNode);
    }

    /**
     * Closes the underlying {@link JobBundleFactory}.
     *
     * @throws Exception if closing the job bundle factory fails
     */
    @Override
    public void cleanup() throws Exception {
        this.jobBundleFactory.close();
    }
}
