/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamThreadTest {
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    private UUID processId = UUID.randomUUID();
    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]));
    private Cluster metadata = new Cluster("cluster", Arrays.asList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), this.subscriptionUserData());
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);
    private final TaskId task4 = new TaskId(1, 1);
    private final TaskId task5 = new TaskId(1, 2);
    static final String TOPIC = "topic";
    final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition("topic", 0));
    final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition("topic", 1));

    @Before
    public void setUp() throws Exception {
        this.processId = UUID.randomUUID();
    }

    private ByteBuffer subscriptionUserData() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(28);
        buf.putInt(1);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        buf.putInt(0);
        buf.putInt(0);
        buf.rewind();
        return buf;
    }

    private Properties configProps() {
        return new Properties(){
            {
                this.setProperty("application.id", "stream-thread-test");
                this.setProperty("bootstrap.servers", "localhost:2171");
                this.setProperty("buffered.records.per.partition", "3");
                this.setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
                this.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
            }
        };
    }

    @Test
    public void testPartitionAssignmentChange() throws Exception {
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        StateListenerStub stateListener = new StateListenerStub();
        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
        builder.addSource("source1", new String[]{"topic1"});
        builder.addSource("source2", new String[]{"topic2"});
        builder.addSource("source3", new String[]{"topic3"});
        builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source2", "source3"});
        MockClientSupplier mockClientSupplier = new MockClientSupplier();
        StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, (Consumer<byte[], byte[]>)this.consumer, (Producer<byte[], byte[]>)this.producer, (Consumer<byte[], byte[]>)this.restoreConsumer, this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
            }
        };
        thread.setStateListener((StreamThread.StateListener)stateListener);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.RUNNING);
        this.initPartitionGrouper(config, thread, mockClientSupplier);
        ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        List<Object> revokedPartitions = Collections.emptyList();
        List<Object> assignedPartitions = Collections.singletonList(this.t1p1);
        HashSet<TopicPartition> expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(this.t1p1));
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.PARTITIONS_REVOKED);
        Assert.assertEquals((long)stateListener.numChanges, (long)1L);
        Assert.assertEquals((Object)stateListener.oldState, (Object)StreamThread.State.RUNNING);
        Assert.assertEquals((Object)stateListener.newState, (Object)StreamThread.State.PARTITIONS_REVOKED);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertEquals((Object)thread.state(), (Object)StreamThread.State.RUNNING);
        Assert.assertEquals((long)stateListener.numChanges, (long)3L);
        Assert.assertEquals((Object)stateListener.oldState, (Object)StreamThread.State.ASSIGNING_PARTITIONS);
        Assert.assertEquals((Object)stateListener.newState, (Object)StreamThread.State.RUNNING);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals((long)1L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        assignedPartitions = Collections.singletonList(this.t1p2);
        HashSet<TopicPartition> expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t1p2));
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        Assert.assertFalse((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertEquals((long)0L, (long)thread.tasks().size());
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task2)).partitions());
        Assert.assertEquals((long)1L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(this.t1p1, this.t1p2);
        expectedGroup1 = new HashSet<TopicPartition>(Collections.singleton(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Collections.singleton(this.t1p2));
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task2)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(this.t2p1, this.t2p2, this.t3p1, this.t3p2);
        expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p2, this.t3p2));
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task4));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task5));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task4)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task5)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task4));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task4)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        assignedPartitions = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        expectedGroup1 = new HashSet<TopicPartition>(Arrays.asList(this.t1p1));
        expectedGroup2 = new HashSet<TopicPartition>(Arrays.asList(this.t2p1, this.t3p1));
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task1));
        Assert.assertTrue((boolean)thread.tasks().containsKey(this.task4));
        Assert.assertEquals(expectedGroup1, (Object)((StreamTask)thread.tasks().get(this.task1)).partitions());
        Assert.assertEquals(expectedGroup2, (Object)((StreamTask)thread.tasks().get(this.task4)).partitions());
        Assert.assertEquals((long)2L, (long)thread.tasks().size());
        revokedPartitions = assignedPartitions;
        assignedPartitions = Collections.emptyList();
        rebalanceListener.onPartitionsRevoked(revokedPartitions);
        rebalanceListener.onPartitionsAssigned(assignedPartitions);
        Assert.assertTrue((boolean)thread.tasks().isEmpty());
        thread.close();
        Assert.assertTrue((thread.state() == StreamThread.State.PENDING_SHUTDOWN || thread.state() == StreamThread.State.NOT_RUNNING ? 1 : 0) != 0);
    }

    @Test
    public void testHandingOverTaskFromOneToAnotherThread() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.addStateStore(Stores.create((String)"store").withByteArrayKeys().withByteArrayValues().persistent().build(), new String[0]);
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier mockClientSupplier = new MockClientSupplier();
        mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1)));
        final StreamThread thread1 = new StreamThread(builder, config, (KafkaClientSupplier)mockClientSupplier, "stream-thread-test", "clientId1", this.processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        StreamThread thread2 = new StreamThread(builder, config, (KafkaClientSupplier)mockClientSupplier, "stream-thread-test", "clientId2", this.processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), this.task0Assignment);
        Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), this.task1Assignment);
        HashMap<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<TaskId, Set<TopicPartition>>(task0);
        HashMap<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<TaskId, Set<TopicPartition>>(task1);
        thread1.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(thread1Assignment));
        thread2.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(thread2Assignment));
        thread1.rebalanceListener.onPartitionsRevoked((Collection)Collections.EMPTY_SET);
        thread2.rebalanceListener.onPartitionsRevoked((Collection)Collections.EMPTY_SET);
        thread1.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread2.rebalanceListener.onPartitionsAssigned(this.task1Assignment);
        HashSet<TaskId> originalTaskAssignmentThread1 = new HashSet<TaskId>();
        for (TaskId tid : thread1.tasks().keySet()) {
            originalTaskAssignmentThread1.add(tid);
        }
        HashSet<TaskId> originalTaskAssignmentThread2 = new HashSet<TaskId>();
        for (TaskId tid : thread2.tasks().keySet()) {
            originalTaskAssignmentThread2.add(tid);
        }
        thread1.rebalanceListener.onPartitionsRevoked(this.task0Assignment);
        thread2.rebalanceListener.onPartitionsRevoked(this.task1Assignment);
        thread1Assignment.clear();
        thread1Assignment.putAll(task1);
        thread2Assignment.clear();
        thread2Assignment.putAll(task0);
        Thread runIt = new Thread(new Runnable(){

            @Override
            public void run() {
                thread1.rebalanceListener.onPartitionsAssigned(StreamThreadTest.this.task1Assignment);
            }
        });
        runIt.start();
        thread2.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        runIt.join();
        Assert.assertThat(thread1.tasks().keySet(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread2));
        Assert.assertThat(thread2.tasks().keySet(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread1));
        Assert.assertThat((Object)thread1.prevTasks(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread1));
        Assert.assertThat((Object)thread2.prevTasks(), (Matcher)CoreMatchers.equalTo(originalTaskAssignmentThread2));
    }

    @Test
    public void testMetrics() throws Exception {
        TopologyBuilder builder = new TopologyBuilder().setApplicationId("MetricsApp");
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        Metrics metrics = new Metrics();
        StreamThread thread = new StreamThread(builder, config, (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, metrics, (Time)new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        String defaultGroupName = "stream-metrics";
        String defaultPrefix = "thread." + thread.threadClientId();
        Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId());
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".commit-latency"));
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".poll-latency"));
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".process-latency"));
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".punctuate-latency"));
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".task-created"));
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".task-closed"));
        Assert.assertNotNull((Object)metrics.getSensor(defaultPrefix + ".skipped-records"));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-avg", defaultGroupName, "The average commit time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("commit-latency-max", defaultGroupName, "The maximum commit time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("commit-rate", defaultGroupName, "The average per-second number of commit calls", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-avg", defaultGroupName, "The average poll time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("poll-latency-max", defaultGroupName, "The maximum poll time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("poll-rate", defaultGroupName, "The average per-second number of record-poll calls", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-avg", defaultGroupName, "The average process time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("process-latency-max", defaultGroupName, "The maximum process time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("process-rate", defaultGroupName, "The average per-second number of process calls", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-avg", defaultGroupName, "The average punctuate time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-latency-max", defaultGroupName, "The maximum punctuate time in ms", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("punctuate-rate", defaultGroupName, "The average per-second number of punctuate calls", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("task-created-rate", defaultGroupName, "The average per-second number of newly created tasks", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("task-closed-rate", defaultGroupName, "The average per-second number of closed tasks", defaultTags)));
        Assert.assertNotNull(metrics.metrics().get(metrics.metricName("skipped-records-rate", defaultGroupName, "The average per-second number of skipped records.", defaultTags)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMaybeClean() throws Exception {
        File baseDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            long cleanupDelay = 1000L;
            Properties props = this.configProps();
            props.setProperty("state.cleanup.delay.ms", Long.toString(1000L));
            props.setProperty("state.dir", baseDir.getCanonicalPath());
            StreamsConfig config = new StreamsConfig((Map)props);
            File applicationDir = new File(baseDir, "stream-thread-test");
            applicationDir.mkdir();
            File stateDir1 = new File(applicationDir, this.task1.toString());
            File stateDir2 = new File(applicationDir, this.task2.toString());
            File stateDir3 = new File(applicationDir, this.task3.toString());
            File extraDir = new File(applicationDir, "X");
            stateDir1.mkdir();
            stateDir2.mkdir();
            stateDir3.mkdir();
            extraDir.mkdir();
            MockTime mockTime = new MockTime();
            TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
            builder.addSource("source1", new String[]{"topic1"});
            MockClientSupplier mockClientSupplier = new MockClientSupplier();
            StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

                public void maybeClean() {
                    super.maybeClean();
                }

                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                    ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                    return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, (Consumer<byte[], byte[]>)this.consumer, (Producer<byte[], byte[]>)this.producer, (Consumer<byte[], byte[]>)this.restoreConsumer, this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
                }
            };
            this.initPartitionGrouper(config, thread, mockClientSupplier);
            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
            Assert.assertTrue((boolean)thread.tasks().isEmpty());
            mockTime.sleep(1000L);
            Assert.assertTrue((boolean)stateDir1.exists());
            Assert.assertTrue((boolean)stateDir2.exists());
            Assert.assertTrue((boolean)stateDir3.exists());
            Assert.assertTrue((boolean)extraDir.exists());
            HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
            activeTasks.put(this.task1, Collections.singleton(this.t1p1));
            activeTasks.put(this.task2, Collections.singleton(this.t1p2));
            thread.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
            List<Object> revokedPartitions = Collections.emptyList();
            List<Object> assignedPartitions = Arrays.asList(this.t1p1, this.t1p2);
            HashMap prevTasks = new HashMap(thread.tasks());
            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);
            Assert.assertTrue((boolean)prevTasks.isEmpty());
            Assert.assertEquals((long)2L, (long)thread.tasks().size());
            mockTime.sleep(990L);
            thread.maybeClean();
            Assert.assertTrue((boolean)stateDir1.exists());
            Assert.assertTrue((boolean)stateDir2.exists());
            Assert.assertTrue((boolean)stateDir3.exists());
            Assert.assertTrue((boolean)extraDir.exists());
            mockTime.sleep(11L);
            thread.maybeClean();
            Assert.assertTrue((boolean)stateDir1.exists());
            Assert.assertTrue((boolean)stateDir2.exists());
            Assert.assertFalse((boolean)stateDir3.exists());
            Assert.assertTrue((boolean)extraDir.exists());
            activeTasks.clear();
            revokedPartitions = assignedPartitions;
            assignedPartitions = Collections.emptyList();
            prevTasks = new HashMap(thread.tasks());
            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);
            Assert.assertEquals((long)2L, (long)prevTasks.size());
            for (StreamTask task : prevTasks.values()) {
                Assert.assertTrue((boolean)((TestStreamTask)task).committed);
                ((TestStreamTask)task).committed = false;
            }
            Assert.assertTrue((boolean)thread.tasks().isEmpty());
            mockTime.sleep(990L);
            thread.maybeClean();
            Assert.assertTrue((boolean)stateDir1.exists());
            Assert.assertTrue((boolean)stateDir2.exists());
            Assert.assertFalse((boolean)stateDir3.exists());
            Assert.assertTrue((boolean)extraDir.exists());
            mockTime.sleep(11L);
            thread.maybeClean();
            Assert.assertFalse((boolean)stateDir1.exists());
            Assert.assertFalse((boolean)stateDir2.exists());
            Assert.assertFalse((boolean)stateDir3.exists());
            Assert.assertTrue((boolean)extraDir.exists());
        }
        finally {
            Utils.delete((File)baseDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMaybeCommit() throws Exception {
        File baseDir = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            long commitInterval = 1000L;
            Properties props = this.configProps();
            props.setProperty("state.dir", baseDir.getCanonicalPath());
            props.setProperty("commit.interval.ms", Long.toString(1000L));
            StreamsConfig config = new StreamsConfig((Map)props);
            MockTime mockTime = new MockTime();
            TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
            builder.addSource("source1", new String[]{"topic1"});
            MockClientSupplier mockClientSupplier = new MockClientSupplier();
            StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

                public void maybeCommit() {
                    super.maybeCommit();
                }

                protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                    ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                    return new TestStreamTask(id, this.applicationId, partitionsForTask, topology, (Consumer<byte[], byte[]>)this.consumer, (Producer<byte[], byte[]>)this.producer, (Consumer<byte[], byte[]>)this.restoreConsumer, this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
                }
            };
            this.initPartitionGrouper(config, thread, mockClientSupplier);
            ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
            List revokedPartitions = Collections.emptyList();
            List<TopicPartition> assignedPartitions = Arrays.asList(this.t1p1, this.t1p2);
            rebalanceListener.onPartitionsRevoked(revokedPartitions);
            rebalanceListener.onPartitionsAssigned(assignedPartitions);
            Assert.assertEquals((long)2L, (long)thread.tasks().size());
            mockTime.sleep(990L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertFalse((boolean)((TestStreamTask)task).committed);
            }
            mockTime.sleep(11L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertTrue((boolean)((TestStreamTask)task).committed);
                ((TestStreamTask)task).committed = false;
            }
            mockTime.sleep(990L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertFalse((boolean)((TestStreamTask)task).committed);
            }
            mockTime.sleep(11L);
            thread.maybeCommit();
            for (StreamTask task : thread.tasks().values()) {
                Assert.assertTrue((boolean)((TestStreamTask)task).committed);
                ((TestStreamTask)task).committed = false;
            }
        }
        finally {
            Utils.delete((File)baseDir);
        }
    }

    @Test
    public void testInjectClients() {
        TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread thread = new StreamThread(builder, config, (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        Assert.assertSame(clientSupplier.producer, (Object)thread.producer);
        Assert.assertSame(clientSupplier.consumer, (Object)thread.consumer);
        Assert.assertSame(clientSupplier.restoreConsumer, (Object)thread.restoreConsumer);
    }

    @Test
    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setApplicationId("stream-thread-test").addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        StreamThread thread = new StreamThread(builder, config, (KafkaClientSupplier)new MockClientSupplier(), "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        thread.partitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return Collections.singletonMap(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(StreamThreadTest.TOPIC, 0)}));
            }
        });
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
    }

    @Test
    public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey().count("count-one");
        builder.stream(new String[]{"t2"}).groupByKey().count("count-two");
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config, (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0])));
        final HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        TopicPartition t1 = new TopicPartition("t1", 0);
        standbyTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1}));
        thread.partitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return standbyTasks;
            }
        });
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        Assert.assertThat((Object)restoreConsumer.assignment(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("stream-thread-test-count-one-changelog", 0)})));
        standbyTasks.put(new TaskId(1, 0), Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t2", 0)}));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        Assert.assertThat((Object)restoreConsumer.assignment(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("stream-thread-test-count-one-changelog", 0), new TopicPartition("stream-thread-test-count-two-changelog", 0)})));
    }

    @Test
    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey().count("count-one");
        builder.stream(new String[]{"t2"}).groupByKey().count("count-two");
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config, (KafkaClientSupplier)clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", 0, null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", 0, null, new Node[0], new Node[0])));
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L);
        offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L);
        restoreConsumer.updateEndOffsets(offsets);
        restoreConsumer.updateBeginningOffsets(offsets);
        final HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        TopicPartition t1 = new TopicPartition("t1", 0);
        standbyTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1}));
        final HashMap<TaskId, Set> activeTasks = new HashMap<TaskId, Set>();
        TopicPartition t2 = new TopicPartition("t2", 0);
        activeTasks.put(new TaskId(1, 0), Utils.mkSet((Object[])new TopicPartition[]{t2}));
        thread.partitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return standbyTasks;
            }

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)Utils.mkSet((Object[])new TopicPartition[]{t2}));
        standbyTasks.clear();
        activeTasks.clear();
        standbyTasks.put(new TaskId(1, 0), Utils.mkSet((Object[])new TopicPartition[]{t2}));
        activeTasks.put(new TaskId(0, 0), Utils.mkSet((Object[])new TopicPartition[]{t1}));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)Utils.mkSet((Object[])new TopicPartition[]{t1}));
    }

    @Test
    public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(Pattern.compile("t.*")).to("out");
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        final HashMap createdTasks = new HashMap();
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                ProcessorTopology topology = this.builder.build(Integer.valueOf(id.topicGroupId));
                TestStreamTask task = new TestStreamTask(id, this.applicationId, partitions, topology, (Consumer<byte[], byte[]>)this.consumer, (Producer<byte[], byte[]>)this.producer, (Consumer<byte[], byte[]>)this.restoreConsumer, this.config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), this.stateDirectory);
                createdTasks.put(partitions, task);
                return task;
            }
        };
        final HashMap activeTasks = new HashMap();
        TopicPartition t1 = new TopicPartition("t1", 0);
        HashSet<TopicPartition> task00Partitions = new HashSet<TopicPartition>();
        task00Partitions.add(t1);
        TaskId taskId = new TaskId(0, 0);
        activeTasks.put(taskId, task00Partitions);
        thread.partitionAssignor(new StreamPartitionAssignor(){

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return activeTasks;
            }
        });
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
        TestStreamTask firstTask = (TestStreamTask)((Object)createdTasks.get(task00Partitions));
        Assert.assertThat((Object)firstTask.id(), (Matcher)CoreMatchers.is((Object)taskId));
        task00Partitions.add(new TopicPartition("t2", 0));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(task00Partitions);
        Assert.assertTrue((String)"task should have been closed as assignment has changed", (boolean)firstTask.closed);
        Assert.assertTrue((String)"tasks state manager should have been closed as assignment has changed", (boolean)firstTask.closedStateManager);
        Assert.assertThat((Object)((TestStreamTask)((Object)createdTasks.get(task00Partitions))).id(), (Matcher)CoreMatchers.is((Object)taskId));
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey();
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)clientSupplier.consumer, (Producer)clientSupplier.producer, (Consumer)clientSupplier.restoreConsumer, config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", config.getString("state.dir"))){

            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamsConfig config1 = new StreamsConfig((Map)this.configProps());
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config1, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
        thread.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        thread.start();
        thread.close();
        thread.join();
        Assert.assertFalse((String)"task shouldn't have been committed as there was an exception during shutdown", (boolean)testStreamTask.committed);
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false);
        builder.stream(new String[]{"t1"}).groupByKey().count((StateStoreSupplier)new MockStateStoreSupplier(stateStore));
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)clientSupplier.consumer, (Producer)clientSupplier.producer, (Consumer)clientSupplier.restoreConsumer, config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", config.getString("state.dir"))){

            public void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
        thread.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        Assert.assertTrue((boolean)stateStore.isOpen());
        thread.start();
        thread.close();
        thread.join();
        Assert.assertFalse((String)"task shouldn't have been committed as there was an exception during shutdown", (boolean)testStreamTask.committed);
        Assert.assertFalse((boolean)stateStore.isOpen());
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringCloseTopologyWhenSuspendingState() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey();
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)clientSupplier.consumer, (Producer)clientSupplier.producer, (Consumer)clientSupplier.restoreConsumer, config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", config.getString("state.dir"))){

            public void closeTopology() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamsConfig config1 = new StreamsConfig((Map)this.configProps());
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config1, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
        thread.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        try {
            thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
            Assert.fail((String)"should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertFalse((boolean)testStreamTask.committed);
    }

    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("stream-thread-test");
        builder.stream(new String[]{"t1"}).groupByKey();
        StreamsConfig config = new StreamsConfig((Map)this.configProps());
        MockClientSupplier clientSupplier = new MockClientSupplier();
        final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), "stream-thread-test", Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("t1", 0)}), builder.build(Integer.valueOf(0)), (Consumer)clientSupplier.consumer, (Producer)clientSupplier.producer, (Consumer)clientSupplier.restoreConsumer, config, (StreamsMetrics)new MockStreamsMetrics(new Metrics()), new StateDirectory("stream-thread-test", config.getString("state.dir"))){

            public void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamsConfig config1 = new StreamsConfig((Map)this.configProps());
        StreamThread thread = new StreamThread((TopologyBuilder)builder, config1, clientSupplier, "stream-thread-test", "clientId", this.processId, new Metrics(), (Time)new MockTime(), new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L){

            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return testStreamTask;
            }
        };
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        activeTasks.put(testStreamTask.id, testStreamTask.partitions);
        thread.partitionAssignor((StreamPartitionAssignor)new MockStreamsPartitionAssignor(activeTasks));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned((Collection)testStreamTask.partitions);
        try {
            thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
            Assert.fail((String)"should have thrown exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertFalse((boolean)testStreamTask.committed);
    }

    private void initPartitionGrouper(StreamsConfig config, StreamThread thread, MockClientSupplier clientSupplier) {
        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread.config, clientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager((InternalTopicManager)internalTopicManager);
        Map assignments = partitionAssignor.assign(this.metadata, Collections.singletonMap("client", this.subscription));
        partitionAssignor.onAssignment((PartitionAssignor.Assignment)assignments.get("client"));
    }

    public static class StateListenerStub
    implements StreamThread.StateListener {
        public int numChanges = 0;
        public StreamThread.State oldState = null;
        public StreamThread.State newState = null;

        public void onChange(StreamThread thread, StreamThread.State newState, StreamThread.State oldState) {
            ++this.numChanges;
            if (this.newState != null && this.newState != oldState) {
                throw new RuntimeException("State mismatch " + oldState + " different from " + this.newState);
            }
            this.oldState = oldState;
            this.newState = newState;
        }
    }

    private class MockStreamsPartitionAssignor
    extends StreamPartitionAssignor {
        private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;

        public MockStreamsPartitionAssignor(Map<TaskId, Set<TopicPartition>> activeTaskAssignment) {
            this.activeTaskAssignment = activeTaskAssignment;
        }

        Map<TaskId, Set<TopicPartition>> activeTasks() {
            return this.activeTaskAssignment;
        }
    }

    private static class TestStreamTask
    extends StreamTask {
        public boolean committed = false;
        private boolean closed;
        private boolean closedStateManager;

        public TestStreamTask(TaskId id, String applicationId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config, StreamsMetrics metrics, StateDirectory stateDirectory) {
            super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, metrics, stateDirectory, null, (Time)new MockTime(), (RecordCollector)new RecordCollectorImpl(producer, id.toString()));
        }

        public void commit() {
            super.commit();
            this.committed = true;
        }

        public void commitOffsets() {
            super.commitOffsets();
            this.committed = true;
        }

        protected void initializeOffsetLimits() {
        }

        public void close() {
            this.closed = true;
            super.close();
        }

        void closeStateManager(boolean writeCheckpoint) {
            super.closeStateManager(writeCheckpoint);
            this.closedStateManager = true;
        }
    }
}

