/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.test.GlobalStateManagerStub;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.NoOpProcessorContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class GlobalStateTaskTest {
    private Map<TopicPartition, Long> offsets;
    private GlobalStateUpdateTask globalStateTask;
    private GlobalStateManagerStub stateMgr;
    private List<ProcessorNode> processorNodes;
    private NoOpProcessorContext context;
    private TopicPartition t1;
    private TopicPartition t2;
    private MockSourceNode sourceOne;
    private MockSourceNode sourceTwo;

    @Before
    public void before() {
        this.sourceOne = new MockSourceNode(new String[]{"t1"}, new StringDeserializer(), new StringDeserializer());
        this.sourceTwo = new MockSourceNode(new String[]{"t2"}, new IntegerDeserializer(), new IntegerDeserializer());
        this.processorNodes = Arrays.asList(new ProcessorNode[]{this.sourceOne, this.sourceTwo, new MockProcessorNode(-1L), new MockProcessorNode(-1L)});
        Set storeNames = Utils.mkSet((Object[])new String[]{"t1-store", "t2-store"});
        HashMap<String, MockSourceNode> sourceByTopics = new HashMap<String, MockSourceNode>();
        sourceByTopics.put("t1", this.sourceOne);
        sourceByTopics.put("t2", this.sourceTwo);
        HashMap<String, String> storeToTopic = new HashMap<String, String>();
        storeToTopic.put("t1-store", "t1");
        storeToTopic.put("t2-store", "t2");
        ProcessorTopology topology = new ProcessorTopology(this.processorNodes, sourceByTopics, Collections.emptyMap(), Collections.emptyList(), storeToTopic, Collections.emptyList());
        this.context = new NoOpProcessorContext();
        this.t1 = new TopicPartition("t1", 1);
        this.t2 = new TopicPartition("t2", 1);
        this.offsets = new HashMap<TopicPartition, Long>();
        this.offsets.put(this.t1, 50L);
        this.offsets.put(this.t2, 100L);
        this.stateMgr = new GlobalStateManagerStub(storeNames, this.offsets);
        this.globalStateTask = new GlobalStateUpdateTask(topology, (InternalProcessorContext)this.context, (GlobalStateManager)this.stateMgr);
    }

    @Test
    public void shouldInitializeStateManager() throws Exception {
        Map startingOffsets = this.globalStateTask.initialize();
        Assert.assertTrue((boolean)this.stateMgr.initialized);
        Assert.assertEquals(this.offsets, (Object)startingOffsets);
    }

    @Test
    public void shouldInitializeContext() throws Exception {
        this.globalStateTask.initialize();
        Assert.assertTrue((boolean)this.context.initialized);
    }

    @Test
    public void shouldInitializeProcessorTopology() throws Exception {
        this.globalStateTask.initialize();
        for (ProcessorNode processorNode : this.processorNodes) {
            if (processorNode instanceof MockProcessorNode) {
                Assert.assertTrue((boolean)((MockProcessorNode)processorNode).initialized);
                continue;
            }
            Assert.assertTrue((boolean)((MockSourceNode)processorNode).initialized);
        }
    }

    @Test
    public void shouldProcessRecordsForTopic() throws Exception {
        this.globalStateTask.initialize();
        this.globalStateTask.update(new ConsumerRecord("t1", 1, 1L, (Object)"foo".getBytes(), (Object)"bar".getBytes()));
        Assert.assertEquals((long)1L, (long)this.sourceOne.numReceived);
        Assert.assertEquals((long)0L, (long)this.sourceTwo.numReceived);
    }

    @Test
    public void shouldProcessRecordsForOtherTopic() throws Exception {
        byte[] integerBytes = new IntegerSerializer().serialize("foo", Integer.valueOf(1));
        this.globalStateTask.initialize();
        this.globalStateTask.update(new ConsumerRecord("t2", 1, 1L, (Object)integerBytes, (Object)integerBytes));
        Assert.assertEquals((long)1L, (long)this.sourceTwo.numReceived);
        Assert.assertEquals((long)0L, (long)this.sourceOne.numReceived);
    }

    @Test
    public void shouldCloseStateManagerWithOffsets() throws Exception {
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.t1, 52L);
        expectedOffsets.put(this.t2, 100L);
        this.globalStateTask.initialize();
        this.globalStateTask.update(new ConsumerRecord("t1", 1, 51L, (Object)"foo".getBytes(), (Object)"foo".getBytes()));
        this.globalStateTask.close();
        Assert.assertEquals(expectedOffsets, this.stateMgr.checkpointedOffsets());
        Assert.assertTrue((boolean)this.stateMgr.closed);
    }
}

