/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateDirectory {
    /** Matches task directory names of the form {@code <digits>_<digits>}. */
    private static final Pattern TASK_DIR_PATH_NAME = Pattern.compile("\\d+_\\d+");
    /** Matches named-topology directory names: a non-empty name wrapped in double underscores. */
    private static final Pattern NAMED_TOPOLOGY_DIR_PATH_NAME = Pattern.compile("__.+__");
    private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
    /** Name of the lock file created inside the application state directory. */
    static final String LOCK_FILE_NAME = ".lock";
    /** Name of the JSON file, inside the state directory, that stores the process UUID. */
    static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
    /** Monitor guarding creation of task directories (and their parent) on disk. */
    private final Object taskDirCreationLock = new Object();
    private final Time time;
    private final String appId;
    /** The per-application directory: {@code <state.dir>/<application.id>}. */
    private final File stateDir;
    private final boolean hasPersistentStores;
    private final boolean hasNamedTopologies;
    /** In-memory task locks: maps each locked task to the thread that owns the lock. */
    private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
    /** Channel and file lock on the state directory lock file; assigned by {@code lockStateDirectory()}. */
    private FileChannel stateDirLockChannel;
    private FileLock stateDirLock;

    /**
     * Resolves the application state directory as {@code <state.dir>/<application.id>} and,
     * when persistent stores are in use, makes sure it exists on disk with restricted permissions.
     *
     * @param config              configuration supplying {@code application.id} and {@code state.dir}
     * @param time                clock used when deciding whether task directories are obsolete
     * @param hasPersistentStores when {@code false} nothing is created or checked on disk
     * @param hasNamedTopologies  whether task directories are nested in per-topology directories
     * @throws ProcessorStateException if a required directory cannot be created, or a regular
     *                                 file already occupies the state directory path
     */
    public StateDirectory(StreamsConfig config, Time time, boolean hasPersistentStores, boolean hasNamedTopologies) {
        this.time = time;
        this.hasPersistentStores = hasPersistentStores;
        this.hasNamedTopologies = hasNamedTopologies;
        this.appId = config.getString("application.id");
        final String stateDirName = config.getString("state.dir");
        final File baseDir = new File(stateDirName);
        this.stateDir = new File(baseDir, this.appId);

        // Without persistent stores there is nothing to prepare on disk.
        if (!this.hasPersistentStores) {
            return;
        }

        if (!(baseDir.exists() || baseDir.mkdirs())) {
            throw new ProcessorStateException(String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
        }
        if (!(this.stateDir.exists() || this.stateDir.mkdir())) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", this.stateDir.getPath()));
        }
        if (this.stateDir.exists() && !this.stateDir.isDirectory()) {
            throw new ProcessorStateException(String.format("state directory [%s] can't be created as there is an existing file with the same name", this.stateDir.getPath()));
        }

        final String osTempDir = System.getProperty("java.io.tmpdir");
        if (stateDirName.startsWith(osTempDir)) {
            log.warn("Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [" + stateDirName + "]");
        }

        this.configurePermissions(baseDir);
        this.configurePermissions(this.stateDir);
    }

    /**
     * Convenience constructor for applications without named topologies.
     *
     * @param config              configuration supplying {@code application.id} and {@code state.dir}
     * @param time                clock used when deciding whether task directories are obsolete
     * @param hasPersistentStores when {@code false} nothing is created or checked on disk
     */
    public StateDirectory(StreamsConfig config, Time time, boolean hasPersistentStores) {
        this(config, time, hasPersistentStores, false);
    }

    /**
     * Restricts access to the given directory. On file systems with a POSIX attribute view the
     * permissions are set to {@code rwxr-x---}; otherwise read/write/execute are granted to the
     * owner only. Failures are logged, never thrown.
     *
     * @param file the directory whose permissions should be tightened
     */
    private void configurePermissions(File file) {
        final Path path = file.toPath();
        final boolean posixSupported = path.getFileSystem().supportedFileAttributeViews().contains("posix");
        if (!posixSupported) {
            // Invoke all three setters unconditionally; report once if any of them failed.
            final boolean readable = file.setReadable(true, true);
            final boolean writable = file.setWritable(true, true);
            final boolean executable = file.setExecutable(true, true);
            if (!(readable && writable && executable)) {
                log.error("Failed to change permissions for the directory {}", file);
            }
            return;
        }
        final Set<PosixFilePermission> ownerAndGroupRead = PosixFilePermissions.fromString("rwxr-x---");
        try {
            Files.setPosixFilePermissions(path, ownerAndGroupRead);
        } catch (IOException e) {
            log.error("Error changing permissions for the directory {} ", path, e);
        }
    }

    /**
     * Opens (creating if necessary) the lock file inside the state directory and tries to take
     * a file lock on it. The channel and the lock are stored in instance fields.
     *
     * @return {@code true} if the lock was obtained, {@code false} if {@code tryLock} yielded no lock
     * @throws ProcessorStateException if opening or locking the file fails with an I/O error
     */
    private boolean lockStateDirectory() {
        final Path lockPath = new File(stateDir, LOCK_FILE_NAME).toPath();
        try {
            stateDirLockChannel = FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            stateDirLock = tryLock(stateDirLockChannel);
        } catch (IOException e) {
            log.error("Unable to lock the state directory due to unexpected exception", e);
            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
        }
        final boolean acquired = stateDirLock != null;
        return acquired;
    }

    /**
     * Determines the process id of this instance.
     * <p>
     * Without persistent stores a random UUID is returned and nothing touches the disk.
     * Otherwise the state directory is locked, and the id is read from the process file if that
     * file exists and contains one; failing that, a fresh id is generated and written to the file.
     *
     * @return the persisted or newly generated process id
     * @throws StreamsException        if the state directory lock could not be obtained
     * @throws ProcessorStateException if writing the process file fails with an I/O error
     */
    public UUID initializeProcessId() {
        if (!hasPersistentStores) {
            return UUID.randomUUID();
        }
        if (!lockStateDirectory()) {
            log.error("Unable to obtain lock as state directory is already locked by another process");
            throw new StreamsException("Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory");
        }

        final File processFile = new File(stateDir, PROCESS_FILE_NAME);
        final ObjectMapper mapper = new ObjectMapper();
        try {
            if (processFile.exists()) {
                try {
                    final StateDirectoryProcessFile stored = mapper.readValue(processFile, StateDirectoryProcessFile.class);
                    final UUID storedId = stored.processId;
                    log.info("Reading UUID from process file: {}", storedId);
                    if (storedId != null) {
                        return storedId;
                    }
                } catch (Exception e) {
                    // An unreadable file is not fatal: fall through and overwrite it with a new id.
                    log.warn("Failed to read json process file", e);
                }
            }

            final StateDirectoryProcessFile fresh = new StateDirectoryProcessFile(UUID.randomUUID());
            log.info("No process id found on disk, got fresh process id {}", fresh.processId);
            mapper.writeValue(processFile, fresh);
            return fresh.processId;
        } catch (IOException e) {
            log.error("Unable to read/write process file due to unexpected exception", e);
            throw new ProcessorStateException(e);
        }
    }

    /**
     * Returns the directory for the given task. When persistent stores are in use, the
     * directory (and its parent, if missing) is created on disk first.
     *
     * @param taskId the task whose directory is requested
     * @return the task directory; it is only guaranteed to exist when persistent stores are used
     * @throws ProcessorStateException if a directory cannot be created, or the task directory
     *                                 path is occupied by a regular file
     */
    public File getOrCreateDirectoryForTask(TaskId taskId) {
        final File parentDir = getTaskDirectoryParentName(taskId);
        final File taskDir = new File(parentDir, StateManagerUtil.toTaskDirString(taskId));
        if (!hasPersistentStores) {
            return taskDir;
        }

        if (taskDir.exists()) {
            if (!taskDir.isDirectory()) {
                throw new ProcessorStateException(String.format("state directory [%s] can't be created as there is an existing file with the same name", taskDir.getPath()));
            }
            return taskDir;
        }

        synchronized (taskDirCreationLock) {
            // Existence is re-checked under the lock: another thread may have created the
            // directories between the unsynchronized check above and acquiring the monitor.
            if (!(parentDir.exists() || parentDir.mkdir())) {
                throw new ProcessorStateException(String.format("Parent [%s] of task directory [%s] doesn't exist and couldn't be created", parentDir.getPath(), taskDir.getPath()));
            }
            if (!(taskDir.exists() || taskDir.mkdir())) {
                throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
            }
        }
        return taskDir;
    }

    /**
     * Resolves the directory that contains the given task's directory: the state directory
     * itself, or {@code __<topologyName>__} beneath it when the task belongs to a named topology.
     *
     * @param taskId the task being looked up
     * @return the parent directory of the task directory (not created by this method)
     * @throws IllegalStateException if the task has a topology name but this instance was
     *                               constructed with {@code hasNamedTopologies = false}
     */
    private File getTaskDirectoryParentName(TaskId taskId) {
        final String topologyName = taskId.topologyName();
        if (topologyName == null) {
            return stateDir;
        }
        if (!hasNamedTopologies) {
            throw new IllegalStateException("Tried to lookup taskId with named topology, but StateDirectory thinks hasNamedTopologies = false");
        }
        return new File(stateDir, "__" + topologyName + "__");
    }

    /**
     * Returns the {@code .checkpoint} file located in the given task's directory.
     * The task directory is obtained via {@link #getOrCreateDirectoryForTask(TaskId)}.
     *
     * @param taskId the task whose checkpoint file is requested
     * @return a handle to the checkpoint file (the file itself is not created here)
     */
    File checkpointFileFor(TaskId taskId) {
        final File taskDir = getOrCreateDirectoryForTask(taskId);
        return new File(taskDir, ".checkpoint");
    }

    /**
     * Tells whether the directory of the given task holds no state, as decided by
     * {@code taskDirIsEmpty}. The directory is obtained via
     * {@link #getOrCreateDirectoryForTask(TaskId)}.
     *
     * @param taskId the task to inspect
     * @return {@code true} if the task directory is considered empty
     */
    boolean directoryForTaskIsEmpty(TaskId taskId) {
        return taskDirIsEmpty(getOrCreateDirectoryForTask(taskId));
    }

    /**
     * Checks whether a task directory contains anything besides the {@code .checkpoint} file.
     * <p>
     * Side effect: entries whose name ends with the lock file name are deleted and do not count
     * as content; a failed deletion is only logged.
     *
     * @param taskDir the task directory to inspect
     * @return {@code true} if nothing other than checkpoint/lock files was found, or if the
     *         directory could not be listed
     */
    private boolean taskDirIsEmpty(File taskDir) {
        final File[] candidates = taskDir.listFiles(entry -> !".checkpoint".equals(entry.getName()));
        if (candidates == null) {
            return true;
        }

        boolean empty = true;
        for (final File candidate : candidates) {
            if (!candidate.getName().endsWith(LOCK_FILE_NAME)) {
                log.trace("TaskDir {} was not empty, found {}", taskDir, candidate);
                empty = false;
            } else if (!candidate.delete()) {
                log.warn("Error encountered deleting lock file in {}", taskDir);
            }
        }
        return empty;
    }

    /**
     * Returns the {@code global} directory beneath the state directory, creating it when
     * persistent stores are in use.
     *
     * @return the global state directory
     * @throws ProcessorStateException if the directory cannot be created or the path is
     *                                 occupied by a regular file
     */
    File globalStateDir() {
        final File globalDir = new File(stateDir, "global");
        if (!hasPersistentStores) {
            return globalDir;
        }
        if (!(globalDir.exists() || globalDir.mkdir())) {
            throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", globalDir.getPath()));
        }
        if (globalDir.exists() && !globalDir.isDirectory()) {
            throw new ProcessorStateException(String.format("global state directory [%s] can't be created as there is an existing file with the same name", globalDir.getPath()));
        }
        return globalDir;
    }

    /**
     * Builds the log prefix identifying the calling thread.
     *
     * @return {@code stream-thread [<current thread name>]}
     */
    private String logPrefix() {
        final String threadName = Thread.currentThread().getName();
        return String.format("stream-thread [%s]", threadName);
    }

    /**
     * Tries to take the in-memory lock on the given task's directory for the calling thread.
     * <p>
     * Without persistent stores this always succeeds. The lock is re-entrant for its owner
     * thread. On first acquisition the task directory is created if needed; should that fail,
     * the ownership entry is rolled back so that the task is not left locked by a caller that
     * only saw an exception.
     *
     * @param taskId the task to lock
     * @return {@code true} if the calling thread holds the lock on return, {@code false} if
     *         another thread owns it
     * @throws IllegalStateException if the state directory no longer exists
     */
    synchronized boolean lock(TaskId taskId) {
        if (!this.hasPersistentStores) {
            return true;
        }
        final Thread currentThread = Thread.currentThread();
        final Thread lockOwner = this.lockedTasksToOwner.get(taskId);
        if (lockOwner != null) {
            if (lockOwner.equals(currentThread)) {
                log.trace("{} Found cached state dir lock for task {}", this.logPrefix(), taskId);
                return true;
            }
            // Owned by a different thread.
            return false;
        }
        if (!this.stateDir.exists()) {
            log.error("Tried to lock task directory for {} but the state directory does not exist", taskId);
            throw new IllegalStateException("The state directory has been deleted");
        }
        this.lockedTasksToOwner.put(taskId, currentThread);
        try {
            // Make sure the task directory actually exists, creating it if necessary.
            this.getOrCreateDirectoryForTask(taskId);
        } catch (RuntimeException e) {
            // Do not leak the lock: otherwise a retry by this thread would hit the cached
            // entry and report success although the directory was never created.
            this.lockedTasksToOwner.remove(taskId);
            throw e;
        }
        return true;
    }

    /**
     * Releases the in-memory lock on the given task if, and only if, the calling thread owns it.
     * Does nothing when the task is unlocked or owned by another thread.
     *
     * @param taskId the task to unlock
     */
    synchronized void unlock(TaskId taskId) {
        final Thread owner = lockedTasksToOwner.get(taskId);
        if (owner == null || !owner.equals(Thread.currentThread())) {
            return;
        }
        lockedTasksToOwner.remove(taskId);
        log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
    }

    /**
     * Releases the file lock on the state directory and closes its channel.
     * <p>
     * Safe to call when the lock was never acquired (for example if
     * {@link #initializeProcessId()} was never invoked or failed) and safe to call repeatedly:
     * missing lock/channel references are skipped, and both references are cleared even when
     * releasing fails. The channel is closed even if releasing the lock throws.
     *
     * @throws StreamsException if releasing the lock or closing the channel fails
     */
    public void close() {
        if (!this.hasPersistentStores) {
            return;
        }
        try {
            try {
                if (this.stateDirLock != null) {
                    this.stateDirLock.release();
                }
            } finally {
                // Always close the channel so the file descriptor is not leaked.
                if (this.stateDirLockChannel != null) {
                    this.stateDirLockChannel.close();
                }
            }
        } catch (IOException e) {
            log.error("Unexpected exception while unlocking the state dir", (Throwable) e);
            throw new StreamsException("Failed to release the lock on the state directory", e);
        } finally {
            this.stateDirLock = null;
            this.stateDirLockChannel = null;
        }
        // Entries remaining here mean some task lock was never released before shutdown.
        if (!this.lockedTasksToOwner.isEmpty()) {
            log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", this.lockedTasksToOwner);
        }
    }

    /**
     * Removes all local state on user request, in three steps: every task directory (plus
     * empty named-topology directories), then the global state directory, and finally the
     * application state directory itself.
     *
     * @throws StreamsException if deleting task directories or the global directory fails, or
     *                          if a {@link SecurityException} prevents deleting the state directory
     */
    public synchronized void clean() {
        // Step 1: task directories.
        try {
            cleanStateAndTaskDirectoriesCalledByUser();
        } catch (Exception e) {
            throw new StreamsException(e);
        }

        // Step 2: global state directory.
        try {
            if (stateDir.exists()) {
                final File globalDir = globalStateDir().getAbsoluteFile();
                Utils.delete(globalDir);
            }
        } catch (IOException e) {
            log.error(String.format("%s Failed to delete global state directory of %s due to an unexpected exception", logPrefix(), appId), e);
            throw new StreamsException(e);
        }

        // Step 3: the state directory itself; a non-empty directory is only warned about.
        try {
            final boolean stateDirPresent = hasPersistentStores && stateDir.exists();
            if (stateDirPresent && !stateDir.delete()) {
                log.warn(String.format("%s Failed to delete state store directory of %s for it is not empty", logPrefix(), stateDir.getAbsolutePath()));
            }
        } catch (SecurityException e) {
            log.error(String.format("%s Failed to delete state store directory of %s due to an unexpected exception", logPrefix(), stateDir.getAbsolutePath()), e);
            throw new StreamsException(e);
        }
    }

    /**
     * Deletes directories of unlocked tasks that have not been modified for longer than the
     * given delay. Deletion failures are swallowed by the delegate.
     *
     * @param cleanupDelayMs minimum age, in milliseconds since last modification, before a
     *                       task directory is deleted
     * @throws IllegalStateException if the delegate unexpectedly throws
     */
    public synchronized void cleanRemovedTasks(long cleanupDelayMs) {
        try {
            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
        } catch (Exception unexpected) {
            throw new IllegalStateException("Should have swallowed exception.", unexpected);
        }
    }

    /**
     * Walks all task directories and deletes those that are not locked and whose last
     * modification lies more than {@code cleanupDelayMs} in the past. I/O failures during
     * deletion are logged and swallowed. Afterwards, empty named-topology directories are removed.
     *
     * @param cleanupDelayMs minimum age, in milliseconds since last modification, before a
     *                       task directory is deleted
     */
    private void cleanRemovedTasksCalledByCleanerThread(long cleanupDelayMs) {
        for (final TaskDirectory taskDir : listAllTaskDirectories()) {
            final String dirName = taskDir.file().getName();
            final TaskId id = StateManagerUtil.parseTaskDirectoryName(dirName, taskDir.namedTopology());
            if (lockedTasksToOwner.containsKey(id)) {
                continue;
            }
            try {
                if (lock(id)) {
                    final long now = time.milliseconds();
                    final long lastModifiedMs = taskDir.file().lastModified();
                    if (now > lastModifiedMs + cleanupDelayMs) {
                        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                            logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
                        Utils.delete(taskDir.file());
                    }
                }
            } catch (IOException e) {
                log.warn(String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s:", logPrefix(), dirName, id), e);
            } finally {
                // unlock() is a no-op unless the current thread owns the lock.
                unlock(id);
            }
        }
        maybeCleanEmptyNamedTopologyDirs(true);
    }

    /**
     * Deletes every named-topology directory under the state directory that is empty.
     * Does nothing when named topologies are not in use.
     *
     * @param logExceptionAsWarn {@code true} to log deletion failures at WARN level,
     *                           {@code false} to log them at ERROR level
     * @return the first {@link IOException} hit while deleting, or {@code null} if none occurred
     */
    private IOException maybeCleanEmptyNamedTopologyDirs(boolean logExceptionAsWarn) {
        if (!this.hasNamedTopologies) {
            return null;
        }
        IOException firstException = null;
        final File[] namedTopologyDirs = this.stateDir.listFiles(pathname ->
            pathname.isDirectory() && NAMED_TOPOLOGY_DIR_PATH_NAME.matcher(pathname.getName()).matches());
        if (namedTopologyDirs == null) {
            return null;
        }
        for (final File namedTopologyDir : namedTopologyDirs) {
            final File[] contents = namedTopologyDir.listFiles();
            // Skip directories that cannot be listed or that still contain something.
            if (contents == null || contents.length != 0) {
                continue;
            }
            try {
                Utils.delete(namedTopologyDir);
            } catch (IOException exception) {
                if (logExceptionAsWarn) {
                    log.warn(String.format("%s Swallowed the following exception during deletion of named topology directory %s", this.logPrefix(), namedTopologyDir.getName()), (Throwable) exception);
                } else {
                    log.error(String.format("%s Failed to delete named topology directory %s with exception:", this.logPrefix(), namedTopologyDir.getName()), (Throwable) exception);
                }
                // Only the first failure is reported back to the caller.
                if (firstException == null) {
                    firstException = exception;
                }
            }
        }
        return firstException;
    }

    /**
     * Deletes every task directory regardless of in-memory locks (a warning is logged for
     * tasks that are still locked), then removes empty named-topology directories.
     * All directories are attempted even if some deletions fail.
     *
     * @throws Exception the first {@link IOException} encountered while deleting, if any
     */
    private void cleanStateAndTaskDirectoriesCalledByUser() throws Exception {
        if (!this.lockedTasksToOwner.isEmpty()) {
            log.warn("Found some still-locked task directories when user requested to cleaning up the state, since Streams is not running any more these will be ignored to complete the cleanup");
        }
        final AtomicReference<IOException> firstException = new AtomicReference<IOException>();
        for (final TaskDirectory taskDir : this.listAllTaskDirectories()) {
            final String dirName = taskDir.file().getName();
            final TaskId id = StateManagerUtil.parseTaskDirectoryName(dirName, taskDir.namedTopology());
            try {
                log.info("{} Deleting task directory {} for {} as user calling cleanup.", new Object[]{this.logPrefix(), dirName, id});
                if (this.lockedTasksToOwner.containsKey(id)) {
                    // Argument order follows the placeholders: task id first, then the directory name.
                    log.warn("{} Task {} in state directory {} was still locked by {}", new Object[]{this.logPrefix(), id, dirName, this.lockedTasksToOwner.get(id)});
                }
                Utils.delete(taskDir.file());
            } catch (IOException exception) {
                log.error(String.format("%s Failed to delete task directory %s for %s with exception:", this.logPrefix(), dirName, id), (Throwable) exception);
                firstException.compareAndSet(null, exception);
            }
        }
        // Keep whichever failure happened first; a later one from the topology cleanup is dropped.
        firstException.compareAndSet(null, this.maybeCleanEmptyNamedTopologyDirs(false));
        final IOException exception = firstException.get();
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Lists the task directories that are not considered empty by {@code taskDirIsEmpty}.
     *
     * @return directories whose name matches the task directory pattern and that hold content
     */
    List<TaskDirectory> listNonEmptyTaskDirectories() {
        return listTaskDirectories(candidate ->
            candidate.isDirectory()
                && TASK_DIR_PATH_NAME.matcher(candidate.getName()).matches()
                && !taskDirIsEmpty(candidate));
    }

    /**
     * Lists every task directory, whether or not it holds any content.
     *
     * @return all directories whose name matches the task directory pattern
     */
    List<TaskDirectory> listAllTaskDirectories() {
        return listTaskDirectories(candidate -> {
            if (!candidate.isDirectory()) {
                return false;
            }
            return TASK_DIR_PATH_NAME.matcher(candidate.getName()).matches();
        });
    }

    /**
     * Collects the task directories accepted by the given filter. With named topologies the
     * search happens inside each named-topology directory; otherwise directly inside the
     * state directory.
     *
     * @param filter decides which entries count as task directories
     * @return the matching task directories; empty when persistent stores are not used or the
     *         state directory does not exist
     */
    private List<TaskDirectory> listTaskDirectories(FileFilter filter) {
        final ArrayList<TaskDirectory> found = new ArrayList<TaskDirectory>();
        if (!hasPersistentStores || !stateDir.exists()) {
            return found;
        }

        if (!hasNamedTopologies) {
            final File[] taskDirs = stateDir.listFiles(filter);
            if (taskDirs != null) {
                for (final File taskDir : taskDirs) {
                    found.add(new TaskDirectory(taskDir, null));
                }
            }
            return found;
        }

        for (final File topologyDir : listNamedTopologyDirs()) {
            final String topologyName = parseNamedTopologyFromDirectory(topologyDir.getName());
            final File[] taskDirs = topologyDir.listFiles(filter);
            if (taskDirs == null) {
                continue;
            }
            for (final File taskDir : taskDirs) {
                found.add(new TaskDirectory(taskDir, topologyName));
            }
        }
        return found;
    }

    /**
     * Lists the named-topology directories inside the state directory.
     * <p>
     * Only directories whose name matches {@code NAMED_TOPOLOGY_DIR_PATH_NAME} are returned,
     * the same rule used when cleaning up empty named-topology directories. This excludes
     * entries such as {@code __} or {@code ___}, from which no topology name can be extracted.
     *
     * @return the matching directories, or an empty list if the state directory cannot be listed
     */
    private List<File> listNamedTopologyDirs() {
        final File[] namedTopologyDirectories = this.stateDir.listFiles(f ->
            f.isDirectory() && NAMED_TOPOLOGY_DIR_PATH_NAME.matcher(f.getName()).matches());
        return namedTopologyDirectories != null ? Arrays.asList(namedTopologyDirectories) : Collections.emptyList();
    }

    /**
     * Extracts the topology name from a named-topology directory name by dropping the first
     * two and the last two characters.
     *
     * @param dirName a directory name of the form {@code __<name>__}
     * @return the part between the two-character prefix and suffix
     */
    private String parseNamedTopologyFromDirectory(String dirName) {
        final int markerLength = 2;
        final int nameEnd = dirName.length() - markerLength;
        return dirName.substring(markerLength, nameEnd);
    }

    /**
     * Attempts a non-blocking lock on the given channel, treating an overlapping lock as
     * "not acquired" rather than as an error.
     *
     * @param channel the channel to lock
     * @return the lock returned by {@link FileChannel#tryLock()}, or {@code null} if an
     *         {@link OverlappingFileLockException} was raised
     * @throws IOException if the lock attempt fails with an I/O error
     */
    private FileLock tryLock(FileChannel channel) throws IOException {
        FileLock acquired;
        try {
            acquired = channel.tryLock();
        } catch (OverlappingFileLockException overlapping) {
            acquired = null;
        }
        return acquired;
    }

    /**
     * Immutable pairing of a task directory with the name of the topology it belongs to
     * ({@code null} when the directory is not nested in a named-topology directory).
     */
    public static class TaskDirectory {
        private final File file;
        private final String namedTopology;

        TaskDirectory(File file, String namedTopology) {
            this.file = file;
            this.namedTopology = namedTopology;
        }

        /** @return the task directory on disk */
        public File file() {
            return file;
        }

        /** @return the topology name, or {@code null} if none was given */
        public String namedTopology() {
            return namedTopology;
        }

        /** Two instances are equal when both the file and the topology name are equal. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            final TaskDirectory other = (TaskDirectory) o;
            if (!file.equals(other.file)) {
                return false;
            }
            return Objects.equals(namedTopology, other.namedTopology);
        }

        @Override
        public int hashCode() {
            return Objects.hash(file, namedTopology);
        }
    }

    /**
     * JSON-mapped content of the process metadata file: holds the process id.
     * Unknown JSON properties are ignored when reading.
     */
    @JsonIgnoreProperties(ignoreUnknown=true)
    static class StateDirectoryProcessFile {
        @JsonProperty
        private final UUID processId;

        /** No-argument constructor; leaves the process id {@code null}. */
        public StateDirectoryProcessFile() {
            this((UUID) null);
        }

        StateDirectoryProcessFile(UUID processId) {
            this.processId = processId;
        }
    }
}

