/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedStandbyTasks;
import org.apache.kafka.streams.processor.internals.AssignedStreamsTasks;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

/**
 * Coordinates the active and standby tasks owned by one stream thread.
 *
 * <p>The manager keeps track of the task-to-partition assignment handed to it via
 * {@link #setAssignmentMetadata(Map, Map)}, derives which tasks were added or revoked relative to
 * the previous assignment, and delegates the actual task lifecycle work to the
 * {@link AssignedStreamsTasks} / {@link AssignedStandbyTasks} containers, the
 * {@link ChangelogReader} and the restore consumer.
 *
 * <p>NOTE(review): none of the state in this class is synchronized; it is presumably only ever
 * touched from the owning stream thread - confirm against callers.
 */
public class TaskManager {
    private final Logger log;
    private final UUID processId;
    private final AssignedStreamsTasks active;
    private final AssignedStandbyTasks standby;
    private final ChangelogReader changelogReader;
    private final String logPrefix;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
    private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
    private final StreamsMetadataState streamsMetadataState;
    private final Admin adminClient;
    // Result of the most recent delete-records request; null until the first request is sent.
    private DeleteRecordsResult deleteRecordsResult;
    private boolean rebalanceInProgress = false;
    // True while the restore consumer's assignment consists of standby changelog partitions.
    private boolean restoreConsumerAssignedStandbys = false;
    private Cluster cluster;
    private Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
    private Map<TaskId, Set<TopicPartition>> assignedActiveTasks = new HashMap<>();
    private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = new HashMap<>();
    private Map<TaskId, Set<TopicPartition>> addedActiveTasks = new HashMap<>();
    private Map<TaskId, Set<TopicPartition>> addedStandbyTasks = new HashMap<>();
    private Map<TaskId, Set<TopicPartition>> revokedActiveTasks = new HashMap<>();
    private Map<TaskId, Set<TopicPartition>> revokedStandbyTasks = new HashMap<>();
    // Main consumer; must be supplied through setConsumer() before createTasks() is called.
    private Consumer<byte[], byte[]> consumer;

    /**
     * Creates a task manager. The logger is derived from {@code logPrefix}; the main consumer is
     * not part of the constructor and has to be injected later with {@link #setConsumer(Consumer)}.
     */
    TaskManager(final ChangelogReader changelogReader,
                final UUID processId,
                final String logPrefix,
                final Consumer<byte[], byte[]> restoreConsumer,
                final StreamsMetadataState streamsMetadataState,
                final StreamThread.AbstractTaskCreator<StreamTask> taskCreator,
                final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator,
                final Admin adminClient,
                final AssignedStreamsTasks active,
                final AssignedStandbyTasks standby) {
        this.changelogReader = changelogReader;
        this.processId = processId;
        this.logPrefix = logPrefix;
        this.streamsMetadataState = streamsMetadataState;
        this.restoreConsumer = restoreConsumer;
        this.taskCreator = taskCreator;
        this.standbyTaskCreator = standbyTaskCreator;
        this.active = active;
        this.standby = standby;
        final LogContext logContext = new LogContext(logPrefix);
        this.log = logContext.logger(this.getClass());
        this.adminClient = adminClient;
    }

    /**
     * @return the admin client passed to the constructor
     */
    public Admin adminClient() {
        return adminClient;
    }

    /**
     * Resumes suspended tasks and creates the newly added active and standby tasks for the given
     * assignment, then pauses every partition currently assigned to the main consumer.
     *
     * @param assignment the partitions assigned to the main consumer
     * @throws IllegalStateException if {@link #setConsumer(Consumer)} has not been called yet
     */
    void createTasks(final Collection<TopicPartition> assignment) {
        if (consumer == null) {
            throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
        }
        if (!assignment.isEmpty() && !assignedActiveTasks.isEmpty()) {
            // May add entries to addedActiveTasks for tasks that could not be resumed.
            resumeSuspended(assignment);
        }
        if (!addedActiveTasks.isEmpty()) {
            addNewActiveTasks(addedActiveTasks);
        }
        if (!addedStandbyTasks.isEmpty()) {
            addNewStandbyTasks(addedStandbyTasks);
        }
        // New active tasks exist while the restore consumer is still assigned standby
        // changelogs: drop that assignment and reset the flag.
        if (!addedActiveTasks.isEmpty() && restoreConsumerAssignedStandbys) {
            restoreConsumer.unsubscribe();
            restoreConsumerAssignedStandbys = false;
        }
        log.debug("Pausing all active task partitions until the underlying state stores are ready");
        pausePartitions();
    }

    /**
     * Tries to resume every task that owns one of the given partitions and is not already marked
     * as newly added. Tasks that cannot be resumed are registered in {@code addedActiveTasks} so
     * that they get created from scratch.
     *
     * @throws StreamsException rethrown (after logging) if resuming a task fails
     */
    private void resumeSuspended(final Collection<TopicPartition> assignment) {
        final Set<TaskId> suspendedTasks = partitionsToTaskSet(assignment);
        suspendedTasks.removeAll(addedActiveTasks.keySet());
        log.debug("Suspended tasks to be resumed: {}", suspendedTasks);
        for (final TaskId taskId : suspendedTasks) {
            final Set<TopicPartition> partitions = assignedActiveTasks.get(taskId);
            try {
                if (!active.maybeResumeSuspendedTask(taskId, partitions)) {
                    addedActiveTasks.put(taskId, partitions);
                }
            } catch (final StreamsException e) {
                // The trailing throwable is picked up by SLF4J as the exception to log.
                log.error("Failed to resume a suspended active task {} due to the following error:", taskId, e);
                throw e;
            }
        }
    }

    /**
     * Creates the given active tasks via the task creator and registers each one with the
     * active-task container.
     */
    private void addNewActiveTasks(final Map<TaskId, Set<TopicPartition>> newActiveTasks) {
        log.debug("New active tasks to be created: {}", newActiveTasks);
        for (final StreamTask task : taskCreator.createTasks(consumer, newActiveTasks)) {
            active.addNewTask(task);
        }
    }

    /**
     * Creates the given standby tasks via the standby task creator and registers each one with
     * the standby-task container.
     */
    private void addNewStandbyTasks(final Map<TaskId, Set<TopicPartition>> newStandbyTasks) {
        log.debug("New standby tasks to be created: {}", newStandbyTasks);
        for (final StandbyTask task : standbyTaskCreator.createTasks(consumer, newStandbyTasks)) {
            standby.addNewTask(task);
        }
    }

    /**
     * Scans the task directories reported by the state directory and collects the ids of those
     * that contain a {@code .checkpoint} file.
     *
     * @return a new, mutable set of task ids; empty if no task directories are listed
     */
    public Set<TaskId> cachedTasksIds() {
        final Set<TaskId> tasks = new HashSet<>();
        final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories();
        if (stateDirs == null) {
            return tasks;
        }
        for (final File dir : stateDirs) {
            try {
                final TaskId id = TaskId.parse(dir.getName());
                if (new File(dir, ".checkpoint").exists()) {
                    tasks.add(id);
                }
            } catch (final TaskIdFormatException ignored) {
                // Deliberately skipped: a directory whose name is not a task id is simply
                // not reported as a cached task.
            }
        }
        return tasks;
    }

    /**
     * Closes the standby tasks revoked by the latest assignment and removes their changelog
     * partitions from the restore consumer (only if it is currently assigned standbys).
     *
     * @return the changelog partitions of the closed standby tasks
     */
    List<TopicPartition> closeRevokedStandbyTasks() {
        final List<TopicPartition> revokedChangelogs = standby.closeRevokedStandbyTasks(revokedStandbyTasks);
        removeChangelogsFromRestoreConsumer(revokedChangelogs, true);
        return revokedChangelogs;
    }

    /**
     * Closes suspended active tasks whose ids are in the revoked set of the latest assignment.
     *
     * @throws RuntimeException the exception reported by the active-task container, if any
     */
    void closeRevokedSuspendedTasks() {
        final RuntimeException exception = active.closeNotAssignedSuspendedTasks(revokedActiveTasks.keySet());
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Suspends (or closes) the active tasks owning the revoked partitions and detaches their
     * changelogs from the changelog reader and, if applicable, the restore consumer.
     *
     * @param revokedPartitions partitions taken away from the main consumer
     * @return the ids of the tasks that are suspended afterwards
     * @throws StreamsException if a revoked partition maps to no task, or wrapping the error
     *                          reported while suspending the tasks
     */
    Set<TaskId> suspendActiveTasksAndState(final Collection<TopicPartition> revokedPartitions) {
        final List<TopicPartition> revokedChangelogs = new ArrayList<>();
        final Set<TaskId> revokedTasks = partitionsToTaskSet(revokedPartitions);

        // The error is only thrown after the changelog cleanup below has run.
        final Exception suspendError = active.suspendOrCloseTasks(revokedTasks, revokedChangelogs);

        changelogReader.remove(revokedChangelogs);
        removeChangelogsFromRestoreConsumer(revokedChangelogs, false);

        if (suspendError != null) {
            throw new StreamsException(logPrefix + "failed to suspend stream tasks", suspendError);
        }
        return active.suspendedTaskIds();
    }

    /**
     * Closes all active tasks as zombies, forgets the active assignment, clears the changelog
     * reader and - unless the restore consumer is assigned standbys - unsubscribes the restore
     * consumer.
     *
     * @return the ids of the active tasks that were assigned before this call
     * @throws RuntimeException the exception reported while closing the tasks, thrown only after
     *                          the cleanup has been performed
     */
    Set<TaskId> closeLostTasks() {
        // Snapshot the ids first: the assignment map is cleared below.
        final Set<TaskId> lostTasks = new HashSet<>(assignedActiveTasks.keySet());
        log.debug("Closing lost active tasks as zombies: {}", lostTasks);
        final RuntimeException exception = active.closeAllTasksAsZombies();

        log.debug("Clearing assigned active tasks: {}", assignedActiveTasks);
        assignedActiveTasks.clear();

        log.debug("Clearing the store changelog reader: {}", changelogReader);
        changelogReader.clear();

        if (!restoreConsumerAssignedStandbys) {
            log.debug("Clearing the restore consumer's assignment: {}", restoreConsumer.assignment());
            restoreConsumer.unsubscribe();
        }
        if (exception != null) {
            throw exception;
        }
        return lostTasks;
    }

    /**
     * Shuts down active and standby tasks, unsubscribes the restore consumer and closes both
     * task creators.
     *
     * <p>Every step is attempted even when an earlier one fails, so that a failure in one
     * component does not leave the remaining ones un-released. The first failure is rethrown
     * once all steps have run; later failures are logged.
     *
     * @param clean forwarded to the shutdown of the active and standby task containers
     * @throws RuntimeException the first failure encountered during shutdown
     */
    void shutdown(final boolean clean) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);

        try {
            active.shutdown(clean);
        } catch (final RuntimeException failure) {
            recordShutdownFailure(firstException, "shutting down active tasks", failure);
        }
        try {
            standby.shutdown(clean);
        } catch (final RuntimeException failure) {
            recordShutdownFailure(firstException, "shutting down standby tasks", failure);
        }
        try {
            restoreConsumer.unsubscribe();
        } catch (final RuntimeException failure) {
            recordShutdownFailure(firstException, "unsubscribing the restore consumer", failure);
        }
        try {
            taskCreator.close();
        } catch (final RuntimeException failure) {
            recordShutdownFailure(firstException, "closing the active task creator", failure);
        }
        try {
            standbyTaskCreator.close();
        } catch (final RuntimeException failure) {
            recordShutdownFailure(firstException, "closing the standby task creator", failure);
        }

        final RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw fatalException;
        }
    }

    /**
     * Remembers {@code failure} if it is the first shutdown failure; otherwise logs it so that
     * it is not silently lost.
     */
    private void recordShutdownFailure(final AtomicReference<RuntimeException> firstException,
                                       final String step,
                                       final RuntimeException failure) {
        if (!firstException.compareAndSet(null, failure)) {
            log.error("Additional failure while {} during shutdown:", step, failure);
        }
    }

    /**
     * @return the previously running task ids as reported by the active-task container
     */
    public Set<TaskId> previousRunningTaskIds() {
        return active.previousRunningTaskIds();
    }

    /**
     * @return all task ids currently held by the active-task container
     */
    public Set<TaskId> activeTaskIds() {
        return active.allAssignedTaskIds();
    }

    /**
     * @return all task ids currently held by the standby-task container
     */
    Set<TaskId> standbyTaskIds() {
        return standby.allAssignedTaskIds();
    }

    /**
     * @return the ids of active tasks revoked by the latest assignment; this is the key-set view
     *         of the internal map, not a copy
     */
    Set<TaskId> revokedActiveTaskIds() {
        return revokedActiveTasks.keySet();
    }

    /**
     * @return the ids of standby tasks revoked by the latest assignment; this is the key-set view
     *         of the internal map, not a copy
     */
    Set<TaskId> revokedStandbyTaskIds() {
        return revokedStandbyTasks.keySet();
    }

    /**
     * Reconstructs the active task ids of the previous assignment:
     * (currently assigned + revoked) - newly added.
     *
     * @return a new set, independent of the internal maps
     */
    Set<TaskId> previousActiveTaskIds() {
        final Set<TaskId> previousActiveTasks = new HashSet<>(assignedActiveTasks.keySet());
        previousActiveTasks.addAll(revokedActiveTasks.keySet());
        previousActiveTasks.removeAll(addedActiveTasks.keySet());
        return previousActiveTasks;
    }

    /**
     * Reconstructs the standby task ids of the previous assignment:
     * (currently assigned + revoked) - newly added.
     *
     * @return a new set, independent of the internal maps
     */
    Set<TaskId> previousStandbyTaskIds() {
        final Set<TaskId> previousStandbyTasks = new HashSet<>(assignedStandbyTasks.keySet());
        previousStandbyTasks.addAll(revokedStandbyTasks.keySet());
        previousStandbyTasks.removeAll(addedStandbyTasks.keySet());
        return previousStandbyTasks;
    }

    /**
     * @return the running active task the container reports for {@code partition}
     */
    StreamTask activeTask(final TopicPartition partition) {
        return (StreamTask) active.runningTaskFor(partition);
    }

    /**
     * @return the running standby task the container reports for {@code partition}
     */
    StandbyTask standbyTask(final TopicPartition partition) {
        return (StandbyTask) standby.runningTaskFor(partition);
    }

    /**
     * @return the running active tasks keyed by task id
     */
    Map<TaskId, StreamTask> activeTasks() {
        return active.runningTaskMap();
    }

    /**
     * @return the running standby tasks keyed by task id
     */
    Map<TaskId, StandbyTask> standbyTasks() {
        return standby.runningTaskMap();
    }

    /**
     * Injects the main consumer. Required before {@link #createTasks(Collection)} is called.
     */
    void setConsumer(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    /**
     * @return the process id passed to the constructor
     */
    public UUID processId() {
        return processId;
    }

    /**
     * @return the topology builder exposed by the active task creator
     */
    InternalTopologyBuilder builder() {
        return taskCreator.builder();
    }

    /**
     * Pauses every partition currently assigned to the main consumer.
     */
    void pausePartitions() {
        final Set<TopicPartition> assignment = consumer.assignment();
        log.trace("Pausing partitions: {}", assignment);
        consumer.pause(assignment);
    }

    /**
     * Initializes new tasks and advances restoration of active tasks. Once all active tasks are
     * running, resumes the main consumer's partitions and assigns the standby changelogs to the
     * restore consumer.
     *
     * @return {@code true} only if all active tasks and all standby tasks are running
     */
    boolean updateNewAndRestoringTasks() {
        active.initializeNewTasks();
        standby.initializeNewTasks();

        if (active.hasRestoringTasks()) {
            final Collection<TopicPartition> restored = changelogReader.restore(active);
            active.updateRestored(restored);
            removeChangelogsFromRestoreConsumer(restored, false);
        } else {
            active.clearRestoringPartitions();
        }

        if (!active.allTasksRunning()) {
            return false;
        }

        final Set<TopicPartition> assignment = consumer.assignment();
        log.trace("Resuming partitions {}", assignment);
        consumer.resume(assignment);
        assignStandbyPartitions();
        return standby.allTasksRunning();
    }

    /**
     * @return whether the active-task container reports running tasks
     */
    boolean hasActiveRunningTasks() {
        return active.hasRunningTasks();
    }

    /**
     * @return whether the standby-task container reports running tasks
     */
    boolean hasStandbyRunningTasks() {
        return standby.hasRunningTasks();
    }

    /**
     * Assigns the checkpointed changelog partitions of all running standby tasks to the restore
     * consumer and positions it: at the checkpointed offset when that offset is non-negative,
     * otherwise at the beginning of the partition.
     */
    private void assignStandbyPartitions() {
        final Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
        for (final StandbyTask standbyTask : standby.running()) {
            checkpointedOffsets.putAll(standbyTask.checkpointedOffsets());
        }

        restoreConsumerAssignedStandbys = true;
        restoreConsumer.assign(checkpointedOffsets.keySet());

        for (final Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            final long offset = entry.getValue();
            if (offset >= 0L) {
                restoreConsumer.seek(partition, offset);
            } else {
                restoreConsumer.seekToBeginning(Collections.singleton(partition));
            }
        }
    }

    /**
     * Sets the rebalance flag. While it is {@code true}, {@link #commitAll()} and
     * {@link #maybeCommitActiveTasksPerUserRequested()} return {@code -1} without committing.
     */
    public void setRebalanceInProgress(final boolean rebalanceInProgress) {
        this.rebalanceInProgress = rebalanceInProgress;
    }

    /**
     * Stores the cluster metadata that is later forwarded by
     * {@link #setPartitionsByHostState(Map)}.
     */
    public void setClusterMetadata(final Cluster cluster) {
        this.cluster = cluster;
    }

    /**
     * Forwards the host-to-partitions mapping, together with the stored cluster metadata, to the
     * streams metadata state.
     */
    public void setPartitionsByHostState(final Map<HostInfo, Set<TopicPartition>> partitionsByHostState) {
        streamsMetadataState.onChange(partitionsByHostState, cluster);
    }

    /**
     * Replaces the partition-to-task lookup table. The given map is stored as-is (not copied).
     */
    public void setPartitionsToTaskId(final Map<TopicPartition, TaskId> partitionsToTaskId) {
        this.partitionsToTaskId = partitionsToTaskId;
    }

    /**
     * Installs a new assignment and recomputes the added/revoked task maps relative to the
     * assignment that was in place before this call.
     *
     * <p>The given maps are stored as-is (not copied) and become the current assignment.
     *
     * @param activeTasks  the new active task assignment
     * @param standbyTasks the new standby task assignment
     */
    public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeTasks,
                                      final Map<TaskId, Set<TopicPartition>> standbyTasks) {
        // added = in the new assignment but not in the old one
        replaceWithUnknownEntries(addedActiveTasks, activeTasks, assignedActiveTasks);
        replaceWithUnknownEntries(addedStandbyTasks, standbyTasks, assignedStandbyTasks);
        // revoked = in the old assignment but not in the new one
        replaceWithUnknownEntries(revokedActiveTasks, assignedActiveTasks, activeTasks);
        replaceWithUnknownEntries(revokedStandbyTasks, assignedStandbyTasks, standbyTasks);

        log.debug("Assigning metadata with: " +
                      "\tpreviousAssignedActiveTasks: {},\n" +
                      "\tpreviousAssignedStandbyTasks: {}\n" +
                      "The updated task states are: \n" +
                      "\tassignedActiveTasks {},\n" +
                      "\tassignedStandbyTasks {},\n" +
                      "\taddedActiveTasks {},\n" +
                      "\taddedStandbyTasks {},\n" +
                      "\trevokedActiveTasks {},\n" +
                      "\trevokedStandbyTasks {}",
                  assignedActiveTasks, assignedStandbyTasks,
                  activeTasks, standbyTasks,
                  addedActiveTasks, addedStandbyTasks,
                  revokedActiveTasks, revokedStandbyTasks);

        this.assignedActiveTasks = activeTasks;
        this.assignedStandbyTasks = standbyTasks;
    }

    /**
     * Clears {@code target} and fills it with every entry of {@code candidates} whose key is not
     * present in {@code known}.
     */
    private static void replaceWithUnknownEntries(final Map<TaskId, Set<TopicPartition>> target,
                                                  final Map<TaskId, Set<TopicPartition>> candidates,
                                                  final Map<TaskId, Set<TopicPartition>> known) {
        target.clear();
        for (final Map.Entry<TaskId, Set<TopicPartition>> entry : candidates.entrySet()) {
            if (!known.containsKey(entry.getKey())) {
                target.put(entry.getKey(), entry.getValue());
            }
        }
    }

    /**
     * For pattern-subscribed topologies: if the assignment contains topics the builder does not
     * know about yet, updates the builder's subscribed topics to the union of the existing and
     * the assigned topics. Does nothing when no source topic pattern is configured.
     */
    public void updateSubscriptionsFromAssignment(final List<TopicPartition> partitions) {
        if (builder().sourceTopicPattern() == null) {
            return;
        }
        final Set<String> assignedTopics = new HashSet<>();
        for (final TopicPartition topicPartition : partitions) {
            assignedTopics.add(topicPartition.topic());
        }
        final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
        if (!existingTopics.containsAll(assignedTopics)) {
            assignedTopics.addAll(existingTopics);
            builder().updateSubscribedTopics(assignedTopics, logPrefix);
        }
    }

    /**
     * For pattern-subscribed topologies: replaces the builder's subscribed topics with
     * {@code topics} when the currently recorded topics are not equal to them. Does nothing when
     * no source topic pattern is configured.
     */
    public void updateSubscriptionsFromMetadata(final Set<String> topics) {
        if (builder().sourceTopicPattern() == null) {
            return;
        }
        final Collection<String> existingTopics = builder().subscriptionUpdates().getUpdates();
        if (!existingTopics.equals(topics)) {
            builder().updateSubscribedTopics(topics, logPrefix);
        }
    }

    /**
     * Commits active and standby tasks.
     *
     * @return {@code -1} while a rebalance is in progress, otherwise the sum of the values
     *         returned by the active and standby commits
     */
    int commitAll() {
        if (rebalanceInProgress) {
            return -1;
        }
        return active.commit() + standby.commit();
    }

    /**
     * @return the value returned by processing the active tasks at time {@code now}
     */
    int process(final long now) {
        return active.process(now);
    }

    /**
     * @return the value returned by punctuating the active tasks
     */
    int punctuate() {
        return active.punctuate();
    }

    /**
     * @return {@code -1} while a rebalance is in progress, otherwise the value returned by the
     *         user-requested commit of the active tasks
     */
    int maybeCommitActiveTasksPerUserRequested() {
        if (rebalanceInProgress) {
            return -1;
        }
        return active.maybeCommitPerUserRequested();
    }

    /**
     * Sends a delete-records request for the offsets reported by the active tasks, unless a
     * previous request is still in flight. A failed previous request is logged and does not
     * prevent a new request from being sent.
     */
    void maybePurgeCommitedRecords() {
        if (deleteRecordsResult != null && !deleteRecordsResult.all().isDone()) {
            // Previous request still pending: do not pile up another one.
            return;
        }
        if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
            log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
        }

        final Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> entry : active.recordsToDelete().entrySet()) {
            recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()));
        }
        if (!recordsToDelete.isEmpty()) {
            deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
            log.trace("Sent delete-records request: {}", recordsToDelete);
        }
    }

    /**
     * @return the same description as {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Builds a multi-line description of the metadata state and the active and standby tasks.
     *
     * @param indent prefix put in front of each section header and passed (extended by two tabs)
     *               to the nested descriptions
     */
    public String toString(final String indent) {
        final String nestedIndent = indent + "\t\t";
        final StringBuilder builder = new StringBuilder();
        builder.append("TaskManager\n");
        builder.append(indent).append("\tMetadataState:\n");
        builder.append(streamsMetadataState.toString(nestedIndent));
        builder.append(indent).append("\tActive tasks:\n");
        builder.append(active.toString(nestedIndent));
        builder.append(indent).append("\tStandby tasks:\n");
        builder.append(standby.toString(nestedIndent));
        return builder.toString();
    }

    /**
     * Removes the given changelog partitions from the restore consumer's assignment, but only
     * when the kind of partitions matches what the restore consumer is currently assigned
     * (standby changelogs vs. non-standby changelogs) and there is something to remove.
     */
    private void removeChangelogsFromRestoreConsumer(final Collection<TopicPartition> changelogs,
                                                     final boolean areStandbyPartitions) {
        if (changelogs.isEmpty() || areStandbyPartitions != restoreConsumerAssignedStandbys) {
            return;
        }
        final Set<TopicPartition> updatedAssignment = new HashSet<>(restoreConsumer.assignment());
        updatedAssignment.removeAll(changelogs);
        restoreConsumer.assign(updatedAssignment);
    }

    /**
     * Maps partitions to the distinct set of task ids owning them.
     *
     * @return a new, mutable set of task ids
     * @throws StreamsException if any partition has no entry in the partition-to-task table
     */
    private Set<TaskId> partitionsToTaskSet(final Collection<TopicPartition> partitions) {
        final Set<TaskId> taskIds = new HashSet<>();
        for (final TopicPartition tp : partitions) {
            final TaskId id = partitionsToTaskId.get(tp);
            if (id == null) {
                log.error("Failed to lookup taskId for partition {}", tp);
                throw new StreamsException("Found partition in assignment with no corresponding task");
            }
            taskIds.add(id);
        }
        return taskIds;
    }

    /**
     * @return the current active task assignment (the internal map, not a copy)
     */
    Map<TaskId, Set<TopicPartition>> assignedActiveTasks() {
        return assignedActiveTasks;
    }

    /**
     * @return the current standby task assignment (the internal map, not a copy)
     */
    Map<TaskId, Set<TopicPartition>> assignedStandbyTasks() {
        return assignedStandbyTasks;
    }
}

