/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.security.InvalidParameterException;
import java.security.NoSuchAlgorithmException;
import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.crypto.KeyGenerator;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.TopicCreationConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedConfig
extends WorkerConfig {
    // Logger shared by the static helpers and the instance methods of this class.
    private static final Logger log = LoggerFactory.getLogger(DistributedConfig.class);
    // Group membership settings. Each *_CONFIG constant is a worker property name; the matching
    // *_DOC constant is the description passed to ConfigDef.define(...) in config(Crypto).
    public static final String GROUP_ID_CONFIG = "group.id";
    private static final String GROUP_ID_DOC = "A unique string that identifies the Connect cluster group this worker belongs to.";
    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect worker failures. The worker sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove the worker from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by <code>group.min.session.timeout.ms</code> and <code>group.max.session.timeout.ms</code>.";
    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
    public static final String REBALANCE_TIMEOUT_MS_CONFIG = "rebalance.timeout.ms";
    private static final String REBALANCE_TIMEOUT_MS_DOC = "The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures.";
    // Settings that govern how a worker reacts when it falls out of sync with the rest of the cluster.
    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining.";
    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and  fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.";
    // 300000 ms (five minutes); config(Crypto) registers the same literal as the default for worker.unsync.backoff.ms.
    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 300000;
    // Property-name prefixes for the three internal topics; topicSettings(String) uses them to
    // collect the extra topic-level settings supplied under each prefix.
    public static final String CONFIG_STORAGE_PREFIX = "config.storage.";
    public static final String OFFSET_STORAGE_PREFIX = "offset.storage.";
    public static final String STATUS_STORAGE_PREFIX = "status.storage.";
    // Suffixes of the explicitly defined storage properties; topicSettings(String) strips these keys
    // from the prefixed settings it returns.
    public static final String TOPIC_SUFFIX = "topic";
    public static final String PARTITIONS_SUFFIX = "partitions";
    public static final String REPLICATION_FACTOR_SUFFIX = "replication.factor";
    // Offset storage topic: name, partition count and replication factor.
    public static final String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
    private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "The name of the Kafka topic where source connector offsets are stored";
    public static final String OFFSET_STORAGE_PARTITIONS_CONFIG = "offset.storage.partitions";
    private static final String OFFSET_STORAGE_PARTITIONS_CONFIG_DOC = "The number of partitions used when creating the offset storage topic";
    public static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG = "offset.storage.replication.factor";
    private static final String OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the offset storage topic";
    // Config storage topic: name and replication factor only (no partitions property is defined for it).
    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
    private static final String CONFIG_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector configurations are stored";
    public static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG = "config.storage.replication.factor";
    private static final String CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the configuration storage topic";
    // Status storage topic: name, partition count and replication factor.
    public static final String STATUS_STORAGE_TOPIC_CONFIG = "status.storage.topic";
    public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "The name of the Kafka topic where connector and task status are stored";
    public static final String STATUS_STORAGE_PARTITIONS_CONFIG = "status.storage.partitions";
    private static final String STATUS_STORAGE_PARTITIONS_CONFIG_DOC = "The number of partitions used when creating the status storage topic";
    public static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG = "status.storage.replication.factor";
    private static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the status storage topic";
    // Connect protocol compatibility mode; the default is the string form of ConnectProtocolCompatibility.SESSIONED.
    public static final String CONNECT_PROTOCOL_CONFIG = "connect.protocol";
    public static final String CONNECT_PROTOCOL_DOC = "Compatibility mode for Kafka Connect Protocol";
    public static final String CONNECT_PROTOCOL_DEFAULT = ConnectProtocolCompatibility.SESSIONED.toString();
    // Maximum scheduled rebalance delay; the default is 300 seconds expressed in milliseconds.
    public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG = "scheduled.rebalance.max.delay.ms";
    public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC = "The maximum delay that is scheduled in order to wait for the return of one or more departed workers before rebalancing and reassigning their connectors and tasks to the group. During this period the connectors and tasks of the departed workers remain unassigned";
    public static final int SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT = Math.toIntExact(TimeUnit.SECONDS.toMillis(300L));
    // Key generation algorithm for internal requests; defaultKeyGenerationAlgorithm(Crypto) only
    // offers the default when the supplied Crypto can create a key generator for it.
    public static final String INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG = "inter.worker.key.generation.algorithm";
    public static final String INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT = "HmacSHA256";
    public static final String INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC = "The algorithm to use for generating internal request keys. The algorithm 'HmacSHA256' will be used as a default on JVMs that support it; on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
    // Key size in bits; a null default leaves the key generator uninitialised by this class
    // (see getInternalRequestKeyGenerator()).
    // NOTE(review): the constant is typed Long while the setting is registered as ConfigDef.Type.INT — confirm intended type.
    public static final String INTER_WORKER_KEY_SIZE_CONFIG = "inter.worker.key.size";
    public static final String INTER_WORKER_KEY_SIZE_DOC = "The size of the key to use for signing internal requests, in bits. If null, the default key size for the key generation algorithm will be used.";
    public static final Long INTER_WORKER_KEY_SIZE_DEFAULT = null;
    // TTL of generated session keys; the default is one hour expressed in milliseconds.
    public static final String INTER_WORKER_KEY_TTL_MS_CONFIG = "inter.worker.key.ttl.ms";
    public static final String INTER_WORKER_KEY_TTL_MS_MS_DOC = "The TTL of generated session keys used for internal request validation (in milliseconds)";
    public static final int INTER_WORKER_KEY_TTL_MS_MS_DEFAULT = Math.toIntExact(TimeUnit.HOURS.toMillis(1L));
    /** Worker property naming the algorithm used to sign internal requests. */
    public static final String INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG = "inter.worker.signature.algorithm";
    /** Signature algorithm offered as the default when the worker can provide it. */
    public static final String INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT = "HmacSHA256";
    /**
     * Description of the signature algorithm property. The first sentence is now terminated and the
     * text names the default algorithm (previously the two sentences were fused and the property
     * name was printed where the algorithm name belongs).
     */
    public static final String INTER_WORKER_SIGNATURE_ALGORITHM_DOC = "The algorithm used to sign internal requests. The algorithm '" + INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT + "' will be used as a default on JVMs that support it; on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
    // Algorithms permitted for verifying internal requests; the constructor rejects configurations
    // whose list does not contain the configured signature algorithm.
    public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG = "inter.worker.verification.algorithms";
    public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList("HmacSHA256");
    public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests, which must include the algorithm used for the inter.worker.signature.algorithm property. The algorithm(s) '" + INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT + "' will be used as a default on JVMs that provide them; on other JVMs, no default is used and a value for this property must be manually specified in the worker config.";
    // Source of key generators for this instance; assigned once in the constructor.
    private Crypto crypto;
    // Exactly-once source support mode; the accepted values are the lower-cased names of ExactlyOnceSourceSupport.
    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones.\nTo enable exactly-once source support on a new cluster, set this property to '" + (Object)((Object)ExactlyOnceSourceSupport.ENABLED) + "'. To enable support on an existing cluster, first set to '" + (Object)((Object)ExactlyOnceSourceSupport.PREPARING) + "' on every worker in the cluster, then set to '" + (Object)((Object)ExactlyOnceSourceSupport.ENABLED) + "'. A rolling upgrade may be used for both changes. For more information on this feature, see the <a href=\"https://kafka.apache.org/documentation.html#connect_exactlyoncesource\">exactly-once source support documentation</a>.";
    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT = ExactlyOnceSourceSupport.DISABLED.toString();
    // Parsed value of exactly.once.source.support, resolved in the constructor.
    private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;

    /**
     * Chooses the default registered for {@code inter.worker.key.generation.algorithm}.
     *
     * <p>Any failure while probing is treated as "not available" on purpose, so that building the
     * config definition never fails because of a missing algorithm.
     *
     * @param crypto used to probe for the algorithm
     * @return {@code "HmacSHA256"} when the probe succeeds, otherwise
     *         {@link ConfigDef#NO_DEFAULT_VALUE} so that the property has no default
     */
    private static Object defaultKeyGenerationAlgorithm(Crypto crypto) {
        try {
            DistributedConfig.validateKeyAlgorithm(crypto, INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, "HmacSHA256");
            return "HmacSHA256";
        } catch (Throwable t) {
            // The two sentences of this message were previously fused ("worker.A key algorithm").
            log.info("The default key generation algorithm '{}' does not appear to be available on this worker. A key algorithm will have to be manually specified via the '{}' worker property", "HmacSHA256", INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG);
            return ConfigDef.NO_DEFAULT_VALUE;
        }
    }

    /**
     * Chooses the default registered for {@code inter.worker.signature.algorithm}.
     *
     * <p>Any failure while probing is treated as "not available" on purpose, so that building the
     * config definition never fails because of a missing algorithm.
     *
     * @param crypto used to probe for the algorithm
     * @return {@code "HmacSHA256"} when the probe succeeds, otherwise
     *         {@link ConfigDef#NO_DEFAULT_VALUE} so that the property has no default
     */
    private static Object defaultSignatureAlgorithm(Crypto crypto) {
        try {
            DistributedConfig.validateSignatureAlgorithm(crypto, INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, "HmacSHA256");
            return "HmacSHA256";
        } catch (Throwable t) {
            // The two sentences of this message were previously fused ("worker.A signature algorithm").
            log.info("The default signature algorithm '{}' does not appear to be available on this worker. A signature algorithm will have to be manually specified via the '{}' worker property", "HmacSHA256", INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG);
            return ConfigDef.NO_DEFAULT_VALUE;
        }
    }

    /**
     * Chooses the default registered for {@code inter.worker.verification.algorithms}.
     *
     * <p>Each entry of {@code INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT} is probed individually;
     * entries whose probe fails for any reason are skipped.
     *
     * @param crypto used to probe for each algorithm
     * @return the list of default algorithms that passed the probe, or
     *         {@link ConfigDef#NO_DEFAULT_VALUE} when none did
     */
    private static Object defaultVerificationAlgorithms(Crypto crypto) {
        List<String> available = new ArrayList<>();
        for (String verificationAlgorithm : INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT) {
            try {
                DistributedConfig.validateSignatureAlgorithm(crypto, INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, verificationAlgorithm);
                available.add(verificationAlgorithm);
            } catch (Throwable t) {
                // Deliberately best-effort: an unavailable default is simply left out.
                log.trace("Verification algorithm '{}' not found", verificationAlgorithm);
            }
        }
        if (available.isEmpty()) {
            // The two sentences of this message were previously fused ("worker.One or more").
            log.info("The default verification algorithm '{}' does not appear to be available on this worker. One or more verification algorithms will have to be manually specified via the '{}' worker property", INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT, INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
            return ConfigDef.NO_DEFAULT_VALUE;
        }
        return available;
    }

    /**
     * Builds the configuration definition for a distributed worker.
     *
     * <p>Starts from {@code baseConfigDef()} and chains {@code define(...)} calls for, in order: group
     * membership and exactly-once support, client networking settings, the security protocol
     * (followed by {@code withClientSaslSupport()}), worker sync settings, the offset/config/status
     * storage topics, the Connect protocol, the scheduled rebalance delay and the inter-worker key
     * and signature settings.
     *
     * @param crypto used to compute the defaults of the three inter-worker algorithm settings and
     *               captured by the validators of those settings
     * @return the assembled definition
     */
    private static ConfigDef config(Crypto crypto) {
        // NOTE(review): the docs for send.buffer.bytes / receive.buffer.bytes mention -1 ("OS default"),
        // but both validators are Range.atLeast(0) — confirm the intended lower bound.
        // NOTE(review): inter.worker.key.size is declared as Type.INT while its default constant is a
        // Long-typed null — confirm this is intentional.
        return DistributedConfig.baseConfigDef().define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC).define(SESSION_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, Math.toIntExact(TimeUnit.SECONDS.toMillis(10L)), ConfigDef.Importance.HIGH, SESSION_TIMEOUT_MS_DOC).define(REBALANCE_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, Math.toIntExact(TimeUnit.MINUTES.toMillis(1L)), ConfigDef.Importance.HIGH, REBALANCE_TIMEOUT_MS_DOC).define(HEARTBEAT_INTERVAL_MS_CONFIG, ConfigDef.Type.INT, Math.toIntExact(TimeUnit.SECONDS.toMillis(3L)), ConfigDef.Importance.HIGH, HEARTBEAT_INTERVAL_MS_DOC).define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, ConfigDef.Type.STRING, EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ExactlyOnceSourceSupport.class)), ConfigDef.Importance.HIGH, EXACTLY_ONCE_SOURCE_SUPPORT_DOC).define("metadata.max.age.ms", ConfigDef.Type.LONG, TimeUnit.MINUTES.toMillis(5L), ConfigDef.Range.atLeast(0), ConfigDef.Importance.LOW, "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.").define("client.id", ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW, "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.").define("send.buffer.bytes", ConfigDef.Type.INT, 131072, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.").define("receive.buffer.bytes", ConfigDef.Type.INT, 32768, ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. 
If the value is -1, the OS default will be used.").define("reconnect.backoff.ms", ConfigDef.Type.LONG, 50L, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.").define("reconnect.backoff.max.ms", ConfigDef.Type.LONG, TimeUnit.SECONDS.toMillis(1L), ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.").define("socket.connection.setup.timeout.ms", ConfigDef.Type.LONG, CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.").define("socket.connection.setup.timeout.max.ms", ConfigDef.Type.LONG, CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. 
To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.").define("retry.backoff.ms", ConfigDef.Type.LONG, 100L, ConfigDef.Range.atLeast(0L), ConfigDef.Importance.LOW, "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.").define("request.timeout.ms", ConfigDef.Type.INT, Math.toIntExact(TimeUnit.SECONDS.toMillis(40L)), ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define("connections.max.idle.ms", ConfigDef.Type.LONG, TimeUnit.MINUTES.toMillis(9L), ConfigDef.Importance.MEDIUM, "Close idle connections after the number of milliseconds specified by this config.").define("security.protocol", ConfigDef.Type.STRING, "PLAINTEXT", ConfigDef.ValidString.in(Utils.enumOptions(SecurityProtocol.class)), ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).withClientSaslSupport().define(WORKER_SYNC_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, 3000, ConfigDef.Importance.MEDIUM, WORKER_SYNC_TIMEOUT_MS_DOC).define(WORKER_UNSYNC_BACKOFF_MS_CONFIG, ConfigDef.Type.INT, 300000, ConfigDef.Importance.MEDIUM, WORKER_UNSYNC_BACKOFF_MS_DOC).define(OFFSET_STORAGE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, OFFSET_STORAGE_TOPIC_CONFIG_DOC).define(OFFSET_STORAGE_PARTITIONS_CONFIG, ConfigDef.Type.INT, 25, TopicCreationConfig.PARTITIONS_VALIDATOR, ConfigDef.Importance.LOW, OFFSET_STORAGE_PARTITIONS_CONFIG_DOC).define(OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, (short)3, TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR, 
ConfigDef.Importance.LOW, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG_DOC).define(CONFIG_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CONFIG_TOPIC_CONFIG_DOC).define(CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, (short)3, TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR, ConfigDef.Importance.LOW, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG_DOC).define(STATUS_STORAGE_TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, STATUS_STORAGE_TOPIC_CONFIG_DOC).define(STATUS_STORAGE_PARTITIONS_CONFIG, ConfigDef.Type.INT, 5, TopicCreationConfig.PARTITIONS_VALIDATOR, ConfigDef.Importance.LOW, STATUS_STORAGE_PARTITIONS_CONFIG_DOC).define(STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, (short)3, TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR, ConfigDef.Importance.LOW, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC).define(CONNECT_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CONNECT_PROTOCOL_DEFAULT, ConfigDef.LambdaValidator.with((name, value) -> {
            try {
                ConnectProtocolCompatibility.compatibility((String)value);
            }
            catch (Throwable t) {
                throw new ConfigException((String)name, value, "Invalid Connect protocol compatibility");
            }
        }, () -> "[" + Utils.join(ConnectProtocolCompatibility.values(), ", ") + "]"), ConfigDef.Importance.LOW, CONNECT_PROTOCOL_DOC).define(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, ConfigDef.Type.INT, SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT, ConfigDef.Range.between(0, Integer.MAX_VALUE), ConfigDef.Importance.LOW, SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC).define(INTER_WORKER_KEY_TTL_MS_CONFIG, ConfigDef.Type.INT, INTER_WORKER_KEY_TTL_MS_MS_DEFAULT, ConfigDef.Range.between(0, Integer.MAX_VALUE), ConfigDef.Importance.LOW, INTER_WORKER_KEY_TTL_MS_MS_DOC).define(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, DistributedConfig.defaultKeyGenerationAlgorithm(crypto), ConfigDef.LambdaValidator.with((name, value) -> DistributedConfig.validateKeyAlgorithm(crypto, name, (String)value), () -> "Any KeyGenerator algorithm supported by the worker JVM"), ConfigDef.Importance.LOW, INTER_WORKER_KEY_GENERATION_ALGORITHM_DOC).define(INTER_WORKER_KEY_SIZE_CONFIG, ConfigDef.Type.INT, INTER_WORKER_KEY_SIZE_DEFAULT, ConfigDef.Importance.LOW, INTER_WORKER_KEY_SIZE_DOC).define(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, ConfigDef.Type.STRING, DistributedConfig.defaultSignatureAlgorithm(crypto), ConfigDef.LambdaValidator.with((name, value) -> DistributedConfig.validateSignatureAlgorithm(crypto, name, (String)value), () -> "Any MAC algorithm supported by the worker JVM"), ConfigDef.Importance.LOW, INTER_WORKER_SIGNATURE_ALGORITHM_DOC).define(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG, ConfigDef.Type.LIST, DistributedConfig.defaultVerificationAlgorithms(crypto), ConfigDef.LambdaValidator.with((name, value) -> DistributedConfig.validateVerificationAlgorithms(crypto, name, (List)value), () -> "A list of one or more MAC algorithms, each supported by the worker JVM"), ConfigDef.Importance.LOW, INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
    }

    /**
     * Looks up the configured rebalance timeout.
     *
     * @return the value of the {@code rebalance.timeout.ms} setting
     */
    @Override
    public Integer getRebalanceTimeout() {
        final Integer rebalanceTimeoutMs = getInt(REBALANCE_TIMEOUT_MS_CONFIG);
        return rebalanceTimeoutMs;
    }

    /**
     * Reports whether exactly-once source support is fully enabled.
     *
     * @return {@code true} only when the configured mode is {@code ENABLED}
     */
    @Override
    public boolean exactlyOnceSourceEnabled() {
        return ExactlyOnceSourceSupport.ENABLED == exactlyOnceSourceSupport;
    }

    /**
     * Reports whether the configured exactly-once mode uses a transactional leader.
     *
     * @return the {@code usesTransactionalLeader} flag of the configured mode
     */
    public boolean transactionalLeaderEnabled() {
        final ExactlyOnceSourceSupport mode = exactlyOnceSourceSupport;
        return mode.usesTransactionalLeader;
    }

    /**
     * Derives the transactional producer ID for this worker's group.
     *
     * @return the ID built from this worker's {@code group.id}
     */
    public String transactionalProducerId() {
        final String workerGroupId = groupId();
        return DistributedConfig.transactionalProducerId(workerGroupId);
    }

    /**
     * Derives the transactional producer ID for a Connect cluster.
     *
     * @param groupId the cluster's group ID
     * @return {@code "connect-cluster-"} followed by {@code groupId}
     */
    public static String transactionalProducerId(String groupId) {
        final String prefix = "connect-cluster-";
        return prefix + groupId;
    }

    /**
     * Looks up the name of the offset storage topic.
     *
     * @return the value of the {@code offset.storage.topic} setting
     */
    @Override
    public String offsetsTopic() {
        final String topicName = getString(OFFSET_STORAGE_TOPIC_CONFIG);
        return topicName;
    }

    /**
     * Reports whether connector-specific offsets topics are permitted.
     *
     * @return always {@code true} for this configuration
     */
    @Override
    public boolean connectorOffsetsTopicsPermitted() {
        return true;
    }

    /**
     * Looks up the Connect cluster group this worker belongs to.
     *
     * @return the value of the {@code group.id} setting
     */
    @Override
    public String groupId() {
        final String configuredGroupId = getString(GROUP_ID_CONFIG);
        return configuredGroupId;
    }

    /**
     * Creates a configuration that uses {@code Crypto.SYSTEM} for algorithm lookups.
     *
     * @param props the worker properties
     */
    public DistributedConfig(Map<String, String> props) {
        this(Crypto.SYSTEM, props);
    }

    DistributedConfig(Crypto crypto, Map<String, String> props) {
        super(DistributedConfig.config(crypto), props);
        this.crypto = crypto;
        this.exactlyOnceSourceSupport = ExactlyOnceSourceSupport.fromProperty(this.getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG));
        this.validateInterWorkerKeyConfigs();
    }

    /**
     * Prints the HTML rendering of the distributed worker configuration definition to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final ConfigDef definition = DistributedConfig.config(Crypto.SYSTEM);
        final String html = definition.toHtml(4, key -> "connectconfigs_" + key);
        System.out.println(html);
    }

    /**
     * Creates a key generator for internal request keys from the configured algorithm and key size.
     *
     * <p>When {@code inter.worker.key.size} is {@code null} the generator is returned without an
     * explicit {@code init} call.
     *
     * @return a key generator for the configured algorithm
     * @throws ConfigException if the algorithm is unknown or the key size is rejected; the original
     *                         exception is attached as the cause
     */
    public KeyGenerator getInternalRequestKeyGenerator() {
        // Read each setting once so the error message reports exactly the values that were used.
        final String algorithm = getString(INTER_WORKER_KEY_GENERATION_ALGORITHM_CONFIG);
        final Integer keySize = getInt(INTER_WORKER_KEY_SIZE_CONFIG);
        try {
            final KeyGenerator generator = crypto.keyGenerator(algorithm);
            if (keySize != null) {
                generator.init(keySize);
            }
            return generator;
        } catch (InvalidParameterException | NoSuchAlgorithmException e) {
            final ConfigException failure = new ConfigException(String.format("Unable to create key generator with algorithm %s and key size %d: %s", algorithm, keySize, e.getMessage()));
            // ConfigException has no cause-accepting constructor, so attach the cause afterwards
            // instead of losing the original stack trace.
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Collects the extra topic-level settings supplied under a storage prefix.
     *
     * <p>Takes the map returned by {@code originalsWithPrefix(prefix)}, warns about settings that are
     * ignored ({@code partitions} for the config topic, and {@code cleanup.policy} for any topic),
     * and removes the keys handled by dedicated properties.
     *
     * @param prefix one of the {@code *_STORAGE_PREFIX} constants
     * @return the remaining settings, without {@code cleanup.policy}, {@code topic},
     *         {@code replication.factor} and {@code partitions}
     */
    private Map<String, Object> topicSettings(String prefix) {
        final Map<String, Object> settings = originalsWithPrefix(prefix);

        final boolean isConfigTopic = CONFIG_STORAGE_PREFIX.equals(prefix);
        if (isConfigTopic && settings.containsKey(PARTITIONS_SUFFIX)) {
            log.warn("Ignoring '{}{}={}' setting, since config topic partitions is always 1", prefix, PARTITIONS_SUFFIX, settings.get(PARTITIONS_SUFFIX));
        }

        final Object ignoredCleanupPolicy = settings.remove("cleanup.policy");
        if (ignoredCleanupPolicy != null) {
            log.warn("Ignoring '{}cleanup.policy={}' setting, since compaction is always used", prefix, ignoredCleanupPolicy);
        }

        // These keys are covered by explicitly defined properties and are not extra topic settings.
        for (String managedKey : new String[] {TOPIC_SUFFIX, REPLICATION_FACTOR_SUFFIX, PARTITIONS_SUFFIX}) {
            settings.remove(managedKey);
        }
        return settings;
    }

    /**
     * Collects the extra topic settings supplied under the {@code config.storage.} prefix.
     *
     * @return the filtered settings for the config storage topic
     */
    public Map<String, Object> configStorageTopicSettings() {
        final Map<String, Object> settings = topicSettings(CONFIG_STORAGE_PREFIX);
        return settings;
    }

    /**
     * Collects the extra topic settings supplied under the {@code offset.storage.} prefix.
     *
     * @return the filtered settings for the offset storage topic
     */
    public Map<String, Object> offsetStorageTopicSettings() {
        final Map<String, Object> settings = topicSettings(OFFSET_STORAGE_PREFIX);
        return settings;
    }

    /**
     * Collects the extra topic settings supplied under the {@code status.storage.} prefix.
     *
     * @return the filtered settings for the status storage topic
     */
    public Map<String, Object> statusStorageTopicSettings() {
        final Map<String, Object> settings = topicSettings(STATUS_STORAGE_PREFIX);
        return settings;
    }

    /**
     * Checks the inter-worker key settings as a whole: a key generator must be creatable from the
     * configured values, and the signature algorithm must be among the verification algorithms.
     */
    private void validateInterWorkerKeyConfigs() {
        // The generator is discarded; creating it is what surfaces an invalid algorithm or key size.
        getInternalRequestKeyGenerator();
        ensureVerificationAlgorithmsIncludeSignatureAlgorithm();
    }

    /**
     * Verifies that the configured signature algorithm appears in the verification algorithm list.
     *
     * @throws ConfigException if the signature algorithm is missing from the list
     */
    private void ensureVerificationAlgorithmsIncludeSignatureAlgorithm() {
        final String signer = getString(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG);
        final List<String> permitted = getList(INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
        if (permitted.contains(signer)) {
            return;
        }
        final String reason = String.format("Signature algorithm must be present in %s list", INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG);
        throw new ConfigException(INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG, signer, reason);
    }

    /**
     * Validates a list of verification algorithms: it must be non-empty and every entry must be
     * obtainable as a MAC from {@code crypto}.
     *
     * @param crypto     used to look up each algorithm
     * @param configName the property being validated, used in error messages
     * @param algorithms the candidate algorithm names
     * @throws ConfigException if the list is empty or an algorithm is not available
     */
    private static void validateVerificationAlgorithms(Crypto crypto, String configName, List<String> algorithms) {
        if (algorithms.isEmpty()) {
            throw new ConfigException(configName, algorithms, "At least one signature verification algorithm must be provided");
        }
        for (final String candidate : algorithms) {
            try {
                crypto.mac(candidate);
            } catch (NoSuchAlgorithmException unsupported) {
                throw DistributedConfig.unsupportedAlgorithmException(configName, candidate, "Mac");
            }
        }
    }

    /**
     * Validates that a signature algorithm is obtainable as a MAC from {@code crypto}.
     *
     * @param crypto     used to look up the algorithm
     * @param configName the property being validated, used in error messages
     * @param algorithm  the candidate algorithm name
     * @throws ConfigException if the algorithm is not available
     */
    private static void validateSignatureAlgorithm(Crypto crypto, String configName, String algorithm) {
        try {
            crypto.mac(algorithm);
        } catch (NoSuchAlgorithmException unsupported) {
            throw DistributedConfig.unsupportedAlgorithmException(configName, algorithm, "Mac");
        }
    }

    /**
     * Validates that a key generator can be obtained from {@code crypto} for the given algorithm.
     *
     * @param crypto     used to look up the algorithm
     * @param configName the property being validated, used in error messages
     * @param algorithm  the candidate algorithm name
     * @throws ConfigException if the algorithm is not available
     */
    private static void validateKeyAlgorithm(Crypto crypto, String configName, String algorithm) {
        try {
            crypto.keyGenerator(algorithm);
        } catch (NoSuchAlgorithmException unsupported) {
            throw DistributedConfig.unsupportedAlgorithmException(configName, algorithm, "KeyGenerator");
        }
    }

    /**
     * Builds the exception reported for an algorithm that is not available, listing the algorithms
     * of the given service type that the installed security providers do offer.
     *
     * @param name  the property being validated
     * @param value the rejected value
     * @param type  the provider service type to list, e.g. {@code "Mac"} or {@code "KeyGenerator"}
     * @return the exception to throw
     */
    private static ConfigException unsupportedAlgorithmException(String name, Object value, String type) {
        final String message = "the algorithm is not supported by this JVM; the supported algorithms are: " + DistributedConfig.supportedAlgorithms(type);
        return new ConfigException(name, value, message);
    }

    /**
     * Lists the algorithm names that the installed security providers offer for a service type.
     *
     * @param type the provider service type, e.g. {@code "Mac"} or {@code "KeyGenerator"}
     * @return the distinct algorithm names of all services whose type equals {@code type}
     */
    static Set<String> supportedAlgorithms(String type) {
        final Set<String> algorithms = new HashSet<>();
        for (final Provider provider : Security.getProviders()) {
            provider.getServices().stream()
                    .filter(service -> type.equals(service.getType()))
                    .map(Provider.Service::getAlgorithm)
                    .forEach(algorithms::add);
        }
        return algorithms;
    }

    /**
     * Modes accepted by the {@code exactly.once.source.support} property. Each mode records whether
     * it uses a transactional leader; the string form of a mode is its lower-cased name.
     */
    private static enum ExactlyOnceSourceSupport {
        DISABLED(false),
        PREPARING(true),
        ENABLED(true);

        /** Whether this mode uses a transactional leader. */
        public final boolean usesTransactionalLeader;

        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
            this.usesTransactionalLeader = usesTransactionalLeader;
        }

        /**
         * Resolves a property value to a mode, ignoring case.
         *
         * @param property the configured value
         * @return the mode whose name equals the upper-cased value
         */
        public static ExactlyOnceSourceSupport fromProperty(String property) {
            final String constantName = property.toUpperCase(Locale.ROOT);
            return ExactlyOnceSourceSupport.valueOf(constantName);
        }

        /** Returns the lower-cased constant name, which is the form used in worker properties. */
        @Override
        public String toString() {
            final String constantName = name();
            return constantName.toLowerCase(Locale.ROOT);
        }
    }
}

