/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

public class StreamsPartitionAssignor
implements ConsumerPartitionAssignor,
Configurable {
    private Logger log;
    private String logPrefix;
    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR = Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
    private String userEndPoint;
    private int numStandbyReplicas;
    private TaskManager taskManager;
    private PartitionGrouper partitionGrouper;
    private AtomicInteger assignmentErrorCode;
    protected int usedSubscriptionMetadataVersion = 5;
    private InternalTopicManager internalTopicManager;
    private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    private ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol;

    protected String userEndPoint() {
        return this.userEndPoint;
    }

    protected TaskManager taskManger() {
        return this.taskManager;
    }

    public void configure(Map<String, ?> configs) {
        AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
        this.logPrefix = assignorConfiguration.logPrefix();
        this.log = new LogContext(this.logPrefix).logger(this.getClass());
        this.usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(this.usedSubscriptionMetadataVersion);
        this.taskManager = assignorConfiguration.getTaskManager();
        this.assignmentErrorCode = assignorConfiguration.getAssignmentErrorCode(configs);
        this.numStandbyReplicas = assignorConfiguration.getNumStandbyReplicas();
        this.partitionGrouper = assignorConfiguration.getPartitionGrouper();
        this.userEndPoint = assignorConfiguration.getUserEndPoint();
        this.internalTopicManager = assignorConfiguration.getInternalTopicManager();
        this.copartitionedTopicsEnforcer = assignorConfiguration.getCopartitionedTopicsEnforcer();
        this.rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
    }

    public String name() {
        return "stream";
    }

    public List<ConsumerPartitionAssignor.RebalanceProtocol> supportedProtocols() {
        ArrayList<ConsumerPartitionAssignor.RebalanceProtocol> supportedProtocols = new ArrayList<ConsumerPartitionAssignor.RebalanceProtocol>();
        supportedProtocols.add(ConsumerPartitionAssignor.RebalanceProtocol.EAGER);
        if (this.rebalanceProtocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            supportedProtocols.add(this.rebalanceProtocol);
        }
        return supportedProtocols;
    }

    public ByteBuffer subscriptionUserData(Set<String> topics) {
        Set<TaskId> standbyTasks = this.taskManager.cachedTasksIds();
        Set<TaskId> activeTasks = StreamsPartitionAssignor.prepareForSubscription(this.taskManager, topics, standbyTasks, this.rebalanceProtocol);
        return new SubscriptionInfo(this.usedSubscriptionMetadataVersion, this.taskManager.processId(), activeTasks, standbyTasks, this.userEndPoint).encode();
    }

    protected static Set<TaskId> prepareForSubscription(TaskManager taskManager, Set<String> topics, Set<TaskId> standbyTasks, ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol) {
        Set<TaskId> activeTasks;
        switch (rebalanceProtocol) {
            case EAGER: {
                activeTasks = taskManager.previousRunningTaskIds();
                standbyTasks.removeAll(activeTasks);
                break;
            }
            case COOPERATIVE: {
                activeTasks = Collections.emptySet();
                standbyTasks.removeAll(taskManager.activeTaskIds());
                break;
            }
            default: {
                throw new IllegalStateException("Streams partition assignor's rebalance protocol is unknown");
            }
        }
        taskManager.updateSubscriptionsFromMetadata(topics);
        taskManager.setRebalanceInProgress(true);
        return activeTasks;
    }

    private Map<String, ConsumerPartitionAssignor.Assignment> errorAssignment(Map<UUID, ClientMetadata> clientsMetadata, String topic, int errorCode) {
        this.log.error("{} is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.", (Object)topic);
        HashMap<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<String, ConsumerPartitionAssignor.Assignment>();
        for (ClientMetadata clientMetadata : clientsMetadata.values()) {
            for (String consumerId : clientMetadata.consumers) {
                assignment.put(consumerId, new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), new AssignmentInfo(5, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), errorCode).encode()));
            }
        }
        return assignment;
    }

    /*
     * WARNING - void declaration
     */
    public ConsumerPartitionAssignor.GroupAssignment assign(Cluster metadata, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
        boolean numPartitionsNeeded;
        boolean versionProbing;
        Map subscriptions = groupSubscription.groupSubscription();
        HashMap<UUID, ClientMetadata> clientMetadataMap = new HashMap<UUID, ClientMetadata>();
        HashSet<TopicPartition> allOwnedPartitions = new HashSet<TopicPartition>();
        UUID futureId = UUID.randomUUID();
        ClientMetadata futureClient = new ClientMetadata(null);
        clientMetadataMap.put(futureId, futureClient);
        int minReceivedMetadataVersion = 5;
        int minSupportedMetadataVersion = 5;
        int futureMetadataVersion = -1;
        for (Map.Entry entry : subscriptions.entrySet()) {
            void var17_27;
            String consumerId = (String)entry.getKey();
            ConsumerPartitionAssignor.Subscription subscription = (ConsumerPartitionAssignor.Subscription)entry.getValue();
            SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
            int usedVersion = subscriptionInfo.version();
            minReceivedMetadataVersion = this.updateMinReceivedVersion(usedVersion, minReceivedMetadataVersion);
            minSupportedMetadataVersion = this.updateMinSupportedVersion(subscriptionInfo.latestSupportedVersion(), minSupportedMetadataVersion);
            if (usedVersion > 5) {
                futureMetadataVersion = usedVersion;
                UUID uUID = futureId;
            } else {
                UUID uUID = subscriptionInfo.processId();
            }
            ClientMetadata clientMetadata = (ClientMetadata)clientMetadataMap.get(var17_27);
            if (clientMetadata == null) {
                clientMetadata = new ClientMetadata(subscriptionInfo.userEndPoint());
                clientMetadataMap.put(subscriptionInfo.processId(), clientMetadata);
            }
            clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
            allOwnedPartitions.addAll(subscription.ownedPartitions());
            if (subscriptionInfo.prevTasks() == null || subscriptionInfo.standbyTasks() == null) continue;
            clientMetadata.addPreviousTasks(subscriptionInfo);
        }
        if (futureMetadataVersion == -1) {
            versionProbing = false;
            clientMetadataMap.remove(futureId);
        } else if (minReceivedMetadataVersion >= 3) {
            versionProbing = true;
            this.log.info("Received a future (version probing) subscription (version: {}). Sending assignment back (with supported version {}).", (Object)futureMetadataVersion, (Object)minSupportedMetadataVersion);
        } else {
            throw new IllegalStateException("Received a future (version probing) subscription (version: " + futureMetadataVersion + ") and an incompatible pre Kafka 2.0 subscription (version: " + minReceivedMetadataVersion + ") at the same time.");
        }
        if (minReceivedMetadataVersion < 5) {
            this.log.info("Downgrade metadata to version {}. Latest supported version is {}.", (Object)minReceivedMetadataVersion, (Object)5);
        }
        if (minSupportedMetadataVersion < 5) {
            this.log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.", (Object)minSupportedMetadataVersion, (Object)5);
        }
        this.log.debug("Constructed client metadata {} from the member subscriptions.", clientMetadataMap);
        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = this.taskManager.builder().topicGroups();
        HashMap<String, InternalTopicConfig> repartitionTopicMetadata = new HashMap<String, InternalTopicConfig>();
        for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
            for (String string : topicsInfo.sourceTopics) {
                if (topicsInfo.repartitionSourceTopics.keySet().contains(string) || metadata.topics().contains(string)) continue;
                this.log.error("Missing source topic {} during assignment. Returning error {}.", (Object)string, (Object)AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
                return new ConsumerPartitionAssignor.GroupAssignment(this.errorAssignment(clientMetadataMap, string, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
            }
            for (InternalTopicConfig internalTopicConfig : topicsInfo.repartitionSourceTopics.values()) {
                repartitionTopicMetadata.put(internalTopicConfig.name(), internalTopicConfig);
            }
        }
        do {
            numPartitionsNeeded = false;
            for (InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                    Optional<Integer> maybeNumPartitions = ((InternalTopicConfig)repartitionTopicMetadata.get(topicName)).numberOfPartitions();
                    Integer numPartitions = null;
                    if (maybeNumPartitions.isPresent()) continue;
                    for (InternalTopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                        Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
                        if (!otherSinkTopics.contains(topicName)) continue;
                        for (String string : otherTopicsInfo.sourceTopics) {
                            int n = 0;
                            if (repartitionTopicMetadata.containsKey(string)) {
                                if (((InternalTopicConfig)repartitionTopicMetadata.get(string)).numberOfPartitions().isPresent()) {
                                    n = ((InternalTopicConfig)repartitionTopicMetadata.get(string)).numberOfPartitions().get();
                                }
                            } else {
                                Integer count = metadata.partitionCountForTopic(string);
                                if (count == null) {
                                    throw new IllegalStateException("No partition count found for source topic " + string + ", but it should have been.");
                                }
                                n = count;
                            }
                            if (numPartitions != null && n <= numPartitions) continue;
                            numPartitions = n;
                        }
                    }
                    if (numPartitions == null) {
                        numPartitionsNeeded = true;
                        continue;
                    }
                    ((InternalTopicConfig)repartitionTopicMetadata.get(topicName)).setNumberOfPartitions(numPartitions);
                }
            }
        } while (numPartitionsNeeded);
        this.ensureCopartitioning(this.taskManager.builder().copartitionGroups(), repartitionTopicMetadata, metadata);
        this.prepareTopic(repartitionTopicMetadata);
        HashMap<TopicPartition, PartitionInfo> hashMap = new HashMap<TopicPartition, PartitionInfo>();
        for (Map.Entry entry : repartitionTopicMetadata.entrySet()) {
            String topic = (String)entry.getKey();
            int numPartitions = ((InternalTopicConfig)entry.getValue()).numberOfPartitions().orElse(-1);
            for (int partition = 0; partition < numPartitions; ++partition) {
                hashMap.put(new TopicPartition(topic, partition), new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
            }
        }
        Cluster fullMetadata = metadata.withPartitions(hashMap);
        this.taskManager.setClusterMetadata(fullMetadata);
        this.log.debug("Created repartition topics {} from the parsed topology.", hashMap.values());
        HashSet<String> hashSet = new HashSet<String>();
        HashMap<Integer, Set<String>> sourceTopicsByGroup = new HashMap<Integer, Set<String>>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            hashSet.addAll(entry.getValue().sourceTopics);
            sourceTopicsByGroup.put(entry.getKey(), entry.getValue().sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionsForTask = this.partitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata);
        HashMap<TopicPartition, TaskId> taskForPartition = new HashMap<TopicPartition, TaskId>();
        HashSet allAssignedPartitions = new HashSet();
        HashMap<Integer, Set> tasksByTopicGroup = new HashMap<Integer, Set>();
        for (Map.Entry entry : partitionsForTask.entrySet()) {
            TaskId taskId = (TaskId)entry.getKey();
            Set set = (Set)entry.getValue();
            for (TopicPartition topicPartition : set) {
                taskForPartition.put(topicPartition, taskId);
                if (!allAssignedPartitions.contains(topicPartition)) continue;
                this.log.warn("Partition {} is assigned to more than one tasks: {}", (Object)topicPartition, partitionsForTask);
            }
            allAssignedPartitions.addAll(set);
            tasksByTopicGroup.computeIfAbsent(taskId.topicGroupId, k -> new HashSet()).add(taskId);
        }
        for (String string : hashSet) {
            List list = fullMetadata.partitionsForTopic(string);
            if (list.isEmpty()) {
                this.log.warn("No partitions found for topic {}", (Object)string);
                continue;
            }
            for (PartitionInfo partitionInfo : list) {
                TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                if (allAssignedPartitions.contains(topicPartition)) continue;
                this.log.warn("Partition {} is not assigned to any tasks: {} Possible causes of a partition not getting assigned is that another topic defined in the topology has not been created when starting your streams application, resulting in no tasks created for this topology at all.", (Object)topicPartition, partitionsForTask);
            }
        }
        HashMap<String, InternalTopicConfig> changelogTopicMetadata = new HashMap<String, InternalTopicConfig>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            int n = entry.getKey();
            Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
            for (InternalTopicConfig internalTopicConfig : stateChangelogTopics.values()) {
                int n2 = -1;
                if (tasksByTopicGroup.get(n) != null) {
                    for (TaskId task : (Set)tasksByTopicGroup.get(n)) {
                        if (n2 >= task.partition + 1) continue;
                        n2 = task.partition + 1;
                    }
                    internalTopicConfig.setNumberOfPartitions(n2);
                    changelogTopicMetadata.put(internalTopicConfig.name(), internalTopicConfig);
                    continue;
                }
                this.log.debug("No tasks found for topic group {}", (Object)n);
            }
        }
        this.prepareTopic(changelogTopicMetadata);
        this.log.debug("Created state changelog topics {} from the parsed topology.", changelogTopicMetadata.values());
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : clientMetadataMap.entrySet()) {
            ClientState state = ((ClientMetadata)entry.getValue()).state;
            hashMap2.put(entry.getKey(), state);
            if (state.ownedPartitions().isEmpty()) continue;
            HashSet<TaskId> hashSet2 = new HashSet<TaskId>();
            for (Map.Entry<TopicPartition, String> entry2 : state.ownedPartitions().entrySet()) {
                TaskId task;
                TopicPartition tp = entry2.getKey();
                task = (TaskId)taskForPartition.get(tp);
                if (task != null) {
                    hashSet2.add(task);
                    continue;
                }
                this.log.error("No task found for topic partition {}", (Object)tp);
            }
            state.addPreviousActiveTasks(hashSet2);
        }
        this.log.debug("Assigning tasks {} to clients {} with number of replicas {}", new Object[]{partitionsForTask.keySet(), hashMap2, this.numStandbyReplicas});
        StickyTaskAssignor stickyTaskAssignor = new StickyTaskAssignor(hashMap2, partitionsForTask.keySet());
        stickyTaskAssignor.assign(this.numStandbyReplicas);
        this.log.info("Assigned tasks to clients as {}{}.", (Object)Utils.NL, (Object)hashMap2.entrySet().stream().map(Object::toString).collect(Collectors.joining(Utils.NL)));
        HashMap<HostInfo, Set<TopicPartition>> hashMap3 = new HashMap<HostInfo, Set<TopicPartition>>();
        if (minReceivedMetadataVersion >= 2) {
            for (Map.Entry entry : clientMetadataMap.entrySet()) {
                HostInfo hostInfo = ((ClientMetadata)entry.getValue()).hostInfo;
                if (hostInfo == null) continue;
                HashSet hashSet3 = new HashSet();
                ClientState state = ((ClientMetadata)entry.getValue()).state;
                for (TaskId id : state.activeTasks()) {
                    hashSet3.addAll(partitionsForTask.get(id));
                }
                hashMap3.put(hostInfo, hashSet3);
            }
        }
        this.taskManager.setPartitionsByHostState(hashMap3);
        Map<String, ConsumerPartitionAssignor.Assignment> assignment = versionProbing ? this.versionProbingAssignment(clientMetadataMap, partitionsForTask, hashMap3, allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion) : this.computeNewAssignment(clientMetadataMap, partitionsForTask, hashMap3, allOwnedPartitions, minReceivedMetadataVersion, minSupportedMetadataVersion);
        return new ConsumerPartitionAssignor.GroupAssignment(assignment);
    }

    private Map<String, ConsumerPartitionAssignor.Assignment> computeNewAssignment(Map<UUID, ClientMetadata> clientsMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, Set<TopicPartition> allOwnedPartitions, int minUserMetadataVersion, int minSupportedMetadataVersion) {
        boolean rebalanceRequired = false;
        HashMap<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<String, ConsumerPartitionAssignor.Assignment>();
        for (ClientMetadata clientMetadata : clientsMetadata.values()) {
            Map<String, List<TaskId>> activeTaskAssignments;
            ClientState state = clientMetadata.state;
            Set consumers = clientMetadata.consumers;
            if (rebalanceRequired || state.ownedPartitions().isEmpty()) {
                activeTaskAssignments = StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
            } else {
                activeTaskAssignments = this.tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, partitionsForTask, allOwnedPartitions);
                if (activeTaskAssignments.equals(Collections.emptyMap())) {
                    rebalanceRequired = true;
                    activeTaskAssignments = StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
                }
            }
            Map<String, List<TaskId>> interleavedStandby = StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(state.standbyTasks(), consumers);
            this.addClientAssignments(assignment, clientMetadata, partitionsForTask, partitionsByHostState, allOwnedPartitions, activeTaskAssignments, interleavedStandby, minUserMetadataVersion, minSupportedMetadataVersion);
        }
        return assignment;
    }

    private Map<String, ConsumerPartitionAssignor.Assignment> versionProbingAssignment(Map<UUID, ClientMetadata> clientsMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, Set<TopicPartition> allOwnedPartitions, int minUserMetadataVersion, int minSupportedMetadataVersion) {
        HashMap<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<String, ConsumerPartitionAssignor.Assignment>();
        for (ClientMetadata clientMetadata : clientsMetadata.values()) {
            ClientState state = clientMetadata.state;
            Map<String, List<TaskId>> interleavedActive = StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers);
            Map<String, List<TaskId>> interleavedStandby = StreamsPartitionAssignor.interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers);
            this.addClientAssignments(assignment, clientMetadata, partitionsForTask, partitionsByHostState, allOwnedPartitions, interleavedActive, interleavedStandby, minUserMetadataVersion, minSupportedMetadataVersion);
        }
        return assignment;
    }

    private void addClientAssignments(Map<String, ConsumerPartitionAssignor.Assignment> assignment, ClientMetadata clientMetadata, Map<TaskId, Set<TopicPartition>> partitionsForTask, Map<HostInfo, Set<TopicPartition>> partitionsByHostState, Set<TopicPartition> allOwnedPartitions, Map<String, List<TaskId>> activeTaskAssignments, Map<String, List<TaskId>> standbyTaskAssignments, int minUserMetadataVersion, int minSupportedMetadataVersion) {
        for (String consumer : clientMetadata.consumers) {
            List<TaskId> activeTasksForConsumer = activeTaskAssignments.get(consumer);
            ArrayList<TopicPartition> activePartitionsList = new ArrayList<TopicPartition>();
            ArrayList<TaskId> assignedActiveList = new ArrayList<TaskId>();
            this.buildAssignedActiveTaskAndPartitionsList(consumer, clientMetadata.state, activeTasksForConsumer, partitionsForTask, allOwnedPartitions, activePartitionsList, assignedActiveList);
            Map<TaskId, Set<TopicPartition>> standbyTaskMap = StreamsPartitionAssignor.buildStandbyTaskMap((Collection<TaskId>)standbyTaskAssignments.get(consumer), partitionsForTask);
            assignment.put(consumer, new ConsumerPartitionAssignor.Assignment(activePartitionsList, new AssignmentInfo(minUserMetadataVersion, minSupportedMetadataVersion, assignedActiveList, standbyTaskMap, partitionsByHostState, AssignorError.NONE.code()).encode()));
        }
    }

    private void buildAssignedActiveTaskAndPartitionsList(String consumer, ClientState clientState, List<TaskId> activeTasksForConsumer, Map<TaskId, Set<TopicPartition>> partitionsForTask, Set<TopicPartition> allOwnedPartitions, List<TopicPartition> activePartitionsList, List<TaskId> assignedActiveList) {
        ArrayList assignedPartitions = new ArrayList();
        for (TaskId taskId : activeTasksForConsumer) {
            ArrayList<AssignedPartition> assignedPartitionsForTask = new ArrayList<AssignedPartition>();
            for (TopicPartition partition : partitionsForTask.get(taskId)) {
                boolean newPartitionForConsumer;
                String oldOwner = clientState.ownedPartitions().get(partition);
                boolean bl = newPartitionForConsumer = oldOwner == null || !oldOwner.equals(consumer);
                if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) {
                    this.log.debug("Removing task {} from assignment until it is safely revoked", (Object)taskId);
                    clientState.removeFromAssignment(taskId);
                    assignedPartitionsForTask.clear();
                    break;
                }
                assignedPartitionsForTask.add(new AssignedPartition(taskId, partition));
            }
            assignedPartitions.addAll(assignedPartitionsForTask);
        }
        Collections.sort(assignedPartitions);
        for (AssignedPartition partition : assignedPartitions) {
            assignedActiveList.add(partition.taskId);
            activePartitionsList.add(partition.partition);
        }
    }

    private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(Collection<TaskId> standbys, Map<TaskId, Set<TopicPartition>> partitionsForTask) {
        HashMap<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<TaskId, Set<TopicPartition>>();
        for (TaskId task : standbys) {
            standbyTaskMap.put(task, partitionsForTask.get(task));
        }
        return standbyTaskMap;
    }

    Map<String, List<TaskId>> tryStickyAndBalancedTaskAssignmentWithinClient(ClientState state, Set<String> consumers, Map<TaskId, Set<TopicPartition>> partitionsForTask, Set<TopicPartition> allOwnedPartitions) {
        HashMap<String, List<TaskId>> assignments = new HashMap<String, List<TaskId>>();
        LinkedList<TaskId> newTasks = new LinkedList<TaskId>();
        HashSet<String> unfilledConsumers = new HashSet<String>(consumers);
        int maxTasksPerClient = (int)Math.ceil((double)state.activeTaskCount() / (double)consumers.size());
        for (String consumer : consumers) {
            assignments.put(consumer, new ArrayList());
        }
        for (TaskId task : state.activeTasks()) {
            Set<String> previousConsumers = this.previousConsumersOfTaskPartitions(partitionsForTask.get(task), state.ownedPartitions(), allOwnedPartitions);
            if (previousConsumers.size() > 1) {
                this.log.warn("The partitions of task {} were claimed as owned by different StreamThreads. This indicates the mapping from partitions to tasks has changed!", (Object)task);
                return Collections.emptyMap();
            }
            if (previousConsumers.isEmpty()) {
                this.log.debug("Task {} was not previously owned by any consumers still in the group. It's owner may have died or it may be a new task", (Object)task);
                newTasks.add(task);
                continue;
            }
            String consumer = previousConsumers.iterator().next();
            if (!consumers.contains(consumer)) {
                this.log.debug("This client was assigned a task {} whose partition(s) were previously owned by another client, falling back to an interleaved assignment since a rebalance is inevitable.", (Object)task);
                return Collections.emptyMap();
            }
            if (((List)assignments.get(consumer)).size() >= maxTasksPerClient) {
                this.log.debug("Cannot create a sticky and balanced assignment as this client's consumers owned more previous tasks than it has capacity for during this assignment, falling back to interleaved assignment since a realance is inevitable.");
                return Collections.emptyMap();
            }
            ((List)assignments.get(consumer)).add(task);
            if (((List)assignments.get(consumer)).size() != maxTasksPerClient) continue;
            unfilledConsumers.remove(consumer);
        }
        Collections.sort(newTasks);
        block2: while (!newTasks.isEmpty()) {
            if (unfilledConsumers.isEmpty()) {
                throw new IllegalStateException("Some tasks could not be distributed");
            }
            Iterator consumerIt = unfilledConsumers.iterator();
            while (consumerIt.hasNext()) {
                String consumer;
                consumer = (String)consumerIt.next();
                List consumerAssignment = (List)assignments.get(consumer);
                TaskId task = (TaskId)newTasks.poll();
                if (task == null) continue block2;
                consumerAssignment.add(task);
                if (consumerAssignment.size() != maxTasksPerClient) continue;
                consumerIt.remove();
            }
        }
        return assignments;
    }

    Set<String> previousConsumersOfTaskPartitions(Set<TopicPartition> taskPartitions, Map<TopicPartition, String> clientOwnedPartitions, Set<TopicPartition> allOwnedPartitions) {
        String foreignConsumer = "";
        HashSet<String> previousConsumers = new HashSet<String>();
        for (TopicPartition tp : taskPartitions) {
            String currentPartitionConsumer = clientOwnedPartitions.get(tp);
            if (currentPartitionConsumer != null) {
                previousConsumers.add(currentPartitionConsumer);
                continue;
            }
            if (!allOwnedPartitions.contains(tp)) continue;
            previousConsumers.add("");
        }
        return previousConsumers;
    }

    static Map<String, List<TaskId>> interleaveConsumerTasksByGroupId(Collection<TaskId> taskIds, Set<String> consumers) {
        LinkedList<TaskId> sortedTasks = new LinkedList<TaskId>(taskIds);
        Collections.sort(sortedTasks);
        TreeMap<String, List<TaskId>> taskIdsForConsumerAssignment = new TreeMap<String, List<TaskId>>();
        for (String string : consumers) {
            taskIdsForConsumerAssignment.put(string, new ArrayList());
        }
        block1: while (!sortedTasks.isEmpty()) {
            for (Map.Entry entry : taskIdsForConsumerAssignment.entrySet()) {
                List taskIdList = (List)entry.getValue();
                TaskId taskId = sortedTasks.poll();
                if (taskId == null) continue block1;
                taskIdList.add(taskId);
            }
        }
        return taskIdsForConsumerAssignment;
    }

    private void validateMetadataVersions(int receivedAssignmentMetadataVersion, int latestCommonlySupportedVersion) {
        if (receivedAssignmentMetadataVersion > this.usedSubscriptionMetadataVersion) {
            this.log.error("Leader sent back an assignment with version {} which was greater than our used version {}", (Object)receivedAssignmentMetadataVersion, (Object)this.usedSubscriptionMetadataVersion);
            throw new TaskAssignmentException("Sent a version " + this.usedSubscriptionMetadataVersion + " subscription but got an assignment with higher version " + receivedAssignmentMetadataVersion + ".");
        }
        if (latestCommonlySupportedVersion > 5) {
            this.log.error("Leader sent back assignment with commonly supported version {} that is greater than our actual latest supported version {}", (Object)latestCommonlySupportedVersion, (Object)5);
            throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
        }
    }

    protected boolean maybeUpdateSubscriptionVersion(int receivedAssignmentMetadataVersion, int latestCommonlySupportedVersion) {
        if (receivedAssignmentMetadataVersion >= 3) {
            if (latestCommonlySupportedVersion > this.usedSubscriptionMetadataVersion) {
                this.log.info("Sent a version {} subscription and group's latest commonly supported version is {} (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to {} for next rebalance.", new Object[]{this.usedSubscriptionMetadataVersion, latestCommonlySupportedVersion, latestCommonlySupportedVersion});
                this.usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                return true;
            }
            if (receivedAssignmentMetadataVersion < this.usedSubscriptionMetadataVersion) {
                this.log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.", new Object[]{this.usedSubscriptionMetadataVersion, receivedAssignmentMetadataVersion, latestCommonlySupportedVersion});
                this.usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
                return true;
            }
        } else {
            this.log.debug("Received an assignment version {} that is less than the earliest version that allows version probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.", (Object)receivedAssignmentMetadataVersion, (Object)3);
        }
        return false;
    }

    public void onAssignment(ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata metadata) {
        Map<HostInfo, Set<TopicPartition>> partitionsByHost;
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>(assignment.partitions());
        partitions.sort(PARTITION_COMPARATOR);
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        if (info.errCode() != AssignorError.NONE.code()) {
            this.setAssignmentErrorCode(info.errCode());
            return;
        }
        int receivedAssignmentMetadataVersion = info.version();
        int latestCommonlySupportedVersion = info.commonlySupportedVersion();
        this.validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
        if (this.maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
            this.setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
        }
        HashMap<TaskId, Set<TopicPartition>> activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        HashMap<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<TopicPartition, PartitionInfo>();
        HashMap<TopicPartition, TaskId> partitionsToTaskId = new HashMap<TopicPartition, TaskId>();
        switch (receivedAssignmentMetadataVersion) {
            case 1: {
                StreamsPartitionAssignor.processVersionOneAssignment(this.logPrefix, info, partitions, activeTasks, partitionsToTaskId);
                partitionsByHost = Collections.emptyMap();
                break;
            }
            case 2: 
            case 3: 
            case 4: 
            case 5: {
                StreamsPartitionAssignor.processVersionTwoAssignment(this.logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
                partitionsByHost = info.partitionsByHost();
                break;
            }
            default: {
                throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
            }
        }
        this.taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
        this.taskManager.setPartitionsByHostState(partitionsByHost);
        this.taskManager.setPartitionsToTaskId(partitionsToTaskId);
        this.taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
        this.taskManager.updateSubscriptionsFromAssignment(partitions);
        this.taskManager.setRebalanceInProgress(false);
    }

    private static void processVersionOneAssignment(String logPrefix, AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks, Map<TopicPartition, TaskId> partitionsToTaskId) {
        if (partitions.size() != info.activeTasks().size()) {
            throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", logPrefix, partitions.size(), info.activeTasks().size(), info.toString()));
        }
        for (int i = 0; i < partitions.size(); ++i) {
            TopicPartition partition = partitions.get(i);
            TaskId id = info.activeTasks().get(i);
            activeTasks.computeIfAbsent(id, k -> new HashSet()).add(partition);
            partitionsToTaskId.put(partition, id);
        }
    }

    public static void processVersionTwoAssignment(String logPrefix, AssignmentInfo info, List<TopicPartition> partitions, Map<TaskId, Set<TopicPartition>> activeTasks, Map<TopicPartition, PartitionInfo> topicToPartitionInfo, Map<TopicPartition, TaskId> partitionsToTaskId) {
        StreamsPartitionAssignor.processVersionOneAssignment(logPrefix, info, partitions, activeTasks, partitionsToTaskId);
        Map<HostInfo, Set<TopicPartition>> partitionsByHost = info.partitionsByHost();
        for (Set<TopicPartition> value : partitionsByHost.values()) {
            for (TopicPartition topicPartition : value) {
                topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
            }
        }
    }

    private void prepareTopic(Map<String, InternalTopicConfig> topicPartitions) {
        this.log.debug("Starting to validate internal topics {} in partition assignor.", topicPartitions);
        HashMap<String, InternalTopicConfig> topicsToMakeReady = new HashMap<String, InternalTopicConfig>();
        for (InternalTopicConfig topic : topicPartitions.values()) {
            Optional<Integer> numPartitions = topic.numberOfPartitions();
            if (!numPartitions.isPresent()) {
                throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", this.logPrefix, topic.name()));
            }
            topic.setNumberOfPartitions(numPartitions.get());
            topicsToMakeReady.put(topic.name(), topic);
        }
        if (!topicsToMakeReady.isEmpty()) {
            this.internalTopicManager.makeReady(topicsToMakeReady);
        }
        this.log.debug("Completed validating internal topics {} in partition assignor.", topicPartitions);
    }

    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<String, InternalTopicConfig> allRepartitionTopicsNumPartitions, Cluster metadata) {
        for (Set<String> copartitionGroup : copartitionGroups) {
            this.copartitionedTopicsEnforcer.enforce(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
        }
    }

    private int updateMinReceivedVersion(int usedVersion, int minReceivedMetadataVersion) {
        return usedVersion < minReceivedMetadataVersion ? usedVersion : minReceivedMetadataVersion;
    }

    private int updateMinSupportedVersion(int supportedVersion, int minSupportedMetadataVersion) {
        if (supportedVersion < minSupportedMetadataVersion) {
            this.log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}", (Object)minSupportedMetadataVersion, (Object)supportedVersion);
            return supportedVersion;
        }
        this.log.debug("Current minimum supported version remains at {}, last seen supported version was {}", (Object)minSupportedMetadataVersion, (Object)supportedVersion);
        return minSupportedMetadataVersion;
    }

    protected void setAssignmentErrorCode(Integer errorCode) {
        this.assignmentErrorCode.set(errorCode);
    }

    void setRebalanceProtocol(ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol) {
        this.rebalanceProtocol = rebalanceProtocol;
    }

    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    private static class ClientMetadata {
        private final HostInfo hostInfo;
        private final Set<String> consumers;
        private final ClientState state;

        ClientMetadata(String endPoint) {
            if (endPoint != null) {
                String host = Utils.getHost((String)endPoint);
                Integer port = Utils.getPort((String)endPoint);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
                }
                this.hostInfo = new HostInfo(host, port);
            } else {
                this.hostInfo = null;
            }
            this.consumers = new HashSet<String>();
            this.state = new ClientState();
        }

        void addConsumer(String consumerMemberId, List<TopicPartition> ownedPartitions) {
            this.consumers.add(consumerMemberId);
            this.state.incrementCapacity();
            this.state.addOwnedPartitions(ownedPartitions, consumerMemberId);
        }

        void addPreviousTasks(SubscriptionInfo info) {
            this.state.addPreviousActiveTasks(info.prevTasks());
            this.state.addPreviousStandbyTasks(info.standbyTasks());
        }

        public String toString() {
            return "ClientMetadata{hostInfo=" + this.hostInfo + ", consumers=" + this.consumers + ", state=" + this.state + '}';
        }
    }

    private static class AssignedPartition
    implements Comparable<AssignedPartition> {
        private final TaskId taskId;
        private final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition partition) {
            this.taskId = taskId;
            this.partition = partition;
        }

        @Override
        public int compareTo(AssignedPartition that) {
            return PARTITION_COMPARATOR.compare(this.partition, that.partition);
        }

        public boolean equals(Object o) {
            if (!(o instanceof AssignedPartition)) {
                return false;
            }
            AssignedPartition other = (AssignedPartition)o;
            return this.compareTo(other) == 0;
        }

        public int hashCode() {
            return this.partition.hashCode();
        }
    }
}

