/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized integration test for two chained KTable-KTable joins.
 *
 * <p>Three tables are read from {@code table1}, {@code table2} and {@code table3}. The first
 * two are joined using {@link #joinType1}; the result is joined with the third using
 * {@link #joinType2} and written to the output topic. Each parameter set supplies the two join
 * types together with the records expected on the output topic.
 *
 * <p>NOTE(review): every parameter combination writes to the same output topic and reads it back
 * with the same consumer group id. Presumably each run only sees records produced after the
 * previous run's consumed position - confirm before changing the topic or group configuration.
 */
@RunWith(value = Parameterized.class)
public class KTableKTableJoinIntegrationTest {
    private static final int NUM_BROKERS = 1;

    /** Embedded Kafka cluster managed by JUnit as a class rule and shared by all parameter sets. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final MockTime MOCK_TIME = CLUSTER.time;

    private static final String TABLE_1 = "table1";
    private static final String TABLE_2 = "table2";
    private static final String TABLE_3 = "table3";
    private static final String OUTPUT = "output-";

    /** Consumer settings used to read back the join output; populated in {@link #beforeTest()}. */
    private static final Properties CONSUMER_CONFIG = new Properties();

    /** Streams settings shared by all runs; the application id is overwritten per test run. */
    private static Properties streamsConfig;

    /** Streams instance of the current run; closed in {@link #after()}. */
    private KafkaStreams streams;

    /** Join applied between table1 and table2. */
    @Parameterized.Parameter(value = 0)
    public JoinType joinType1;

    /** Join applied between the first join's result and table3. */
    @Parameterized.Parameter(value = 1)
    public JoinType joinType2;

    /** Records expected on the output topic, in order. */
    @Parameterized.Parameter(value = 2)
    public List<KeyValue<String, String>> expectedResult;

    /**
     * Supplies one row per (first join, second join) combination.
     *
     * <p>Each row is {@code {joinType1, joinType2, expectedResult}}. Expected values are the
     * joiner output {@code left + "-" + right}; a missing side shows up as the text
     * {@code "null"} because the joiner uses plain string concatenation.
     *
     * @return the parameter rows consumed by the {@link Parameterized} runner
     */
    @Parameterized.Parameters
    public static Object[] parameters() {
        return new Object[][]{
            {JoinType.INNER, JoinType.INNER, Arrays.asList(
                new KeyValue<>("b", "B1-B2-B3"))},
            {JoinType.INNER, JoinType.LEFT, Arrays.asList(
                new KeyValue<>("b", "B1-B2-B3"))},
            {JoinType.INNER, JoinType.OUTER, Arrays.asList(
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("c", "null-C3"),
                new KeyValue<>("b", "B1-B2-B3"))},
            {JoinType.LEFT, JoinType.INNER, Arrays.asList(
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"))},
            {JoinType.LEFT, JoinType.LEFT, Arrays.asList(
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"))},
            {JoinType.LEFT, JoinType.OUTER, Arrays.asList(
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("c", "null-C3"),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"))},
            {JoinType.OUTER, JoinType.INNER, Arrays.asList(
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C2-C3"))},
            {JoinType.OUTER, JoinType.LEFT, Arrays.asList(
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C2-C3"))},
            {JoinType.OUTER, JoinType.OUTER, Arrays.asList(
                new KeyValue<>("a", "null-A3"),
                new KeyValue<>("b", "null-B3"),
                new KeyValue<>("c", "null-C3"),
                new KeyValue<>("a", "A1-null-A3"),
                new KeyValue<>("b", "B1-null-B3"),
                new KeyValue<>("b", "B1-B2-B3"),
                new KeyValue<>("c", "null-C2-C3"))}
        };
    }

    /**
     * Creates the input and output topics, builds the shared streams and consumer
     * configuration, and produces the fixed input records once for the whole class.
     *
     * @throws Exception if topic creation or record production fails
     */
    @BeforeClass
    public static void beforeTest() throws Exception {
        CLUSTER.createTopic(TABLE_1);
        CLUSTER.createTopic(TABLE_2);
        CLUSTER.createTopic(TABLE_3);
        CLUSTER.createTopic(OUTPUT);

        streamsConfig = new Properties();
        streamsConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfig.put("key.serde", Serdes.String().getClass().getName());
        streamsConfig.put("value.serde", Serdes.String().getClass().getName());
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfig.put("cache.max.bytes.buffering", 0);

        final Properties producerConfig = new Properties();
        producerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerConfig.put("acks", "all");
        producerConfig.put("retries", 0);
        producerConfig.put("key.serializer", StringSerializer.class);
        producerConfig.put("value.serializer", StringSerializer.class);

        final List<KeyValue<String, String>> table1 = Arrays.asList(
            new KeyValue<>("a", "A1"),
            new KeyValue<>("b", "B1"));
        final List<KeyValue<String, String>> table2 = Arrays.asList(
            new KeyValue<>("b", "B2"),
            new KeyValue<>("c", "C2"));
        final List<KeyValue<String, String>> table3 = Arrays.asList(
            new KeyValue<>("a", "A3"),
            new KeyValue<>("b", "B3"),
            new KeyValue<>("c", "C3"));

        // table3 is produced before table1 and table2. NOTE(review): the expected results in
        // parameters() (e.g. "null-A3" first for outer joins) appear to depend on this
        // ordering - do not reorder without re-deriving them.
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);

        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "ktable-ktable-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    /**
     * Purges local streams state before each run.
     *
     * @throws Exception if the local state cannot be purged
     */
    @Before
    public void before() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
    }

    /**
     * Closes the streams instance of the finished run, if any, and purges local state.
     *
     * @throws Exception if closing the instance or purging the state fails
     */
    @After
    public void after() throws Exception {
        try {
            if (streams != null) {
                streams.close();
                streams = null;
            }
        } finally {
            // Purge even when close() fails so leftover state does not leak into the next run.
            IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
        }
    }

    /**
     * Builds the topology {@code (table1 joinType1 table2) joinType2 table3 -> OUTPUT}.
     *
     * @return a new, not yet started, streams instance configured from {@code streamsConfig}
     */
    private KafkaStreams prepareTopology() {
        final KStreamBuilder builder = new KStreamBuilder();

        // Each table uses its topic name as the second argument as well.
        final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
        final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
        final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);

        final KTable<String, String> firstJoin = join(table1, table2, joinType1);
        join(firstJoin, table3, joinType2).to(OUTPUT);

        return new KafkaStreams(builder, new StreamsConfig(streamsConfig));
    }

    /**
     * Joins two tables with the requested join type, concatenating the values with {@code "-"}.
     *
     * @param first the left-hand table
     * @param second the right-hand table
     * @param joinType which join operation to apply
     * @return the joined table
     * @throws RuntimeException if {@code joinType} is not a known join type
     */
    private KTable<String, String> join(KTable<String, String> first, KTable<String, String> second, JoinType joinType) {
        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
            @Override
            public String apply(String value1, String value2) {
                // Plain concatenation: a null side is rendered as the text "null".
                return value1 + "-" + value2;
            }
        };

        switch (joinType) {
            case INNER:
                return first.join(second, joiner);
            case LEFT:
                return first.leftJoin(second, joiner);
            case OUTER:
                return first.outerJoin(second, joiner);
            default:
                throw new RuntimeException("Unknown join type.");
        }
    }

    /**
     * Runs the chained join for the current parameter set and checks the records received on
     * the output topic against {@link #expectedResult}.
     *
     * @throws Exception if waiting for the output records fails
     */
    @Test
    public void KTableKTableJoin() throws Exception {
        // The application id is derived from the join-type combination of this run.
        streamsConfig.put("application.id", joinType1 + "-" + joinType2 + "-ktable-ktable-join");

        streams = prepareTopology();
        streams.start();

        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
            CONSUMER_CONFIG,
            OUTPUT,
            expectedResult.size());

        Assert.assertThat(result, CoreMatchers.equalTo(expectedResult));
    }

    /** The join operations exercised by this test. */
    private enum JoinType {
        INNER,
        LEFT,
        OUTER
    }
}

