/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ManagedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BeamFnDataClient} that sends and receives data over gRPC.
 *
 * <p>A {@link BeamFnDataGrpcMultiplexer} is created the first time a given
 * {@link Endpoints.ApiServiceDescriptor} is used and is then reused for every later
 * {@code receive} and {@code send} call that targets the same descriptor. This class never
 * removes entries from that cache.
 *
 * <p>Outbound buffering can be tuned with the experiment
 * {@code beam_fn_api_data_buffer_limit=N}, where {@code N} is an integer.
 */
public class BeamFnDataGrpcClient
implements BeamFnDataClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);
    /** Prefix of the experiment whose remainder is parsed as the outbound buffer limit. */
    private static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";
    /** Multiplexers keyed by the service descriptor they were created for. */
    private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer> cache;
    private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;
    private final OutboundObserverFactory outboundObserverFactory;
    private final PipelineOptions options;

    /**
     * Creates a client with an empty multiplexer cache.
     *
     * @param options pipeline options; consulted on every {@code send} call for the buffer-limit
     *     experiment
     * @param channelFactory produces the channel used when a multiplexer is first created for a
     *     descriptor
     * @param outboundObserverFactory handed to every multiplexer this client creates
     */
    public BeamFnDataGrpcClient(PipelineOptions options, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, OutboundObserverFactory outboundObserverFactory) {
        this.options = options;
        this.channelFactory = channelFactory;
        this.outboundObserverFactory = outboundObserverFactory;
        this.cache = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code consumer} for the data addressed to {@code inputLocation}.
     *
     * @param apiServiceDescriptor identifies which cached multiplexer to register with
     * @param inputLocation the logical endpoint the consumer is registered under
     * @param coder passed together with {@code consumer} to build the inbound observer
     * @param consumer receiver wrapped by the inbound observer
     * @return the inbound observer that was registered with the multiplexer
     */
    @Override
    public <T> InboundDataClient receive(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> consumer) {
        LOG.debug("Registering consumer for instruction {} and transform {}", inputLocation.getInstructionId(), inputLocation.getPTransformId());
        BeamFnDataGrpcMultiplexer client = this.getClientFor(apiServiceDescriptor);
        BeamFnDataInboundObserver<T> inboundObserver = BeamFnDataInboundObserver.forConsumer(coder, consumer);
        client.registerConsumer(inputLocation, inboundObserver);
        return inboundObserver;
    }

    /**
     * Creates a buffering receiver that writes to the multiplexer's outbound observer.
     *
     * <p>If the pipeline options contain a {@code beam_fn_api_data_buffer_limit=} experiment the
     * receiver is created with that limit; otherwise it is created without an explicit limit.
     *
     * @param apiServiceDescriptor identifies which cached multiplexer to write through
     * @param outputLocation the logical endpoint the receiver is created for
     * @param coder passed to the buffering observer that is returned
     * @return a closeable receiver for {@code outputLocation}
     * @throws NumberFormatException if the buffer-limit experiment value is not a valid integer
     */
    @Override
    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
        BeamFnDataGrpcMultiplexer client = this.getClientFor(apiServiceDescriptor);
        LOG.debug("Creating output consumer for instruction {} and transform {}", outputLocation.getInstructionId(), outputLocation.getPTransformId());
        Optional<Integer> bufferLimit = BeamFnDataGrpcClient.getBufferLimit(this.options);
        if (bufferLimit.isPresent()) {
            return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(bufferLimit.get(), outputLocation, coder, client.getOutboundObserver());
        }
        return BeamFnDataBufferingOutboundObserver.forLocation(outputLocation, coder, client.getOutboundObserver());
    }

    /**
     * Extracts the buffer limit from the first experiment that starts with
     * {@code beam_fn_api_data_buffer_limit=}.
     *
     * @param options pipeline options viewed as {@link ExperimentalOptions}
     * @return the parsed limit, or empty when there are no experiments or none has the prefix
     * @throws NumberFormatException if the text after the prefix is not a valid integer
     */
    private static Optional<Integer> getBufferLimit(PipelineOptions options) {
        List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
        // Explicit null guard instead of "cond ? Collections.emptyList() : experiments": in a
        // for-each header that conditional is not target-typed, so its element type is Object
        // rather than String.
        if (experiments == null) {
            return Optional.empty();
        }
        for (String experiment : experiments) {
            if (!experiment.startsWith(BEAM_FN_API_DATA_BUFFER_LIMIT)) {
                continue;
            }
            String rawLimit = experiment.substring(BEAM_FN_API_DATA_BUFFER_LIMIT.length());
            try {
                return Optional.of(Integer.parseInt(rawLimit));
            } catch (NumberFormatException e) {
                // Same exception type as before, but the message now names the experiment.
                NumberFormatException detailed = new NumberFormatException("Invalid value '" + rawLimit + "' for experiment " + BEAM_FN_API_DATA_BUFFER_LIMIT + "; expected an integer");
                detailed.initCause(e);
                throw detailed;
            }
        }
        return Optional.empty();
    }

    /**
     * Returns the multiplexer cached for {@code apiServiceDescriptor}, creating it on first use
     * from a channel supplied by {@code channelFactory}.
     */
    private BeamFnDataGrpcMultiplexer getClientFor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
        return this.cache.computeIfAbsent(apiServiceDescriptor, descriptor -> new BeamFnDataGrpcMultiplexer(descriptor, this.outboundObserverFactory, BeamFnDataGrpc.newStub(this.channelFactory.apply(descriptor))::data));
    }
}

