/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Integration test for per-source auto offset reset policies in Kafka Streams.
 *
 * <p>The tests build topologies whose sources are registered with an explicit
 * {@link TopologyBuilder.AutoOffsetReset} policy (or none) and verify either which records reach
 * the output topic, or that conflicting registrations are rejected with a
 * {@link TopologyBuilderException}.
 */
public class KStreamsFineGrainedAutoResetIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";
    private static final String NOOP = "noop";
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();

    private final MockTime mockTime = CLUSTER.time;
    private final Serde<String> stringSerde = Serdes.String();
    private Properties streamsConfiguration;

    /**
     * Creates every input topic used by the tests, plus the shared output topic, once per class.
     *
     * @throws Exception if topic creation on the embedded cluster fails
     */
    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(TOPIC_1);
        CLUSTER.createTopic(TOPIC_2);
        CLUSTER.createTopic(TOPIC_A);
        CLUSTER.createTopic(TOPIC_C);
        CLUSTER.createTopic(TOPIC_Y);
        CLUSTER.createTopic(TOPIC_Z);
        CLUSTER.createTopic(NOOP);
        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    /**
     * Builds a fresh streams configuration whose default reset policy is {@code earliest} and
     * purges local streams state for that configuration before each test.
     *
     * @throws Exception if purging the local state fails
     */
    @Before
    public void setUp() throws Exception {
        final Properties props = new Properties();
        props.put("auto.offset.reset", "earliest");
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
                "testAutoOffsetId",
                CLUSTER.bootstrapServers(),
                STRING_SERDE_CLASSNAME,
                STRING_SERDE_CLASSNAME,
                props);
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Produces one record to each input topic <em>before</em> starting the streams instance and
     * expects only the records from the EARLIEST pattern source ({@code topic-\d}) and from the
     * named-topic source (which has no explicit policy and therefore relies on the
     * {@code earliest} default configured in {@link #setUp()}) to reach the output topic. The
     * records of the LATEST pattern source ({@code topic-[A-D]}) are expected to be absent.
     *
     * @throws Exception if producing, consuming or waiting fails
     */
    @Test
    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> pattern1Stream =
                builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d"));
        final KStream<String, String> pattern2Stream =
                builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);

        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final Properties producerConfig =
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);

        final String topic1TestMessage = "topic-1 test";
        final String topic2TestMessage = "topic-2 test";
        final String topicATestMessage = "topic-A test";
        final String topicCTestMessage = "topic-C test";
        final String topicYTestMessage = "topic-Y test";
        final String topicZTestMessage = "topic-Z test";

        produceSingleValue(TOPIC_1, topic1TestMessage, producerConfig);
        produceSingleValue(TOPIC_2, topic2TestMessage, producerConfig);
        produceSingleValue(TOPIC_A, topicATestMessage, producerConfig);
        produceSingleValue(TOPIC_C, topicCTestMessage, producerConfig);
        produceSingleValue(TOPIC_Y, topicYTestMessage, producerConfig);
        produceSingleValue(TOPIC_Z, topicZTestMessage, producerConfig);

        final Properties consumerConfig =
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

        final List<String> expectedReceivedValues =
                Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
        final List<String> actualValues = new ArrayList<>(expectedReceivedValues.size());

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        try {
            final List<KeyValue<String, String>> receivedKeyValues =
                    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                            consumerConfig, DEFAULT_OUTPUT_TOPIC, expectedReceivedValues.size());
            for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
                actualValues.add(receivedKeyValue.value);
            }
        } finally {
            // Always shut the instance down, even when the wait above fails, so a failed run
            // does not leave a live streams instance behind for the following tests.
            streams.close();
        }

        // Arrival order on the output topic is not part of the contract; compare sorted.
        Collections.sort(actualValues);
        Collections.sort(expectedReceivedValues);
        Assert.assertThat(actualValues, CoreMatchers.equalTo(expectedReceivedValues));
    }

    /**
     * Registers the same pattern under both the EARLIEST and the LATEST policy and expects a
     * {@link TopologyBuilderException} to be raised by the time the earliest-reset pattern is
     * requested from the builder.
     *
     * @throws Exception the expected {@link TopologyBuilderException}
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowExceptionOverlappingPattern() throws Exception {
        final KStreamBuilder builder = new KStreamBuilder();
        // The returned streams are not needed; only the source registration matters here.
        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
        builder.stream(TOPIC_Y, TOPIC_Z);

        builder.earliestResetTopicsPattern();
    }

    /**
     * Registers {@code topic-A} by name under the LATEST policy although it is already matched
     * by an EARLIEST pattern, and expects a {@link TopologyBuilderException} to be raised by
     * the time the latest-reset pattern is requested from the builder.
     *
     * @throws Exception the expected {@link TopologyBuilderException}
     */
    @Test(expected = TopologyBuilderException.class)
    public void shouldThrowExceptionOverlappingTopic() throws Exception {
        final KStreamBuilder builder = new KStreamBuilder();
        // The returned streams are not needed; only the source registration matters here.
        builder.stream(TopologyBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
        // Was "topic-\\d]": the stray ']' is a literal in Java regex, so the pattern could
        // never match a "topic-<digit>" name.
        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d"));
        builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z);

        builder.latestResetTopicsPattern();
    }

    /**
     * Runs a topology with the default reset policy set to {@code none} and no per-source
     * policy, then waits until the installed uncaught-exception handler has observed a
     * {@code StreamsException} caused by a {@code NoOffsetForPartitionException}.
     *
     * @throws Exception if the condition is not met in time or the wait is interrupted
     */
    @Test
    public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
        final Properties props = new Properties();
        props.put("auto.offset.reset", "none");
        final Properties localConfig = StreamsTestUtils.getStreamsConfig(
                "testAutoOffsetWithNone",
                CLUSTER.bootstrapServers(),
                STRING_SERDE_CLASSNAME,
                STRING_SERDE_CLASSNAME,
                props);

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> exceptionStream = builder.stream(NOOP);
        exceptionStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final KafkaStreams streams = new KafkaStreams(builder, localConfig);
        final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
        final TestCondition correctExceptionThrownCondition = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return uncaughtExceptionHandler.correctExceptionThrown;
            }
        };

        streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
        streams.start();
        try {
            TestUtils.waitForCondition(
                    correctExceptionThrownCondition,
                    "The expected NoOffsetForPartitionException was never thrown");
        } finally {
            // Close even when the condition times out so the instance is not leaked.
            streams.close();
        }
    }

    /** Synchronously produces a single value to {@code topic} using the cluster's mock time. */
    private void produceSingleValue(final String topic,
                                    final String value,
                                    final Properties producerConfig) throws Exception {
        IntegrationTestUtils.produceValuesSynchronously(
                topic, Collections.singletonList(value), producerConfig, mockTime);
    }

    /**
     * Uncaught-exception handler that records whether the observed throwable is a
     * {@code StreamsException} whose cause is a {@code NoOffsetForPartitionException}.
     */
    private static final class TestingUncaughtExceptionHandler
            implements Thread.UncaughtExceptionHandler {
        // volatile: written by whichever thread invokes this handler (presumably a stream
        // thread) and polled from the test thread, so the write must be visible across threads.
        volatile boolean correctExceptionThrown = false;

        private TestingUncaughtExceptionHandler() {
        }

        /**
         * Asserts the type of the throwable and of its cause by simple class name; the flag is
         * set only when both assertions pass.
         *
         * @param t the thread that terminated
         * @param e the uncaught throwable
         */
        @Override
        public void uncaughtException(final Thread t, final Throwable e) {
            Assert.assertThat(e.getClass().getSimpleName(), CoreMatchers.is("StreamsException"));
            Assert.assertThat(e.getCause().getClass().getSimpleName(), CoreMatchers.is("NoOffsetForPartitionException"));
            correctExceptionThrown = true;
        }
    }
}

