package org.apache.flink.runtime.filecache;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/filecache/FileCache.class */
public class FileCache {
    /** Logger shared by the file cache and its nested helper tasks. */
    static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
    /** Monitor that guards {@link #entries}; also handed to the delete tasks so they synchronize on the same object. */
    private final Object lock = new Object();
    /**
     * Cached files, keyed by job and then by the name the file was registered under.
     * Each tuple holds: f0 = reference count, f1 = the job's directory inside a storage directory,
     * f2 = local path the file is copied to, f3 = future of the copy task.
     */
    private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
    /** Executor that runs the copy tasks and the delayed delete tasks. */
    private final ScheduledExecutorService executorService;
    /** Per-instance cache directories, one under each configured temp directory. */
    private final File[] storageDirectories;
    /** JVM shutdown hook that shuts this cache down; {@code null} if the hook could not be registered. */
    private final Thread shutdownHook;
    /** Index of the storage directory used for the next new entry; wraps around to 0. */
    private int nextDirectory;

    /* loaded from: input_file:org/apache/flink/runtime/filecache/FileCache$CopyProcess.class */
    /**
     * Asynchronous task that copies one distributed-cache entry to its local
     * target location and yields that location as its result.
     */
    private static class CopyProcess implements Callable<Path> {
        /** Location the entry is copied from. */
        private final Path filePath;
        /** Local location the entry is copied to; also the task's result. */
        private final Path cachedPath;
        /** Executable flag handed on to {@link FileCache#copy(Path, Path, boolean)}. */
        private final boolean executable;

        /**
         * @param distributedCacheEntry entry supplying the source path and the executable flag
         * @param path local target path of the copy
         */
        public CopyProcess(DistributedCache.DistributedCacheEntry distributedCacheEntry, Path path) {
            this.cachedPath = path;
            this.filePath = new Path(distributedCacheEntry.filePath);
            this.executable = distributedCacheEntry.isExecutable.booleanValue();
        }

        /**
         * Runs the copy and returns the target path.
         *
         * @return the local target path given to the constructor
         * @throws IOException if {@link FileCache#copy(Path, Path, boolean)} throws
         */
        @Override
        public Path call() throws IOException {
            final Path target = this.cachedPath;
            FileCache.copy(this.filePath, target, this.executable);
            return target;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/filecache/FileCache$DeleteProcess.class */
    private static class DeleteProcess implements Runnable {
        private final Object lock;
        private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
        private final String name;
        private final JobID jobID;

        public DeleteProcess(Object obj, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> map, String str, JobID jobID) {
            this.lock = obj;
            this.entries = map;
            this.name = str;
            this.jobID = jobID;
        }

        @Override // java.lang.Runnable
        public void run() {
            Tuple4<Integer, File, Path, Future<Path>> tuple4;
            String[] list;
            try {
                synchronized (this.lock) {
                    Map<String, Tuple4<Integer, File, Path, Future<Path>>> map = this.entries.get(this.jobID);
                    if (map != null && (tuple4 = map.get(this.name)) != null) {
                        int intValue = ((Integer) tuple4.f0).intValue();
                        if (intValue > 1) {
                            tuple4.f0 = Integer.valueOf(intValue - 1);
                        } else {
                            map.remove(this.name);
                            if (map.isEmpty()) {
                                this.entries.remove(this.jobID);
                            }
                            ((Future) tuple4.f3).cancel(true);
                            File file = new File(((Path) tuple4.f2).toString());
                            if (file.exists()) {
                                if (file.isDirectory()) {
                                    FileUtils.deleteDirectory(file);
                                } else if (!file.delete()) {
                                    FileCache.LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
                                }
                            }
                            File file2 = (File) tuple4.f1;
                            if (file2.isDirectory() && ((list = file2.list()) == null || list.length == 0)) {
                                file2.delete();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                FileCache.LOG.error("Could not delete file from local file cache.", e);
            }
        }
    }

    /**
     * Creates the file cache and one uniquely named cache directory under each
     * temp directory configured via {@code taskmanager.tmp.dirs}.
     *
     * @param configuration configuration the temp directory list is read from
     * @throws IOException if one of the cache directories cannot be created
     */
    public FileCache(Configuration configuration) throws IOException {
        final String configuredDirs = configuration.getString("taskmanager.tmp.dirs", ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
        // Entries may be separated by a comma or by the platform's path separator.
        final String[] tmpDirs = configuredDirs.split(",|" + File.pathSeparator);
        this.storageDirectories = new File[tmpDirs.length];

        int created = 0;
        while (created < tmpDirs.length) {
            final File cacheDir = new File(tmpDirs[created], "flink-dist-cache-" + UUID.randomUUID().toString());
            this.storageDirectories[created] = cacheDir;
            final String cacheDirPath = cacheDir.getAbsolutePath();

            if (cacheDir.mkdirs()) {
                LOG.info("User file cache uses directory " + cacheDirPath);
                created++;
                continue;
            }

            LOG.error("User file cache cannot create directory " + cacheDirPath);
            // Roll back the directories that were created before the failure.
            for (int k = 0; k < created; k++) {
                final File earlier = this.storageDirectories[k];
                if (!earlier.delete()) {
                    LOG.warn("User file cache cannot remove prior directory " + earlier.getAbsolutePath());
                }
            }
            throw new IOException("File cache cannot create temp storage directory: " + cacheDirPath);
        }

        this.shutdownHook = createShutdownHook(this, LOG);
        this.entries = new HashMap<>();
        this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
    }

    /**
     * Shuts the file cache down: stops the executor (waiting up to 5000 ms for
     * it to terminate), forgets all entries, deletes the storage directories
     * and unregisters the JVM shutdown hook.
     *
     * <p>Cleanup is best effort; failures are logged together with their cause
     * and do not abort the remaining steps. If the calling thread is
     * interrupted while waiting for the executor, cleanup still completes and
     * the interrupt status is restored before the method returns.
     */
    public void shutdown() {
        boolean interrupted = false;
        synchronized (this.lock) {
            ScheduledExecutorService scheduledExecutorService = this.executorService;
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
                try {
                    scheduledExecutorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Do not lose the interrupt; it is re-asserted once cleanup is done.
                    interrupted = true;
                }
            }
            this.entries.clear();
            for (File file : this.storageDirectories) {
                try {
                    FileUtils.deleteDirectory(file);
                } catch (IOException e2) {
                    LOG.error("File cache could not properly clean up storage directory.", e2);
                }
            }
            // The hook cannot be removed from within itself, so skip removal when it is the current thread.
            if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                } catch (IllegalStateException ignored) {
                    // The JVM is already shutting down; hooks can no longer be removed.
                } catch (Throwable th) {
                    LOG.warn("Exception while unregistering file cache's cleanup shutdown hook.", th);
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a user of the given cached file and returns the future of its
     * local copy. The first registration for a name within a job starts the
     * copy; later registrations only increase the reference count and share
     * the same future.
     *
     * @param str name the file is registered under
     * @param distributedCacheEntry entry describing the file to copy
     * @param jobID job the file belongs to
     * @return future that yields the local path of the copy
     */
    public Future<Path> createTmpFile(String str, DistributedCache.DistributedCacheEntry distributedCacheEntry, JobID jobID) {
        synchronized (this.lock) {
            Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = this.entries.get(jobID);
            if (jobEntries == null) {
                jobEntries = new HashMap<>();
                this.entries.put(jobID, jobEntries);
            }

            final Tuple4<Integer, File, Path, Future<Path>> existing = jobEntries.get(str);
            if (existing != null) {
                // Already cached (or being copied): add one more reference.
                existing.f0 = Integer.valueOf(existing.f0.intValue() + 1);
                return existing.f3;
            }

            // Spread new entries over the storage directories in round-robin order.
            final int slot = this.nextDirectory++;
            final File jobDirectory = new File(this.storageDirectories[slot], jobID.toString());
            if (this.nextDirectory >= this.storageDirectories.length) {
                this.nextDirectory = 0;
            }

            // The local file keeps the last segment of the source path as its name.
            final String sourcePath = distributedCacheEntry.filePath;
            final int separatorPos = sourcePath.lastIndexOf(ZKPaths.PATH_SEPARATOR);
            final String fileName = separatorPos > 0 ? sourcePath.substring(separatorPos + 1) : sourcePath;

            final Path target = new Path(jobDirectory.getAbsolutePath() + ZKPaths.PATH_SEPARATOR + fileName);
            final FutureTask<Path> copyTask = new FutureTask<>(new CopyProcess(distributedCacheEntry, target));
            this.executorService.submit(copyTask);
            jobEntries.put(str, new Tuple4<Integer, File, Path, Future<Path>>(1, jobDirectory, target, copyTask));
            return copyTask;
        }
    }

    /**
     * Schedules the release of one reference to the given cached file. The
     * release runs on the executor after a delay of 5000 ms.
     *
     * @param str name the file was registered under
     * @param jobID job the file belongs to
     */
    public void deleteTmpFile(String str, JobID jobID) {
        final ScheduledExecutorService scheduler = this.executorService;
        final DeleteProcess releaseTask = new DeleteProcess(this.lock, this.entries, str, jobID);
        scheduler.schedule(releaseTask, 5000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Tells whether the cache still holds at least one reference to the given
     * file of the given job.
     *
     * <p>The entry table is a plain {@code HashMap} that other threads modify
     * while holding {@code lock}, so the lookup is done under the same lock.
     *
     * @param str name the file was registered under
     * @param jobID job the file belongs to
     * @return {@code true} if an entry exists and its reference count is positive
     */
    boolean holdsStillReference(String str, JobID jobID) {
        synchronized (this.lock) {
            Map<String, Tuple4<Integer, File, Path, Future<Path>>> map = this.entries.get(jobID);
            if (map == null) {
                return false;
            }
            Tuple4<Integer, File, Path, Future<Path>> tuple4 = map.get(str);
            return tuple4 != null && tuple4.f0.intValue() > 0;
        }
    }

    /**
     * Copies a file or, recursively, a directory from {@code path} to
     * {@code path2}. Nothing is done if the target already exists.
     *
     * <p>A failure while copying a single file is logged and not propagated.
     *
     * @param path source file or directory
     * @param path2 target location
     * @param z executable flag applied to every copied file
     * @throws IOException if accessing the file systems, reading the source
     *         status, creating a target directory or listing a source directory fails
     */
    public static void copy(Path path, Path path2, boolean z) throws IOException {
        final FileSystem sourceFs = path.getFileSystem();
        final FileSystem targetFs = path2.getFileSystem();
        if (targetFs.exists(path2)) {
            return;
        }

        if (sourceFs.getFileStatus(path).isDir()) {
            targetFs.mkdirs(path2);
            for (FileStatus child : sourceFs.listStatus(path)) {
                String childLocation = child.getPath().toString();
                // Drop a trailing separator so the last segment is the directory name.
                if (child.isDir() && childLocation.endsWith(ZKPaths.PATH_SEPARATOR)) {
                    childLocation = childLocation.substring(0, childLocation.length() - 1);
                }
                // The suffix keeps its leading separator, e.g. "/name".
                final String suffix = childLocation.substring(childLocation.lastIndexOf(ZKPaths.PATH_SEPARATOR));
                copy(child.getPath(), new Path(path2.toString() + suffix), z);
            }
            return;
        }

        try {
            IOUtils.copyBytes(sourceFs.open(path), targetFs.create(path2, false));
            new File(path2.toString()).setExecutable(z);
        } catch (IOException e) {
            LOG.error("could not copy file to local file cache.", e);
        }
    }

    /**
     * Registers a JVM shutdown hook that shuts the given file cache down.
     *
     * @param fileCache cache whose {@code shutdown()} the hook calls
     * @param logger logger used for failures inside the hook and during registration
     * @return the registered hook thread, or {@code null} if registration failed
     */
    private static Thread createShutdownHook(final FileCache fileCache, final Logger logger) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // This method is static, so the cache has to come from the parameter.
                    fileCache.shutdown();
                } catch (Throwable th) {
                    logger.error("Error during shutdown of file cache via JVM shutdown hook: " + th.getMessage(), th);
                }
            }
        });
        try {
            Runtime.getRuntime().addShutdownHook(thread);
            return thread;
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down; no hook can be registered any more.
            return null;
        } catch (Throwable th) {
            logger.error("Cannot register shutdown hook that cleanly terminates the file cache service.", th);
            return null;
        }
    }
}
