/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.assignor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.MemberAssignmentImpl;

public class RangeAssignor
implements ConsumerGroupPartitionAssignor {
    public static final String RANGE_ASSIGNOR_NAME = "range";

    public String name() {
        return RANGE_ASSIGNOR_NAME;
    }

    private Map<Uuid, Collection<String>> membersPerTopic(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
        HashMap<Uuid, Collection<String>> membersPerTopic = new HashMap<Uuid, Collection<String>>();
        if (groupSpec.subscriptionType().equals((Object)SubscriptionType.HOMOGENEOUS)) {
            Collection allMembers = groupSpec.memberIds();
            Set topics = groupSpec.memberSubscription((String)groupSpec.memberIds().iterator().next()).subscribedTopicIds();
            for (Uuid topicId : topics) {
                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
                    throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
                }
                membersPerTopic.put(topicId, allMembers);
            }
        } else {
            groupSpec.memberIds().forEach(memberId -> {
                Set topics = groupSpec.memberSubscription(memberId).subscribedTopicIds();
                for (Uuid topicId : topics) {
                    if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
                        throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
                    }
                    membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList()).add(memberId);
                }
            });
        }
        return membersPerTopic;
    }

    public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
        HashMap newTargetAssignment = new HashMap();
        Map<Uuid, Collection<String>> membersPerTopic = this.membersPerTopic(groupSpec, subscribedTopicDescriber);
        membersPerTopic.forEach((topicId, membersForTopic) -> {
            int remaining;
            int numPartitionsForTopic = subscribedTopicDescriber.numPartitions(topicId);
            int minRequiredQuota = numPartitionsForTopic / membersForTopic.size();
            int numMembersWithExtraPartition = numPartitionsForTopic % membersForTopic.size();
            HashSet assignedStickyPartitionsForTopic = new HashSet();
            ArrayList<MemberWithRemainingAssignments> potentiallyUnfilledMembers = new ArrayList<MemberWithRemainingAssignments>();
            for (String memberId : membersForTopic) {
                Set assignedPartitionsForTopic = groupSpec.memberAssignment(memberId).partitions().getOrDefault(topicId, Collections.emptySet());
                int currentAssignmentSize = assignedPartitionsForTopic.size();
                ArrayList currentAssignmentListForTopic = new ArrayList(assignedPartitionsForTopic);
                if (currentAssignmentSize > 0) {
                    int retainedPartitionsCount = Math.min(currentAssignmentSize, minRequiredQuota);
                    Collections.sort(currentAssignmentListForTopic);
                    for (int i = 0; i < retainedPartitionsCount; ++i) {
                        assignedStickyPartitionsForTopic.add(currentAssignmentListForTopic.get(i));
                        newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<Uuid, Set<Integer>>())).partitions().computeIfAbsent(topicId, k -> new HashSet()).add(currentAssignmentListForTopic.get(i));
                    }
                }
                if ((remaining = minRequiredQuota - currentAssignmentSize) < 0 && numMembersWithExtraPartition > 0) {
                    --numMembersWithExtraPartition;
                    assignedStickyPartitionsForTopic.add(currentAssignmentListForTopic.get(minRequiredQuota));
                    newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<Uuid, Set<Integer>>())).partitions().computeIfAbsent(topicId, k -> new HashSet()).add(currentAssignmentListForTopic.get(minRequiredQuota));
                    continue;
                }
                MemberWithRemainingAssignments newPair = new MemberWithRemainingAssignments(memberId, remaining);
                potentiallyUnfilledMembers.add(newPair);
            }
            ArrayList<Integer> unassignedPartitionsForTopic = new ArrayList<Integer>();
            for (int i = 0; i < numPartitionsForTopic; ++i) {
                if (assignedStickyPartitionsForTopic.contains(i)) continue;
                unassignedPartitionsForTopic.add(i);
            }
            int unassignedPartitionsListStartPointer = 0;
            for (MemberWithRemainingAssignments pair : potentiallyUnfilledMembers) {
                String memberId = pair.memberId;
                remaining = pair.remaining;
                if (numMembersWithExtraPartition > 0) {
                    ++remaining;
                    --numMembersWithExtraPartition;
                }
                if (remaining <= 0) continue;
                List partitionsToAssign = unassignedPartitionsForTopic.subList(unassignedPartitionsListStartPointer, unassignedPartitionsListStartPointer + remaining);
                unassignedPartitionsListStartPointer += remaining;
                newTargetAssignment.computeIfAbsent(memberId, k -> new MemberAssignmentImpl(new HashMap<Uuid, Set<Integer>>())).partitions().computeIfAbsent(topicId, k -> new HashSet()).addAll(partitionsToAssign);
            }
        });
        return new GroupAssignment(newTargetAssignment);
    }

    private static class MemberWithRemainingAssignments {
        private final String memberId;
        private final int remaining;

        public MemberWithRemainingAssignments(String memberId, int remaining) {
            this.memberId = memberId;
            this.remaining = remaining;
        }
    }
}

