/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;

/**
 * Reads a {@link BoundedSource} to completion and forwards every element it produces to a fixed
 * collection of downstream consumers.
 *
 * <p>The source to read is obtained in one of two ways: {@link #start()} decodes it from the
 * payload of the {@link RunnerApi.FunctionSpec} this runner was built with, while
 * {@link #runReadLoop(WindowedValue)} accepts a source handed in by the caller.
 *
 * @param <InputT> the concrete bounded source type being read
 * @param <OutputT> the element type produced by the source
 */
public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> {
    /** URN whose payload is a Java-serialized {@link BoundedSource}. */
    private static final String JAVA_SOURCE_URN = "beam:source:java:0.1";

    private final PipelineOptions pipelineOptions;
    private final RunnerApi.FunctionSpec definition;
    private final Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers;

    /**
     * @param pipelineOptions options passed to the source when its reader is created
     * @param definition function spec whose URN and payload describe the source for {@link #start()}
     * @param consumers receivers that are handed every element read
     */
    BoundedSourceRunner(PipelineOptions pipelineOptions, RunnerApi.FunctionSpec definition, Collection<FnDataReceiver<WindowedValue<OutputT>>> consumers) {
        this.pipelineOptions = pipelineOptions;
        this.definition = definition;
        this.consumers = consumers;
    }

    /**
     * Decodes the source embedded in the function spec and reads it via
     * {@link #runReadLoop(WindowedValue)}.
     *
     * <p>Two URNs are understood: {@code beam:source:java:0.1}, whose payload is deserialized with
     * {@link SerializableUtils#deserializeFromByteArray}, and
     * {@link PTransformTranslation#READ_TRANSFORM_URN}, whose payload is parsed as a
     * {@link RunnerApi.ReadPayload}.
     *
     * @throws IllegalArgumentException if the spec carries any other URN
     * @throws IOException if an {@link InvalidProtocolBufferException} is raised while decoding
     *     or reading
     * @throws Exception anything else raised by the reader or by a consumer
     * @deprecated NOTE(review): the replacement appears to be delivering sources through
     *     {@link #runReadLoop(WindowedValue)}, which the factory registers as the consumer of the
     *     transform's input PCollections - confirm before removing.
     */
    @Deprecated
    public void start() throws Exception {
        try {
            final String urn = this.definition.getUrn();
            final InputT boundedSource;
            if (urn.equals(JAVA_SOURCE_URN)) {
                byte[] bytes = this.definition.getPayload().toByteArray();
                // The payload is trusted to contain a source of the expected type; a mismatch
                // cannot be detected here because of erasure.
                @SuppressWarnings("unchecked")
                InputT deserialized = (InputT) SerializableUtils.deserializeFromByteArray(bytes, this.definition.toString());
                boundedSource = deserialized;
            } else if (urn.equals(PTransformTranslation.READ_TRANSFORM_URN)) {
                RunnerApi.ReadPayload readPayload = RunnerApi.ReadPayload.parseFrom(this.definition.getPayload());
                @SuppressWarnings("unchecked")
                InputT translated = (InputT) ReadTranslation.boundedSourceFromProto(readPayload);
                boundedSource = translated;
            } else {
                throw new IllegalArgumentException("Unknown source URN: " + urn);
            }
            // The read loop stays inside the try so that protobuf decoding failures raised while
            // reading are wrapped the same way as failures while parsing the payload.
            this.runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource));
        } catch (InvalidProtocolBufferException e) {
            throw new IOException(String.format("Failed to decode %s", this.definition.getUrn()), e);
        }
    }

    /**
     * Creates a reader for the source carried by {@code value}, and passes each element it yields
     * to every consumer, in order. The reader is closed when the loop ends, normally or not.
     *
     * <p>Only the source inside {@code value} is used; each output is wrapped with
     * {@link WindowedValue#timestampedValueInGlobalWindow} using the reader's current timestamp.
     *
     * @param value wrapper around the source to read
     * @throws Exception anything raised by the reader or by a consumer
     */
    public void runReadLoop(WindowedValue<InputT> value) throws Exception {
        try (BoundedSource.BoundedReader<OutputT> reader = value.getValue().createReader(this.pipelineOptions)) {
            if (!reader.start()) {
                // The source produced no elements at all.
                return;
            }
            do {
                WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp());
                for (FnDataReceiver<WindowedValue<OutputT>> consumer : this.consumers) {
                    consumer.accept(nextValue);
                }
            } while (reader.advance());
        }
    }

    /** Returns the string form of the function spec this runner was built from. */
    @Override
    public String toString() {
        return this.definition.toString();
    }

    /**
     * Builds a {@link BoundedSourceRunner} for a transform and wires it into the bundle's
     * registries.
     */
    static class Factory<InputT extends BoundedSource<OutputT>, OutputT>
    implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> {

        /**
         * Creates the runner, registers {@link BoundedSourceRunner#start()} as a start function for
         * {@code pTransformId}, and registers {@link BoundedSourceRunner#runReadLoop} as the
         * consumer of each of the transform's input PCollections.
         *
         * <p>The runner's consumers are the multiplexing consumers of the transform's output
         * PCollections, looked up in {@code pCollectionConsumerRegistry}.
         *
         * @return the newly created runner
         */
        // Raw types are kept where the generic signatures involved (the consumer registry's
        // methods) are declared outside this file; the element types cannot be checked here.
        @SuppressWarnings({"rawtypes", "unchecked"})
        @Override
        public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, PCollectionConsumerRegistry pCollectionConsumerRegistry, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, BundleSplitListener splitListener) {
            ImmutableList.Builder consumers = ImmutableList.builder();
            for (String pCollectionId : pTransform.getOutputsMap().values()) {
                consumers.add(pCollectionConsumerRegistry.getMultiplexingConsumer(pCollectionId));
            }
            BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner(pipelineOptions, pTransform.getSpec(), consumers.build());
            startFunctionRegistry.register(pTransformId, runner::start);
            // A method reference needs a parameterized target type; it is then widened to the raw
            // receiver type for registration.
            FnDataReceiver runReadLoop = (FnDataReceiver<WindowedValue<InputT>>) runner::runReadLoop;
            for (String pCollectionId : pTransform.getInputsMap().values()) {
                pCollectionConsumerRegistry.register(pCollectionId, pTransformId, runReadLoop);
            }
            return runner;
        }
    }

    /** Registers a {@link Factory} under each source URN this runner can decode. */
    @AutoService(value=PTransformRunnerFactory.Registrar.class)
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        /** Returns a map from each supported URN to its own {@link Factory} instance. */
        // The raw factory type is dictated by the registrar interface's return type.
        @SuppressWarnings("rawtypes")
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(JAVA_SOURCE_URN, new Factory(), PTransformTranslation.READ_TRANSFORM_URN, new Factory());
        }
    }
}

