/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.assignment.ClientStateTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mutable bookkeeping for a single client while tasks are being assigned.
 *
 * <p>An instance tracks the client's capacity, the partitions and tasks it reported as
 * previously owned, the per-task offset sums and computed lags, and the active and standby
 * tasks that have been assigned to it so far (both client-wide and per consumer).
 *
 * <p>This class performs no synchronization of its own.
 */
public class ClientState {
    private static final Logger LOG = LoggerFactory.getLogger(ClientState.class);

    /** Orders topic partitions by topic name first and partition number second. */
    public static final Comparator<TopicPartition> TOPIC_PARTITION_COMPARATOR = Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition);

    // Sentinel offset sum: a task reporting it is treated as previously active and its lag is
    // recorded as this same value.
    // NOTE(review): the literal was inlined by the decompiler; presumably Task.LATEST_OFFSET - confirm.
    private static final long LATEST_OFFSET_SENTINEL = -2L;

    // Sentinel offset sum: a task reporting it gets this same value recorded as its lag.
    // NOTE(review): the literal was inlined by the decompiler; presumably an "unknown offset sum"
    // marker - confirm against the upstream constant.
    private static final long UNKNOWN_OFFSET_SUM_SENTINEL = -3L;

    private final Map<String, String> clientTags;
    private final Map<TaskId, Long> taskOffsetSums;
    private final Map<TaskId, Long> taskLagTotals;
    private final Map<TopicPartition, String> ownedPartitions = new TreeMap<>(TOPIC_PARTITION_COMPARATOR);
    private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds = new TreeMap<>();
    private final ClientStateTask assignedActiveTasks = new ClientStateTask(new TreeSet<>(), new TreeMap<>());
    private final ClientStateTask assignedStandbyTasks = new ClientStateTask(new TreeSet<>(), new TreeMap<>());
    // The task-id sets of the "previous" holders are installed by the constructors.
    private final ClientStateTask previousActiveTasks = new ClientStateTask(null, new TreeMap<>());
    private final ClientStateTask previousStandbyTasks = new ClientStateTask(null, null);
    private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>());
    private final ProcessId processId;
    private Optional<Instant> followupRebalanceDeadline = Optional.empty();
    private int capacity;

    /** Creates a state with no process id, zero capacity and no client tags. */
    public ClientState() {
        this(null, 0);
    }

    /**
     * Creates a state with zero capacity.
     *
     * @param processId  id of the client process
     * @param clientTags tags of the client; exposed through an unmodifiable view
     */
    public ClientState(ProcessId processId, Map<String, String> clientTags) {
        this(processId, 0, clientTags);
    }

    /** Creates a state with the given capacity, no process id and no client tags. */
    ClientState(int capacity) {
        this(null, capacity);
    }

    /** Creates a state with the given process id and capacity and no client tags. */
    ClientState(ProcessId processId, int capacity) {
        this(processId, capacity, Collections.emptyMap());
    }

    /**
     * Creates a state whose previous-task sets, offset sums and lags start out empty and mutable.
     *
     * @param processId  id of the client process
     * @param capacity   initial capacity
     * @param clientTags tags of the client; exposed through an unmodifiable view
     */
    ClientState(ProcessId processId, int capacity, Map<String, String> clientTags) {
        this.previousStandbyTasks.setTaskIds(new TreeSet<>());
        this.previousActiveTasks.setTaskIds(new TreeSet<>());
        this.taskOffsetSums = new TreeMap<>();
        this.taskLagTotals = new TreeMap<>();
        this.capacity = capacity;
        this.processId = processId;
        this.clientTags = Collections.unmodifiableMap(clientTags);
    }

    /** Same as the six-argument constructor with a {@code null} process id. */
    public ClientState(Set<TaskId> previousActiveTasks, Set<TaskId> previousStandbyTasks, Map<TaskId, Long> taskLagTotals, Map<String, String> clientTags, int capacity) {
        this(previousActiveTasks, previousStandbyTasks, taskLagTotals, clientTags, capacity, null);
    }

    /**
     * Creates a state from already-known previous tasks and lags.
     *
     * <p>The previous-task sets are copied into unmodifiable sorted sets, the offset sums are an
     * empty immutable map, and the lags and tags are wrapped in unmodifiable views of the given maps.
     *
     * @param previousActiveTasks  tasks previously owned as active
     * @param previousStandbyTasks tasks previously owned as standby
     * @param taskLagTotals        lag per task
     * @param clientTags           tags of the client
     * @param capacity             initial capacity
     * @param processId            id of the client process, may be {@code null}
     */
    public ClientState(Set<TaskId> previousActiveTasks, Set<TaskId> previousStandbyTasks, Map<TaskId, Long> taskLagTotals, Map<String, String> clientTags, int capacity, ProcessId processId) {
        this.previousStandbyTasks.setTaskIds(Collections.unmodifiableSet(new TreeSet<>(previousStandbyTasks)));
        this.previousActiveTasks.setTaskIds(Collections.unmodifiableSet(new TreeSet<>(previousActiveTasks)));
        this.taskOffsetSums = Collections.emptyMap();
        this.taskLagTotals = Collections.unmodifiableMap(taskLagTotals);
        this.capacity = capacity;
        this.clientTags = Collections.unmodifiableMap(clientTags);
        this.processId = processId;
    }

    /**
     * Creates a state carrying over the previous tasks, lags, tags, capacity and process id of
     * {@code clientState}. Current assignments, owned partitions and offset sums are not copied.
     */
    public ClientState(ClientState clientState) {
        this(new HashSet<>(clientState.previousActiveTasks.taskIds()), new HashSet<>(clientState.previousStandbyTasks.taskIds()), clientState.taskLagTotals, clientState.clientTags, clientState.capacity, clientState.processId);
    }

    /** @return the current capacity */
    public int capacity() {
        return capacity;
    }

    /** @return the process id given at construction, possibly {@code null} */
    ProcessId processId() {
        return processId;
    }

    /** Increases the capacity by one. */
    public void incrementCapacity() {
        ++capacity;
    }

    /** @return {@code true} if the number of assigned tasks is at least the capacity */
    boolean reachedCapacity() {
        return assignedTaskCount() >= capacity;
    }

    /** @return the follow-up rebalance deadline, empty until one has been set */
    public Optional<Instant> followupRebalanceDeadline() {
        return followupRebalanceDeadline;
    }

    /**
     * Sets the follow-up rebalance deadline.
     *
     * @param followupRebalanceDeadline the deadline; must not be {@code null}
     * @throws NullPointerException if the argument is {@code null}
     */
    public void setFollowupRebalanceDeadline(Instant followupRebalanceDeadline) {
        this.followupRebalanceDeadline = Optional.of(followupRebalanceDeadline);
    }

    /** @return an unmodifiable view of the assigned active tasks */
    public Set<TaskId> activeTasks() {
        return Collections.unmodifiableSet(assignedActiveTasks.taskIds());
    }

    /** @return the number of assigned active tasks */
    public int activeTaskCount() {
        return assignedActiveTasks.taskIds().size();
    }

    /** @return assigned active tasks divided by capacity (floating-point division) */
    double activeTaskLoad() {
        return (double) activeTaskCount() / (double) capacity;
    }

    /** Adds all given tasks to the assigned active tasks without any duplicate-assignment check. */
    public void assignActiveTasks(Collection<TaskId> tasks) {
        assignedActiveTasks.taskIds().addAll(tasks);
    }

    /** Adds all given tasks to the assigned standby tasks without any duplicate-assignment check. */
    public void assignStandbyTasks(Collection<TaskId> tasks) {
        assignedStandbyTasks.taskIds().addAll(tasks);
    }

    /**
     * Records that an already-assigned active task belongs to the given consumer.
     *
     * @throws IllegalStateException if the task is not an assigned active task of this client
     */
    public void assignActiveToConsumer(TaskId task, String consumer) {
        if (!assignedActiveTasks.taskIds().contains(task)) {
            throw new IllegalStateException("added not assign active task " + task + " to this client state.");
        }
        assignedActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    /** Records that a standby task belongs to the given consumer. */
    public void assignStandbyToConsumer(TaskId task, String consumer) {
        assignedStandbyTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    /** Records that an active task is being revoked from the given consumer. */
    public void revokeActiveFromConsumer(TaskId task, String consumer) {
        revokingActiveTasks.consumerToTaskIds().computeIfAbsent(consumer, k -> new HashSet<>()).add(task);
    }

    /** @return the per-consumer previously owned active tasks (the internal map, not a copy) */
    public Map<String, Set<TaskId>> prevOwnedActiveTasksByConsumer() {
        return previousActiveTasks.consumerToTaskIds();
    }

    /**
     * Computes, per consumer, the previously owned stateful tasks minus that consumer's
     * previously owned active tasks.
     *
     * @return a new sorted map of freshly built sets
     */
    public Map<String, Set<TaskId>> prevOwnedStandbyByConsumer() {
        Map<String, Set<TaskId>> standbyByConsumer = new TreeMap<>();
        Map<String, Set<TaskId>> activeByConsumer = previousActiveTasks.consumerToTaskIds();
        for (Map.Entry<String, Set<TaskId>> entry : consumerToPreviousStatefulTaskIds.entrySet()) {
            Set<TaskId> standbyTaskIds = new HashSet<>(entry.getValue());
            if (activeByConsumer.containsKey(entry.getKey())) {
                standbyTaskIds.removeAll(activeByConsumer.get(entry.getKey()));
            }
            standbyByConsumer.put(entry.getKey(), standbyTaskIds);
        }
        return standbyByConsumer;
    }

    /** @return the stateful tasks recorded for the consumer, or {@code null} if none were recorded */
    public Set<TaskId> prevOwnedStatefulTasksByConsumer(String memberId) {
        return consumerToPreviousStatefulTaskIds.get(memberId);
    }

    /** @return the per-consumer assigned active tasks (the internal map, not a copy) */
    public Map<String, Set<TaskId>> assignedActiveTasksByConsumer() {
        return assignedActiveTasks.consumerToTaskIds();
    }

    /** @return the per-consumer active tasks being revoked (the internal map, not a copy) */
    public Map<String, Set<TaskId>> revokingActiveTasksByConsumer() {
        return revokingActiveTasks.consumerToTaskIds();
    }

    /** @return the per-consumer assigned standby tasks (the internal map, not a copy) */
    public Map<String, Set<TaskId>> assignedStandbyTasksByConsumer() {
        return assignedStandbyTasks.consumerToTaskIds();
    }

    /**
     * Assigns a task as active.
     *
     * @throws IllegalArgumentException if the task is already assigned as active or standby
     */
    public void assignActive(TaskId task) {
        assertNotAssigned(task);
        assignedActiveTasks.taskIds().add(task);
    }

    /**
     * Removes a task from the assigned active tasks.
     *
     * @throws IllegalArgumentException if the task is not currently an assigned active task
     */
    public void unassignActive(TaskId task) {
        Set<TaskId> taskIds = assignedActiveTasks.taskIds();
        if (!taskIds.contains(task)) {
            throw new IllegalArgumentException("Tried to unassign active task " + task + ", but it is not currently assigned: " + this);
        }
        taskIds.remove(task);
    }

    /** @return an unmodifiable view of the assigned standby tasks */
    public Set<TaskId> standbyTasks() {
        return Collections.unmodifiableSet(assignedStandbyTasks.taskIds());
    }

    /** @return {@code true} if the task is an assigned standby task */
    boolean hasStandbyTask(TaskId taskId) {
        return assignedStandbyTasks.taskIds().contains(taskId);
    }

    /** @return {@code true} if the task is an assigned active task */
    boolean hasActiveTask(TaskId taskId) {
        return assignedActiveTasks.taskIds().contains(taskId);
    }

    /** @return the number of assigned standby tasks */
    int standbyTaskCount() {
        return assignedStandbyTasks.taskIds().size();
    }

    /**
     * Assigns a task as standby.
     *
     * @throws IllegalArgumentException if the task is already assigned as active or standby
     */
    public void assignStandby(TaskId task) {
        assertNotAssigned(task);
        assignedStandbyTasks.taskIds().add(task);
    }

    /**
     * Removes a task from the assigned standby tasks.
     *
     * @throws IllegalArgumentException if the task is not currently an assigned standby task
     */
    void unassignStandby(TaskId task) {
        Set<TaskId> taskIds = assignedStandbyTasks.taskIds();
        if (!taskIds.contains(task)) {
            throw new IllegalArgumentException("Tried to unassign standby task " + task + ", but it is not currently assigned: " + this);
        }
        taskIds.remove(task);
    }

    /** @return an unmodifiable snapshot of all assigned tasks, active and standby */
    Set<TaskId> assignedTasks() {
        Set<TaskId> activeIds = assignedActiveTasks.taskIds();
        Set<TaskId> standbyIds = assignedStandbyTasks.taskIds();
        // The union is a copy; wrapping it keeps callers from believing that mutating the
        // returned set changes the assignment.
        return Collections.unmodifiableSet(Utils.union(() -> new HashSet<>(activeIds.size() + standbyIds.size()), activeIds, standbyIds));
    }

    /** @return {@code true} if the task is among the previous standby tasks */
    public boolean previouslyOwnedStandby(TaskId task) {
        return previousStandbyTasks.taskIds().contains(task);
    }

    /** @return the number of assigned active tasks plus assigned standby tasks */
    public int assignedTaskCount() {
        return activeTaskCount() + standbyTaskCount();
    }

    /** @return assigned tasks divided by capacity (floating-point division) */
    double assignedTaskLoad() {
        return (double) assignedTaskCount() / (double) capacity;
    }

    /** @return {@code true} if the task is assigned as active or as standby */
    boolean hasAssignedTask(TaskId taskId) {
        return assignedActiveTasks.taskIds().contains(taskId) || assignedStandbyTasks.taskIds().contains(taskId);
    }

    /** @return an unmodifiable view of the previous active tasks */
    Set<TaskId> prevActiveTasks() {
        return Collections.unmodifiableSet(previousActiveTasks.taskIds());
    }

    private void addPreviousActiveTask(TaskId task) {
        previousActiveTasks.taskIds().add(task);
    }

    /** Adds all given tasks to the previous active tasks. */
    void addPreviousActiveTasks(Set<TaskId> prevTasks) {
        previousActiveTasks.taskIds().addAll(prevTasks);
    }

    /** @return an unmodifiable view of the previous standby tasks */
    Set<TaskId> prevStandbyTasks() {
        return Collections.unmodifiableSet(previousStandbyTasks.taskIds());
    }

    private void addPreviousStandbyTask(TaskId task) {
        previousStandbyTasks.taskIds().add(task);
    }

    /** Adds all given tasks to the previous standby tasks. */
    void addPreviousStandbyTasks(Set<TaskId> standbyTasks) {
        previousStandbyTasks.taskIds().addAll(standbyTasks);
    }

    /** @return a new set holding all previous tasks, active and standby */
    Set<TaskId> previousAssignedTasks() {
        Set<TaskId> activeIds = previousActiveTasks.taskIds();
        Set<TaskId> standbyIds = previousStandbyTasks.taskIds();
        return Utils.union(() -> new HashSet<>(activeIds.size() + standbyIds.size()), activeIds, standbyIds);
    }

    /** @return the consumer recorded as owner of the partition, or {@code null} if none */
    public String previousOwnerForPartition(TopicPartition partition) {
        return ownedPartitions.get(partition);
    }

    /** @return the unmodifiable client tags */
    public Map<String, String> clientTags() {
        return clientTags;
    }

    /** Records the given consumer as owner of each given partition, replacing any earlier owner. */
    public void addOwnedPartitions(Collection<TopicPartition> ownedPartitions, String consumer) {
        for (TopicPartition tp : ownedPartitions) {
            this.ownedPartitions.put(tp, consumer);
        }
    }

    /**
     * Merges the given offset sums into this state and records their tasks as the consumer's
     * previously owned stateful tasks.
     *
     * @param consumerId     the reporting consumer
     * @param taskOffsetSums offset sum per task as reported by that consumer
     */
    public void addPreviousTasksAndOffsetSums(String consumerId, Map<TaskId, Long> taskOffsetSums) {
        this.taskOffsetSums.putAll(taskOffsetSums);
        // Store a copy rather than the live keySet() view: later changes to the caller's map
        // must not leak into this state, and changes made through this state must not alter
        // the caller's map.
        consumerToPreviousStatefulTaskIds.put(consumerId, new TreeSet<>(taskOffsetSums.keySet()));
    }

    /**
     * Derives the previous active and standby tasks from the owned partitions and offset sums
     * recorded so far.
     *
     * @param taskForPartitionMap task owning each known topic partition
     * @param hasNamedTopologies  if {@code true}, partitions and tasks absent from
     *                            {@code taskForPartitionMap} are dropped first
     * @throws IllegalStateException if previous tasks were already added to this state
     */
    public void initializePrevTasks(Map<TopicPartition, TaskId> taskForPartitionMap, boolean hasNamedTopologies) {
        if (!previousActiveTasks.taskIds().isEmpty() || !previousStandbyTasks.taskIds().isEmpty()) {
            throw new IllegalStateException("Already added previous tasks to this client state.");
        }
        maybeFilterUnknownPrevTasksAndPartitions(taskForPartitionMap, hasNamedTopologies);
        // Order matters: the offset-sum pass skips tasks already registered by the partition pass.
        initializePrevActiveTasksFromOwnedPartitions(taskForPartitionMap);
        initializeRemainingPrevTasksFromTaskOffsetSums();
    }

    private void maybeFilterUnknownPrevTasksAndPartitions(Map<TopicPartition, TaskId> taskForPartitionMap, boolean hasNamedTopologies) {
        if (hasNamedTopologies) {
            ownedPartitions.keySet().retainAll(taskForPartitionMap.keySet());
            previousActiveTasks.taskIds().retainAll(taskForPartitionMap.values());
            previousStandbyTasks.taskIds().retainAll(taskForPartitionMap.values());
        }
    }

    /**
     * Computes the lag of every task in {@code allTaskEndOffsetSums} from this client's offset sums.
     *
     * <p>A task without a recorded offset sum is treated as having offset sum 0. The two sentinel
     * offset sums are stored unchanged as the lag. If the end offset sum is smaller than the
     * offset sum, a warning is logged and the end offset sum itself is stored as the lag.
     *
     * @param uuid                 process id, used only in the warning message
     * @param allTaskEndOffsetSums end offset sum per task
     * @throws IllegalStateException if lags were already computed for this client
     */
    public void computeTaskLags(ProcessId uuid, Map<TaskId, Long> allTaskEndOffsetSums) {
        if (!taskLagTotals.isEmpty()) {
            throw new IllegalStateException("Already computed task lags for this client.");
        }
        for (Map.Entry<TaskId, Long> taskEntry : allTaskEndOffsetSums.entrySet()) {
            TaskId task = taskEntry.getKey();
            // Kept boxed so it is only unboxed on the paths that actually compare it.
            Long endOffsetSum = taskEntry.getValue();
            long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
            if (offsetSum == LATEST_OFFSET_SENTINEL) {
                taskLagTotals.put(task, LATEST_OFFSET_SENTINEL);
            } else if (offsetSum == UNKNOWN_OFFSET_SUM_SENTINEL) {
                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM_SENTINEL);
            } else if (endOffsetSum < offsetSum) {
                LOG.warn("Task {} had endOffsetSum={} smaller than offsetSum={} on member {}. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state.", task, endOffsetSum, offsetSum, uuid);
                taskLagTotals.put(task, endOffsetSum);
            } else {
                taskLagTotals.put(task, endOffsetSum - offsetSum);
            }
        }
    }

    /**
     * @return the lag recorded for the task
     * @throws IllegalStateException if no lag is recorded for the task
     */
    public long lagFor(TaskId task) {
        Long totalLag = taskLagTotals.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }

    /**
     * Returns the consumer's previously owned stateful tasks that have a recorded lag, ordered by
     * lag and then by task id. Tasks without a recorded lag are skipped.
     *
     * @param consumer the consumer to look up
     * @return a new sorted set; empty if nothing was recorded for the consumer
     */
    public SortedSet<TaskId> prevTasksByLag(String consumer) {
        SortedSet<TaskId> prevTasksByLag = new TreeSet<TaskId>(Comparator.comparingLong(this::lagFor).thenComparing(TaskId::compareTo));
        Set<TaskId> statefulTasks = prevOwnedStatefulTasksByConsumer(consumer);
        if (statefulTasks == null) {
            // Nothing was reported for this consumer; there is nothing to order.
            return prevTasksByLag;
        }
        for (TaskId task : statefulTasks) {
            if (taskLagTotals.containsKey(task)) {
                prevTasksByLag.add(task);
            } else {
                LOG.debug("Skipping previous task {} since it's not part of the current assignment", task);
            }
        }
        return prevTasksByLag;
    }

    /** @return a new set of the assigned active tasks that have a recorded lag */
    public Set<TaskId> statefulActiveTasks() {
        return assignedActiveTasks.taskIds().stream().filter(this::isStateful).collect(Collectors.toSet());
    }

    /** @return a new set of the assigned active tasks that have no recorded lag */
    public Set<TaskId> statelessActiveTasks() {
        return assignedActiveTasks.taskIds().stream().filter(task -> !isStateful(task)).collect(Collectors.toSet());
    }

    /** @return {@code true} if fewer than {@code capacity * tasksPerThread} active tasks are assigned */
    boolean hasUnfulfilledQuota(int tasksPerThread) {
        return assignedActiveTasks.taskIds().size() < capacity * tasksPerThread;
    }

    /**
     * Compares load (assigned tasks divided by capacity) with another client; on equal load the
     * client with the larger capacity wins.
     *
     * @return {@code true} if this client has the lower load, or equal load and larger capacity
     * @throws IllegalStateException if either capacity is not greater than zero
     */
    boolean hasMoreAvailableCapacityThan(ClientState other) {
        if (capacity <= 0) {
            throw new IllegalStateException("Capacity of this ClientState must be greater than 0.");
        }
        if (other.capacity <= 0) {
            throw new IllegalStateException("Capacity of other ClientState must be greater than 0");
        }
        double otherLoad = (double) other.assignedTaskCount() / (double) other.capacity;
        double thisLoad = (double) assignedTaskCount() / (double) capacity;
        if (thisLoad < otherLoad) {
            return true;
        }
        if (thisLoad > otherLoad) {
            return false;
        }
        return capacity > other.capacity;
    }

    /** @return the string form of the consumers that reported previous stateful tasks */
    public String consumers() {
        return consumerToPreviousStatefulTaskIds.keySet().toString();
    }

    /** @return the task lags (the internal map, not a copy) */
    public Map<TaskId, Long> taskLagTotals() {
        return taskLagTotals;
    }

    /** @return a new sorted copy of the previous active tasks */
    public SortedSet<TaskId> previousActiveTasks() {
        return new TreeSet<>(previousActiveTasks.taskIds());
    }

    /** @return a new sorted copy of the previous standby tasks */
    public SortedSet<TaskId> previousStandbyTasks() {
        return new TreeSet<>(previousStandbyTasks.taskIds());
    }

    /** @return a new sorted map of previous stateful tasks per consumer (the value sets are shared) */
    public SortedMap<String, Set<TaskId>> taskIdsByPreviousConsumer() {
        return new TreeMap<>(consumerToPreviousStatefulTaskIds);
    }

    /**
     * Replaces the assigned active and standby tasks with those contained in the assignment.
     *
     * @param assignment the assignment whose tasks are split by type
     */
    public void setAssignedTasks(KafkaStreamsAssignment assignment) {
        assignedActiveTasks.setTaskIds(taskIdsOfType(assignment, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE));
        assignedStandbyTasks.setTaskIds(taskIdsOfType(assignment, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
    }

    // Collects the ids of the assignment's tasks of one type into a mutable sorted set.
    // Collectors.toSet() gives no guarantee about mutability, but the assign/unassign methods
    // modify these sets after they have been installed.
    private static Set<TaskId> taskIdsOfType(KafkaStreamsAssignment assignment, KafkaStreamsAssignment.AssignedTask.Type type) {
        return assignment.tasks().values().stream()
            .filter(task -> task.type() == type)
            .map(KafkaStreamsAssignment.AssignedTask::id)
            .collect(Collectors.toCollection(TreeSet::new));
    }

    /** @return a short description of the assigned active and standby tasks */
    public String currentAssignment() {
        return "[activeTasks: (" + assignedActiveTasks.taskIds() + ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ")]";
    }

    @Override
    public String toString() {
        return "[activeTasks: (" + assignedActiveTasks.taskIds() + ") standbyTasks: (" + assignedStandbyTasks.taskIds() + ") prevActiveTasks: (" + previousActiveTasks.taskIds() + ") prevStandbyTasks: (" + previousStandbyTasks.taskIds() + ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + ") taskLagTotals: (" + taskLagTotals.entrySet() + ") clientTags: (" + clientTags.entrySet() + ") capacity: " + capacity + " assigned: " + assignedTaskCount() + "]";
    }

    // A task counts as stateful here exactly when a lag is recorded for it.
    private boolean isStateful(TaskId task) {
        return taskLagTotals.containsKey(task);
    }

    // Registers the task of every owned partition as previously active, both client-wide and
    // under the owning consumer; partitions without a task are logged and skipped.
    private void initializePrevActiveTasksFromOwnedPartitions(Map<TopicPartition, TaskId> taskForPartitionMap) {
        for (Map.Entry<TopicPartition, String> partitionEntry : ownedPartitions.entrySet()) {
            TopicPartition tp = partitionEntry.getKey();
            TaskId task = taskForPartitionMap.get(tp);
            if (task == null) {
                LOG.error("No task found for topic partition {}", tp);
                continue;
            }
            addPreviousActiveTask(task);
            previousActiveTasks.consumerToTaskIds().computeIfAbsent(partitionEntry.getValue(), k -> new HashSet<>()).add(task);
        }
    }

    // Classifies every task with an offset sum that is not yet a previous active task: the
    // latest-offset sentinel makes it previously active, anything else previously standby.
    private void initializeRemainingPrevTasksFromTaskOffsetSums() {
        Set<TaskId> previousActiveTaskIds = previousActiveTasks.taskIds();
        if (previousActiveTaskIds.isEmpty() && !ownedPartitions.isEmpty()) {
            LOG.error("Tried to process tasks in offset sum map before processing tasks from ownedPartitions = {}", ownedPartitions);
            throw new IllegalStateException("Must initialize prevActiveTasks from ownedPartitions before initializing remaining tasks.");
        }
        for (Map.Entry<TaskId, Long> taskEntry : taskOffsetSums.entrySet()) {
            TaskId task = taskEntry.getKey();
            if (previousActiveTaskIds.contains(task)) {
                continue;
            }
            long offsetSum = taskEntry.getValue();
            if (offsetSum == LATEST_OFFSET_SENTINEL) {
                addPreviousActiveTask(task);
            } else {
                addPreviousStandbyTask(task);
            }
        }
    }

    private void assertNotAssigned(TaskId task) {
        if (assignedStandbyTasks.taskIds().contains(task) || assignedActiveTasks.taskIds().contains(task)) {
            throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this);
        }
    }
}

