package org.apache.beam.runners.flink.translation.functions;

import java.util.Iterator;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.base.Preconditions;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.class */
public class FlinkExecutableStageFunction<InputT> extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
    private final RunnerApi.ExecutableStagePayload stagePayload;
    private final JobInfo jobInfo;
    private final Map<String, Integer> outputMap;
    private final FlinkExecutableStageContext.Factory contextFactory;
    private transient RuntimeContext runtimeContext;
    private transient FlinkExecutableStageContext stageContext;
    private transient StateRequestHandler stateRequestHandler;
    private transient StageBundleFactory<InputT> stageBundleFactory;
    private transient BundleProgressHandler progressHandler;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction$ReceiverFactory.class */
    private static class ReceiverFactory implements OutputReceiverFactory {
        private final Object collectorLock = new Object();

        @GuardedBy("collectorLock")
        private final Collector<RawUnionValue> collector;
        private final Map<String, Integer> outputMap;

        ReceiverFactory(Collector<RawUnionValue> collector, Map<String, Integer> map) {
            this.collector = collector;
            this.outputMap = map;
        }

        @Override // org.apache.beam.runners.fnexecution.control.OutputReceiverFactory
        public <OutputT> FnDataReceiver<OutputT> create(String str) {
            Integer num = this.outputMap.get(str);
            Preconditions.checkArgument(num != null, "Unknown PCollection id: %s", str);
            int intValue = num.intValue();
            return obj -> {
                synchronized (this.collectorLock) {
                    this.collector.collect(new RawUnionValue(intValue, obj));
                }
            };
        }
    }

    public FlinkExecutableStageFunction(RunnerApi.ExecutableStagePayload executableStagePayload, JobInfo jobInfo, Map<String, Integer> map, FlinkExecutableStageContext.Factory factory) {
        this.stagePayload = executableStagePayload;
        this.jobInfo = jobInfo;
        this.outputMap = map;
        this.contextFactory = factory;
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void open(Configuration configuration) throws Exception {
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
        ExecutableStage fromPayload = ExecutableStage.fromPayload(this.stagePayload);
        this.runtimeContext = getRuntimeContext();
        this.stageContext = this.contextFactory.get(this.jobInfo);
        this.stateRequestHandler = this.stageContext.getStateRequestHandler(fromPayload, this.runtimeContext);
        this.stageBundleFactory = this.stageContext.getStageBundleFactory(fromPayload);
        this.progressHandler = BundleProgressHandler.unsupported();
    }

    @Override // org.apache.flink.api.common.functions.RichMapPartitionFunction, org.apache.flink.api.common.functions.MapPartitionFunction
    public void mapPartition(Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue> collector) throws Exception {
        Preconditions.checkState(this.runtimeContext == getRuntimeContext(), "RuntimeContext changed from under us. State handler invalid.");
        Preconditions.checkState(this.stageBundleFactory != null, "%s not yet prepared", StageBundleFactory.class.getName());
        Preconditions.checkState(this.stateRequestHandler != null, "%s not yet prepared", StateRequestHandler.class.getName());
        RemoteBundle<InputT> bundle = this.stageBundleFactory.getBundle(new ReceiverFactory(collector, this.outputMap), this.stateRequestHandler, this.progressHandler);
        Throwable th = null;
        try {
            try {
                FnDataReceiver<WindowedValue<InputT>> inputReceiver = bundle.getInputReceiver();
                Iterator<WindowedValue<InputT>> it = iterable.iterator();
                while (it.hasNext()) {
                    inputReceiver.accept(it.next());
                }
                if (bundle != null) {
                    if (0 == 0) {
                        bundle.close();
                        return;
                    }
                    try {
                        bundle.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (bundle != null) {
                if (th != null) {
                    try {
                        bundle.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    bundle.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.api.common.functions.AbstractRichFunction, org.apache.flink.api.common.functions.RichFunction
    public void close() throws Exception {
        StageBundleFactory<InputT> stageBundleFactory = this.stageBundleFactory;
        Throwable th = null;
        if (stageBundleFactory != null) {
            if (0 != 0) {
                try {
                    stageBundleFactory.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            } else {
                stageBundleFactory.close();
            }
        }
        this.stageContext = null;
    }
}
