package org.apache.kafka.streams.integration;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/RestoreIntegrationTest.class */
public class RestoreIntegrationTest {
    /** Application id handed to {@link #props(String)} by most tests; also used as the consumer group id in {@code setCommittedOffset}. */
    private static final String APPID = "restore-test";
    /** Input topic read by the restore tests and by the logging-disabled startup test. */
    private static final String INPUT_STREAM = "input-stream";
    /** Input topic (and state store name) used by {@code shouldProcessDataFromStoresWithLoggingDisabled}. */
    private static final String INPUT_STREAM_2 = "input-stream-2";
    /** Matches the 10000 records that {@code createStateForRestoration} writes per call. */
    private final int numberOfKeys = 10000;
    /** Streams instance created by the current test; closed in {@link #shutdown()} when non-null. */
    private KafkaStreams kafkaStreams;
    /** Broker count passed to the embedded cluster below. */
    private static final int NUM_BROKERS = 1;

    /** Embedded Kafka cluster managed by JUnit as a class rule and shared by every test in this class. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    /* loaded from: input_file:org/apache/kafka/streams/integration/RestoreIntegrationTest$KeyValueStoreProcessor.class */
    public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
        private String topic;
        private final CountDownLatch processorLatch;
        private KeyValueStore<Integer, Integer> store;

        public KeyValueStoreProcessor(String str, CountDownLatch countDownLatch) {
            this.topic = str;
            this.processorLatch = countDownLatch;
        }

        public void init(ProcessorContext processorContext) {
            this.store = processorContext.getStateStore(this.topic);
        }

        public void process(Integer num, Integer num2) {
            if (num != null) {
                this.store.put(num, num2);
                this.processorLatch.countDown();
            }
        }

        public void close() {
        }
    }

    /**
     * Creates the topics used by the tests: both input topics and the changelog
     * topic of the store named {@code "store"} for application {@link #APPID}.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @BeforeClass
    public static void createTopics() throws InterruptedException {
        // The restore tests address partitions 0 and 1 explicitly, so every topic gets two partitions.
        final int numPartitions = 2;
        CLUSTER.createTopic(INPUT_STREAM, numPartitions, NUM_BROKERS);
        CLUSTER.createTopic(INPUT_STREAM_2, numPartitions, NUM_BROKERS);
        // Derive the changelog name from APPID so it cannot drift from the application id the tests run with.
        CLUSTER.createTopic(APPID + "-store-changelog", numPartitions, NUM_BROKERS);
    }

    /**
     * Builds the Kafka Streams configuration used by the tests.
     *
     * <p>Record caching is set to zero bytes, the commit interval to 1000 ms,
     * Integer serdes are the defaults, consumers start from the earliest
     * offset, and the state directory is obtained from
     * {@code TestUtils.tempDirectory(applicationId)} on every call.
     *
     * @param applicationId value for the {@code application.id} setting
     * @return a new, mutable {@link Properties} instance
     */
    private Properties props(final String applicationId) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        return streamsConfiguration;
    }

    /**
     * Closes the Streams instance assigned by the test that just ran, waiting
     * at most 30 seconds for it to stop.
     */
    @After
    public void shutdown() {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams == null) {
            // No Streams instance was assigned, so there is nothing to close.
            return;
        }
        streams.close(Duration.ofSeconds(30L));
    }

    /**
     * Verifies restoration of a table whose source topic doubles as its
     * changelog (topology optimization enabled).
     *
     * <p>Set-up: {@link #numberOfKeys} records are written to the input topic,
     * the group's committed offset is moved {@code offsetLimitDelta} records
     * before the end of each partition, and a checkpoint at
     * {@code offsetCheckpointed} is written for each task. The test then
     * expects, summed over both partitions,
     * {@code numberOfKeys - (offsetLimitDelta + offsetCheckpointed) * 2}
     * restored records and {@code offsetLimitDelta * 2} processed records.
     *
     * @throws Exception if set-up or waiting on the latches fails
     */
    @Test
    public void shouldRestoreStateFromSourceTopic() throws Exception {
        // createTopics() creates the input topic with two partitions (0 and 1).
        final int numPartitions = 2;
        final int offsetLimitDelta = 1000;
        final int offsetCheckpointed = 1000;
        final int expectedProcessed = offsetLimitDelta * numPartitions;
        final long expectedRestored =
            (long) numberOfKeys - (long) (offsetLimitDelta + offsetCheckpointed) * numPartitions;

        final AtomicInteger numReceived = new AtomicInteger(0);
        final StreamsBuilder builder = new StreamsBuilder();

        final Properties props = props(APPID);
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        createStateForRestoration(INPUT_STREAM);
        setCommittedOffset(INPUT_STREAM, offsetLimitDelta);

        // Pre-seed a checkpoint for each task so restoration starts at offsetCheckpointed, not at 0.
        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
        for (int partition = 0; partition < numPartitions; partition++) {
            final File checkpointFile =
                new File(stateDirectory.directoryForTask(new TaskId(0, partition)), ".checkpoint");
            new OffsetCheckpoint(checkpointFile).write(
                Collections.singletonMap(new TopicPartition(INPUT_STREAM, partition), (long) offsetCheckpointed));
        }

        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .toStream()
            .foreach((key, value) -> {
                if (numReceived.incrementAndGet() == expectedProcessed) {
                    shutdownLatch.countDown();
                }
            });

        this.kafkaStreams = new KafkaStreams(builder.build(), props);
        // Released on the REBALANCING -> RUNNING transition.
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                startupLatch.countDown();
            }
        });

        final AtomicLong restored = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                                       final long startingOffset, final long endingOffset) {
            }

            @Override
            public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                        final long batchEndOffset, final long numRestored) {
            }

            @Override
            public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                                     final long totalRestored) {
                // Sum the per-partition totals so the assertion covers both tasks.
                restored.addAndGet(totalRestored);
            }
        });

        this.kafkaStreams.start();

        Assert.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(restored.get(), IsEqual.equalTo(expectedRestored));

        Assert.assertTrue(shutdownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(numReceived.get(), IsEqual.equalTo(expectedProcessed));
    }

    /**
     * Verifies restoration of a materialized table from its dedicated
     * changelog topic.
     *
     * <p>Set-up: {@link #numberOfKeys} records are written to both the
     * changelog topic and the input topic, and a checkpoint at
     * {@code offsetCheckpointed} is written for each task. The test then
     * expects, summed over both partitions,
     * {@code numberOfKeys - offsetCheckpointed * 2} restored records and
     * {@code numberOfKeys} processed records.
     *
     * @throws Exception if set-up or waiting on the latches fails
     */
    @Test
    public void shouldRestoreStateFromChangelogTopic() throws Exception {
        // createTopics() creates the topics with two partitions (0 and 1).
        final int numPartitions = 2;
        final int offsetCheckpointed = 1000;
        final long expectedRestored = (long) numberOfKeys - (long) offsetCheckpointed * numPartitions;
        // Changelog of the store named "store" below, for application APPID.
        final String changelogTopic = APPID + "-store-changelog";

        final AtomicInteger numReceived = new AtomicInteger(0);
        final StreamsBuilder builder = new StreamsBuilder();

        final Properties props = props(APPID);

        createStateForRestoration(changelogTopic);
        createStateForRestoration(INPUT_STREAM);

        // Pre-seed a checkpoint for each task so restoration starts at offsetCheckpointed, not at 0.
        final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime());
        for (int partition = 0; partition < numPartitions; partition++) {
            final File checkpointFile =
                new File(stateDirectory.directoryForTask(new TaskId(0, partition)), ".checkpoint");
            new OffsetCheckpoint(checkpointFile).write(
                Collections.singletonMap(new TopicPartition(changelogTopic, partition), (long) offsetCheckpointed));
        }

        final CountDownLatch startupLatch = new CountDownLatch(1);
        final CountDownLatch shutdownLatch = new CountDownLatch(1);

        builder.table(INPUT_STREAM, Consumed.with(Serdes.Integer(), Serdes.Integer()), Materialized.as("store"))
            .toStream()
            .foreach((key, value) -> {
                if (numReceived.incrementAndGet() == numberOfKeys) {
                    shutdownLatch.countDown();
                }
            });

        this.kafkaStreams = new KafkaStreams(builder.build(), props);
        // Released on the REBALANCING -> RUNNING transition.
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                startupLatch.countDown();
            }
        });

        final AtomicLong restored = new AtomicLong(0L);
        this.kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(final TopicPartition topicPartition, final String storeName,
                                       final long startingOffset, final long endingOffset) {
            }

            @Override
            public void onBatchRestored(final TopicPartition topicPartition, final String storeName,
                                        final long batchEndOffset, final long numRestored) {
            }

            @Override
            public void onRestoreEnd(final TopicPartition topicPartition, final String storeName,
                                     final long totalRestored) {
                // Sum the per-partition totals so the assertion covers both tasks.
                restored.addAndGet(totalRestored);
            }
        });

        this.kafkaStreams.start();

        Assert.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(restored.get(), IsEqual.equalTo(expectedRestored));

        Assert.assertTrue(shutdownLatch.await(30L, TimeUnit.SECONDS));
        MatcherAssert.assertThat(numReceived.get(), IsEqual.equalTo(numberOfKeys));
    }

    /**
     * Verifies that a topology whose reduce store has changelogging disabled
     * still reaches the RUNNING state within 30 seconds.
     *
     * @throws InterruptedException if interrupted while waiting for start-up
     */
    @Test
    public void shouldSuccessfullyStartWhenLoggingDisabled() throws InterruptedException {
        final StreamsBuilder builder = new StreamsBuilder();

        // Explicit type arguments are required: nothing in the chained call lets the compiler infer
        // Integer keys/values for stream(...) or the store type for Materialized.as(...).
        builder.<Integer, Integer>stream(INPUT_STREAM)
            .groupByKey()
            .reduce(
                (value1, value2) -> value1 + value2,
                Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store")
                    .withLoggingDisabled());

        final CountDownLatch startupLatch = new CountDownLatch(1);
        this.kafkaStreams = new KafkaStreams(builder.build(), props(APPID));
        // Released on the REBALANCING -> RUNNING transition.
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                startupLatch.countDown();
            }
        });

        this.kafkaStreams.start();

        Assert.assertTrue(startupLatch.await(30L, TimeUnit.SECONDS));
    }

    /**
     * Verifies that records are processed into a manually added LRU store
     * whose changelogging is disabled.
     *
     * <p>Three records are produced up front; the test passes once the
     * instance has reached RUNNING and {@link KeyValueStoreProcessor} has
     * stored all three.
     *
     * @throws InterruptedException if interrupted while producing or waiting
     * @throws ExecutionException   if producing the input records fails
     */
    @Test
    public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException, ExecutionException {
        final List<KeyValue<Integer, Integer>> records =
            Arrays.asList(KeyValue.pair(1, 1), KeyValue.pair(2, 2), KeyValue.pair(3, 3));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            INPUT_STREAM_2,
            records,
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
            CLUSTER.time);

        // The store is registered under the topic's name; KeyValueStoreProcessor looks it up by that name.
        final StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder =
            new KeyValueStoreBuilder<>(Stores.lruMap(INPUT_STREAM_2, 10), Serdes.Integer(), Serdes.Integer(), CLUSTER.time)
                .withLoggingDisabled();

        final StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(storeBuilder);

        final KStream<Integer, Integer> stream = builder.stream(INPUT_STREAM_2);
        // One count per produced record.
        final CountDownLatch processorLatch = new CountDownLatch(records.size());
        stream.process(new ProcessorSupplier<Integer, Integer>() {
            @Override
            public Processor<Integer, Integer> get() {
                return new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch);
            }
        }, INPUT_STREAM_2);

        this.kafkaStreams = new KafkaStreams(builder.build(), props("restore-test-logging-disabled"));

        final CountDownLatch startupLatch = new CountDownLatch(1);
        // Released on the REBALANCING -> RUNNING transition.
        this.kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                startupLatch.countDown();
            }
        });

        this.kafkaStreams.start();

        // Fail fast with a clear message instead of silently ignoring a start-up timeout.
        Assert.assertTrue("Streams did not reach RUNNING within 30 seconds",
            startupLatch.await(30L, TimeUnit.SECONDS));
        Assert.assertTrue(processorLatch.await(30L, TimeUnit.SECONDS));
    }

    /**
     * Writes {@link #numberOfKeys} records to the given topic, one for each
     * {@code i} in {@code [0, numberOfKeys)} with key and value both {@code i}.
     *
     * <p>The producer is closed when the method returns, including when a
     * {@code send} call throws.
     *
     * @param topic topic to write the records to
     */
    private void createStateForRestoration(final String topic) {
        final Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

        try (KafkaProducer<Integer, Integer> producer =
                 new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
            for (int i = 0; i < numberOfKeys; i++) {
                producer.send(new ProducerRecord<>(topic, i, i));
            }
        }
    }

    /**
     * Commits, for consumer group {@link #APPID}, an offset that lies
     * {@code limitDelta} records before the current end of partitions 0 and 1
     * of the given topic.
     *
     * <p>The consumer is closed when the method returns, including when
     * seeking or committing throws.
     *
     * @param topic      topic whose committed offsets are set
     * @param limitDelta distance from the end offset, per partition, at which to commit
     */
    private void setCommittedOffset(final String topic, final int limitDelta) {
        final Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, APPID);
        consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "commit-consumer");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        try (KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(consumerConfig)) {
            final List<TopicPartition> partitions =
                Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1));
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);

            for (final TopicPartition partition : partitions) {
                // After seekToEnd, position() is the end offset; step back by limitDelta.
                final long endOffset = consumer.position(partition);
                consumer.seek(partition, endOffset - limitDelta);
            }

            // commitSync() without arguments commits the positions set by the seeks above.
            consumer.commitSync();
        }
    }
}
