/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.WindowStoreSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public class TopologyBuilder {
    private static final Logger log = LoggerFactory.getLogger(TopologyBuilder.class);
    // NOTE(review): not referenced in the visible portion of this file.
    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
    // Node name -> factory, kept in insertion order; build(Set) iterates this map in that order.
    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap();
    // State store name -> factory wrapping the supplier registered through addStateStore.
    private final Map<String, StateStoreFactory> stateFactories = new HashMap<String, StateStoreFactory>();
    // Global store name -> store instance obtained from the supplier in addGlobalStore.
    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<String, StateStore>();
    // Topics registered by name through the topic-name addSource overloads.
    private final Set<String> sourceTopicNames = new HashSet<String>();
    // Topics registered through addInternalTopic; their names are passed through decorateTopic before use.
    private final Set<String> internalTopicNames = new HashSet<String>();
    // One unmodifiable set of source node names per copartitionSources call.
    private final List<Set<String>> copartitionSourceGroups = new ArrayList<Set<String>>();
    // Source node name -> topics it reads; pattern sources are filled in by setRegexMatchedTopicsToSourceNodes.
    private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap();
    // Source node name -> subscription pattern (insertion-ordered).
    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<String, Pattern>();
    // Sink node name -> topic it writes to.
    private final HashMap<String, String> nodeToSinkTopic = new HashMap();
    // NOTE(review): not referenced in the visible portion of this file.
    private final HashMap<String, Pattern> topicToPatterns = new HashMap();
    // State store name -> topics / patterns of the source nodes upstream of the processors using the store.
    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<String, Set<String>>();
    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<String, Set<Pattern>>();
    // Store name -> changelog topic; written by connectSourceStoreAndTopic and by build(Set).
    private final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
    // Topics registered through addGlobalStore.
    private final Set<String> globalTopics = new HashSet<String>();
    // Topics / patterns for which an explicit EARLIEST or LATEST reset policy was requested.
    private final Set<String> earliestResetTopics = new HashSet<String>();
    private final Set<String> latestResetTopics = new HashSet<String>();
    private final Set<Pattern> earliestResetPatterns = new HashSet<Pattern>();
    private final Set<Pattern> latestResetPatterns = new HashSet<Pattern>();
    // Tracks which nodes are connected; makeNodeGroups groups nodes by the root reported here.
    private final QuickUnion<String> nodeGrouper = new QuickUnion();
    // Topic names used to resolve pattern-based sources (see build(Set) and the setRegexMatched* helpers).
    private StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
    // Set via setApplicationId; passed to ProcessorStateManager.storeChangelogTopic when naming changelog topics.
    private String applicationId = null;
    // NOTE(review): not referenced in the visible portion of this file.
    private Pattern topicPattern = null;
    // Lazily computed cache of makeNodeGroups(); null until first requested.
    private Map<Integer, Set<String>> nodeGroups = null;

    /**
     * Stores the application id on this builder.
     *
     * @param applicationId the application id; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code applicationId} is {@code null}
     */
    public final synchronized TopologyBuilder setApplicationId(String applicationId) {
        this.applicationId = Objects.requireNonNull(applicationId, "applicationId can't be null");
        return this;
    }

    /**
     * Adds a source node reading the given topics. Delegates to the six-argument
     * topic-name overload, passing {@code null} for the offset reset policy, the
     * timestamp extractor and both deserializers.
     *
     * @param name   unique name of the source node
     * @param topics names of the topics to read; at least one is required
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(String name, String ... topics) {
        final AutoOffsetReset noReset = null;
        final TimestampExtractor noExtractor = null;
        final Deserializer noDeserializer = null;
        return this.addSource(noReset, name, noExtractor, noDeserializer, noDeserializer, topics);
    }

    /**
     * Adds a source node reading the given topics with an explicit offset reset
     * policy. Delegates to the six-argument topic-name overload, passing
     * {@code null} for the timestamp extractor and both deserializers.
     *
     * @param offsetReset reset policy for the topics, or {@code null} for none
     * @param name        unique name of the source node
     * @param topics      names of the topics to read; at least one is required
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String ... topics) {
        final TimestampExtractor noExtractor = null;
        final Deserializer noDeserializer = null;
        return this.addSource(offsetReset, name, noExtractor, noDeserializer, noDeserializer, topics);
    }

    /**
     * Adds a source node reading the given topics with a custom timestamp
     * extractor. Delegates to the six-argument topic-name overload, passing
     * {@code null} for the offset reset policy and both deserializers.
     *
     * @param timestampExtractor extractor handed to the source node; may be {@code null}
     * @param name               unique name of the source node
     * @param topics             names of the topics to read; at least one is required
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(TimestampExtractor timestampExtractor, String name, String ... topics) {
        final AutoOffsetReset noReset = null;
        final Deserializer noDeserializer = null;
        return this.addSource(noReset, name, timestampExtractor, noDeserializer, noDeserializer, topics);
    }

    /**
     * Adds a source node reading the given topics with an explicit offset reset
     * policy and timestamp extractor. Delegates to the six-argument topic-name
     * overload, passing {@code null} for both deserializers.
     *
     * @param offsetReset        reset policy for the topics, or {@code null} for none
     * @param timestampExtractor extractor handed to the source node; may be {@code null}
     * @param name               unique name of the source node
     * @param topics             names of the topics to read; at least one is required
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, String ... topics) {
        final Deserializer noDeserializer = null;
        return this.addSource(offsetReset, name, timestampExtractor, noDeserializer, noDeserializer, topics);
    }

    /**
     * Adds a source node subscribed to every topic matching a pattern. Delegates
     * to the six-argument pattern overload, passing {@code null} for the offset
     * reset policy, the timestamp extractor and both deserializers.
     *
     * @param name         unique name of the source node
     * @param topicPattern pattern selecting the topics to read
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(String name, Pattern topicPattern) {
        final AutoOffsetReset noReset = null;
        final TimestampExtractor noExtractor = null;
        final Deserializer noDeserializer = null;
        return this.addSource(noReset, name, noExtractor, noDeserializer, noDeserializer, topicPattern);
    }

    /**
     * Adds a pattern-subscribed source node with an explicit offset reset policy.
     * Delegates to the six-argument pattern overload, passing {@code null} for
     * the timestamp extractor and both deserializers.
     *
     * @param offsetReset  reset policy for the pattern, or {@code null} for none
     * @param name         unique name of the source node
     * @param topicPattern pattern selecting the topics to read
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) {
        final TimestampExtractor noExtractor = null;
        final Deserializer noDeserializer = null;
        return this.addSource(offsetReset, name, noExtractor, noDeserializer, noDeserializer, topicPattern);
    }

    /**
     * Adds a pattern-subscribed source node with a custom timestamp extractor.
     * Delegates to the six-argument pattern overload, passing {@code null} for
     * the offset reset policy and both deserializers.
     *
     * @param timestampExtractor extractor handed to the source node; may be {@code null}
     * @param name               unique name of the source node
     * @param topicPattern       pattern selecting the topics to read
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
        final AutoOffsetReset noReset = null;
        final Deserializer noDeserializer = null;
        return this.addSource(noReset, name, timestampExtractor, noDeserializer, noDeserializer, topicPattern);
    }

    /**
     * Adds a pattern-subscribed source node with an explicit offset reset policy
     * and timestamp extractor. Delegates to the six-argument pattern overload,
     * passing {@code null} for both deserializers.
     *
     * @param offsetReset        reset policy for the pattern, or {@code null} for none
     * @param timestampExtractor extractor handed to the source node; may be {@code null}
     * @param name               unique name of the source node
     * @param topicPattern       pattern selecting the topics to read
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, TimestampExtractor timestampExtractor, String name, Pattern topicPattern) {
        final Deserializer noDeserializer = null;
        return this.addSource(offsetReset, name, timestampExtractor, noDeserializer, noDeserializer, topicPattern);
    }

    /**
     * Adds a source node reading the given topics with explicit key and value
     * deserializers. Delegates to the six-argument topic-name overload, passing
     * {@code null} for the offset reset policy and the timestamp extractor.
     *
     * @param name            unique name of the source node
     * @param keyDeserializer deserializer for record keys; may be {@code null}
     * @param valDeserializer deserializer for record values; may be {@code null}
     * @param topics          names of the topics to read; at least one is required
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String ... topics) {
        final AutoOffsetReset noReset = null;
        final TimestampExtractor noExtractor = null;
        return this.addSource(noReset, name, noExtractor, keyDeserializer, valDeserializer, topics);
    }

    /**
     * Adds a source node that reads the given topics.
     *
     * <p>All topics are validated before any builder state is modified, so a
     * rejected call leaves the builder exactly as it was. Validating and
     * registering in a single pass would leave earlier topics of the same call
     * registered with no node owning them when a later topic is rejected.
     *
     * @param offsetReset        reset policy recorded for each topic, or {@code null} to record none
     * @param name               unique name of the source node; must not be {@code null}
     * @param timestampExtractor extractor handed to the source node factory; may be {@code null}
     * @param keyDeserializer    key deserializer handed to the source node factory; may be {@code null}
     * @param valDeserializer    value deserializer handed to the source node factory; may be {@code null}
     * @param topics             names of the topics to read; at least one, none {@code null}
     * @return this builder
     * @throws TopologyBuilderException if no topic is given, if {@code name} is already used, or if a
     *                                  topic is already registered, is repeated within {@code topics},
     *                                  or matches a registered source pattern
     * @throws NullPointerException     if {@code name} or any topic is {@code null}
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valDeserializer, String ... topics) {
        if (topics.length == 0) {
            throw new TopologyBuilderException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        // Pass 1: validate only. Duplicates inside this call are tracked locally and
        // rejected with the same message a duplicate across calls would produce.
        Set<String> validatedTopics = new HashSet<String>();
        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            this.validateTopicNotAlreadyRegistered(topic);
            if (!validatedTopics.add(topic)) {
                throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
            }
        }
        // Pass 2: every topic is acceptable, so it is now safe to mutate builder state.
        for (String topic : topics) {
            this.maybeAddToResetList(this.earliestResetTopics, this.latestResetTopics, offsetReset, topic);
            this.sourceTopicNames.add(topic);
        }
        this.nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
        this.nodeToSourceTopics.put(name, Arrays.asList(topics));
        this.nodeGrouper.add(name);
        return this;
    }

    /**
     * Adds a global state store together with its source node and updating
     * processor. Delegates to the eight-argument overload, passing {@code null}
     * for the timestamp extractor.
     *
     * @param storeSupplier       supplier of the global store
     * @param sourceName          name of the source node reading {@code topic}
     * @param keyDeserializer     key deserializer for the source node; may be {@code null}
     * @param valueDeserializer   value deserializer for the source node; may be {@code null}
     * @param topic               topic the store is populated from
     * @param processorName       name of the processor that updates the store
     * @param stateUpdateSupplier supplier of that processor
     * @return this builder
     */
    public synchronized TopologyBuilder addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier) {
        final TimestampExtractor noExtractor = null;
        return this.addGlobalStore(storeSupplier, sourceName, noExtractor, keyDeserializer, valueDeserializer, topic, processorName, stateUpdateSupplier);
    }

    /**
     * Adds a global state store, the source node that reads its topic and the
     * processor that updates it, and wires the three together.
     *
     * @param storeSupplier       supplier of the global store; logging must be disabled
     * @param sourceName          name of the new source node
     * @param timestampExtractor  extractor handed to the source node factory; may be {@code null}
     * @param keyDeserializer     key deserializer for the source node; may be {@code null}
     * @param valueDeserializer   value deserializer for the source node; may be {@code null}
     * @param topic               topic the store is populated from
     * @param processorName       name of the new processor node; must differ from {@code sourceName}
     * @param stateUpdateSupplier supplier of the updating processor
     * @return this builder
     * @throws TopologyBuilderException if a node or store name is already taken, the supplier has logging
     *                                  enabled, both node names are equal, or the topic is already registered
     * @throws NullPointerException     if any of the required arguments is {@code null}
     */
    public synchronized TopologyBuilder addGlobalStore(StateStoreSupplier<KeyValueStore> storeSupplier, String sourceName, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valueDeserializer, String topic, String processorName, ProcessorSupplier stateUpdateSupplier) {
        // --- argument validation (order determines which error is reported first) ---
        Objects.requireNonNull(storeSupplier, "store supplier must not be null");
        Objects.requireNonNull(sourceName, "sourceName must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        Objects.requireNonNull(stateUpdateSupplier, "supplier must not be null");
        Objects.requireNonNull(processorName, "processorName must not be null");
        if (this.nodeFactories.containsKey(sourceName)) {
            throw new TopologyBuilderException("Processor " + sourceName + " is already added.");
        }
        if (this.nodeFactories.containsKey(processorName)) {
            throw new TopologyBuilderException("Processor " + processorName + " is already added.");
        }
        boolean storeNameTaken = this.stateFactories.containsKey(storeSupplier.name())
                || this.globalStateStores.containsKey(storeSupplier.name());
        if (storeNameTaken) {
            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " is already added.");
        }
        if (storeSupplier.loggingEnabled()) {
            throw new TopologyBuilderException("StateStore " + storeSupplier.name() + " for global table must not have logging enabled.");
        }
        if (sourceName.equals(processorName)) {
            throw new TopologyBuilderException("sourceName and processorName must be different.");
        }
        this.validateTopicNotAlreadyRegistered(topic);

        // --- source node ---
        this.globalTopics.add(topic);
        String[] sourceTopics = new String[]{topic};
        SourceNodeFactory sourceFactory = new SourceNodeFactory(sourceName, sourceTopics, null, timestampExtractor, keyDeserializer, valueDeserializer);
        this.nodeFactories.put(sourceName, sourceFactory);
        this.nodeToSourceTopics.put(sourceName, Arrays.asList(sourceTopics));
        this.nodeGrouper.add(sourceName);

        // --- updating processor, child of the source node ---
        String[] processorParents = new String[]{sourceName};
        ProcessorNodeFactory updaterFactory = new ProcessorNodeFactory(processorName, processorParents, stateUpdateSupplier);
        updaterFactory.addStateStore(storeSupplier.name());
        this.nodeFactories.put(processorName, updaterFactory);
        this.nodeGrouper.add(processorName);
        this.nodeGrouper.unite(processorName, processorParents);

        // --- the store itself, mapped to its topic ---
        this.globalStateStores.put(storeSupplier.name(), storeSupplier.get());
        this.connectSourceStoreAndTopic(storeSupplier.name(), topic);
        return this;
    }

    /**
     * Rejects a topic that is already claimed by a named source, a global store,
     * or that matches the pattern of a pattern-subscribed source.
     *
     * @param topic the topic name to check
     * @throws TopologyBuilderException if the topic is already registered or matches a registered pattern
     */
    private void validateTopicNotAlreadyRegistered(String topic) {
        boolean claimedBySource = this.sourceTopicNames.contains(topic);
        if (claimedBySource || this.globalTopics.contains(topic)) {
            throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
        }
        for (Pattern registeredPattern : this.nodeToSourcePatterns.values()) {
            if (registeredPattern.matcher(topic).matches()) {
                throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
            }
        }
    }

    /**
     * Adds a pattern-subscribed source node with explicit key and value
     * deserializers. Delegates to the six-argument pattern overload, passing
     * {@code null} for the offset reset policy and the timestamp extractor.
     *
     * @param name            unique name of the source node
     * @param keyDeserializer deserializer for record keys; may be {@code null}
     * @param valDeserializer deserializer for record values; may be {@code null}
     * @param topicPattern    pattern selecting the topics to read
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
        final AutoOffsetReset noReset = null;
        final TimestampExtractor noExtractor = null;
        return this.addSource(noReset, name, noExtractor, keyDeserializer, valDeserializer, topicPattern);
    }

    /**
     * Adds a source node subscribed to every topic matching {@code topicPattern}.
     *
     * @param offsetReset        reset policy recorded for the pattern, or {@code null} to record none
     * @param name               unique name of the source node; must not be {@code null}
     * @param timestampExtractor extractor handed to the source node factory; may be {@code null}
     * @param keyDeserializer    key deserializer handed to the source node factory; may be {@code null}
     * @param valDeserializer    value deserializer handed to the source node factory; may be {@code null}
     * @param topicPattern       pattern selecting the topics to read; must not be {@code null}
     * @return this builder
     * @throws TopologyBuilderException if {@code name} is already used or the pattern matches a topic
     *                                  already registered by a named source
     * @throws NullPointerException     if {@code topicPattern} or {@code name} is {@code null}
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
        Objects.requireNonNull(name, "name can't be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        // The pattern must not overlap any topic a named source already reads.
        for (String registeredTopic : this.sourceTopicNames) {
            if (topicPattern.matcher(registeredTopic).matches()) {
                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
            }
        }
        this.maybeAddToResetList(this.earliestResetPatterns, this.latestResetPatterns, offsetReset, topicPattern);
        SourceNodeFactory patternSource = new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer);
        this.nodeFactories.put(name, patternSource);
        this.nodeToSourcePatterns.put(name, topicPattern);
        this.nodeGrouper.add(name);
        return this;
    }

    /**
     * Adds a pattern-subscribed source node with an explicit offset reset policy
     * and key/value deserializers. Delegates to the six-argument pattern
     * overload, passing {@code null} for the timestamp extractor.
     *
     * @param offsetReset     reset policy for the pattern, or {@code null} for none
     * @param name            unique name of the source node
     * @param keyDeserializer deserializer for record keys; may be {@code null}
     * @param valDeserializer deserializer for record values; may be {@code null}
     * @param topicPattern    pattern selecting the topics to read
     * @return this builder
     */
    public final synchronized TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
        final TimestampExtractor noExtractor = null;
        return this.addSource(offsetReset, name, noExtractor, keyDeserializer, valDeserializer, topicPattern);
    }

    /**
     * Adds a sink node writing to {@code topic}. Delegates to the
     * serializer-taking overload, passing {@code null} for both serializers.
     *
     * @param name        unique name of the sink node
     * @param topic       topic the sink writes to
     * @param parentNames names of the nodes feeding this sink
     * @return this builder
     */
    public final synchronized TopologyBuilder addSink(String name, String topic, String ... parentNames) {
        final Serializer noSerializer = null;
        return this.addSink(name, topic, noSerializer, noSerializer, parentNames);
    }

    /**
     * Adds a sink node writing to {@code topic} with a custom partitioner.
     * Delegates to the six-argument overload, passing {@code null} for both
     * serializers.
     *
     * @param name        unique name of the sink node
     * @param topic       topic the sink writes to
     * @param partitioner partitioner handed to the sink node factory; may be {@code null}
     * @param parentNames names of the nodes feeding this sink
     * @return this builder
     */
    public final synchronized TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String ... parentNames) {
        final Serializer noSerializer = null;
        return this.addSink(name, topic, noSerializer, noSerializer, partitioner, parentNames);
    }

    /**
     * Adds a sink node writing to {@code topic} with explicit key and value
     * serializers. Delegates to the six-argument overload, passing {@code null}
     * for the partitioner.
     *
     * @param name          unique name of the sink node
     * @param topic         topic the sink writes to
     * @param keySerializer serializer for record keys; may be {@code null}
     * @param valSerializer serializer for record values; may be {@code null}
     * @param parentNames   names of the nodes feeding this sink
     * @return this builder
     */
    public final synchronized TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String ... parentNames) {
        // This method declares no type variables, so the null partitioner is cast to the
        // raw type; a cast naming K and V here cannot be resolved by the compiler.
        return this.addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner)null, parentNames);
    }

    /**
     * Adds a sink node that forwards records from its parents to {@code topic}.
     *
     * @param name          unique name of the sink node; must not be {@code null}
     * @param topic         topic the sink writes to; must not be {@code null}
     * @param keySerializer serializer for record keys; may be {@code null}
     * @param valSerializer serializer for record values; may be {@code null}
     * @param partitioner   partitioner handed to the sink node factory; may be {@code null}
     * @param parentNames   names of already-added nodes feeding this sink
     * @return this builder
     * @throws TopologyBuilderException if {@code name} is already used, a parent equals {@code name},
     *                                  or a parent has not been added yet
     * @throws NullPointerException     if {@code name} or {@code topic} is {@code null}
     */
    public final synchronized <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String ... parentNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        for (String parentName : parentNames) {
            if (parentName.equals(name)) {
                throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
            }
            if (!this.nodeFactories.containsKey(parentName)) {
                throw new TopologyBuilderException("Parent processor " + parentName + " is not added yet.");
            }
        }
        SinkNodeFactory sinkFactory = new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer, partitioner);
        this.nodeFactories.put(name, sinkFactory);
        this.nodeToSinkTopic.put(name, topic);
        // The sink joins the connectivity group of its parents.
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, parentNames);
        return this;
    }

    /**
     * Adds a processor node fed by the given parent nodes.
     *
     * @param name        unique name of the processor node; must not be {@code null}
     * @param supplier    supplier of the processor; must not be {@code null}
     * @param parentNames names of already-added nodes feeding this processor
     * @return this builder
     * @throws TopologyBuilderException if {@code name} is already used, a parent equals {@code name},
     *                                  or a parent has not been added yet
     * @throws NullPointerException     if {@code name} or {@code supplier} is {@code null}
     */
    public final synchronized TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String ... parentNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        for (String parentName : parentNames) {
            if (parentName.equals(name)) {
                throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
            }
            if (!this.nodeFactories.containsKey(parentName)) {
                throw new TopologyBuilderException("Parent processor " + parentName + " is not added yet.");
            }
        }
        ProcessorNodeFactory processorFactory = new ProcessorNodeFactory(name, parentNames, supplier);
        this.nodeFactories.put(name, processorFactory);
        // The processor joins the connectivity group of its parents.
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, parentNames);
        return this;
    }

    /**
     * Registers a state store and connects it to the given processors.
     *
     * @param supplier       supplier of the store; must not be {@code null}
     * @param processorNames names of the processors that use the store; may be {@code null} or empty
     * @return this builder
     * @throws TopologyBuilderException if a store with the same name is already registered, or a
     *                                  processor cannot be connected to the store
     * @throws NullPointerException     if {@code supplier} is {@code null}
     */
    public final synchronized TopologyBuilder addStateStore(StateStoreSupplier supplier, String ... processorNames) {
        Objects.requireNonNull(supplier, "supplier can't be null");
        if (this.stateFactories.containsKey(supplier.name())) {
            throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added.");
        }
        this.stateFactories.put(supplier.name(), new StateStoreFactory(supplier));
        if (processorNames == null) {
            return this;
        }
        for (String user : processorNames) {
            this.connectProcessorAndStateStore(user, supplier.name());
        }
        return this;
    }

    /**
     * Connects one processor to each of the named state stores.
     *
     * @param processorName   name of the processor; must not be {@code null}
     * @param stateStoreNames names of the stores to connect; may be {@code null} or empty
     * @return this builder
     * @throws TopologyBuilderException if a store or the processor is unknown, or the node is not a processor
     * @throws NullPointerException     if {@code processorName} is {@code null}
     */
    public final synchronized TopologyBuilder connectProcessorAndStateStores(String processorName, String ... stateStoreNames) {
        Objects.requireNonNull(processorName, "processorName can't be null");
        if (stateStoreNames == null) {
            return this;
        }
        for (String storeName : stateStoreNames) {
            this.connectProcessorAndStateStore(processorName, storeName);
        }
        return this;
    }

    /**
     * Maps a store to the topic that serves as its changelog.
     *
     * @param sourceStoreName name of the store
     * @param topic           topic to associate with the store
     * @return this builder
     * @throws TopologyBuilderException if the store already has a topic mapped
     */
    protected final synchronized TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
        boolean alreadyMapped = this.storeToChangelogTopic.containsKey(sourceStoreName);
        if (alreadyMapped) {
            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
        }
        this.storeToChangelogTopic.put(sourceStoreName, topic);
        return this;
    }

    /**
     * Places the named processors in the same connectivity group.
     *
     * @param processorNames names of at least two already-added nodes
     * @return this builder
     * @throws TopologyBuilderException if fewer than two names are given or a name is unknown
     */
    public final synchronized TopologyBuilder connectProcessors(String ... processorNames) {
        final int count = processorNames.length;
        if (count < 2) {
            throw new TopologyBuilderException("At least two processors need to participate in the connection.");
        }
        for (String candidate : processorNames) {
            if (!this.nodeFactories.containsKey(candidate)) {
                throw new TopologyBuilderException("Processor " + candidate + " is not added yet.");
            }
        }
        // Unite the first processor with all remaining ones.
        String[] others = Arrays.copyOfRange(processorNames, 1, count);
        this.nodeGrouper.unite(processorNames[0], others);
        return this;
    }

    /**
     * Marks a topic name as internal.
     *
     * @param topicName the topic name; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code topicName} is {@code null}
     */
    public final synchronized TopologyBuilder addInternalTopic(String topicName) {
        this.internalTopicNames.add(Objects.requireNonNull(topicName, "topicName can't be null"));
        return this;
    }

    /**
     * Records a group of source nodes as copartitioned. The names are copied into
     * an unmodifiable set, so later changes to {@code sourceNodes} have no effect.
     *
     * @param sourceNodes names of the source nodes forming one copartition group
     * @return this builder
     */
    public final synchronized TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
        Set<String> snapshot = new HashSet<String>(sourceNodes);
        this.copartitionSourceGroups.add(Collections.unmodifiableSet(snapshot));
        return this;
    }

    /**
     * Connects a single processor node to a single state store.
     *
     * <p>All checks, including the node-type check, run before any state is
     * modified. A rejected source or sink node is therefore never recorded as a
     * user of the store and never merged into the store users' group.
     *
     * @param processorName  name of an already-added processor node
     * @param stateStoreName name of an already-added state store
     * @throws TopologyBuilderException if the store or node is unknown, or the node is a source or sink
     */
    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
        if (!this.stateFactories.containsKey(stateStoreName)) {
            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
        }
        if (!this.nodeFactories.containsKey(processorName)) {
            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
        }
        // Reject non-processor nodes up front so a failed call leaves no trace.
        NodeFactory nodeFactory = this.nodeFactories.get(processorName);
        if (!(nodeFactory instanceof ProcessorNodeFactory)) {
            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
        }
        ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory)nodeFactory;

        // Processors sharing a store must end up in the same group: unite the
        // newcomer with one existing user, if there is one.
        StateStoreFactory stateStoreFactory = this.stateFactories.get(stateStoreName);
        Iterator<String> iter = stateStoreFactory.users.iterator();
        if (iter.hasNext()) {
            String user = iter.next();
            this.nodeGrouper.unite(user, new String[]{processorName});
        }
        stateStoreFactory.users.add(processorName);
        processorNodeFactory.addStateStore(stateStoreName);
        this.connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
    }

    /**
     * Collects the source node factories reachable by walking up from the given
     * parent names. Source parents are collected directly, processor parents are
     * followed recursively, and any other parent is ignored.
     *
     * @param parents names of the parent nodes to start from
     * @return the set of source node factories found
     */
    private Set<SourceNodeFactory> findSourcesForProcessorParents(String[] parents) {
        Set<SourceNodeFactory> found = new HashSet<SourceNodeFactory>();
        for (String parentName : parents) {
            NodeFactory parentFactory = this.nodeFactories.get(parentName);
            if (parentFactory instanceof SourceNodeFactory) {
                found.add((SourceNodeFactory)parentFactory);
            } else if (parentFactory instanceof ProcessorNodeFactory) {
                String[] grandParents = ((ProcessorNodeFactory)parentFactory).parents;
                found.addAll(this.findSourcesForProcessorParents(grandParents));
            }
        }
        return found;
    }

    /**
     * Records which source topics and source patterns feed the given state
     * store, based on the sources upstream of the processor. Does nothing when
     * the store already has topics or patterns recorded.
     *
     * @param stateStoreName       name of the state store
     * @param processorNodeFactory factory of the processor being connected to the store
     */
    private void connectStateStoreNameToSourceTopicsOrPattern(String stateStoreName, ProcessorNodeFactory processorNodeFactory) {
        boolean alreadyConnected = this.stateStoreNameToSourceTopics.containsKey(stateStoreName)
                || this.stateStoreNameToSourceRegex.containsKey(stateStoreName);
        if (alreadyConnected) {
            return;
        }
        Set<String> upstreamTopics = new HashSet<String>();
        Set<Pattern> upstreamPatterns = new HashSet<Pattern>();
        for (SourceNodeFactory source : this.findSourcesForProcessorParents(processorNodeFactory.parents)) {
            // A source contributes either its pattern or its topic list, never both.
            if (source.pattern == null) {
                upstreamTopics.addAll(source.topics);
            } else {
                upstreamPatterns.add(source.pattern);
            }
        }
        if (!upstreamTopics.isEmpty()) {
            this.stateStoreNameToSourceTopics.put(stateStoreName, Collections.unmodifiableSet(upstreamTopics));
        }
        if (!upstreamPatterns.isEmpty()) {
            this.stateStoreNameToSourceRegex.put(stateStoreName, Collections.unmodifiableSet(upstreamPatterns));
        }
    }

    /**
     * Adds {@code item} to the collection matching the requested reset policy.
     * A {@code null} policy leaves both collections untouched.
     *
     * @param earliestResets collection receiving items with policy {@code EARLIEST}
     * @param latestResets   collection receiving items with policy {@code LATEST}
     * @param offsetReset    requested policy, or {@code null}
     * @param item           the topic name or pattern to record
     * @throws TopologyBuilderException if the policy is neither {@code EARLIEST} nor {@code LATEST}
     */
    private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets, AutoOffsetReset offsetReset, T item) {
        if (offsetReset == null) {
            return;
        }
        if (offsetReset == AutoOffsetReset.EARLIEST) {
            earliestResets.add(item);
        } else if (offsetReset == AutoOffsetReset.LATEST) {
            latestResets.add(item);
        } else {
            throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset));
        }
    }

    /**
     * Returns the node groups, computing and caching them on first use.
     *
     * @return map from group id to the names of the nodes in that group
     */
    public synchronized Map<Integer, Set<String>> nodeGroups() {
        Map<Integer, Set<String>> groups = this.nodeGroups;
        if (groups == null) {
            groups = this.makeNodeGroups();
            this.nodeGroups = groups;
        }
        return groups;
    }

    /**
     * Partitions all nodes into groups of connected nodes. Group ids are assigned
     * in increasing order, source nodes first (sorted by name), then the
     * remaining nodes (sorted by name). Two nodes share a group when
     * {@code nodeGrouper} reports the same root for them.
     *
     * @return insertion-ordered map from group id to the node names in that group
     */
    private Map<Integer, Set<String>> makeNodeGroups() {
        Map<Integer, Set<String>> groupsById = new LinkedHashMap<Integer, Set<String>>();
        Map<String, Set<String>> groupsByRoot = new HashMap<String, Set<String>>();
        int nextGroupId = 0;

        // First pass: every source node, whether subscribed by name or by pattern.
        Set<String> sourceNodeNames = new HashSet<String>(this.nodeToSourceTopics.keySet());
        sourceNodeNames.addAll(this.nodeToSourcePatterns.keySet());
        for (String sourceName : Utils.sorted(sourceNodeNames)) {
            String groupRoot = this.nodeGrouper.root(sourceName);
            Set<String> members = groupsByRoot.get(groupRoot);
            if (members == null) {
                members = new HashSet<String>();
                groupsByRoot.put(groupRoot, members);
                groupsById.put(nextGroupId++, members);
            }
            members.add(sourceName);
        }

        // Second pass: every node that has no entry in nodeToSourceTopics.
        for (String nodeName : Utils.sorted(this.nodeFactories.keySet())) {
            if (!this.nodeToSourceTopics.containsKey(nodeName)) {
                String groupRoot = this.nodeGrouper.root(nodeName);
                Set<String> members = groupsByRoot.get(groupRoot);
                if (members == null) {
                    members = new HashSet<String>();
                    groupsByRoot.put(groupRoot, members);
                    groupsById.put(nextGroupId++, members);
                }
                members.add(nodeName);
            }
        }
        return groupsById;
    }

    /**
     * Builds the processor topology for one node group, or for all non-global
     * nodes when no group id is given.
     *
     * @param topicGroupId id of the node group to build, or {@code null} to build
     *                     every node that is not part of a global-store group
     * @return the processor topology built from the selected nodes
     */
    public synchronized ProcessorTopology build(Integer topicGroupId) {
        if (topicGroupId != null) {
            Set<String> requestedGroup = this.nodeGroups().get(topicGroupId);
            return this.build(requestedGroup);
        }
        Set<String> globalNodes = this.globalNodeGroups();
        Set<String> nonGlobalNodes = new HashSet<String>();
        for (Set<String> group : this.nodeGroups().values()) {
            nonGlobalNodes.addAll(group);
        }
        nonGlobalNodes.removeAll(globalNodes);
        return this.build(nonGlobalNodes);
    }

    /**
     * Builds the topology made of the nodes that belong to global-store groups.
     *
     * @return the global state topology, or {@code null} when there are no such nodes
     */
    public synchronized ProcessorTopology buildGlobalStateTopology() {
        Set<String> globalNodes = this.globalNodeGroups();
        return globalNodes.isEmpty() ? null : this.build(globalNodes);
    }

    /**
     * Collects the names of all nodes in groups that contain a global source,
     * i.e. a source node reading exactly one topic that is registered in
     * {@code globalTopics}.
     *
     * @return names of all nodes belonging to such groups
     */
    private Set<String> globalNodeGroups() {
        Set<String> globalNodes = new HashSet<String>();
        for (Set<String> groupMembers : this.nodeGroups().values()) {
            for (String nodeName : groupMembers) {
                NodeFactory factory = this.nodeFactories.get(nodeName);
                if (!(factory instanceof SourceNodeFactory)) {
                    continue;
                }
                List<?> sourceTopics = ((SourceNodeFactory)factory).topics;
                boolean singleTopic = sourceTopics != null && sourceTopics.size() == 1;
                if (singleTopic && this.globalTopics.contains(sourceTopics.get(0))) {
                    // One global source marks the entire group as global.
                    globalNodes.addAll(groupMembers);
                }
            }
        }
        return globalNodes;
    }

    /**
     * Builds a {@link ProcessorTopology} from the node factories whose names are
     * in {@code nodeGroup}.
     *
     * <p>Factories are visited in the iteration order of {@code nodeFactories}.
     * Each child is attached to its parents by looking the parents up among the
     * nodes built so far.
     *
     * @param nodeGroup names of the nodes to include; {@code null} includes every node
     * @return the topology made of the built nodes, the source/sink topic maps, the
     *         state stores used by the built processors, the store-to-changelog map
     *         and all global state stores
     * @throws TopologyBuilderException if a factory is neither a processor, source nor sink factory
     */
    private ProcessorTopology build(Set<String> nodeGroup) {
        ArrayList<ProcessorNode> processorNodes = new ArrayList<ProcessorNode>(this.nodeFactories.size());
        HashMap<String, ProcessorNode> processorMap = new HashMap<String, ProcessorNode>();
        HashMap<String, SourceNode> topicSourceMap = new HashMap<String, SourceNode>();
        HashMap<String, SinkNode> topicSinkMap = new HashMap<String, SinkNode>();
        LinkedHashMap<String, StateStore> stateStoreMap = new LinkedHashMap<String, StateStore>();
        for (NodeFactory factory : this.nodeFactories.values()) {
            // Skip nodes outside the requested group (a null group selects everything).
            if (nodeGroup != null && !nodeGroup.contains(factory.name)) continue;
            ProcessorNode node = factory.build();
            processorNodes.add(node);
            processorMap.put(node.name(), node);
            if (factory instanceof ProcessorNodeFactory) {
                // NOTE(review): assumes every parent was already built in this call; a parent
                // outside nodeGroup would make the lookup return null -- confirm with callers.
                for (String parent : ((ProcessorNodeFactory)factory).parents) {
                    ProcessorNode parentNode = (ProcessorNode)processorMap.get(parent);
                    parentNode.addChild(node);
                }
                for (String stateStoreName : ((ProcessorNodeFactory)factory).stateStoreNames) {
                    StateStore stateStore;
                    // Each store is created at most once per build, even if several processors share it.
                    if (stateStoreMap.containsKey(stateStoreName)) continue;
                    if (this.stateFactories.containsKey(stateStoreName)) {
                        StateStoreSupplier supplier = this.stateFactories.get((Object)stateStoreName).supplier;
                        stateStore = supplier.get();
                        // Register a changelog topic for logged stores that do not have one mapped yet.
                        if (supplier.loggingEnabled() && !this.storeToChangelogTopic.containsKey(stateStoreName)) {
                            String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
                            this.storeToChangelogTopic.put(stateStoreName, changelogTopic);
                        }
                    } else {
                        // Not a regular store: fall back to the global store registered under this name.
                        stateStore = this.globalStateStores.get(stateStoreName);
                    }
                    stateStoreMap.put(stateStoreName, stateStore);
                }
                continue;
            }
            if (factory instanceof SourceNodeFactory) {
                SourceNodeFactory sourceNodeFactory = (SourceNodeFactory)factory;
                // Pattern sources resolve their topics from the current subscription updates.
                List<String> topics = sourceNodeFactory.pattern != null ? sourceNodeFactory.getTopics(this.subscriptionUpdates.getUpdates()) : sourceNodeFactory.topics;
                for (String topic : topics) {
                    // Internal topics are keyed by their decorated name.
                    if (this.internalTopicNames.contains(topic)) {
                        topicSourceMap.put(this.decorateTopic(topic), (SourceNode)node);
                        continue;
                    }
                    topicSourceMap.put(topic, (SourceNode)node);
                }
                continue;
            }
            if (factory instanceof SinkNodeFactory) {
                SinkNodeFactory sinkNodeFactory = (SinkNodeFactory)factory;
                for (String parent : sinkNodeFactory.parents) {
                    ((ProcessorNode)processorMap.get(parent)).addChild(node);
                    // The sink is registered under its topic (decorated if internal) once per
                    // parent; repeated puts for the same key store the same node.
                    if (this.internalTopicNames.contains(sinkNodeFactory.topic)) {
                        topicSinkMap.put(this.decorateTopic(sinkNodeFactory.topic), (SinkNode)node);
                        continue;
                    }
                    topicSinkMap.put(sinkNodeFactory.topic, (SinkNode)node);
                }
                continue;
            }
            throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
        }
        return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<StateStore>(stateStoreMap.values()), this.storeToChangelogTopic, new ArrayList<StateStore>(this.globalStateStores.values()));
    }

    /**
     * Returns a read-only view of the global state stores, keyed by store name.
     *
     * @return unmodifiable view of the global state store map
     */
    public Map<String, StateStore> globalStateStores() {
        Map<String, StateStore> readOnlyView = Collections.unmodifiableMap(this.globalStateStores);
        return readOnlyView;
    }

    /**
     * Describes the topics used by each node group: sink topics, source topics,
     * internal source topics and state changelog topics. Global topics are left
     * out, internal topic names are decorated, and groups with no source topic
     * are omitted from the result.
     *
     * @return unmodifiable, insertion-ordered map from group id to its topic information
     */
    public synchronized Map<Integer, TopicsInfo> topicGroups() {
        if (this.nodeGroups == null) {
            this.nodeGroups = this.makeNodeGroups();
        }
        Map<Integer, TopicsInfo> infoByGroup = new LinkedHashMap<Integer, TopicsInfo>();
        for (Map.Entry<Integer, Set<String>> group : this.nodeGroups.entrySet()) {
            Set<String> sinks = new HashSet<String>();
            Set<String> sources = new HashSet<String>();
            Map<String, InternalTopicConfig> internalSources = new HashMap<String, InternalTopicConfig>();
            Map<String, InternalTopicConfig> changelogs = new HashMap<String, InternalTopicConfig>();
            for (String nodeName : group.getValue()) {
                // Source topics read by this node, if it is a source.
                List<String> nodeSourceTopics = this.nodeToSourceTopics.get(nodeName);
                if (nodeSourceTopics != null) {
                    for (String sourceTopic : nodeSourceTopics) {
                        if (this.globalTopics.contains(sourceTopic)) {
                            continue;
                        }
                        if (!this.internalTopicNames.contains(sourceTopic)) {
                            sources.add(sourceTopic);
                        } else {
                            String decorated = this.decorateTopic(sourceTopic);
                            internalSources.put(decorated, new InternalTopicConfig(decorated, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.emptyMap()));
                            sources.add(decorated);
                        }
                    }
                }
                // Sink topic written by this node, if it is a sink.
                String sinkTopic = this.nodeToSinkTopic.get(nodeName);
                if (sinkTopic != null) {
                    sinks.add(this.internalTopicNames.contains(sinkTopic) ? this.decorateTopic(sinkTopic) : sinkTopic);
                }
                // Changelog topics of the logged stores this node uses.
                for (StateStoreFactory stateFactory : this.stateFactories.values()) {
                    StateStoreSupplier supplier = stateFactory.supplier;
                    if (supplier.loggingEnabled() && stateFactory.users.contains(nodeName)) {
                        String changelogName = ProcessorStateManager.storeChangelogTopic(this.applicationId, supplier.name());
                        changelogs.put(changelogName, this.createInternalTopicConfig(supplier, changelogName));
                    }
                }
            }
            if (!sources.isEmpty()) {
                infoByGroup.put(group.getKey(), new TopicsInfo(Collections.unmodifiableSet(sinks), Collections.unmodifiableSet(sources), Collections.unmodifiableMap(internalSources), Collections.unmodifiableMap(changelogs)));
            }
        }
        return Collections.unmodifiableMap(infoByGroup);
    }

    /**
     * Re-resolves the topic list of every source node registered with a pattern,
     * using the topics carried by the current subscription updates.
     * Does nothing when the subscription updates report no updates.
     */
    private void setRegexMatchedTopicsToSourceNodes() {
        if (!this.subscriptionUpdates.hasUpdates()) {
            return;
        }
        for (String nodeName : this.nodeToSourcePatterns.keySet()) {
            SourceNodeFactory factory = (SourceNodeFactory)this.nodeFactories.get(nodeName);
            List<String> resolvedTopics = factory.getTopics(this.subscriptionUpdates.getUpdates());
            this.nodeToSourceTopics.put(nodeName, resolvedTopics);
            // Logged after every node so the trace shows the map as it evolves.
            log.debug("nodeToSourceTopics {}", this.nodeToSourceTopics);
        }
    }

    /**
     * For each state store that has source regexes registered, collects the
     * updated subscription topics matching any of those regexes and merges them
     * with the topics already recorded for the store. Stores with no newly
     * matched topic are left untouched. Does nothing when there are no updates.
     */
    private void setRegexMatchedTopicToStateStore() {
        if (!this.subscriptionUpdates.hasUpdates()) {
            return;
        }
        for (Map.Entry<String, Set<Pattern>> regexEntry : this.stateStoreNameToSourceRegex.entrySet()) {
            String storeName = regexEntry.getKey();
            Set<String> matchedTopics = new HashSet<String>();
            for (String candidate : this.subscriptionUpdates.getUpdates()) {
                for (Pattern regex : regexEntry.getValue()) {
                    if (regex.matcher(candidate).matches()) {
                        matchedTopics.add(candidate);
                    }
                }
            }
            if (!matchedTopics.isEmpty()) {
                Collection<String> alreadyKnown = this.stateStoreNameToSourceTopics.get(storeName);
                if (alreadyKnown != null) {
                    matchedTopics.addAll(alreadyKnown);
                }
                // The stored view is read-only; a fresh set is built on every update.
                this.stateStoreNameToSourceTopics.put(storeName, Collections.unmodifiableSet(matchedTopics));
            }
        }
    }

    /**
     * Builds the changelog topic configuration for a state store supplier.
     * Window store suppliers get both the compact and delete cleanup policies
     * plus a retention taken from the supplier's retention period; every other
     * supplier gets the compact policy only.
     *
     * @param supplier the state store supplier whose log config is used
     * @param name     the changelog topic name
     * @return the internal topic configuration for {@code name}
     */
    private InternalTopicConfig createInternalTopicConfig(StateStoreSupplier<?> supplier, String name) {
        if (supplier instanceof WindowStoreSupplier) {
            InternalTopicConfig windowedConfig = new InternalTopicConfig(name, Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete), supplier.logConfig());
            windowedConfig.setRetentionMs(((WindowStoreSupplier)supplier).retentionPeriod());
            return windowedConfig;
        }
        return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
    }

    /**
     * Builds one pattern covering all topics and patterns registered for the
     * "earliest" offset reset, with internal topic names decorated first.
     *
     * @return the combined pattern for the earliest-reset sources
     * @throws TopologyBuilderException if the combined pattern overlaps with the
     *         latest-reset patterns or matches one of the latest-reset topics
     */
    public synchronized Pattern earliestResetTopicsPattern() {
        Pattern combined = TopologyBuilder.buildPatternForOffsetResetTopics(this.maybeDecorateInternalSourceTopics(this.earliestResetTopics), this.earliestResetPatterns);
        this.ensureNoRegexOverlap(combined, this.latestResetPatterns, this.latestResetTopics);
        return combined;
    }

    /**
     * Builds one pattern covering all topics and patterns registered for the
     * "latest" offset reset, with internal topic names decorated first.
     *
     * @return the combined pattern for the latest-reset sources
     * @throws TopologyBuilderException if the combined pattern overlaps with the
     *         earliest-reset patterns or matches one of the earliest-reset topics
     */
    public synchronized Pattern latestResetTopicsPattern() {
        Pattern combined = TopologyBuilder.buildPatternForOffsetResetTopics(this.maybeDecorateInternalSourceTopics(this.latestResetTopics), this.latestResetPatterns);
        this.ensureNoRegexOverlap(combined, this.earliestResetPatterns, this.earliestResetTopics);
        return combined;
    }

    /**
     * Rejects a built pattern that collides with the opposite reset policy.
     * A collision is either the built regex text containing the text of one of
     * {@code otherPatterns}, or the built pattern fully matching one of
     * {@code otherTopics}.
     *
     * @param builtPattern  the combined pattern to validate
     * @param otherPatterns patterns registered under the opposite reset policy
     * @param otherTopics   topics registered under the opposite reset policy
     * @throws TopologyBuilderException on the first collision found
     */
    private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> otherPatterns, Set<String> otherTopics) {
        for (Pattern candidate : otherPatterns) {
            String candidateRegex = candidate.pattern();
            // Textual containment check on the regex source, not a semantic overlap test.
            if (builtPattern.pattern().contains(candidateRegex)) {
                throw new TopologyBuilderException(String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets", candidateRegex, builtPattern.pattern()));
            }
        }
        for (String candidateTopic : otherTopics) {
            if (builtPattern.matcher(candidateTopic).matches()) {
                throw new TopologyBuilderException(String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets", builtPattern.pattern(), candidateTopic));
            }
        }
    }

    /**
     * Joins the given topic names and the regex text of the given patterns into
     * a single alternation ({@code a|b|c}) and compiles it. Topic names are
     * inserted verbatim, without regex quoting.
     *
     * @param sourceTopics   topic names, listed first in the alternation
     * @param sourcePatterns patterns whose regex text is appended after the topics
     * @return the compiled alternation, or the shared empty pattern when both
     *         collections are empty
     */
    private static synchronized Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics, Collection<Pattern> sourcePatterns) {
        List<String> alternatives = new ArrayList<String>(sourceTopics);
        for (Pattern sourcePattern : sourcePatterns) {
            alternatives.add(sourcePattern.pattern());
        }
        if (alternatives.isEmpty()) {
            return EMPTY_ZERO_LENGTH_PATTERN;
        }
        Iterator<String> remaining = alternatives.iterator();
        StringBuilder regex = new StringBuilder();
        regex.append(remaining.next());
        while (remaining.hasNext()) {
            regex.append("|").append(remaining.next());
        }
        return Pattern.compile(regex.toString());
    }

    /**
     * Returns a fresh map from state store name to its source topics, with
     * internal topic names decorated.
     *
     * @return a new mutable map; one entry per state store with recorded source topics
     * @throws TopologyBuilderException if an internal topic must be decorated
     *         and the application id has not been set
     */
    public Map<String, List<String>> stateStoreNameToSourceTopics() {
        Map<String, List<String>> topicsByStore = new HashMap<String, List<String>>();
        for (Map.Entry<String, Set<String>> storeEntry : this.stateStoreNameToSourceTopics.entrySet()) {
            List<String> decorated = this.maybeDecorateInternalSourceTopics(storeEntry.getValue());
            topicsByStore.put(storeEntry.getKey(), decorated);
        }
        return topicsByStore;
    }

    /**
     * Translates each co-partition group of source node names into the set of
     * topics those nodes read from, with internal topic names decorated. Nodes
     * with no recorded source topics contribute nothing to their group.
     *
     * @return an unmodifiable collection of unmodifiable topic sets, one per
     *         co-partition source group
     */
    public synchronized Collection<Set<String>> copartitionGroups() {
        List<Set<String>> groups = new ArrayList<Set<String>>(this.copartitionSourceGroups.size());
        for (Set<String> sourceNodes : this.copartitionSourceGroups) {
            Set<String> groupTopics = new HashSet<String>();
            for (String sourceNode : sourceNodes) {
                List<String> nodeTopics = this.nodeToSourceTopics.get(sourceNode);
                if (nodeTopics != null) {
                    groupTopics.addAll(this.maybeDecorateInternalSourceTopics(nodeTopics));
                }
            }
            groups.add(Collections.unmodifiableSet(groupTopics));
        }
        return Collections.unmodifiableList(groups);
    }

    /**
     * Copies the given topic names into a new list, replacing every name that is
     * registered as an internal topic with its decorated form and keeping all
     * other names unchanged. Order follows the iteration order of the input.
     *
     * @param sourceTopics the topic names to translate
     * @return a new list with one entry per input topic
     */
    private List<String> maybeDecorateInternalSourceTopics(Collection<String> sourceTopics) {
        List<String> translated = new ArrayList<String>();
        for (String topicName : sourceTopics) {
            translated.add(this.internalTopicNames.contains(topicName) ? this.decorateTopic(topicName) : topicName);
        }
        return translated;
    }

    /**
     * Prefixes a topic name with the application id and a dash.
     *
     * @param topic the undecorated topic name
     * @return {@code applicationId + "-" + topic}
     * @throws TopologyBuilderException if the application id has not been set
     */
    private String decorateTopic(String topic) {
        if (this.applicationId != null) {
            return this.applicationId + "-" + topic;
        }
        throw new TopologyBuilderException("there are internal topics and applicationId hasn't been set. Call setApplicationId first");
    }

    /**
     * Returns the subscription updates currently held by this builder.
     * The field is returned as-is, without copying.
     *
     * @return the current {@code SubscriptionUpdates} instance
     */
    public StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates() {
        return this.subscriptionUpdates;
    }

    /**
     * Returns the pattern covering every source topic and source pattern of the
     * topology. The pattern is built on first use from the sorted, decorated
     * topic names followed by the registered source patterns, then cached in
     * {@code topicPattern} and returned unchanged on later calls.
     *
     * @return the cached or newly built source topic pattern
     */
    public synchronized Pattern sourceTopicPattern() {
        if (this.topicPattern != null) {
            return this.topicPattern;
        }
        List<String> sortedTopics = new ArrayList<String>();
        for (List<String> nodeTopics : this.nodeToSourceTopics.values()) {
            sortedTopics.addAll(this.maybeDecorateInternalSourceTopics(nodeTopics));
        }
        // Sorting makes the resulting regex independent of map iteration order.
        Collections.sort(sortedTopics);
        this.topicPattern = TopologyBuilder.buildPatternForOffsetResetTopics(sortedTopics, this.nodeToSourcePatterns.values());
        return this.topicPattern;
    }

    /**
     * Replaces the held subscription updates and re-resolves the regex-matched
     * topics of source nodes first, then of state stores.
     *
     * @param subscriptionUpdates the new subscription updates to store
     * @param threadId            identifier of the calling stream thread; used only in the debug log
     */
    public synchronized void updateSubscriptions(StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates, String threadId) {
        log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates);
        this.subscriptionUpdates = subscriptionUpdates;

        this.setRegexMatchedTopicsToSourceNodes();
        this.setRegexMatchedTopicToStateStore();
    }

    /**
     * Offset reset policies.
     * NOTE(review): presumably chosen per source when it is added to the builder;
     * the code that consumes this enum is outside this excerpt - confirm there.
     */
    public static enum AutoOffsetReset {
        /** Presumably: start from the earliest available offset - confirm against callers. */
        EARLIEST,
        /** Presumably: start from the latest offset - confirm against callers. */
        LATEST;

    }

    /**
     * Groups the topics associated with one topic group: sink topics, source
     * topics, repartition source topics and state changelog topics.
     *
     * <p>{@link #equals(Object)} and {@link #hashCode()} consider only
     * {@code sourceTopics} and {@code stateChangelogTopics}.
     */
    public static class TopicsInfo {
        public Set<String> sinkTopics;
        public Set<String> sourceTopics;
        public Map<String, InternalTopicConfig> stateChangelogTopics;
        public Map<String, InternalTopicConfig> repartitionSourceTopics;

        /**
         * @param sinkTopics              topics written to by the group
         * @param sourceTopics            topics read by the group
         * @param repartitionSourceTopics internal source topics keyed by topic name
         * @param stateChangelogTopics    changelog topics keyed by topic name
         */
        TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
            this.sinkTopics = sinkTopics;
            this.sourceTopics = sourceTopics;
            this.stateChangelogTopics = stateChangelogTopics;
            this.repartitionSourceTopics = repartitionSourceTopics;
        }

        /**
         * Two instances are equal when their source topics and state changelog
         * topics are equal; sink and repartition topics are not compared.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicsInfo)) {
                return false;
            }
            TopicsInfo other = (TopicsInfo)o;
            // Objects.equals keeps the comparison null-safe: the fields are public and mutable.
            return Objects.equals(other.sourceTopics, this.sourceTopics) && Objects.equals(other.stateChangelogTopics, this.stateChangelogTopics);
        }

        /**
         * Hash over the same two fields that {@link #equals(Object)} compares.
         * The previous long-based mix sign-extended the changelog hash, which
         * overwrote the source-topic bits whenever that hash was negative.
         */
        @Override
        public int hashCode() {
            return Objects.hash(this.sourceTopics, this.stateChangelogTopics);
        }

        @Override
        public String toString() {
            return "TopicsInfo{sinkTopics=" + this.sinkTopics + ", sourceTopics=" + this.sourceTopics + ", repartitionSourceTopics=" + this.repartitionSourceTopics + ", stateChangelogTopics=" + this.stateChangelogTopics + '}';
        }
    }

    /**
     * Factory for a sink node writing to a single topic. The parent names are
     * copied defensively at construction time.
     *
     * @param <K> key type handled by the key serializer
     * @param <V> value type handled by the value serializer
     */
    private class SinkNodeFactory<K, V> extends NodeFactory {
        private final String[] parents;
        private final String topic;
        private final Serializer<K> keySerializer;
        private final Serializer<V> valSerializer;
        private final StreamPartitioner<? super K, ? super V> partitioner;

        private SinkNodeFactory(String name, String[] parents, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
            super(name);
            this.parents = parents.clone();
            this.topic = topic;
            this.keySerializer = keySerializer;
            this.valSerializer = valSerializer;
            this.partitioner = partitioner;
        }

        /**
         * Creates the sink node. When the configured topic is registered as an
         * internal topic, the node writes to its decorated name instead.
         */
        @Override
        public ProcessorNode build() {
            String targetTopic = TopologyBuilder.this.internalTopicNames.contains(this.topic)
                    ? TopologyBuilder.this.decorateTopic(this.topic)
                    : this.topic;
            return new SinkNode<K, V>(this.name, targetTopic, this.keySerializer, this.valSerializer, this.partitioner);
        }
    }

    /**
     * Factory for a source node that reads either from a fixed list of topics
     * or from the topics matching a pattern.
     */
    private class SourceNodeFactory extends NodeFactory {
        private final List<String> topics;
        private final Pattern pattern;
        private final Deserializer<?> keyDeserializer;
        private final Deserializer<?> valDeserializer;
        private final TimestampExtractor timestampExtractor;

        private SourceNodeFactory(String name, String[] topics, Pattern pattern, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
            super(name);
            if (topics == null) {
                this.topics = new ArrayList<String>();
            } else {
                this.topics = Arrays.asList(topics);
            }
            this.pattern = pattern;
            this.keyDeserializer = keyDeserializer;
            this.valDeserializer = valDeserializer;
            this.timestampExtractor = timestampExtractor;
        }

        /**
         * Selects, from the subscribed topics, those belonging to this node's
         * pattern, and records newly matched topics in the builder's
         * topic-to-pattern map.
         *
         * @param subscribedTopics the currently subscribed topic names
         * @return the matched topics; when {@code subscribedTopics} is empty, a
         *         single-element list holding the pattern's string form instead
         * @throws TopologyBuilderException if a topic matching this pattern is
         *         already recorded under a different pattern
         */
        List<String> getTopics(Collection<String> subscribedTopics) {
            if (subscribedTopics.isEmpty()) {
                return Collections.singletonList(String.valueOf(this.pattern));
            }
            List<String> matched = new ArrayList<String>();
            for (String candidate : subscribedTopics) {
                Pattern owner = TopologyBuilder.this.topicToPatterns.get(candidate);
                // Identity comparison: the topic is already recorded for this very pattern instance.
                if (owner == this.pattern) {
                    matched.add(candidate);
                    continue;
                }
                boolean claimedElsewhere = TopologyBuilder.this.topicToPatterns.containsKey(candidate);
                if (!this.isMatch(candidate)) {
                    continue;
                }
                if (claimedElsewhere) {
                    throw new TopologyBuilderException("Topic " + candidate + " is already matched for another regex pattern " + owner + " and hence cannot be matched to this regex pattern " + this.pattern + " any more.");
                }
                TopologyBuilder.this.topicToPatterns.put(candidate, this.pattern);
                matched.add(candidate);
            }
            return matched;
        }

        /**
         * Creates the source node from the topics recorded for this node name,
         * with internal topic names decorated. When nothing is recorded, the
         * pattern's string form stands in as the only topic.
         */
        @Override
        public ProcessorNode build() {
            List<String> recorded = TopologyBuilder.this.nodeToSourceTopics.get(this.name);
            List<String> nodeTopics = recorded == null
                    ? Collections.singletonList(String.valueOf(this.pattern))
                    : TopologyBuilder.this.maybeDecorateInternalSourceTopics(recorded);
            return new SourceNode(this.name, nodeTopics, this.timestampExtractor, this.keyDeserializer, this.valDeserializer);
        }

        /** Returns whether the whole topic name matches this node's pattern. */
        private boolean isMatch(String topic) {
            return this.pattern.matcher(topic).matches();
        }
    }

    /**
     * Factory for a processor node backed by a {@code ProcessorSupplier}.
     * State store names can be attached after construction and are handed to
     * every node built afterwards.
     */
    private static class ProcessorNodeFactory extends NodeFactory {
        private final String[] parents;
        private final ProcessorSupplier<?, ?> supplier;
        private final Set<String> stateStoreNames = new HashSet<String>();

        ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier<?, ?> supplier) {
            super(name);
            // Defensive copy so later changes to the caller's array are not seen here.
            this.parents = Arrays.copyOf(parents, parents.length);
            this.supplier = supplier;
        }

        /** Registers a state store name to be passed to the built node. */
        public void addStateStore(String stateStoreName) {
            this.stateStoreNames.add(stateStoreName);
        }

        /** Creates a node with a processor freshly obtained from the supplier. */
        @Override
        public ProcessorNode build() {
            return new ProcessorNode(this.name, this.supplier.get(), this.stateStoreNames);
        }
    }

    /**
     * Base class for the node factories of this builder: holds the node name
     * and defines how the concrete {@code ProcessorNode} is created.
     */
    private static abstract class NodeFactory {
        /** Name of the node this factory produces. */
        public final String name;

        NodeFactory(String name) {
            this.name = name;
        }

        /**
         * Creates the node described by this factory.
         *
         * @return the built node
         */
        public abstract ProcessorNode build();
    }

    /**
     * Pairs a state store supplier with the set of names using the store.
     * The user set starts empty and is mutable.
     */
    private static class StateStoreFactory {
        public final Set<String> users = new HashSet<String>();
        public final StateStoreSupplier supplier;

        StateStoreFactory(StateStoreSupplier supplier) {
            this.supplier = supplier;
        }
    }
}

