/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamBuilderTest {
    private static final String APP_ID = "app-id";
    private final KStreamBuilder builder = new KStreamBuilder();
    private KStreamTestDriver driver = null;

    @Before
    public void setUp() {
        this.builder.setApplicationId(APP_ID);
    }

    @After
    public void cleanup() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Test(expected=TopologyBuilderException.class)
    public void testFrom() {
        this.builder.stream(new String[]{"topic-1", "topic-2"});
        this.builder.addSource("KSTREAM-SOURCE-0000000000", new String[]{"topic-3"});
    }

    @Test
    public void testNewName() {
        Assert.assertEquals((Object)"X-0000000000", (Object)this.builder.newName("X-"));
        Assert.assertEquals((Object)"Y-0000000001", (Object)this.builder.newName("Y-"));
        Assert.assertEquals((Object)"Z-0000000002", (Object)this.builder.newName("Z-"));
        KStreamBuilder newBuilder = new KStreamBuilder();
        Assert.assertEquals((Object)"X-0000000000", (Object)newBuilder.newName("X-"));
        Assert.assertEquals((Object)"Y-0000000001", (Object)newBuilder.newName("Y-"));
        Assert.assertEquals((Object)"Z-0000000002", (Object)newBuilder.newName("Z-"));
    }

    @Test
    public void testMerge() {
        String topic1 = "topic-1";
        String topic2 = "topic-2";
        KStream source1 = this.builder.stream(new String[]{topic1});
        KStream source2 = this.builder.stream(new String[]{topic2});
        KStream merged = this.builder.merge(new KStream[]{source1, source2});
        MockProcessorSupplier processorSupplier = new MockProcessorSupplier();
        merged.process(processorSupplier, new String[0]);
        this.driver = new KStreamTestDriver(this.builder);
        this.driver.setTime(0L);
        this.driver.process(topic1, "A", "aa");
        this.driver.process(topic2, "B", "bb");
        this.driver.process(topic2, "C", "cc");
        this.driver.process(topic1, "D", "dd");
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"A:aa", "B:bb", "C:cc", "D:dd"}), processorSupplier.processed);
    }

    @Test
    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
        String topic1 = "topic-1";
        String topic2 = "topic-2";
        String topic3 = "topic-3";
        KStream source1 = this.builder.stream(new String[]{"topic-1"});
        KStream source2 = this.builder.stream(new String[]{"topic-2"});
        KStream source3 = this.builder.stream(new String[]{"topic-3"});
        KStream processedSource1 = source1.mapValues((ValueMapper)new ValueMapper<String, String>(){

            public String apply(String value) {
                return value;
            }
        }).filter((Predicate)new Predicate<String, String>(){

            public boolean test(String key, String value) {
                return true;
            }
        });
        KStream processedSource2 = source2.filter((Predicate)new Predicate<String, String>(){

            public boolean test(String key, String value) {
                return true;
            }
        });
        KStream merged = this.builder.merge(new KStream[]{processedSource1, processedSource2, source3});
        merged.groupByKey().count("my-table");
        Map actual = this.builder.stateStoreNameToSourceTopics();
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"topic-1", "topic-2", "topic-3"}), actual.get("my-table"));
    }

    @Test(expected=TopologyBuilderException.class)
    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
        this.builder.stream(new String[0]);
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
        this.builder.stream(Serdes.String(), Serdes.String(), new String[]{null, null});
    }

    @Test
    public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
        this.builder.table("topic1", "table1");
        this.builder.table("topic2", null);
        ProcessorTopology topology = this.builder.build(null);
        Assert.assertEquals((long)1L, (long)topology.stateStores().size());
        Assert.assertEquals((Object)"table1", (Object)((StateStore)topology.stateStores().get(0)).name());
        Assert.assertEquals((long)1L, (long)topology.storeToChangelogTopic().size());
        Assert.assertEquals((Object)"topic1", topology.storeToChangelogTopic().get("table1"));
    }

    @Test
    public void shouldBuildSimpleGlobalTableTopology() throws Exception {
        this.builder.globalTable("table", "globalTable");
        ProcessorTopology topology = this.builder.buildGlobalStateTopology();
        List stateStores = topology.globalStateStores();
        Assert.assertEquals((long)1L, (long)stateStores.size());
        Assert.assertEquals((Object)"globalTable", (Object)((StateStore)stateStores.get(0)).name());
    }

    @Test
    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
        this.builder.globalTable("table", "globalTable");
        this.builder.globalTable("table2", "globalTable2");
        ProcessorTopology topology = this.builder.buildGlobalStateTopology();
        List stateStores = topology.globalStateStores();
        Set sourceTopics = topology.sourceTopics();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"table", "table2"}), (Object)sourceTopics);
        Assert.assertEquals((long)2L, (long)stateStores.size());
    }

    @Test
    public void shouldAddGlobalTablesToEachGroup() throws Exception {
        String one = "globalTable";
        String two = "globalTable2";
        GlobalKTable globalTable = this.builder.globalTable("table", "globalTable");
        GlobalKTable globalTable2 = this.builder.globalTable("table2", "globalTable2");
        this.builder.table("not-global", "not-global");
        KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>(){

            public String apply(String key, String value) {
                return value;
            }
        };
        KStream stream = this.builder.stream(new String[]{"t1"});
        stream.leftJoin(globalTable, (KeyValueMapper)kvMapper, MockValueJoiner.TOSTRING_JOINER);
        KStream stream2 = this.builder.stream(new String[]{"t2"});
        stream2.leftJoin(globalTable2, (KeyValueMapper)kvMapper, MockValueJoiner.TOSTRING_JOINER);
        Map nodeGroups = this.builder.nodeGroups();
        for (Integer groupId : nodeGroups.keySet()) {
            ProcessorTopology topology = this.builder.build(groupId);
            List stateStores = topology.globalStateStores();
            HashSet<String> names = new HashSet<String>();
            for (StateStore stateStore : stateStores) {
                names.add(stateStore.name());
            }
            Assert.assertEquals((long)2L, (long)stateStores.size());
            Assert.assertTrue((boolean)names.contains("globalTable"));
            Assert.assertTrue((boolean)names.contains("globalTable2"));
        }
    }

    @Test
    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
        KStream playEvents = this.builder.stream(new String[]{"events"});
        KTable table = this.builder.table("table-topic", "table-store");
        Assert.assertEquals(Collections.singletonList("table-topic"), this.builder.stateStoreNameToSourceTopics().get("table-store"));
        KStream mapped = playEvents.map(MockKeyValueMapper.SelectValueKeyValueMapper());
        mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count");
        Assert.assertEquals(Collections.singletonList("table-topic"), this.builder.stateStoreNameToSourceTopics().get("table-store"));
        Assert.assertEquals(Collections.singletonList("app-id-KSTREAM-MAP-0000000003-repartition"), this.builder.stateStoreNameToSourceTopics().get("count"));
    }
}

