/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.FlowFileQueueSize;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueuePrioritizer;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardLocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.concurrency.TimedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SwappablePriorityQueue {
    private static final Logger logger = LoggerFactory.getLogger(SwappablePriorityQueue.class);
    private static final int SWAP_RECORD_POLL_SIZE = 10000;
    private static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 10000;
    private final int swapThreshold;
    private final FlowFileSwapManager swapManager;
    private final EventReporter eventReporter;
    private final FlowFileQueue flowFileQueue;
    private final DropFlowFileAction dropAction;
    private final List<FlowFilePrioritizer> priorities = new ArrayList<FlowFilePrioritizer>();
    private final String swapPartitionName;
    private final List<String> swapLocations = new ArrayList<String>();
    private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<FlowFileQueueSize>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L));
    private final TimedLock readLock;
    private final TimedLock writeLock;
    private PriorityQueue<FlowFileRecord> activeQueue;
    private ArrayList<FlowFileRecord> swapQueue;
    private boolean swapMode = false;
    private final Map<String, Long> minQueueDateInSwapLocation = new HashMap<String, Long>();
    private final Map<String, Long> totalQueueDateInSwapLocation = new HashMap<String, Long>();

    public SwappablePriorityQueue(FlowFileSwapManager swapManager, int swapThreshold, EventReporter eventReporter, FlowFileQueue flowFileQueue, DropFlowFileAction dropAction, String swapPartitionName) {
        this.swapManager = swapManager;
        this.swapThreshold = swapThreshold;
        this.activeQueue = new PriorityQueue<FlowFileRecord>(20, new QueuePrioritizer(Collections.emptyList()));
        this.swapQueue = new ArrayList();
        this.eventReporter = eventReporter;
        this.flowFileQueue = flowFileQueue;
        this.dropAction = dropAction;
        this.swapPartitionName = swapPartitionName;
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
        this.readLock = new TimedLock((Lock)lock.readLock(), flowFileQueue.getIdentifier() + " Read Lock", 100);
        this.writeLock = new TimedLock((Lock)lock.writeLock(), flowFileQueue.getIdentifier() + " Write Lock", 100);
    }

    private String getQueueIdentifier() {
        return this.flowFileQueue.getIdentifier();
    }

    public List<FlowFilePrioritizer> getPriorities() {
        this.readLock.lock();
        try {
            List<FlowFilePrioritizer> list = Collections.unmodifiableList(this.priorities);
            return list;
        }
        finally {
            this.readLock.unlock("getPriorities");
        }
    }

    public void setPriorities(List<FlowFilePrioritizer> newPriorities) {
        this.writeLock.lock();
        try {
            this.priorities.clear();
            this.priorities.addAll(newPriorities);
            PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<FlowFileRecord>(Math.max(20, this.activeQueue.size()), new QueuePrioritizer(newPriorities));
            newQueue.addAll(this.activeQueue);
            this.activeQueue = newQueue;
        }
        finally {
            this.writeLock.unlock("setPriorities");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public LocalQueuePartitionDiagnostics getQueueDiagnostics() {
        this.readLock.lock();
        try {
            boolean anyPenalized = !this.activeQueue.isEmpty() && this.activeQueue.peek().isPenalized();
            boolean allPenalized = anyPenalized && this.activeQueue.stream().anyMatch(FlowFile::isPenalized);
            StandardLocalQueuePartitionDiagnostics standardLocalQueuePartitionDiagnostics = new StandardLocalQueuePartitionDiagnostics(this.getFlowFileQueueSize(), anyPenalized, allPenalized);
            return standardLocalQueuePartitionDiagnostics;
        }
        finally {
            this.readLock.unlock("getQueueDiagnostics");
        }
    }

    public List<FlowFileRecord> getActiveFlowFiles() {
        this.readLock.lock();
        try {
            ArrayList<FlowFileRecord> arrayList = new ArrayList<FlowFileRecord>(this.activeQueue);
            return arrayList;
        }
        finally {
            this.readLock.unlock("getActiveFlowFiles");
        }
    }

    public boolean isUnacknowledgedFlowFile() {
        return this.getFlowFileQueueSize().getUnacknowledgedCount() > 0;
    }

    private void writeSwapFilesIfNecessary() {
        FlowFileRecord record;
        if (this.swapQueue.size() < 10000) {
            return;
        }
        this.migrateSwapToActive();
        if (this.swapQueue.size() < 10000) {
            return;
        }
        int numSwapFiles = this.swapQueue.size() / 10000;
        int originalSwapQueueCount = this.swapQueue.size();
        long originalSwapQueueBytes = 0L;
        for (FlowFileRecord flowFile : this.swapQueue) {
            originalSwapQueueBytes += flowFile.getSize();
        }
        PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<FlowFileRecord>(this.swapQueue.size(), new QueuePrioritizer(this.getPriorities()));
        tempQueue.addAll(this.swapQueue);
        long bytesSwappedOut = 0L;
        int flowFilesSwappedOut = 0;
        ArrayList<String> swapLocations = new ArrayList<String>(numSwapFiles);
        for (int i = 0; i < numSwapFiles; ++i) {
            long bytesSwappedThisIteration = 0L;
            long totalSwapQueueDatesThisIteration = 0L;
            long minQueueDateThisIteration = Long.MAX_VALUE;
            ArrayList<FlowFileRecord> toSwap = new ArrayList<FlowFileRecord>(10000);
            for (int j = 0; j < 10000; ++j) {
                FlowFileRecord flowFile = tempQueue.poll();
                toSwap.add(flowFile);
                bytesSwappedThisIteration += flowFile.getSize();
                totalSwapQueueDatesThisIteration += flowFile.getLastQueueDate().longValue();
                minQueueDateThisIteration = minQueueDateThisIteration < flowFile.getLastQueueDate() ? minQueueDateThisIteration : flowFile.getLastQueueDate();
            }
            try {
                Collections.reverse(toSwap);
                String swapLocation = this.swapManager.swapOut(toSwap, this.flowFileQueue, this.swapPartitionName);
                swapLocations.add(swapLocation);
                logger.debug("Successfully wrote out Swap File {} containing {} FlowFiles ({} bytes)", new Object[]{swapLocation, toSwap.size(), bytesSwappedThisIteration});
                bytesSwappedOut += bytesSwappedThisIteration;
                flowFilesSwappedOut += toSwap.size();
                this.minQueueDateInSwapLocation.put(swapLocation, minQueueDateThisIteration);
                this.totalQueueDateInSwapLocation.put(swapLocation, totalSwapQueueDatesThisIteration);
                continue;
            }
            catch (IOException ioe) {
                tempQueue.addAll(toSwap);
                int objectCount = this.getFlowFileCount();
                logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk due to {}", new Object[]{this.getQueueIdentifier(), objectCount, ioe.toString()});
                logger.error("", (Throwable)ioe);
                if (this.eventReporter == null) break;
                this.eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + this.getQueueIdentifier() + " has " + objectCount + " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. See logs for more information.");
                break;
            }
        }
        this.swapQueue.clear();
        long updatedSwapQueueBytes = 0L;
        while ((record = tempQueue.poll()) != null) {
            this.swapQueue.add(record);
            updatedSwapQueueBytes += record.getSize();
        }
        Collections.reverse(this.swapQueue);
        boolean updated = false;
        while (!updated) {
            FlowFileQueueSize originalSize = this.getFlowFileQueueSize();
            int addedSwapRecords = this.swapQueue.size() - originalSwapQueueCount;
            long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes;
            FlowFileQueueSize newSize = new FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(), originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut, originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut, originalSize.getSwapFileCount() + numSwapFiles, originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes());
            updated = this.updateSize(originalSize, newSize);
            if (!updated) continue;
            this.logIfNegative(originalSize, newSize, "swap");
        }
        this.swapLocations.addAll(swapLocations);
        logger.debug("After writing swap files, setting new set of Swap Locations to {}", this.swapLocations);
    }

    private int getFlowFileCount() {
        FlowFileQueueSize size = this.getFlowFileQueueSize();
        return size.getActiveCount() + size.getSwappedCount() + size.getUnacknowledgedCount();
    }

    private void migrateSwapToActive() {
        FlowFileRecord toRequeue;
        FlowFileRecord toMigrate;
        if (!this.activeQueue.isEmpty()) {
            return;
        }
        if (!this.swapLocations.isEmpty()) {
            this.swapIn();
            return;
        }
        FlowFileQueueSize size = this.getFlowFileQueueSize();
        if (size.getSwappedCount() == 0 && this.swapQueue.isEmpty()) {
            return;
        }
        if (size.getSwappedCount() > this.swapQueue.size()) {
            return;
        }
        PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<FlowFileRecord>(this.swapQueue.size(), new QueuePrioritizer(this.getPriorities()));
        tempQueue.addAll(this.swapQueue);
        int recordsMigrated = 0;
        long bytesMigrated = 0L;
        while (this.activeQueue.size() < this.swapThreshold && (toMigrate = tempQueue.poll()) != null) {
            this.activeQueue.add(toMigrate);
            bytesMigrated += toMigrate.getSize();
            ++recordsMigrated;
        }
        this.swapQueue.clear();
        while ((toRequeue = tempQueue.poll()) != null) {
            this.swapQueue.add(toRequeue);
        }
        if (recordsMigrated > 0) {
            this.incrementActiveQueueSize(recordsMigrated, bytesMigrated);
            this.incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0);
            logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", (Object)recordsMigrated, (Object)this);
        }
        if (size.getSwappedCount() == 0) {
            this.swapMode = false;
        }
    }

    private void swapIn() {
        SwapContents swapContents;
        String swapLocation = this.swapLocations.get(0);
        boolean partialContents = false;
        try {
            logger.debug("Attempting to swap in {}; all swap locations = {}", (Object)swapLocation, this.swapLocations);
            swapContents = this.swapManager.swapIn(swapLocation, this.flowFileQueue);
            this.swapLocations.remove(0);
            this.minQueueDateInSwapLocation.remove(swapLocation);
            this.totalQueueDateInSwapLocation.remove(swapLocation);
        }
        catch (IncompleteSwapFileException isfe) {
            logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", (Object)swapLocation);
            logger.error("", (Throwable)isfe);
            swapContents = isfe.getPartialContents();
            partialContents = true;
            this.swapLocations.remove(0);
            this.minQueueDateInSwapLocation.remove(swapLocation);
            this.totalQueueDateInSwapLocation.remove(swapLocation);
        }
        catch (FileNotFoundException fnfe) {
            logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", (Object)swapLocation);
            if (this.eventReporter != null) {
                this.eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found");
            }
            this.swapLocations.remove(0);
            this.minQueueDateInSwapLocation.remove(swapLocation);
            this.totalQueueDateInSwapLocation.remove(swapLocation);
            return;
        }
        catch (IOException ioe) {
            logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", (Object)swapLocation);
            logger.error("", (Throwable)ioe);
            if (this.eventReporter != null) {
                this.eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
            }
            return;
        }
        catch (Throwable t) {
            logger.error("Failed to swap in FlowFiles from Swap File {}", (Object)swapLocation, (Object)t);
            throw t;
        }
        QueueSize swapSize = swapContents.getSummary().getQueueSize();
        long contentSize = swapSize.getByteCount();
        int flowFileCount = swapSize.getObjectCount();
        this.incrementSwapQueueSize(-flowFileCount, -contentSize, -1);
        if (partialContents) {
            long contentSizeSwappedIn = 0L;
            for (FlowFileRecord swappedIn : swapContents.getFlowFiles()) {
                contentSizeSwappedIn += swappedIn.getSize();
            }
            this.incrementActiveQueueSize(swapContents.getFlowFiles().size(), contentSizeSwappedIn);
            logger.debug("Swapped in partial contents containing {} FlowFiles ({} bytes) from {}", new Object[]{swapContents.getFlowFiles().size(), contentSizeSwappedIn, swapLocation});
        } else {
            this.incrementActiveQueueSize(flowFileCount, contentSize);
            logger.debug("Successfully swapped in Swap File {} containing {} FlowFiles ({} bytes)", new Object[]{swapLocation, flowFileCount, contentSize});
        }
        this.activeQueue.addAll(swapContents.getFlowFiles());
    }

    public QueueSize size() {
        return this.getFlowFileQueueSize().toQueueSize();
    }

    public boolean isEmpty() {
        return this.getFlowFileQueueSize().isEmpty();
    }

    public boolean isActiveQueueEmpty() {
        FlowFileQueueSize queueSize = this.getFlowFileQueueSize();
        return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FlowFileAvailability getFlowFileAvailability() {
        boolean lockObtained;
        FlowFileRecord top;
        FlowFileQueueSize queueSize = this.getFlowFileQueueSize();
        if (queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0) {
            return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
        }
        boolean mustMigrateSwapToActive = false;
        this.readLock.lock();
        try {
            top = this.activeQueue.peek();
            if (top == null) {
                if (this.swapQueue.isEmpty() && queueSize.getSwapFileCount() > 0) {
                    mustMigrateSwapToActive = true;
                } else {
                    if (this.swapQueue.isEmpty()) {
                        FlowFileAvailability flowFileAvailability = FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
                        return flowFileAvailability;
                    }
                    top = this.swapQueue.get(0);
                }
            }
        }
        finally {
            this.readLock.unlock("isFlowFileAvailable");
        }
        if (mustMigrateSwapToActive && (lockObtained = this.writeLock.tryLock())) {
            try {
                this.migrateSwapToActive();
            }
            finally {
                this.writeLock.unlock("getFlowFileAvailability");
            }
        }
        if (top == null) {
            return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
        }
        if (top.isPenalized()) {
            return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED;
        }
        return FlowFileAvailability.FLOWFILE_AVAILABLE;
    }

    public void acknowledge(FlowFileRecord flowFile) {
        logger.trace("{} Acknowledging {}", (Object)this, (Object)flowFile);
        this.directlyIncrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
    }

    public void acknowledge(Collection<FlowFileRecord> flowFiles) {
        if (logger.isTraceEnabled()) {
            for (FlowFileRecord flowFile : flowFiles) {
                logger.trace("{} Acknowledging {}", (Object)this, (Object)flowFile);
            }
        }
        long totalSize = flowFiles.stream().mapToLong(FlowFile::getSize).sum();
        this.directlyIncrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize);
    }

    public void put(FlowFileRecord flowFile) {
        this.writeLock.lock();
        try {
            if (this.swapMode || this.activeQueue.size() >= this.swapThreshold) {
                this.swapQueue.add(flowFile);
                this.incrementSwapQueueSize(1, flowFile.getSize(), 0);
                this.swapMode = true;
                this.writeSwapFilesIfNecessary();
            } else {
                this.incrementActiveQueueSize(1, flowFile.getSize());
                this.activeQueue.add(flowFile);
            }
            logger.trace("{} put to {}", (Object)flowFile, (Object)this);
        }
        finally {
            this.writeLock.unlock("put(FlowFileRecord)");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void putAll(Collection<FlowFileRecord> flowFiles) {
        int numFiles = flowFiles.size();
        long bytes = 0L;
        for (FlowFile flowFile : flowFiles) {
            bytes += flowFile.getSize();
        }
        this.writeLock.lock();
        try {
            if (this.swapMode || this.activeQueue.size() >= this.swapThreshold - numFiles) {
                this.swapQueue.addAll(flowFiles);
                this.incrementSwapQueueSize(numFiles, bytes, 0);
                this.swapMode = true;
                this.writeSwapFilesIfNecessary();
            } else {
                this.incrementActiveQueueSize(numFiles, bytes);
                this.activeQueue.addAll(flowFiles);
            }
            logger.trace("{} put to {}", flowFiles, (Object)this);
        }
        finally {
            this.writeLock.unlock("putAll");
        }
    }

    public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, long expirationMillis) {
        return this.poll(expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        this.writeLock.lock();
        try {
            FlowFileRecord flowFile = this.doPoll(expiredRecords, expirationMillis, pollStrategy);
            if (flowFile != null) {
                logger.trace("{} poll() returning {}", (Object)this, (Object)flowFile);
                this.unacknowledge(1, flowFile.getSize());
            }
            FlowFileRecord flowFileRecord = flowFile;
            return flowFileRecord;
        }
        finally {
            this.writeLock.unlock("poll(Set)");
        }
    }

    private FlowFileRecord doPoll(Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        FlowFileRecord flowFile;
        boolean isExpired;
        this.migrateSwapToActive();
        long expiredBytes = 0L;
        do {
            if (isExpired = this.isExpired((FlowFile)(flowFile = this.activeQueue.poll()), expirationMillis)) {
                expiredRecords.add(flowFile);
                expiredBytes += flowFile.getSize();
                flowFile = null;
                if (expiredRecords.size() < 10000) continue;
                break;
            }
            if (flowFile == null || !flowFile.isPenalized() || pollStrategy != PollStrategy.UNPENALIZED_FLOWFILES) continue;
            this.activeQueue.add(flowFile);
            flowFile = null;
            break;
        } while (isExpired);
        if (!expiredRecords.isEmpty()) {
            this.incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
        }
        return flowFile;
    }

    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, long expirationMillis) {
        return this.poll(maxResults, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        ArrayList<FlowFileRecord> records = new ArrayList<FlowFileRecord>(Math.min(1, maxResults));
        this.writeLock.lock();
        try {
            this.doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy);
        }
        finally {
            this.writeLock.unlock("poll(int, Set)");
        }
        if (!records.isEmpty() && logger.isTraceEnabled()) {
            for (FlowFileRecord flowFile : records) {
                logger.trace("{} poll() returning {}", (Object)this, (Object)flowFile);
            }
        }
        return records;
    }

    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, long expirationMillis) {
        return this.poll(filter, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        long bytesPulled = 0L;
        int flowFilesPulled = 0;
        long bytesExpired = 0L;
        int flowFilesExpired = 0;
        this.writeLock.lock();
        try {
            FlowFileRecord flowFile;
            this.migrateSwapToActive();
            ArrayList<FlowFileRecord> selectedFlowFiles = new ArrayList<FlowFileRecord>();
            ArrayList<FlowFileRecord> unselected = new ArrayList<FlowFileRecord>();
            while ((flowFile = this.activeQueue.poll()) != null) {
                boolean isExpired = this.isExpired((FlowFile)flowFile, expirationMillis);
                if (isExpired) {
                    expiredRecords.add(flowFile);
                    bytesExpired += flowFile.getSize();
                    ++flowFilesExpired;
                    if (expiredRecords.size() < 10000) continue;
                    break;
                }
                if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                    this.activeQueue.add(flowFile);
                    break;
                }
                FlowFileFilter.FlowFileFilterResult result = filter.filter((FlowFile)flowFile);
                if (result.isAccept()) {
                    bytesPulled += flowFile.getSize();
                    ++flowFilesPulled;
                    selectedFlowFiles.add(flowFile);
                } else {
                    unselected.add(flowFile);
                }
                if (result.isContinue()) continue;
                break;
            }
            this.activeQueue.addAll(unselected);
            this.unacknowledge(flowFilesPulled, bytesPulled);
            if (flowFilesExpired > 0) {
                this.incrementActiveQueueSize(-flowFilesExpired, -bytesExpired);
            }
            if (!selectedFlowFiles.isEmpty() && logger.isTraceEnabled()) {
                for (FlowFileRecord flowFile2 : selectedFlowFiles) {
                    logger.trace("{} poll() returning {}", (Object)this, (Object)flowFile2);
                }
            }
            ArrayList<FlowFileRecord> arrayList = selectedFlowFiles;
            return arrayList;
        }
        finally {
            this.writeLock.unlock("poll(Filter, Set)");
        }
    }

    private void doPoll(List<FlowFileRecord> records, int maxResults, Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        this.migrateSwapToActive();
        long bytesDrained = this.drainQueue(this.activeQueue, records, maxResults, expiredRecords, expirationMillis, pollStrategy);
        long expiredBytes = 0L;
        for (FlowFileRecord record : expiredRecords) {
            expiredBytes += record.getSize();
        }
        this.unacknowledge(records.size(), bytesDrained);
        if (!expiredRecords.isEmpty()) {
            this.incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
        }
    }

    protected boolean isExpired(FlowFile flowFile, long expirationMillis) {
        return this.isLaterThan(this.getExpirationDate(flowFile, expirationMillis));
    }

    private boolean isLaterThan(Long maxAge) {
        if (maxAge == null) {
            return false;
        }
        return maxAge < System.currentTimeMillis();
    }

    private Long getExpirationDate(FlowFile flowFile, long expirationMillis) {
        if (flowFile == null) {
            return null;
        }
        if (expirationMillis <= 0L) {
            return null;
        }
        long entryDate = flowFile.getEntryDate();
        long expirationDate = entryDate + expirationMillis;
        return expirationDate;
    }

    private long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        FlowFileRecord pulled;
        long drainedSize = 0L;
        while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) {
            if (this.isExpired((FlowFile)pulled, expirationMillis)) {
                expiredRecords.add(pulled);
                if (expiredRecords.size() >= 10000) {
                    break;
                }
            } else {
                if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                    sourceQueue.add(pulled);
                    break;
                }
                destination.add(pulled);
            }
            drainedSize += pulled.getSize();
        }
        return drainedSize;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FlowFileRecord getFlowFile(String flowFileUuid) {
        if (flowFileUuid == null) {
            return null;
        }
        this.readLock.lock();
        try {
            for (FlowFileRecord flowFile : this.activeQueue) {
                if (!flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) continue;
                FlowFileRecord flowFileRecord = flowFile;
                return flowFileRecord;
            }
        }
        finally {
            this.readLock.unlock("getFlowFile");
        }
        return null;
    }

    /*
     * Exception decompiling
     */
    public void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public SwapSummary recoverSwappedFlowFiles() {
        int swapFlowFileCount = 0;
        long swapByteCount = 0L;
        long totalSwappedQueueDate = 0L;
        Long minSwappedQueueDate = null;
        Long maxId = null;
        ArrayList<ResourceClaim> resourceClaims = new ArrayList<ResourceClaim>();
        long startNanos = System.nanoTime();
        int failures = 0;
        this.writeLock.lock();
        try {
            List swapLocationsFromSwapManager;
            try {
                swapLocationsFromSwapManager = this.swapManager.recoverSwapLocations(this.flowFileQueue, this.swapPartitionName);
            }
            catch (IOException ioe) {
                logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", (Object)this.getQueueIdentifier());
                logger.error("", (Throwable)ioe);
                if (this.eventReporter != null) {
                    this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + this.getQueueIdentifier() + "; see logs for more detials");
                }
                SwapSummary swapSummary = null;
                this.writeLock.unlock("Recover Swap Files");
                return swapSummary;
            }
            LinkedHashSet swapLocations = new LinkedHashSet(swapLocationsFromSwapManager);
            swapLocations.removeAll(this.swapLocations);
            logger.debug("Swap Manager reports {} Swap Files for {}: {}", new Object[]{swapLocations.size(), this.flowFileQueue, swapLocations});
            for (String swapLocation : swapLocations) {
                try {
                    SwapSummary summary = this.swapManager.getSwapSummary(swapLocation);
                    QueueSize queueSize = summary.getQueueSize();
                    Long maxSwapRecordId = summary.getMaxFlowFileId();
                    if (maxSwapRecordId != null && (maxId == null || maxSwapRecordId > maxId)) {
                        maxId = maxSwapRecordId;
                    }
                    swapFlowFileCount += queueSize.getObjectCount();
                    swapByteCount += queueSize.getByteCount();
                    resourceClaims.addAll(summary.getResourceClaims());
                    this.minQueueDateInSwapLocation.put(swapLocation, summary.getMinLastQueueDate());
                    this.totalQueueDateInSwapLocation.put(swapLocation, summary.getTotalLastQueueDate());
                    if (minSwappedQueueDate == null) {
                        minSwappedQueueDate = summary.getMinLastQueueDate();
                    } else if (summary.getMinLastQueueDate() != null) {
                        minSwappedQueueDate = Long.min(minSwappedQueueDate, summary.getMinLastQueueDate());
                    }
                    totalSwappedQueueDate += summary.getTotalLastQueueDate().longValue();
                }
                catch (IOException ioe) {
                    ++failures;
                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", (Object)swapLocation);
                    logger.error("", (Throwable)ioe);
                    if (this.eventReporter == null) continue;
                    this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation + "; the file appears to be corrupt. See logs for more details");
                }
            }
            this.incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size());
            this.swapLocations.addAll(swapLocations);
        }
        finally {
            this.writeLock.unlock("Recover Swap Files");
        }
        if (this.swapLocations.isEmpty()) {
            logger.debug("No swap files were recovered for {}", (Object)this.flowFileQueue);
        } else {
            long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            logger.info("Recovered {} swap files for {} in {} millis", new Object[]{this.swapLocations.size() - failures, this, millis});
        }
        return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims, minSwappedQueueDate, totalSwappedQueueDate);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long getMinLastQueueDate() {
        this.readLock.lock();
        try {
            long min = this.getMinLastQueueDate(this.activeQueue, 0L);
            min = Long.min(min, this.getMinLastQueueDate(this.swapQueue, min));
            for (Long minSwapQueueDate : this.minQueueDateInSwapLocation.values()) {
                min = min == 0L ? minSwapQueueDate : Long.min(min, minSwapQueueDate);
            }
            long l = min;
            return l;
        }
        finally {
            this.readLock.unlock("Get Min Last Queue Date");
        }
    }

    private long getMinLastQueueDate(Iterable<FlowFileRecord> iterable, long defaultMin) {
        long min = 0L;
        for (FlowFileRecord flowFileRecord : iterable) {
            min = min == 0L ? flowFileRecord.getLastQueueDate() : Long.min(flowFileRecord.getLastQueueDate(), min);
        }
        return min == 0L ? defaultMin : min;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long getTotalQueuedDuration(long fromTimestamp) {
        this.readLock.lock();
        try {
            long sum = 0L;
            for (FlowFileRecord flowFileRecord : this.activeQueue) {
                sum += fromTimestamp - flowFileRecord.getLastQueueDate();
            }
            for (FlowFileRecord flowFileRecord : this.swapQueue) {
                sum += fromTimestamp - flowFileRecord.getLastQueueDate();
            }
            long totalSwappedQueueDate = 0L;
            for (Long totalQueueDate : this.totalQueueDateInSwapLocation.values()) {
                totalSwappedQueueDate += totalQueueDate.longValue();
            }
            long l = sum += (long)(this.getFlowFileQueueSize().getSwappedCount() - this.swapQueue.size()) * fromTimestamp - totalSwappedQueueDate;
            return l;
        }
        finally {
            this.readLock.unlock("Get Total Queued Duration");
        }
    }

    protected void incrementActiveQueueSize(int count, long bytes) {
        boolean updated = false;
        while (!updated) {
            FlowFileQueueSize newSize;
            FlowFileQueueSize original = this.size.get();
            updated = this.updateSize(original, newSize = new FlowFileQueueSize(original.getActiveCount() + count, original.getActiveBytes() + bytes, original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(), original.getUnacknowledgedCount(), original.getUnacknowledgedBytes()));
            if (!updated) continue;
            this.logIfNegative(original, newSize, "active");
        }
    }

    private void incrementSwapQueueSize(int count, long bytes, int fileCount) {
        boolean updated = false;
        while (!updated) {
            FlowFileQueueSize newSize;
            FlowFileQueueSize original = this.getFlowFileQueueSize();
            updated = this.updateSize(original, newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(), original.getSwappedCount() + count, original.getSwappedBytes() + bytes, original.getSwapFileCount() + fileCount, original.getUnacknowledgedCount(), original.getUnacknowledgedBytes()));
            if (!updated) continue;
            this.logIfNegative(original, newSize, "swap");
        }
    }

    private void unacknowledge(int count, long bytes) {
        this.directlyIncrementUnacknowledgedQueueSize(count, bytes);
        this.incrementActiveQueueSize(-count, -bytes);
    }

    private void directlyIncrementUnacknowledgedQueueSize(int count, long bytes) {
        boolean updated = false;
        while (!updated) {
            FlowFileQueueSize newSize;
            FlowFileQueueSize original = this.size.get();
            updated = this.updateSize(original, newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(), original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(), original.getUnacknowledgedCount() + count, original.getUnacknowledgedBytes() + bytes));
            if (!updated) continue;
            this.logIfNegative(original, newSize, "Unacknowledged");
        }
    }

    private void logIfNegative(FlowFileQueueSize original, FlowFileQueueSize newSize, String counterName) {
        if (newSize.getActiveBytes() < 0L || newSize.getActiveCount() < 0 || newSize.getSwappedBytes() < 0L || newSize.getSwappedCount() < 0 || newSize.getUnacknowledgedBytes() < 0L || newSize.getUnacknowledgedCount() < 0) {
            logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, (Throwable)new RuntimeException("Cannot create negative queue size"));
        }
    }

    protected boolean updateSize(FlowFileQueueSize expected, FlowFileQueueSize updated) {
        return this.size.compareAndSet(expected, updated);
    }

    public FlowFileQueueSize getFlowFileQueueSize() {
        return this.size.get();
    }

    public void inheritQueueContents(FlowFileQueueContents queueContents) {
        this.writeLock.lock();
        try {
            this.putAll(queueContents.getActiveFlowFiles());
            List<String> inheritedSwapLocations = queueContents.getSwapLocations();
            this.swapLocations.addAll(inheritedSwapLocations);
            this.incrementSwapQueueSize(queueContents.getSwapSize().getObjectCount(), queueContents.getSwapSize().getByteCount(), queueContents.getSwapLocations().size());
            if (!inheritedSwapLocations.isEmpty()) {
                logger.debug("Inherited the following swap locations: {}", inheritedSwapLocations);
            }
        }
        finally {
            this.writeLock.unlock("inheritQueueContents");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FlowFileQueueContents packageForRebalance(String newPartitionName) {
        this.writeLock.lock();
        try {
            QueueSize swapSize;
            FlowFileQueueSize updatedSize;
            FlowFileQueueSize currentSize;
            boolean updated;
            ArrayList<FlowFileRecord> activeRecords = new ArrayList<FlowFileRecord>(this.activeQueue);
            ArrayList<String> updatedSwapLocations = new ArrayList<String>(this.swapLocations.size());
            for (String swapLocation : this.swapLocations) {
                try {
                    String updatedSwapLocation = this.swapManager.changePartitionName(swapLocation, newPartitionName);
                    updatedSwapLocations.add(updatedSwapLocation);
                }
                catch (IOException ioe) {
                    logger.error("Failed to update Swap File {} to reflect that the contents are now owned by Partition '{}'", new Object[]{swapLocation, newPartitionName, ioe});
                }
            }
            this.swapLocations.clear();
            this.activeQueue.clear();
            int swapQueueCount = this.swapQueue.size();
            long swapQueueBytes = this.swapQueue.stream().mapToLong(FlowFile::getSize).sum();
            activeRecords.addAll(this.swapQueue);
            this.swapQueue.clear();
            this.swapMode = false;
            do {
                currentSize = this.getFlowFileQueueSize();
                swapSize = new QueueSize(currentSize.getSwappedCount() - swapQueueCount, currentSize.getSwappedBytes() - swapQueueBytes);
            } while (!(updated = this.updateSize(currentSize, updatedSize = new FlowFileQueueSize(0, 0L, 0, 0L, 0, currentSize.getUnacknowledgedCount(), currentSize.getUnacknowledgedBytes()))));
            logger.debug("Cleared {} to package FlowFile for rebalance to {}", (Object)this, (Object)newPartitionName);
            FlowFileQueueContents flowFileQueueContents = new FlowFileQueueContents(activeRecords, updatedSwapLocations, swapSize);
            return flowFileQueueContents;
        }
        finally {
            this.writeLock.unlock("packageForRebalance(SwappablePriorityQueue)");
        }
    }

    public String toString() {
        return "SwappablePriorityQueue[queueId=" + this.flowFileQueue.getIdentifier() + ", partition=" + this.swapPartitionName + "]";
    }
}

