/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StandbyTaskCreator;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;

/**
 * Registry of the tasks held by this instance, indexed by task id and, for active tasks, by input partition.
 *
 * <p>Three id-keyed maps are kept in step: all tasks, active tasks and standby tasks. A fourth map resolves an
 * input {@link TopicPartition} to the active task that consumes it. The read accessors hand out unmodifiable
 * <em>live views</em> of these maps, so callers observe later registry changes but cannot modify the registry.
 *
 * <p>Task and producer creation is delegated to the {@link ActiveTaskCreator} and {@link StandbyTaskCreator}
 * passed to the constructor. This class performs no synchronization of its own.
 */
class Tasks {
    private final Logger log;
    private final TopologyMetadata topologyMetadata;
    private final StreamsMetricsImpl streamsMetrics;

    // Every owned task, active or standby. The read-only fields below are live views, not snapshots.
    private final Map<TaskId, Task> allTasksPerId = new TreeMap<>();
    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());

    // Active tasks by id, plus the reverse index from input partition to the active task reading it.
    private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
    private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();
    private final Map<TaskId, Task> readOnlyActiveTasksPerId = Collections.unmodifiableMap(activeTasksPerId);
    private final Set<TaskId> readOnlyActiveTaskIds = Collections.unmodifiableSet(activeTasksPerId.keySet());
    private final Collection<Task> readOnlyActiveTasks = Collections.unmodifiableCollection(activeTasksPerId.values());

    // Standby tasks by id.
    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
    private final Map<TaskId, Task> readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId);
    private final Set<TaskId> readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet());

    private final ActiveTaskCreator activeTaskCreator;
    private final StandbyTaskCreator standbyTaskCreator;

    // Not set by the constructor; supplied later through setMainConsumer and handed to the active task creator.
    private Consumer<byte[], byte[]> mainConsumer;

    /**
     * @param logPrefix          prefix used for every log line emitted by this registry
     * @param topologyMetadata   source of the named topologies and of the node-to-source-topic mapping
     * @param streamsMetrics     metrics handle (stored only; not otherwise used by this class)
     * @param activeTaskCreator  factory for active tasks and owner of the producers
     * @param standbyTaskCreator factory for standby tasks
     */
    Tasks(String logPrefix, TopologyMetadata topologyMetadata, StreamsMetricsImpl streamsMetrics, ActiveTaskCreator activeTaskCreator, StandbyTaskCreator standbyTaskCreator) {
        LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(getClass());
        this.topologyMetadata = topologyMetadata;
        this.streamsMetrics = streamsMetrics;
        this.activeTaskCreator = activeTaskCreator;
        this.standbyTaskCreator = standbyTaskCreator;
    }

    /**
     * Sets the consumer that is passed to the active task creator whenever active tasks are created.
     */
    void setMainConsumer(Consumer<byte[], byte[]> mainConsumer) {
        this.mainConsumer = mainConsumer;
    }

    /**
     * Forwards the assigned task ids to each creator's {@code removeRevokedUnknownTasks} and then creates the
     * given active and standby tasks.
     *
     * @throws IllegalStateException if a task to create is already owned in the same role
     */
    void handleNewAssignmentAndCreateTasks(Map<TaskId, Set<TopicPartition>> activeTasksToCreate, Map<TaskId, Set<TopicPartition>> standbyTasksToCreate, Set<TaskId> assignedActiveTasks, Set<TaskId> assignedStandbyTasks) {
        activeTaskCreator.removeRevokedUnknownTasks(assignedActiveTasks);
        standbyTaskCreator.removeRevokedUnknownTasks(assignedStandbyTasks);
        createTasks(activeTasksToCreate, standbyTasksToCreate);
    }

    /**
     * Creates whatever tasks the creators report as uncreated for the currently known named topologies.
     *
     * @throws IllegalStateException if a reported task is already owned in the same role
     */
    void maybeCreateTasksFromNewTopologies() {
        Set<String> currentNamedTopologies = topologyMetadata.namedTopologiesView();
        createTasks(
            activeTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies),
            standbyTaskCreator.uncreatedTasksForTopologies(currentNamedTopologies));
    }

    /**
     * @return the total producer blocked time as reported by the active task creator
     */
    double totalProducerBlockedTime() {
        return activeTaskCreator.totalProducerBlockedTime();
    }

    /**
     * Creates and registers the requested tasks.
     *
     * <p>All ids are validated before anything is created, so a duplicate request leaves the registry untouched.
     *
     * @param activeTasksToCreate  active task ids mapped to their input partitions
     * @param standbyTasksToCreate standby task ids mapped to their input partitions
     * @throws IllegalStateException if an active id is already registered as active, or a standby id is already
     *                               registered as standby
     */
    void createTasks(Map<TaskId, Set<TopicPartition>> activeTasksToCreate, Map<TaskId, Set<TopicPartition>> standbyTasksToCreate) {
        for (TaskId taskId : activeTasksToCreate.keySet()) {
            if (activeTasksPerId.containsKey(taskId)) {
                throw new IllegalStateException("Attempted to create an active task that we already own: " + taskId);
            }
        }
        for (TaskId taskId : standbyTasksToCreate.keySet()) {
            if (standbyTasksPerId.containsKey(taskId)) {
                throw new IllegalStateException("Attempted to create a standby task that we already own: " + taskId);
            }
        }

        // The creators are only invoked when there is something to create.
        if (!activeTasksToCreate.isEmpty()) {
            for (Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
                registerActiveTask(activeTask);
            }
        }
        if (!standbyTasksToCreate.isEmpty()) {
            for (Task standbyTask : standbyTaskCreator.createTasks(standbyTasksToCreate)) {
                registerStandbyTask(standbyTask);
            }
        }
    }

    /**
     * Replaces an owned active task with a standby task built from it.
     *
     * <p>A failure to close the task producer does not abort the conversion; it is logged and recorded in
     * {@code taskCloseExceptions}.
     *
     * @param activeTask          the active task to convert; must currently be registered as active
     * @param partitions          input partitions handed to the standby task creator
     * @param taskCloseExceptions collector for a producer-close failure, keyed by task id
     * @throws IllegalStateException if {@code activeTask} is not registered as an active task
     */
    void convertActiveToStandby(StreamTask activeTask, Set<TopicPartition> partitions, Map<TaskId, RuntimeException> taskCloseExceptions) {
        TaskId taskId = activeTask.id();
        if (activeTasksPerId.remove(taskId) == null) {
            throw new IllegalStateException("Attempted to convert unknown active task to standby task: " + taskId);
        }
        // cleanUpTaskProducerAndRemoveTask ends in removeTaskBeforeClosing, which already purges this task's
        // entries from the partition index, so no separate scan is needed here.
        cleanUpTaskProducerAndRemoveTask(taskId, taskCloseExceptions);

        StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
        registerStandbyTask(standbyTask);
    }

    /**
     * Replaces an owned standby task with an active task built from it and indexes the new task's input
     * partitions.
     *
     * @param standbyTask the standby task to convert; must currently be registered as standby
     * @param partitions  input partitions handed to the active task creator
     * @throws IllegalStateException if {@code standbyTask} is not registered as a standby task
     */
    void convertStandbyToActive(StandbyTask standbyTask, Set<TopicPartition> partitions) {
        if (standbyTasksPerId.remove(standbyTask.id()) == null) {
            throw new IllegalStateException("Attempted to convert unknown standby task to stream task: " + standbyTask.id());
        }
        StreamTask activeTask = activeTaskCreator.createActiveTaskFromStandby(standbyTask, partitions, mainConsumer);
        registerActiveTask(activeTask);
    }

    /**
     * Applies a new set of input partitions to {@code task} if it differs from the current one, then resumes
     * the task. The task is resumed whether or not its partitions changed.
     *
     * @param task            the task to update and resume
     * @param topicPartitions the input partitions the task should have from now on
     */
    void updateInputPartitionsAndResume(Task task, Set<TopicPartition> topicPartitions) {
        boolean requiresUpdate = !task.inputPartitions().equals(topicPartitions);
        if (requiresUpdate) {
            log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), topicPartitions);
            // Only active tasks are ever entered into the partition index, so only an active task may
            // remove entries from it; a standby update must not evict another task's mapping.
            if (task.isActive()) {
                for (TopicPartition inputPartition : task.inputPartitions()) {
                    activeTasksPerPartition.remove(inputPartition);
                }
                for (TopicPartition topicPartition : topicPartitions) {
                    activeTasksPerPartition.put(topicPartition, task);
                }
            }
            task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
        }
        task.resume();
    }

    /**
     * Asks the active task creator to close and remove the task's producer if needed, then drops the task
     * from every index.
     *
     * <p>A {@link RuntimeException} from the producer close is logged and stored in
     * {@code taskCloseExceptions} (unless an exception is already recorded for that id); the task is removed
     * from the registry regardless.
     *
     * @param taskId              id of the task to clean up
     * @param taskCloseExceptions collector for a producer-close failure, keyed by task id
     */
    void cleanUpTaskProducerAndRemoveTask(TaskId taskId, Map<TaskId, RuntimeException> taskCloseExceptions) {
        try {
            activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId);
        } catch (RuntimeException e) {
            String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", taskId);
            log.error(uncleanMessage, e);
            taskCloseExceptions.putIfAbsent(taskId, e);
        }
        removeTaskBeforeClosing(taskId);
    }

    /** Delegates to {@link ActiveTaskCreator#reInitializeThreadProducer()}. */
    void reInitializeThreadProducer() {
        activeTaskCreator.reInitializeThreadProducer();
    }

    /** Delegates to {@link ActiveTaskCreator#closeThreadProducerIfNeeded()}. */
    void closeThreadProducerIfNeeded() {
        activeTaskCreator.closeThreadProducerIfNeeded();
    }

    /**
     * Asks the active task creator to close and remove the producer of the given task if needed. Unlike
     * {@link #cleanUpTaskProducerAndRemoveTask}, exceptions propagate and the registry is not modified.
     */
    void closeAndRemoveTaskProducerIfNeeded(Task activeTask) {
        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
    }

    /**
     * Removes the task with the given id from every index: active, standby, all-tasks and the partition
     * index. Unknown ids are ignored.
     */
    void removeTaskBeforeClosing(TaskId taskId) {
        activeTasksPerId.remove(taskId);
        // Removal through the values() view writes through to the map: one pass, no temporary key set.
        activeTasksPerPartition.values().removeIf(task -> task.id().equals(taskId));
        standbyTasksPerId.remove(taskId);
        allTasksPerId.remove(taskId);
    }

    /** Empties every index. The tasks themselves are not touched. */
    void clear() {
        activeTasksPerId.clear();
        activeTasksPerPartition.clear();
        standbyTasksPerId.clear();
        allTasksPerId.clear();
    }

    /**
     * @return the active task indexed under {@code partition}, or {@code null} if there is none
     */
    Task activeTasksForInputPartition(TopicPartition partition) {
        return activeTasksPerPartition.get(partition);
    }

    /**
     * @return the standby task registered under {@code taskId}
     * @throws IllegalStateException if no standby task is registered under that id
     */
    Task standbyTask(TaskId taskId) {
        Task standbyTask = standbyTasksPerId.get(taskId);
        if (standbyTask == null) {
            throw new IllegalStateException("Standby task unknown: " + taskId);
        }
        return standbyTask;
    }

    /**
     * @return the task (active or standby) registered under {@code taskId}
     * @throws IllegalStateException if no task is registered under that id
     */
    Task task(TaskId taskId) {
        Task task = allTasksPerId.get(taskId);
        if (task == null) {
            throw new IllegalStateException("Task unknown: " + taskId);
        }
        return task;
    }

    /**
     * Resolves a collection of ids to their tasks.
     *
     * @return a new, independent set holding the task for each id
     * @throws IllegalStateException if any id is not registered
     */
    Collection<Task> tasks(Collection<TaskId> taskIds) {
        Set<Task> tasks = new HashSet<>();
        for (TaskId taskId : taskIds) {
            tasks.add(task(taskId));
        }
        return tasks;
    }

    /** @return an unmodifiable live view of the active tasks */
    Collection<Task> activeTasks() {
        return readOnlyActiveTasks;
    }

    /** @return an unmodifiable live view of all tasks, active and standby */
    Collection<Task> allTasks() {
        return readOnlyTasks;
    }

    /** @return an unmodifiable live view of the active task ids */
    Set<TaskId> activeTaskIds() {
        return readOnlyActiveTaskIds;
    }

    /** @return an unmodifiable live view of the standby task ids */
    Set<TaskId> standbyTaskIds() {
        return readOnlyStandbyTaskIds;
    }

    /** @return an unmodifiable live view of the active tasks keyed by id */
    Map<TaskId, Task> activeTaskMap() {
        return readOnlyActiveTasksPerId;
    }

    /** @return an unmodifiable live view of the standby tasks keyed by id */
    Map<TaskId, Task> standbyTaskMap() {
        return readOnlyStandbyTasksPerId;
    }

    /** @return an unmodifiable live view of all tasks keyed by id */
    Map<TaskId, Task> tasksPerId() {
        return readOnlyTasksPerId;
    }

    /** @return {@code true} if a task, active or standby, is registered under {@code taskId} */
    boolean owned(TaskId taskId) {
        return allTasksPerId.containsKey(taskId);
    }

    /** Delegates to {@link ActiveTaskCreator#streamsProducerForTask(TaskId)}. */
    StreamsProducer streamsProducerForTask(TaskId taskId) {
        return activeTaskCreator.streamsProducerForTask(taskId);
    }

    /** Delegates to {@link ActiveTaskCreator#threadProducer()}. */
    StreamsProducer threadProducer() {
        return activeTaskCreator.threadProducer();
    }

    /** Delegates to {@link ActiveTaskCreator#producerMetrics()}. */
    Map<MetricName, Metric> producerMetrics() {
        return activeTaskCreator.producerMetrics();
    }

    /** Delegates to {@link ActiveTaskCreator#producerClientIds()}. */
    Set<String> producerClientIds() {
        return activeTaskCreator.producerClientIds();
    }

    /**
     * Registers an already constructed task, as active or standby according to {@link Task#isActive()}.
     * An active task is also entered into the partition index, so that
     * {@link #activeTasksForInputPartition(TopicPartition)} can resolve it just like a task produced by
     * {@link #createTasks}. An existing registration under the same id is overwritten.
     */
    void addTask(Task task) {
        if (task.isActive()) {
            registerActiveTask(task);
        } else {
            registerStandbyTask(task);
        }
    }

    /** Enters an active task into the active, all-tasks and partition indexes. */
    private void registerActiveTask(Task activeTask) {
        TaskId taskId = activeTask.id();
        activeTasksPerId.put(taskId, activeTask);
        allTasksPerId.put(taskId, activeTask);
        for (TopicPartition topicPartition : activeTask.inputPartitions()) {
            activeTasksPerPartition.put(topicPartition, activeTask);
        }
    }

    /** Enters a standby task into the standby and all-tasks indexes. */
    private void registerStandbyTask(Task standbyTask) {
        TaskId taskId = standbyTask.id();
        standbyTasksPerId.put(taskId, standbyTask);
        allTasksPerId.put(taskId, standbyTask);
    }
}

