/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedStreamsTasks;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

abstract class AssignedTasks<T extends Task> {
    final Logger log;
    final String taskTypeName;
    final Map<TaskId, T> created = new HashMap<TaskId, T>();
    final Map<TaskId, T> running = new ConcurrentHashMap<TaskId, T>();
    final Map<TopicPartition, T> runningByPartition = new HashMap<TopicPartition, T>();

    AssignedTasks(LogContext logContext, String taskTypeName) {
        this.taskTypeName = taskTypeName;
        this.log = logContext.logger(this.getClass());
    }

    void addNewTask(T task) {
        this.created.put(task.id(), task);
    }

    void initializeNewTasks() {
        if (!this.created.isEmpty()) {
            this.log.debug("Initializing {}s {}", (Object)this.taskTypeName, this.created.keySet());
        }
        Iterator<Map.Entry<TaskId, T>> it = this.created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, T> entry = it.next();
            try {
                Task task = (Task)entry.getValue();
                task.initializeMetadata();
                this.removeTaskFromAllStateMaps(task, this.created);
                if (!task.initializeStateStores()) {
                    this.log.debug("Transitioning {} {} to restoring", (Object)this.taskTypeName, (Object)entry.getKey());
                    ((AssignedStreamsTasks)this).addTaskToRestoring((StreamTask)task);
                } else {
                    this.transitionToRunning(task);
                }
                it.remove();
            }
            catch (LockException e) {
                this.log.debug("Could not create {} {} due to {}; will retry", new Object[]{this.taskTypeName, entry.getKey(), e.toString()});
            }
        }
    }

    boolean allTasksRunning() {
        return this.created.isEmpty();
    }

    Collection<T> running() {
        return this.running.values();
    }

    void tryCloseZombieTask(T task) {
        try {
            task.close(false, true);
        }
        catch (RuntimeException e) {
            this.log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", new Object[]{this.taskTypeName, task.id(), e.toString()});
        }
    }

    boolean hasRunningTasks() {
        return !this.running.isEmpty();
    }

    void transitionToRunning(T task) {
        this.log.debug("Transitioning {} {} to running", (Object)this.taskTypeName, (Object)task.id());
        this.running.put(task.id(), task);
        task.initializeTopology();
        for (TopicPartition topicPartition : task.partitions()) {
            this.runningByPartition.put(topicPartition, task);
        }
        for (TopicPartition topicPartition : task.changelogPartitions()) {
            this.runningByPartition.put(topicPartition, task);
        }
    }

    void removeTaskFromAllStateMaps(T task, Map<TaskId, T> currentStateMap) {
        TaskId id = task.id();
        HashSet<TopicPartition> taskPartitions = new HashSet<TopicPartition>(task.partitions());
        taskPartitions.addAll(task.changelogPartitions());
        if (currentStateMap != this.running) {
            this.running.remove(id);
            this.runningByPartition.keySet().removeAll(taskPartitions);
        }
        if (currentStateMap != this.created) {
            this.created.remove(id);
        }
    }

    T runningTaskFor(TopicPartition partition) {
        return (T)((Task)this.runningByPartition.get(partition));
    }

    Set<TaskId> runningTaskIds() {
        return this.running.keySet();
    }

    Map<TaskId, T> runningTaskMap() {
        return Collections.unmodifiableMap(this.running);
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        this.describeTasks(builder, this.running.values(), indent, "Running:");
        this.describePartitions(builder, this.runningByPartition.keySet(), indent, "Running Partitions:");
        this.describeTasks(builder, this.created.values(), indent, "New:");
        return builder.toString();
    }

    void describeTasks(StringBuilder builder, Collection<T> tasks, String indent, String name) {
        builder.append(indent).append(name);
        for (Task t : tasks) {
            builder.append(indent).append(t.toString(indent + "\t\t"));
        }
        builder.append("\n");
    }

    void describePartitions(StringBuilder builder, Collection<TopicPartition> partitions, String indent, String name) {
        builder.append(indent).append(name);
        for (TopicPartition tp : partitions) {
            builder.append(indent).append(tp.toString());
        }
        builder.append("\n");
    }

    List<T> allTasks() {
        ArrayList<T> tasks = new ArrayList<T>();
        tasks.addAll(this.running.values());
        tasks.addAll(this.created.values());
        return tasks;
    }

    Set<TaskId> allAssignedTaskIds() {
        HashSet<TaskId> taskIds = new HashSet<TaskId>();
        taskIds.addAll(this.running.keySet());
        taskIds.addAll(this.created.keySet());
        return taskIds;
    }

    void clear() {
        this.runningByPartition.clear();
        this.running.clear();
        this.created.clear();
    }

    int commit() {
        int committed = 0;
        RuntimeException firstException = null;
        for (Task task : this.running.values()) {
            try {
                if (!task.commitNeeded()) continue;
                task.commit();
                ++committed;
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to commit {} {} since it got migrated to another thread already. Will trigger a new rebalance and close all tasks as zombies together.", (Object)this.taskTypeName, (Object)task.id());
                throw e;
            }
            catch (RuntimeException t) {
                this.log.error("Failed to commit {} {} due to the following error:", new Object[]{this.taskTypeName, task.id(), t});
                if (firstException != null) continue;
                firstException = t;
            }
        }
        if (firstException != null) {
            throw firstException;
        }
        return committed;
    }

    void shutdown(boolean clean) {
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        for (Task task : this.allTasks()) {
            try {
                this.closeTask(task, clean);
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to close {} {} since it got migrated to another thread already. Closing it as zombie and move on.", (Object)this.taskTypeName, (Object)task.id());
                this.tryCloseZombieTask(task);
            }
            catch (RuntimeException t) {
                this.log.error("Failed while closing {} {} due to the following error:", new Object[]{task.getClass().getSimpleName(), task.id(), t});
                if (clean) {
                    this.closeUnclean(task);
                }
                firstException.compareAndSet(null, t);
            }
        }
        this.clear();
        RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw fatalException;
        }
    }

    void closeTask(T task, boolean clean) {
        task.close(clean, false);
    }

    private void closeUnclean(T task) {
        this.log.info("Try to close {} {} unclean.", (Object)task.getClass().getSimpleName(), (Object)task.id());
        try {
            task.close(false, false);
        }
        catch (RuntimeException fatalException) {
            this.log.error("Failed while closing {} {} due to the following error:", new Object[]{task.getClass().getSimpleName(), task.id(), fatalException});
        }
    }
}

