/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.api.Range;

/**
 * Sticky-key consumer selector that assigns each consumer one contiguous range of hash slots
 * and splits the widest existing range whenever a new consumer is added.
 *
 * <p>Ranges are stored by their inclusive upper bound: the entry with key {@code k} owns every
 * slot greater than the next lower key (or starting at 0 for the lowest entry) up to and
 * including {@code k}. While at least one consumer is registered, the key {@code rangeSize} is
 * always present, so every slot produced by {@code hash % rangeSize} has an owner.
 *
 * <p>{@link #addConsumer} and {@link #removeConsumer} are {@code synchronized}; {@link #select}
 * and {@link #getConsumerKeyHashRanges} are not and read the underlying
 * {@link ConcurrentSkipListMap} directly.
 */
public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
    /** Range size used by the no-argument constructor. */
    private static final int DEFAULT_RANGE_SIZE = 65536;

    private final int rangeSize;
    /** Inclusive upper bound of a range mapped to the consumer owning that range. */
    private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
    /** Reverse index: consumer mapped to the upper bound of the range it owns. */
    private final Map<Consumer, Integer> consumerRange;

    /** Creates a selector with a range size of 65536. */
    public HashRangeAutoSplitStickyKeyConsumerSelector() {
        this(DEFAULT_RANGE_SIZE);
    }

    /**
     * Creates a selector over the given range size.
     *
     * @param rangeSize number used to reduce hashes to slots; must be a power of two and at least 2
     * @throws IllegalArgumentException if {@code rangeSize} is below 2 or not a power of two
     */
    public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
        if (rangeSize < 2) {
            // 2 itself is accepted, so the message states the real lower bound.
            throw new IllegalArgumentException("range size must be at least 2");
        }
        if (!is2Power(rangeSize)) {
            throw new IllegalArgumentException("range size must be nth power with 2");
        }
        this.rangeMap = new ConcurrentSkipListMap<>();
        this.consumerRange = new HashMap<>();
        this.rangeSize = rangeSize;
    }

    /**
     * Registers a consumer. The first consumer receives the whole range; every later consumer
     * receives the lower half of the currently widest range.
     *
     * <p>NOTE(review): adding a consumer that is already registered overwrites its entry in
     * {@code consumerRange} while its earlier key stays in {@code rangeMap}; confirm callers
     * never add the same consumer twice.
     *
     * @param consumer consumer to register
     * @throws BrokerServiceException.ConsumerAssignException if the widest range spans a single
     *         slot and therefore cannot be split
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
        // isEmpty() instead of size(): ConcurrentSkipListMap.size() traverses the whole map.
        if (rangeMap.isEmpty()) {
            rangeMap.put(rangeSize, consumer);
            consumerRange.put(consumer, rangeSize);
        } else {
            splitRange(findBiggestRange(), consumer);
        }
    }

    /**
     * Unregisters a consumer; unknown consumers are ignored. The slots it owned are absorbed by
     * the neighbouring range: the next higher one, or, when the removed range was the topmost,
     * the next lower one.
     *
     * @param consumer consumer to unregister
     */
    @Override
    public synchronized void removeConsumer(Consumer consumer) {
        Integer removedBound = consumerRange.remove(consumer);
        if (removedBound == null) {
            return;
        }
        // Only the topmost range needs a hand-over; a null predecessor means it was the last one.
        Map.Entry<Integer, Consumer> predecessor =
                removedBound == rangeSize ? rangeMap.lowerEntry(removedBound) : null;
        if (predecessor == null) {
            rangeMap.remove(removedBound);
            return;
        }
        // Re-point the top key before dropping the predecessor's old key so the key rangeSize
        // is never absent while other consumers remain (select() reads without locking).
        rangeMap.put(removedBound, predecessor.getValue());
        rangeMap.remove(predecessor.getKey());
        consumerRange.put(predecessor.getValue(), removedBound);
    }

    /**
     * Returns the consumer owning the slot {@code hash % rangeSize}.
     *
     * <p>A negative {@code hash} yields a negative slot, which resolves to the owner of the
     * lowest range.
     *
     * @param hash hash of the sticky key
     * @return the owning consumer, or {@code null} when no consumer is registered
     */
    @Override
    public Consumer select(int hash) {
        // One lookup with a null check: a separate size() test followed by ceilingEntry() could
        // dereference null if the last consumer is removed in between, and size() is O(n).
        Map.Entry<Integer, Consumer> owner = rangeMap.ceilingEntry(hash % rangeSize);
        return owner == null ? null : owner.getValue();
    }

    /**
     * Builds a snapshot of the ranges currently assigned to each consumer.
     *
     * @return a new map from consumer to its ranges; each range runs from one past the previous
     *         upper bound (0 for the first) to the entry's own upper bound
     */
    @Override
    public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
        Map<Consumer, List<Range>> result = new HashMap<>();
        int start = 0;
        for (Map.Entry<Integer, Consumer> entry : rangeMap.entrySet()) {
            int end = entry.getKey();
            result.computeIfAbsent(entry.getValue(), key -> new ArrayList<>()).add(Range.of(start, end));
            start = end + 1;
        }
        return result;
    }

    /**
     * Finds the widest range in a single ascending pass over the keys.
     *
     * @return the upper bound of the widest range (the first one on ties), or {@code rangeSize}
     *         when the map is empty
     */
    private int findBiggestRange() {
        int widest = 0;
        int widestBound = rangeSize;
        int previousBound = 0;
        for (Integer bound : rangeMap.keySet()) {
            int width = bound - previousBound;
            // Strictly greater keeps the first of several equally wide ranges.
            if (width > widest) {
                widest = width;
                widestBound = bound;
            }
            previousBound = bound;
        }
        return widestBound;
    }

    /**
     * Splits the range ending at {@code range} and gives its lower half to {@code targetConsumer}.
     *
     * @param range upper bound of the range to split
     * @param targetConsumer consumer receiving the lower half
     * @throws BrokerServiceException.ConsumerAssignException if the range spans one slot or less
     */
    private void splitRange(int range, Consumer targetConsumer) throws BrokerServiceException.ConsumerAssignException {
        Integer lowerKey = rangeMap.lowerKey(range);
        int lowerBound = lowerKey == null ? 0 : lowerKey;
        int width = range - lowerBound;
        if (width <= 1) {
            throw new BrokerServiceException.ConsumerAssignException(
                    "No more range can assigned to new consumer, assigned consumers " + rangeMap.size());
        }
        // The new key sits at the midpoint; the existing owner keeps the upper half.
        int splitPoint = range - (width >> 1);
        rangeMap.put(splitPoint, targetConsumer);
        consumerRange.put(targetConsumer, splitPoint);
    }

    /** Returns whether {@code num} is a power of two that is at least 2. */
    private boolean is2Power(int num) {
        return num >= 2 && (num & (num - 1)) == 0;
    }

    /** Returns a read-only live view of the upper-bound-to-consumer map. */
    Map<Integer, Consumer> getRangeConsumer() {
        return Collections.unmodifiableMap(rangeMap);
    }
}

