/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessBundleHandler {
    // URN compared against each transform's spec in processBundle to recognise a root transform.
    private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
    // Second URN that processBundle accepts as a root transform; public, so other classes may reference it.
    public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
    // URN -> runner factory map, assigned exactly once by the static initializer of this class.
    private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
    // Passed through unchanged to every runner factory invocation.
    private final PipelineOptions options;
    // Resolves a process bundle descriptor reference to its message (cast to ProcessBundleDescriptor in processBundle).
    private final Function<String, Message> fnApiRegistry;
    // Underlying data client; processBundle wraps it in a QueueingBeamFnDataClient for each bundle.
    private final BeamFnDataClient beamFnDataClient;
    // Source of state clients, used only when the bundle descriptor carries a state API service descriptor.
    private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
    // Factories consulted by URN when creating runners; REGISTERED_RUNNER_FACTORIES unless overridden via the test constructor.
    private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
    // Fallback used when no factory is registered for a URN; it always throws.
    private final PTransformRunnerFactory defaultPTransformRunnerFactory;

    /**
     * Creates a handler that uses the runner factories registered by this class's static
     * initializer.
     *
     * @param options pipeline options handed to every runner factory
     * @param fnApiRegistry lookup from a descriptor reference to its message
     * @param beamFnDataClient data client that is wrapped per bundle
     * @param beamFnStateGrpcClientCache source of state clients for bundles that declare a state service
     */
    public ProcessBundleHandler(
            PipelineOptions options,
            Function<String, Message> fnApiRegistry,
            BeamFnDataClient beamFnDataClient,
            BeamFnStateGrpcClientCache beamFnStateGrpcClientCache) {
        this(
                options,
                fnApiRegistry,
                beamFnDataClient,
                beamFnStateGrpcClientCache,
                REGISTERED_RUNNER_FACTORIES);
    }

    @VisibleForTesting
    ProcessBundleHandler(PipelineOptions options, Function<String, Message> fnApiRegistry, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
        this.options = options;
        this.fnApiRegistry = fnApiRegistry;
        this.beamFnDataClient = beamFnDataClient;
        this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
        this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
        this.defaultPTransformRunnerFactory = new UnknownPTransformRunnerFactory(urnToPTransformRunnerFactoryMap.keySet());
    }

    /**
     * Creates the runner for {@code pTransform} after first creating the runners of every
     * transform that consumes one of its output PCollections.
     *
     * <p>Consumers are visited before the runner factory for {@code pTransform} is invoked, so
     * factories for downstream transforms run before factories for upstream ones.
     *
     * <p>A transform id is added to {@code processedPTransformIds} only after all of its consumers
     * have been handled and its own runner has been created. An id already in that set therefore
     * has a fully built subtree, and the method returns immediately for it rather than walking the
     * subtree again.
     *
     * @param beamFnStateClient state client forwarded to the runner factory
     * @param queueingClient data client forwarded to the runner factory
     * @param pTransformId id of the transform to create a runner for
     * @param pTransform definition of the transform
     * @param processBundleInstructionId supplier of the current instruction id
     * @param processBundleDescriptor descriptor holding all transforms, PCollections, coders and
     *     windowing strategies of the bundle
     * @param pCollectionIdsToConsumingPTransforms PCollection id to the ids of the transforms reading it
     * @param pCollectionConsumerRegistry registry forwarded to the runner factory
     * @param processedPTransformIds ids whose runners have already been created; updated in place
     * @param startFunctionRegistry registry forwarded to the runner factory
     * @param finishFunctionRegistry registry forwarded to the runner factory
     * @param splitListener split listener forwarded to the runner factory
     * @throws IOException declared for the runner factory call
     * @throws IllegalArgumentException if the transform has no spec or is a composite
     */
    private void createRunnerAndConsumersForPTransformRecursively(BeamFnStateClient beamFnStateClient, BeamFnDataClient queueingClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, SetMultimap<String, String> pCollectionIdsToConsumingPTransforms, PCollectionConsumerRegistry pCollectionConsumerRegistry, Set<String> processedPTransformIds, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, BundleSplitListener splitListener) throws IOException {
        // Already built (together with everything downstream of it): skip the repeated traversal
        // that would otherwise happen once per path leading to this transform.
        if (processedPTransformIds.contains(pTransformId)) {
            return;
        }

        // Build all consumers of every output first.
        for (String pCollectionId : pTransform.getOutputsMap().values()) {
            for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get(pCollectionId)) {
                this.createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, queueingClient, consumingPTransformId, processBundleDescriptor.getTransformsMap().get(consumingPTransformId), processBundleInstructionId, processBundleDescriptor, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, splitListener);
            }
        }

        if (!pTransform.hasSpec()) {
            throw new IllegalArgumentException(String.format("Cannot process transform with no spec: %s", TextFormat.printToString(pTransform)));
        }
        if (pTransform.getSubtransformsCount() > 0) {
            throw new IllegalArgumentException(String.format("Cannot process composite transform: %s", TextFormat.printToString(pTransform)));
        }

        // Look up the factory by URN, falling back to the factory that reports the unknown URN.
        this.urnToPTransformRunnerFactoryMap.getOrDefault(pTransform.getSpec().getUrn(), this.defaultPTransformRunnerFactory).createRunnerForPTransform(this.options, queueingClient, beamFnStateClient, pTransformId, pTransform, processBundleInstructionId, processBundleDescriptor.getPcollectionsMap(), processBundleDescriptor.getCodersMap(), processBundleDescriptor.getWindowingStrategiesMap(), pCollectionConsumerRegistry, startFunctionRegistry, finishFunctionRegistry, splitListener);
        processedPTransformIds.add(pTransformId);
    }

    /**
     * Executes the bundle referenced by {@code request} and returns a response builder carrying
     * the bundle result.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Resolve the bundle descriptor through {@code fnApiRegistry}.</li>
     *   <li>Index, for every PCollection, the transforms that read it.</li>
     *   <li>Pick a state client: a blocking wrapper when the descriptor names a state service,
     *       otherwise one that rejects every state call.</li>
     *   <li>Create runners starting from each root transform (one whose URN is the data input,
     *       Java source or read URN).</li>
     *   <li>With the state tracker activated: run all start functions, drain the queueing data
     *       client, then run the finish functions in reverse registration order.</li>
     *   <li>Copy accumulated residuals and monitoring infos into the response.</li>
     * </ol>
     *
     * <p>The state client and the tracker activation are both closed by try-with-resources, so a
     * failure while closing is attached as a suppressed exception when the body has already
     * failed.
     *
     * @param request instruction request whose {@code ProcessBundle} payload is executed
     * @return an instruction response builder with the process bundle response set
     * @throws Exception anything thrown by runner creation, the start/finish functions, draining
     *     the data client, or closing the resources
     */
    public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request) throws Exception {
        QueueingBeamFnDataClient queueingClient = new QueueingBeamFnDataClient(this.beamFnDataClient);
        String bundleDescriptorId = request.getProcessBundle().getProcessBundleDescriptorReference();
        BeamFnApi.ProcessBundleDescriptor bundleDescriptor = (BeamFnApi.ProcessBundleDescriptor) this.fnApiRegistry.apply(bundleDescriptorId);

        SetMultimap<String, String> pCollectionIdsToConsumingPTransforms = HashMultimap.create();
        MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
        ExecutionStateTracker stateTracker = new ExecutionStateTracker(ExecutionStateSampler.instance());
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerRegistry, stateTracker);
        Set<String> processedPTransformIds = new HashSet<>();
        PTransformFunctionRegistry startFunctionRegistry = new PTransformFunctionRegistry(metricsContainerRegistry, stateTracker, "start");
        PTransformFunctionRegistry finishFunctionRegistry = new PTransformFunctionRegistry(metricsContainerRegistry, stateTracker, "finish");

        // Build the PCollection -> consuming transforms index.
        for (Map.Entry<String, RunnerApi.PTransform> entry : bundleDescriptor.getTransformsMap().entrySet()) {
            for (String pCollectionId : entry.getValue().getInputsMap().values()) {
                pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey());
            }
        }

        BeamFnApi.ProcessBundleResponse.Builder response = BeamFnApi.ProcessBundleResponse.newBuilder();

        try (HandleStateCallsForBundle beamFnStateClient = bundleDescriptor.hasStateApiServiceDescriptor()
                ? new BlockTillStateCallsFinish(this.beamFnStateGrpcClientCache.forApiServiceDescriptor(bundleDescriptor.getStateApiServiceDescriptor()))
                : new FailAllStateCallsForBundle(request.getProcessBundle())) {
            ArrayListMultimap<String, BeamFnApi.BundleApplication> allPrimaries = ArrayListMultimap.create();
            ArrayListMultimap<String, BeamFnApi.DelayedBundleApplication> allResiduals = ArrayListMultimap.create();

            // Each split report replaces the recorded primaries but adds to the recorded residuals.
            BundleSplitListener splitListener = (primaries, residuals) -> {
                ArrayListMultimap<String, BeamFnApi.BundleApplication> newPrimaries = ArrayListMultimap.create();
                for (BeamFnApi.BundleApplication primary : primaries) {
                    newPrimaries.put(primary.getPtransformId(), primary);
                }
                allPrimaries.clear();
                allPrimaries.putAll(newPrimaries);
                for (BeamFnApi.DelayedBundleApplication residual : residuals) {
                    allResiduals.put(residual.getApplication().getPtransformId(), residual);
                }
            };

            // Only root transforms start a traversal; everything else is reached as a consumer.
            for (Map.Entry<String, RunnerApi.PTransform> entry : bundleDescriptor.getTransformsMap().entrySet()) {
                String urn = entry.getValue().getSpec().getUrn();
                boolean isRoot = DATA_INPUT_URN.equals(urn)
                        || JAVA_SOURCE_URN.equals(urn)
                        || PTransformTranslation.READ_TRANSFORM_URN.equals(urn);
                if (!isRoot) {
                    continue;
                }
                this.createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, queueingClient, entry.getKey(), entry.getValue(), request::getInstructionId, bundleDescriptor, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, splitListener);
            }

            try (Closeable closeTracker = stateTracker.activate()) {
                for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
                    LOG.debug("Starting function {}", startFunction);
                    startFunction.run();
                }

                queueingClient.drainAndBlock();

                // Finish functions run in the opposite order to the one they were registered in.
                for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctionRegistry.getFunctions())) {
                    LOG.debug("Finishing function {}", finishFunction);
                    finishFunction.run();
                }
                if (!allResiduals.isEmpty()) {
                    response.addAllResidualRoots(allResiduals.values());
                }
            }

            // Monitoring infos are appended in a fixed order: start, consumers, finish, user metrics.
            for (MetricsApi.MonitoringInfo mi : startFunctionRegistry.getExecutionTimeMonitoringInfos()) {
                response.addMonitoringInfos(mi);
            }
            for (MetricsApi.MonitoringInfo mi : pCollectionConsumerRegistry.getExecutionTimeMonitoringInfos()) {
                response.addMonitoringInfos(mi);
            }
            for (MetricsApi.MonitoringInfo mi : finishFunctionRegistry.getExecutionTimeMonitoringInfos()) {
                response.addMonitoringInfos(mi);
            }
            for (MetricsApi.MonitoringInfo mi : metricsContainerRegistry.getMonitoringInfos()) {
                response.addMonitoringInfos(mi);
            }
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
    }

    /**
     * Closes {@code x1} with the same semantics as the end of a try-with-resources statement.
     *
     * <p>If {@code x0} is non-null (the guarded body already failed), any failure from
     * {@code close()} is attached to {@code x0} as a suppressed exception and not thrown. If
     * {@code x0} is null, a failure from {@code close()} propagates to the caller.
     *
     * @param x0 the exception thrown by the guarded body, or null if it completed normally
     * @param x1 the resource to close; must not be null
     * @throws Exception whatever {@code x1.close()} throws when {@code x0} is null
     */
    private static /* synthetic */ /* end resource */ void $closeResource(Throwable x0, AutoCloseable x1) throws Exception {
        if (x0 != null) {
            try {
                x1.close();
            }
            catch (Throwable throwable) {
                // Keep the original failure primary; record the close failure alongside it.
                x0.addSuppressed(throwable);
            }
        } else {
            x1.close();
        }
    }

    // Discovers every PTransformRunnerFactory.Registrar through ServiceLoader and merges the
    // factories they expose into REGISTERED_RUNNER_FACTORIES.
    static {
        // A TreeSet ordered by ReflectHelpers.ObjectsClassComparator makes the iteration order depend
        // on that comparator rather than on the order in which ServiceLoader finds the registrars.
        Set<PTransformRunnerFactory.Registrar> pipelineRunnerRegistrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
        pipelineRunnerRegistrars.addAll(Lists.newArrayList(ServiceLoader.load(PTransformRunnerFactory.Registrar.class, ReflectHelpers.findClassLoader())));
        ImmutableMap.Builder<String, PTransformRunnerFactory> builder = ImmutableMap.builder();
        for (PTransformRunnerFactory.Registrar registrar : pipelineRunnerRegistrars) {
            builder.putAll(registrar.getPTransformRunnerFactories());
        }
        REGISTERED_RUNNER_FACTORIES = builder.build();
    }

    /**
     * Fallback factory used when no factory is registered for a transform's URN. It never creates
     * a runner: every call logs the problem and throws.
     */
    private static class UnknownPTransformRunnerFactory implements PTransformRunnerFactory<Object> {
        /** URNs that do have a registered factory; only used to enrich the error message. */
        private final Set<String> knownUrns;

        private UnknownPTransformRunnerFactory(Set<String> knownUrns) {
            this.knownUrns = knownUrns;
        }

        /**
         * Always fails, reporting the unregistered URN together with the known ones.
         *
         * @throws IllegalStateException on every invocation
         */
        @Override
        public Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, BundleSplitListener splitListener) {
            String unregisteredUrn = pTransform.getSpec().getUrn();
            String description = String.format("No factory registered for %s, known factories %s", unregisteredUrn, this.knownUrns);
            LOG.error(description);
            throw new IllegalStateException(description);
        }
    }

    /**
     * Common base for the per-bundle state clients: a {@link BeamFnStateClient} that is also
     * {@link AutoCloseable}, so it can be managed as a resource for the duration of one bundle.
     */
    private static abstract class HandleStateCallsForBundle implements AutoCloseable, BeamFnStateClient {
        private HandleStateCallsForBundle() {
            // No state of its own; subclasses are the nested classes of ProcessBundleHandler.
        }
    }

    /**
     * State client that rejects every state call. processBundle selects it when the bundle
     * descriptor has no state API service descriptor.
     */
    private static class FailAllStateCallsForBundle extends HandleStateCallsForBundle {
        /** The request being processed; included in the failure message. */
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllStateCallsForBundle(BeamFnApi.ProcessBundleRequest request) {
            this.request = request;
        }

        /** Nothing to release. */
        @Override
        public void close() throws Exception {
        }

        /**
         * Rejects the state call.
         *
         * @throws IllegalStateException on every invocation
         */
        @Override
        public void handle(BeamFnApi.StateRequest.Builder requestBuilder, CompletableFuture<BeamFnApi.StateResponse> response) {
            String reason = String.format("State API calls are unsupported because the ProcessBundleRequest %s does not support state.", this.request);
            throw new IllegalStateException(reason);
        }
    }

    /**
     * State client wrapper whose {@link #close()} blocks until every state call issued through it
     * has completed.
     *
     * <p>A {@link Phaser} tracks outstanding calls. The wrapper itself is the single party
     * registered at construction; each call to {@link #handle} registers one more party, which
     * arrives and deregisters when the call's response future completes.
     */
    private static class BlockTillStateCallsFinish
    extends HandleStateCallsForBundle {
        private final BeamFnStateClient beamFnStateClient;
        private final Phaser phaser;
        // Last phase number observed; only used for the debug message in close().
        private int currentPhase;

        private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
            this.beamFnStateClient = beamFnStateClient;
            this.phaser = new Phaser(1);
            this.currentPhase = this.phaser.getPhase();
        }

        /** Arrives as the wrapper's own party and waits for all outstanding calls to arrive. */
        @Override
        public void close() throws Exception {
            int unarrivedParties = this.phaser.getUnarrivedParties();
            if (unarrivedParties > 0) {
                LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.", (Object)unarrivedParties, (Object)this.currentPhase);
            }
            this.currentPhase = this.phaser.arriveAndAwaitAdvance();
        }

        /**
         * Forwards the state request to the wrapped client and tracks it until {@code response}
         * completes.
         *
         * <p>If the wrapped client throws instead of accepting the request, {@code response} is
         * completed exceptionally with that failure before it is rethrown. Without this the party
         * registered for the call would never arrive and {@link #close()} would block forever.
         */
        @Override
        public void handle(BeamFnApi.StateRequest.Builder requestBuilder, CompletableFuture<BeamFnApi.StateResponse> response) {
            this.phaser.register();
            response.whenComplete((stateResponse, throwable) -> this.phaser.arriveAndDeregister());
            try {
                this.beamFnStateClient.handle(requestBuilder, response);
            } catch (RuntimeException | Error e) {
                // No-op if the future was already completed; otherwise releases the phaser party.
                response.completeExceptionally(e);
                throw e;
            }
        }
    }
}

