package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/ShardManager.class */
/**
 * Keeps the map of {@link ProcessingShard}s (keyed by {@link ShardKey}) together with a retry queue of
 * failed {@link WorkContainer}s, and hands out work from the shards on request.
 *
 * <p>The shard map is a {@link ConcurrentHashMap} and the retry queue is a {@link ConcurrentSkipListSet}
 * ordered by {@link #getRetryQueueWorkContainerComparator()}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class ShardManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ShardManager.class);

    private final PCModule<K, V> module;
    private final ParallelConsumerOptions<?, ?> options;
    private final WorkManager<K, V> wm;

    /** All known shards, keyed by the shard key computed from the configured ordering. */
    private Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();

    /**
     * Ordering of the retry queue: earliest retry-due time first, then topic, then partition, then offset.
     *
     * <p>Topic and partition are compared as two separate keys. Concatenating them into a single string is
     * ambiguous (topic "a1" partition 1 and topic "a" partition 11 both give "a11"), which would let two
     * different containers compare as equal and be collapsed into one entry of the sorted set.
     */
    private final Comparator<WorkContainer<?, ?>> retryQueueWorkContainerComparator = Comparator
            .comparing((WorkContainer<?, ?> workContainer) -> workContainer.getRetryDueAt())
            .thenComparing((WorkContainer<?, ?> workContainer) -> workContainer.getTopicPartition().topic())
            .thenComparingInt((WorkContainer<?, ?> workContainer) -> workContainer.getTopicPartition().partition())
            .thenComparing((WorkContainer<?, ?> workContainer) -> workContainer.offset());

    /** Failed work waiting to be retried, sorted by {@link #retryQueueWorkContainerComparator}. */
    private final NavigableSet<WorkContainer<?, ?>> retryQueue =
            new ConcurrentSkipListSet<>(this.retryQueueWorkContainerComparator);

    /** Shard key at which the next {@link #getWorkIfAvailable(int)} call resumes iterating, if any. */
    private Optional<ShardKey> iterationResumePoint = Optional.empty();

    private Gauge shardsSizeGauge;
    private Gauge numberOfShardsGauge;
    private final PCMetrics pcMetrics;

    /**
     * Creates a manager with no shards, reading options and metrics from the given module and
     * registering the shard gauges.
     *
     * @param pCModule    module supplying {@code options()} and {@code pcMetrics()}
     * @param workManager owning work manager; its {@code getPm()} result is passed to newly created shards
     */
    public ShardManager(PCModule<K, V> pCModule, WorkManager<K, V> workManager) {
        this.module = pCModule;
        this.wm = workManager;
        this.options = pCModule.options();
        this.pcMetrics = pCModule.pcMetrics();
        initMetrics();
    }

    /**
     * @param shardKey key to look up
     * @return the shard registered under {@code shardKey}, or empty if there is none
     */
    Optional<ProcessingShard<K, V>> getShard(ShardKey shardKey) {
        return Optional.ofNullable(this.processingShards.get(shardKey));
    }

    /** Computes the shard key of a work container under the configured ordering. */
    ShardKey computeShardKey(WorkContainer<?, ?> workContainer) {
        return ShardKey.of(workContainer, this.options.getOrdering());
    }

    /** Computes the shard key of a consumer record under the configured ordering. */
    ShardKey computeShardKey(ConsumerRecord<?, ?> consumerRecord) {
        return ShardKey.of(consumerRecord, this.options.getOrdering());
    }

    /**
     * @return the sum of every shard's count of work awaiting selection, minus the size of the retry queue,
     *         plus the number of retry-queue entries whose delay has already passed
     */
    public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
        long awaitingInShards = this.processingShards.values().stream()
                .mapToLong(shard -> shard.getCountOfWorkAwaitingSelection())
                .sum();
        return (awaitingInShards - this.retryQueue.size()) + getNumberOfFailedWorkReadyToBeRetried();
    }

    /** @return {@code true} if {@link #getNumberOfWorkQueuedInShardsAwaitingSelection()} is positive */
    public boolean workIsWaitingToBeProcessed() {
        return getNumberOfWorkQueuedInShardsAwaitingSelection() > 0;
    }

    /**
     * Removes the work for every present record in {@code collection} from its shard and from the retry
     * queue; empty optionals are skipped.
     *
     * <p>Decompiler note: this method was package-private in the original class file; it is kept public
     * here so that existing callers of this source are unaffected.
     *
     * @param collection records whose shard entries should be dropped
     */
    public void removeAnyShardEntriesReferencedFrom(Collection<Optional<ConsumerRecord<K, V>>> collection) {
        // Snapshot the present records first, then remove them one by one.
        List<ConsumerRecord<K, V>> presentRecords = collection.stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        for (ConsumerRecord<K, V> presentRecord : presentRecords) {
            removeWorkFromShardFor(presentRecord);
        }
    }

    /**
     * Removes the work at the record's offset from the record's shard, drops it from the retry queue if it
     * was there, and removes the shard if that left it empty.
     */
    private void removeWorkFromShardFor(ConsumerRecord<K, V> consumerRecord) {
        ShardKey shardKey = computeShardKey(consumerRecord);
        // Single lookup instead of containsKey + get, so a shard removed in between cannot yield a null.
        ProcessingShard<K, V> shard = this.processingShards.get(shardKey);
        if (shard == null) {
            log.trace("Shard referenced by WC: {} with shard key: {} already removed", consumerRecord, shardKey);
            return;
        }
        WorkContainer<K, V> removedWork = shard.remove(consumerRecord.offset());
        if (removedWork != null) {
            this.retryQueue.remove(removedWork);
        }
        removeShardIfEmpty(shardKey);
    }

    /**
     * Wraps the record in a new {@link WorkContainer} and adds it to its shard, creating the shard first
     * if it does not exist yet.
     *
     * @param j              first constructor argument of the new {@link WorkContainer}
     *                       (NOTE(review): presumably the assignment epoch - confirm against WorkContainer)
     * @param consumerRecord record to enqueue
     */
    public void addWorkContainer(long j, ConsumerRecord<K, V> consumerRecord) {
        WorkContainer<K, V> workContainer = new WorkContainer<>(j, consumerRecord, this.module);
        ShardKey shardKey = computeShardKey(workContainer);
        this.processingShards
                .computeIfAbsent(shardKey, key -> new ProcessingShard<>(key, this.options, this.wm.getPm()))
                .addWorkContainer(workContainer);
    }

    /**
     * Removes the shard registered under {@code shardKey} if it exists and is empty. Only done when the
     * configured ordering is {@code KEY}; for any other ordering this is a no-op.
     */
    void removeShardIfEmpty(ShardKey shardKey) {
        if (!this.options.getOrdering().equals(ParallelConsumerOptions.ProcessingOrder.KEY)) {
            return;
        }
        Optional<ProcessingShard<K, V>> shard = getShard(shardKey);
        if (shard.isPresent() && shard.get().isEmpty()) {
            log.trace("Removing empty shard (key: {})", shardKey);
            this.processingShards.remove(shardKey);
        }
    }

    /**
     * Records a successful result: the container leaves the retry queue, its shard (if still present) is
     * notified, and the shard is removed if that left it empty.
     */
    public void onSuccess(WorkContainer<?, ?> workContainer) {
        this.retryQueue.remove(workContainer);
        ShardKey shardKey = computeShardKey(workContainer);
        Optional<ProcessingShard<K, V>> shard = getShard(shardKey);
        if (shard.isPresent()) {
            shard.get().onSuccess(workContainer);
            removeShardIfEmpty(shardKey);
        } else {
            log.trace("Dropping successful result for revoked partition {}. Record in question was: {}", shardKey, workContainer.getCr());
        }
    }

    /**
     * Records a failed result: the container is added to the retry queue and its shard, if still present,
     * is notified.
     */
    public void onFailure(WorkContainer<?, ?> workContainer) {
        log.debug("Work FAILED");
        this.retryQueue.add(workContainer);
        getShard(computeShardKey(workContainer)).ifPresent(shard -> shard.onFailure());
    }

    /**
     * @return the delay until retry is due for the first retry-queue entry (in queue order) that is not
     *         in flight, or empty if there is no such entry
     */
    public Optional<Duration> getLowestRetryTime() {
        for (WorkContainer<?, ?> queued : this.retryQueue) {
            if (queued.isNotInFlight()) {
                return Optional.of(queued.getDelayUntilRetryDue());
            }
        }
        return Optional.empty();
    }

    /**
     * Collects work from the shards, starting at the saved resume point, until {@code i} containers have
     * been gathered or the shard iterator is exhausted. The entry at which iteration stopped (if any)
     * becomes the resume point for the next call.
     *
     * @param i maximum number of work containers to return
     * @return the work gathered from the shards visited
     */
    public List<WorkContainer<K, V>> getWorkIfAvailable(int i) {
        LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> shardIterator =
                new LoopingResumingIterator<>(this.iterationResumePoint, this.processingShards);
        List<WorkContainer<K, V>> workFromAllShards = new ArrayList<>();

        Optional<Map.Entry<ShardKey, ProcessingShard<K, V>>> next = shardIterator.next();
        while (workFromAllShards.size() < i && next.isPresent()) {
            // Ask each shard only for what is still missing from the requested amount.
            int remainingToGet = i - workFromAllShards.size();
            workFromAllShards.addAll(next.get().getValue().getWorkIfAvailable(remainingToGet));
            next = shardIterator.next();
        }

        if (workFromAllShards.size() >= i) {
            log.debug("Work taken is now over max (iteration resume point is {})", this.iterationResumePoint);
        }
        updateResumePoint(next);
        return workFromAllShards;
    }

    /**
     * Has every shard remove its stale work containers and removes each of those containers from the
     * retry queue as well.
     *
     * <p>Implemented as an explicit loop: a lazy stream terminated by the short-circuiting
     * {@code findAny()} stops after the first element, which would leave later shards unpurged and
     * already-removed containers behind in the retry queue.
     *
     * @return {@code true} if at least one shard reported a stale container
     */
    public boolean removeStaleContainers() {
        boolean anyStaleFound = false;
        for (ProcessingShard<K, V> shard : this.processingShards.values()) {
            Collection<?> staleContainers = shard.removeStaleWorkContainersFromShard();
            for (Object stale : staleContainers) {
                this.retryQueue.remove(stale);
                anyStaleFound = true;
            }
        }
        return anyStaleFound;
    }

    /** Saves the key of the given entry (or empty) as the point where the next work iteration resumes. */
    private void updateResumePoint(Optional<Map.Entry<ShardKey, ProcessingShard<K, V>>> optional) {
        this.iterationResumePoint = optional.map(Map.Entry::getKey);
        if (this.iterationResumePoint.isPresent()) {
            log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
        }
    }

    /** Registers gauges for the total number of shard entries and for the number of shards. */
    private void initMetrics() {
        this.shardsSizeGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.SHARDS_SIZE, this, shardManager -> {
            return shardManager.processingShards.values().stream().mapToInt(processingShard -> {
                return processingShard.getEntries().size();
            }).sum();
        }, new Tag[0]);
        this.numberOfShardsGauge = this.pcMetrics.gaugeFromMetricDef(PCMetricsDef.NUMBER_OF_SHARDS, this, shardManager -> {
            return shardManager.processingShards.keySet().size();
        }, new Tag[0]);
    }

    /**
     * Counts the leading entries of the retry queue whose delay has passed. Counting stops at the first
     * entry whose delay has not passed, relying on the queue being ordered by retry-due time.
     */
    private long getNumberOfFailedWorkReadyToBeRetried() {
        long readyCount = 0;
        for (WorkContainer<?, ?> queued : this.retryQueue) {
            if (!queued.isDelayPassed()) {
                break;
            }
            readyCount++;
        }
        return readyCount;
    }

    /** @return the options this manager was created with */
    public ParallelConsumerOptions<?, ?> getOptions() {
        return this.options;
    }

    private Map<ShardKey, ProcessingShard<K, V>> getProcessingShards() {
        return this.processingShards;
    }

    /** Replaces the shard map wholesale. */
    void setProcessingShards(Map<ShardKey, ProcessingShard<K, V>> map) {
        this.processingShards = map;
    }

    /** @return the comparator that orders the retry queue */
    Comparator<WorkContainer<?, ?>> getRetryQueueWorkContainerComparator() {
        return this.retryQueueWorkContainerComparator;
    }

    /** @return the live retry queue (not a copy) */
    NavigableSet<WorkContainer<?, ?>> getRetryQueue() {
        return this.retryQueue;
    }
}
