package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StreamsConfig}.
 *
 * <p>The tests cover validation of required settings, the client configuration maps derived for
 * the main/restore/global consumers, the producer and the admin client (including prefixed
 * overrides), default serde and timestamp-extractor resolution, and the settings that change when
 * {@code processing.guarantee} is {@code exactly_once}.
 *
 * <p>Every test that adds entries to {@link #props} builds a fresh {@link StreamsConfig}
 * <em>after</em> the last {@code put}. This keeps the tests independent of whether the config
 * object happens to observe later mutations of the {@link Properties} instance it was created from.
 */
public class StreamsConfigTest {
    /** Base properties shared by all tests; populated in {@link #setUp()} and extended per test. */
    private final Properties props = new Properties();

    /** Config built from the unmodified base properties; only valid for tests that do not touch {@link #props}. */
    private StreamsConfig streamsConfig;

    /**
     * Serde whose {@code configure} always throws, used to provoke a serde configuration failure.
     */
    static class MisconfiguredSerde implements Serde<Object> {
        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) {
            throw new RuntimeException("boom");
        }

        @Override
        public void close() {
        }

        @Override
        public Serializer<Object> serializer() {
            return null;
        }

        @Override
        public Deserializer<Object> deserializer() {
            return null;
        }
    }

    /**
     * Trivial {@link TimestampExtractor} that always returns {@code 0}; used to verify that a
     * user-supplied extractor class is picked up.
     */
    public static class MockTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            return 0L;
        }
    }

    /** Populates the base properties and builds the shared {@link #streamsConfig}. */
    @Before
    public void setUp() {
        this.props.put("application.id", "streams-config-test");
        this.props.put("bootstrap.servers", "localhost:9092");
        this.props.put("default.key.serde", Serdes.String().getClass().getName());
        this.props.put("default.value.serde", Serdes.String().getClass().getName());
        this.props.put("key.deserializer.encoding", "UTF8");
        this.props.put("value.deserializer.encoding", "UTF-16");
        this.streamsConfig = new StreamsConfig(this.props);
    }

    @Test(expected = ConfigException.class)
    public void shouldThrowExceptionIfApplicationIdIsNotSet() {
        this.props.remove("application.id");
        new StreamsConfig(this.props);
    }

    @Test(expected = ConfigException.class)
    public void shouldThrowExceptionIfBootstrapServersIsNotSet() {
        this.props.remove("bootstrap.servers");
        new StreamsConfig(this.props);
    }

    @Test
    public void testGetProducerConfigs() {
        final Map<String, Object> producerConfigs = this.streamsConfig.getProducerConfigs("client");
        Assert.assertEquals("client-producer", producerConfigs.get("client.id"));
        Assert.assertEquals("100", producerConfigs.get("linger.ms"));
    }

    @Test
    public void testGetConsumerConfigs() {
        final Map<String, Object> consumerConfigs =
            this.streamsConfig.getMainConsumerConfigs("example-application", "client");
        Assert.assertEquals("client-consumer", consumerConfigs.get("client.id"));
        Assert.assertEquals("example-application", consumerConfigs.get("group.id"));
        Assert.assertEquals("1000", consumerConfigs.get("max.poll.records"));
    }

    @Test
    public void consumerConfigMustContainStreamPartitionAssignorConfig() {
        this.props.put("replication.factor", 42);
        this.props.put("num.standby.replicas", 1);
        this.props.put("windowstore.changelog.additional.retention.ms", 7L);
        this.props.put("application.server", "dummy:host");
        this.props.put("retries", 10);
        this.props.put(StreamsConfig.adminClientPrefix("retries"), 5);
        this.props.put(StreamsConfig.topicPrefix("segment.bytes"), 100);

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("example-application", "client");

        Assert.assertEquals(42, consumerConfigs.get("replication.factor"));
        Assert.assertEquals(1, consumerConfigs.get("num.standby.replicas"));
        Assert.assertEquals(StreamsPartitionAssignor.class.getName(), consumerConfigs.get("partition.assignment.strategy"));
        Assert.assertEquals(7L, consumerConfigs.get("windowstore.changelog.additional.retention.ms"));
        Assert.assertEquals("dummy:host", consumerConfigs.get("application.server"));
        // The un-prefixed "retries" must not appear; only the admin-prefixed value is carried over.
        Assert.assertNull(consumerConfigs.get("retries"));
        Assert.assertEquals(5, consumerConfigs.get(StreamsConfig.adminClientPrefix("retries")));
        Assert.assertEquals(100, consumerConfigs.get(StreamsConfig.topicPrefix("segment.bytes")));
    }

    @Test
    public void consumerConfigMustUseAdminClientConfigForRetries() {
        this.props.put(StreamsConfig.adminClientPrefix("retries"), 20);
        this.props.put("retries", 10);

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("example-application", "client");

        Assert.assertEquals(20, consumerConfigs.get(StreamsConfig.adminClientPrefix("retries")));
    }

    @Test
    public void testGetMainConsumerConfigsWithMainConsumerOverridenPrefix() {
        this.props.put(StreamsConfig.consumerPrefix("max.poll.records"), "5");
        this.props.put(StreamsConfig.mainConsumerPrefix("max.poll.records"), "50");

        // Build the config after the overrides are in place instead of reusing the setUp() instance.
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("example-application", "client");

        Assert.assertEquals("50", consumerConfigs.get("max.poll.records"));
    }

    @Test
    public void testGetRestoreConsumerConfigs() {
        final Map<String, Object> restoreConfigs = this.streamsConfig.getRestoreConsumerConfigs("client");
        Assert.assertEquals("client-restore-consumer", restoreConfigs.get("client.id"));
        Assert.assertNull(restoreConfigs.get("group.id"));
    }

    /**
     * Round-trips a string through a serializer configured with the same encodings that
     * {@link #setUp()} gives the deserializers (UTF8 for keys, UTF-16 for values).
     */
    @Test
    public void defaultSerdeShouldBeConfigured() {
        final Map<String, Object> serializerConfigs = new HashMap<>();
        serializerConfigs.put("key.serializer.encoding", "UTF8");
        serializerConfigs.put("value.serializer.encoding", "UTF-16");
        final Serializer<String> serializer = Serdes.String().serializer();

        final String topic = "my topic";
        final String payload = "my string for testing";
        final String failureMessage =
            "Should get the original string after serialization and deserialization with the configured encoding";

        serializer.configure(serializerConfigs, true);
        Assert.assertEquals(
            failureMessage,
            payload,
            this.streamsConfig.defaultKeySerde().deserializer().deserialize(topic, serializer.serialize(topic, payload)));

        serializer.configure(serializerConfigs, false);
        Assert.assertEquals(
            failureMessage,
            payload,
            this.streamsConfig.defaultValueSerde().deserializer().deserialize(topic, serializer.serialize(topic, payload)));
    }

    @Test
    public void shouldSupportMultipleBootstrapServers() {
        final List<String> expectedBootstrapServers = Arrays.asList("broker1:9092", "broker2:9092");
        final String bootstrapServersString = Utils.join(expectedBootstrapServers, ",");
        final Properties properties = new Properties();
        properties.put("application.id", "irrelevant");
        properties.put("bootstrap.servers", bootstrapServersString);

        final List<String> actualBootstrapServers = new StreamsConfig(properties).getList("bootstrap.servers");

        Assert.assertEquals(expectedBootstrapServers, actualBootstrapServers);
    }

    @Test
    public void shouldSupportPrefixedConsumerConfigs() {
        this.props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "earliest");
        this.props.put(StreamsConfig.consumerPrefix("metrics.num.samples"), 1);

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientId");

        Assert.assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
        Assert.assertEquals(1, consumerConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldSupportPrefixedRestoreConsumerConfigs() {
        this.props.put(StreamsConfig.consumerPrefix("metrics.num.samples"), 1);

        final Map<String, Object> restoreConfigs =
            new StreamsConfig(this.props).getRestoreConsumerConfigs("clientId");

        Assert.assertEquals(1, restoreConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
        this.props.put(StreamsConfig.consumerPrefix("interceptor.statsd.host"), "host");

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientId");

        Assert.assertEquals("host", consumerConfigs.get("interceptor.statsd.host"));
    }

    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
        this.props.put(StreamsConfig.consumerPrefix("interceptor.statsd.host"), "host");

        final Map<String, Object> restoreConfigs =
            new StreamsConfig(this.props).getRestoreConsumerConfigs("clientId");

        Assert.assertEquals("host", restoreConfigs.get("interceptor.statsd.host"));
    }

    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
        this.props.put(StreamsConfig.producerPrefix("interceptor.statsd.host"), "host");

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertEquals("host", producerConfigs.get("interceptor.statsd.host"));
    }

    @Test
    public void shouldSupportPrefixedProducerConfigs() {
        this.props.put(StreamsConfig.producerPrefix("buffer.memory"), 10);
        this.props.put(StreamsConfig.producerPrefix("metrics.num.samples"), 1);

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertEquals(10, producerConfigs.get("buffer.memory"));
        Assert.assertEquals(1, producerConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldBeSupportNonPrefixedConsumerConfigs() {
        this.props.put("auto.offset.reset", "earliest");
        this.props.put("metrics.num.samples", 1);

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientId");

        Assert.assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
        Assert.assertEquals(1, consumerConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
        this.props.put("metrics.num.samples", 1);

        final Map<String, Object> restoreConfigs =
            new StreamsConfig(this.props).getRestoreConsumerConfigs("groupId");

        Assert.assertEquals(1, restoreConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldSupportNonPrefixedProducerConfigs() {
        this.props.put("buffer.memory", 10);
        this.props.put("metrics.num.samples", 1);

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertEquals(10, producerConfigs.get("buffer.memory"));
        Assert.assertEquals(1, producerConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
        this.props.put("custom.property.host", "host");
        final StreamsConfig config = new StreamsConfig(this.props);

        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs("groupId", "clientId");
        final Map<String, Object> restoreConfigs = config.getRestoreConsumerConfigs("clientId");
        final Map<String, Object> producerConfigs = config.getProducerConfigs("clientId");
        final Map<String, Object> adminConfigs = config.getAdminConfigs("clientId");

        Assert.assertEquals("host", consumerConfigs.get("custom.property.host"));
        Assert.assertEquals("host", restoreConfigs.get("custom.property.host"));
        Assert.assertEquals("host", producerConfigs.get("custom.property.host"));
        Assert.assertEquals("host", adminConfigs.get("custom.property.host"));
    }

    @Test
    public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
        this.props.put("custom.property.host", "host0");
        this.props.put(StreamsConfig.consumerPrefix("custom.property.host"), "host1");
        this.props.put(StreamsConfig.producerPrefix("custom.property.host"), "host2");
        this.props.put(StreamsConfig.adminClientPrefix("custom.property.host"), "host3");
        final StreamsConfig config = new StreamsConfig(this.props);

        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs("groupId", "clientId");
        final Map<String, Object> restoreConfigs = config.getRestoreConsumerConfigs("clientId");
        final Map<String, Object> producerConfigs = config.getProducerConfigs("clientId");
        final Map<String, Object> adminConfigs = config.getAdminConfigs("clientId");

        Assert.assertEquals("host1", consumerConfigs.get("custom.property.host"));
        Assert.assertEquals("host1", restoreConfigs.get("custom.property.host"));
        Assert.assertEquals("host2", producerConfigs.get("custom.property.host"));
        Assert.assertEquals("host3", adminConfigs.get("custom.property.host"));
    }

    @Test
    public void shouldSupportNonPrefixedAdminConfigs() {
        this.props.put("retries", 10);

        final Map<String, Object> adminConfigs = new StreamsConfig(this.props).getAdminConfigs("clientId");

        Assert.assertEquals(10, adminConfigs.get("retries"));
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
        this.props.put("default.key.serde", MisconfiguredSerde.class);
        new StreamsConfig(this.props).defaultKeySerde();
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
        this.props.put("default.value.serde", MisconfiguredSerde.class);
        new StreamsConfig(this.props).defaultValueSerde();
    }

    @Test
    public void shouldOverrideStreamsDefaultConsumerConfigs() {
        this.props.put(StreamsConfig.consumerPrefix("auto.offset.reset"), "latest");
        this.props.put(StreamsConfig.consumerPrefix("max.poll.records"), "10");

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientId");

        Assert.assertEquals("latest", consumerConfigs.get("auto.offset.reset"));
        Assert.assertEquals("10", consumerConfigs.get("max.poll.records"));
    }

    @Test
    public void shouldOverrideStreamsDefaultProducerConfigs() {
        this.props.put(StreamsConfig.producerPrefix("linger.ms"), "10000");

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertEquals("10000", producerConfigs.get("linger.ms"));
    }

    @Test
    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
        this.props.put(StreamsConfig.consumerPrefix("max.poll.records"), "10");

        final Map<String, Object> restoreConfigs =
            new StreamsConfig(this.props).getRestoreConsumerConfigs("clientId");

        Assert.assertEquals("10", restoreConfigs.get("max.poll.records"));
    }

    @Test
    public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
        this.props.put(StreamsConfig.consumerPrefix("enable.auto.commit"), "true");

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("a", "b");

        Assert.assertEquals("false", consumerConfigs.get("enable.auto.commit"));
    }

    @Test
    public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
        this.props.put(StreamsConfig.consumerPrefix("enable.auto.commit"), "true");

        final Map<String, Object> restoreConfigs =
            new StreamsConfig(this.props).getRestoreConsumerConfigs("client");

        Assert.assertEquals("false", restoreConfigs.get("enable.auto.commit"));
    }

    @Test
    public void testGetRestoreConsumerConfigsWithRestoreConsumerOverridenPrefix() {
        this.props.put(StreamsConfig.consumerPrefix("max.poll.records"), "5");
        this.props.put(StreamsConfig.restoreConsumerPrefix("max.poll.records"), "50");

        // Build the config after the overrides are in place instead of reusing the setUp() instance.
        final Map<String, Object> restoreConfigs =
            new StreamsConfig(this.props).getRestoreConsumerConfigs("clientId");

        Assert.assertEquals("50", restoreConfigs.get("max.poll.records"));
    }

    @Test
    public void testGetGlobalConsumerConfigs() {
        final Map<String, Object> globalConfigs = this.streamsConfig.getGlobalConsumerConfigs("client");
        Assert.assertEquals("client-global-consumer", globalConfigs.get("client.id"));
        Assert.assertNull(globalConfigs.get("group.id"));
    }

    @Test
    public void shouldSupportPrefixedGlobalConsumerConfigs() {
        this.props.put(StreamsConfig.consumerPrefix("metrics.num.samples"), 1);

        final Map<String, Object> globalConfigs =
            new StreamsConfig(this.props).getGlobalConsumerConfigs("clientId");

        Assert.assertEquals(1, globalConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldSupportPrefixedPropertiesThatAreNotPartOfGlobalConsumerConfig() {
        this.props.put(StreamsConfig.consumerPrefix("interceptor.statsd.host"), "host");

        final Map<String, Object> globalConfigs =
            new StreamsConfig(this.props).getGlobalConsumerConfigs("clientId");

        Assert.assertEquals("host", globalConfigs.get("interceptor.statsd.host"));
    }

    @Test
    public void shouldBeSupportNonPrefixedGlobalConsumerConfigs() {
        this.props.put("metrics.num.samples", 1);

        final Map<String, Object> globalConfigs =
            new StreamsConfig(this.props).getGlobalConsumerConfigs("groupId");

        Assert.assertEquals(1, globalConfigs.get("metrics.num.samples"));
    }

    @Test
    public void shouldResetToDefaultIfGlobalConsumerAutoCommitIsOverridden() {
        this.props.put(StreamsConfig.consumerPrefix("enable.auto.commit"), "true");

        final Map<String, Object> globalConfigs =
            new StreamsConfig(this.props).getGlobalConsumerConfigs("client");

        Assert.assertEquals("false", globalConfigs.get("enable.auto.commit"));
    }

    @Test
    public void testGetGlobalConsumerConfigsWithGlobalConsumerOverridenPrefix() {
        this.props.put(StreamsConfig.consumerPrefix("max.poll.records"), "5");
        this.props.put(StreamsConfig.globalConsumerPrefix("max.poll.records"), "50");

        // Build the config after the overrides are in place instead of reusing the setUp() instance.
        final Map<String, Object> globalConfigs =
            new StreamsConfig(this.props).getGlobalConsumerConfigs("clientId");

        Assert.assertEquals("50", globalConfigs.get("max.poll.records"));
    }

    @Test
    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientId");

        Assert.assertThat(
            consumerConfigs.get(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE),
            CoreMatchers.<Object>equalTo(false));
    }

    /** Passes as long as construction does not throw. */
    @Test
    public void shouldAcceptAtLeastOnce() {
        this.props.put("processing.guarantee", "at_least_once");
        new StreamsConfig(this.props);
    }

    /** Passes as long as construction does not throw. */
    @Test
    public void shouldAcceptExactlyOnce() {
        this.props.put("processing.guarantee", "exactly_once");
        new StreamsConfig(this.props);
    }

    @Test(expected = ConfigException.class)
    public void shouldThrowExceptionIfNotAtLestOnceOrExactlyOnce() {
        this.props.put("processing.guarantee", "bad_value");
        new StreamsConfig(this.props);
    }

    @Test
    public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("isolation.level", "anyValue");

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientId");

        Assert.assertThat(
            consumerConfigs.get("isolation.level"),
            IsEqual.<Object>equalTo(IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
    }

    @Test
    public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
        final String readUncommitted = IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT);
        this.props.put("isolation.level", readUncommitted);

        final Map<String, Object> consumerConfigs =
            new StreamsConfig(this.props).getMainConsumerConfigs("groupId", "clientrId");

        Assert.assertThat(consumerConfigs.get("isolation.level"), IsEqual.<Object>equalTo(readUncommitted));
    }

    @Test
    public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("enable.idempotence", "anyValue");

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertTrue((Boolean) producerConfigs.get("enable.idempotence"));
    }

    @Test
    public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
        this.props.put("enable.idempotence", false);

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertThat(producerConfigs.get("enable.idempotence"), IsEqual.<Object>equalTo(false));
    }

    @Test
    public void shouldSetDifferentDefaultsIfEosEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        final StreamsConfig config = new StreamsConfig(this.props);

        final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs("groupId", "clientId");
        final Map<String, Object> producerConfigs = config.getProducerConfigs("clientId");

        Assert.assertThat(
            consumerConfigs.get("isolation.level"),
            IsEqual.<Object>equalTo(IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
        Assert.assertTrue((Boolean) producerConfigs.get("enable.idempotence"));
        Assert.assertThat(producerConfigs.get("delivery.timeout.ms"), IsEqual.<Object>equalTo(Integer.MAX_VALUE));
        Assert.assertThat(config.getLong("commit.interval.ms"), IsEqual.equalTo(100L));
    }

    @Test
    public void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("retries", 42);

        final Map<String, Object> producerConfigs =
            new StreamsConfig(this.props).getProducerConfigs("clientId");

        Assert.assertThat(producerConfigs.get("retries"), IsEqual.<Object>equalTo(42));
    }

    @Test
    public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("commit.interval.ms", 73L);

        final StreamsConfig config = new StreamsConfig(this.props);

        Assert.assertThat(config.getLong("commit.interval.ms"), IsEqual.equalTo(73L));
    }

    @Test
    public void shouldUseNewConfigsWhenPresent() {
        final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
        streamsProps.put("default.key.serde", Serdes.Long().getClass());
        streamsProps.put("default.value.serde", Serdes.Long().getClass());
        streamsProps.put("default.timestamp.extractor", MockTimestampExtractor.class);

        final StreamsConfig config = new StreamsConfig(streamsProps);

        Assert.assertTrue(config.defaultKeySerde() instanceof Serdes.LongSerde);
        Assert.assertTrue(config.defaultValueSerde() instanceof Serdes.LongSerde);
        Assert.assertTrue(config.defaultTimestampExtractor() instanceof MockTimestampExtractor);
    }

    @Test
    public void shouldUseCorrectDefaultsWhenNoneSpecified() {
        final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig());

        Assert.assertTrue(config.defaultKeySerde() instanceof Serdes.ByteArraySerde);
        Assert.assertTrue(config.defaultValueSerde() instanceof Serdes.ByteArraySerde);
        Assert.assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
    }

    @Test
    public void shouldSpecifyCorrectKeySerdeClassOnError() {
        final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
        streamsProps.put("default.key.serde", MisconfiguredSerde.class);
        final StreamsConfig config = new StreamsConfig(streamsProps);
        try {
            config.defaultKeySerde();
            Assert.fail("Test should throw a StreamsException");
        } catch (final StreamsException e) {
            Assert.assertEquals(
                "Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
                e.getMessage());
        }
    }

    @Test
    public void shouldSpecifyCorrectValueSerdeClassOnError() {
        final Properties streamsProps = StreamsTestUtils.getStreamsConfig();
        streamsProps.put("default.value.serde", MisconfiguredSerde.class);
        final StreamsConfig config = new StreamsConfig(streamsProps);
        try {
            config.defaultValueSerde();
            Assert.fail("Test should throw a StreamsException");
        } catch (final StreamsException e) {
            Assert.assertEquals(
                "Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
                e.getMessage());
        }
    }

    @Test
    public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("max.in.flight.requests.per.connection", 7);
        final StreamsConfig config = new StreamsConfig(this.props);
        try {
            config.getProducerConfigs("clientId");
            Assert.fail("Should throw ConfigException when ESO is enabled and maxInFlight requests exceeds 5");
        } catch (final ConfigException e) {
            Assert.assertEquals(
                "Invalid value 7 for configuration max.in.flight.requests.per.connection: Can't exceed 5 when exactly-once processing is enabled",
                e.getMessage());
        }
    }

    /** Passes as long as neither construction nor producer-config derivation throws. */
    @Test
    public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("max.in.flight.requests.per.connection", "3");
        new StreamsConfig(this.props).getProducerConfigs("clientId");
    }

    @Test
    public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled() {
        this.props.put("processing.guarantee", "exactly_once");
        this.props.put("max.in.flight.requests.per.connection", "not-a-number");
        final StreamsConfig config = new StreamsConfig(this.props);
        try {
            config.getProducerConfigs("clientId");
            Assert.fail("Should throw ConfigException when EOS is enabled and maxInFlight cannot be paresed into an integer");
        } catch (final ConfigException e) {
            Assert.assertEquals(
                "Invalid value not-a-number for configuration max.in.flight.requests.per.connection: String value could not be parsed as 32-bit integer",
                e.getMessage());
        }
    }

    @Test
    public void shouldSpecifyNoOptimizationWhenNotExplicitlyAddedToConfigs() {
        final String actualOptimization = this.streamsConfig.getString("topology.optimization");
        Assert.assertEquals("Optimization should be \"none\"", "none", actualOptimization);
    }

    /**
     * Despite the method name, this test sets {@code topology.optimization} explicitly to
     * {@code all} and checks that the value is returned unchanged.
     */
    @Test
    public void shouldSpecifyOptimizationWhenNotExplicitlyAddedToConfigs() {
        this.props.put("topology.optimization", "all");

        final String actualOptimization = new StreamsConfig(this.props).getString("topology.optimization");

        Assert.assertEquals("Optimization should be \"all\"", "all", actualOptimization);
    }

    @Test(expected = ConfigException.class)
    public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
        this.props.put("topology.optimization", "maybe");
        new StreamsConfig(this.props);
    }
}
