/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class JoinIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static ZkUtils zkUtils = null;
    private static final String APP_ID = "join-integration-test";
    private static final String INPUT_TOPIC_1 = "inputTopicLeft";
    private static final String INPUT_TOPIC_2 = "inputTopicRight";
    private static final String OUTPUT_TOPIC = "outputTopic";
    private static final Properties PRODUCER_CONFIG = new Properties();
    private static final Properties RESULT_CONSUMER_CONFIG = new Properties();
    private static final Properties STREAMS_CONFIG = new Properties();
    private KStreamBuilder builder;
    private KStream<Long, String> leftStream;
    private KStream<Long, String> rightStream;
    private KTable<Long, String> leftTable;
    private KTable<Long, String> rightTable;
    private final List<Input<String>> input = Arrays.asList(new Input<String>("inputTopicLeft", null), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicLeft", "A"), new Input<String>("inputTopicRight", "a"), new Input<String>("inputTopicLeft", "B"), new Input<String>("inputTopicRight", "b"), new Input<String>("inputTopicLeft", null), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicLeft", "C"), new Input<String>("inputTopicRight", "c"), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicLeft", null), new Input<String>("inputTopicRight", null), new Input<String>("inputTopicRight", "d"), new Input<String>("inputTopicLeft", "D"));
    private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>(){

        public String apply(String value1, String value2) {
            return value1 + "-" + value2;
        }
    };
    private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition();

    @BeforeClass
    public static void setupConfigsAndUtils() throws Exception {
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("retries", (Object)0);
        PRODUCER_CONFIG.put("key.serializer", LongSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        RESULT_CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        RESULT_CONSUMER_CONFIG.put("group.id", "join-integration-test-result-consumer");
        RESULT_CONSUMER_CONFIG.put("auto.offset.reset", "earliest");
        RESULT_CONSUMER_CONFIG.put("key.deserializer", LongDeserializer.class);
        RESULT_CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
        STREAMS_CONFIG.put("key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("cache.max.bytes.buffering", (Object)0);
        zkUtils = ZkUtils.apply((String)CLUSTER.zKConnectString(), (int)30000, (int)30000, (boolean)JaasUtils.isZkSecurityEnabled());
    }

    @AfterClass
    public static void release() {
        if (zkUtils != null) {
            zkUtils.close();
        }
    }

    @Before
    public void prepareTopology() throws Exception {
        CLUSTER.createTopic(INPUT_TOPIC_1);
        CLUSTER.createTopic(INPUT_TOPIC_2);
        CLUSTER.createTopic(OUTPUT_TOPIC);
        this.builder = new KStreamBuilder();
        this.leftTable = this.builder.table(INPUT_TOPIC_1, "leftTable");
        this.rightTable = this.builder.table(INPUT_TOPIC_2, "rightTable");
        this.leftStream = this.leftTable.toStream();
        this.rightStream = this.rightTable.toStream();
    }

    @After
    public void cleanup() throws Exception {
        CLUSTER.deleteTopic(INPUT_TOPIC_1);
        CLUSTER.deleteTopic(INPUT_TOPIC_2);
        CLUSTER.deleteTopic(OUTPUT_TOPIC);
        TestUtils.waitForCondition((TestCondition)this.topicsGotDeleted, (long)120000L, (String)"Topics not deleted after 120 seconds.");
    }

    private void checkResult(String outputTopic, List<String> expectedResult) throws Exception {
        if (expectedResult != null) {
            List result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30000L);
            MatcherAssert.assertThat(result, (Matcher)Is.is(expectedResult));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runTest(List<List<String>> expectedResult) throws Exception {
        assert (expectedResult.size() == this.input.size());
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
        try (KafkaStreams streams = new KafkaStreams((TopologyBuilder)this.builder, STREAMS_CONFIG);){
            streams.start();
            long ts = System.currentTimeMillis();
            Iterator<List<String>> resultIterator = expectedResult.iterator();
            for (Input<String> singleInput : this.input) {
                IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts);
                this.checkResult(OUTPUT_TOPIC, resultIterator.next());
            }
        }
    }

    @Test
    public void testInnerKStreamKStream() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-inner-KStream-KStream");
        List<List<String>> expectedResult = Arrays.asList(null, null, null, Collections.singletonList("A-a"), Collections.singletonList("B-a"), Arrays.asList("A-b", "B-b"), null, null, Arrays.asList("C-a", "C-b"), Arrays.asList("A-c", "B-c", "C-c"), null, null, null, Arrays.asList("A-d", "B-d", "C-d"), Arrays.asList("D-a", "D-b", "D-c", "D-d"));
        this.leftStream.join(this.rightStream, this.valueJoiner, JoinWindows.of((long)10000L)).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testLeftKStreamKStream() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-left-KStream-KStream");
        List<List<String>> expectedResult = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Arrays.asList("A-b", "B-b"), null, null, Arrays.asList("C-a", "C-b"), Arrays.asList("A-c", "B-c", "C-c"), null, null, null, Arrays.asList("A-d", "B-d", "C-d"), Arrays.asList("D-a", "D-b", "D-c", "D-d"));
        this.leftStream.leftJoin(this.rightStream, this.valueJoiner, JoinWindows.of((long)10000L)).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testOuterKStreamKStream() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-outer-KStream-KStream");
        List<List<String>> expectedResult = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Arrays.asList("A-b", "B-b"), null, null, Arrays.asList("C-a", "C-b"), Arrays.asList("A-c", "B-c", "C-c"), null, null, null, Arrays.asList("A-d", "B-d", "C-d"), Arrays.asList("D-a", "D-b", "D-c", "D-d"));
        this.leftStream.outerJoin(this.rightStream, this.valueJoiner, JoinWindows.of((long)10000L)).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testInnerKStreamKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-inner-KStream-KTable");
        List<List<String>> expectedResult = Arrays.asList(null, null, null, null, Collections.singletonList("B-a"), null, null, null, null, null, null, null, null, null, Collections.singletonList("D-d"));
        this.leftStream.join(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testLeftKStreamKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-left-KStream-KTable");
        List<List<String>> expectedResult = Arrays.asList(null, null, Collections.singletonList("A-null"), null, Collections.singletonList("B-a"), null, null, null, Collections.singletonList("C-null"), null, null, null, null, null, Collections.singletonList("D-d"));
        this.leftStream.leftJoin(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testInnerKTableKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-inner-KTable-KTable");
        List<List<String>> expectedResult = Arrays.asList(null, null, null, Collections.singletonList("A-a"), Collections.singletonList("B-a"), Collections.singletonList("B-b"), Collections.singletonList(null), null, null, Collections.singletonList("C-c"), Collections.singletonList(null), null, null, null, Collections.singletonList("D-d"));
        this.leftTable.join(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testLeftKTableKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-left-KTable-KTable");
        List<List<String>> expectedResult = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Collections.singletonList("B-b"), Collections.singletonList(null), null, Collections.singletonList("C-null"), Collections.singletonList("C-c"), Collections.singletonList("C-null"), Collections.singletonList(null), null, null, Collections.singletonList("D-d"));
        this.leftTable.leftJoin(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    @Test
    public void testOuterKTableKTable() throws Exception {
        STREAMS_CONFIG.put("application.id", "join-integration-test-outer-KTable-KTable");
        List<List<String>> expectedResult = Arrays.asList(null, null, Collections.singletonList("A-null"), Collections.singletonList("A-a"), Collections.singletonList("B-a"), Collections.singletonList("B-b"), Collections.singletonList("null-b"), Collections.singletonList(null), Collections.singletonList("C-null"), Collections.singletonList("C-c"), Collections.singletonList("C-null"), Collections.singletonList(null), null, Collections.singletonList("null-d"), Collections.singletonList("D-d"));
        this.leftTable.outerJoin(this.rightTable, this.valueJoiner).to(OUTPUT_TOPIC);
        this.runTest(expectedResult);
    }

    private final class Input<V> {
        String topic;
        KeyValue<Long, V> record;
        private final long anyUniqueKey = 0L;

        Input(String topic, V value) {
            this.topic = topic;
            this.record = KeyValue.pair((Object)0L, value);
        }
    }

    private final class TopicsGotDeletedCondition
    implements TestCondition {
        private TopicsGotDeletedCondition() {
        }

        public boolean conditionMet() {
            HashSet allTopics = new HashSet();
            allTopics.addAll(JavaConversions.seqAsJavaList((Seq)zkUtils.getAllTopics()));
            return !allTopics.contains(JoinIntegrationTest.INPUT_TOPIC_1) && !allTopics.contains(JoinIntegrationTest.INPUT_TOPIC_2) && !allTopics.contains(JoinIntegrationTest.OUTPUT_TOPIC);
        }
    }
}

