package org.apache.beam.runners.flink;

import java.io.IOException;
import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobServerDriver.class */
/**
 * Command-line driver that starts a gRPC job server backed by an {@link InMemoryJobService} and
 * blocks until that server terminates.
 *
 * <p>The driver is configured through {@link ServerConfiguration}, whose fields are populated
 * from command-line arguments by args4j. Instances are obtained through {@link #fromConfig} or
 * {@link #create}; the constructor is private.
 */
public class FlinkJobServerDriver implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);

    /** Executor handed to the {@link FlinkJobInvoker} created by this driver. */
    private final ListeningExecutorService executor;

    /** Parsed command-line settings (job host, artifact staging path, Flink master url). */
    private final ServerConfiguration configuration;

    /** Factory used to create both the job server and the artifact staging server. */
    private final ServerFactory serverFactory;

    /**
     * Command-line settings for the job server.
     *
     * <p>Each field carries its default value and is overwritten by args4j when the matching
     * option is present on the command line.
     */
    public static class ServerConfiguration {

        @Option(name = "--job-host", required = true, usage = "The job server host string")
        private String host = "";

        @Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
        private String artifactStagingPath = "/tmp/beam-artifact-staging";

        // NOTE(review): "[auto]" is passed through unchanged to FlinkJobInvoker.create; its exact
        // meaning is defined there, not in this class.
        @Option(name = "--flink-master-url", usage = "Flink master url to submit job.")
        private String flinkMasterUrl = "[auto]";

        private ServerConfiguration() {}
    }

    /**
     * Parses the command line, installs default pipeline options for {@link FileSystems} and runs
     * the job server on the calling thread.
     *
     * <p>If the arguments cannot be parsed, the error is logged and a usage message is printed to
     * {@code System.err}; the server is not started in that case.
     *
     * @param strArr raw command-line arguments
     * @throws IOException declared for callers; see the methods invoked from here
     */
    public static void main(String[] strArr) throws IOException {
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        CmdLineParser cmdLineParser = new CmdLineParser(serverConfiguration);
        try {
            cmdLineParser.parseArgument(strArr);
            FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
            fromConfig(serverConfiguration).run();
        } catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", e);
            printUsage(cmdLineParser);
        }
    }

    /** Prints a one-line usage header followed by the option summary to {@code System.err}. */
    private static void printUsage(CmdLineParser cmdLineParser) {
        System.err.println(String.format("Usage: java %s arguments...", FlinkJobServerDriver.class.getSimpleName()));
        cmdLineParser.printUsage(System.err);
        System.err.println();
    }

    /**
     * Creates a driver from the given configuration using a cached pool of daemon threads and the
     * default {@link ServerFactory}.
     *
     * @param serverConfiguration parsed command-line settings
     * @return a driver ready to be {@link #run() run}
     */
    public static FlinkJobServerDriver fromConfig(ServerConfiguration serverConfiguration) {
        ListeningExecutorService jobExecutor =
                MoreExecutors.listeningDecorator(
                        Executors.newCachedThreadPool(
                                new ThreadFactoryBuilder()
                                        .setNameFormat("flink-runner-job-server")
                                        .setDaemon(true)
                                        .build()));
        return create(serverConfiguration, jobExecutor, ServerFactory.createDefault());
    }

    /**
     * Creates a driver with explicitly supplied collaborators.
     *
     * @param serverConfiguration parsed command-line settings
     * @param listeningExecutorService executor passed on to the job invoker
     * @param serverFactory factory used to create the gRPC servers
     * @return a driver ready to be {@link #run() run}
     */
    public static FlinkJobServerDriver create(ServerConfiguration serverConfiguration, ListeningExecutorService listeningExecutorService, ServerFactory serverFactory) {
        return new FlinkJobServerDriver(serverConfiguration, listeningExecutorService, serverFactory);
    }

    private FlinkJobServerDriver(ServerConfiguration serverConfiguration, ListeningExecutorService listeningExecutorService, ServerFactory serverFactory) {
        this.configuration = serverConfiguration;
        this.executor = listeningExecutorService;
        this.serverFactory = serverFactory;
    }

    /**
     * Creates the job server and blocks until it terminates.
     *
     * <p>Failures are logged rather than propagated. If the waiting thread is interrupted, its
     * interrupt status is restored before returning so that the owner of the thread can still
     * observe the interruption.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            createJobServer().getServer().awaitTermination();
        } catch (InterruptedException e) {
            LOG.warn("Job server interrupted", e);
            // Catching InterruptedException clears the flag; set it again for the caller.
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            LOG.warn("Exception during job server creation", e2);
        }
    }

    /** Creates the gRPC server for the job service at the configured host url. */
    private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
        Endpoints.ApiServiceDescriptor descriptor =
                Endpoints.ApiServiceDescriptor.newBuilder().setUrl(this.configuration.host).build();
        return GrpcFnServer.create(createJobService(), descriptor, this.serverFactory);
    }

    /**
     * Creates the in-memory job service, wiring it to a newly created artifact staging server, a
     * staging-session-token generator and a Flink job invoker.
     */
    private InMemoryJobService createJobService() throws IOException {
        GrpcFnServer<BeamFileSystemArtifactStagingService> stagingServer = createArtifactStagingService();
        return InMemoryJobService.create(stagingServer.getApiServiceDescriptor(), str -> {
            try {
                return BeamFileSystemArtifactStagingService.generateStagingSessionToken(str, this.configuration.artifactStagingPath);
            } catch (Exception e) {
                // The callback cannot throw checked exceptions; keep the cause attached.
                throw new RuntimeException(e);
            }
        }, createJobInvoker());
    }

    /** Creates the artifact staging server on a port allocated by {@link GrpcFnServer}. */
    private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService() throws IOException {
        return GrpcFnServer.allocatePortAndCreateFor(new BeamFileSystemArtifactStagingService(), this.serverFactory);
    }

    /** Creates the job invoker from this driver's executor and the configured Flink master url. */
    private JobInvoker createJobInvoker() throws IOException {
        return FlinkJobInvoker.create(this.executor, this.configuration.flinkMasterUrl);
    }
}
