/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.RestoringTasks;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.slf4j.Logger;

public class StoreChangelogReader
implements ChangelogReader {
    private final Logger log;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final StateRestoreListener userStateRestoreListener;
    private final Map<TopicPartition, Long> restoreToOffsets = new HashMap<TopicPartition, Long>();
    private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<String, List<PartitionInfo>>();
    private final Map<TopicPartition, StateRestorer> stateRestorers = new ConcurrentHashMap<TopicPartition, StateRestorer>();
    private final Set<TopicPartition> needsRestoring = new HashSet<TopicPartition>();
    private final Set<TopicPartition> needsInitializing = new HashSet<TopicPartition>();
    private final Set<TopicPartition> completedRestorers = new HashSet<TopicPartition>();
    private final Duration pollTime;

    public StoreChangelogReader(Consumer<byte[], byte[]> restoreConsumer, Duration pollTime, StateRestoreListener userStateRestoreListener, LogContext logContext) {
        this.restoreConsumer = restoreConsumer;
        this.pollTime = pollTime;
        this.log = logContext.logger(this.getClass());
        this.userStateRestoreListener = userStateRestoreListener;
    }

    @Override
    public void register(StateRestorer restorer) {
        if (!this.stateRestorers.containsKey(restorer.partition())) {
            restorer.setUserRestoreListener(this.userStateRestoreListener);
            this.stateRestorers.put(restorer.partition(), restorer);
            this.log.trace("Added restorer for changelog {}", (Object)restorer.partition());
        } else {
            this.log.debug("Skip re-adding restorer for changelog {}", (Object)restorer.partition());
        }
        this.needsInitializing.add(restorer.partition());
    }

    @Override
    public Collection<TopicPartition> restore(RestoringTasks active) {
        if (!this.needsInitializing.isEmpty()) {
            this.initialize(active);
        }
        if (this.checkForCompletedRestoration()) {
            return this.completedRestorers;
        }
        try {
            ConsumerRecords records = this.restoreConsumer.poll(this.pollTime);
            for (TopicPartition partition : this.needsRestoring) {
                StateRestorer restorer = this.stateRestorers.get(partition);
                long pos = this.processNext(records.records(partition), restorer, this.restoreToOffsets.get(partition));
                restorer.setRestoredOffset(pos);
                if (!restorer.hasCompleted(pos, this.restoreToOffsets.get(partition))) continue;
                restorer.restoreDone();
                this.restoreToOffsets.remove(partition);
                this.completedRestorers.add(partition);
            }
        }
        catch (InvalidOffsetException recoverableException) {
            this.log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", (Throwable)recoverableException);
            Set partitions = recoverableException.partitions();
            for (TopicPartition partition : partitions) {
                StreamTask task = active.restoringTaskFor(partition);
                this.log.info("Reinitializing StreamTask {} for changelog {}", (Object)task, (Object)partition);
                this.needsInitializing.remove(partition);
                this.needsRestoring.remove(partition);
                StateRestorer restorer = this.stateRestorers.get(partition);
                restorer.setCheckpointOffset(-1L);
                task.reinitializeStateStoresForPartitions(recoverableException.partitions());
            }
            this.restoreConsumer.seekToBeginning((Collection)partitions);
        }
        this.needsRestoring.removeAll(this.completedRestorers);
        this.checkForCompletedRestoration();
        return this.completedRestorers;
    }

    private void initialize(RestoringTasks active) {
        Map endOffsets;
        if (!this.restoreConsumer.subscription().isEmpty()) {
            throw new StreamsException("Restore consumer should not be subscribed to any topics (" + this.restoreConsumer.subscription() + ")");
        }
        this.refreshChangelogInfo();
        HashSet<TopicPartition> initializable = new HashSet<TopicPartition>();
        for (TopicPartition topicPartition : this.needsInitializing) {
            if (!this.hasPartition(topicPartition)) continue;
            initializable.add(topicPartition);
        }
        try {
            endOffsets = this.restoreConsumer.endOffsets(initializable);
        }
        catch (TimeoutException e) {
            this.log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
            return;
        }
        endOffsets.forEach((partition, endOffset) -> {
            if (endOffset != null) {
                StateRestorer restorer = this.stateRestorers.get(partition);
                long offsetLimit = restorer.offsetLimit();
                this.restoreToOffsets.put((TopicPartition)partition, Math.min(endOffset, offsetLimit));
            } else {
                this.log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop");
                initializable.remove(partition);
            }
        });
        Iterator iter = initializable.iterator();
        while (iter.hasNext()) {
            TopicPartition topicPartition = (TopicPartition)iter.next();
            Long restoreOffset = this.restoreToOffsets.get(topicPartition);
            StateRestorer restorer = this.stateRestorers.get(topicPartition);
            if (restorer.checkpoint() >= restoreOffset) {
                restorer.setRestoredOffset(restorer.checkpoint());
                iter.remove();
                this.completedRestorers.add(topicPartition);
            } else if (restoreOffset == 0L) {
                restorer.setRestoredOffset(0L);
                iter.remove();
                this.completedRestorers.add(topicPartition);
            } else {
                restorer.setEndingOffset(restoreOffset);
            }
            this.needsInitializing.remove(topicPartition);
        }
        if (!initializable.isEmpty()) {
            this.startRestoration(initializable, active);
        }
    }

    private void startRestoration(Set<TopicPartition> initialized, RestoringTasks active) {
        this.log.debug("Start restoring state stores from changelog topics {}", initialized);
        HashSet<TopicPartition> assignment = new HashSet<TopicPartition>(this.restoreConsumer.assignment());
        assignment.addAll(initialized);
        this.restoreConsumer.assign(assignment);
        ArrayList<StateRestorer> needsPositionUpdate = new ArrayList<StateRestorer>();
        for (TopicPartition partition : initialized) {
            StateRestorer restorer = this.stateRestorers.get(partition);
            if (restorer.checkpoint() != -1L) {
                this.log.trace("Found checkpoint {} from changelog {} for store {}.", new Object[]{restorer.checkpoint(), partition, restorer.storeName()});
                this.restoreConsumer.seek(partition, restorer.checkpoint());
                this.logRestoreOffsets(partition, restorer.checkpoint(), this.restoreToOffsets.get(partition));
                restorer.setStartingOffset(this.restoreConsumer.position(partition));
                this.log.debug("Calling restorer for partition {}", (Object)partition);
                restorer.restoreStarted();
                continue;
            }
            this.log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", (Object)partition, (Object)restorer.storeName());
            this.restoreConsumer.seekToBeginning(Collections.singletonList(partition));
            needsPositionUpdate.add(restorer);
        }
        for (StateRestorer restorer : needsPositionUpdate) {
            TopicPartition partition = restorer.partition();
            StreamTask task = active.restoringTaskFor(partition);
            if (task.isEosEnabled()) {
                this.log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. Reinitializing the task and restore its state from the beginning.", new Object[]{task.id, restorer.storeName(), partition});
                this.needsInitializing.remove(partition);
                initialized.remove(partition);
                restorer.setCheckpointOffset(this.restoreConsumer.position(partition));
                task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
                continue;
            }
            this.log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", new Object[]{task.id, restorer.storeName(), partition});
            long position = this.restoreConsumer.position(restorer.partition());
            this.logRestoreOffsets(restorer.partition(), position, this.restoreToOffsets.get(restorer.partition()));
            restorer.setStartingOffset(position);
            restorer.restoreStarted();
        }
        this.needsRestoring.addAll(initialized);
    }

    private void logRestoreOffsets(TopicPartition partition, long startingOffset, Long endOffset) {
        this.log.debug("Restoring partition {} from offset {} to endOffset {}", new Object[]{partition, startingOffset, endOffset});
    }

    private void refreshChangelogInfo() {
        try {
            this.partitionInfo.putAll(this.restoreConsumer.listTopics());
        }
        catch (TimeoutException e) {
            this.log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
        }
    }

    @Override
    public Map<TopicPartition, Long> restoredOffsets() {
        HashMap<TopicPartition, Long> restoredOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, StateRestorer> entry : this.stateRestorers.entrySet()) {
            StateRestorer restorer = entry.getValue();
            if (!restorer.isPersistent()) continue;
            restoredOffsets.put(entry.getKey(), restorer.restoredOffset());
        }
        return restoredOffsets;
    }

    @Override
    public void remove(List<TopicPartition> revokedPartitions) {
        for (TopicPartition partition : revokedPartitions) {
            this.partitionInfo.remove(partition.topic());
            this.stateRestorers.remove(partition);
            this.needsRestoring.remove(partition);
            this.restoreToOffsets.remove(partition);
            this.needsInitializing.remove(partition);
            this.completedRestorers.remove(partition);
        }
    }

    @Override
    public void clear() {
        this.partitionInfo.clear();
        this.stateRestorers.clear();
        this.needsRestoring.clear();
        this.restoreToOffsets.clear();
        this.needsInitializing.clear();
        this.completedRestorers.clear();
    }

    @Override
    public boolean isEmpty() {
        return this.stateRestorers.isEmpty() && this.needsRestoring.isEmpty() && this.restoreToOffsets.isEmpty() && this.needsInitializing.isEmpty() && this.completedRestorers.isEmpty();
    }

    public String toString() {
        return "RestoreToOffset: " + this.restoreToOffsets + "\nStateRestorers: " + this.stateRestorers + "\nNeedsRestoring: " + this.needsRestoring + "\nNeedsInitializing: " + this.needsInitializing + "\nCompletedRestorers: " + this.completedRestorers + "\n";
    }

    private long processNext(List<ConsumerRecord<byte[], byte[]>> records, StateRestorer restorer, Long endOffset) {
        ArrayList<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        long nextPosition = -1L;
        int numberRecords = records.size();
        int numberRestored = 0;
        long lastRestoredOffset = -1L;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long offset = record.offset();
            if (restorer.hasCompleted(offset, endOffset)) {
                nextPosition = record.offset();
                break;
            }
            lastRestoredOffset = offset;
            ++numberRestored;
            if (record.key() == null) continue;
            restoreRecords.add(record);
        }
        if (nextPosition == -1L || restorer.offsetLimit() == Long.MAX_VALUE && numberRecords != numberRestored) {
            nextPosition = this.restoreConsumer.position(restorer.partition());
        }
        if (!restoreRecords.isEmpty()) {
            restorer.restore(restoreRecords);
            restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
            this.log.trace("Restored from {} to {} with {} records, ending offset is {}, next starting position is {}", new Object[]{restorer.partition(), restorer.storeName(), records.size(), lastRestoredOffset, nextPosition});
        }
        return nextPosition;
    }

    private boolean checkForCompletedRestoration() {
        if (this.needsRestoring.isEmpty()) {
            this.log.info("Finished restoring all active tasks");
            this.restoreConsumer.unsubscribe();
            return true;
        }
        return false;
    }

    private boolean hasPartition(TopicPartition topicPartition) {
        List<PartitionInfo> partitions = this.partitionInfo.get(topicPartition.topic());
        if (partitions == null) {
            return false;
        }
        for (PartitionInfo partition : partitions) {
            if (partition.partition() != topicPartition.partition()) continue;
            return true;
        }
        return false;
    }
}

