/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

public class GlobalStateManagerImpl
implements GlobalStateManager {
    private static final long NO_DEADLINE = -1L;
    private final Logger log;
    private final Time time;
    private final Consumer<byte[], byte[]> globalConsumer;
    private final File baseDir;
    private final StateDirectory stateDirectory;
    private final Set<String> globalStoreNames = new HashSet<String>();
    private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap();
    private final StateRestoreListener stateRestoreListener;
    private InternalProcessorContext globalProcessorContext;
    private final Duration pollMsPlusRequestTimeout;
    private final long taskTimeoutMs;
    private final Set<String> globalNonPersistentStoresTopics = new HashSet<String>();
    private final OffsetCheckpoint checkpointFile;
    private final Map<TopicPartition, Long> checkpointFileCache;
    private final Map<String, String> storeToChangelogTopic;
    private final List<StateStore> globalStateStores;

    public GlobalStateManagerImpl(LogContext logContext, Time time, ProcessorTopology topology, Consumer<byte[], byte[]> globalConsumer, StateDirectory stateDirectory, StateRestoreListener stateRestoreListener, StreamsConfig config) {
        this.time = time;
        this.storeToChangelogTopic = topology.storeToChangelogTopic();
        this.globalStateStores = topology.globalStateStores();
        this.baseDir = stateDirectory.globalStateDir();
        this.checkpointFile = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        this.checkpointFileCache = new HashMap<TopicPartition, Long>();
        for (StateStore store : this.globalStateStores) {
            if (store.persistent()) continue;
            this.globalNonPersistentStoresTopics.add(this.changelogFor(store.name()));
        }
        this.log = logContext.logger(GlobalStateManagerImpl.class);
        this.globalConsumer = globalConsumer;
        this.stateDirectory = stateDirectory;
        this.stateRestoreListener = stateRestoreListener;
        Map<String, Object> consumerProps = config.getGlobalConsumerConfigs("dummy");
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        int requestTimeoutMs = new ClientUtils.QuietConsumerConfig(consumerProps).getInt("request.timeout.ms");
        this.pollMsPlusRequestTimeout = Duration.ofMillis(config.getLong("poll.ms") + (long)requestTimeoutMs);
        this.taskTimeoutMs = config.getLong("task.timeout.ms");
    }

    @Override
    public void setGlobalProcessorContext(InternalProcessorContext globalProcessorContext) {
        this.globalProcessorContext = globalProcessorContext;
    }

    @Override
    public Set<String> initialize() {
        try {
            if (!this.stateDirectory.lockGlobalState()) {
                throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
            }
        }
        catch (IOException e) {
            throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir), e);
        }
        try {
            this.checkpointFileCache.putAll(this.checkpointFile.read());
        }
        catch (IOException e) {
            try {
                this.stateDirectory.unlockGlobalState();
            }
            catch (IOException e1) {
                this.log.error("Failed to unlock the global state directory", (Throwable)e);
            }
            throw new StreamsException("Failed to read checkpoints for global state globalStores", e);
        }
        HashSet<String> changelogTopics = new HashSet<String>();
        for (StateStore stateStore : this.globalStateStores) {
            this.globalStoreNames.add(stateStore.name());
            String sourceTopic = this.storeToChangelogTopic.get(stateStore.name());
            changelogTopics.add(sourceTopic);
            stateStore.init(this.globalProcessorContext, stateStore);
        }
        this.checkpointFileCache.keySet().forEach(tp -> {
            if (!changelogTopics.contains(tp.topic())) {
                this.log.error("Encountered a topic-partition in the global checkpoint file not associated with any global state store, topic-partition: {}, checkpoint file: {}. If this topic-partition is no longer valid, an application reset and state store directory cleanup will be required.", (Object)tp.topic(), (Object)this.checkpointFile.toString());
                try {
                    this.stateDirectory.unlockGlobalState();
                }
                catch (IOException e) {
                    this.log.error("Failed to unlock the global state directory", (Throwable)e);
                }
                throw new StreamsException("Encountered a topic-partition not associated with any global state store");
            }
        });
        return Collections.unmodifiableSet(this.globalStoreNames);
    }

    @Override
    public StateStore getGlobalStore(String name) {
        return ((Optional)this.globalStores.getOrDefault((Object)name, Optional.empty())).orElse(null);
    }

    @Override
    public StateStore getStore(String name) {
        return this.getGlobalStore(name);
    }

    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void registerStore(StateStore store, StateRestoreCallback stateRestoreCallback) {
        if (this.globalStores.containsKey((Object)store.name())) {
            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", store.name()));
        }
        if (!this.globalStoreNames.contains(store.name())) {
            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
        }
        if (stateRestoreCallback == null) {
            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", store.name()));
        }
        this.log.info("Restoring state for global store {}", (Object)store.name());
        List<TopicPartition> topicPartitions = this.topicPartitionsForStore(store);
        Map highWatermarks = this.retryUntilSuccessOrThrowOnTaskTimeout(() -> this.globalConsumer.endOffsets((Collection)topicPartitions), String.format("Failed to get offsets for partitions %s. The broker may be transiently unavailable at the moment.", topicPartitions));
        try {
            this.restoreState(stateRestoreCallback, topicPartitions, highWatermarks, store.name(), StateManagerUtil.converterForStore(store));
            this.globalStores.put((Object)store.name(), Optional.of(store));
        }
        finally {
            this.globalConsumer.unsubscribe();
        }
    }

    private List<TopicPartition> topicPartitionsForStore(StateStore store) {
        String sourceTopic = this.storeToChangelogTopic.get(store.name());
        List partitionInfos = this.retryUntilSuccessOrThrowOnTaskTimeout(() -> this.globalConsumer.partitionsFor(sourceTopic), String.format("Failed to get partitions for topic %s. The broker may be transiently unavailable at the moment.", sourceTopic));
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
        }
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (PartitionInfo partition : partitionInfos) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        return topicPartitions;
    }

    private void restoreState(StateRestoreCallback stateRestoreCallback, List<TopicPartition> topicPartitions, Map<TopicPartition, Long> highWatermarks, String storeName, RecordConverter recordConverter) {
        for (TopicPartition topicPartition : topicPartitions) {
            long offset;
            long currentDeadline = -1L;
            this.globalConsumer.assign(Collections.singletonList(topicPartition));
            Long checkpoint = this.checkpointFileCache.get(topicPartition);
            if (checkpoint != null) {
                this.globalConsumer.seek(topicPartition, checkpoint.longValue());
                offset = checkpoint;
            } else {
                this.globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
                offset = this.getGlobalConsumerOffset(topicPartition);
            }
            Long highWatermark = highWatermarks.get(topicPartition);
            RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
            this.stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
            long restoreCount = 0L;
            while (offset < highWatermark) {
                ConsumerRecords records = this.globalConsumer.poll(this.pollMsPlusRequestTimeout);
                currentDeadline = records.isEmpty() ? this.maybeUpdateDeadlineOrThrow(currentDeadline) : -1L;
                ArrayList<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>();
                for (ConsumerRecord record : records.records(topicPartition)) {
                    if (record.key() == null) continue;
                    restoreRecords.add(recordConverter.convert((ConsumerRecord<byte[], byte[]>)record));
                }
                offset = this.getGlobalConsumerOffset(topicPartition);
                stateRestoreAdapter.restoreBatch(restoreRecords);
                this.stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
                restoreCount += (long)restoreRecords.size();
            }
            this.stateRestoreListener.onRestoreEnd(topicPartition, storeName, restoreCount);
            this.checkpointFileCache.put(topicPartition, offset);
        }
    }

    private long getGlobalConsumerOffset(TopicPartition topicPartition) {
        return this.retryUntilSuccessOrThrowOnTaskTimeout(() -> this.globalConsumer.position(topicPartition), String.format("Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", topicPartition));
    }

    private <R> R retryUntilSuccessOrThrowOnTaskTimeout(Supplier<R> supplier, String errorMessage) {
        long deadlineMs = -1L;
        while (true) {
            try {
                return supplier.get();
            }
            catch (TimeoutException retriableException) {
                if (this.taskTimeoutMs == 0L) {
                    throw new StreamsException(String.format("Retrying is disabled. You can enable it by setting `%s` to a value larger than zero.", "task.timeout.ms"), retriableException);
                }
                deadlineMs = this.maybeUpdateDeadlineOrThrow(deadlineMs);
                this.log.warn(errorMessage, (Throwable)retriableException);
                continue;
            }
            break;
        }
    }

    private long maybeUpdateDeadlineOrThrow(long currentDeadlineMs) {
        long currentWallClockMs = this.time.milliseconds();
        if (currentDeadlineMs == -1L) {
            long newDeadlineMs = currentWallClockMs + this.taskTimeoutMs;
            return newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
        }
        if (currentWallClockMs >= currentDeadlineMs) {
            throw new TimeoutException(String.format("Global task did not make progress to restore state within %d ms. Adjust `%s` if needed.", currentWallClockMs - currentDeadlineMs + this.taskTimeoutMs, "task.timeout.ms"));
        }
        return currentDeadlineMs;
    }

    @Override
    public void flush() {
        this.log.debug("Flushing all global globalStores registered in the state manager");
        for (Map.Entry entry : this.globalStores.entrySet()) {
            if (((Optional)entry.getValue()).isPresent()) {
                StateStore store = (StateStore)((Optional)entry.getValue()).get();
                try {
                    this.log.trace("Flushing global store={}", (Object)store.name());
                    store.flush();
                    continue;
                }
                catch (RuntimeException e) {
                    throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
                }
            }
            throw new IllegalStateException("Expected " + (String)entry.getKey() + " to have been initialized");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.globalStores.isEmpty()) {
                return;
            }
            StringBuilder closeFailed = new StringBuilder();
            for (Map.Entry entry : this.globalStores.entrySet()) {
                if (((Optional)entry.getValue()).isPresent()) {
                    this.log.debug("Closing global storage engine {}", entry.getKey());
                    try {
                        ((StateStore)((Optional)entry.getValue()).get()).close();
                    }
                    catch (RuntimeException e) {
                        this.log.error("Failed to close global state store {}", entry.getKey(), (Object)e);
                        closeFailed.append("Failed to close global state store:").append((String)entry.getKey()).append(". Reason: ").append(e).append("\n");
                    }
                    this.globalStores.put(entry.getKey(), Optional.empty());
                    continue;
                }
                this.log.info("Skipping to close non-initialized store {}", entry.getKey());
            }
            if (closeFailed.length() > 0) {
                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state globalStores\n" + closeFailed);
            }
        }
        finally {
            this.stateDirectory.unlockGlobalState();
        }
    }

    @Override
    public void updateChangelogOffsets(Map<TopicPartition, Long> offsets) {
        this.checkpointFileCache.putAll(offsets);
    }

    @Override
    public void checkpoint() {
        HashMap<TopicPartition, Long> filteredOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> topicPartitionOffset : this.checkpointFileCache.entrySet()) {
            String topic = topicPartitionOffset.getKey().topic();
            if (this.globalNonPersistentStoresTopics.contains(topic)) continue;
            filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
        }
        try {
            this.checkpointFile.write(filteredOffsets);
        }
        catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to {} for global stores: {}. This may occur if OS cleaned the state.dir in case when it is located in the (default) ${java.io.tmpdir}/kafka-streams directory. Changing the location of state.dir may resolve the problem", (Object)this.checkpointFile, (Object)e);
        }
    }

    @Override
    public Task.TaskType taskType() {
        return Task.TaskType.GLOBAL;
    }

    @Override
    public Map<TopicPartition, Long> changelogOffsets() {
        return Collections.unmodifiableMap(this.checkpointFileCache);
    }

    @Override
    public String changelogFor(String storeName) {
        return this.storeToChangelogTopic.get(storeName);
    }
}

