package org.apache.flink.runtime.entrypoint;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.management.JMXService;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.security.ExitTrappingSecurityManager;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypoint.class */
public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErrorHandler {
    protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
    protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;
    private final Configuration configuration;

    @GuardedBy("lock")
    private DispatcherResourceManagerComponent clusterComponent;

    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    @GuardedBy("lock")
    private ProcessMetricGroup processMetricGroup;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    @GuardedBy("lock")
    private BlobServer blobServer;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    @GuardedBy("lock")
    private RpcService commonRpcService;

    @GuardedBy("lock")
    private ExecutorService ioExecutor;
    private ArchivedExecutionGraphStore archivedExecutionGraphStore;
    public static final ConfigOption<String> EXECUTION_MODE = ConfigOptions.key("internal.cluster.execution-mode").defaultValue(ExecutionMode.NORMAL.toString());
    protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);
    private static final Time INITIALIZATION_SHUTDOWN_TIMEOUT = Time.seconds(30);
    private final Object lock = new Object();
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final CompletableFuture<ApplicationStatus> terminationFuture = new CompletableFuture<>();
    private final Thread shutDownHook = ShutdownHookUtil.addShutdownHook(this::cleanupDirectories, getClass().getSimpleName(), LOG);

    /* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypoint$ExecutionMode.class */
    public enum ExecutionMode {
        NORMAL,
        DETACHED
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ClusterEntrypoint(Configuration configuration) {
        this.configuration = generateClusterConfiguration(configuration);
    }

    public CompletableFuture<ApplicationStatus> getTerminationFuture() {
        return this.terminationFuture;
    }

    public void startCluster() throws ClusterEntrypointException {
        LOG.info("Starting {}.", getClass().getSimpleName());
        try {
            ExitTrappingSecurityManager.replaceGracefulExitWithHaltIfConfigured(this.configuration);
            PluginManager createPluginManagerFromRootFolder = PluginUtils.createPluginManagerFromRootFolder(this.configuration);
            configureFileSystems(this.configuration, createPluginManagerFromRootFolder);
            installSecurityContext(this.configuration).runSecured(() -> {
                runCluster(this.configuration, createPluginManagerFromRootFolder);
                return null;
            });
        } catch (Throwable th) {
            Throwable stripException = ExceptionUtils.stripException(th, UndeclaredThrowableException.class);
            try {
                shutDownAsync(ApplicationStatus.FAILED, ExceptionUtils.stringifyException(stripException), false).get(INITIALIZATION_SHUTDOWN_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                stripException.addSuppressed(e);
            }
            throw new ClusterEntrypointException(String.format("Failed to initialize the cluster entrypoint %s.", getClass().getSimpleName()), stripException);
        }
    }

    private void configureFileSystems(Configuration configuration, PluginManager pluginManager) {
        LOG.info("Install default filesystem.");
        FileSystem.initialize(configuration, pluginManager);
    }

    private SecurityContext installSecurityContext(Configuration configuration) throws Exception {
        LOG.info("Install security context.");
        SecurityUtils.install(new SecurityConfiguration(configuration));
        return SecurityUtils.getInstalledContext();
    }

    private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
        synchronized (this.lock) {
            initializeServices(configuration, pluginManager);
            configuration.setString(JobManagerOptions.ADDRESS, this.commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, this.commonRpcService.getPort());
            this.clusterComponent = createDispatcherResourceManagerComponentFactory(configuration).create(configuration, this.ioExecutor, this.commonRpcService, this.haServices, this.blobServer, this.heartbeatServices, this.metricRegistry, this.archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(this.metricRegistry.getMetricQueryServiceRpcService()), this);
            this.clusterComponent.getShutDownFuture().whenComplete((applicationStatus, th) -> {
                if (th != null) {
                    shutDownAsync(ApplicationStatus.UNKNOWN, ExceptionUtils.stringifyException(th), false);
                } else {
                    shutDownAsync(applicationStatus, null, true);
                }
            });
        }
    }

    protected void initializeServices(Configuration configuration, PluginManager pluginManager) throws Exception {
        LOG.info("Initializing cluster services.");
        synchronized (this.lock) {
            this.commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(configuration, configuration.getString(JobManagerOptions.ADDRESS), getRPCPortRange(configuration), configuration.getString(JobManagerOptions.BIND_HOST), configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
            JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));
            configuration.setString(JobManagerOptions.ADDRESS, this.commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, this.commonRpcService.getPort());
            this.ioExecutor = Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration), new ExecutorThreadFactory("cluster-io"));
            this.haServices = createHaServices(configuration, this.ioExecutor);
            this.blobServer = new BlobServer(configuration, this.haServices.createBlobStore());
            this.blobServer.start();
            this.heartbeatServices = createHeartbeatServices(configuration);
            this.metricRegistry = createMetricRegistry(configuration, pluginManager);
            this.metricRegistry.startQueryService(MetricUtils.startRemoteMetricsRpcService(configuration, this.commonRpcService.getAddress()), null);
            this.processMetricGroup = MetricUtils.instantiateProcessMetricGroup(this.metricRegistry, RpcUtils.getHostname(this.commonRpcService), ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
            this.archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, this.commonRpcService.getScheduledExecutor());
        }
    }

    protected String getRPCPortRange(Configuration configuration) {
        return ZooKeeperUtils.isZooKeeperRecoveryMode(configuration) ? configuration.getString(HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE) : String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    }

    protected HighAvailabilityServices createHaServices(Configuration configuration, Executor executor) throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    }

    protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
        return HeartbeatServices.fromConfiguration(configuration);
    }

    protected MetricRegistryImpl createMetricRegistry(Configuration configuration, PluginManager pluginManager) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration), ReporterSetup.fromConfiguration(configuration, pluginManager));
    }

    public CompletableFuture<Void> closeAsync() {
        return shutDownAsync(ApplicationStatus.UNKNOWN, "Cluster entrypoint has been closed externally.", true).thenAccept(applicationStatus -> {
        });
    }

    protected CompletableFuture<Void> stopClusterServices(boolean z) {
        FutureUtils.ConjunctFuture<Void> completeAll;
        long j = this.configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);
        synchronized (this.lock) {
            Throwable th = null;
            ArrayList arrayList = new ArrayList(3);
            if (this.blobServer != null) {
                try {
                    this.blobServer.close();
                } catch (Throwable th2) {
                    th = ExceptionUtils.firstOrSuppressed(th2, (Throwable) null);
                }
            }
            if (this.haServices != null) {
                try {
                    if (z) {
                        this.haServices.closeAndCleanupAllData();
                    } else {
                        this.haServices.close();
                    }
                } catch (Throwable th3) {
                    th = ExceptionUtils.firstOrSuppressed(th3, th);
                }
            }
            if (this.archivedExecutionGraphStore != null) {
                try {
                    this.archivedExecutionGraphStore.close();
                } catch (Throwable th4) {
                    th = ExceptionUtils.firstOrSuppressed(th4, th);
                }
            }
            if (this.processMetricGroup != null) {
                this.processMetricGroup.close();
            }
            if (this.metricRegistry != null) {
                arrayList.add(this.metricRegistry.shutdown());
            }
            if (this.ioExecutor != null) {
                arrayList.add(ExecutorUtils.nonBlockingShutdown(j, TimeUnit.MILLISECONDS, new ExecutorService[]{this.ioExecutor}));
            }
            if (this.commonRpcService != null) {
                arrayList.add(this.commonRpcService.stopService());
            }
            try {
                JMXService.stopInstance();
            } catch (Throwable th5) {
                th = ExceptionUtils.firstOrSuppressed(th5, th);
            }
            if (th != null) {
                arrayList.add(FutureUtils.completedExceptionally(th));
            }
            completeAll = FutureUtils.completeAll(arrayList);
        }
        return completeAll;
    }

    @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
    public void onFatalError(Throwable th) {
        ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(th);
        LOG.error("Fatal error occurred in the cluster entrypoint.", th);
        System.exit(2);
    }

    private Configuration generateClusterConfiguration(Configuration configuration) {
        Configuration configuration2 = new Configuration((Configuration) Preconditions.checkNotNull(configuration));
        configuration2.setString(WebOptions.TMP_DIR, new File(configuration.getString(WebOptions.TMP_DIR), "flink-web-" + UUID.randomUUID()).getAbsolutePath());
        return configuration2;
    }

    private CompletableFuture<ApplicationStatus> shutDownAsync(ApplicationStatus applicationStatus, @Nullable String str, boolean z) {
        if (this.isShutDown.compareAndSet(false, true)) {
            LOG.info("Shutting {} down with application status {}. Diagnostics {}.", new Object[]{getClass().getSimpleName(), applicationStatus, str});
            FutureUtils.runAfterwards(FutureUtils.composeAfterwards(closeClusterComponent(applicationStatus, str), () -> {
                return stopClusterServices(z);
            }), this::cleanupDirectories).whenComplete((r5, th) -> {
                if (th != null) {
                    this.terminationFuture.completeExceptionally(th);
                } else {
                    this.terminationFuture.complete(applicationStatus);
                }
            });
        }
        return this.terminationFuture;
    }

    private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String str) {
        synchronized (this.lock) {
            if (this.clusterComponent != null) {
                return this.clusterComponent.deregisterApplicationAndClose(applicationStatus, str);
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    private void cleanupDirectories() throws IOException {
        ShutdownHookUtil.removeShutdownHook(this.shutDownHook, getClass().getSimpleName(), LOG);
        FileUtils.deleteDirectory(new File(this.configuration.getString(WebOptions.TMP_DIR)));
    }

    protected abstract DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException;

    protected abstract ArchivedExecutionGraphStore createSerializableExecutionGraphStore(Configuration configuration, ScheduledExecutor scheduledExecutor) throws IOException;

    protected static EntrypointClusterConfiguration parseArguments(String[] strArr) throws FlinkParseException {
        return (EntrypointClusterConfiguration) new CommandLineParser(new EntrypointClusterConfigurationParserFactory()).parse(strArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Configuration loadConfiguration(EntrypointClusterConfiguration entrypointClusterConfiguration) {
        Configuration loadConfiguration = GlobalConfiguration.loadConfiguration(entrypointClusterConfiguration.getConfigDir(), ConfigurationUtils.createConfiguration(entrypointClusterConfiguration.getDynamicProperties()));
        int restPort = entrypointClusterConfiguration.getRestPort();
        if (restPort >= 0) {
            loadConfiguration.setInteger(RestOptions.PORT, restPort);
        }
        String hostname = entrypointClusterConfiguration.getHostname();
        if (hostname != null) {
            loadConfiguration.setString(JobManagerOptions.ADDRESS, hostname);
        }
        return loadConfiguration;
    }

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
        String simpleName = clusterEntrypoint.getClass().getSimpleName();
        try {
            clusterEntrypoint.startCluster();
        } catch (ClusterEntrypointException e) {
            LOG.error(String.format("Could not start cluster entrypoint %s.", simpleName), e);
            System.exit(1);
        }
        clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, th) -> {
            int processExitCode = th != null ? 2 : applicationStatus.processExitCode();
            LOG.info("Terminating cluster entrypoint process {} with exit code {}.", new Object[]{simpleName, Integer.valueOf(processExitCode), th});
            System.exit(processExitCode);
        });
    }
}
