/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.repository;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.LiveSerializedRepositoryRecord;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.ReconstitutedSerializedRepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordSerdeFactory;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.ResourceClaimReference;
import org.apache.nifi.controller.repository.SerializedRepositoryRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SerDeFactory;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;

public class WriteAheadFlowFileRepository
implements FlowFileRepository,
SyncListener {
    // Property-name prefix for repository directories: the NiFiProperties constructor registers the value of every
    // property whose name starts with this prefix as a recovery directory.
    static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
    // Property that selects the Write-Ahead Log implementation by class name.
    private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
    // Property backing retainOrphanedFlowFiles; an absent value is treated as true.
    private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";
    // Implementation names recognized by this class.
    static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
    static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
    private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog";
    // Implementation name that applies when the implementation property is not set.
    private static final String DEFAULT_WAL_IMPLEMENTATION = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
    // Configured (or defaulted) implementation name; null when the no-arg constructor was used.
    final String walImplementation;
    protected final NiFiProperties nifiProperties;
    // NOTE(review): no use of this generator is visible in this part of the file — presumably it hands out FlowFile ids; confirm.
    final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
    // Sync flag passed to the write-ahead log by updateRepository(Collection).
    private final boolean alwaysSync;
    private final boolean retainOrphanedFlowFiles;
    private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
    // Cancelled by close(); the code that schedules it is not visible in this part of the file.
    volatile ScheduledFuture<?> checkpointFuture;
    final long checkpointDelayMillis;
    // Directories handed to the active write-ahead log and used for the storage statistics.
    private final List<File> flowFileRepositoryPaths = new ArrayList<File>();
    // Every configured repository directory; scanned for old partitions when migrating from the minimal-locking log.
    final List<File> recoveryFiles = new ArrayList<File>();
    // Null when the no-arg constructor was used.
    final ScheduledExecutorService checkpointExecutor;
    // Records recovered by findQueuesWithFlowFiles and reused by loadFlowFiles so recovery is not repeated.
    private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
    // Resource claims of retained FlowFiles whose queue no longer exists (populated in loadFlowFiles).
    private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet());
    // Normalized swap-location suffixes currently known; every access synchronizes on the set itself.
    private final Set<String> swapLocationSuffixes = new HashSet<String>();
    private WriteAheadRepository<SerializedRepositoryRecord> wal;
    private RepositoryRecordSerdeFactory serdeFactory;
    private ResourceClaimManager claimManager;
    // Claims waiting for a WAL sync before being marked destructable, keyed by the partition index returned by wal.update().
    private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction = new ConcurrentHashMap<Integer, BlockingQueue<ResourceClaim>>();

    /**
     * Creates an unconfigured instance: no properties, no WAL implementation name and no
     * checkpoint executor. Orphaned FlowFiles are retained and syncing on every update is off.
     * NOTE(review): presumably exists only for reflective/service-loader discovery — confirm.
     */
    public WriteAheadFlowFileRepository() {
        this.nifiProperties = null;
        this.walImplementation = null;
        this.retainOrphanedFlowFiles = true;
        this.alwaysSync = false;
        this.checkpointExecutor = null;
        this.checkpointDelayMillis = 0L;
    }

    /**
     * Creates a repository configured from the given properties.
     *
     * <p>Reads the always-sync flag, the orphaned-FlowFile retention flag (defaults to true when
     * absent), the WAL implementation name (defaults to the sequential-access log when absent),
     * every repository directory property, and the checkpoint interval. A single-threaded
     * scheduled executor is created for checkpointing.
     *
     * @param nifiProperties the properties to read the configuration from
     */
    public WriteAheadFlowFileRepository(NiFiProperties nifiProperties) {
        this.alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty("nifi.flowfile.repository.always.sync", "false"));
        this.nifiProperties = nifiProperties;

        final String retainOrphaned = nifiProperties.getProperty(RETAIN_ORPHANED_FLOWFILES);
        this.retainOrphanedFlowFiles = retainOrphaned == null ? true : Boolean.parseBoolean(retainOrphaned);

        final String configuredWal = nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
        this.walImplementation = configuredWal != null ? configuredWal : DEFAULT_WAL_IMPLEMENTATION;

        // Every property sharing the directory prefix is a candidate location for recovery.
        for (final String propertyName : nifiProperties.getPropertyKeys()) {
            if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
                this.recoveryFiles.add(new File(nifiProperties.getProperty(propertyName)));
            }
        }

        if (isSequentialAccessWAL(this.walImplementation)) {
            // The sequential-access log writes to exactly one directory: the property named by the bare prefix.
            this.flowFileRepositoryPaths.add(new File(nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)));
        } else {
            this.flowFileRepositoryPaths.addAll(this.recoveryFiles);
        }

        this.checkpointDelayMillis = FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
        this.checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * Tells whether the given implementation name denotes the sequential-access write-ahead log,
     * in either its plain or its encrypted form.
     *
     * @param walImplementation the implementation class name; must not be null
     * @return true for the plain or encrypted sequential-access implementation
     */
    private static boolean isSequentialAccessWAL(String walImplementation) {
        if (walImplementation.equals(SEQUENTIAL_ACCESS_WAL)) {
            return true;
        }
        return walImplementation.equals(ENCRYPTED_SEQUENTIAL_ACCESS_WAL);
    }

    /**
     * Initializes the repository using a {@link StandardRepositoryRecordSerdeFactory} built on the
     * given claim manager.
     *
     * @param claimManager the resource claim manager to use
     * @throws IOException if the two-argument initialization fails
     */
    public void initialize(ResourceClaimManager claimManager) throws IOException {
        final RepositoryRecordSerdeFactory standardSerdeFactory = new StandardRepositoryRecordSerdeFactory(claimManager);
        initialize(claimManager, standardSerdeFactory);
    }

    /**
     * Initializes the repository: stores the collaborators, creates the repository directories
     * and instantiates the configured write-ahead log with this object as its sync listener.
     *
     * @param claimManager the resource claim manager to use
     * @param serdeFactory the factory the write-ahead log uses to (de)serialize records
     * @throws IOException if a repository directory cannot be created
     * @throws IllegalStateException if the configured implementation name is not recognized
     */
    public void initialize(ResourceClaimManager claimManager, RepositoryRecordSerdeFactory serdeFactory) throws IOException {
        this.claimManager = claimManager;

        for (final File repositoryDir : this.flowFileRepositoryPaths) {
            Files.createDirectories(repositoryDir.toPath());
        }

        this.serdeFactory = serdeFactory;

        final boolean sequentialAccess = isSequentialAccessWAL(this.walImplementation);
        if (!sequentialAccess && !this.walImplementation.equals(MINIMAL_LOCKING_WALI)) {
            throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property 'nifi.flowfile.repository.wal.implementation' has an invalid value of '" + this.walImplementation + "'. Please update nifi.properties to indicate a valid value for this property.");
        }

        if (sequentialAccess) {
            // The sequential-access log only ever uses the first (single) configured directory.
            this.wal = new SequentialAccessWriteAheadLog<>(this.flowFileRepositoryPaths.get(0), serdeFactory, this);
        } else {
            final SortedSet<Path> paths = this.flowFileRepositoryPaths.stream()
                    .map(File::toPath)
                    .collect(Collectors.toCollection(TreeSet::new));
            this.wal = new MinimalLockingWriteAheadLog<>(paths, 1, serdeFactory, this);
        }

        logger.info("Initialized FlowFile Repository");
    }

    /**
     * Stops checkpointing and shuts down the write-ahead log.
     *
     * <p>Each collaborator is shut down only if it exists, so closing an instance that was built
     * with the no-arg constructor (no executor) or never initialized (no write-ahead log) is a
     * no-op rather than a {@link NullPointerException}.
     *
     * @throws IOException if the write-ahead log fails to shut down
     */
    public void close() throws IOException {
        // Read the volatile field once so the null check and the cancel act on the same future.
        final ScheduledFuture<?> future = this.checkpointFuture;
        if (future != null) {
            future.cancel(false);
        }
        if (this.checkpointExecutor != null) {
            this.checkpointExecutor.shutdown();
        }
        if (this.wal != null) {
            this.wal.shutdown();
        }
    }

    /**
     * Reports whether this repository is volatile.
     *
     * @return always {@code false}
     */
    public boolean isVolatile() {
        return false;
    }

    /**
     * Finds, for each of the given resource claims, what currently references it: individual
     * FlowFile records in a snapshot of the write-ahead log, and swap files whose summary lists
     * the claim.
     *
     * <p>A swap file that cannot be read is logged and skipped; it does not fail the lookup.
     *
     * @param resourceClaims the claims of interest
     * @param swapManager used to resolve the queue and the summary of each swap location
     * @return references keyed by resource claim (claims without references are absent), or
     *         {@code null} when the active write-ahead log is not the sequential-access
     *         implementation and therefore cannot provide a snapshot
     */
    public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(Set<ResourceClaim> resourceClaims, FlowFileSwapManager swapManager) {
        if (!isSequentialAccessWAL(this.walImplementation)) {
            return null;
        }

        final Map<ResourceClaim, Set<ResourceClaimReference>> references = new HashMap<>();
        final SnapshotCapture<SerializedRepositoryRecord> snapshot =
                ((SequentialAccessWriteAheadLog<SerializedRepositoryRecord>) this.wal).captureSnapshot();

        // References held by FlowFiles that are live in the repository.
        for (final SerializedRepositoryRecord repositoryRecord : snapshot.getRecords().values()) {
            final ContentClaim contentClaim = repositoryRecord.getContentClaim();
            if (contentClaim == null) {
                continue;
            }
            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            if (resourceClaims.contains(resourceClaim)) {
                references.computeIfAbsent(resourceClaim, key -> new HashSet<>())
                        .add(createResourceClaimReference(repositoryRecord));
            }
        }

        // References held by swapped-out FlowFiles; one reference per swap file, not per FlowFile.
        for (final String swapLocation : snapshot.getSwapLocations()) {
            final String queueIdentifier = swapManager.getQueueIdentifier(swapLocation);
            final ResourceClaimReference swapReference = createResourceClaimReference(swapLocation, queueIdentifier);
            try {
                final SwapSummary swapSummary = swapManager.getSwapSummary(swapLocation);
                for (final ResourceClaim resourceClaim : swapSummary.getResourceClaims()) {
                    if (resourceClaims.contains(resourceClaim)) {
                        references.computeIfAbsent(resourceClaim, key -> new HashSet<>()).add(swapReference);
                    }
                }
            } catch (Exception e) {
                logger.warn("Failed to read swap file " + swapLocation + " when attempting to find resource claim references", e);
            }
        }

        return references;
    }

    /**
     * Builds a reference describing a swap file that holds a resource claim.
     *
     * <p>The reference reports itself as swapped out and has no FlowFile UUID. Two such
     * references are equal when their queue identifier and swap location are equal.
     *
     * @param swapLocation the location of the swap file
     * @param queueIdentifier the identifier of the queue the swap file belongs to
     * @return the swap-file reference
     */
    private ResourceClaimReference createResourceClaimReference(final String swapLocation, final String queueIdentifier) {
        return new ResourceClaimReference() {

            public String getQueueIdentifier() {
                return queueIdentifier;
            }

            public boolean isSwappedOut() {
                return true;
            }

            public String getFlowFileUuid() {
                return null;
            }

            public String getSwapLocation() {
                return swapLocation;
            }

            @Override
            public String toString() {
                return "Swap File[location=" + getSwapLocation() + ", queue=" + getQueueIdentifier() + "]";
            }

            @Override
            public int hashCode() {
                return Objects.hash(queueIdentifier, swapLocation);
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                // Only references created by this same factory method are comparable.
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                final ResourceClaimReference that = (ResourceClaimReference) obj;
                final boolean sameQueue = Objects.equals(queueIdentifier, that.getQueueIdentifier());
                return sameQueue && Objects.equals(swapLocation, that.getSwapLocation());
            }
        };
    }

    /**
     * Builds a reference describing a single, non-swapped FlowFile that holds a resource claim.
     *
     * <p>The queue identifier and the FlowFile's UUID attribute are captured when the reference
     * is created. Two such references are equal when both values are equal.
     *
     * @param repositoryRecord the record of the FlowFile holding the claim
     * @return the FlowFile reference
     */
    private ResourceClaimReference createResourceClaimReference(SerializedRepositoryRecord repositoryRecord) {
        final String queueIdentifier = repositoryRecord.getQueueIdentifier();
        final String uuidAttribute = CoreAttributes.UUID.key();
        final String flowFileUuid = repositoryRecord.getFlowFileRecord().getAttribute(uuidAttribute);

        return new ResourceClaimReference() {

            public String getQueueIdentifier() {
                return queueIdentifier;
            }

            public boolean isSwappedOut() {
                return false;
            }

            public String getFlowFileUuid() {
                return flowFileUuid;
            }

            public String getSwapLocation() {
                return null;
            }

            @Override
            public String toString() {
                return "FlowFile[uuid=" + getFlowFileUuid() + ", queue=" + getQueueIdentifier() + "]";
            }

            @Override
            public int hashCode() {
                return Objects.hash(queueIdentifier, flowFileUuid);
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                // Only references created by this same factory method are comparable.
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                final ResourceClaimReference that = (ResourceClaimReference) obj;
                final boolean sameQueue = Objects.equals(queueIdentifier, that.getQueueIdentifier());
                return sameQueue && Objects.equals(flowFileUuid, that.getFlowFileUuid());
            }
        };
    }

    /**
     * Sums the total space of the file stores backing each repository directory.
     *
     * @return the summed total space, in bytes, over all repository directories
     * @throws IOException if a file store cannot be resolved or queried
     */
    public long getStorageCapacity() throws IOException {
        long totalBytes = 0L;
        for (final File repositoryDir : this.flowFileRepositoryPaths) {
            final Path repositoryPath = repositoryDir.toPath();
            totalBytes = totalBytes + Files.getFileStore(repositoryPath).getTotalSpace();
        }
        return totalBytes;
    }

    /**
     * Sums the usable space of the file stores backing each repository directory.
     *
     * @return the summed usable space, in bytes, over all repository directories
     * @throws IOException if a file store cannot be resolved or queried
     */
    public long getUsableStorageSpace() throws IOException {
        long usableBytes = 0L;
        for (final File repositoryDir : this.flowFileRepositoryPaths) {
            final Path repositoryPath = repositoryDir.toPath();
            usableBytes = usableBytes + Files.getFileStore(repositoryPath).getUsableSpace();
        }
        return usableBytes;
    }

    /**
     * Returns the name of the file store that holds the first repository directory.
     *
     * @return the file store name, or {@code null} when no repository directory is configured
     *         or the file store cannot be determined
     */
    public String getFileStoreName() {
        // No directory is configured when the no-arg constructor was used; report "unknown"
        // the same way an I/O failure does instead of throwing NoSuchElementException.
        if (this.flowFileRepositoryPaths.isEmpty()) {
            return null;
        }

        final Path path = this.flowFileRepositoryPaths.get(0).toPath();
        try {
            return Files.getFileStore(path).name();
        } catch (IOException e) {
            // Deliberately best-effort: callers treat null as "cannot determine".
            return null;
        }
    }

    /**
     * Writes the given records to the repository, forcing a sync only when the repository was
     * configured to always sync.
     *
     * @param records the records to persist
     * @throws IOException if the write-ahead log cannot be updated
     */
    public void updateRepository(Collection<RepositoryRecord> records) throws IOException {
        final boolean forceSync = this.alwaysSync;
        updateRepository(records, forceSync);
    }

    /**
     * Tells the claim manager that the given resource claim may be destroyed; a null claim is
     * ignored.
     *
     * @param resourceClaim the claim to mark, may be null
     */
    private void markDestructable(ResourceClaim resourceClaim) {
        if (resourceClaim != null) {
            this.claimManager.markDestructable(resourceClaim);
        }
    }

    /**
     * Tells whether the resource claim behind the given content claim can be destroyed.
     *
     * @param claim the content claim to inspect, may be null
     * @return true only when the claim has a resource claim that reports it is not in use
     */
    private boolean isDestructable(ContentClaim claim) {
        final ResourceClaim resourceClaim = claim == null ? null : claim.getResourceClaim();
        return resourceClaim != null && !resourceClaim.isInUse();
    }

    /**
     * Tells whether the given swap location, once normalized, is one this repository currently
     * knows about.
     *
     * @param swapLocationSuffix the swap location (or suffix) to look up
     * @return true if the normalized location is in the set of known swap locations
     */
    public boolean isValidSwapLocationSuffix(String swapLocationSuffix) {
        // Normalization is a pure string operation, so it does not need to hold the lock.
        final String normalized = normalizeSwapLocation(swapLocationSuffix);
        synchronized (this.swapLocationSuffixes) {
            return this.swapLocationSuffixes.contains(normalized);
        }
    }

    /**
     * Validates the given records, writes them to the write-ahead log and then updates the
     * content-claim bookkeeping.
     *
     * <p>Records of type CLEANUP_TRANSIENT_CLAIMS are not written to the log but still take part
     * in the claim bookkeeping. Validation runs over every record before anything is written.
     *
     * @param records the records to persist
     * @param sync whether the write-ahead log should be asked to sync this update
     * @throws IOException if the write-ahead log cannot be updated
     * @throws IllegalArgumentException if a record that requires a destination has none
     */
    private void updateRepository(Collection<RepositoryRecord> records, boolean sync) throws IOException {
        for (final RepositoryRecord repositoryRecord : records) {
            final RepositoryRecordType type = repositoryRecord.getType();
            final boolean destinationOptional = type == RepositoryRecordType.DELETE
                    || type == RepositoryRecordType.CONTENTMISSING
                    || type == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS;
            if (!destinationOptional && repositoryRecord.getDestination() == null) {
                throw new IllegalArgumentException("Record " + repositoryRecord + " has no destination and Type is " + repositoryRecord.getType());
            }
        }

        // Everything except transient-claim cleanup records goes to the write-ahead log.
        final List<SerializedRepositoryRecord> serializedRecords = new ArrayList<>(records.size());
        for (final RepositoryRecord repositoryRecord : records) {
            if (repositoryRecord.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS) {
                serializedRecords.add(new LiveSerializedRepositoryRecord(repositoryRecord));
            }
        }

        final int partitionIndex = this.wal.update(serializedRecords, sync);
        updateContentClaims(records, partitionIndex);
    }

    /**
     * Updates claim bookkeeping after the given records were written to the write-ahead log.
     *
     * <p>For every record the claimant counts are adjusted first. Resource claims that are no
     * longer in use are then queued under the given partition index; they are only marked
     * destructable once that partition is synced (see {@code onSync}/{@code onGlobalSync}). Swap
     * locations added or removed by SWAP_OUT/SWAP_IN records are applied to the set of known
     * swap-location suffixes.
     *
     * @param repositoryRecords the records that were just persisted
     * @param partitionIndex the write-ahead log partition the records were written to
     */
    protected void updateContentClaims(Collection<RepositoryRecord> repositoryRecords, int partitionIndex) {
        final Set<ResourceClaim> claimsToAdd = new HashSet<>();
        final Set<String> swapLocationsAdded = new HashSet<>();
        final Set<String> swapLocationsRemoved = new HashSet<>();

        for (final RepositoryRecord repositoryRecord : repositoryRecords) {
            // Counts must be adjusted before isDestructable() is consulted for this record.
            updateClaimCounts(repositoryRecord);

            final RepositoryRecordType type = repositoryRecord.getType();
            if (type == RepositoryRecordType.DELETE) {
                final ContentClaim currentClaim = repositoryRecord.getCurrentClaim();
                if (isDestructable(currentClaim)) {
                    claimsToAdd.add(currentClaim.getResourceClaim());
                }

                final ContentClaim originalClaim = repositoryRecord.getOriginalClaim();
                if (originalClaim != null && !originalClaim.equals(currentClaim) && isDestructable(originalClaim)) {
                    claimsToAdd.add(originalClaim.getResourceClaim());
                }
            } else if (type == RepositoryRecordType.UPDATE) {
                // NOTE(review): this branch compares the claims by identity while DELETE uses
                // equals(); preserved as-is — confirm whether the difference is intended.
                final ContentClaim originalClaim = repositoryRecord.getOriginalClaim();
                if (originalClaim != null && repositoryRecord.getCurrentClaim() != originalClaim && isDestructable(originalClaim)) {
                    claimsToAdd.add(originalClaim.getResourceClaim());
                }
            } else if (type == RepositoryRecordType.SWAP_OUT) {
                // A later record for the same location overrides an earlier one in this batch.
                final String swapLocation = repositoryRecord.getSwapLocation();
                swapLocationsAdded.add(swapLocation);
                swapLocationsRemoved.remove(swapLocation);
            } else if (type == RepositoryRecordType.SWAP_IN) {
                final String swapLocation = repositoryRecord.getSwapLocation();
                swapLocationsRemoved.add(swapLocation);
                swapLocationsAdded.remove(swapLocation);
            }
        }

        // Transient claims are examined only after every record's counts have been adjusted.
        for (final RepositoryRecord repositoryRecord : repositoryRecords) {
            final List<ContentClaim> transientClaims = repositoryRecord.getTransientClaims();
            if (transientClaims == null) {
                continue;
            }
            for (final ContentClaim transientClaim : transientClaims) {
                if (isDestructable(transientClaim)) {
                    claimsToAdd.add(transientClaim.getResourceClaim());
                }
            }
        }

        if (!swapLocationsAdded.isEmpty() || !swapLocationsRemoved.isEmpty()) {
            synchronized (this.swapLocationSuffixes) {
                for (final String removed : swapLocationsRemoved) {
                    this.swapLocationSuffixes.remove(normalizeSwapLocation(removed));
                }
                for (final String added : swapLocationsAdded) {
                    this.swapLocationSuffixes.add(normalizeSwapLocation(added));
                }
            }
        }

        if (!claimsToAdd.isEmpty()) {
            // Atomically create the partition's queue on first use.
            final BlockingQueue<ResourceClaim> claimQueue =
                    this.claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new LinkedBlockingQueue<>());
            claimQueue.addAll(claimsToAdd);
        }
    }

    /**
     * Adjusts claimant counts for one record: the current claim is released when the record is a
     * DELETE or CONTENTMISSING, and the original claim is released when the record reports that
     * its content was modified.
     *
     * @param record the record whose claims are to be released
     */
    private void updateClaimCounts(RepositoryRecord record) {
        final RepositoryRecordType type = record.getType();
        final boolean flowFileGone = type == RepositoryRecordType.DELETE || type == RepositoryRecordType.CONTENTMISSING;
        if (flowFileGone) {
            decrementClaimCount(record.getCurrentClaim());
        }
        if (record.isContentModified()) {
            decrementClaimCount(record.getOriginalClaim());
        }
    }

    /**
     * Decrements the claimant count of the resource claim behind the given content claim; a null
     * content claim is ignored.
     *
     * @param claim the content claim being released, may be null
     */
    private void decrementClaimCount(ContentClaim claim) {
        if (claim != null) {
            this.claimManager.decrementClaimantCount(claim.getResourceClaim());
        }
    }

    /**
     * Reduces a swap location to the form used as a key in the set of known swap locations:
     * backslashes become forward slashes, one trailing slash is dropped, everything up to the
     * last remaining slash is removed, and the result is cut at its first {@code '.'}.
     *
     * @param swapLocation the swap location, may be null
     * @return the normalized location, or null if the input was null
     */
    protected static String normalizeSwapLocation(String swapLocation) {
        if (swapLocation == null) {
            return null;
        }

        String path = swapLocation.replace("\\", "/");
        if (path.length() > 1 && path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }

        final String fileName = getLocationSuffix(path);
        return StringUtils.substringBefore(fileName, ".");
    }

    /**
     * Returns the part of the given location after its last {@code '/'}.
     *
     * @param swapLocation the location, using forward slashes; must not be null
     * @return the text after the last slash, or the input unchanged when it contains no slash or
     *         the last slash is its final character
     */
    private static String getLocationSuffix(String swapLocation) {
        final int separatorIndex = swapLocation.lastIndexOf('/');
        final boolean hasSuffix = separatorIndex >= 0 && separatorIndex < swapLocation.length() - 1;
        return hasSuffix ? swapLocation.substring(separatorIndex + 1) : swapLocation;
    }

    /**
     * Called when a single write-ahead log partition has been synced: every resource claim that
     * was queued for that partition is now marked destructable.
     *
     * @param partitionIndex the index of the partition that was synced
     */
    public void onSync(int partitionIndex) {
        final BlockingQueue<ResourceClaim> claimQueue = this.claimsAwaitingDestruction.get(partitionIndex);
        if (claimQueue == null) {
            return;
        }

        // Drain into a set so a claim queued more than once is only marked once.
        final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
        claimQueue.drainTo(claimsToDestroy);
        for (final ResourceClaim claim : claimsToDestroy) {
            markDestructable(claim);
        }
    }

    /**
     * Called when the whole write-ahead log has been synced: the queued resource claims of every
     * partition are marked destructable.
     */
    public void onGlobalSync() {
        for (final BlockingQueue<ResourceClaim> claimQueue : this.claimsAwaitingDestruction.values()) {
            // Drain into a set so a claim queued more than once is only marked once.
            final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
            claimQueue.drainTo(claimsToDestroy);
            for (final ResourceClaim claim : claimsToDestroy) {
                markDestructable(claim);
            }
        }
    }

    /**
     * Records in the write-ahead log (with a forced sync) that the given FlowFiles were swapped
     * out to the given location, and registers that location as a known swap location.
     * A null or empty list is a no-op.
     *
     * @param swappedOut the FlowFiles that were swapped out
     * @param queue the queue the FlowFiles were swapped out of
     * @param swapLocation where the FlowFiles were written
     * @throws IOException if the write-ahead log cannot be updated
     */
    public void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue queue, String swapLocation) throws IOException {
        if (swappedOut == null || swappedOut.isEmpty()) {
            return;
        }

        final List<RepositoryRecord> swapOutRecords = new ArrayList<>();
        for (final FlowFileRecord flowFile : swappedOut) {
            swapOutRecords.add(new StandardRepositoryRecord(queue, flowFile, swapLocation));
        }

        final List<SerializedRepositoryRecord> serialized = new ArrayList<>(swapOutRecords.size());
        for (final RepositoryRecord swapOutRecord : swapOutRecords) {
            serialized.add(new LiveSerializedRepositoryRecord(swapOutRecord));
        }
        this.wal.update(serialized, true);

        final String normalizedLocation = normalizeSwapLocation(swapLocation);
        synchronized (this.swapLocationSuffixes) {
            this.swapLocationSuffixes.add(normalizedLocation);
        }

        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", swappedOut.size(), queue, swapLocation);
    }

    /**
     * Records in the repository (with a forced sync) that the given FlowFiles were swapped back
     * in to the given queue, and forgets the swap location they came from.
     *
     * @param swapLocation the location the FlowFiles were read from
     * @param swapRecords the FlowFiles that were swapped in
     * @param queue the queue the FlowFiles were restored to
     * @throws IOException if the repository cannot be updated
     */
    public void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> swapRecords, FlowFileQueue queue) throws IOException {
        final List<RepositoryRecord> swapInRecords = new ArrayList<>();
        for (final FlowFileRecord flowFile : swapRecords) {
            final StandardRepositoryRecord restored = new StandardRepositoryRecord(queue, flowFile);
            restored.setSwapLocation(swapLocation);
            restored.setDestination(queue);
            swapInRecords.add(restored);
        }
        updateRepository(swapInRecords, true);

        final String normalizedLocation = normalizeSwapLocation(swapLocation);
        synchronized (this.swapLocationSuffixes) {
            this.swapLocationSuffixes.remove(normalizedLocation);
        }

        logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", swapRecords.size(), queue);
    }

    /**
     * Deletes the given directory together with everything beneath it, on a best-effort basis.
     *
     * <p>Sub-directories are descended into, so a nested layout is removed as well. Symbolic
     * links are deleted as links and never followed. Anything that cannot be deleted is logged
     * at WARN level and left for manual cleanup; no exception is thrown. A directory that does
     * not exist produces no warning.
     *
     * @param dir the directory to delete
     */
    void deleteRecursively(File dir) {
        final File[] children = dir.listFiles();
        if (children != null) {
            for (final File child : children) {
                // Descend into real directories only; a link to a directory is removed as a plain entry.
                if (child.isDirectory() && !Files.isSymbolicLink(child.toPath())) {
                    deleteRecursively(child);
                    continue;
                }
                if (!child.delete()) {
                    logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child);
                }
            }
        }

        // delete() also returns false when there was nothing to delete; only warn if it is still there.
        if (!dir.delete() && dir.exists()) {
            logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", dir);
        }
    }

    /**
     * Migrates records from a repository written by the sequential-access write-ahead log into
     * the given log, then removes the old log's files.
     *
     * <p>The old files are deleted only after the recovered records were written (with a forced
     * sync) to the new log. Files that cannot be deleted are logged and left in place.
     *
     * @param toUpdate the write-ahead log that receives the recovered records
     * @return the recovered records, or an empty Optional when no repository directory is
     *         configured or the directory does not exist
     * @throws IOException if recovery from the old log or the update of the new log fails
     */
    private Optional<Collection<SerializedRepositoryRecord>> migrateFromSequentialAccessLog(WriteAheadRepository<SerializedRepositoryRecord> toUpdate) throws IOException {
        final String recoveryDirName = this.nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
        if (recoveryDirName == null) {
            // Nothing is configured under the exact property name, so there is nothing to migrate.
            return Optional.empty();
        }

        final File recoveryDir = new File(recoveryDirName);
        if (!recoveryDir.exists()) {
            return Optional.empty();
        }

        final WriteAheadRepository<SerializedRepositoryRecord> recoveryWal =
                new SequentialAccessWriteAheadLog<>(recoveryDir, this.serdeFactory, this);
        logger.info("Encountered FlowFile Repository that was written using the Sequential Access Write Ahead Log. Will recover from this version.");

        final Collection<SerializedRepositoryRecord> recordList;
        try {
            recordList = recoveryWal.recoverRecords();
        } finally {
            recoveryWal.shutdown();
        }

        toUpdate.update(recordList, true);
        logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new Write-Ahead Log. Will now delete old files.");

        deleteRecursively(new File(recoveryDir, "journals"));

        final File checkpointFile = new File(recoveryDir, "checkpoint");
        if (!checkpointFile.delete() && checkpointFile.exists()) {
            logger.warn("Failed to delete old file {}; this file should be cleaned up manually", checkpointFile);
        }

        final File partialFile = new File(recoveryDir, "checkpoint.partial");
        if (!partialFile.delete() && partialFile.exists()) {
            logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile);
        }

        return Optional.of(recordList);
    }

    /**
     * Migrates records from a repository written by the minimal-locking write-ahead log into the
     * given log, then removes the old log's files.
     *
     * <p>Old partitions are looked up as {@code partition-*} entries in every recovery
     * directory; a recovery directory that cannot be listed (for example because it does not
     * exist) is skipped. The old files are deleted only after the recovered records were written
     * (with a forced sync) to the new log.
     *
     * @param toUpdate the write-ahead log that receives the recovered records
     * @return the recovered records, or an empty Optional when no old partitions were found
     * @throws IOException if recovery from the old log or the update of the new log fails
     */
    private Optional<Collection<SerializedRepositoryRecord>> migrateFromMinimalLockingLog(WriteAheadRepository<SerializedRepositoryRecord> toUpdate) throws IOException {
        final List<File> partitionDirs = new ArrayList<>();
        for (final File recoveryFile : this.recoveryFiles) {
            final File[] partitions = recoveryFile.listFiles(file -> file.getName().startsWith("partition-"));
            // listFiles() yields null when the path is not a readable directory.
            if (partitions != null) {
                Collections.addAll(partitionDirs, partitions);
            }
        }

        if (partitionDirs.isEmpty()) {
            return Optional.empty();
        }

        logger.info("Encountered FlowFile Repository that was written using the 'Minimal Locking Write-Ahead Log'. Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");

        final SortedSet<Path> paths = this.recoveryFiles.stream()
                .map(File::toPath)
                .collect(Collectors.toCollection(TreeSet::new));

        final WriteAheadRepository<SerializedRepositoryRecord> minimalLockingWal =
                new MinimalLockingWriteAheadLog<>(paths, partitionDirs.size(), this.serdeFactory, null);

        final Collection<SerializedRepositoryRecord> recordList;
        try {
            recordList = minimalLockingWal.recoverRecords();
        } finally {
            minimalLockingWal.shutdown();
        }

        toUpdate.update(recordList, true);
        logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files.");

        for (final File partitionDir : partitionDirs) {
            deleteRecursively(partitionDir);
        }

        for (final File recoveryFile : this.recoveryFiles) {
            final File snapshotFile = new File(recoveryFile, "snapshot");
            if (!snapshotFile.delete() && snapshotFile.exists()) {
                logger.warn("Failed to delete old file {}; this file should be cleaned up manually", snapshotFile);
            }

            final File partialFile = new File(recoveryFile, "snapshot.partial");
            if (!partialFile.delete() && partialFile.exists()) {
                logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialFile);
            }
        }

        return Optional.of(recordList);
    }

    /**
     * Recovers the records from the write-ahead log and reports which queues hold FlowFiles,
     * either directly (CREATE/UPDATE records with a queue identifier) or through a recovered swap
     * file.
     *
     * <p>The recovered records are cached so that a later {@code loadFlowFiles} call does not
     * have to recover them again.
     *
     * @param swapManager used to resolve the queue of each recovered swap location
     * @return the identifiers of all queues that have FlowFiles
     * @throws IOException if the write-ahead log cannot be recovered
     */
    public Set<String> findQueuesWithFlowFiles(FlowFileSwapManager swapManager) throws IOException {
        final Collection<SerializedRepositoryRecord> recovered = this.wal.recoverRecords();
        this.recoveredRecords = recovered;

        final Set<String> queueIds = new HashSet<>();
        for (final SerializedRepositoryRecord recoveredRecord : recovered) {
            final RepositoryRecordType recordType = recoveredRecord.getType();
            if (recordType == RepositoryRecordType.CREATE || recordType == RepositoryRecordType.UPDATE) {
                final String queueId = recoveredRecord.getQueueIdentifier();
                if (queueId != null) {
                    queueIds.add(queueId);
                }
            }
        }

        final Set<String> recoveredSwapLocations = this.wal.getRecoveredSwapLocations();
        for (final String swapLocation : recoveredSwapLocations) {
            queueIds.add(swapManager.getQueueIdentifier(swapLocation));
        }

        return queueIds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restores recovered FlowFile records into their queues and schedules the periodic
     * checkpoint task.
     *
     * <p>Records are taken from {@code recoveredRecords} when that field is already populated,
     * otherwise they are recovered from the write-ahead log. If that yields nothing, a migration
     * from the other write-ahead log implementation is attempted. Each record is then handled
     * as follows:
     * <ul>
     *   <li>no queue identifier: a DELETE record is queued so the FlowFile is dropped;</li>
     *   <li>queue identifier not known to {@code queueProvider}: the record is either retained
     *       (when orphan retention is enabled; its resource claim, if any, is counted and
     *       remembered as orphaned) or dropped via a DELETE record;</li>
     *   <li>otherwise: the claimant count of its content claim (if any) is incremented and the
     *       FlowFile is put on its queue.</li>
     * </ul>
     * Afterwards the sequence generator is set to one more than the largest record identifier
     * seen, any DELETE records are written to the write-ahead log, and the checkpoint task is
     * scheduled with a fixed delay of {@code checkpointDelayMillis}.
     *
     * @param queueProvider supplies the queues that currently exist in the flow
     * @return the largest record identifier among the processed records, or 0 if there were none
     * @throws IOException if recovering, migrating, or updating the write-ahead log fails
     */
    public long loadFlowFiles(QueueProvider queueProvider) throws IOException {
        // Reuse records that were recovered earlier, if any; otherwise recover them now.
        Collection<SerializedRepositoryRecord> recordList = this.recoveredRecords == null ? this.wal.recoverRecords() : this.recoveredRecords;

        final Set<String> recoveredSwapLocations = this.wal.getRecoveredSwapLocations();
        synchronized (this.swapLocationSuffixes) {
            recoveredSwapLocations.forEach(loc -> this.swapLocationSuffixes.add(WriteAheadFlowFileRepository.normalizeSwapLocation(loc)));
            logger.debug("Recovered {} Swap Files: {}", this.swapLocationSuffixes.size(), this.swapLocationSuffixes);
        }

        // Nothing recovered from the configured log: try migrating from the other implementation.
        if (recordList == null || recordList.isEmpty()) {
            if (WriteAheadFlowFileRepository.isSequentialAccessWAL(this.walImplementation)) {
                recordList = this.migrateFromMinimalLockingLog(this.wal).orElseGet(ArrayList::new);
            } else {
                recordList = this.migrateFromSequentialAccessLog(this.wal).orElseGet(ArrayList::new);
            }
        }

        // Index the existing queues by identifier for constant-time lookup per record.
        final Map<String, FlowFileQueue> queueMap = new HashMap<>();
        for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
            queueMap.put(queue.getIdentifier(), queue);
        }

        final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
        int numFlowFilesMissingQueue = 0;
        long maxId = 0L;
        for (final SerializedRepositoryRecord record : recordList) {
            final long recordId = this.serdeFactory.getRecordIdentifier(record);
            if (recordId > maxId) {
                maxId = recordId;
            }

            final String queueId = record.getQueueIdentifier();
            if (queueId == null) {
                ++numFlowFilesMissingQueue;
                logger.warn("Encountered Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
                dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder().flowFileRecord(record.getFlowFileRecord()).swapLocation(record.getSwapLocation()).type(RepositoryRecordType.DELETE).build());
                continue;
            }

            final ContentClaim claim = record.getContentClaim();
            final FlowFileQueue flowFileQueue = queueMap.get(queueId);
            if (flowFileQueue == null) {
                // The record points at a queue that no longer exists in the flow.
                ++numFlowFilesMissingQueue;
                if (this.isRetainOrphanedFlowFiles()) {
                    if (claim == null) {
                        logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored.", recordId, queueId);
                        continue;
                    }
                    // Keep the claim referenced and remember it as orphaned.
                    this.claimManager.incrementClaimantCount(claim.getResourceClaim());
                    this.orphanedResourceClaims.add(claim.getResourceClaim());
                    logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored. This may result in the following Content Claim not being cleaned up by the Content Repository: {}", recordId, queueId, claim);
                    continue;
                }
                dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder().flowFileRecord(record.getFlowFileRecord()).swapLocation(record.getSwapLocation()).type(RepositoryRecordType.DELETE).build());
                logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will be dropped.", recordId, queueId);
                continue;
            }

            if (claim != null) {
                this.claimManager.incrementClaimantCount(claim.getResourceClaim());
            }
            flowFileQueue.put(record.getFlowFileRecord());
        }

        // The recovered records have been consumed; drop the reference to them.
        this.recoveredRecords = null;

        // Start the generator one past the largest identifier seen so new IDs do not collide.
        this.flowFileSequenceGenerator.set(maxId + 1L);
        logger.info("Successfully restored {} FlowFiles and {} Swap Files", recordList.size() - numFlowFilesMissingQueue, recoveredSwapLocations.size());
        if (numFlowFilesMissingQueue > 0) {
            logger.warn("On recovery, found {} FlowFiles whose queues no longer exists.", numFlowFilesMissingQueue);
        }

        if (dropRecords.isEmpty()) {
            logger.debug("No Drop Records to update Repository with");
        } else {
            final long updateStart = System.nanoTime();
            this.wal.update(dropRecords, true);
            final long updateEnd = System.nanoTime();
            final long updateMillis = TimeUnit.MILLISECONDS.convert(updateEnd - updateStart, TimeUnit.NANOSECONDS);
            logger.info("Successfully updated FlowFile Repository with {} Drop Records due to missing queues in {} milliseconds", dropRecords.size(), updateMillis);
        }

        // Catch Throwable so nothing propagates out of the scheduled task; a task that throws
        // is not run again by scheduleWithFixedDelay.
        final Runnable checkpointRunnable = () -> {
            try {
                logger.info("Initiating checkpoint of FlowFile Repository");
                final long start = System.nanoTime();
                final int numRecordsCheckpointed = this.checkpoint();
                final long end = System.nanoTime();
                final long millis = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS);
                logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds", numRecordsCheckpointed, millis);
            } catch (Throwable t) {
                logger.error("Unable to checkpoint FlowFile Repository due to " + t.toString(), t);
            }
        };
        this.checkpointFuture = this.checkpointExecutor.scheduleWithFixedDelay(checkpointRunnable, this.checkpointDelayMillis, this.checkpointDelayMillis, TimeUnit.MILLISECONDS);
        return maxId;
    }

    /**
     * @return the current value of the {@code retainOrphanedFlowFiles} flag
     */
    private boolean isRetainOrphanedFlowFiles() {
        return retainOrphanedFlowFiles;
    }

    /**
     * @return an unmodifiable view of the resource claims recorded as orphaned
     */
    public Set<ResourceClaim> findOrphanedResourceClaims() {
        final Set<ResourceClaim> readOnlyView = Collections.unmodifiableSet(orphanedResourceClaims);
        return readOnlyView;
    }

    /**
     * Raises the FlowFile sequence generator to {@code maxId} when its current value is lower.
     * The generator is left untouched if it is already at or above {@code maxId}.
     *
     * @param maxId the minimum value the sequence generator should hold afterwards
     */
    public void updateMaxFlowFileIdentifier(long maxId) {
        while (true) {
            final long observed = flowFileSequenceGenerator.get();
            if (observed >= maxId) {
                // Already high enough; nothing to do.
                return;
            }
            if (flowFileSequenceGenerator.compareAndSet(observed, maxId)) {
                return;
            }
            // The value changed between the read and the compare-and-set; re-read and retry.
        }
    }

    /**
     * @return the current value of the sequence generator, which is then advanced by one
     */
    public long getNextFlowFileSequence() {
        final long next = flowFileSequenceGenerator.getAndIncrement();
        return next;
    }

    /**
     * @return one less than the current value of the sequence generator
     * @throws IOException declared by the signature; nothing in this body throws it
     */
    public long getMaxFlowFileIdentifier() throws IOException {
        final long nextSequence = flowFileSequenceGenerator.get();
        return nextSequence - 1L;
    }

    /**
     * Delegates to the write-ahead log's {@code checkpoint()}.
     *
     * @return the value returned by the write-ahead log's checkpoint
     * @throws IOException if the write-ahead log's checkpoint throws it
     */
    public int checkpoint() throws IOException {
        final int checkpointed = wal.checkpoint();
        return checkpointed;
    }
}

