package org.apache.flink.streaming.api.environment;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamExecutionEnvironment} that submits the program's stream graph to a
 * JobManager reachable at a fixed host and port, using a {@link StandaloneClusterClient}.
 *
 * <p>The JAR files given at construction time are passed along with every submission,
 * together with an optional list of additional classpath URLs.
 */
@Public
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {

    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);

    /** Highest valid TCP port number. */
    private static final int MAX_PORT = 65535;

    /** Host name or address of the JobManager. */
    private final String host;

    /** RPC port of the JobManager. */
    private final int port;

    /** Client-side configuration; copied into the configuration used for each submission. */
    private final Configuration clientConfiguration;

    /** URLs of the JAR files that are passed to the cluster client on submission. */
    private final List<URL> jarFiles;

    /** Additional classpath URLs passed to the cluster client; never {@code null}. */
    private final List<URL> globalClasspaths;

    /**
     * Creates a remote environment with an empty client configuration and no extra classpaths.
     *
     * @param str host name or address of the JobManager; must not be {@code null}
     * @param i port of the JobManager, in the range 1..65535
     * @param strArr paths of the JAR files to pass along with the program
     */
    public RemoteStreamEnvironment(String str, int i, String... strArr) {
        this(str, i, (Configuration) null, strArr);
    }

    /**
     * Creates a remote environment with the given client configuration and no extra classpaths.
     *
     * @param str host name or address of the JobManager; must not be {@code null}
     * @param i port of the JobManager, in the range 1..65535
     * @param configuration client configuration; {@code null} means an empty configuration
     * @param strArr paths of the JAR files to pass along with the program
     */
    public RemoteStreamEnvironment(String str, int i, Configuration configuration, String... strArr) {
        this(str, i, configuration, strArr, (URL[]) null);
    }

    /**
     * Creates a remote environment.
     *
     * @param str host name or address of the JobManager; must not be {@code null}
     * @param i port of the JobManager, in the range 1..65535
     * @param configuration client configuration; {@code null} means an empty configuration
     * @param strArr paths of the JAR files to pass along with the program; {@code null} is
     *     treated like an empty array. Each path is resolved to an absolute file URL and
     *     validated with {@link JobWithJars#checkJarFile(URL)}.
     * @param urlArr additional classpath URLs; {@code null} means none. The array is copied,
     *     so later changes to it do not affect this environment.
     * @throws InvalidProgramException if explicit environments are currently not allowed
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws IllegalArgumentException if the port is out of range or a JAR path cannot be
     *     converted to a URL
     * @throws RuntimeException if checking a JAR file fails with an {@link IOException}
     */
    public RemoteStreamEnvironment(String str, int i, Configuration configuration, String[] strArr, URL[] urlArr) {
        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
            throw new InvalidProgramException("The RemoteEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.");
        }
        if (str == null) {
            throw new NullPointerException("Host must not be null.");
        }
        // 65535 is a valid port number, so only values above it are rejected.
        if (i < 1 || i > MAX_PORT) {
            throw new IllegalArgumentException("Port out of range");
        }

        this.host = str;
        this.port = i;
        this.clientConfiguration = configuration == null ? new Configuration() : configuration;

        final String[] jarPaths = strArr == null ? new String[0] : strArr;
        this.jarFiles = new ArrayList<>(jarPaths.length);
        for (String jarPath : jarPaths) {
            try {
                URL jarUrl = new File(jarPath).getAbsoluteFile().toURI().toURL();
                JobWithJars.checkJarFile(jarUrl);
                this.jarFiles.add(jarUrl);
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("JAR file path is invalid '" + jarPath + "'", e);
            } catch (IOException e) {
                throw new RuntimeException("Problem with jar file " + jarPath, e);
            }
        }

        if (urlArr == null) {
            this.globalClasspaths = Collections.emptyList();
        } else {
            // Defensive copy: Arrays.asList alone would stay backed by the caller's array.
            List<URL> classpathCopy = new ArrayList<>(Arrays.asList(urlArr));
            this.globalClasspaths = Collections.unmodifiableList(classpathCopy);
        }
    }

    /**
     * Builds the stream graph, names it, clears the recorded transformations and submits the
     * graph to the remote cluster together with the JAR files of this environment.
     *
     * @param str the job name to set on the stream graph
     * @return the result returned by the remote execution
     * @throws ProgramInvocationException if the submission or execution fails
     */
    @Override // org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    public JobExecutionResult execute(String str) throws ProgramInvocationException {
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(str);
        this.transformations.clear();
        return executeRemotely(streamGraph, this.jarFiles);
    }

    /**
     * Submits the given stream graph to the JobManager at the configured host and port.
     *
     * <p>The submission configuration is a fresh {@link Configuration} containing the client
     * configuration plus the JobManager RPC address and port of this environment. The cluster
     * client is shut down after the run, whether it succeeded or failed.
     *
     * @param streamGraph the stream graph to execute
     * @param list URLs of the JAR files to pass to the cluster client
     * @return the result returned by the remote execution
     * @throws ProgramInvocationException with a "Cannot establish connection" message if the
     *     cluster client cannot be created or configured; otherwise, if running the program
     *     fails, the original {@code ProgramInvocationException} or a wrapper around the
     *     underlying exception
     */
    protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> list) throws ProgramInvocationException {
        LOG.info("Running remotely at {}:{}", this.host, this.port);

        ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(list, this.globalClasspaths, getClass().getClassLoader());

        Configuration configuration = new Configuration();
        configuration.addAll(this.clientConfiguration);
        configuration.setString("jobmanager.rpc.address", this.host);
        configuration.setInteger("jobmanager.rpc.port", this.port);

        // Client set-up is kept in its own try block so that only set-up failures are
        // reported as connection problems.
        StandaloneClusterClient client;
        try {
            client = new StandaloneClusterClient(configuration);
            client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
        } catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        }

        try {
            try {
                return client.run(streamGraph, list, this.globalClasspaths, userCodeClassLoader).getJobExecutionResult();
            } finally {
                // Shut the client down exactly once, on success and on failure alike.
                client.shutdown();
            }
        } catch (ProgramInvocationException e) {
            throw e;
        } catch (Exception e) {
            String detail = e.getMessage() == null ? "." : ": " + e.getMessage();
            throw new ProgramInvocationException("The program execution failed" + detail, e);
        }
    }

    /**
     * Returns a description of the form
     * {@code Remote Environment (host:port - parallelism = N)}, where {@code N} is printed as
     * {@code default} when the parallelism is {@code -1}.
     */
    @Override
    public String toString() {
        int parallelism = getParallelism();
        // NOTE(review): -1 is presumably the "use default parallelism" sentinel - confirm.
        String parallelismText = parallelism == -1 ? "default" : String.valueOf(parallelism);
        return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + parallelismText + ")";
    }

    /**
     * Returns the JobManager host this environment submits to.
     *
     * @return the host name or address given at construction time
     */
    public String getHost() {
        return this.host;
    }

    /**
     * Returns the JobManager port this environment submits to.
     *
     * @return the port given at construction time
     */
    public int getPort() {
        return this.port;
    }

    /**
     * Returns the client configuration of this environment.
     *
     * @return the configuration instance held by this environment (not a copy)
     */
    public Configuration getClientConfiguration() {
        return this.clientConfiguration;
    }
}
