/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamsConfig
extends AbstractConfig {
    private static final Logger log = LoggerFactory.getLogger(StreamsConfig.class);
    private static final ConfigDef CONFIG;
    private final boolean eosEnabled = StreamsConfigUtils.eosEnabled(this);
    private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L;
    private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L;
    private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000;
    public static final int DUMMY_THREAD_INDEX = 1;
    public static final long MAX_TASK_IDLE_MS_DISABLED = -1L;
    public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE = 5;
    public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH = 20;
    public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH = 30;
    public static final String TOPIC_PREFIX = "topic.";
    public static final String CONSUMER_PREFIX = "consumer.";
    public static final String MAIN_CONSUMER_PREFIX = "main.consumer.";
    public static final String RESTORE_CONSUMER_PREFIX = "restore.consumer.";
    public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
    public static final String PRODUCER_PREFIX = "producer.";
    public static final String ADMIN_CLIENT_PREFIX = "admin.";
    public static final String CLIENT_TAG_PREFIX = "client.tag.";
    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
    private static final String CONFIG_ERROR_MSG = "Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", or a comma separated list of specific optimizations: (\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + \"SINGLE_STORE_SELF_JOIN+\").";
    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", or a comma separated list of specific optimizations: (\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + \"SINGLE_STORE_SELF_JOIN+\").\"NO_OPTIMIZATION\" by default.";
    public static final String NO_OPTIMIZATION = "none";
    public static final String OPTIMIZE = "all";
    public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
    public static final String SINGLE_STORE_SELF_JOIN = "single.store.self.join";
    private static final List<String> TOPOLOGY_OPTIMIZATION_CONFIGS;
    public static final String UPGRADE_FROM_0100;
    public static final String UPGRADE_FROM_0101;
    public static final String UPGRADE_FROM_0102;
    public static final String UPGRADE_FROM_0110;
    public static final String UPGRADE_FROM_10;
    public static final String UPGRADE_FROM_11;
    public static final String UPGRADE_FROM_20;
    public static final String UPGRADE_FROM_21;
    public static final String UPGRADE_FROM_22;
    public static final String UPGRADE_FROM_23;
    public static final String UPGRADE_FROM_24;
    public static final String UPGRADE_FROM_25;
    public static final String UPGRADE_FROM_26;
    public static final String UPGRADE_FROM_27;
    public static final String UPGRADE_FROM_28;
    public static final String UPGRADE_FROM_30;
    public static final String UPGRADE_FROM_31;
    public static final String UPGRADE_FROM_32;
    public static final String UPGRADE_FROM_33;
    public static final String UPGRADE_FROM_34;
    public static final String UPGRADE_FROM_35;
    public static final String UPGRADE_FROM_36;
    public static final String AT_LEAST_ONCE = "at_least_once";
    @Deprecated
    public static final String EXACTLY_ONCE = "exactly_once";
    @Deprecated
    public static final String EXACTLY_ONCE_BETA = "exactly_once_beta";
    public static final String EXACTLY_ONCE_V2 = "exactly_once_v2";
    public static final String METRICS_LATEST = "latest";
    public static final String ACCEPTABLE_RECOVERY_LAG_CONFIG = "acceptable.recovery.lag";
    private static final String ACCEPTABLE_RECOVERY_LAG_DOC = "The maximum acceptable lag (number of offsets to catch up) for a client to be considered caught-up enough to receive an active task assignment. Upon assignment, it will still restore the rest of the changelog before processing. To avoid a pause in processing during rebalances, this config should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.";
    public static final String APPLICATION_ID_CONFIG = "application.id";
    private static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.";
    public static final String APPLICATION_SERVER_CONFIG = "application.server";
    private static final String APPLICATION_SERVER_DOC = "A host:port pair pointing to a user-defined endpoint that can be used for state store discovery and interactive queries on this KafkaStreams instance.";
    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
    public static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition.";
    public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
    private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";
    @Deprecated
    public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
    public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
    public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
    public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
    public static final String CLIENT_ID_CONFIG = "client.id";
    private static final String CLIENT_ID_DOC = "An ID prefix string used for the client IDs of internal [main-|restore-|global-]consumer, producer, and admin clients with pattern <code>&lt;client.id&gt;-[Global]StreamThread[-&lt;threadSequenceNumber$gt;]-&lt;consumer|producer|restore-consumer|global-consumer&gt;</code>.";
    public static final String ENABLE_METRICS_PUSH_CONFIG = "enable.metrics.push";
    public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal [main-|restore-|global]consumer, producer, and admin client metrics to the cluster, if the cluster has a client metrics subscription which matches a client.";
    public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
    private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds with which to commit processing progress. For at-least-once processing, committing means to save the position (ie, offsets) of the processor. For exactly-once processing, it means to commit the transaction which includes to save the position and to make the committed data in the output topic visible to consumers with isolation level read_committed. (Note, if <code>processing.guarantee</code> is set to <code>exactly_once_v2</code>, <code>exactly_once</code>,the default value is <code>100</code>, otherwise the default value is <code>30000</code>.";
    public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG = "repartition.purge.interval.ms";
    private static final String REPARTITION_PURGE_INTERVAL_MS_DOC = "The frequency in milliseconds with which to delete fully consumed records from repartition topics. Purging will occur after at least this value since the last purge, but may be delayed until later. (Note, unlike <code>commit.interval.ms</code>, the default for this value remains unchanged when <code>processing.guarantee</code> is set to <code>exactly_once_v2</code>).";
    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
    public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code> interface.";
    public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
    private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
    @Deprecated
    public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";
    @Deprecated
    public static final String DEFAULT_DSL_STORE_DOC = "The default state store type used by DSL operators.";
    @Deprecated
    public static final String ROCKS_DB = "rocksDB";
    @Deprecated
    public static final String IN_MEMORY = "in_memory";
    @Deprecated
    public static final String DEFAULT_DSL_STORE = "rocksDB";
    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
    static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
    static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
    @Deprecated
    public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
    private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
    @Deprecated
    public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
    private static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed value. Must implement the <code>org.apache.kafka.common.serialization.Serde</code> interface.";
    public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
    private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the <code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result in an error as it is meant to be used only from Plain consumer client.";
    public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
    private static final String DEFAULT_KEY_SERDE_CLASS_DOC = "Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well";
    public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
    private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via 'default.windowed.key.serde.inner' or 'default.windowed.value.serde.inner' as well";
    public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
    public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
    public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
    public static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges may produce out-of-order results. The config value is the maximum amount of time in milliseconds a stream task will stay idle when it is fully caught up on some (but not all) input partitions to wait for producers to send additional records and avoid potential out-of-order record processing across multiple input streams. The default (zero) does not wait for producers to send more records, but it does wait to fetch data that is already present on the brokers. This default means that for records that are already present on the brokers, Streams will process them in timestamp order. Set to -1 to disable idling entirely and process any locally available data, even though doing so may produce out-of-order processing.";
    public static final String MAX_WARMUP_REPLICAS_CONFIG = "max.warmup.replicas";
    private static final String MAX_WARMUP_REPLICAS_DOC = "The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once for the purpose of keeping  the task available on one instance while it is warming up on another instance it has been reassigned to. Used to throttle how much extra broker  traffic and cluster state can be used for high availability. Must be at least 1.Note that one warmup replica corresponds to one Stream Task. Furthermore, note that each warmup replica can only be promoted to an active task during a rebalance (normally during a so-called probing rebalance, which occur at a frequency specified by the `probing.rebalance.interval.ms` config). This means that the maximum rate at which active tasks can be migrated from one Kafka Streams Instance to another instance can be determined by (`max.warmup.replicas` / `probing.rebalance.interval.ms`).";
    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
    public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
    @Deprecated
    public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = "auto.include.jmx.reporter";
    public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
    private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
    public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
    private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
    public static final String POLL_MS_CONFIG = "poll.ms";
    private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
    public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG = "probing.rebalance.interval.ms";
    private static final String PROBING_REBALANCE_INTERVAL_MS_DOC = "The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.";
    public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
    private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. Possible values are <code>at_least_once</code> (default) and <code>exactly_once_v2</code> (requires brokers version 2.5 or higher). Deprecated options are <code>exactly_once</code> (requires brokers version 0.11.0 or higher) and <code>exactly_once_beta</code> (requires brokers version 2.5 or higher). Note that exactly-once processing requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting <code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";
    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
    public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG = "rack.aware.assignment.tags";
    private static final String RACK_AWARE_ASSIGNMENT_TAGS_DOC = "List of client tag keys used to distribute standby replicas across Kafka Streams instances. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over each client tag dimension.";
    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
    public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
    public static final String REPLICATION_FACTOR_CONFIG = "replication.factor";
    private static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application. The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer";
    public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
    @Deprecated
    public static final String RETRIES_CONFIG = "retries";
    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
    public static final String ROCKSDB_CONFIG_SETTER_CLASS_CONFIG = "rocksdb.config.setter";
    private static final String ROCKSDB_CONFIG_SETTER_CLASS_DOC = "A Rocks DB config setter class or class name that implements the <code>org.apache.kafka.streams.state.RocksDBConfigSetter</code> interface";
    public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
    public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
    private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated. Only state directories that have not been modified for at least <code>state.cleanup.delay.ms</code> will be removed";
    public static final String STATE_DIR_CONFIG = "state.dir";
    private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
    public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";
    public static final String TASK_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0ms, a task would raise an error for the first internal error. For any timeout larger than 0ms, a task will retry at least once before an error is raised.";
    public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
    private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";
    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
    private static final String UPGRADE_FROM_DOC;
    public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
    private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
    public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
    public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "none";
    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "min_traffic";
    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY = "balance_subtopology";
    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
    public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take <code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> into account when assigning tasks to minimize cross rack traffic. Valid settings are : <code>none</code> (default), which will disable rack aware assignment; <code>min_traffic</code>, which will compute minimum cross rack traffic assignment; <code>balance_subtopology</code>, which will compute minimum cross rack traffic and try to balance the tasks of same subtopolgies across different clients";
    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost";
    public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC;
    public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost";
    public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC;
    @Deprecated
    public static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS;
    private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS;
    private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS;
    private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
    private static final Map<String, Object> PRODUCER_EOS_OVERRIDES;
    private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
    private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;

    public static String consumerPrefix(String consumerProp) {
        return CONSUMER_PREFIX + consumerProp;
    }

    public static String mainConsumerPrefix(String consumerProp) {
        return MAIN_CONSUMER_PREFIX + consumerProp;
    }

    public static String restoreConsumerPrefix(String consumerProp) {
        return RESTORE_CONSUMER_PREFIX + consumerProp;
    }

    public static String clientTagPrefix(String clientTagKey) {
        return CLIENT_TAG_PREFIX + clientTagKey;
    }

    public static String globalConsumerPrefix(String consumerProp) {
        return GLOBAL_CONSUMER_PREFIX + consumerProp;
    }

    public static String producerPrefix(String producerProp) {
        return PRODUCER_PREFIX + producerProp;
    }

    public static String adminClientPrefix(String adminClientProp) {
        return ADMIN_CLIENT_PREFIX + adminClientProp;
    }

    public static String topicPrefix(String topicProp) {
        return TOPIC_PREFIX + topicProp;
    }

    public static ConfigDef configDef() {
        return new ConfigDef(CONFIG);
    }

    public StreamsConfig(Map<?, ?> props) {
        this(props, true);
    }

    protected StreamsConfig(Map<?, ?> props, boolean doLog) {
        super(CONFIG, props, doLog);
        String processingModeConfig = this.getString(PROCESSING_GUARANTEE_CONFIG);
        if (processingModeConfig.equals(EXACTLY_ONCE)) {
            log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. Please use `{}` instead. Note that this requires broker version 2.5+ so you should prepare to upgrade your brokers if necessary.", (Object)EXACTLY_ONCE, (Object)EXACTLY_ONCE_V2);
        }
        if (processingModeConfig.equals(EXACTLY_ONCE_BETA)) {
            log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. Please use `{}` instead.", (Object)EXACTLY_ONCE_BETA, (Object)EXACTLY_ONCE_V2);
        }
        if (props.containsKey(RETRIES_CONFIG)) {
            log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", (Object)RETRIES_CONFIG);
        }
        if (this.eosEnabled) {
            this.verifyEOSTransactionTimeoutCompatibility();
        }
        StreamsConfig.verifyTopologyOptimizationConfigs(this.getString("topology.optimization"));
    }

    private void verifyEOSTransactionTimeoutCompatibility() {
        int transactionTimeout;
        long commitInterval = this.getLong(COMMIT_INTERVAL_MS_CONFIG);
        String transactionTimeoutConfigKey = StreamsConfig.producerPrefix("transaction.timeout.ms");
        int n = transactionTimeout = this.originals().containsKey(transactionTimeoutConfigKey) ? (Integer)Objects.requireNonNull(ConfigDef.parseType((String)transactionTimeoutConfigKey, this.originals().get(transactionTimeoutConfigKey), (ConfigDef.Type)ConfigDef.Type.INT), "Could not parse config `commit.interval.ms` because it's set to `null`") : 10000;
        if ((long)transactionTimeout < commitInterval) {
            throw new IllegalArgumentException(String.format("Transaction timeout %d was set lower than streams commit interval %d. This will cause ongoing transaction always timeout due to inactivity caused by long commit interval. Consider reconfiguring commit interval to match transaction timeout by tuning 'commit.interval.ms' config, or increase the transaction timeout to match commit interval by tuning `producer.transaction.timeout.ms` config.", transactionTimeout, commitInterval));
        }
    }

    protected Map<String, Object> postProcessParsedConfig(Map<String, Object> parsedValues) {
        Map configUpdates = CommonClientConfigs.postProcessReconnectBackoffConfigs((AbstractConfig)this, parsedValues);
        if (StreamsConfigUtils.eosEnabled(this) && !this.originals().containsKey(COMMIT_INTERVAL_MS_CONFIG)) {
            log.debug("Using {} default value of {} as exactly once is enabled.", (Object)COMMIT_INTERVAL_MS_CONFIG, (Object)100L);
            configUpdates.put(COMMIT_INTERVAL_MS_CONFIG, 100L);
        }
        this.validateRackAwarenessConfiguration();
        return configUpdates;
    }

    private void validateRackAwarenessConfiguration() {
        List rackAwareAssignmentTags = this.getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
        Map<String, String> clientTags = this.getClientTags();
        if (clientTags.size() > 5) {
            throw new ConfigException("At most 5 client tags can be specified using client.tag. prefix.");
        }
        for (String rackAwareAssignmentTag : rackAwareAssignmentTags) {
            if (clientTags.containsKey(rackAwareAssignmentTag)) continue;
            throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, (Object)rackAwareAssignmentTags, "Contains invalid value [" + rackAwareAssignmentTag + "] which doesn't have corresponding tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
        }
        clientTags.forEach((tagKey, tagValue) -> {
            if (tagKey.length() > 20) {
                throw new ConfigException(CLIENT_TAG_PREFIX, tagKey, "Tag key exceeds maximum length of 20.");
            }
            if (tagValue.length() > 30) {
                throw new ConfigException(CLIENT_TAG_PREFIX, tagValue, "Tag value exceeds maximum length of 30.");
            }
        });
    }

    private Map<String, Object> getCommonConsumerConfigs() {
        Map<String, Object> clientProvidedProps = this.getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
        this.checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
        this.checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
        HashMap<String, Object> consumerProps = new HashMap<String, Object>(this.eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
        if (StreamsConfigUtils.processingMode(this) == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
            consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", true);
        }
        consumerProps.putAll(this.getClientCustomProps());
        consumerProps.putAll(clientProvidedProps);
        consumerProps.put(BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
        return consumerProps;
    }

    private void checkIfUnexpectedUserSpecifiedConsumerConfig(Map<String, Object> clientProvidedProps, String[] nonConfigurableConfigs) {
        String nonConfigurableConfigMessage = "Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ";
        String eosMessage = "processing.guarantee is set to " + this.getString(PROCESSING_GUARANTEE_CONFIG) + ". Hence, ";
        for (String config : nonConfigurableConfigs) {
            if (!clientProvidedProps.containsKey(config)) continue;
            if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
                if (clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config))) continue;
                log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", "consumer", config, "", clientProvidedProps.get(config), CONSUMER_DEFAULT_OVERRIDES.get(config)));
                clientProvidedProps.remove(config);
                continue;
            }
            if (!this.eosEnabled) continue;
            if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
                if (clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config))) continue;
                log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", "consumer", config, eosMessage, clientProvidedProps.get(config), CONSUMER_EOS_OVERRIDES.get(config)));
                clientProvidedProps.remove(config);
                continue;
            }
            if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
                if (clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config))) continue;
                log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", "producer", config, eosMessage, clientProvidedProps.get(config), PRODUCER_EOS_OVERRIDES.get(config)));
                clientProvidedProps.remove(config);
                continue;
            }
            if (!"transactional.id".equals(config)) continue;
            log.warn(String.format("Unexpected user-specified %s config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s) will be used ", "producer", config, eosMessage, clientProvidedProps.get(config), "<appId>-<generatedSuffix>"));
            clientProvidedProps.remove(config);
        }
        if (this.eosEnabled) {
            this.verifyMaxInFlightRequestPerConnection(clientProvidedProps.get("max.in.flight.requests.per.connection"));
        }
    }

    private void verifyMaxInFlightRequestPerConnection(Object maxInFlightRequests) {
        if (maxInFlightRequests != null) {
            int maxInFlightRequestsAsInteger;
            if (maxInFlightRequests instanceof Integer) {
                maxInFlightRequestsAsInteger = (Integer)maxInFlightRequests;
            } else if (maxInFlightRequests instanceof String) {
                try {
                    maxInFlightRequestsAsInteger = Integer.parseInt(((String)maxInFlightRequests).trim());
                }
                catch (NumberFormatException e) {
                    throw new ConfigException("max.in.flight.requests.per.connection", maxInFlightRequests, "String value could not be parsed as 32-bit integer");
                }
            } else {
                throw new ConfigException("max.in.flight.requests.per.connection", maxInFlightRequests, "Expected value to be a 32-bit integer, but it was a " + maxInFlightRequests.getClass().getName());
            }
            if (maxInFlightRequestsAsInteger > 5) {
                throw new ConfigException("max.in.flight.requests.per.connection", (Object)maxInFlightRequestsAsInteger, "Can't exceed 5 when exactly-once processing is enabled");
            }
        }
    }

    public Map<String, Object> getMainConsumerConfigs(String groupId, String clientId, int threadIdx) {
        int batchSize;
        int segmentSize;
        Map<String, Object> consumerProps = this.getCommonConsumerConfigs();
        Map mainConsumerProps = this.originalsWithPrefix(MAIN_CONSUMER_PREFIX);
        consumerProps.putAll(mainConsumerProps);
        consumerProps.put(APPLICATION_ID_CONFIG, groupId);
        consumerProps.put("group.id", groupId);
        consumerProps.put(CLIENT_ID_CONFIG, clientId);
        String groupInstanceId = (String)consumerProps.get("group.instance.id");
        if (groupInstanceId != null) {
            consumerProps.put("group.instance.id", groupInstanceId + "-" + threadIdx);
        }
        consumerProps.put(UPGRADE_FROM_CONFIG, this.getString(UPGRADE_FROM_CONFIG));
        consumerProps.put(REPLICATION_FACTOR_CONFIG, this.getInt(REPLICATION_FACTOR_CONFIG));
        consumerProps.put(APPLICATION_SERVER_CONFIG, this.getString(APPLICATION_SERVER_CONFIG));
        consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, this.getInt(NUM_STANDBY_REPLICAS_CONFIG));
        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, this.getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, this.getInt(MAX_WARMUP_REPLICAS_CONFIG));
        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, this.getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
        consumerProps.put("partition.assignment.strategy", StreamsPartitionAssignor.class.getName());
        consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, this.getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
        consumerProps.put(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, this.getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG));
        consumerProps.put(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, this.getString(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG));
        consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, this.getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
        consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, this.getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
        consumerProps.put("allow.auto.create.topics", "false");
        Map topicProps = this.originalsWithPrefix(TOPIC_PREFIX, false);
        Map<String, Object> producerProps = this.getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
        if (topicProps.containsKey(StreamsConfig.topicPrefix("segment.bytes")) && producerProps.containsKey("batch.size") && (segmentSize = Integer.parseInt(topicProps.get(StreamsConfig.topicPrefix("segment.bytes")).toString())) < (batchSize = Integer.parseInt(producerProps.get("batch.size").toString()))) {
            throw new IllegalArgumentException(String.format("Specified topic segment size %d is is smaller than the configured producer batch size %d, this will cause produced batch not able to be appended to the topic", segmentSize, batchSize));
        }
        consumerProps.putAll(topicProps);
        return consumerProps;
    }

    public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
        Map<String, Object> baseConsumerProps = this.getCommonConsumerConfigs();
        Map restoreConsumerProps = this.originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
        baseConsumerProps.putAll(restoreConsumerProps);
        baseConsumerProps.remove("group.id");
        baseConsumerProps.remove("group.instance.id");
        baseConsumerProps.put(CLIENT_ID_CONFIG, clientId);
        baseConsumerProps.put("auto.offset.reset", "none");
        return baseConsumerProps;
    }

    public Map<String, Object> getGlobalConsumerConfigs(String clientId) {
        Map<String, Object> baseConsumerProps = this.getCommonConsumerConfigs();
        Map globalConsumerProps = this.originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
        baseConsumerProps.putAll(globalConsumerProps);
        baseConsumerProps.remove("group.id");
        baseConsumerProps.remove("group.instance.id");
        baseConsumerProps.put(CLIENT_ID_CONFIG, clientId + "-global-consumer");
        baseConsumerProps.put("auto.offset.reset", "none");
        return baseConsumerProps;
    }

    public Map<String, Object> getProducerConfigs(String clientId) {
        Map<String, Object> clientProvidedProps = this.getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
        this.checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
        HashMap<String, Object> props = new HashMap<String, Object>(this.eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);
        props.putAll(this.getClientCustomProps());
        props.putAll(clientProvidedProps);
        if (StreamsConfigUtils.processingMode(this) == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            props.put("internal.auto.downgrade.txn.commit", true);
        }
        props.put(BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
        props.put(CLIENT_ID_CONFIG, clientId);
        return props;
    }

    public Map<String, Object> getAdminConfigs(String clientId) {
        Map<String, Object> clientProvidedProps = this.getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames());
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(this.getClientCustomProps());
        props.putAll(clientProvidedProps);
        props.put(CLIENT_ID_CONFIG, clientId);
        return props;
    }

    public Map<String, String> getClientTags() {
        return this.originalsWithPrefix(CLIENT_TAG_PREFIX).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, tagEntry -> Objects.toString(tagEntry.getValue())));
    }

    private Map<String, Object> getClientPropsWithPrefix(String prefix, Set<String> configNames) {
        Map<String, Object> props = this.clientProps(configNames, this.originals());
        props.putAll(this.originalsWithPrefix(prefix));
        return props;
    }

    private Map<String, Object> getClientCustomProps() {
        Map props = this.originals();
        props.keySet().removeAll(CONFIG.names());
        props.keySet().removeAll(ConsumerConfig.configNames());
        props.keySet().removeAll(ProducerConfig.configNames());
        props.keySet().removeAll(AdminClientConfig.configNames());
        props.keySet().removeAll(this.originalsWithPrefix(CONSUMER_PREFIX, false).keySet());
        props.keySet().removeAll(this.originalsWithPrefix(PRODUCER_PREFIX, false).keySet());
        props.keySet().removeAll(this.originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet());
        return props;
    }

    public static Set<String> verifyTopologyOptimizationConfigs(String config) {
        List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
        HashSet<String> verifiedConfigs = new HashSet<String>();
        if ((configs.contains("none") || configs.contains(OPTIMIZE)) && configs.size() > 1) {
            throw new ConfigException("\"" + config + "\" is not a valid optimization config. " + CONFIG_ERROR_MSG);
        }
        for (String conf : configs) {
            if (TOPOLOGY_OPTIMIZATION_CONFIGS.contains(conf)) continue;
            throw new ConfigException("Unrecognized config. Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", or a comma separated list of specific optimizations: (\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + \"SINGLE_STORE_SELF_JOIN+\").");
        }
        if (configs.contains(OPTIMIZE)) {
            verifiedConfigs.add(REUSE_KTABLE_SOURCE_TOPICS);
            verifiedConfigs.add(MERGE_REPARTITION_TOPICS);
            verifiedConfigs.add(SINGLE_STORE_SELF_JOIN);
        } else if (!configs.contains("none")) {
            verifiedConfigs.addAll(configs);
        }
        return verifiedConfigs;
    }

    public KafkaClientSupplier getKafkaClientSupplier() {
        return (KafkaClientSupplier)this.getConfiguredInstance(DEFAULT_CLIENT_SUPPLIER_CONFIG, KafkaClientSupplier.class);
    }

    public Serde<?> defaultKeySerde() {
        Object keySerdeConfigSetting = this.get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
        if (keySerdeConfigSetting == null) {
            throw new ConfigException("Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
        }
        try {
            Serde serde = (Serde)this.getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
            serde.configure(this.originals(), true);
            return serde;
        }
        catch (Exception e) {
            throw new StreamsException(String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
        }
    }

    public Serde<?> defaultValueSerde() {
        Object valueSerdeConfigSetting = this.get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
        if (valueSerdeConfigSetting == null) {
            throw new ConfigException("Please specify a value serde or set one through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG");
        }
        try {
            Serde serde = (Serde)this.getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
            serde.configure(this.originals(), false);
            return serde;
        }
        catch (Exception e) {
            throw new StreamsException(String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
        }
    }

    public TimestampExtractor defaultTimestampExtractor() {
        return (TimestampExtractor)this.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
    }

    public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
        return (DeserializationExceptionHandler)this.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
    }

    public ProductionExceptionHandler defaultProductionExceptionHandler() {
        return (ProductionExceptionHandler)this.getConfiguredInstance(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ProductionExceptionHandler.class);
    }

    private Map<String, Object> clientProps(Set<String> configNames, Map<String, Object> originals) {
        HashMap<String, Object> parsed = new HashMap<String, Object>();
        for (String configName : configNames) {
            if (!originals.containsKey(configName)) continue;
            parsed.put(configName, originals.get(configName));
        }
        return parsed;
    }

    public static void main(String[] args) {
        System.out.println(CONFIG.toHtml(4, config -> "streamsconfigs_" + config));
    }

    static {
        TOPOLOGY_OPTIMIZATION_CONFIGS = Arrays.asList(OPTIMIZE, "none", REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS, SINGLE_STORE_SELF_JOIN);
        UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString();
        UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString();
        UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString();
        UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString();
        UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString();
        UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString();
        UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString();
        UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString();
        UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString();
        UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString();
        UPGRADE_FROM_24 = UpgradeFromValues.UPGRADE_FROM_24.toString();
        UPGRADE_FROM_25 = UpgradeFromValues.UPGRADE_FROM_25.toString();
        UPGRADE_FROM_26 = UpgradeFromValues.UPGRADE_FROM_26.toString();
        UPGRADE_FROM_27 = UpgradeFromValues.UPGRADE_FROM_27.toString();
        UPGRADE_FROM_28 = UpgradeFromValues.UPGRADE_FROM_28.toString();
        UPGRADE_FROM_30 = UpgradeFromValues.UPGRADE_FROM_30.toString();
        UPGRADE_FROM_31 = UpgradeFromValues.UPGRADE_FROM_31.toString();
        UPGRADE_FROM_32 = UpgradeFromValues.UPGRADE_FROM_32.toString();
        UPGRADE_FROM_33 = UpgradeFromValues.UPGRADE_FROM_33.toString();
        UPGRADE_FROM_34 = UpgradeFromValues.UPGRADE_FROM_34.toString();
        UPGRADE_FROM_35 = UpgradeFromValues.UPGRADE_FROM_35.toString();
        UPGRADE_FROM_36 = UpgradeFromValues.UPGRADE_FROM_36.toString();
        DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
        UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" + UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" + UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" + UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" + UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" + UPGRADE_FROM_35 + "\", \"" + UPGRADE_FROM_36 + "(for upgrading from the corresponding old version).";
        RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors.";
        RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and <code>rack.aware.assignment.traffic_cost</code> controls whether the optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";
        NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[]{"enable.auto.commit"};
        NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[]{"isolation.level"};
        NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new String[]{"enable.idempotence", "max.in.flight.requests.per.connection", "transactional.id"};
        CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, APPLICATION_ID_DOC).define(BOOTSTRAP_SERVERS_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).").define(NUM_STANDBY_REPLICAS_CONFIG, ConfigDef.Type.INT, (Object)0, ConfigDef.Importance.HIGH, NUM_STANDBY_REPLICAS_DOC).define(STATE_DIR_CONFIG, ConfigDef.Type.STRING, (Object)(System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams"), ConfigDef.Importance.HIGH, STATE_DIR_DOC).define(ACCEPTABLE_RECOVERY_LAG_CONFIG, ConfigDef.Type.LONG, (Object)10000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, ACCEPTABLE_RECOVERY_LAG_DOC).define(CACHE_MAX_BYTES_BUFFERING_CONFIG, ConfigDef.Type.LONG, (Object)0xA00000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, CACHE_MAX_BYTES_BUFFERING_DOC).define(STATESTORE_CACHE_MAX_BYTES_CONFIG, ConfigDef.Type.LONG, (Object)0xA00000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.MEDIUM, STATESTORE_CACHE_MAX_BYTES_DOC).define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.MEDIUM, CLIENT_ID_DOC).define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)LogAndFailExceptionHandler.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC).define(DEFAULT_KEY_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, DEFAULT_KEY_SERDE_CLASS_DOC).define("default.list.key.serde.inner", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "Default inner class of list serde for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>").define("default.list.value.serde.inner", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "Default inner class of list serde for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code>").define("default.list.key.serde.type", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "Default class for key that implements the <code>java.util.List</code> interface. This configuration will be read if and only if <code>default.key.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code> Note when list serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via 'default.list.key.serde.inner'").define("default.list.value.serde.type", ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "Default class for value that implements the <code>java.util.List</code> interface. This configuration will be read if and only if <code>default.value.serde</code> configuration is set to <code>org.apache.kafka.common.serialization.Serdes.ListSerde</code> Note when list serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via 'default.list.value.serde.inner'").define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)DefaultProductionExceptionHandler.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC).define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ConfigDef.Type.CLASS, (Object)FailOnInvalidTimestamp.class.getName(), ConfigDef.Importance.MEDIUM, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC).define(DEFAULT_VALUE_SERDE_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, DEFAULT_VALUE_SERDE_CLASS_DOC).define(MAX_TASK_IDLE_MS_CONFIG, ConfigDef.Type.LONG, (Object)0L, ConfigDef.Importance.MEDIUM, MAX_TASK_IDLE_MS_DOC).define(MAX_WARMUP_REPLICAS_CONFIG, ConfigDef.Type.INT, (Object)2, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)1), ConfigDef.Importance.MEDIUM, MAX_WARMUP_REPLICAS_DOC).define(NUM_STREAM_THREADS_CONFIG, ConfigDef.Type.INT, (Object)1, ConfigDef.Importance.MEDIUM, NUM_STREAM_THREADS_DOC).define(PROCESSING_GUARANTEE_CONFIG, ConfigDef.Type.STRING, (Object)AT_LEAST_ONCE, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2}), ConfigDef.Importance.MEDIUM, PROCESSING_GUARANTEE_DOC).define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, ConfigDef.Type.INT, null, ConfigDef.Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC).define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, ConfigDef.Type.STRING, (Object)"none", (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{"none", RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY}), ConfigDef.Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_STRATEGY_DOC).define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, ConfigDef.Type.LIST, Collections.emptyList(), (ConfigDef.Validator)ConfigDef.ListSize.atMostOfSize((int)5), ConfigDef.Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TAGS_DOC).define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, ConfigDef.Type.INT, null, ConfigDef.Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC).define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.INT, (Object)-1, ConfigDef.Importance.MEDIUM, REPLICATION_FACTOR_DOC).define(SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, (Object)"PLAINTEXT", (ConfigDef.Validator)ConfigDef.CaseInsensitiveValidString.in((String[])Utils.enumOptions(SecurityProtocol.class)), ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC).define(TASK_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, (Object)Duration.ofMinutes(5L).toMillis(), (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.MEDIUM, TASK_TIMEOUT_MS_DOC).define("topology.optimization", ConfigDef.Type.STRING, (Object)"none", (name, value) -> StreamsConfig.verifyTopologyOptimizationConfigs((String)value), ConfigDef.Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC).define(APPLICATION_SERVER_CONFIG, ConfigDef.Type.STRING, (Object)"", ConfigDef.Importance.LOW, APPLICATION_SERVER_DOC).define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, ConfigDef.Type.INT, (Object)1000, ConfigDef.Importance.LOW, BUFFERED_RECORDS_PER_PARTITION_DOC).define(BUILT_IN_METRICS_VERSION_CONFIG, ConfigDef.Type.STRING, (Object)METRICS_LATEST, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{METRICS_LATEST}), ConfigDef.Importance.LOW, BUILT_IN_METRICS_VERSION_DOC).define(COMMIT_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, (Object)30000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, COMMIT_INTERVAL_MS_DOC).define(ENABLE_METRICS_PUSH_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, ENABLE_METRICS_PUSH_DOC).define(REPARTITION_PURGE_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, (Object)30000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, REPARTITION_PURGE_INTERVAL_MS_DOC).define(CONNECTIONS_MAX_IDLE_MS_CONFIG, ConfigDef.Type.LONG, (Object)540000L, ConfigDef.Importance.LOW, "Close idle connections after the number of milliseconds specified by this config.").define(DEFAULT_DSL_STORE_CONFIG, ConfigDef.Type.STRING, (Object)"rocksDB", (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{"rocksDB", IN_MEMORY}), ConfigDef.Importance.LOW, DEFAULT_DSL_STORE_DOC).define(DSL_STORE_SUPPLIERS_CLASS_CONFIG, ConfigDef.Type.CLASS, DSL_STORE_SUPPLIERS_CLASS_DEFAULT, ConfigDef.Importance.LOW, DSL_STORE_SUPPLIERS_CLASS_DOC).define(DEFAULT_CLIENT_SUPPLIER_CONFIG, ConfigDef.Type.CLASS, (Object)DefaultKafkaClientSupplier.class.getName(), ConfigDef.Importance.LOW, DEFAULT_CLIENT_SUPPLIER_DOC).define(METADATA_MAX_AGE_CONFIG, ConfigDef.Type.LONG, (Object)300000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.").define(METRICS_NUM_SAMPLES_CONFIG, ConfigDef.Type.INT, (Object)2, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)1), ConfigDef.Importance.LOW, "The number of samples maintained to compute metrics.").define(METRIC_REPORTER_CLASSES_CONFIG, ConfigDef.Type.LIST, (Object)"", ConfigDef.Importance.LOW, "A list of classes to use as metrics reporters. Implementing the <code>org.apache.kafka.common.metrics.MetricsReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.").define(METRICS_RECORDING_LEVEL_CONFIG, ConfigDef.Type.STRING, (Object)Sensor.RecordingLevel.INFO.toString(), (ConfigDef.Validator)ConfigDef.ValidString.in((String[])new String[]{Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()}), ConfigDef.Importance.LOW, "The highest recording level for metrics.").define(METRICS_SAMPLE_WINDOW_MS_CONFIG, ConfigDef.Type.LONG, (Object)30000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The window of time a metrics sample is computed over.").define(AUTO_INCLUDE_JMX_REPORTER_CONFIG, ConfigDef.Type.BOOLEAN, (Object)true, ConfigDef.Importance.LOW, "Deprecated. Whether to automatically include JmxReporter even if it's not listed in <code>metric.reporters</code>. This configuration will be removed in Kafka 4.0, users should instead include <code>org.apache.kafka.common.metrics.JmxReporter</code> in <code>metric.reporters</code> in order to enable the JmxReporter.").define(POLL_MS_CONFIG, ConfigDef.Type.LONG, (Object)100L, ConfigDef.Importance.LOW, POLL_MS_DOC).define(PROBING_REBALANCE_INTERVAL_MS_CONFIG, ConfigDef.Type.LONG, (Object)600000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)60000L), ConfigDef.Importance.LOW, PROBING_REBALANCE_INTERVAL_MS_DOC).define(RECEIVE_BUFFER_CONFIG, ConfigDef.Type.INT, (Object)32768, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.LOW, "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.").define(RECONNECT_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, (Object)50L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the <code>reconnect.backoff.max.ms</code> value.").define(RECONNECT_BACKOFF_MAX_MS_CONFIG, ConfigDef.Type.LONG, (Object)1000L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.").define(RETRIES_CONFIG, ConfigDef.Type.INT, (Object)0, (ConfigDef.Validator)ConfigDef.Range.between((Number)0, (Number)Integer.MAX_VALUE), ConfigDef.Importance.LOW, "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error. It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.").define(RETRY_BACKOFF_MS_CONFIG, ConfigDef.Type.LONG, (Object)100L, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0L), ConfigDef.Importance.LOW, "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. This value is the initial backoff value and will increase exponentially for each failed request, up to the <code>retry.backoff.max.ms</code> value.").define(REQUEST_TIMEOUT_MS_CONFIG, ConfigDef.Type.INT, (Object)40000, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)0), ConfigDef.Importance.LOW, "The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.").define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, ROCKSDB_CONFIG_SETTER_CLASS_DOC).define(SEND_BUFFER_CONFIG, ConfigDef.Type.INT, (Object)131072, (ConfigDef.Validator)ConfigDef.Range.atLeast((Number)-1), ConfigDef.Importance.LOW, "The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.").define(STATE_CLEANUP_DELAY_MS_CONFIG, ConfigDef.Type.LONG, (Object)600000L, ConfigDef.Importance.LOW, STATE_CLEANUP_DELAY_MS_DOC).define(UPGRADE_FROM_CONFIG, ConfigDef.Type.STRING, null, (ConfigDef.Validator)ConfigDef.ValidString.in((String[])((String[])Stream.concat(Stream.of((String)null), Arrays.stream(UpgradeFromValues.values()).map(UpgradeFromValues::toString)).toArray(String[]::new))), ConfigDef.Importance.LOW, UPGRADE_FROM_DOC).define(WINDOWED_INNER_CLASS_SERDE, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, WINDOWED_INNER_CLASS_SERDE_DOC).define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, ConfigDef.Type.LONG, (Object)86400000L, ConfigDef.Importance.LOW, WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC).define(WINDOW_SIZE_MS_CONFIG, ConfigDef.Type.LONG, null, ConfigDef.Importance.LOW, WINDOW_SIZE_MS_DOC);
        HashMap<Object, Object> tempProducerDefaultOverrides = new HashMap<String, String>();
        tempProducerDefaultOverrides.put("linger.ms", "100");
        PRODUCER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
        tempProducerDefaultOverrides = new HashMap<String, Object>(PRODUCER_DEFAULT_OVERRIDES);
        tempProducerDefaultOverrides.put("delivery.timeout.ms", Integer.MAX_VALUE);
        tempProducerDefaultOverrides.put("enable.idempotence", true);
        tempProducerDefaultOverrides.put("transaction.timeout.ms", 10000);
        PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
        HashMap<Object, Object> tempConsumerDefaultOverrides = new HashMap<String, Object>();
        tempConsumerDefaultOverrides.put("max.poll.records", "1000");
        tempConsumerDefaultOverrides.put("auto.offset.reset", "earliest");
        tempConsumerDefaultOverrides.put("enable.auto.commit", "false");
        tempConsumerDefaultOverrides.put("internal.leave.group.on.close", false);
        CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
        tempConsumerDefaultOverrides = new HashMap<String, Object>(CONSUMER_DEFAULT_OVERRIDES);
        tempConsumerDefaultOverrides.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        CONSUMER_EOS_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
    }

    public static class InternalConfig {
        public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";
        public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";
        public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";
        public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
        public static final String EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION = "__emit.interval.ms.kstreams.windowed.aggregation__";
        public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset.vector.enabled__";
        public static final String TOPIC_PREFIX_ALTERNATIVE = "__internal.override.topic.prefix__";
        public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
        public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__";

        public static boolean getStateUpdaterEnabled(Map<String, Object> configs) {
            return InternalConfig.getBoolean(configs, STATE_UPDATER_ENABLED, false);
        }

        public static boolean getProcessingThreadsEnabled(Map<String, Object> configs) {
            return InternalConfig.getBoolean(configs, PROCESSING_THREADS_ENABLED, false);
        }

        public static boolean getBoolean(Map<String, Object> configs, String key, boolean defaultValue) {
            Object value = configs.getOrDefault(key, defaultValue);
            if (value instanceof Boolean) {
                return (Boolean)value;
            }
            if (value instanceof String) {
                return Boolean.parseBoolean((String)value);
            }
            log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a true/false value.");
            return defaultValue;
        }

        public static long getLong(Map<String, Object> configs, String key, long defaultValue) {
            Object value = configs.getOrDefault(key, defaultValue);
            if (value instanceof Number) {
                return ((Number)value).longValue();
            }
            if (value instanceof String) {
                return Long.parseLong((String)value);
            }
            log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a numeric value.");
            return defaultValue;
        }

        public static String getString(Map<String, Object> configs, String key, String defaultValue) {
            Object value = configs.getOrDefault(key, defaultValue);
            if (value instanceof String) {
                return (String)value;
            }
            log.warn("Invalid value (" + value + ") on internal configuration '" + key + "'. Please specify a String value.");
            return defaultValue;
        }
    }
}

