/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.swap.SchemaSwapDeserializer;
import org.apache.nifi.controller.swap.SchemaSwapSerializer;
import org.apache.nifi.controller.swap.SimpleSwapDeserializer;
import org.apache.nifi.controller.swap.StandardSwapContents;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.controller.swap.SwapDeserializer;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemSwapManager
implements FlowFileSwapManager {
    // Matches the name of a completed swap file: digits, a dash, arbitrary text, an optional
    // dot-delimited segment, and the ".swap" extension.
    private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
    // Same shape as SWAP_FILE_PATTERN with a trailing ".part"; swapOut writes to such a file before renaming it.
    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
    // Not referenced elsewhere in this file; presumably the encoding version of the legacy swap format -- TODO confirm.
    public static final int SWAP_ENCODING_VERSION = 10;
    // Category string passed to the EventReporter with every event this class reports.
    public static final String EVENT_CATEGORY = "Swap FlowFiles";
    private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
    // Directory that swap files are written to and listed from; null when the no-arg constructor is used.
    private final File storageDirectory;
    // The three collaborators below are assigned by initialize(...), not by the constructors.
    private FlowFileRepository flowFileRepository;
    private EventReporter eventReporter;
    private ResourceClaimManager claimManager;
    // ASCII bytes of "SWAP"; written first by swapOut and checked by createSwapDeserializer to detect the file format.
    private static final byte[] MAGIC_HEADER = new byte[]{83, 87, 65, 80};

    /**
     * Creates a manager without a storage directory.
     *
     * NOTE(review): presumably present so the class can be instantiated reflectively -- confirm.
     * The storage directory stays {@code null} on an instance created this way.
     */
    public FileSystemSwapManager() {
        this.storageDirectory = null;
    }

    /**
     * Creates a manager whose swap directory is located under the FlowFile Repository path
     * reported by the given properties.
     *
     * @param nifiProperties source of the FlowFile Repository path
     */
    public FileSystemSwapManager(NiFiProperties nifiProperties) {
        this(nifiProperties.getFlowFileRepositoryPath());
    }

    /**
     * Creates a manager that stores swap files in the {@code swap} sub-directory of the given path,
     * creating that directory when it does not exist yet.
     *
     * @param flowFileRepoPath parent path of the swap directory
     * @throws RuntimeException if the swap directory does not exist and cannot be created
     */
    public FileSystemSwapManager(Path flowFileRepoPath) {
        File swapDirectory = flowFileRepoPath.resolve("swap").toFile();
        this.storageDirectory = swapDirectory;
        if (swapDirectory.exists()) {
            return;
        }
        if (!swapDirectory.mkdirs()) {
            throw new RuntimeException("Cannot create Swap Storage directory " + swapDirectory.getAbsolutePath());
        }
    }

    /**
     * Stores the collaborators supplied by the initialization context: the resource claim manager,
     * the event reporter and the FlowFile Repository.
     *
     * @param initializationContext context providing the collaborators
     */
    public synchronized void initialize(SwapManagerInitializationContext initializationContext) {
        ResourceClaimManager resourceClaimManager = initializationContext.getResourceClaimManager();
        EventReporter reporter = initializationContext.getEventReporter();
        FlowFileRepository repository = initializationContext.getFlowFileRepository();

        this.claimManager = resourceClaimManager;
        this.eventReporter = reporter;
        this.flowFileRepository = repository;
    }

    /**
     * Serializes the given FlowFiles into a new swap file and, once the file is in place, tells the
     * FlowFile Repository that they were swapped out.
     *
     * The data is first written to a {@code .part} file, forced to disk, and then renamed to its
     * final {@code .swap} name.
     *
     * @param toSwap        FlowFiles to write; {@code null} or empty results in no file being written
     * @param flowFileQueue queue that owns the FlowFiles; its identifier becomes part of the file name
     * @param partitionName optional partition name appended to the file name, may be {@code null}
     * @return absolute path of the swap file, or {@code null} when there was nothing to swap
     * @throws IOException if writing the temporary file fails (the temporary file is deleted on a best-effort basis)
     */
    public String swapOut(List<FlowFileRecord> toSwap, FlowFileQueue flowFileQueue, String partitionName) throws IOException {
        if (toSwap == null || toSwap.isEmpty()) {
            return null;
        }

        // File name layout: <current millis>-<queue identifier>-<random UUID>[.<partition>].swap
        String swapFilePrefix = System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString();
        String swapFileBaseName = partitionName == null ? swapFilePrefix : swapFilePrefix + "." + partitionName;
        File swapFile = new File(this.storageDirectory, swapFileBaseName + ".swap");
        File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
        String swapLocation = swapFile.getAbsolutePath();

        SchemaSwapSerializer serializer = new SchemaSwapSerializer();
        try (FileOutputStream fos = new FileOutputStream(swapTempFile);
             BufferedOutputStream out = new BufferedOutputStream(fos)) {
            out.write(MAGIC_HEADER);
            DataOutputStream dos = new DataOutputStream(out);
            dos.writeUTF(serializer.getSerializationName());
            serializer.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, out);

            // The buffer must be drained before the fsync; otherwise bytes still held in the
            // BufferedOutputStream are only written on close(), after sync() has already returned.
            out.flush();
            fos.getFD().sync();
        } catch (IOException ioe) {
            // Best-effort cleanup; a leftover .part file is removed by recoverSwapLocations.
            swapTempFile.delete();
            throw ioe;
        }

        if (swapTempFile.renameTo(swapFile)) {
            this.flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
        } else {
            // NOTE(review): the location is still returned below although no file exists at it and the
            // repository was not updated; kept as in the original -- confirm callers expect this.
            this.error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
        }
        return swapLocation;
    }

    /**
     * Restores the FlowFiles stored at the given swap location and deletes the swap file.
     *
     * When the FlowFile Repository does not recognise the file name as a valid swap location, a
     * warning is issued and empty contents are returned; the file is left untouched.
     *
     * @param swapLocation  path of the swap file
     * @param flowFileQueue queue the FlowFiles are swapped back into
     * @return the contents read from the swap file, or empty contents for an unknown location
     * @throws IOException if the swap file cannot be read
     */
    public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
        File swapFile = new File(swapLocation);

        if (this.flowFileRepository.isValidSwapLocationSuffix(swapFile.getName())) {
            SwapContents restored = this.peek(swapLocation, flowFileQueue);
            String absolutePath = swapFile.getAbsolutePath();
            this.flowFileRepository.swapFlowFilesIn(absolutePath, restored.getFlowFiles(), flowFileQueue);

            boolean deleted = swapFile.delete();
            if (!deleted) {
                this.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
            }
            return restored;
        }

        // Unknown to the repository: report it and hand back an empty result.
        this.warn("Cannot swap in FlowFiles from location " + swapLocation + " because the FlowFile Repository does not know about this Swap Location. This file should be manually removed. This typically occurs when a Swap File is written but the FlowFile Repository is not updated yet to reflect this. This is generally not a cause for concern, but may be indicative of a failure to update the FlowFile Repository.");
        StandardSwapSummary emptySummary = new StandardSwapSummary(new QueueSize(0, 0L), 0L, Collections.emptyList());
        return new StandardSwapContents(emptySummary, Collections.emptyList());
    }

    /**
     * Reads the FlowFiles stored in a swap file without deleting the file or updating the
     * FlowFile Repository.
     *
     * NOTE(review): the decompiler could not recover this method and left a stub that always threw
     * {@code IllegalStateException("Decompilation failed")}, which also made {@code swapIn} unusable.
     * The body below is a reconstruction that follows the read path used by
     * {@code recoverSwapLocations}/{@code createSwapDeserializer}. The signature of
     * {@code SwapDeserializer.deserializeFlowFiles} is not visible in this file -- confirm it, and
     * the exact wording of the exception message, against the original class before relying on this.
     *
     * @param swapLocation  path of the swap file
     * @param flowFileQueue queue the swap file belongs to
     * @return the contents of the swap file
     * @throws FileNotFoundException if no file exists at the given location
     * @throws IOException           if the file cannot be read or no suitable deserializer is found
     */
    public SwapContents peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
        File swapFile = new File(swapLocation);
        if (!swapFile.exists()) {
            throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
        }

        // BufferedInputStream is required: createSwapDeserializer relies on mark/reset.
        try (FileInputStream fis = new FileInputStream(swapFile);
             BufferedInputStream bufferedIn = new BufferedInputStream(fis);
             DataInputStream in = new DataInputStream(bufferedIn)) {
            SwapDeserializer deserializer = this.createSwapDeserializer(in);
            return deserializer.deserializeFlowFiles(in, swapLocation, flowFileQueue, this.claimManager);
        }
    }

    /**
     * Deletes every completed and temporary swap file in the storage directory.
     *
     * A file that cannot be deleted is reported as a warning and skipped. If the directory cannot
     * be listed, there is nothing to delete and the method returns quietly.
     */
    public void purge() {
        File[] swapFiles = this.storageDirectory.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
            }
        });
        // File.listFiles returns null when the path is not a directory or an I/O error occurs;
        // iterating over it would throw a NullPointerException.
        if (swapFiles == null) {
            return;
        }
        for (File file : swapFiles) {
            if (!file.delete()) {
                this.warn("Failed to delete Swap File " + file + " when purging FlowFile Swap Manager");
            }
        }
    }

    /**
     * Extracts the owning queue identifier from a swap file name.
     *
     * The name is split on {@code '-'}; when more than six pieces result, pieces 1 through 5 are
     * re-joined with dashes and returned.
     *
     * @param swapFile swap file whose name is inspected
     * @return the queue identifier, or {@code null} when the name has six or fewer dash-separated pieces
     */
    private String getOwnerQueueIdentifier(File swapFile) {
        String[] pieces = swapFile.getName().split("-");
        if (pieces.length <= 6) {
            return null;
        }
        // Piece 0 precedes the identifier; the identifier itself spans the next five pieces.
        return String.join("-", Arrays.copyOfRange(pieces, 1, 6));
    }

    /**
     * Extracts the partition name from a swap file name, i.e. the text between the first and the
     * last dot.
     *
     * @param swapFile swap file whose name is inspected
     * @return the text between the first and last dot, or {@code null} when the name has no dot
     *         past its first character or contains only a single dot
     */
    private String getOwnerPartition(File swapFile) {
        String name = swapFile.getName();
        int firstDot = name.indexOf('.');
        int lastDot = name.lastIndexOf('.');

        boolean hasPartitionSegment = firstDot >= 1 && lastDot != firstDot;
        return hasPartitionSegment ? name.substring(firstDot + 1, lastDot) : null;
    }

    /**
     * Collects the partition names found in the names of the completed swap files that belong to
     * the given queue.
     *
     * @param queue queue whose swap files are inspected
     * @return the distinct partition names; an empty set when the storage directory cannot be listed
     */
    public Set<String> getSwappedPartitionNames(FlowFileQueue queue) {
        File[] completedSwapFiles = this.storageDirectory.listFiles(
                (dir, name) -> SWAP_FILE_PATTERN.matcher(name).matches());
        if (completedSwapFiles == null) {
            return Collections.emptySet();
        }

        String queueId = queue.getIdentifier();
        return Arrays.stream(completedSwapFiles)
                .filter(candidate -> queueId.equals(this.getOwnerQueueIdentifier(candidate)))
                .map(candidate -> this.getOwnerPartition(candidate))
                .filter(partition -> partition != null)
                .collect(Collectors.toSet());
    }

    /**
     * Finds the swap files in the storage directory that belong to the given queue (and, when a
     * partition name is supplied, to that partition) and returns their absolute paths sorted with
     * {@code SwapFileComparator}.
     *
     * Temporary ({@code .part}) files encountered along the way are deleted. Files whose name
     * carries a queue identifier are matched by name and must be known to the FlowFile Repository.
     * Files without an identifier in their name are opened, and are only accepted when they are
     * read by a {@code SimpleSwapDeserializer} and the identifier stored in the file matches the queue.
     *
     * @param flowFileQueue queue whose swap files are recovered
     * @param partitionName partition to restrict the result to, or {@code null} for any partition
     * @return sorted absolute paths of the matching swap files; empty when the directory cannot be listed
     * @throws IOException if a swap file without a queue identifier in its name cannot be opened or
     *                     its format cannot be determined
     */
    public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue, String partitionName) throws IOException {
        File[] swapFiles = this.storageDirectory.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
            }
        });
        if (swapFiles == null) {
            return Collections.emptyList();
        }

        List<String> swapLocations = new ArrayList<>();
        for (File swapFile : swapFiles) {
            // Leftovers of an interrupted swapOut: remove them and move on.
            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
                if (swapFile.delete()) {
                    logger.info("Removed incomplete/temporary Swap File {}", swapFile);
                } else {
                    this.warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
                }
                continue;
            }

            String ownerQueueId = this.getOwnerQueueIdentifier(swapFile);
            if (ownerQueueId != null) {
                // The owner is encoded in the file name, so the file does not need to be opened.
                if (!ownerQueueId.equals(flowFileQueue.getIdentifier())) {
                    continue;
                }
                if (partitionName != null && !partitionName.equals(this.getOwnerPartition(swapFile))) {
                    continue;
                }
                if (!this.flowFileRepository.isValidSwapLocationSuffix(swapFile.getName())) {
                    logger.warn("Encountered unknown Swap File {}; will ignore this Swap File. This file should be cleaned up manually", swapFile);
                    continue;
                }
                swapLocations.add(swapFile.getAbsolutePath());
                continue;
            }

            // No owner in the file name: the queue identifier has to be read from the file itself.
            try (FileInputStream fis = new FileInputStream(swapFile);
                 BufferedInputStream bufferedIn = new BufferedInputStream(fis);
                 DataInputStream in = new DataInputStream(bufferedIn)) {
                SwapDeserializer deserializer;
                try {
                    deserializer = this.createSwapDeserializer(in);
                } catch (Exception e) {
                    String errMsg = "Cannot swap FlowFiles in from " + swapFile + " due to " + e;
                    // error() both logs and reports, and tolerates a missing EventReporter, so a
                    // NullPointerException can no longer mask the real failure.
                    this.error(errMsg);
                    throw new IOException(errMsg, e);
                }

                if (deserializer instanceof SimpleSwapDeserializer) {
                    String connectionId = in.readUTF();
                    if (connectionId.equals(flowFileQueue.getIdentifier())) {
                        swapLocations.add(swapFile.getAbsolutePath());
                    }
                }
            }
        }

        swapLocations.sort(new SwapFileComparator());
        return swapLocations;
    }

    /**
     * Reads the summary of the swap file at the given location.
     *
     * NOTE(review): the decompiler could not recover this method and left a stub that always threw
     * {@code IllegalStateException("Decompilation failed")}. The body below is a reconstruction that
     * follows the read path used by {@code recoverSwapLocations}/{@code createSwapDeserializer}.
     * The signature of {@code SwapDeserializer.getSwapSummary} is not visible in this file --
     * confirm it against the original class before relying on this.
     *
     * @param swapLocation path of the swap file
     * @return the summary produced by the deserializer matching the file's format
     * @throws IOException if the file cannot be opened or read, or no suitable deserializer is found
     */
    public SwapSummary getSwapSummary(String swapLocation) throws IOException {
        File swapFile = new File(swapLocation);

        // BufferedInputStream is required: createSwapDeserializer relies on mark/reset.
        try (FileInputStream fis = new FileInputStream(swapFile);
             BufferedInputStream bufferedIn = new BufferedInputStream(fis);
             DataInputStream in = new DataInputStream(bufferedIn)) {
            SwapDeserializer deserializer = this.createSwapDeserializer(in);
            return deserializer.getSwapSummary(in, swapLocation, this.claimManager);
        }
    }

    /**
     * Chooses the deserializer for a swap file by looking at its first bytes.
     *
     * If the stream starts with {@code MAGIC_HEADER}, the serialization name that follows is read
     * and must equal the name of {@code SchemaSwapDeserializer}. Otherwise the stream is rewound to
     * where it started and a {@code SimpleSwapDeserializer} is returned.
     *
     * @param dis stream positioned at the start of the swap file; it is marked and possibly reset here,
     *            so it must support mark/reset
     * @return the deserializer matching the file's format
     * @throws IOException if the stream holds fewer bytes than the header, or names a serialization
     *                     for which no deserializer is available
     */
    private SwapDeserializer createSwapDeserializer(DataInputStream dis) throws IOException {
        dis.mark(MAGIC_HEADER.length);
        byte[] magicHeader = new byte[MAGIC_HEADER.length];
        try {
            StreamUtils.fillBuffer((InputStream)dis, (byte[])magicHeader);
        } catch (EOFException eof) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new IOException("Failed to read swap file because the file contained less than 4 bytes of data", eof);
        }

        if (!Arrays.equals(magicHeader, MAGIC_HEADER)) {
            // No header: rewind so the caller reads the file from its first byte.
            dis.reset();
            return new SimpleSwapDeserializer();
        }

        String serializationName = dis.readUTF();
        if (serializationName.equals(SchemaSwapDeserializer.getSerializationName())) {
            return new SchemaSwapDeserializer();
        }
        throw new IOException("Cannot find a suitable Deserializer for swap file, written with Serialization Name '" + serializationName + "'");
    }

    /**
     * Logs the message at error level and, when an event reporter has been set, also reports it
     * as an ERROR event under {@code EVENT_CATEGORY}.
     *
     * @param error message to log and report
     */
    private void error(String error) {
        logger.error(error);
        if (this.eventReporter == null) {
            return;
        }
        this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
    }

    /**
     * Logs the message at warn level and, when an event reporter has been set, also reports it
     * as a WARNING event under {@code EVENT_CATEGORY}.
     *
     * @param warning message to log and report
     */
    private void warn(String warning) {
        logger.warn(warning);
        if (this.eventReporter == null) {
            return;
        }
        this.eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
    }

    /**
     * Renames a swap file so that its name carries the given partition name.
     *
     * Everything from the first dot of the current file name onward is replaced by
     * {@code .<newPartitionName>.swap}; when the name has no dot, that suffix is appended.
     *
     * @param swapLocation     path of the existing swap file
     * @param newPartitionName partition name to place in the file name
     * @return absolute path of the renamed file
     * @throws FileNotFoundException if no file exists at {@code swapLocation}
     * @throws IOException           if the file cannot be moved
     */
    public String changePartitionName(String swapLocation, String newPartitionName) throws IOException {
        File currentFile = new File(swapLocation);
        if (!currentFile.exists()) {
            throw new FileNotFoundException("Could not change name of partition for swap location " + swapLocation + " because no swap file exists at that location");
        }

        String currentName = currentFile.getName();
        int firstDot = currentName.indexOf('.');
        String baseName = firstDot < 0 ? currentName : currentName.substring(0, firstDot);

        File renamedFile = new File(currentFile.getParentFile(), baseName + "." + newPartitionName + ".swap");
        Files.move(currentFile.toPath(), renamedFile.toPath());
        return renamedFile.getAbsolutePath();
    }

    /**
     * Orders swap file paths by the numeric prefix of their file name (the text before the first
     * dash), falling back to plain string order when the prefixes are equal. Paths whose prefix
     * cannot be read as a number sort after those whose prefix can.
     */
    private static class SwapFileComparator implements Comparator<String> {
        private SwapFileComparator() {
        }

        @Override
        public int compare(String o1, String o2) {
            if (o1 == o2) {
                return 0;
            }

            Long firstTimestamp = this.getTimestampFromFilename(o1);
            Long secondTimestamp = this.getTimestampFromFilename(o2);

            if (firstTimestamp == null) {
                // Two unreadable prefixes are treated as equal; a lone unreadable one goes last.
                return secondTimestamp == null ? 0 : 1;
            }
            if (secondTimestamp == null) {
                return -1;
            }

            int byTimestamp = firstTimestamp.compareTo(secondTimestamp);
            return byTimestamp != 0 ? byTimestamp : o1.compareTo(o2);
        }

        /**
         * Parses the digits that precede the first dash of the file name.
         *
         * @return the parsed value, or {@code null} when the path is null, the name has no dash
         *         past its first character, or the prefix is not a valid long
         */
        private Long getTimestampFromFilename(String fullyQualifiedFilename) {
            if (fullyQualifiedFilename == null) {
                return null;
            }

            String name = new File(fullyQualifiedFilename).getName();
            int dashIndex = name.indexOf('-');
            if (dashIndex < 1) {
                return null;
            }

            try {
                return Long.valueOf(name.substring(0, dashIndex));
            } catch (NumberFormatException e) {
                return null;
            }
        }
    }
}

