package org.apache.beam.fn.harness.control;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler.class */
public class ProcessBundleHandler {
    private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
    public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ProcessBundleHandler.class);
    private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
    private final PipelineOptions options;
    private final Function<String, Message> fnApiRegistry;
    private final BeamFnDataClient beamFnDataClient;
    private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
    private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
    private final PTransformRunnerFactory defaultPTransformRunnerFactory;

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$BlockTillStateCallsFinish.class */
    private static class BlockTillStateCallsFinish extends HandleStateCallsForBundle {
        private final BeamFnStateClient beamFnStateClient;
        private final Phaser phaser;
        private int currentPhase;

        private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
            super();
            this.beamFnStateClient = beamFnStateClient;
            this.phaser = new Phaser(1);
            this.currentPhase = this.phaser.getPhase();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            int unarrivedParties = this.phaser.getUnarrivedParties();
            if (unarrivedParties > 0) {
                ProcessBundleHandler.LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.", Integer.valueOf(unarrivedParties), Integer.valueOf(this.currentPhase));
            }
            this.currentPhase = this.phaser.arriveAndAwaitAdvance();
        }

        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public void handle(BeamFnApi.StateRequest.Builder builder, CompletableFuture<BeamFnApi.StateResponse> completableFuture) {
            this.phaser.register();
            completableFuture.whenComplete((stateResponse, th) -> {
                this.phaser.arriveAndDeregister();
            });
            this.beamFnStateClient.handle(builder, completableFuture);
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$FailAllStateCallsForBundle.class */
    private static class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllStateCallsForBundle(BeamFnApi.ProcessBundleRequest processBundleRequest) {
            super();
            this.request = processBundleRequest;
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
        }

        @Override // org.apache.beam.fn.harness.state.BeamFnStateClient
        public void handle(BeamFnApi.StateRequest.Builder builder, CompletableFuture<BeamFnApi.StateResponse> completableFuture) {
            throw new IllegalStateException(String.format("State API calls are unsupported because the ProcessBundleRequest %s does not support state.", this.request));
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$HandleStateCallsForBundle.class */
    private static abstract class HandleStateCallsForBundle implements AutoCloseable, BeamFnStateClient {
        private HandleStateCallsForBundle() {
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/control/ProcessBundleHandler$UnknownPTransformRunnerFactory.class */
    private static class UnknownPTransformRunnerFactory implements PTransformRunnerFactory<Object> {
        private final Set<String> knownUrns;

        private UnknownPTransformRunnerFactory(Set<String> set) {
            this.knownUrns = set;
        }

        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
        public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, BundleSplitListener bundleSplitListener) {
            String format = String.format("No factory registered for %s, known factories %s", pTransform.getSpec().getUrn(), this.knownUrns);
            ProcessBundleHandler.LOG.error(format);
            throw new IllegalStateException(format);
        }
    }

    public ProcessBundleHandler(PipelineOptions pipelineOptions, Function<String, Message> function, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache) {
        this(pipelineOptions, function, beamFnDataClient, beamFnStateGrpcClientCache, REGISTERED_RUNNER_FACTORIES);
    }

    @VisibleForTesting
    ProcessBundleHandler(PipelineOptions pipelineOptions, Function<String, Message> function, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, Map<String, PTransformRunnerFactory> map) {
        this.options = pipelineOptions;
        this.fnApiRegistry = function;
        this.beamFnDataClient = beamFnDataClient;
        this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
        this.urnToPTransformRunnerFactoryMap = map;
        this.defaultPTransformRunnerFactory = new UnknownPTransformRunnerFactory(map.keySet());
    }

    private void createRunnerAndConsumersForPTransformRecursively(BeamFnStateClient beamFnStateClient, BeamFnDataClient beamFnDataClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, SetMultimap<String, String> setMultimap, PCollectionConsumerRegistry pCollectionConsumerRegistry, Set<String> set, PTransformFunctionRegistry pTransformFunctionRegistry, PTransformFunctionRegistry pTransformFunctionRegistry2, BundleSplitListener bundleSplitListener) throws IOException {
        Iterator<String> it = pTransform.getOutputsMap().values().iterator();
        while (it.hasNext()) {
            for (String str2 : setMultimap.get((SetMultimap<String, String>) it.next())) {
                createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, beamFnDataClient, str2, processBundleDescriptor.getTransformsMap().get(str2), supplier, processBundleDescriptor, setMultimap, pCollectionConsumerRegistry, set, pTransformFunctionRegistry, pTransformFunctionRegistry2, bundleSplitListener);
            }
        }
        if (!pTransform.hasSpec()) {
            throw new IllegalArgumentException(String.format("Cannot process transform with no spec: %s", TextFormat.printToString(pTransform)));
        }
        if (pTransform.getSubtransformsCount() > 0) {
            throw new IllegalArgumentException(String.format("Cannot process composite transform: %s", TextFormat.printToString(pTransform)));
        }
        if (set.contains(str)) {
            return;
        }
        this.urnToPTransformRunnerFactoryMap.getOrDefault(pTransform.getSpec().getUrn(), this.defaultPTransformRunnerFactory).createRunnerForPTransform(this.options, beamFnDataClient, beamFnStateClient, str, pTransform, supplier, processBundleDescriptor.getPcollectionsMap(), processBundleDescriptor.getCodersMap(), processBundleDescriptor.getWindowingStrategiesMap(), pCollectionConsumerRegistry, pTransformFunctionRegistry, pTransformFunctionRegistry2, bundleSplitListener);
        set.add(str);
    }

    public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest instructionRequest) throws Exception {
        QueueingBeamFnDataClient queueingBeamFnDataClient = new QueueingBeamFnDataClient(this.beamFnDataClient);
        BeamFnApi.ProcessBundleDescriptor processBundleDescriptor = (BeamFnApi.ProcessBundleDescriptor) this.fnApiRegistry.apply(instructionRequest.getProcessBundle().getProcessBundleDescriptorReference());
        SetMultimap<String, String> create = HashMultimap.create();
        MetricsContainerStepMap metricsContainerStepMap = new MetricsContainerStepMap();
        ExecutionStateTracker executionStateTracker = new ExecutionStateTracker(ExecutionStateSampler.instance());
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerStepMap, executionStateTracker);
        Set<String> hashSet = new HashSet<>();
        PTransformFunctionRegistry pTransformFunctionRegistry = new PTransformFunctionRegistry(metricsContainerStepMap, executionStateTracker, ExecutionStateTracker.START_STATE_NAME);
        PTransformFunctionRegistry pTransformFunctionRegistry2 = new PTransformFunctionRegistry(metricsContainerStepMap, executionStateTracker, ExecutionStateTracker.FINISH_STATE_NAME);
        for (Map.Entry<String, RunnerApi.PTransform> entry : processBundleDescriptor.getTransformsMap().entrySet()) {
            Iterator<String> it = entry.getValue().getInputsMap().values().iterator();
            while (it.hasNext()) {
                create.put(it.next(), entry.getKey());
            }
        }
        BeamFnApi.ProcessBundleResponse.Builder newBuilder = BeamFnApi.ProcessBundleResponse.newBuilder();
        HandleStateCallsForBundle blockTillStateCallsFinish = processBundleDescriptor.hasStateApiServiceDescriptor() ? new BlockTillStateCallsFinish(this.beamFnStateGrpcClientCache.forApiServiceDescriptor(processBundleDescriptor.getStateApiServiceDescriptor())) : new FailAllStateCallsForBundle(instructionRequest.getProcessBundle());
        try {
            ArrayListMultimap create2 = ArrayListMultimap.create();
            ArrayListMultimap create3 = ArrayListMultimap.create();
            BundleSplitListener bundleSplitListener = (list, list2) -> {
                ArrayListMultimap create4 = ArrayListMultimap.create();
                Iterator it2 = list.iterator();
                while (it2.hasNext()) {
                    BeamFnApi.BundleApplication bundleApplication = (BeamFnApi.BundleApplication) it2.next();
                    create4.put(bundleApplication.getPtransformId(), bundleApplication);
                }
                create2.clear();
                create2.putAll(create4);
                Iterator it3 = list2.iterator();
                while (it3.hasNext()) {
                    BeamFnApi.DelayedBundleApplication delayedBundleApplication = (BeamFnApi.DelayedBundleApplication) it3.next();
                    create3.put(delayedBundleApplication.getApplication().getPtransformId(), delayedBundleApplication);
                }
            };
            for (Map.Entry<String, RunnerApi.PTransform> entry2 : processBundleDescriptor.getTransformsMap().entrySet()) {
                if ("urn:org.apache.beam:source:runner:0.1".equals(entry2.getValue().getSpec().getUrn()) || JAVA_SOURCE_URN.equals(entry2.getValue().getSpec().getUrn()) || PTransformTranslation.READ_TRANSFORM_URN.equals(entry2.getValue().getSpec().getUrn())) {
                    String key = entry2.getKey();
                    RunnerApi.PTransform value = entry2.getValue();
                    Objects.requireNonNull(instructionRequest);
                    createRunnerAndConsumersForPTransformRecursively(blockTillStateCallsFinish, queueingBeamFnDataClient, key, value, instructionRequest::getInstructionId, processBundleDescriptor, create, pCollectionConsumerRegistry, hashSet, pTransformFunctionRegistry, pTransformFunctionRegistry2, bundleSplitListener);
                }
            }
            Closeable activate = executionStateTracker.activate();
            Throwable th = null;
            try {
                try {
                    for (ThrowingRunnable throwingRunnable : pTransformFunctionRegistry.getFunctions()) {
                        LOG.debug("Starting function {}", throwingRunnable);
                        throwingRunnable.run();
                    }
                    queueingBeamFnDataClient.drainAndBlock();
                    for (ThrowingRunnable throwingRunnable2 : Lists.reverse(pTransformFunctionRegistry2.getFunctions())) {
                        LOG.debug("Finishing function {}", throwingRunnable2);
                        throwingRunnable2.run();
                    }
                    if (!create3.isEmpty()) {
                        newBuilder.addAllResidualRoots(create3.values());
                    }
                    if (activate != null) {
                        $closeResource(null, activate);
                    }
                    Iterator<BeamFnApi.MonitoringInfo> it2 = pTransformFunctionRegistry.getExecutionTimeMonitoringInfos().iterator();
                    while (it2.hasNext()) {
                        newBuilder.addMonitoringInfos(it2.next());
                    }
                    Iterator<BeamFnApi.MonitoringInfo> it3 = pCollectionConsumerRegistry.getExecutionTimeMonitoringInfos().iterator();
                    while (it3.hasNext()) {
                        newBuilder.addMonitoringInfos(it3.next());
                    }
                    Iterator<BeamFnApi.MonitoringInfo> it4 = pTransformFunctionRegistry2.getExecutionTimeMonitoringInfos().iterator();
                    while (it4.hasNext()) {
                        newBuilder.addMonitoringInfos(it4.next());
                    }
                    Iterator<BeamFnApi.MonitoringInfo> it5 = metricsContainerStepMap.getMonitoringInfos().iterator();
                    while (it5.hasNext()) {
                        newBuilder.addMonitoringInfos(it5.next());
                    }
                    return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(newBuilder);
                } finally {
                }
            } catch (Throwable th2) {
                if (activate != null) {
                    $closeResource(th, activate);
                }
                throw th2;
            }
        } finally {
            if (blockTillStateCallsFinish != null) {
                $closeResource(null, blockTillStateCallsFinish);
            }
        }
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }

    static {
        TreeSet newTreeSet = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
        newTreeSet.addAll(Lists.newArrayList(ServiceLoader.load(PTransformRunnerFactory.Registrar.class, ReflectHelpers.findClassLoader())));
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator it = newTreeSet.iterator();
        while (it.hasNext()) {
            builder.putAll(((PTransformRunnerFactory.Registrar) it.next()).getPTransformRunnerFactories());
        }
        REGISTERED_RUNNER_FACTORIES = builder.build();
    }
}
