/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core.construction.expansion;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.model.expansion.v1.ExpansionApi;
import org.apache.beam.model.expansion.v1.ExpansionServiceGrpc;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
import org.apache.beam.sdk.transforms.ExternalTransformBuilder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Converter;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExpansionService
extends ExpansionServiceGrpc.ExpansionServiceImplBase
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ExpansionService.class);
    private Map<String, TransformProvider> registeredTransforms = this.loadRegisteredTransforms();

    private Map<String, TransformProvider> loadRegisteredTransforms() {
        ImmutableMap.Builder<String, TransformProvider> registeredTransforms = ImmutableMap.builder();
        for (ExpansionServiceRegistrar registrar : ServiceLoader.load(ExpansionServiceRegistrar.class)) {
            registeredTransforms.putAll(registrar.knownTransforms());
        }
        return registeredTransforms.build();
    }

    @VisibleForTesting
    ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) {
        LOG.info("Expanding '{}' with URN '{}'", (Object)request.getTransform().getUniqueName(), (Object)request.getTransform().getSpec().getUrn());
        LOG.debug("Full transform: {}", (Object)request.getTransform());
        Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
        Pipeline pipeline = Pipeline.create();
        RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(request.getComponents()).withPipeline(pipeline);
        Map<String, PCollection<?>> inputs = request.getTransform().getInputsMap().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, input -> {
            try {
                return rehydratedComponents.getPCollection((String)input.getValue());
            }
            catch (IOException exn) {
                throw new RuntimeException(exn);
            }
        }));
        if (!this.registeredTransforms.containsKey(request.getTransform().getSpec().getUrn())) {
            throw new UnsupportedOperationException("Unknown urn: " + request.getTransform().getSpec().getUrn());
        }
        this.registeredTransforms.get(request.getTransform().getSpec().getUrn()).apply(pipeline, request.getTransform().getUniqueName(), request.getTransform().getSpec(), inputs);
        SdkComponents sdkComponents = rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
        sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
        String expandedTransformId = (String)Iterables.getOnlyElement(pipelineProto.getRootTransformIdsList().stream().filter(id -> !existingTransformIds.contains(id)).collect(Collectors.toList()));
        RunnerApi.Components components = pipelineProto.getComponents();
        RunnerApi.PTransform expandedTransform = components.getTransformsOrThrow(expandedTransformId).toBuilder().setUniqueName(expandedTransformId).build();
        LOG.debug("Expanded to {}", (Object)expandedTransform);
        return ExpansionApi.ExpansionResponse.newBuilder().setComponents(components.toBuilder().removeTransforms(expandedTransformId)).setTransform(expandedTransform).build();
    }

    @Override
    public void expand(ExpansionApi.ExpansionRequest request, StreamObserver<ExpansionApi.ExpansionResponse> responseObserver) {
        try {
            responseObserver.onNext(this.expand(request));
            responseObserver.onCompleted();
        }
        catch (RuntimeException exn) {
            responseObserver.onError(exn);
            throw exn;
        }
    }

    @Override
    public void close() throws Exception {
    }

    public static void main(String[] args) throws Exception {
        int port = Integer.parseInt(args[0]);
        System.out.println("Starting expansion service at localhost:" + port);
        Server server = ((ServerBuilder)ServerBuilder.forPort(port).addService(new ExpansionService())).build();
        server.start();
        server.awaitTermination();
    }

    public static interface TransformProvider<InputT extends PInput, OutputT extends PValue> {
        default public InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
            if (inputs.size() == 0) {
                return (InputT)p.begin();
            }
            if (inputs.size() == 1) {
                return (InputT)Iterables.getOnlyElement(inputs.values());
            }
            PCollectionTuple inputTuple = PCollectionTuple.empty(p);
            for (Map.Entry<String, PCollection<?>> entry : inputs.entrySet()) {
                inputTuple = inputTuple.and(new TupleTag(entry.getKey()), entry.getValue());
            }
            return (InputT)inputTuple;
        }

        public PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec var1);

        default public Map<String, PCollection<?>> extractOutputs(OutputT output) {
            if (output instanceof PDone) {
                return Collections.emptyMap();
            }
            if (output instanceof PCollection) {
                return ImmutableMap.of("output", (PCollection)output);
            }
            if (output instanceof PCollectionTuple) {
                return ((PCollectionTuple)output).getAll().entrySet().stream().collect(Collectors.toMap(entry -> ((TupleTag)entry.getKey()).toString(), Map.Entry::getValue));
            }
            if (output instanceof PCollectionList) {
                PCollectionList listOutput = (PCollectionList)output;
                return IntStream.range(0, listOutput.size()).boxed().collect(Collectors.toMap(index -> "output_" + index, listOutput::get));
            }
            throw new UnsupportedOperationException("Unknown output type: " + output.getClass());
        }

        default public Map<String, PCollection<?>> apply(Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, PCollection<?>> inputs) {
            return this.extractOutputs((PValue)Pipeline.applyTransform(name, this.createInput(p, inputs), this.getTransform(spec)));
        }
    }

    @AutoService(value=ExpansionServiceRegistrar.class)
    public static class ExternalTransformRegistrarLoader<ConfigT>
    implements ExpansionServiceRegistrar {
        @Override
        public Map<String, TransformProvider> knownTransforms() {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (ExternalTransformRegistrar registrar : ServiceLoader.load(ExternalTransformRegistrar.class)) {
                for (Map.Entry<String, Class<ExternalTransformBuilder<?, ?, ?>>> entry : registrar.knownBuilders().entrySet()) {
                    String urn = entry.getKey();
                    Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass = entry.getValue();
                    builder.put(urn, spec -> {
                        try {
                            ExternalTransforms.ExternalConfigurationPayload payload = ExternalTransforms.ExternalConfigurationPayload.parseFrom(spec.getPayload());
                            return ExternalTransformRegistrarLoader.translate(payload, builderClass);
                        }
                        catch (Exception e) {
                            throw new RuntimeException(String.format("Failed to build transform %s from spec %s", urn, spec), e);
                        }
                    });
                }
            }
            return builder.build();
        }

        private static PTransform translate(ExternalTransforms.ExternalConfigurationPayload payload, Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass) throws Exception {
            Preconditions.checkState(ExternalTransformBuilder.class.isAssignableFrom(builderClass), "Provided identifier %s is not an ExternalTransformBuilder.", (Object)builderClass.getName());
            Object configObject = ExternalTransformRegistrarLoader.initConfiguration(builderClass);
            ExternalTransformRegistrarLoader.populateConfiguration(configObject, payload);
            return ExternalTransformRegistrarLoader.buildTransform(builderClass, configObject);
        }

        private static Object initConfiguration(Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass) throws Exception {
            for (Method method : builderClass.getMethods()) {
                if (!method.getName().equals("buildExternal")) continue;
                Preconditions.checkState(method.getParameterCount() == 1, "Build method for ExternalTransformBuilder %s must have exactly one parameter, but had %s parameters.", (Object)builderClass.getSimpleName(), method.getParameterCount());
                Class<?> configurationClass = method.getParameterTypes()[0];
                if (Object.class.equals(configurationClass)) continue;
                return configurationClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            }
            throw new RuntimeException("Couldn't find build method on ExternalTransformBuilder.");
        }

        private static void populateConfiguration(Object config, ExternalTransforms.ExternalConfigurationPayload payload) throws Exception {
            Converter<String, String> camelCaseConverter = CaseFormat.LOWER_UNDERSCORE.converterTo(CaseFormat.LOWER_CAMEL);
            for (Map.Entry<String, ExternalTransforms.ConfigValue> entry : payload.getConfigurationMap().entrySet()) {
                Method method;
                String fieldName = camelCaseConverter.convert(entry.getKey());
                String coderUrn = entry.getValue().getCoderUrn();
                if (!BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.VARINT).equals(coderUrn)) {
                    throw new RuntimeException("Unsupported coder urn " + coderUrn);
                }
                VarLongCoder coder = VarLongCoder.of();
                Class<Long> type = Long.class;
                String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
                try {
                    method = config.getClass().getMethod(setterName, type);
                }
                catch (NoSuchMethodException e) {
                    throw new RuntimeException(String.format("The configuration class %s is missing a setter %s for %s", config.getClass(), setterName, fieldName), e);
                }
                method.invoke(config, ((Coder)coder).decode(entry.getValue().getPayload().newInput()));
            }
        }

        private static PTransform buildTransform(Class<? extends ExternalTransformBuilder<?, ?, ?>> builderClass, Object configObject) throws Exception {
            Constructor<ExternalTransformBuilder<?, ?, ?>> constructor = builderClass.getDeclaredConstructor(new Class[0]);
            constructor.setAccessible(true);
            ExternalTransformBuilder<?, ?, ?> externalTransformBuilder = constructor.newInstance(new Object[0]);
            Method buildMethod = builderClass.getMethod("buildExternal", configObject.getClass());
            buildMethod.setAccessible(true);
            return (PTransform)buildMethod.invoke(externalTransformBuilder, configObject);
        }
    }

    public static interface ExpansionServiceRegistrar {
        public Map<String, TransformProvider> knownTransforms();
    }
}

