/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.repository.ConnectionSwapInfo;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimManager;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FlowFileSwapManager} that keeps swapped-out FlowFiles in files inside a
 * {@code swap} directory located under the FlowFile repository path.
 *
 * <p>Two periodic jobs are scheduled by {@code start(...)}: one that looks for queues
 * with enough FlowFiles to swap out and writes them to swap files, and one that reads
 * swap files back in when a queue has drained far enough.
 */
public class FileSystemSwapManager
implements FlowFileSwapManager {
    /** Minimum swap-queue size; the swap-out loop in this file compares against the same value (10000). */
    public static final int MINIMUM_SWAP_COUNT = 10000;
    /** Completed swap files are named {@code <digits>-<anything>.swap}. */
    private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap");
    /** Swap files still being written carry an extra {@code .part} suffix until they are renamed. */
    private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part");
    /** Encoding version written at the start of every swap file; readers reject anything newer. */
    public static final int SWAP_ENCODING_VERSION = 6;
    /** Category used for every event sent to the {@link EventReporter}. */
    public static final String EVENT_CATEGORY = "Swap FlowFiles";
    /** Runs the periodic job that identifies queues to swap out. */
    private final ScheduledExecutorService swapQueueIdentifierExecutor;
    /** Runs the periodic swap-in job. */
    private final ScheduledExecutorService swapInExecutor;
    /** Assigned in {@code start(...)}; notified of swap-out and swap-in operations. */
    private volatile FlowFileRepository flowFileRepository;
    /** Assigned in {@code start(...)}; may still be null before that, so most uses null-check it. */
    private volatile EventReporter eventReporter;
    /** Swap files waiting to be swapped back in, per queue, each guarded by its own lock. */
    private final ConcurrentMap<FlowFileQueue, QueueLockWrapper> swapMap = new ConcurrentHashMap<FlowFileQueue, QueueLockWrapper>();
    /** Directory that holds the swap files. */
    private final File storageDirectory;
    /** Delay between swap-in runs, in milliseconds. */
    private final long swapInMillis;
    /** Delay between swap-out runs, in milliseconds. */
    private final long swapOutMillis;
    /** Number of worker threads used for each swap-out run. */
    private final int swapOutThreadCount;
    /**
     * Assigned in {@code start(...)} and read by the swap-in job.
     * NOTE(review): unlike the two fields above this one is not volatile - confirm that is intentional.
     */
    private ContentClaimManager claimManager;
    private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);

    /**
     * Creates the manager from the global {@link NiFiProperties}: resolves (and if
     * necessary creates) the {@code swap} directory under the FlowFile repository
     * path, reads the swap-in/swap-out periods and thread counts, and creates the
     * two executors. Nothing is scheduled until {@code start(...)} is called.
     *
     * @throws RuntimeException if the swap directory does not exist and cannot be created
     */
    public FileSystemSwapManager() {
        final NiFiProperties nifiProperties = NiFiProperties.getInstance();
        final File swapDirectory = nifiProperties.getFlowFileRepositoryPath().resolve("swap").toFile();
        this.storageDirectory = swapDirectory;

        final boolean directoryAvailable = swapDirectory.exists() || swapDirectory.mkdirs();
        if (!directoryAvailable) {
            throw new RuntimeException("Cannot create Swap Storage directory " + swapDirectory.getAbsolutePath());
        }

        this.swapQueueIdentifierExecutor = new FlowEngine(1, "Identifies Queues for FlowFile Swapping");
        this.swapInMillis = FormatUtils.getTimeDuration(nifiProperties.getSwapInPeriod(), TimeUnit.MILLISECONDS);
        this.swapOutMillis = FormatUtils.getTimeDuration(nifiProperties.getSwapOutPeriod(), TimeUnit.MILLISECONDS);
        this.swapOutThreadCount = nifiProperties.getSwapOutThreads();
        this.swapInExecutor = new FlowEngine(nifiProperties.getSwapInThreads(), "Swap In FlowFiles");
    }

    /**
     * Deletes every completed swap file (names matching {@code SWAP_FILE_PATTERN})
     * in the storage directory. A file that cannot be deleted and still exists is
     * logged at WARN level; temporary {@code .part} files are not touched.
     */
    public void purge() {
        final FilenameFilter swapFileFilter = new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return SWAP_FILE_PATTERN.matcher(name).matches();
            }
        };

        final File[] candidates = this.storageDirectory.listFiles(swapFileFilter);
        if (candidates == null) {
            // listFiles returns null when the directory cannot be listed
            return;
        }

        for (int i = 0; i < candidates.length; i++) {
            final File candidate = candidates[i];
            final boolean gone = candidate.delete() || !candidate.exists();
            if (!gone) {
                logger.warn("Failed to delete SWAP file {}", (Object)candidate);
            }
        }
    }

    /**
     * Stores the collaborators and schedules the two periodic jobs: the swap-out
     * scan (every {@code swapOutMillis}) and the swap-in task (every {@code swapInMillis}).
     * Each job first runs after one full period.
     *
     * @param flowFileRepository repository to notify of swap-out and swap-in operations
     * @param connectionProvider source of the queues inspected by the swap-out scan
     * @param claimManager claim manager used when FlowFiles are read back in
     * @param eventReporter receiver of warning and error events
     */
    public synchronized void start(FlowFileRepository flowFileRepository, QueueProvider connectionProvider, ContentClaimManager claimManager, EventReporter eventReporter) {
        this.claimManager = claimManager;
        this.flowFileRepository = flowFileRepository;
        this.eventReporter = eventReporter;

        final Runnable swapOutScan = new QueueIdentifier(connectionProvider);
        this.swapQueueIdentifierExecutor.scheduleWithFixedDelay(swapOutScan, this.swapOutMillis, this.swapOutMillis, TimeUnit.MILLISECONDS);

        final Runnable swapInScan = new SwapInTask();
        this.swapInExecutor.scheduleWithFixedDelay(swapInScan, this.swapInMillis, this.swapInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Writes the given FlowFiles to {@code destination} in the swap-file format.
     *
     * <p>Layout: encoding version (int), queue identifier (UTF), record count (int),
     * total content size (long), then for each FlowFile its id, entry date, lineage
     * identifiers, lineage start date, last queue date, size, an optional content
     * claim, and its attributes. The stream is flushed but not closed.
     *
     * @param toSwap FlowFiles to write; {@code null} or empty writes nothing
     * @param queue queue the FlowFiles belong to; its identifier goes into the header
     * @param swapLocation swap file location, used only in the log message
     * @param destination stream to write to
     * @return number of FlowFiles written, or 0 if there was nothing to write
     * @throws IOException if writing to {@code destination} fails
     */
    public int serializeFlowFiles(List<FlowFileRecord> toSwap, FlowFileQueue queue, String swapLocation, OutputStream destination) throws IOException {
        if (toSwap == null || toSwap.isEmpty()) {
            return 0;
        }
        long contentSize = 0L;
        for (FlowFileRecord record : toSwap) {
            contentSize += record.getSize();
        }
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(destination));
        try {
            // Header: must stay in step with what the deserializers in this class read.
            out.writeInt(FileSystemSwapManager.SWAP_ENCODING_VERSION);
            out.writeUTF(queue.getIdentifier());
            out.writeInt(toSwap.size());
            out.writeLong(contentSize);
            for (FlowFileRecord flowFile : toSwap) {
                out.writeLong(flowFile.getId());
                out.writeLong(flowFile.getEntryDate());
                Set<String> lineageIdentifiers = flowFile.getLineageIdentifiers();
                out.writeInt(lineageIdentifiers.size());
                for (String lineageId : lineageIdentifiers) {
                    out.writeUTF(lineageId);
                }
                out.writeLong(flowFile.getLineageStartDate());
                out.writeLong(flowFile.getLastQueueDate());
                out.writeLong(flowFile.getSize());
                ContentClaim claim = flowFile.getContentClaim();
                if (claim == null) {
                    out.writeBoolean(false);
                } else {
                    out.writeBoolean(true);
                    out.writeUTF(claim.getId());
                    out.writeUTF(claim.getContainer());
                    out.writeUTF(claim.getSection());
                    out.writeLong(flowFile.getContentClaimOffset());
                    out.writeBoolean(claim.isLossTolerant());
                }
                Map<String, String> attributes = flowFile.getAttributes();
                out.writeInt(attributes.size());
                for (Map.Entry<String, String> entry : attributes.entrySet()) {
                    // writeString (not writeUTF) so values of 64 KB and more can be stored
                    this.writeString(entry.getKey(), out);
                    this.writeString(entry.getValue(), out);
                }
            }
        }
        finally {
            // Push buffered bytes through to the destination; the caller owns closing it.
            out.flush();
        }
        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", new Object[]{toSwap.size(), queue, swapLocation});
        return toSwap.size();
    }

    /**
     * Writes a length-prefixed UTF-8 string.
     *
     * <p>Strings shorter than 65535 bytes get a 2-byte big-endian length. Longer
     * ones get the marker bytes {@code 0xFF 0xFF} followed by a 4-byte big-endian
     * length. The encoded bytes follow the length in both cases.
     *
     * @param toWrite string to write
     * @param out stream to write to
     * @throws IOException if writing fails
     */
    private void writeString(String toWrite, OutputStream out) throws IOException {
        final byte[] encoded = toWrite.getBytes("UTF-8");
        final int length = encoded.length;
        if (length >= 65535) {
            // marker announcing that a 4-byte length follows
            out.write(255);
            out.write(255);
            for (int shift = 24; shift >= 16; shift -= 8) {
                out.write(length >>> shift);
            }
        }
        // low two bytes of the length are written the same way in both forms
        out.write(length >>> 8);
        out.write(length);
        out.write(encoded);
    }

    /**
     * Reads a complete swap file: validates the header and then reads all records.
     * Content claimant counts are not incremented.
     *
     * @param in stream positioned at the start of a swap file
     * @param queue queue the records are expected to belong to
     * @param claimManager used to create content claims for the records
     * @return the FlowFiles stored in the swap file
     * @throws IOException if the encoding version is newer than supported or reading fails
     * @throws IllegalArgumentException if the file was written for a different connection
     */
    static List<FlowFileRecord> deserializeFlowFiles(DataInputStream in, FlowFileQueue queue, ContentClaimManager claimManager) throws IOException {
        final int encodingVersion = in.readInt();
        if (encodingVersion > FileSystemSwapManager.SWAP_ENCODING_VERSION) {
            throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is " + encodingVersion + ", which is too new (expecting " + FileSystemSwapManager.SWAP_ENCODING_VERSION + " or less)");
        }

        final String storedConnectionId = in.readUTF();
        final boolean sameConnection = storedConnectionId.equals(queue.getIdentifier());
        if (!sameConnection) {
            throw new IllegalArgumentException("Cannot restore Swap File because the file indicates that records belong to Connection with ID " + storedConnectionId + " but received Connection " + queue);
        }

        final int recordCount = in.readInt();
        // the total content size is part of the header but is not needed here
        in.readLong();
        return FileSystemSwapManager.deserializeFlowFiles(in, recordCount, queue, encodingVersion, false, claimManager);
    }

    /**
     * Reads {@code numFlowFiles} records from a swap file whose header has already
     * been consumed. The fields read depend on {@code serializationVersion}:
     * versions below 3 carry a leading record-type byte, an extra string after the
     * size and an "attributes changed" flag; lineage data exists from version 2;
     * the loss-tolerant flag from version 4; string claim ids from version 5; and
     * the last queue date from version 6.
     *
     * @param in stream positioned at the first record
     * @param numFlowFiles number of records to read
     * @param queue queue the records belong to (not used while reading)
     * @param serializationVersion encoding version taken from the file header
     * @param incrementContentClaims whether to increment the claimant count of each claim read
     * @param claimManager used to create content claims
     * @return the records read, in file order
     * @throws IOException if the data is malformed or reading fails
     */
    static List<FlowFileRecord> deserializeFlowFiles(DataInputStream in, int numFlowFiles, FlowFileQueue queue, int serializationVersion, boolean incrementContentClaims, ContentClaimManager claimManager) throws IOException {
        final ArrayList<FlowFileRecord> restored = new ArrayList<FlowFileRecord>();
        final boolean legacyFormat = serializationVersion < 3;

        for (int recordIdx = 0; recordIdx < numFlowFiles; recordIdx++) {
            if (legacyFormat) {
                final int recordType = in.read();
                if (recordType != 1) {
                    throw new IOException("Swap File is version " + serializationVersion + " but did not contain a 'UPDATE' record type");
                }
            }

            final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
            builder.id(in.readLong());
            builder.entryDate(in.readLong());

            if (serializationVersion >= 2) {
                final int lineageCount = in.readInt();
                final HashSet<String> lineage = new HashSet<String>(lineageCount);
                int lineageRead = 0;
                while (lineageRead < lineageCount) {
                    lineage.add(in.readUTF());
                    lineageRead++;
                }
                builder.lineageIdentifiers(lineage);
                builder.lineageStartDate(in.readLong());
                if (serializationVersion >= 6) {
                    builder.lastQueueDate(in.readLong());
                }
            }

            builder.size(in.readLong());

            if (legacyFormat) {
                // legacy files store one more string here; it is read and discarded
                FileSystemSwapManager.readString(in);
            }

            if (in.readBoolean()) {
                final String claimId;
                if (serializationVersion < 5) {
                    claimId = String.valueOf(in.readLong());
                } else {
                    claimId = in.readUTF();
                }
                final String container = in.readUTF();
                final String section = in.readUTF();
                final long claimOffset = in.readLong();
                final boolean lossTolerant = serializationVersion >= 4 && in.readBoolean();

                final ContentClaim claim = claimManager.newContentClaim(container, section, claimId, lossTolerant);
                if (incrementContentClaims) {
                    claimManager.incrementClaimantCount(claim);
                }
                builder.contentClaim(claim);
                builder.contentClaimOffset(claimOffset);
            }

            // only legacy files carry an explicit flag; newer ones always include attributes
            final boolean attributesPresent = !legacyFormat || in.readBoolean();
            if (attributesPresent) {
                final int attributeCount = in.readInt();
                for (int attrIdx = 0; attrIdx < attributeCount; attrIdx++) {
                    final String attrName = FileSystemSwapManager.readString(in);
                    final String attrValue = FileSystemSwapManager.readString(in);
                    builder.addAttribute(attrName, attrValue);
                }
            }

            restored.add(builder.build());
        }
        return restored;
    }

    /**
     * Reads a length-prefixed UTF-8 string as written by {@code writeString}.
     *
     * @param in stream positioned at the length prefix
     * @return the decoded string
     * @throws EOFException if the stream ends before the string is complete
     * @throws IOException if the length prefix is negative (corrupt data) or reading fails
     */
    private static String readString(InputStream in) throws IOException {
        Integer numBytes = FileSystemSwapManager.readFieldLength(in);
        if (numBytes == null) {
            throw new EOFException();
        }
        // A 4-byte length with the high bit set decodes to a negative int. Reject it as
        // an I/O problem so callers that handle IOException treat the file as unreadable,
        // instead of failing with an unchecked NegativeArraySizeException.
        if (numBytes.intValue() < 0) {
            throw new IOException("Invalid field length " + numBytes + "; the data appears to be corrupt");
        }
        byte[] bytes = new byte[numBytes.intValue()];
        FileSystemSwapManager.fillBuffer(in, bytes, numBytes);
        return new String(bytes, "UTF-8");
    }

    /**
     * Reads the length prefix of a string field.
     *
     * <p>Two bytes are read first. If both are {@code 0xFF}, four more bytes follow
     * and hold the length as a big-endian int; otherwise the two bytes are the
     * length as a big-endian unsigned short.
     *
     * @param in stream to read from
     * @return the field length, or {@code null} if the stream was already at its end
     * @throws EOFException if the stream ends part-way through the prefix
     * @throws IOException if reading fails
     */
    private static Integer readFieldLength(InputStream in) throws IOException {
        final int high = in.read();
        final int low = in.read();
        if (high < 0) {
            return null;
        }
        if (low < 0) {
            throw new EOFException();
        }

        final boolean extendedLength = high == 255 && low == 255;
        if (!extendedLength) {
            return (high << 8) + low;
        }

        // read all four bytes first, then check for end-of-stream in one go
        final int[] parts = new int[4];
        int endOfStreamCheck = 0;
        for (int i = 0; i < parts.length; i++) {
            parts[i] = in.read();
            endOfStreamCheck |= parts[i];
        }
        if (endOfStreamCheck < 0) {
            throw new EOFException();
        }

        int length = 0;
        for (int part : parts) {
            length = (length << 8) + part;
        }
        return length;
    }

    /**
     * Reads from {@code in} until {@code length} bytes have been stored at the
     * start of {@code buffer}.
     *
     * @param in stream to read from
     * @param buffer destination; must hold at least {@code length} bytes
     * @param length number of bytes to read
     * @throws EOFException if the stream ends before {@code length} bytes were read
     * @throws IOException if reading fails
     */
    private static void fillBuffer(InputStream in, byte[] buffer, int length) throws IOException {
        int filled = 0;
        for (;;) {
            final int count = in.read(buffer, filled, length - filled);
            if (count <= 0) {
                // end of stream, or nothing left to request
                break;
            }
            filled += count;
        }
        if (filled != length) {
            throw new EOFException();
        }
    }

    /**
     * Logs and reports an error; the stack trace of {@code t} is logged only when
     * debug logging is enabled.
     *
     * @param error message to log and report
     * @param t cause of the error
     */
    private void error(String error, Throwable t) {
        this.error(error);
        if (!logger.isDebugEnabled()) {
            return;
        }
        logger.error("", t);
    }

    /**
     * Logs the message at ERROR level and, if an event reporter has been set,
     * reports it as an ERROR event in the {@code EVENT_CATEGORY} category.
     *
     * @param error message to log and report
     */
    private void error(String error) {
        logger.error(error);
        if (this.eventReporter == null) {
            return;
        }
        this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error);
    }

    /**
     * Logs the message at WARN level and, if an event reporter has been set,
     * reports it as a WARNING event in the {@code EVENT_CATEGORY} category.
     *
     * @param warning message to log and report
     */
    private void warn(String warning) {
        logger.warn(warning);
        if (this.eventReporter == null) {
            return;
        }
        this.eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
    }

    /**
     * Scans the storage directory for swap files and re-registers them with their queues.
     *
     * <p>Temporary ({@code .part}) files are leftovers of an interrupted swap-out and
     * are deleted. Each completed swap file is read in full: its header tells which
     * connection it belongs to and how many FlowFiles and bytes it holds, and its
     * records are deserialized with claimant counts incremented. Files that cannot be
     * read, or that belong to an unknown connection, are reported and skipped.
     *
     * @param queueProvider source of all known queues
     * @param claimManager used to create content claims and increment their claimant counts
     * @return the largest FlowFile id found in any swap file, or 0 if none was found
     */
    public long recoverSwappedFlowFiles(QueueProvider queueProvider, ContentClaimManager claimManager) {
        File[] swapFiles = this.storageDirectory.listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return SWAP_FILE_PATTERN.matcher(name).matches() || TEMP_SWAP_FILE_PATTERN.matcher(name).matches();
            }
        });
        if (swapFiles == null) {
            return 0L;
        }
        Collection<FlowFileQueue> allQueues = queueProvider.getAllQueues();
        Map<String, FlowFileQueue> queueMap = new HashMap<String, FlowFileQueue>();
        for (FlowFileQueue queue : allQueues) {
            queueMap.put(queue.getIdentifier(), queue);
        }
        ConnectionSwapInfo swapInfo = new ConnectionSwapInfo();
        int swappedCount = 0;
        long swappedBytes = 0L;
        long maxRecoveredId = 0L;
        for (File swapFile : swapFiles) {
            if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) {
                if (swapFile.delete()) {
                    logger.info("Removed incomplete/temporary Swap File " + swapFile);
                } else {
                    this.warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually");
                }
                continue;
            }
            try (FileInputStream fis = new FileInputStream(swapFile);
                 BufferedInputStream bufferedIn = new BufferedInputStream(fis);
                 DataInputStream in = new DataInputStream(bufferedIn);){
                int swapEncodingVersion = in.readInt();
                if (swapEncodingVersion > FileSystemSwapManager.SWAP_ENCODING_VERSION) {
                    // Only throw here: the catch below logs and reports the failure through
                    // error(...), which copes with a missing event reporter. Reporting here as
                    // well would duplicate the event and fail with an NPE if no reporter is set.
                    throw new IOException("Cannot swap FlowFiles in from " + swapFile + " because the encoding version is " + swapEncodingVersion + ", which is too new (expecting " + FileSystemSwapManager.SWAP_ENCODING_VERSION + " or less)");
                }
                String connectionId = in.readUTF();
                FlowFileQueue queue = queueMap.get(connectionId);
                if (queue == null) {
                    this.error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist");
                    continue;
                }
                int numRecords = in.readInt();
                long contentSize = in.readLong();
                // NOTE(review): the file is registered before its records are read, so a file
                // that later fails to deserialize is still counted - confirm this is intended.
                swapInfo.addSwapSizeInfo(connectionId, swapFile.getAbsolutePath(), new QueueSize(numRecords, contentSize));
                swappedCount += numRecords;
                swappedBytes += contentSize;
                List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, true, claimManager);
                for (FlowFileRecord record : records) {
                    maxRecoveredId = Math.max(maxRecoveredId, record.getId());
                }
            }
            catch (IOException ioe) {
                this.error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " due to " + ioe, ioe);
            }
        }
        this.restoreSwapLocations(queueMap.values(), swapInfo);
        logger.info("Recovered {} FlowFiles ({} bytes) from Swap Files", (Object)swappedCount, (Object)swappedBytes);
        return maxRecoveredId;
    }

    /**
     * Registers previously written swap files with their queues.
     *
     * <p>For every queue that has swap files in {@code swapInfo}, the files are
     * ordered with {@code SwapFileComparator}, appended to the queue's pending
     * swap-file list, and the queue's swap count is incremented by each file's
     * recorded size.
     *
     * @param flowFileQueues queues to restore swap locations for
     * @param swapInfo swap file locations and sizes, per queue identifier
     */
    public void restoreSwapLocations(Collection<FlowFileQueue> flowFileQueues, ConnectionSwapInfo swapInfo) {
        for (FlowFileQueue queue : flowFileQueues) {
            String queueId = queue.getIdentifier();
            Collection<String> swapFileLocations = swapInfo.getSwapFileLocations(queueId);
            if (swapFileLocations == null || swapFileLocations.isEmpty()) {
                continue;
            }
            TreeMap<String, QueueSize> sortedFileQueueMap = new TreeMap<String, QueueSize>(new SwapFileComparator());
            for (String swapFileLocation : swapFileLocations) {
                sortedFileQueueMap.put(swapFileLocation, swapInfo.getSwappedSize(queueId, swapFileLocation));
            }
            QueueLockWrapper fileQueue = this.swapMap.get(queue);
            if (fileQueue == null) {
                // putIfAbsent, as in the swap-out task: a plain put could replace a wrapper
                // that another thread registered in between and lose its pending swap files.
                QueueLockWrapper created = new QueueLockWrapper(new LinkedBlockingQueue<File>());
                QueueLockWrapper existing = this.swapMap.putIfAbsent(queue, created);
                fileQueue = existing != null ? existing : created;
            }
            for (Map.Entry<String, QueueSize> sortedEntry : sortedFileQueueMap.entrySet()) {
                QueueSize size = sortedEntry.getValue();
                fileQueue.getQueue().add(new File(sortedEntry.getKey()));
                queue.incrementSwapCount(size.getObjectCount(), size.getByteCount());
            }
        }
    }

    /**
     * Stops both periodic jobs by calling {@code shutdownNow()} on the swap-out
     * scan executor and then on the swap-in executor.
     */
    public void shutdown() {
        swapQueueIdentifierExecutor.shutdownNow();
        swapInExecutor.shutdownNow();
    }

    /**
     * Pairs a queue of swap files with the lock that guards swapping them in.
     * Equality and hash code are delegated to the wrapped queue.
     */
    private static class QueueLockWrapper {
        private final Queue<File> queue;
        private final Lock lock = new ReentrantLock();

        public QueueLockWrapper(Queue<File> queue) {
            this.queue = queue;
        }

        /** @return the wrapped queue of swap files */
        public Queue<File> getQueue() {
            return queue;
        }

        /** @return the lock associated with the wrapped queue */
        public Lock getLock() {
            return lock;
        }

        @Override
        public int hashCode() {
            return queue.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof QueueLockWrapper)) {
                return false;
            }
            final QueueLockWrapper other = (QueueLockWrapper)obj;
            return queue.equals(other.queue);
        }
    }

    /**
     * Orders swap file paths by the numeric timestamp that prefixes the file name
     * ({@code <timestamp>-<anything>}), oldest first. Names without a parsable
     * timestamp sort after those that have one. Ties are broken by comparing the
     * full strings, so two different paths never compare as equal; this matters
     * because the comparator is used for the keys of a sorted map.
     */
    private static class SwapFileComparator
    implements Comparator<String> {
        private SwapFileComparator() {
        }

        @Override
        public int compare(String o1, String o2) {
            if (o1 == o2) {
                return 0;
            }
            Long time1 = this.getTimestampFromFilename(o1);
            Long time2 = this.getTimestampFromFilename(o2);
            if (time1 == null && time2 == null) {
                // Neither name carries a timestamp: order by name rather than returning 0,
                // which would make distinct paths collide as keys of a sorted map.
                return SwapFileComparator.compareNames(o1, o2);
            }
            if (time1 == null) {
                return 1;
            }
            if (time2 == null) {
                return -1;
            }
            int timeComparisonValue = time1.compareTo(time2);
            if (timeComparisonValue != 0) {
                return timeComparisonValue;
            }
            return o1.compareTo(o2);
        }

        /** Null-safe natural ordering of two names; a null name sorts last. */
        private static int compareNames(String first, String second) {
            if (first == null) {
                return second == null ? 0 : 1;
            }
            if (second == null) {
                return -1;
            }
            return first.compareTo(second);
        }

        /**
         * Extracts the timestamp prefix from the file-name part of a path.
         *
         * @return the parsed timestamp, or {@code null} if the argument is null, the
         *         name has no {@code -} after at least one character, or the prefix
         *         is not a valid long
         */
        private Long getTimestampFromFilename(String fullyQualifiedFilename) {
            if (fullyQualifiedFilename == null) {
                return null;
            }
            String filename = new File(fullyQualifiedFilename).getName();
            int idx = filename.indexOf('-');
            if (idx < 1) {
                return null;
            }
            try {
                return Long.parseLong(filename.substring(0, idx));
            }
            catch (NumberFormatException e) {
                return null;
            }
        }
    }

    /**
     * Worker that swaps FlowFiles out to disk. It takes queues from a shared work
     * queue until none are left and, for each one, writes batches to swap files
     * while the queue's swap queue holds at least {@code MINIMUM_SWAP_COUNT} FlowFiles.
     */
    private class SwapOutTask
    implements Runnable {
        private final BlockingQueue<FlowFileQueue> connectionQueue;

        public SwapOutTask(BlockingQueue<FlowFileQueue> connectionQueue) {
            this.connectionQueue = connectionQueue;
        }

        @Override
        public void run() {
            FlowFileQueue flowFileQueue;
            while ((flowFileQueue = this.connectionQueue.poll()) != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("{} has {} FlowFiles to swap out", (Object)flowFileQueue, (Object)flowFileQueue.getSwapQueueSize());
                }
                while (flowFileQueue.getSwapQueueSize() >= FileSystemSwapManager.MINIMUM_SWAP_COUNT) {
                    List<FlowFileRecord> toSwap = flowFileQueue.pollSwappableRecords();
                    if (toSwap == null || toSwap.isEmpty()) {
                        // Nothing was handed over; writing an empty swap file would only leave
                        // an unreadable file behind, so move on to the next queue.
                        break;
                    }
                    if (!this.swapOut(flowFileQueue, toSwap)) {
                        // Do not retry in a tight loop; the next scheduled run tries again.
                        break;
                    }
                }
            }
            logger.debug("No more FlowFile Queues to Swap Out");
        }

        /**
         * Writes one batch to a new swap file. The data goes to a {@code .part} file
         * first, is synced to disk and then renamed, so a completed swap file is never
         * partially written. On any failure the FlowFiles are handed back to the queue.
         *
         * @return {@code true} if the batch was swapped out and its file registered
         */
        private boolean swapOut(FlowFileQueue flowFileQueue, List<FlowFileRecord> toSwap) {
            File swapFile = new File(FileSystemSwapManager.this.storageDirectory, System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".swap");
            File swapTempFile = new File(swapFile.getParentFile(), swapFile.getName() + ".part");
            String swapLocation = swapFile.getAbsolutePath();
            int recordsSwapped;
            try {
                try (FileOutputStream fos = new FileOutputStream(swapTempFile);){
                    recordsSwapped = FileSystemSwapManager.this.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
                    fos.getFD().sync();
                }
                if (swapTempFile.renameTo(swapFile)) {
                    FileSystemSwapManager.this.flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation);
                } else {
                    // The temp file is deleted below, so the FlowFiles must go back to the
                    // queue or they would be lost.
                    flowFileQueue.putSwappedRecords(toSwap);
                    FileSystemSwapManager.this.error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile);
                    recordsSwapped = 0;
                }
            }
            catch (IOException ioe) {
                recordsSwapped = 0;
                flowFileQueue.putSwappedRecords(toSwap);
                FileSystemSwapManager.this.error("Failed to swap out " + toSwap.size() + " FlowFiles from " + flowFileQueue + " to Swap File " + swapLocation + " due to " + ioe, ioe);
            }
            if (recordsSwapped <= 0) {
                swapTempFile.delete();
                return false;
            }
            QueueLockWrapper swapQueue = FileSystemSwapManager.this.swapMap.get(flowFileQueue);
            if (swapQueue == null) {
                swapQueue = new QueueLockWrapper(new LinkedBlockingQueue<File>());
                QueueLockWrapper oldQueue = FileSystemSwapManager.this.swapMap.putIfAbsent(flowFileQueue, swapQueue);
                if (oldQueue != null) {
                    swapQueue = oldQueue;
                }
            }
            swapQueue.getQueue().add(swapFile);
            return true;
        }
    }

    /**
     * Periodic job that swaps FlowFiles back in. For every queue with registered
     * swap files whose unswapped size is below 60% of its swap threshold, swap
     * files are read back in, oldest registration first, until the unswapped size
     * reaches 90% of the threshold or no swap files remain.
     */
    private class SwapInTask
    implements Runnable {
        private SwapInTask() {
        }

        @Override
        public void run() {
            for (Map.Entry<FlowFileQueue, QueueLockWrapper> entry : FileSystemSwapManager.this.swapMap.entrySet()) {
                FlowFileQueue flowFileQueue = entry.getKey();
                if ((float)flowFileQueue.unswappedSize() >= (float)flowFileQueue.getSwapThreshold() * 0.6f) {
                    continue;
                }
                QueueLockWrapper queueLockWrapper = entry.getValue();
                if (!queueLockWrapper.getLock().tryLock()) {
                    // someone else is already working on this queue's swap files
                    continue;
                }
                try {
                    this.swapIn(flowFileQueue, queueLockWrapper.getQueue());
                }
                finally {
                    queueLockWrapper.getLock().unlock();
                }
            }
        }

        /**
         * Swaps files in for one queue. The caller must hold the queue's lock.
         * A truncated file is treated as corrupt and removed; a missing file is
         * reported and dropped; any other failure puts the file back and stops
         * work on this queue until the next scheduled run.
         */
        private void swapIn(FlowFileQueue flowFileQueue, Queue<File> queue) {
            while ((float)flowFileQueue.unswappedSize() < (float)flowFileQueue.getSwapThreshold() * 0.9f) {
                File swapFile = queue.poll();
                if (swapFile == null) {
                    return;
                }
                try {
                    try (FileInputStream fis = new FileInputStream(swapFile);
                         BufferedInputStream bufferedIn = new BufferedInputStream(fis);
                         DataInputStream in = new DataInputStream(bufferedIn);){
                        List<FlowFileRecord> swappedFlowFiles = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, FileSystemSwapManager.this.claimManager);
                        FileSystemSwapManager.this.flowFileRepository.swapFlowFilesIn(swapFile.getAbsolutePath(), swappedFlowFiles, flowFileQueue);
                        flowFileQueue.putSwappedRecords(swappedFlowFiles);
                    }
                    if (!swapFile.delete()) {
                        FileSystemSwapManager.this.warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
                    }
                }
                catch (EOFException eof) {
                    FileSystemSwapManager.this.error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
                    if (!swapFile.delete()) {
                        FileSystemSwapManager.this.warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
                    }
                }
                catch (FileNotFoundException fnfe) {
                    FileSystemSwapManager.this.error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
                }
                catch (Exception e) {
                    FileSystemSwapManager.this.error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
                    queue.add(swapFile);
                    // Stop here: retrying at once would spin on the same file, with the lock
                    // held, whenever the failure is persistent.
                    return;
                }
            }
        }
    }

    /**
     * Periodic job that starts a swap-out pass: it takes a snapshot of all queues,
     * starts {@code swapOutThreadCount} {@code SwapOutTask} workers that share the
     * snapshot as their work queue, and waits up to ten minutes for them to finish.
     */
    private class QueueIdentifier
    implements Runnable {
        private final QueueProvider connectionProvider;

        public QueueIdentifier(QueueProvider connectionProvider) {
            this.connectionProvider = connectionProvider;
        }

        @Override
        public void run() {
            Collection<FlowFileQueue> allQueues = this.connectionProvider.getAllQueues();
            BlockingQueue<FlowFileQueue> connectionQueue = new LinkedBlockingQueue<FlowFileQueue>(allQueues);
            ThreadFactory threadFactory = new ThreadFactory(){

                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("Swap Out FlowFiles");
                    return t;
                }
            };
            ExecutorService workerExecutor = Executors.newFixedThreadPool(FileSystemSwapManager.this.swapOutThreadCount, threadFactory);
            for (int i = 0; i < FileSystemSwapManager.this.swapOutThreadCount; ++i) {
                workerExecutor.submit(new SwapOutTask(connectionQueue));
            }
            // No further tasks are accepted; the submitted workers run to completion.
            workerExecutor.shutdown();
            try {
                workerExecutor.awaitTermination(10L, TimeUnit.MINUTES);
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the thread that runs this job instead of
                // swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }
}

