/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.util.EnumMap;
import java.util.List;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.control.RegisterHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Message;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FnHarness {
    private static final String HARNESS_ID = "HARNESS_ID";
    private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
    private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
    private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
    private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);

    private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String env) throws TextFormat.ParseException {
        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        TextFormat.merge(System.getenv(env), (Message.Builder)apiServiceDescriptorBuilder);
        return apiServiceDescriptorBuilder.build();
    }

    public static void main(String[] args) throws Exception {
        System.out.format("SDK Fn Harness started%n", new Object[0]);
        System.out.format("Harness ID %s%n", System.getenv(HARNESS_ID));
        System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
        System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
        System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
        String id = System.getenv(HARNESS_ID);
        PipelineOptions options = PipelineOptionsTranslation.fromJson(System.getenv(PIPELINE_OPTIONS));
        Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor = FnHarness.getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);
        Endpoints.ApiServiceDescriptor controlApiServiceDescriptor = FnHarness.getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR);
        FnHarness.main(id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor);
    }

    public static void main(String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) throws Exception {
        List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
        ManagedChannelFactory channelFactory = experiments != null && experiments.contains("beam_fn_api_epoll") ? ManagedChannelFactory.createEpoll() : ManagedChannelFactory.createDefault();
        OutboundObserverFactory outboundObserverFactory = HarnessStreamObserverFactories.fromOptions(options);
        channelFactory = channelFactory.withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(id)));
        FnHarness.main(id, options, loggingApiServiceDescriptor, controlApiServiceDescriptor, channelFactory, outboundObserverFactory);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String id, PipelineOptions options, Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor, Endpoints.ApiServiceDescriptor controlApiServiceDescriptor, ManagedChannelFactory channelFactory, OutboundObserverFactory outboundObserverFactory) throws Exception {
        IdGenerator idGenerator = IdGenerators.decrementingLongs();
        try (BeamFnLoggingClient logging = new BeamFnLoggingClient(options, loggingApiServiceDescriptor, channelFactory::forDescriptor);){
            LOG.info("Fn Harness started");
            FileSystems.setDefaultPipelineOptions(options);
            EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>> handlers = new EnumMap<BeamFnApi.InstructionRequest.RequestCase, ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>>(BeamFnApi.InstructionRequest.RequestCase.class);
            RegisterHandler fnApiRegistry = new RegisterHandler();
            BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient(options, channelFactory::forDescriptor, outboundObserverFactory);
            BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache(idGenerator, channelFactory::forDescriptor, outboundObserverFactory);
            ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer, beamFnStateGrpcClientCache);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, fnApiRegistry::register);
            handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, processBundleHandler::processBundle);
            BeamFnControlClient control = new BeamFnControlClient(id, controlApiServiceDescriptor, channelFactory, outboundObserverFactory, handlers);
            LOG.info("Entering instruction processing loop");
            control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService());
        }
        finally {
            System.out.println("Shutting SDK harness down.");
        }
    }
}

