/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.internals.AssignedTasks;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StreamTask;

class AssignedStreamsTasks
extends AssignedTasks<StreamTask>
implements RestoringTasks {
    AssignedStreamsTasks(LogContext logContext) {
        super(logContext, "stream task");
    }

    @Override
    public StreamTask restoringTaskFor(TopicPartition partition) {
        return (StreamTask)this.restoringByPartition.get(partition);
    }

    int maybeCommitPerUserRequested() {
        int committed = 0;
        RuntimeException firstException = null;
        Iterator it = this.running().iterator();
        while (it.hasNext()) {
            StreamTask task = (StreamTask)it.next();
            try {
                if (!task.commitRequested() || !task.commitNeeded()) continue;
                task.commit();
                ++committed;
                this.log.debug("Committed active task {} per user request in", (Object)task.id());
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to commit {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (RuntimeException t) {
                this.log.error("Failed to commit StreamTask {} due to the following error:", (Object)task.id(), (Object)t);
                if (firstException != null) continue;
                firstException = t;
            }
        }
        if (firstException != null) {
            throw firstException;
        }
        return committed;
    }

    Map<TopicPartition, Long> recordsToDelete() {
        HashMap<TopicPartition, Long> recordsToDelete = new HashMap<TopicPartition, Long>();
        for (StreamTask task : this.running.values()) {
            recordsToDelete.putAll(task.purgableOffsets());
        }
        return recordsToDelete;
    }

    int process(long now) {
        int processed = 0;
        Iterator it = this.running.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask task = (StreamTask)it.next().getValue();
            try {
                if (!task.isProcessable(now) || !task.process()) continue;
                ++processed;
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to process stream task {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (RuntimeException e) {
                this.log.error("Failed to process stream task {} due to the following error:", (Object)task.id(), (Object)e);
                throw e;
            }
        }
        return processed;
    }

    int punctuate() {
        int punctuated = 0;
        Iterator it = this.running.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask task = (StreamTask)it.next().getValue();
            try {
                if (task.maybePunctuateStreamTime()) {
                    ++punctuated;
                }
                if (!task.maybePunctuateSystemTime()) continue;
                ++punctuated;
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to punctuate stream task {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (KafkaException e) {
                this.log.error("Failed to punctuate stream task {} due to the following error:", (Object)task.id(), (Object)e);
                throw e;
            }
        }
        return punctuated;
    }
}

