/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class GlobalKTableJoinsTest {
    private final StreamsBuilder builder = new StreamsBuilder();
    private final Map<String, String> results = new HashMap<String, String>();
    private final String streamTopic = "stream";
    private final String globalTopic = "global";
    private GlobalKTable<String, String> global;
    private KStream<String, String> stream;
    private KeyValueMapper<String, String, String> keyValueMapper;
    private ForeachAction<String, String> action;

    @Before
    public void setUp() {
        Consumed consumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String());
        this.global = this.builder.globalTable("global", consumed);
        this.stream = this.builder.stream("stream", consumed);
        this.keyValueMapper = new KeyValueMapper<String, String, String>(){

            public String apply(String key, String value) {
                return value;
            }
        };
        this.action = new ForeachAction<String, String>(){

            public void apply(String key, String value) {
                GlobalKTableJoinsTest.this.results.put(key, value);
            }
        };
    }

    @Test
    public void shouldLeftJoinWithStream() {
        this.stream.leftJoin(this.global, this.keyValueMapper, MockValueJoiner.TOSTRING_JOINER).foreach(this.action);
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("1", "a+A");
        expected.put("2", "b+B");
        expected.put("3", "c+null");
        this.verifyJoin(expected);
    }

    @Test
    public void shouldInnerJoinWithStream() {
        this.stream.join(this.global, this.keyValueMapper, MockValueJoiner.TOSTRING_JOINER).foreach(this.action);
        HashMap<String, String> expected = new HashMap<String, String>();
        expected.put("1", "a+A");
        expected.put("2", "b+B");
        this.verifyJoin(expected);
    }

    private void verifyJoin(Map<String, String> expected) {
        ConsumerRecordFactory recordFactory = new ConsumerRecordFactory((Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), props);){
            driver.pipeInput(recordFactory.create("global", (Object)"a", (Object)"A"));
            driver.pipeInput(recordFactory.create("global", (Object)"b", (Object)"B"));
            driver.pipeInput(recordFactory.create("stream", (Object)"1", (Object)"a"));
            driver.pipeInput(recordFactory.create("stream", (Object)"2", (Object)"b"));
            driver.pipeInput(recordFactory.create("stream", (Object)"3", (Object)"c"));
        }
        Assert.assertEquals(expected, this.results);
    }
}

