/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileSystemSwapManager
implements FlowFileSwapManager {
    public static final int MINIMUM_SWAP_COUNT = 10000;
    private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
    public static final int SWAP_ENCODING_VERSION = 8;
    public static final String EVENT_CATEGORY = "Swap FlowFiles";
    private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
    private final File storageDirectory;
    private FlowFileRepository flowFileRepository;
    private EventReporter eventReporter;
    private ResourceClaimManager claimManager;

    public FileSystemSwapManager() {
        NiFiProperties properties = NiFiProperties.getInstance();
        Path flowFileRepoPath = properties.getFlowFileRepositoryPath();
        this.storageDirectory = flowFileRepoPath.resolve("swap").toFile();
        if (!this.storageDirectory.exists() && !this.storageDirectory.mkdirs()) {
            throw new RuntimeException("Cannot create Swap Storage directory " + this.storageDirectory.getAbsolutePath());
        }
    }

    public synchronized void initialize(SwapManagerInitializationContext initializationContext) {
        this.claimManager = initializationContext.getResourceClaimManager();
        this.eventReporter = initializationContext.getEventReporter();
        this.flowFileRepository = initializationContext.getFlowFileRepository();
    }

    public String swapOut(List<FlowFileRecord> toSwap, FlowFileQueue flowFileQueue) throws IOException {
        if (toSwap == null || toSwap.isEmpty()) {
            return null;
        }
        File swapFile = new File(this.storageDirectory, System.currentTimeMillis() + "-" + flowFileQueue.getIdentifier() + "-" + UUID.randomUUID().toString() + ".swap");
        File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
        String swapLocation = swapFile.getAbsolutePath();
        try (FileOutputStream fos = new FileOutputStream(swapTempFile);){
            FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
            fos.getFD().sync();
        }
        catch (IOException ioe) {
            swapTempFile.delete();
            throw ioe;
        }
        if (swapTempFile.renameTo(swapFile)) {
            this.flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
        } else {
            this.error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
        }
        return swapLocation;
    }

    public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
        File swapFile = new File(swapLocation);
        List<FlowFileRecord> swappedFlowFiles = this.peek(swapLocation, flowFileQueue);
        this.flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue);
        if (!swapFile.delete()) {
            this.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
        }
        return swappedFlowFiles;
    }

    public List<FlowFileRecord> peek(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
        List<FlowFileRecord> swappedFlowFiles;
        File swapFile = new File(swapLocation);
        if (!swapFile.exists()) {
            throw new FileNotFoundException("Failed to swap in FlowFiles from external storage location " + swapLocation + " into FlowFile Queue because the file could not be found");
        }
        try (FileInputStream fis = new FileInputStream(swapFile);
             DataInputStream in = new DataInputStream(fis);){
            swappedFlowFiles = FileSystemSwapManager.deserializeFlowFiles(in, swapLocation, flowFileQueue, this.claimManager);
        }
        return swappedFlowFiles;
    }

    public void purge() {
        File[] swapFiles;
        for (File file : swapFiles = this.storageDirectory.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
            }
        })) {
            if (file.delete()) continue;
            this.warn("Failed to delete Swap File " + file + " when purging FlowFile Swap Manager");
        }
    }

    public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
        File[] swapFiles = this.storageDirectory.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
            }
        });
        if (swapFiles == null) {
            return Collections.emptyList();
        }
        ArrayList<String> swapLocations = new ArrayList<String>();
        for (File swapFile : swapFiles) {
            String queueIdentifier;
            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
                if (swapFile.delete()) {
                    logger.info("Removed incomplete/temporary Swap File " + swapFile);
                    continue;
                }
                this.warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
                continue;
            }
            String[] splits = swapFile.getName().split("-");
            if (splits.length == 3 && !(queueIdentifier = splits[1]).equals(flowFileQueue.getIdentifier())) continue;
            try (FileInputStream fis = new FileInputStream(swapFile);
                 BufferedInputStream bufferedIn = new BufferedInputStream(fis);
                 DataInputStream in = new DataInputStream(bufferedIn);){
                int swapEncodingVersion = in.readInt();
                if (swapEncodingVersion > 8) {
                    String errMsg = "Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " + swapEncodingVersion + ", which is too new (expecting " + 8 + " or less)";
                    this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, errMsg);
                    throw new IOException(errMsg);
                }
                String connectionId = in.readUTF();
                if (!connectionId.equals(flowFileQueue.getIdentifier())) continue;
                swapLocations.add(swapFile.getAbsolutePath());
            }
        }
        Collections.sort(swapLocations, new SwapFileComparator());
        return swapLocations;
    }

    /*
     * Exception decompiling
     */
    public QueueSize getSwapSize(String swapLocation) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 4 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * Exception decompiling
     */
    public Long getMaxRecordId(String swapLocation) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [2[TRYBLOCK], 1[TRYBLOCK]], but top level block is 57[SIMPLE_IF_TAKEN]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static int serializeFlowFiles(List<FlowFileRecord> toSwap, FlowFileQueue queue, String swapLocation, OutputStream destination) throws IOException {
        if (toSwap == null || toSwap.isEmpty()) {
            return 0;
        }
        long contentSize = 0L;
        for (FlowFileRecord record : toSwap) {
            contentSize += record.getSize();
        }
        BufferedOutputStream bufferedOut = new BufferedOutputStream(destination);
        DataOutputStream out = new DataOutputStream((OutputStream)bufferedOut);
        try {
            out.writeInt(8);
            out.writeUTF(queue.getIdentifier());
            out.writeInt(toSwap.size());
            out.writeLong(contentSize);
            long maxRecordId = 0L;
            for (FlowFileRecord flowFile : toSwap) {
                if (flowFile.getId() <= maxRecordId) continue;
                maxRecordId = flowFile.getId();
            }
            out.writeLong(maxRecordId);
            for (FlowFileRecord flowFile : toSwap) {
                out.writeLong(flowFile.getId());
                out.writeLong(flowFile.getEntryDate());
                Set lineageIdentifiers = flowFile.getLineageIdentifiers();
                out.writeInt(lineageIdentifiers.size());
                for (String lineageId : lineageIdentifiers) {
                    out.writeUTF(lineageId);
                }
                out.writeLong(flowFile.getLineageStartDate());
                out.writeLong(flowFile.getLastQueueDate());
                out.writeLong(flowFile.getSize());
                ContentClaim claim = flowFile.getContentClaim();
                if (claim == null) {
                    out.writeBoolean(false);
                } else {
                    out.writeBoolean(true);
                    ResourceClaim resourceClaim = claim.getResourceClaim();
                    out.writeUTF(resourceClaim.getId());
                    out.writeUTF(resourceClaim.getContainer());
                    out.writeUTF(resourceClaim.getSection());
                    out.writeLong(claim.getOffset());
                    out.writeLong(claim.getLength());
                    out.writeLong(flowFile.getContentClaimOffset());
                    out.writeBoolean(resourceClaim.isLossTolerant());
                }
                Map attributes = flowFile.getAttributes();
                out.writeInt(attributes.size());
                for (Map.Entry entry : attributes.entrySet()) {
                    FileSystemSwapManager.writeString((String)entry.getKey(), out);
                    FileSystemSwapManager.writeString((String)entry.getValue(), out);
                }
            }
        }
        finally {
            out.flush();
        }
        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation});
        return toSwap.size();
    }

    private static void writeString(String toWrite, OutputStream out) throws IOException {
        byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
        int utflen = bytes.length;
        if (utflen < 65535) {
            out.write(utflen >>> 8);
            out.write(utflen);
            out.write(bytes);
        } else {
            out.write(255);
            out.write(255);
            out.write(utflen >>> 24);
            out.write(utflen >>> 16);
            out.write(utflen >>> 8);
            out.write(utflen);
            out.write(bytes);
        }
    }

    static List<FlowFileRecord> deserializeFlowFiles(DataInputStream in, String swapLocation, FlowFileQueue queue, ResourceClaimManager claimManager) throws IOException {
        int swapEncodingVersion = in.readInt();
        if (swapEncodingVersion > 8) {
            throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " + swapEncodingVersion + ", which is too new (expecting " + 8 + " or less)");
        }
        String connectionId = in.readUTF();
        if (!connectionId.equals(queue.getIdentifier())) {
            throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation + " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
        }
        int numRecords = in.readInt();
        in.readLong();
        if (swapEncodingVersion > 7) {
            in.readLong();
        }
        return FileSystemSwapManager.deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
    }

    private static List<FlowFileRecord> deserializeFlowFiles(DataInputStream in, int numFlowFiles, int serializationVersion, boolean incrementContentClaims, ResourceClaimManager claimManager) throws IOException {
        ArrayList<FlowFileRecord> flowFiles = new ArrayList<FlowFileRecord>();
        for (int i = 0; i < numFlowFiles; ++i) {
            boolean hasClaim;
            int action;
            if (serializationVersion < 3 && (action = in.read()) != 1) {
                throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
            }
            StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
            ffBuilder.id(in.readLong());
            ffBuilder.entryDate(in.readLong());
            if (serializationVersion > 1) {
                int numLineageIdentifiers = in.readInt();
                HashSet<String> lineageIdentifiers = new HashSet<String>(numLineageIdentifiers);
                for (int lineageIdIdx = 0; lineageIdIdx < numLineageIdentifiers; ++lineageIdIdx) {
                    lineageIdentifiers.add(in.readUTF());
                }
                ffBuilder.lineageIdentifiers(lineageIdentifiers);
                ffBuilder.lineageStartDate(in.readLong());
                if (serializationVersion > 5) {
                    ffBuilder.lastQueueDate(in.readLong());
                }
            }
            ffBuilder.size(in.readLong());
            if (serializationVersion < 3) {
                FileSystemSwapManager.readString(in);
            }
            if (hasClaim = in.readBoolean()) {
                long resourceLength;
                long resourceOffset;
                String claimId = serializationVersion < 5 ? String.valueOf(in.readLong()) : in.readUTF();
                String container = in.readUTF();
                String section = in.readUTF();
                if (serializationVersion < 6) {
                    resourceOffset = 0L;
                    resourceLength = -1L;
                } else {
                    resourceOffset = in.readLong();
                    resourceLength = in.readLong();
                }
                long claimOffset = in.readLong();
                boolean lossTolerant = serializationVersion >= 4 ? in.readBoolean() : false;
                ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant);
                StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset);
                claim.setLength(resourceLength);
                if (incrementContentClaims) {
                    claimManager.incrementClaimantCount(resourceClaim);
                }
                ffBuilder.contentClaim(claim);
                ffBuilder.contentClaimOffset(claimOffset);
            }
            boolean attributesChanged = true;
            if (serializationVersion < 3) {
                attributesChanged = in.readBoolean();
            }
            if (attributesChanged) {
                int numAttributes = in.readInt();
                for (int j = 0; j < numAttributes; ++j) {
                    String key = FileSystemSwapManager.readString(in);
                    String value = FileSystemSwapManager.readString(in);
                    ffBuilder.addAttribute(key, value);
                }
            }
            FlowFileRecord record = ffBuilder.build();
            flowFiles.add(record);
        }
        return flowFiles;
    }

    private static String readString(InputStream in) throws IOException {
        Integer numBytes = FileSystemSwapManager.readFieldLength(in);
        if (numBytes == null) {
            throw new EOFException();
        }
        byte[] bytes = new byte[numBytes.intValue()];
        FileSystemSwapManager.fillBuffer(in, bytes, numBytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private static Integer readFieldLength(InputStream in) throws IOException {
        int firstValue = in.read();
        int secondValue = in.read();
        if (firstValue < 0) {
            return null;
        }
        if (secondValue < 0) {
            throw new EOFException();
        }
        if (firstValue == 255 && secondValue == 255) {
            int ch4;
            int ch3;
            int ch2;
            int ch1 = in.read();
            if ((ch1 | (ch2 = in.read()) | (ch3 = in.read()) | (ch4 = in.read())) < 0) {
                throw new EOFException();
            }
            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
        }
        return (firstValue << 8) + secondValue;
    }

    private static void fillBuffer(InputStream in, byte[] buffer, int length) throws IOException {
        int bytesRead;
        int totalBytesRead = 0;
        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
            totalBytesRead += bytesRead;
        }
        if (totalBytesRead != length) {
            throw new EOFException();
        }
    }

    private void error(String error) {
        logger.error(error);
        if (this.eventReporter != null) {
            this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
        }
    }

    private void warn(String warning) {
        logger.warn(warning);
        if (this.eventReporter != null) {
            this.eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
        }
    }

    private static class SwapFileComparator
    implements Comparator<String> {
        private SwapFileComparator() {
        }

        @Override
        public int compare(String o1, String o2) {
            if (o1 == o2) {
                return 0;
            }
            Long time1 = this.getTimestampFromFilename(o1);
            Long time2 = this.getTimestampFromFilename(o2);
            if (time1 == null && time2 == null) {
                return 0;
            }
            if (time1 == null) {
                return 1;
            }
            if (time2 == null) {
                return -1;
            }
            int timeComparisonValue = time1.compareTo(time2);
            if (timeComparisonValue != 0) {
                return timeComparisonValue;
            }
            return o1.compareTo(o2);
        }

        private Long getTimestampFromFilename(String fullyQualifiedFilename) {
            if (fullyQualifiedFilename == null) {
                return null;
            }
            File file = new File(fullyQualifiedFilename);
            String filename = file.getName();
            int idx = filename.indexOf("-");
            if (idx < 1) {
                return null;
            }
            String millisVal = filename.substring(0, idx);
            try {
                return Long.parseLong(millisVal);
            }
            catch (NumberFormatException e) {
                return null;
            }
        }
    }
}

