/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of the state stores registered against one state directory.
 *
 * <p>On construction the manager creates the directory if needed, takes an exclusive file lock on
 * {@code .lock} inside it, and loads (then deletes) the {@code .checkpoint} offset file. Stores are
 * added through {@link #register}; an active (non-standby) manager restores each store from its
 * changelog topic right away, while a standby manager only remembers the restore callback and
 * applies records later through {@link #updateStandbyStates}. {@link #close} flushes and closes the
 * stores, writes a new checkpoint file and releases the directory lock.
 *
 * <p>The class performs no synchronization of its own.
 */
public class ProcessorStateManager {
    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);

    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
    public static final String LOCK_FILE_NAME = ".lock";

    /** Number of additional lock attempts made by the constructor. */
    private static final int LOCK_RETRIES = 5;
    /** Pause between two attempts to lock the state directory. */
    private static final long LOCK_RETRY_BACKOFF_MS = 200L;
    /** Pause between two partition metadata lookups in {@link #register}. */
    private static final long PARTITION_LOOKUP_BACKOFF_MS = 50L;
    /** Upper bound on how long {@link #register} keeps looking for the store's partition. */
    private static final long PARTITION_LOOKUP_TIMEOUT_MS = 5000L;
    /** Poll timeout used while restoring an active store. */
    private static final long RESTORE_POLL_TIMEOUT_MS = 100L;

    private final String applicationId;
    private final int defaultPartition;
    private final Map<String, TopicPartition> partitionForTopic;
    private final File baseDir;
    private final FileLock directoryLock;
    private final Map<String, StateStore> stores;
    private final Set<String> loggingEnabled;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    private final Map<TopicPartition, Long> offsetLimits;
    private final boolean isStandby;
    private final Map<String, StateRestoreCallback> restoreCallbacks;

    /**
     * Creates the manager, locks the state directory and loads the checkpointed offsets.
     *
     * @param applicationId    application id used to derive changelog topic names
     * @param defaultPartition partition used for any topic that is not one of {@code sources}
     * @param sources          source partitions; each topic is mapped to its partition
     * @param baseDir          state directory; created if it does not exist
     * @param restoreConsumer  consumer used for metadata lookups and for restoring active stores
     * @param isStandby        whether restore callbacks are collected instead of run immediately
     * @throws IOException if the directory cannot be created or locked, or the checkpoint file
     *                     cannot be read
     */
    public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
        this.applicationId = applicationId;
        this.defaultPartition = defaultPartition;
        this.partitionForTopic = new HashMap<String, TopicPartition>();
        for (TopicPartition source : sources) {
            this.partitionForTopic.put(source.topic(), source);
        }
        this.baseDir = baseDir;
        this.stores = new HashMap<String, StateStore>();
        this.loggingEnabled = new HashSet<String>();
        this.restoreConsumer = restoreConsumer;
        this.restoredOffsets = new HashMap<TopicPartition, Long>();
        this.isStandby = isStandby;
        // Callbacks are only ever added for standby managers; an immutable empty map keeps the
        // read paths null-safe for active managers.
        if (isStandby) {
            this.restoreCallbacks = new HashMap<String, StateRestoreCallback>();
        } else {
            this.restoreCallbacks = Collections.<String, StateRestoreCallback>emptyMap();
        }
        this.offsetLimits = new HashMap<TopicPartition, Long>();

        createStateDirectory(baseDir);
        this.directoryLock = lockStateDirectory(baseDir, LOCK_RETRIES);
        if (this.directoryLock == null) {
            throw new IOException("Failed to lock the state directory: " + baseDir.getCanonicalPath());
        }

        boolean initialized = false;
        try {
            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
            this.checkpointedOffsets = new HashMap<TopicPartition, Long>(checkpoint.read());
            // The file is removed once loaded; a fresh one is written by close().
            checkpoint.delete();
            initialized = true;
        } finally {
            if (!initialized) {
                // Do not leave the directory locked when construction fails.
                try {
                    releaseLock(this.directoryLock);
                } catch (IOException e) {
                    log.warn("Failed to release the state directory lock after a failed initialization", e);
                }
            }
        }
    }

    /**
     * Creates {@code stateDir} when it does not exist yet.
     *
     * @throws IOException if the directory is still missing after the creation attempt
     */
    private static void createStateDirectory(File stateDir) throws IOException {
        // isDirectory() covers the case where someone else created it between exists() and mkdir().
        if (!stateDir.exists() && !stateDir.mkdir() && !stateDir.isDirectory()) {
            throw new IOException("Failed to create the state directory: " + stateDir.getAbsolutePath());
        }
    }

    /**
     * Builds the changelog topic name for a store: {@code <applicationId>-<storeName>-changelog}.
     */
    public static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    /**
     * Makes a single attempt to lock the given state directory.
     *
     * @return the lock, or {@code null} if it could not be acquired
     * @throws IOException if the lock file cannot be opened or locking fails with an I/O error
     */
    public static FileLock lockStateDirectory(File stateDir) throws IOException {
        return lockStateDirectory(stateDir, 0);
    }

    /**
     * Tries to lock {@code .lock} inside {@code stateDir}, retrying up to {@code retry} more times
     * with a fixed pause in between.
     *
     * @return the lock, or {@code null} if every attempt failed (the channel is closed in that case)
     */
    private static FileLock lockStateDirectory(File stateDir, int retry) throws IOException {
        File lockFile = new File(stateDir, LOCK_FILE_NAME);
        FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel();
        FileLock lock = null;
        boolean interrupted = false;
        try {
            lock = lockStateDirectory(channel);
            while (lock == null && retry > 0) {
                try {
                    Thread.sleep(LOCK_RETRY_BACKOFF_MS);
                } catch (InterruptedException e) {
                    // Keep retrying as before, but remember the interrupt for the caller.
                    interrupted = true;
                }
                --retry;
                lock = lockStateDirectory(channel);
            }
            return lock;
        } finally {
            try {
                // Also reached when tryLock() throws: the channel must not leak without a lock.
                if (lock == null) {
                    channel.close();
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Single non-blocking lock attempt on {@code channel}.
     *
     * @return the lock, or {@code null} if {@code tryLock()} returned {@code null} or reported an
     *         overlapping lock
     */
    private static FileLock lockStateDirectory(FileChannel channel) throws IOException {
        try {
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }

    /** Releases {@code lock} and closes its channel, even if the release itself fails. */
    private static void releaseLock(FileLock lock) throws IOException {
        try {
            lock.release();
        } finally {
            lock.channel().close();
        }
    }

    /** Returns the state directory this manager was created with. */
    public File baseDir() {
        return this.baseDir;
    }

    /**
     * Registers a store and, for an active manager, restores it from its topic immediately.
     *
     * <p>The topic is the store's changelog topic when {@code loggingEnabled} is set and the store
     * name otherwise. Before the store is accepted, the restore consumer's metadata is polled for
     * up to five seconds until it lists the expected partition of that topic.
     *
     * @param store                the store to register
     * @param loggingEnabled       whether the store is backed by a changelog topic
     * @param stateRestoreCallback callback that applies a restored key/value pair to the store
     * @throws IllegalArgumentException if the store is named {@code .checkpoint} or is already
     *                                  registered
     * @throws StreamsException         if the topic has no partition info or lacks the partition
     */
    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
        String storeName = store.name();
        if (storeName.equals(CHECKPOINT_FILE_NAME)) {
            throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
        }
        if (this.stores.containsKey(storeName)) {
            throw new IllegalArgumentException("Store " + storeName + " has already been registered.");
        }
        if (loggingEnabled) {
            this.loggingEnabled.add(storeName);
        }
        String topic = loggingEnabled ? storeChangelogTopic(this.applicationId, storeName) : storeName;
        int partition = this.getPartition(topic);

        boolean interrupted = false;
        try {
            long deadline = System.currentTimeMillis() + PARTITION_LOOKUP_TIMEOUT_MS;
            boolean partitionFound;
            while (true) {
                partitionFound = this.topicHasPartition(topic, partition);
                if (partitionFound || System.currentTimeMillis() >= deadline) {
                    break;
                }
                // Only back off after a miss, so a partition that is already visible costs no wait.
                try {
                    Thread.sleep(PARTITION_LOOKUP_BACKOFF_MS);
                } catch (InterruptedException e) {
                    // Keep waiting as before; the interrupt status is restored on the way out.
                    interrupted = true;
                }
            }
            if (!partitionFound) {
                throw new StreamsException("Store " + storeName + "'s change log (" + topic + ") does not contain partition " + partition);
            }

            this.stores.put(storeName, store);
            if (this.isStandby) {
                // Only persistent stores get a callback for later standby updates.
                if (store.persistent()) {
                    this.restoreCallbacks.put(topic, stateRestoreCallback);
                }
            } else {
                this.restoreActiveState(topic, stateRestoreCallback);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Asks the restore consumer whether {@code topic} currently lists {@code partition}.
     *
     * @throws StreamsException if the consumer returns no partition info at all for the topic
     */
    private boolean topicHasPartition(String topic, int partition) {
        List<PartitionInfo> partitionInfos = this.restoreConsumer.partitionsFor(topic);
        if (partitionInfos == null) {
            throw new StreamsException("Could not find partition info for topic: " + topic);
        }
        for (PartitionInfo partitionInfo : partitionInfos) {
            if (partitionInfo.partition() == partition) {
                return true;
            }
        }
        return false;
    }

    /**
     * Replays the store's topic partition through {@code stateRestoreCallback}.
     *
     * <p>Reading starts at the checkpointed offset when one exists, otherwise at the beginning,
     * and stops at the offset limit or at the end offset observed before restoring, whichever
     * comes first. The resulting offset is recorded in {@code restoredOffsets}. The restore
     * consumer is always left without any assignment.
     *
     * @throws IllegalStateException if the consumer already has a subscription, or if its position
     *                               moves past the end offset observed at the start
     */
    private void restoreActiveState(String topicName, StateRestoreCallback stateRestoreCallback) {
        if (!this.restoreConsumer.subscription().isEmpty()) {
            throw new IllegalStateException("Restore consumer should have not subscribed to any partitions beforehand");
        }
        TopicPartition storePartition = new TopicPartition(topicName, this.getPartition(topicName));
        this.restoreConsumer.assign(Collections.singletonList(storePartition));
        try {
            // Determine the end offset by seeking to the end and reading the position.
            this.restoreConsumer.seekToEnd(Collections.singleton(storePartition));
            long endOffset = this.restoreConsumer.position(storePartition);

            Long checkpointed = this.checkpointedOffsets.get(storePartition);
            if (this.checkpointedOffsets.containsKey(storePartition)) {
                this.restoreConsumer.seek(storePartition, checkpointed.longValue());
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(storePartition));
            }

            long limit = this.offsetLimit(storePartition);
            while (true) {
                long offset = 0L;
                for (ConsumerRecord<byte[], byte[]> record : this.restoreConsumer.poll(RESTORE_POLL_TIMEOUT_MS).records(storePartition)) {
                    offset = record.offset();
                    if (offset >= limit) {
                        break;
                    }
                    stateRestoreCallback.restore(record.key(), record.value());
                }
                if (offset >= limit) {
                    break;
                }
                long position = this.restoreConsumer.position(storePartition);
                if (position == endOffset) {
                    break;
                }
                if (position > endOffset) {
                    throw new IllegalStateException("Log end offset should not change while restoring");
                }
            }

            long newOffset = Math.min(limit, this.restoreConsumer.position(storePartition));
            this.restoredOffsets.put(storePartition, newOffset);
        } finally {
            // Un-assign the partition again, whether or not the restore succeeded.
            this.restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        }
    }

    /**
     * Returns, for every topic that has a standby restore callback, the checkpointed offset of its
     * partition, or {@code -1} when no checkpoint was loaded for it.
     *
     * <p>An active (non-standby) manager has no such callbacks and returns an empty map.
     */
    public Map<TopicPartition, Long> checkpointedOffsets() {
        Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<TopicPartition, Long>();
        for (String topicName : this.restoreCallbacks.keySet()) {
            TopicPartition storePartition = new TopicPartition(topicName, this.getPartition(topicName));
            Long checkpointed = this.checkpointedOffsets.get(storePartition);
            if (this.checkpointedOffsets.containsKey(storePartition)) {
                partitionsAndOffsets.put(storePartition, checkpointed);
            } else {
                partitionsAndOffsets.put(storePartition, -1L);
            }
        }
        return partitionsAndOffsets;
    }

    /**
     * Applies the records whose offset is below the partition's offset limit to the standby store
     * and hands back the rest.
     *
     * @param storePartition partition the records were read from
     * @param records        records to apply, in the order they should be restored
     * @return the records at or beyond the limit, or {@code null} if there are none
     */
    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
        long limit = this.offsetLimit(storePartition);
        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
        StateRestoreCallback restoreCallback = this.restoreCallbacks.get(storePartition.topic());
        long lastOffset = -1L;
        int count = 0;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < limit) {
                restoreCallback.restore(record.key(), record.value());
                lastOffset = record.offset();
            } else {
                if (remainingRecords == null) {
                    remainingRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>(records.size() - count);
                }
                remainingRecords.add(record);
            }
            ++count;
        }
        // Only advance the restored offset when something was applied; otherwise an empty batch
        // (or one entirely beyond the limit) would reset the recorded progress to 0.
        if (lastOffset >= 0L) {
            this.restoredOffsets.put(storePartition, lastOffset + 1L);
        }
        return remainingRecords;
    }

    /** Sets the exclusive upper bound on offsets that may be restored for {@code partition}. */
    public void putOffsetLimit(TopicPartition partition, long limit) {
        this.offsetLimits.put(partition, limit);
    }

    /** Returns the offset limit for {@code partition}, or {@code Long.MAX_VALUE} if none is set. */
    private long offsetLimit(TopicPartition partition) {
        Long limit = this.offsetLimits.get(partition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    /** Returns the registered store with the given name, or {@code null} if there is none. */
    public StateStore getStore(String name) {
        return this.stores.get(name);
    }

    /** Flushes every registered store. */
    public void flush() {
        if (!this.stores.isEmpty()) {
            log.debug("Flushing stores.");
            for (StateStore store : this.stores.values()) {
                store.flush();
            }
        }
    }

    /**
     * Flushes and closes all stores, writes the checkpoint file for the persistent ones, and
     * releases the state directory lock.
     *
     * <p>For each persistent store the checkpointed offset is the acked offset plus one when
     * {@code ackedOffsets} has an entry for the store's partition, otherwise the offset recorded
     * while restoring, if any. The lock and its channel are released even when closing the stores
     * or writing the checkpoint fails.
     *
     * @param ackedOffsets last acknowledged offset per partition
     * @throws IOException if the checkpoint cannot be written or the lock cannot be released
     */
    public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
        try {
            if (!this.stores.isEmpty()) {
                log.debug("Closing stores.");
                for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                    log.debug("Closing storage engine {}", entry.getKey());
                    entry.getValue().flush();
                    entry.getValue().close();
                }

                Map<TopicPartition, Long> checkpointOffsets = new HashMap<TopicPartition, Long>();
                for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                    if (!entry.getValue().persistent()) {
                        continue;
                    }
                    String storeName = entry.getKey();
                    String topic = this.loggingEnabled.contains(storeName)
                            ? storeChangelogTopic(this.applicationId, storeName)
                            : storeName;
                    // NOTE(review): the partition is looked up by store name here, whereas
                    // register() looks it up by topic name — confirm both always agree.
                    TopicPartition part = new TopicPartition(topic, this.getPartition(storeName));

                    Long offset = ackedOffsets.get(part);
                    if (offset != null) {
                        // Checkpoint the offset following the last acknowledged one.
                        checkpointOffsets.put(part, offset + 1L);
                        continue;
                    }
                    offset = this.restoredOffsets.get(part);
                    if (offset != null) {
                        checkpointOffsets.put(part, offset);
                    }
                }

                OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
                checkpoint.write(checkpointOffsets);
            }
        } finally {
            releaseLock(this.directoryLock);
        }
    }

    /**
     * Returns the partition of the source with the given topic name, or the default partition
     * when the topic is not one of the sources.
     */
    private int getPartition(String topic) {
        TopicPartition partition = this.partitionForTopic.get(topic);
        return partition == null ? this.defaultPartition : partition.partition();
    }
}

