/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

public class ProcessorStateManager
implements StateManager {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    private final Logger log;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final ChangelogReader changelogReader;
    private final Map<TopicPartition, Long> offsetLimits;
    private final Map<TopicPartition, Long> standbyRestoredOffsets;
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, RecordConverter> recordConverters;
    private final Map<String, String> storeToChangelogTopic;
    private final FixedOrderMap<String, Optional<StateStore>> registeredStores = new FixedOrderMap();
    private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap();
    private final List<TopicPartition> changelogPartitions = new ArrayList<TopicPartition>();
    private final Map<String, TopicPartition> partitionForTopic;
    private final boolean eosEnabled;
    private final File baseDir;
    private OffsetCheckpoint checkpointFile;
    private final Map<TopicPartition, Long> checkpointFileCache = new HashMap<TopicPartition, Long>();
    private final Map<TopicPartition, Long> initialLoadedCheckpoints;

    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> sources, boolean isStandby, StateDirectory stateDirectory, Map<String, String> storeToChangelogTopic, ChangelogReader changelogReader, boolean eosEnabled, LogContext logContext) throws IOException {
        this.eosEnabled = eosEnabled;
        this.log = logContext.logger(ProcessorStateManager.class);
        this.taskId = taskId;
        this.changelogReader = changelogReader;
        this.logPrefix = String.format("task [%s] ", taskId);
        this.partitionForTopic = new HashMap<String, TopicPartition>();
        for (TopicPartition source : sources) {
            this.partitionForTopic.put(source.topic(), source);
        }
        this.offsetLimits = new HashMap<TopicPartition, Long>();
        this.standbyRestoredOffsets = new ConcurrentHashMap<TopicPartition, Long>();
        this.isStandby = isStandby;
        this.restoreCallbacks = isStandby ? new ConcurrentHashMap() : null;
        this.recordConverters = isStandby ? new HashMap() : null;
        this.storeToChangelogTopic = new HashMap<String, String>(storeToChangelogTopic);
        this.baseDir = stateDirectory.directoryForTask(taskId);
        this.checkpointFile = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        this.initialLoadedCheckpoints = this.checkpointFile.read();
        this.log.trace("Checkpointable offsets read from checkpoint: {}", this.initialLoadedCheckpoints);
        if (eosEnabled) {
            this.checkpointFile.delete();
            this.checkpointFile = null;
        }
        this.log.debug("Created state store manager for task {}", (Object)taskId);
    }

    public static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    @Override
    public File baseDir() {
        return this.baseDir;
    }

    @Override
    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
        String storeName = store.name();
        this.log.debug("Registering state store {} to its state manager", (Object)storeName);
        if (".checkpoint".equals(storeName)) {
            throw new IllegalArgumentException(String.format("%sIllegal store name: %s", this.logPrefix, storeName));
        }
        if (this.registeredStores.containsKey((Object)storeName) && ((Optional)this.registeredStores.get((Object)storeName)).isPresent()) {
            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", this.logPrefix, storeName));
        }
        String topic = this.storeToChangelogTopic.get(storeName);
        if (topic != null) {
            TopicPartition storePartition = new TopicPartition(topic, this.getPartition(topic));
            RecordConverter recordConverter = StateManagerUtil.converterForStore(store);
            if (this.isStandby) {
                this.log.trace("Preparing standby replica of persistent state store {} with changelog topic {}", (Object)storeName, (Object)topic);
                this.restoreCallbacks.put(topic, stateRestoreCallback);
                this.recordConverters.put(topic, recordConverter);
            } else {
                Long restoreCheckpoint;
                Long l = restoreCheckpoint = store.persistent() ? this.initialLoadedCheckpoints.get(storePartition) : null;
                if (restoreCheckpoint != null) {
                    this.checkpointFileCache.put(storePartition, restoreCheckpoint);
                }
                this.log.trace("Restoring state store {} from changelog topic {} at checkpoint {}", new Object[]{storeName, topic, restoreCheckpoint});
                StateRestorer restorer = new StateRestorer(storePartition, new CompositeRestoreListener(stateRestoreCallback), restoreCheckpoint, this.offsetLimit(storePartition), store.persistent(), storeName, recordConverter);
                this.changelogReader.register(restorer);
            }
            this.changelogPartitions.add(storePartition);
        }
        this.registeredStores.put((Object)storeName, Optional.of(store));
    }

    @Override
    public void reinitializeStateStoresForPartitions(Collection<TopicPartition> partitions, InternalProcessorContext processorContext) {
        StateManagerUtil.reinitializeStateStoresForPartitions(this.log, this.eosEnabled, this.baseDir, this.registeredStores, this.storeToChangelogTopic, partitions, processorContext, this.checkpointFile, this.checkpointFileCache);
    }

    void clearCheckpoints() throws IOException {
        if (this.checkpointFile != null) {
            this.checkpointFile.delete();
            this.checkpointFile = null;
            this.checkpointFileCache.clear();
        }
    }

    @Override
    public Map<TopicPartition, Long> checkpointed() {
        this.updateCheckpointFileCache(Collections.emptyMap());
        HashMap<TopicPartition, Long> partitionsAndOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<String, StateRestoreCallback> entry : this.restoreCallbacks.entrySet()) {
            String topicName = entry.getKey();
            int partition = this.getPartition(topicName);
            TopicPartition storePartition = new TopicPartition(topicName, partition);
            partitionsAndOffsets.put(storePartition, this.checkpointFileCache.getOrDefault(storePartition, -1L));
        }
        return partitionsAndOffsets;
    }

    void updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> restoreRecords, long lastOffset) {
        RecordBatchingStateRestoreCallback restoreCallback = StateRestoreCallbackAdapter.adapt(this.restoreCallbacks.get(storePartition.topic()));
        if (!restoreRecords.isEmpty()) {
            RecordConverter converter = this.recordConverters.get(storePartition.topic());
            ArrayList<ConsumerRecord<byte[], byte[]>> convertedRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>(restoreRecords.size());
            for (ConsumerRecord<byte[], byte[]> record : restoreRecords) {
                convertedRecords.add(converter.convert(record));
            }
            try {
                restoreCallback.restoreBatch(convertedRecords);
            }
            catch (RuntimeException e) {
                throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", this.logPrefix, storePartition), e);
            }
        }
        this.standbyRestoredOffsets.put(storePartition, lastOffset + 1L);
    }

    void putOffsetLimits(Map<TopicPartition, Long> offsets) {
        this.log.trace("Updating store offset limit with {}", offsets);
        this.offsetLimits.putAll(offsets);
    }

    private long offsetLimit(TopicPartition partition) {
        Long limit = this.offsetLimits.get(partition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    ChangelogReader changelogReader() {
        return this.changelogReader;
    }

    @Override
    public StateStore getStore(String name) {
        return ((Optional)this.registeredStores.getOrDefault((Object)name, Optional.empty())).orElse(null);
    }

    @Override
    public void flush() {
        ProcessorStateException firstException = null;
        if (!this.registeredStores.isEmpty()) {
            this.log.debug("Flushing all stores registered in the state manager");
            for (Map.Entry entry : this.registeredStores.entrySet()) {
                if (((Optional)entry.getValue()).isPresent()) {
                    StateStore store = (StateStore)((Optional)entry.getValue()).get();
                    this.log.trace("Flushing store {}", (Object)store.name());
                    try {
                        store.flush();
                    }
                    catch (RuntimeException e) {
                        if (firstException == null) {
                            firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", this.logPrefix, store.name()), e);
                        }
                        this.log.error("Failed to flush state store {}: ", (Object)store.name(), (Object)e);
                    }
                    continue;
                }
                throw new IllegalStateException("Expected " + (String)entry.getKey() + " to have been initialized");
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    @Override
    public void close(boolean clean) throws ProcessorStateException {
        ProcessorStateException firstException = null;
        if (!this.registeredStores.isEmpty()) {
            this.log.debug("Closing its state manager and all the registered state stores");
            for (Map.Entry entry : this.registeredStores.entrySet()) {
                if (((Optional)entry.getValue()).isPresent()) {
                    StateStore store = (StateStore)((Optional)entry.getValue()).get();
                    this.log.debug("Closing storage engine {}", (Object)store.name());
                    try {
                        store.close();
                        this.registeredStores.put((Object)store.name(), Optional.empty());
                    }
                    catch (RuntimeException e) {
                        if (firstException == null) {
                            firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", this.logPrefix, store.name()), e);
                        }
                        this.log.error("Failed to close state store {}: ", (Object)store.name(), (Object)e);
                    }
                    continue;
                }
                this.log.info("Skipping to close non-initialized store {}", entry.getKey());
            }
        }
        if (!clean && this.eosEnabled) {
            try {
                this.clearCheckpoints();
            }
            catch (IOException e) {
                throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", this.logPrefix), e);
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    @Override
    public void checkpoint(Map<TopicPartition, Long> checkpointableOffsetsFromProcessing) {
        this.ensureStoresRegistered();
        if (this.checkpointFile == null) {
            this.checkpointFile = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        }
        this.updateCheckpointFileCache(checkpointableOffsetsFromProcessing);
        this.log.debug("Writing checkpoint: {}", this.checkpointFileCache);
        try {
            this.checkpointFile.write(this.checkpointFileCache);
        }
        catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to [{}]", (Object)this.checkpointFile, (Object)e);
        }
    }

    private void updateCheckpointFileCache(Map<TopicPartition, Long> checkpointableOffsetsFromProcessing) {
        Set<TopicPartition> validCheckpointableTopics = this.validCheckpointableTopics();
        Map<TopicPartition, Long> restoredOffsets = ProcessorStateManager.validCheckpointableOffsets(this.changelogReader.restoredOffsets(), validCheckpointableTopics);
        this.log.trace("Checkpointable offsets updated with restored offsets: {}", this.checkpointFileCache);
        for (TopicPartition topicPartition : validCheckpointableTopics) {
            Long loadedOffset;
            if (checkpointableOffsetsFromProcessing.containsKey(topicPartition)) {
                this.checkpointFileCache.put(topicPartition, checkpointableOffsetsFromProcessing.get(topicPartition) + 1L);
                continue;
            }
            if (this.standbyRestoredOffsets.containsKey(topicPartition)) {
                this.checkpointFileCache.put(topicPartition, this.standbyRestoredOffsets.get(topicPartition));
                continue;
            }
            if (restoredOffsets.containsKey(topicPartition)) {
                this.checkpointFileCache.put(topicPartition, restoredOffsets.get(topicPartition));
                continue;
            }
            if (this.checkpointFileCache.containsKey(topicPartition) || (loadedOffset = ProcessorStateManager.validCheckpointableOffsets(this.initialLoadedCheckpoints, validCheckpointableTopics).get(topicPartition)) == null) continue;
            this.checkpointFileCache.put(topicPartition, loadedOffset);
        }
    }

    private int getPartition(String topic) {
        TopicPartition partition = this.partitionForTopic.get(topic);
        return partition == null ? this.taskId.partition : partition.partition();
    }

    void registerGlobalStateStores(List<StateStore> stateStores) {
        this.log.debug("Register global stores {}", stateStores);
        for (StateStore stateStore : stateStores) {
            this.globalStores.put((Object)stateStore.name(), Optional.of(stateStore));
        }
    }

    @Override
    public StateStore getGlobalStore(String name) {
        return ((Optional)this.globalStores.getOrDefault((Object)name, Optional.empty())).orElse(null);
    }

    Collection<TopicPartition> changelogPartitions() {
        return Collections.unmodifiableList(this.changelogPartitions);
    }

    void ensureStoresRegistered() {
        for (Map.Entry entry : this.registeredStores.entrySet()) {
            if (((Optional)entry.getValue()).isPresent()) continue;
            throw new IllegalStateException("store [" + (String)entry.getKey() + "] has not been correctly registered. This is a bug in Kafka Streams.");
        }
    }

    private Set<TopicPartition> validCheckpointableTopics() {
        HashSet<TopicPartition> result = new HashSet<TopicPartition>(this.storeToChangelogTopic.size());
        for (Map.Entry<String, String> storeToChangelog : this.storeToChangelogTopic.entrySet()) {
            String storeName = storeToChangelog.getKey();
            if (!this.registeredStores.containsKey((Object)storeName) || !((Optional)this.registeredStores.get((Object)storeName)).isPresent() || !((StateStore)((Optional)this.registeredStores.get((Object)storeName)).get()).persistent()) continue;
            String changelogTopic = storeToChangelog.getValue();
            result.add(new TopicPartition(changelogTopic, this.getPartition(changelogTopic)));
        }
        return result;
    }

    private static Map<TopicPartition, Long> validCheckpointableOffsets(Map<TopicPartition, Long> checkpointableOffsets, Set<TopicPartition> validCheckpointableTopics) {
        HashMap<TopicPartition, Long> result = new HashMap<TopicPartition, Long>(checkpointableOffsets.size());
        for (Map.Entry<TopicPartition, Long> topicToCheckpointableOffset : checkpointableOffsets.entrySet()) {
            TopicPartition topic = topicToCheckpointableOffset.getKey();
            if (!validCheckpointableTopics.contains(topic)) continue;
            Long checkpointableOffset = topicToCheckpointableOffset.getValue();
            result.put(topic, checkpointableOffset);
        }
        return result;
    }

    Map<TopicPartition, Long> standbyRestoredOffsets() {
        return Collections.unmodifiableMap(this.standbyRestoredOffsets);
    }
}

