/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={IntegrationTest.class})
public class KTableSourceTopicRestartIntegrationTest {
    private static final int NUM_BROKERS = 3;
    private static final String SOURCE_TOPIC = "source-topic";
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
    private final Time time;
    private KafkaStreams streamsOne;
    private final StreamsBuilder streamsBuilder;
    private final Map<String, String> readKeyValues;
    private static final Properties PRODUCER_CONFIG = new Properties();
    private static final Properties STREAMS_CONFIG = new Properties();
    private Map<String, String> expectedInitialResultsMap;
    private Map<String, String> expectedResultsWithDataWrittenDuringRestoreMap;

    public KTableSourceTopicRestartIntegrationTest() {
        this.time = KTableSourceTopicRestartIntegrationTest.CLUSTER.time;
        this.streamsBuilder = new StreamsBuilder();
        this.readKeyValues = new ConcurrentHashMap<String, String>();
    }

    @BeforeClass
    public static void setUpBeforeAllTests() throws Exception {
        CLUSTER.createTopic(SOURCE_TOPIC);
        STREAMS_CONFIG.put("application.id", "ktable-restore-from-source");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.String().getClass().getName());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass().getName());
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
        STREAMS_CONFIG.put("cache.max.bytes.buffering", (Object)0);
        STREAMS_CONFIG.put("internal.leave.group.on.close", (Object)true);
        STREAMS_CONFIG.put("commit.interval.ms", (Object)5);
        STREAMS_CONFIG.put("default.timestamp.extractor", WallclockTimestampExtractor.class);
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
    }

    @Before
    public void before() {
        KTable kTable = this.streamsBuilder.table(SOURCE_TOPIC);
        kTable.toStream().foreach((ForeachAction)new ForeachAction<String, String>(){

            public void apply(String key, String value) {
                KTableSourceTopicRestartIntegrationTest.this.readKeyValues.put(key, value);
            }
        });
        this.expectedInitialResultsMap = this.createExpectedResultsMap("a", "b", "c");
        this.expectedResultsWithDataWrittenDuringRestoreMap = this.createExpectedResultsMap("a", "b", "c", "d", "f", "g", "h");
    }

    @After
    public void after() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled() throws Exception {
        try {
            this.streamsOne = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streamsOne.start();
            this.produceKeyValues("a", "b", "c");
            this.assertNumberValuesRead(this.readKeyValues, this.expectedInitialResultsMap, "Table did not read all values");
            this.streamsOne.close();
            this.streamsOne = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streamsOne.setGlobalStateRestoreListener((StateRestoreListener)new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
            this.streamsOne.start();
            this.produceKeyValues("f", "g", "h");
            this.assertNumberValuesRead(this.readKeyValues, this.expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
        }
        finally {
            this.streamsOne.close(Duration.ofSeconds(5L));
        }
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
        try {
            STREAMS_CONFIG.put("processing.guarantee", "exactly_once");
            this.streamsOne = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streamsOne.start();
            this.produceKeyValues("a", "b", "c");
            this.assertNumberValuesRead(this.readKeyValues, this.expectedInitialResultsMap, "Table did not read all values");
            this.streamsOne.close();
            this.streamsOne = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streamsOne.setGlobalStateRestoreListener((StateRestoreListener)new UpdatingSourceTopicOnRestoreStartStateRestoreListener());
            this.streamsOne.start();
            this.produceKeyValues("f", "g", "h");
            this.assertNumberValuesRead(this.readKeyValues, this.expectedResultsWithDataWrittenDuringRestoreMap, "Table did not get all values after restart");
        }
        finally {
            this.streamsOne.close(Duration.ofSeconds(5L));
        }
    }

    @Test
    public void shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws Exception {
        try {
            this.streamsOne = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streamsOne.start();
            this.produceKeyValues("a", "b", "c");
            this.assertNumberValuesRead(this.readKeyValues, this.expectedInitialResultsMap, "Table did not read all values");
            this.streamsOne.close();
            this.streamsOne = new KafkaStreams(this.streamsBuilder.build(), STREAMS_CONFIG);
            this.streamsOne.start();
            this.produceKeyValues("f", "g", "h");
            Map<String, String> expectedValues = this.createExpectedResultsMap("a", "b", "c", "f", "g", "h");
            this.assertNumberValuesRead(this.readKeyValues, expectedValues, "Table did not get all values after restart");
        }
        finally {
            this.streamsOne.close(Duration.ofSeconds(5L));
        }
    }

    private void assertNumberValuesRead(final Map<String, String> valueMap, final Map<String, String> expectedMap, String errorMessage) throws InterruptedException {
        TestUtils.waitForCondition((TestCondition)new TestCondition(){

            public boolean conditionMet() {
                return valueMap.equals(expectedMap);
            }
        }, (long)30000L, (String)errorMessage);
    }

    private void produceKeyValues(String ... keys) throws ExecutionException, InterruptedException {
        ArrayList keyValueList = new ArrayList();
        for (String key : keys) {
            keyValueList.add(new KeyValue((Object)key, (Object)(key + "1")));
        }
        IntegrationTestUtils.produceKeyValuesSynchronously(SOURCE_TOPIC, keyValueList, PRODUCER_CONFIG, this.time);
    }

    private Map<String, String> createExpectedResultsMap(String ... keys) {
        HashMap<String, String> expectedMap = new HashMap<String, String>();
        for (String key : keys) {
            expectedMap.put(key, key + "1");
        }
        return expectedMap;
    }

    private class UpdatingSourceTopicOnRestoreStartStateRestoreListener
    implements StateRestoreListener {
        private UpdatingSourceTopicOnRestoreStartStateRestoreListener() {
        }

        public void onRestoreStart(TopicPartition topicPartition, String storeName, long startingOffset, long endingOffset) {
            try {
                KTableSourceTopicRestartIntegrationTest.this.produceKeyValues(new String[]{"d"});
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        public void onBatchRestored(TopicPartition topicPartition, String storeName, long batchEndOffset, long numRestored) {
        }

        public void onRestoreEnd(TopicPartition topicPartition, String storeName, long totalRestored) {
        }
    }
}

