/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Exercises the stateful {@code transform}, {@code flatTransform}, {@code transformValues} and
 * {@code flatTransformValues} operators of {@link KStream} against a {@link TopologyTestDriver}.
 *
 * <p>Every test attaches one transformer to the same input stream, connects it to the persistent
 * key-value store registered in {@link #before()}, collects the emitted records through
 * {@link #action} and compares them with a hand-computed expectation in
 * {@link #verifyResult(List)}.
 */
@Category({IntegrationTest.class})
public class KStreamTransformIntegrationTest {
    private StreamsBuilder builder;
    private final String topic = "stream";
    private final String stateStoreName = "myTransformState";
    /** Records observed downstream of the transformer under test, in arrival order. */
    private final List<KeyValue<Integer, Integer>> results = new ArrayList<>();
    /** Terminal action that appends every record it sees to {@link #results}. */
    private final ForeachAction<Integer, Integer> action = (key, value) -> results.add(KeyValue.pair(key, value));
    private KStream<Integer, Integer> stream;

    /**
     * Creates a fresh builder, registers the Integer/Integer persistent key-value store and opens
     * the input stream that each test attaches its transformer to.
     */
    @Before
    public void before() {
        builder = new StreamsBuilder();
        final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(stateStoreName),
                Serdes.Integer(),
                Serdes.Integer());
        builder.addStateStore(keyValueStoreBuilder);
        stream = builder.stream(topic, Consumed.with(Serdes.Integer(), Serdes.Integer()));
    }

    /**
     * Builds the topology, pipes the fixed six-record input into it and asserts that the records
     * collected in {@link #results} equal {@code expected}.
     *
     * @param expected the records the transformer under test must emit, in order
     */
    private void verifyResult(final List<KeyValue<Integer, Integer>> expected) {
        final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<Integer, Integer> inputTopic =
                driver.createInputTopic(topic, new IntegerSerializer(), new IntegerSerializer());
            inputTopic.pipeKeyValueList(Arrays.asList(
                new KeyValue<>(1, 1),
                new KeyValue<>(2, 2),
                new KeyValue<>(3, 3),
                new KeyValue<>(2, 1),
                new KeyValue<>(2, 3),
                new KeyValue<>(1, 3)));
        }
        MatcherAssert.assertThat(results, IsEqual.equalTo(expected));
    }

    /**
     * {@code transform}: emits {@code (key + 1, value + n)} where {@code n} is the number of
     * records previously seen for {@code key}, then stores {@code n + 1}.
     */
    @Test
    public void shouldTransform() {
        stream.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
                state.putIfAbsent(key, 0);
                // The pre-increment counter goes into the output; the incremented one is stored.
                final int seenBefore = state.get(key);
                final KeyValue<Integer, Integer> result = new KeyValue<>(key + 1, value + seenBefore);
                state.put(key, seenBefore + 1);
                return result;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 2),
            KeyValue.pair(4, 3),
            KeyValue.pair(3, 2),
            KeyValue.pair(3, 5),
            KeyValue.pair(2, 4));
        verifyResult(expected);
    }

    /**
     * {@code flatTransform}: for each input emits three records {@code (key + i, value + n + i)}
     * for {@code i = 0..2}, where {@code n} is the counter stored for {@code key}; the counter is
     * advanced by three.
     */
    @Test
    public void shouldFlatTransform() {
        stream.flatTransform(() -> new Transformer<Integer, Integer, Iterable<KeyValue<Integer, Integer>>>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public Iterable<KeyValue<Integer, Integer>> transform(final Integer key, final Integer value) {
                final List<KeyValue<Integer, Integer>> result = new ArrayList<>();
                state.putIfAbsent(key, 0);
                int storedValue = state.get(key);
                for (int i = 0; i < 3; i++) {
                    result.add(new KeyValue<>(key + i, value + storedValue++));
                }
                state.put(key, storedValue);
                return result;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(4, 4),
            KeyValue.pair(3, 3),
            KeyValue.pair(4, 4),
            KeyValue.pair(5, 5),
            KeyValue.pair(2, 4),
            KeyValue.pair(3, 5),
            KeyValue.pair(4, 6),
            KeyValue.pair(2, 9),
            KeyValue.pair(3, 10),
            KeyValue.pair(4, 11),
            KeyValue.pair(1, 6),
            KeyValue.pair(2, 7),
            KeyValue.pair(3, 8));
        verifyResult(expected);
    }

    /**
     * {@code transformValues} with a key-aware transformer: replaces the value with
     * {@code value + n}, where {@code n} is the number of records previously seen for the key.
     */
    @Test
    public void shouldTransformValuesWithValueTransformerWithKey() {
        stream.transformValues(() -> new ValueTransformerWithKey<Integer, Integer, Integer>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public Integer transform(final Integer key, final Integer value) {
                state.putIfAbsent(key, 0);
                // The pre-increment counter goes into the output; the incremented one is stored.
                final int seenBefore = state.get(key);
                final Integer result = value + seenBefore;
                state.put(key, seenBefore + 1);
                return result;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 5),
            KeyValue.pair(1, 4));
        verifyResult(expected);
    }

    /**
     * {@code transformValues} with a key-less transformer: the store is keyed by the record
     * <em>value</em>, and the emitted value is how many times that value has been seen so far.
     */
    @Test
    public void shouldTransformValuesWithValueTransformerWithoutKey() {
        stream.transformValues(() -> new ValueTransformer<Integer, Integer>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public Integer transform(final Integer value) {
                state.putIfAbsent(value, 0);
                final int counter = state.get(value) + 1;
                state.put(value, counter);
                return counter;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(2, 1),
            KeyValue.pair(3, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 2),
            KeyValue.pair(1, 3));
        verifyResult(expected);
    }

    /**
     * {@code flatTransformValues} with a key-aware transformer: for each input emits the three
     * values {@code value + n + i} for {@code i = 0..2}, where {@code n} is the counter stored
     * for the key; the counter is advanced by three.
     */
    @Test
    public void shouldFlatTransformValuesWithKey() {
        stream.flatTransformValues(() -> new ValueTransformerWithKey<Integer, Integer, Iterable<Integer>>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public Iterable<Integer> transform(final Integer key, final Integer value) {
                final List<Integer> result = new ArrayList<>();
                state.putIfAbsent(key, 0);
                int storedValue = state.get(key);
                for (int i = 0; i < 3; i++) {
                    result.add(value + storedValue++);
                }
                state.put(key, storedValue);
                return result;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(1, 2),
            KeyValue.pair(1, 3),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 3),
            KeyValue.pair(2, 4),
            KeyValue.pair(3, 3),
            KeyValue.pair(3, 4),
            KeyValue.pair(3, 5),
            KeyValue.pair(2, 4),
            KeyValue.pair(2, 5),
            KeyValue.pair(2, 6),
            KeyValue.pair(2, 9),
            KeyValue.pair(2, 10),
            KeyValue.pair(2, 11),
            KeyValue.pair(1, 6),
            KeyValue.pair(1, 7),
            KeyValue.pair(1, 8));
        verifyResult(expected);
    }

    /**
     * {@code flatTransformValues} with a key-less transformer: the store is keyed by the record
     * <em>value</em>; each input emits the next three counter values for that record value.
     */
    @Test
    public void shouldFlatTransformValuesWithValueTransformerWithoutKey() {
        stream.flatTransformValues(() -> new ValueTransformer<Integer, Iterable<Integer>>() {
            private KeyValueStore<Integer, Integer> state;

            // The store registered in before() is built with Integer serdes for key and value.
            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
            }

            @Override
            public Iterable<Integer> transform(final Integer value) {
                final List<Integer> result = new ArrayList<>();
                state.putIfAbsent(value, 0);
                int counter = state.get(value);
                for (int i = 0; i < 3; i++) {
                    result.add(++counter);
                }
                state.put(value, counter);
                return result;
            }

            @Override
            public void close() {
            }
        }, stateStoreName).foreach(action);

        final List<KeyValue<Integer, Integer>> expected = Arrays.asList(
            KeyValue.pair(1, 1),
            KeyValue.pair(1, 2),
            KeyValue.pair(1, 3),
            KeyValue.pair(2, 1),
            KeyValue.pair(2, 2),
            KeyValue.pair(2, 3),
            KeyValue.pair(3, 1),
            KeyValue.pair(3, 2),
            KeyValue.pair(3, 3),
            KeyValue.pair(2, 4),
            KeyValue.pair(2, 5),
            KeyValue.pair(2, 6),
            KeyValue.pair(2, 4),
            KeyValue.pair(2, 5),
            KeyValue.pair(2, 6),
            KeyValue.pair(1, 7),
            KeyValue.pair(1, 8),
            KeyValue.pair(1, 9));
        verifyResult(expected);
    }
}

