package org.apache.beam.fn.harness;

import java.util.EnumMap;
import java.util.List;
import java.util.Objects;
import org.apache.beam.fn.harness.control.AddHarnessIdInterceptor;
import org.apache.beam.fn.harness.control.BeamFnControlClient;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.fn.harness.control.RegisterHandler;
import org.apache.beam.fn.harness.data.BeamFnDataGrpcClient;
import org.apache.beam.fn.harness.logging.BeamFnLoggingClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.fn.harness.stream.HarnessStreamObserverFactories;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/FnHarness.class */
public class FnHarness {
    private static final String HARNESS_ID = "HARNESS_ID";
    private static final String CONTROL_API_SERVICE_DESCRIPTOR = "CONTROL_API_SERVICE_DESCRIPTOR";
    private static final String LOGGING_API_SERVICE_DESCRIPTOR = "LOGGING_API_SERVICE_DESCRIPTOR";
    private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FnHarness.class);

    private static Endpoints.ApiServiceDescriptor getApiServiceDescriptor(String str) throws TextFormat.ParseException {
        Endpoints.ApiServiceDescriptor.Builder newBuilder = Endpoints.ApiServiceDescriptor.newBuilder();
        TextFormat.merge(System.getenv(str), newBuilder);
        return newBuilder.build();
    }

    public static void main(String[] strArr) throws Exception {
        System.out.format("SDK Fn Harness started%n", new Object[0]);
        System.out.format("Harness ID %s%n", System.getenv(HARNESS_ID));
        System.out.format("Logging location %s%n", System.getenv(LOGGING_API_SERVICE_DESCRIPTOR));
        System.out.format("Control location %s%n", System.getenv(CONTROL_API_SERVICE_DESCRIPTOR));
        System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
        main(System.getenv(HARNESS_ID), PipelineOptionsTranslation.fromJson(System.getenv(PIPELINE_OPTIONS)), getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR), getApiServiceDescriptor(CONTROL_API_SERVICE_DESCRIPTOR));
    }

    public static void main(String str, PipelineOptions pipelineOptions, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Endpoints.ApiServiceDescriptor apiServiceDescriptor2) throws Exception {
        List<String> experiments = ((ExperimentalOptions) pipelineOptions.as(ExperimentalOptions.class)).getExperiments();
        main(str, pipelineOptions, apiServiceDescriptor, apiServiceDescriptor2, ((experiments == null || !experiments.contains("beam_fn_api_epoll")) ? ManagedChannelFactory.createDefault() : ManagedChannelFactory.createEpoll()).withInterceptors(ImmutableList.of(AddHarnessIdInterceptor.create(str))), HarnessStreamObserverFactories.fromOptions(pipelineOptions));
    }

    public static void main(String str, PipelineOptions pipelineOptions, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Endpoints.ApiServiceDescriptor apiServiceDescriptor2, ManagedChannelFactory managedChannelFactory, OutboundObserverFactory outboundObserverFactory) throws Exception {
        IdGenerator decrementingLongs = IdGenerators.decrementingLongs();
        try {
            Objects.requireNonNull(managedChannelFactory);
            BeamFnLoggingClient beamFnLoggingClient = new BeamFnLoggingClient(pipelineOptions, apiServiceDescriptor, managedChannelFactory::forDescriptor);
            Throwable th = null;
            try {
                LOG.info("Fn Harness started");
                FileSystems.setDefaultPipelineOptions(pipelineOptions);
                EnumMap enumMap = new EnumMap(BeamFnApi.InstructionRequest.RequestCase.class);
                RegisterHandler registerHandler = new RegisterHandler();
                Objects.requireNonNull(managedChannelFactory);
                BeamFnDataGrpcClient beamFnDataGrpcClient = new BeamFnDataGrpcClient(pipelineOptions, managedChannelFactory::forDescriptor, outboundObserverFactory);
                Objects.requireNonNull(managedChannelFactory);
                BeamFnStateGrpcClientCache beamFnStateGrpcClientCache = new BeamFnStateGrpcClientCache(decrementingLongs, managedChannelFactory::forDescriptor, outboundObserverFactory);
                Objects.requireNonNull(registerHandler);
                ProcessBundleHandler processBundleHandler = new ProcessBundleHandler(pipelineOptions, registerHandler::getById, beamFnDataGrpcClient, beamFnStateGrpcClientCache);
                BeamFnApi.InstructionRequest.RequestCase requestCase = BeamFnApi.InstructionRequest.RequestCase.REGISTER;
                Objects.requireNonNull(registerHandler);
                enumMap.put((EnumMap) requestCase, (BeamFnApi.InstructionRequest.RequestCase) registerHandler::register);
                BeamFnApi.InstructionRequest.RequestCase requestCase2 = BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE;
                Objects.requireNonNull(processBundleHandler);
                enumMap.put((EnumMap) requestCase2, (BeamFnApi.InstructionRequest.RequestCase) processBundleHandler::processBundle);
                BeamFnControlClient beamFnControlClient = new BeamFnControlClient(str, apiServiceDescriptor2, managedChannelFactory, outboundObserverFactory, enumMap);
                LOG.info("Entering instruction processing loop");
                beamFnControlClient.processInstructionRequests(((GcsOptions) pipelineOptions.as(GcsOptions.class)).getExecutorService());
                if (0 != 0) {
                    try {
                        beamFnLoggingClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    beamFnLoggingClient.close();
                }
                System.out.println("Shutting SDK harness down.");
            } finally {
            }
        } catch (Throwable th3) {
            System.out.println("Shutting SDK harness down.");
            throw th3;
        }
    }
}
