/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests for {@code KTable#suppress} that run a Kafka Streams topology against the
 * shared embedded {@link #CLUSTER}.
 *
 * <p>Every test follows the same shape: build a table, write both its raw and its suppressed
 * change stream to dedicated output topics, produce a fixed list of timestamped input records,
 * and then verify the records that show up on the output topics (or that the application ends
 * up in the {@code ERROR} state).
 */
@Category({IntegrationTest.class})
public class SuppressionIntegrationTest {
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, Utils.mkProperties(Utils.mkMap()), 0L);
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
    /** Value (in milliseconds) used for both {@code poll.ms} and {@code commit.interval.ms}. */
    private static final int COMMIT_INTERVAL = 100;
    /** Maximum time (in milliseconds) to wait for the application to leave the running state. */
    private static final long TIMEOUT_MS = 30000L;

    /**
     * Suppresses with a time limit of {@code scaledTime(2)} and an unbounded buffer.
     *
     * <p>The raw topic is expected to carry every count update, while the suppressed topic is
     * expected to carry only {@code v1 -> 1} (at {@code scaledTime(2)}) and {@code v2 -> 1}
     * (at {@code scaledTime(1)}).
     */
    @Test
    public void shouldSuppressIntermediateEventsWithEmitAfter() {
        final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        valueCounts
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(scaledTime(2L)), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v2", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k2", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("tick", "tick", scaledTime(5L))
            ));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>("tick", 1L, scaledTime(5L))
            ));
            verifyOutput(outputSuppressed, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L))
            ));
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Builds the table shared by the non-windowed tests: the {@code input} topic is read as a
     * table, re-grouped so that the former value becomes the key, and counted into a store
     * named {@code "counts"}.
     *
     * @param input   name of the topic to read the source table from
     * @param builder topology builder the table is added to
     * @return a table mapping each distinct input value to its count
     */
    private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
        // NOTE(review): no explicit type arguments are given on the Materialized factories here;
        // they may have been lost in decompilation - TODO confirm this compiles against the Kafka version in use.
        return builder
            .table(
                input,
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            // Swap key and value so that the count is taken per distinct value.
            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count(Materialized.as("counts").withCachingDisabled());
    }

    /**
     * Suppresses with a time limit of {@link Duration#ZERO} and an unbounded buffer.
     *
     * <p>The suppressed topic is expected to carry exactly the same records as the raw topic.
     */
    @Test
    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
        final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        valueCounts
            .suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v2", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k2", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("x", "x", scaledTime(4L))
            ));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>("x", 1L, scaledTime(4L))
            ));
            verifyOutput(outputSuppressed, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>("x", 1L, scaledTime(4L))
            ));
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Suppresses with a time limit of {@code Long.MAX_VALUE} milliseconds and a buffer
     * configured as {@code maxRecords(1).emitEarlyWhenFull()}.
     *
     * <p>The suppressed topic is expected to carry {@code v1 -> 0}, {@code v2 -> 1} and
     * {@code v1 -> 1}, in that order.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithRecordLimit() {
        final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxRecords(1L).emitEarlyWhenFull()))
            .toStream()
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v2", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k2", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("x", "x", scaledTime(3L))
            ));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>("x", 1L, scaledTime(3L))
            ));
            verifyOutput(outputSuppressed, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L))
            ));
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Suppresses with a time limit of {@code Long.MAX_VALUE} milliseconds and a buffer
     * configured as {@code maxRecords(1).shutDownWhenFull()}, then expects the application to
     * stop running and report the {@code ERROR} state.
     *
     * @throws InterruptedException if interrupted while waiting for the application to stop
     */
    @Test
    public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
        final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxRecords(1L).shutDownWhenFull()))
            .toStream()
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v2", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k2", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("x", "x", scaledTime(3L))
            ));
            verifyErrorShutdown(driver);
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Suppresses with a time limit of {@code Long.MAX_VALUE} milliseconds and a buffer
     * configured as {@code maxBytes(200).emitEarlyWhenFull()}.
     *
     * <p>The suppressed topic is expected to carry {@code v1 -> 0}, {@code v2 -> 1} and
     * {@code v1 -> 1}, in that order.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithBytesLimit() {
        final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxBytes(200L).emitEarlyWhenFull()))
            .toStream()
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v2", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k2", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("x", "x", scaledTime(3L))
            ));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>("x", 1L, scaledTime(3L))
            ));
            verifyOutput(outputSuppressed, Arrays.asList(
                new KeyValueTimestamp<String, Long>("v1", 0L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v2", 1L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>("v1", 1L, scaledTime(2L))
            ));
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Suppresses with a time limit of {@code Long.MAX_VALUE} milliseconds and a buffer
     * configured as {@code maxBytes(200).shutDownWhenFull()}, then expects the application to
     * stop running and report the {@code ERROR} state.
     *
     * @throws InterruptedException if interrupted while waiting for the application to stop
     */
    @Test
    public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
        final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxBytes(200L).shutDownWhenFull()))
            .toStream()
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v2", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k2", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("x", "x", scaledTime(3L))
            ));
            verifyErrorShutdown(driver);
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Counts records per key in time windows of size {@code scaledTime(2)} with a grace period
     * of {@code scaledTime(1)}, and suppresses with {@code untilWindowCloses(unbounded())}.
     *
     * <p>The raw topic is expected to carry six window updates; the suppressed topic is
     * expected to carry a single record, the count of 4 for window {@code [0, 2)} (scaled).
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test
    public void shouldSupportFinalResultsForTimeWindows() {
        final String testId = "-shouldSupportFinalResultsForTimeWindows";
        // Locale.ROOT keeps the application id ASCII regardless of the JVM's default locale.
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputSuppressed = "output-suppressed" + testId;
        final String outputRaw = "output-raw" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

        final StreamsBuilder builder = new StreamsBuilder();
        // NOTE(review): the raw KTable and the raw (Windows) cast are kept exactly as found; the
        // generic type arguments were presumably lost in decompilation - TODO restore them once
        // the required store types are imported.
        final KTable valueCounts = builder
            .stream(input, Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((k1, v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy((Windows) TimeWindows.of(Duration.ofMillis(scaledTime(2L))).grace(Duration.ofMillis(scaledTime(1L))))
            .count(Materialized.as("counts").withCachingDisabled().withLoggingDisabled());

        // The windowed key is rendered via toString() so both topics can use the String serde.
        valueCounts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .map((k, v) -> new KeyValue(k.toString(), v))
            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));

        valueCounts
            .toStream()
            .map((k, v) -> new KeyValue(k.toString(), v))
            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = getStreamsConfig(appId);
        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(2L)),
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(1L)),
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L)),
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(4L)),
                new KeyValueTimestamp<String, String>("k1", "v1", scaledTime(0L))
            ));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 0L, 2L), 1L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 0L, 2L), 2L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 2L, 4L), 1L, scaledTime(2L)),
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 0L, 2L), 3L, scaledTime(1L)),
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)),
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 4L, 6L), 1L, scaledTime(4L))
            ));
            verifyOutput(outputSuppressed, Collections.singletonList(
                new KeyValueTimestamp<String, Long>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L))
            ));
        } finally {
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Creates the Kafka Streams configuration used by every test.
     *
     * @param appId value for {@code application.id}
     * @return properties pointing at {@link #CLUSTER}, with {@code poll.ms} and
     *         {@code commit.interval.ms} set to {@link #COMMIT_INTERVAL} and
     *         {@code processing.guarantee} set to {@code at_least_once}
     */
    private Properties getStreamsConfig(final String appId) {
        return Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", appId),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("poll.ms", Integer.toString(COMMIT_INTERVAL)),
            Utils.mkEntry("commit.interval.ms", Integer.toString(COMMIT_INTERVAL)),
            Utils.mkEntry("processing.guarantee", "at_least_once")
        ));
    }

    /**
     * Renders the string form of a windowed key whose window bounds are given in unscaled
     * units and converted with {@link #scaledTime(long)}.
     *
     * @param key           record key
     * @param unscaledStart window start, in unscaled units
     * @param unscaledEnd   window end, in unscaled units
     * @return {@code toString()} of the corresponding {@link Windowed} key
     */
    private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
    }

    /**
     * Converts an abstract test time unit into a record timestamp.
     *
     * @param unscaledTime time in abstract units
     * @return {@code unscaledTime} multiplied by twice {@link #COMMIT_INTERVAL} (200 at the current value)
     */
    private long scaledTime(final long unscaledTime) {
        return 2L * COMMIT_INTERVAL * unscaledTime;
    }

    /**
     * Produces the given records to {@code topic} on {@link #CLUSTER} using string serializers
     * for both key and value.
     *
     * @param topic     destination topic
     * @param toProduce records (key, value, timestamp) to send
     */
    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
        final Properties producerConfig = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("client.id", "anything"),
            Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()),
            Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())
        ));
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
    }

    /**
     * Waits up to {@link #TIMEOUT_MS} for the application to stop running, then asserts that
     * its state is {@code ERROR}.
     *
     * @param driver the application under test
     * @throws InterruptedException if interrupted while waiting
     */
    private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
        TestUtils.waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
        MatcherAssert.assertThat(driver.state(), CoreMatchers.is(KafkaStreams.State.ERROR));
    }

    /**
     * Checks the contents of {@code topic} against the expected records, reading keys as
     * strings and values as longs.
     *
     * @param topic              topic to read
     * @param keyValueTimestamps expected records (key, value, timestamp)
     */
    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
        final Properties properties = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("group.id", "test-group"),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("key.deserializer", STRING_DESERIALIZER.getClass().getName()),
            Utils.mkEntry("value.deserializer", LONG_DESERIALIZER.getClass().getName())
        ));
        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
    }
}

