package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.flink.util.Preconditions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.class */
public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<InputT, OutputT> {
    private static final Logger logger = LoggerFactory.getLogger(ExecutableStageDoFnOperator.class.getName());
    private final RunnerApi.ExecutableStagePayload payload;
    private final JobInfo jobInfo;
    private final FlinkExecutableStageContext.Factory contextFactory;
    private final Map<String, TupleTag<?>> outputMap;
    private transient FlinkExecutableStageContext stageContext;
    private transient StateRequestHandler stateRequestHandler;
    private transient BundleProgressHandler progressHandler;
    private transient StageBundleFactory stageBundleFactory;

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator$NoOpDoFn.class */
    private static class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
        private NoOpDoFn() {
        }

        @DoFn.ProcessElement
        public void doNothing(DoFn<InputT, OutputT>.ProcessContext processContext) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator$ReceiverFactory.class */
    public static class ReceiverFactory implements OutputReceiverFactory {
        private final Object collectorLock = new Object();

        @GuardedBy("collectorLock")
        private final DoFnOperator.BufferedOutputManager<RawUnionValue> collector;
        private final Map<String, TupleTag<?>> outputMap;

        ReceiverFactory(DoFnOperator.BufferedOutputManager bufferedOutputManager, Map<String, TupleTag<?>> map) {
            this.collector = bufferedOutputManager;
            this.outputMap = map;
        }

        @Override // org.apache.beam.runners.fnexecution.control.OutputReceiverFactory
        public <OutputT> FnDataReceiver<OutputT> create(String str) {
            return obj -> {
                synchronized (this.collectorLock) {
                    this.collector.output((TupleTag) this.outputMap.get(str), (WindowedValue) obj);
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator$SdkHarnessDoFnRunner.class */
    private class SdkHarnessDoFnRunner implements DoFnRunner<InputT, OutputT> {
        private SdkHarnessDoFnRunner() {
        }

        @Override // org.apache.beam.runners.core.DoFnRunner
        public void startBundle() {
        }

        @Override // org.apache.beam.runners.core.DoFnRunner
        public void processElement(WindowedValue<InputT> windowedValue) {
            try {
                ExecutableStageDoFnOperator.this.processElementWithSdkHarness(windowedValue);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override // org.apache.beam.runners.core.DoFnRunner
        public void onTimer(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
        }

        @Override // org.apache.beam.runners.core.DoFnRunner
        public void finishBundle() {
        }

        @Override // org.apache.beam.runners.core.DoFnRunner
        public DoFn<InputT, OutputT> getFn() {
            throw new UnsupportedOperationException();
        }
    }

    public ExecutableStageDoFnOperator(String str, Coder<WindowedValue<InputT>> coder, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, DoFnOperator.OutputManagerFactory<OutputT> outputManagerFactory, Map<Integer, PCollectionView<?>> map, Collection<PCollectionView<?>> collection, PipelineOptions pipelineOptions, RunnerApi.ExecutableStagePayload executableStagePayload, JobInfo jobInfo, FlinkExecutableStageContext.Factory factory, Map<String, TupleTag<?>> map2) {
        super(new NoOpDoFn(), str, coder, tupleTag, list, outputManagerFactory, WindowingStrategy.globalDefault(), map, collection, pipelineOptions, null, null);
        this.payload = executableStagePayload;
        this.jobInfo = jobInfo;
        this.contextFactory = factory;
        this.outputMap = map2;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        ExecutableStage fromPayload = ExecutableStage.fromPayload(this.payload);
        this.stageContext = this.contextFactory.get(this.jobInfo);
        this.stateRequestHandler = this.stageContext.getStateRequestHandler(fromPayload, getRuntimeContext());
        this.stageBundleFactory = this.stageContext.getStageBundleFactory(fromPayload);
        this.progressHandler = BundleProgressHandler.unsupported();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void processElementWithSdkHarness(WindowedValue<InputT> windowedValue) throws Exception {
        Preconditions.checkState(this.stageBundleFactory != null, "%s not yet prepared", StageBundleFactory.class.getName());
        Preconditions.checkState(this.stateRequestHandler != null, "%s not yet prepared", StateRequestHandler.class.getName());
        RemoteBundle bundle = this.stageBundleFactory.getBundle(new ReceiverFactory(this.outputManager, this.outputMap), this.stateRequestHandler, this.progressHandler);
        Throwable th = null;
        try {
            try {
                logger.debug(String.format("Sending value: %s", windowedValue));
                bundle.getInputReceiver().accept(windowedValue);
                if (bundle != null) {
                    if (0 == 0) {
                        bundle.close();
                        return;
                    }
                    try {
                        bundle.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (bundle != null) {
                if (th != null) {
                    try {
                        bundle.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    bundle.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        StageBundleFactory stageBundleFactory = this.stageBundleFactory;
        Throwable th = null;
        if (stageBundleFactory != null) {
            if (0 != 0) {
                try {
                    stageBundleFactory.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            } else {
                stageBundleFactory.close();
            }
        }
        this.stageContext = null;
        super.close();
    }

    @Override // org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
    protected DoFnRunner<InputT, OutputT> createWrappingDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner) {
        return new SdkHarnessDoFnRunner();
    }
}
