/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KStreamAggregationDedupIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final long COMMIT_INTERVAL_MS = 300L;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private final MockTime mockTime;
    private static volatile int testNo = 0;
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String outputTopic;
    private KGroupedStream<String, String> groupedStream;
    private Reducer<String> reducer;
    private KStream<Integer, String> stream;

    public KStreamAggregationDedupIntegrationTest() {
        this.mockTime = KStreamAggregationDedupIntegrationTest.CLUSTER.time;
    }

    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String applicationId = "kgrouped-stream-test-" + ++testNo;
        this.streamsConfiguration.put("application.id", applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("commit.interval.ms", (Object)300L);
        this.streamsConfiguration.put("cache.max.bytes.buffering", (Object)0xA00000L);
        this.streamsConfiguration.put("internal.leave.group.on.close", (Object)true);
        KeyValueMapper mapper = MockMapper.selectValueMapper();
        this.stream = this.builder.stream(this.streamOneInput, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String()));
        this.groupedStream = this.stream.groupBy(mapper, Serialized.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.reducer = (value1, value2) -> value1 + ":" + value2;
    }

    @After
    public void whenShuttingDown() throws IOException {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldReduce() throws Exception {
        this.produceMessages(System.currentTimeMillis());
        this.groupedStream.reduce(this.reducer, Materialized.as((String)"reduce-by-key")).toStream().to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.startStreams();
        this.produceMessages(System.currentTimeMillis());
        this.validateReceivedMessages((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), Arrays.asList(KeyValue.pair((Object)"A", (Object)"A:A"), KeyValue.pair((Object)"B", (Object)"B:B"), KeyValue.pair((Object)"C", (Object)"C:C"), KeyValue.pair((Object)"D", (Object)"D:D"), KeyValue.pair((Object)"E", (Object)"E:E")));
    }

    @Test
    public void shouldReduceWindowed() throws Exception {
        long firstBatchTimestamp = System.currentTimeMillis() - 1000L;
        this.produceMessages(firstBatchTimestamp);
        long secondBatchTimestamp = System.currentTimeMillis();
        this.produceMessages(secondBatchTimestamp);
        this.produceMessages(secondBatchTimestamp);
        this.groupedStream.windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).reduce(this.reducer, Materialized.as((String)"reduce-time-windows")).toStream((windowedKey, value) -> (String)windowedKey.key() + "@" + windowedKey.window().start()).to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        this.startStreams();
        long firstBatchWindow = firstBatchTimestamp / 500L * 500L;
        long secondBatchWindow = secondBatchTimestamp / 500L * 500L;
        this.validateReceivedMessages((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), Arrays.asList(new KeyValue((Object)("A@" + firstBatchWindow), (Object)"A"), new KeyValue((Object)("A@" + secondBatchWindow), (Object)"A:A"), new KeyValue((Object)("B@" + firstBatchWindow), (Object)"B"), new KeyValue((Object)("B@" + secondBatchWindow), (Object)"B:B"), new KeyValue((Object)("C@" + firstBatchWindow), (Object)"C"), new KeyValue((Object)("C@" + secondBatchWindow), (Object)"C:C"), new KeyValue((Object)("D@" + firstBatchWindow), (Object)"D"), new KeyValue((Object)("D@" + secondBatchWindow), (Object)"D:D"), new KeyValue((Object)("E@" + firstBatchWindow), (Object)"E"), new KeyValue((Object)("E@" + secondBatchWindow), (Object)"E:E")));
    }

    @Test
    public void shouldGroupByKey() throws Exception {
        long timestamp = this.mockTime.milliseconds();
        this.produceMessages(timestamp);
        this.produceMessages(timestamp);
        this.stream.groupByKey(Serialized.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.of((Duration)Duration.ofMillis(500L))).count(Materialized.as((String)"count-windows")).toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(this.outputTopic, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        this.startStreams();
        long window = timestamp / 500L * 500L;
        this.validateReceivedMessages((Deserializer)new StringDeserializer(), (Deserializer)new LongDeserializer(), Arrays.asList(KeyValue.pair((Object)("1@" + window), (Object)2L), KeyValue.pair((Object)("2@" + window), (Object)2L), KeyValue.pair((Object)("3@" + window), (Object)2L), KeyValue.pair((Object)("4@" + window), (Object)2L), KeyValue.pair((Object)("5@" + window), (Object)2L)));
    }

    private void produceMessages(long timestamp) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.streamOneInput, Arrays.asList(new KeyValue((Object)1, (Object)"A"), new KeyValue((Object)2, (Object)"B"), new KeyValue((Object)3, (Object)"C"), new KeyValue((Object)4, (Object)"D"), new KeyValue((Object)5, (Object)"E")), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, (Properties)new Properties()), timestamp);
    }

    private void createTopics() throws InterruptedException {
        this.streamOneInput = "stream-one-" + testNo;
        this.outputTopic = "output-" + testNo;
        CLUSTER.createTopic(this.streamOneInput, 3, 1);
        CLUSTER.createTopic(this.outputTopic);
    }

    private void startStreams() {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.kafkaStreams.start();
    }

    private <K, V> void validateReceivedMessages(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<KeyValue<K, V>> expectedRecords) throws InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerProperties.setProperty("group.id", "kgroupedstream-test-" + testNo);
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("key.deserializer", keyDeserializer.getClass().getName());
        consumerProperties.setProperty("value.deserializer", valueDeserializer.getClass().getName());
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerProperties, this.outputTopic, expectedRecords);
    }
}

