package org.apache.flink.runtime.highavailability;

import java.util.concurrent.Executor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.ConfigurationException;

/* loaded from: input_file:org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.class */
public class HighAvailabilityServicesUtils {

    /* loaded from: input_file:org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils$AddressResolution.class */
    public enum AddressResolution {
        TRY_ADDRESS_RESOLUTION,
        NO_ADDRESS_RESOLUTION
    }

    public static HighAvailabilityServices createAvailableOrEmbeddedServices(Configuration configuration, Executor executor) throws Exception {
        HighAvailabilityMode recoveryMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
        switch (recoveryMode) {
            case NONE:
                return new EmbeddedHaServices(executor);
            case ZOOKEEPER:
                return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, BlobUtils.createBlobStoreFromConfig(configuration));
            default:
                throw new Exception("High availability mode " + recoveryMode + " is not supported.");
        }
    }

    public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor, AddressResolution addressResolution) throws Exception {
        HighAvailabilityMode recoveryMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
        switch (recoveryMode) {
            case NONE:
                Tuple2<String, Integer> jobManagerAddress = getJobManagerAddress(configuration);
                return new StandaloneHaServices(AkkaRpcServiceUtils.getRpcUrl((String) jobManagerAddress.f0, ((Integer) jobManagerAddress.f1).intValue(), "resourcemanager", addressResolution, configuration), AkkaRpcServiceUtils.getRpcUrl((String) jobManagerAddress.f0, ((Integer) jobManagerAddress.f1).intValue(), "jobmanager", addressResolution, configuration));
            case ZOOKEEPER:
                return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, BlobUtils.createBlobStoreFromConfig(configuration));
            default:
                throw new Exception("Recovery mode " + recoveryMode + " is not supported.");
        }
    }

    public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {
        String string = configuration.getString(JobManagerOptions.ADDRESS);
        int integer = configuration.getInteger(JobManagerOptions.PORT);
        if (string == null) {
            throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS + "' is missing (hostname/address of JobManager to connect to).");
        }
        if (integer <= 0 || integer >= 65536) {
            throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT + "' (port of the JobManager actor system) : " + integer + ".  it must be greater than 0 and less than 65536.");
        }
        return Tuple2.of(string, Integer.valueOf(integer));
    }
}
