package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the windowed {@code reduce} operator, driven through a {@link TopologyTestDriver}.
 *
 * <p>Both tests build a small topology reading from {@code "TOPIC"}, pipe records into it while a
 * {@link LogCaptureAppender} is registered, and then assert on the captured log messages and on
 * the metrics exposed by the driver.
 */
public class KStreamWindowReduceTest {
    /** Streams configuration using String serdes for keys and values. */
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /** Produces the String/String input records that are piped into the driver. */
    private final ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

    /**
     * Pipes a single record with a {@code null} key through a windowed reduce and asserts that
     * the {@code skipped-records-total} metric equals 1.0 and that the expected
     * "Skipping record due to null key" message was logged.
     */
    @Test
    public void shouldLogAndMeterOnNullKey() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(500L)))
            .reduce((aggregate, value) -> aggregate + "+" + value);

        // try-with-resources closes the driver exactly once and keeps the primary failure
        // (attaching any close() failure as suppressed) if the body throws.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            try {
                // The explicit cast keeps the null typed as the factory's String key.
                driver.pipeInput(this.recordFactory.create("TOPIC", (String) null, "asdf"));
            } finally {
                // Always unregister so a failing pipeInput does not leave the appender attached.
                LogCaptureAppender.unregister(appender);
            }

            Assert.assertEquals(
                1.0,
                StreamsTestUtils.getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[0] offset=[0]"));
        }
    }

    /**
     * Pipes a record with timestamp 100 followed by records with timestamps 0 through 5 into a
     * 5 ms windowed reduce configured with {@code until(100L)}, then asserts that:
     * <ul>
     *   <li>the {@code late-record-drop-total} metric of the reduce node equals 5.0,</li>
     *   <li>a "Skipping record for expired window" message was logged for offsets 1 through 5,</li>
     *   <li>the output topic contains exactly the results for {@code [k@100/105]} and
     *       {@code [k@5/10]}.</li>
     * </ul>
     *
     * <p>NOTE(review): the {@code @Deprecated} marker presumably exists because the test exercises
     * {@code TimeWindows#until} - confirm against the Kafka Streams API version in use.
     */
    @Test
    @Deprecated
    public void shouldLogAndMeterOnExpiredEvent() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMillis(5L)).until(100L))
            .reduce((aggregate, value) -> aggregate + "+" + value)
            .toStream()
            .map((windowedKey, value) -> new KeyValue<>(windowedKey.toString(), value))
            .to("output");

        // try-with-resources closes the driver exactly once and never lets a close() failure
        // replace the exception that actually failed the test.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props)) {
            LogCaptureAppender.setClassLoggerToDebug(KStreamWindowReduce.class);
            final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
            try {
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "100", 100L));
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "0", 0L));
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "1", 1L));
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "2", 2L));
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "3", 3L));
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "4", 4L));
                driver.pipeInput(this.recordFactory.create("TOPIC", "k", "5", 5L));
            } finally {
                // Always unregister so a failing pipeInput does not leave the appender attached.
                LogCaptureAppender.unregister(appender);
            }

            // Identify the drop metric of the reduce processor node by its full metric name.
            final Map<String, String> metricTags = Utils.mkMap(
                Utils.mkEntry("client-id", "topology-test-driver-virtual-thread"),
                Utils.mkEntry("task-id", "0_0"),
                Utils.mkEntry("processor-node-id", "KSTREAM-REDUCE-0000000002"));
            final MetricName dropMetricName = new MetricName(
                "late-record-drop-total",
                "stream-processor-node-metrics",
                "The total number of occurrence of late-record-drop operations.",
                metricTags);
            final Metric dropMetric = driver.metrics().get(dropMetricName);
            MatcherAssert.assertThat(dropMetric.metricValue(), CoreMatchers.equalTo(5.0));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItems(
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[1] timestamp=[0] window=[0,5) expiration=[5]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[2] timestamp=[1] window=[0,5) expiration=[5]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[3] timestamp=[2] window=[0,5) expiration=[5]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[4] timestamp=[3] window=[0,5) expiration=[5]",
                    "Skipping record for expired window. key=[k] topic=[TOPIC] partition=[0] offset=[5] timestamp=[4] window=[0,5) expiration=[5]"));

            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/105]", "100", 100L);
            OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/10]", "5", 5L);
            // Nothing else may have been emitted to the output topic.
            MatcherAssert.assertThat(driver.readOutput("output"), CoreMatchers.nullValue());
        }
    }

    /**
     * Reads the next record from the {@code "output"} topic, deserializing key and value as
     * Strings.
     *
     * @param topologyTestDriver the driver to read from
     * @return whatever {@code readOutput} returns for the {@code "output"} topic
     */
    private ProducerRecord<String, String> getOutput(TopologyTestDriver topologyTestDriver) {
        return topologyTestDriver.readOutput("output", new StringDeserializer(), new StringDeserializer());
    }
}
