/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.queue.clustered.TransferFailureDestination;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.NonLocalPartitionPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition;
import org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.StandardRebalancingPartition;
import org.apache.nifi.controller.queue.clustered.partition.SwappablePriorityQueueLocalPartition;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketLoadBalancedFlowFileQueue
extends AbstractFlowFileQueue
implements LoadBalancedFlowFileQueue {
    private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class);
    // Not referenced in the visible part of this class; createRemotePartition passes the same value (1000) as a literal.
    private static final int NODE_SWAP_THRESHOLD = 1000;
    // Orders nodes by load-balance address, then load-balance port. Used for the nodeIdentifiers TreeSet
    // and for the "removed nodes" set computed in setNodeIdentifiers.
    private static final Comparator<NodeIdentifier> loadBalanceEndpointComparator = Comparator.comparing(NodeIdentifier::getLoadBalanceAddress).thenComparing(NodeIdentifier::getLoadBalancePort);
    // Read and replaced only inside synchronized methods (getPriorities / setPriorities).
    private final List<FlowFilePrioritizer> prioritizers = new ArrayList<FlowFilePrioritizer>();
    private final ConnectionEventListener eventListener;
    // Aggregate size of the whole queue; updated through the compare-and-set loop in adjustSize.
    private final AtomicReference<QueueSize> totalSize = new AtomicReference<QueueSize>(new QueueSize(0, 0L));
    // The partition holding FlowFiles destined for this node; always present in queuePartitions.
    private final LocalQueuePartition localPartition;
    // Receives FlowFiles / swap files that must be re-partitioned (transfer failures, orphaned swap partitions).
    private final RebalancingPartition rebalancingPartition;
    private final FlowFileSwapManager swapManager;
    private final EventReporter eventReporter;
    // May be null (the constructor and several methods check for it); null means nothing is load balanced.
    private final ClusterCoordinator clusterCoordinator;
    private final AsyncLoadBalanceClientRegistry clientRegistry;
    private final FlowFileRepository flowFileRepo;
    private final ProvenanceEventRepository provRepo;
    private final ContentRepository contentRepo;
    // Collections.emptySet() when clusterCoordinator is null, otherwise a TreeSet ordered by loadBalanceEndpointComparator.
    private final Set<NodeIdentifier> nodeIdentifiers;
    // Guards queuePartitions: most readers iterate under the read lock, while offloadQueue and
    // setNodeIdentifiers take the write lock before replacing the array.
    // NOTE(review): getTotalQueuedDuration and getMinLastQueueDate read queuePartitions without taking the lock.
    private final ReadWriteLock partitionLock = new ReentrantReadWriteLock();
    private final Lock partitionReadLock = this.partitionLock.readLock();
    private final Lock partitionWriteLock = this.partitionLock.writeLock();
    private QueuePartition[] queuePartitions;
    private volatile FlowFilePartitioner partitioner;
    // Starts out true; flipped by the synchronized startLoadBalancing / stopLoadBalancing.
    // NOTE(review): also read without synchronization in createRemotePartition and setNodeIdentifiers.
    private boolean stopped = true;
    private volatile boolean offloaded = false;

    /**
     * Creates the queue, its local and rebalancing partitions, and one remote partition for every
     * other node currently reported by the cluster coordinator.
     *
     * <p>When {@code clusterCoordinator} is {@code null} the queue holds only the local partition and
     * an empty, immutable node set. Otherwise a topology listener is registered with the coordinator.
     * The initial partitioner is a {@link LocalPartitionPartitioner}, and the rebalancing partition is
     * started with it before the constructor returns.</p>
     *
     * @param identifier identifier of this queue, passed to the superclass
     * @param eventListener listener stored for connection events
     * @param scheduler passed to the superclass
     * @param flowFileRepo FlowFile repository, passed to the superclass and to remote partitions
     * @param provRepo provenance repository, passed to the superclass and to remote partitions
     * @param contentRepo content repository handed to remote partitions
     * @param resourceClaimManager passed to the superclass
     * @param clusterCoordinator source of cluster topology; may be {@code null}
     * @param clientRegistry load-balance client registry handed to remote partitions
     * @param swapManager swap manager shared by all partitions
     * @param swapThreshold swap threshold for the local and rebalancing partitions
     * @param eventReporter reporter for operator-visible events
     */
    public SocketLoadBalancedFlowFileQueue(String identifier, ConnectionEventListener eventListener, ProcessScheduler scheduler, FlowFileRepository flowFileRepo, ProvenanceEventRepository provRepo, ContentRepository contentRepo, ResourceClaimManager resourceClaimManager, ClusterCoordinator clusterCoordinator, AsyncLoadBalanceClientRegistry clientRegistry, FlowFileSwapManager swapManager, int swapThreshold, EventReporter eventReporter) {
        super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
        this.eventListener = eventListener;
        this.eventReporter = eventReporter;
        this.swapManager = swapManager;
        this.flowFileRepo = flowFileRepo;
        this.provRepo = provRepo;
        this.contentRepo = contentRepo;
        this.clusterCoordinator = clusterCoordinator;
        this.clientRegistry = clientRegistry;
        this.localPartition = new SwappablePriorityQueueLocalPartition(swapManager, swapThreshold, eventReporter, this, this::drop);
        this.rebalancingPartition = new StandardRebalancingPartition(swapManager, swapThreshold, eventReporter, this, this::drop);

        // Explicit branches instead of a chained assignment through a TreeSet-typed temporary:
        // the conditional's type is Set, so it cannot be assigned to a TreeSet local.
        if (clusterCoordinator == null) {
            this.nodeIdentifiers = Collections.emptySet();
        } else {
            this.nodeIdentifiers = new TreeSet<>(loadBalanceEndpointComparator);
            this.nodeIdentifiers.addAll(clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[0]));
        }

        // NOTE(review): ordering here uses the API address only, whereas setNodeIdentifiers orders by
        // API address and API port; confirm whether the difference is intentional.
        final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(this.nodeIdentifiers);
        sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress));

        if (sortedNodeIdentifiers.isEmpty()) {
            this.queuePartitions = new QueuePartition[]{this.localPartition};
        } else {
            final List<QueuePartition> partitionList = new ArrayList<>();
            final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier();
            for (final NodeIdentifier nodeId : sortedNodeIdentifiers) {
                if (nodeId.equals(localNodeId)) {
                    partitionList.add(this.localPartition);
                } else {
                    partitionList.add(this.createRemotePartition(nodeId));
                }
            }

            // The local node may be missing from the coordinator's view; the local partition must still exist.
            if (!partitionList.contains(this.localPartition)) {
                partitionList.add(this.localPartition);
            }
            this.queuePartitions = partitionList.toArray(new QueuePartition[0]);
        }

        this.partitioner = new LocalPartitionPartitioner();
        if (clusterCoordinator != null) {
            clusterCoordinator.registerEventListener((ClusterTopologyEventListener)new ClusterEventListener());
        }
        this.rebalancingPartition.start(this.partitioner);
    }

    /**
     * Records the new strategy and attribute on the superclass and, when either actually changed,
     * installs the matching partitioner.
     *
     * <p>No partitioner is installed when nothing changed, when there is no cluster coordinator,
     * or while the queue is offloaded.</p>
     *
     * @param strategy the load balance strategy to apply
     * @param partitioningAttribute the attribute handed to the partitioner for {@code PARTITION_BY_ATTRIBUTE}
     */
    @Override
    public synchronized void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String partitioningAttribute) {
        final LoadBalanceStrategy previousStrategy = getLoadBalanceStrategy();
        final String previousAttribute = getPartitioningAttribute();

        // The superclass state is always updated, even if no new partitioner ends up being installed.
        super.setLoadBalanceStrategy(strategy, partitioningAttribute);

        final boolean unchanged = strategy == previousStrategy && Objects.equals(partitioningAttribute, previousAttribute);
        if (unchanged || clusterCoordinator == null || offloaded) {
            return;
        }

        setFlowFilePartitioner(getPartitionerForLoadBalancingStrategy(strategy, partitioningAttribute));
    }

    /**
     * Creates a new partitioner instance for the given strategy.
     *
     * @param strategy the strategy to translate
     * @param partitioningAttribute attribute name used only for {@code PARTITION_BY_ATTRIBUTE}
     * @return a freshly created partitioner
     * @throws IllegalArgumentException if the strategy is not one of the handled constants
     */
    private FlowFilePartitioner getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy strategy, String partitioningAttribute) {
        switch (strategy) {
            case DO_NOT_LOAD_BALANCE:
                return new LocalPartitionPartitioner();
            case PARTITION_BY_ATTRIBUTE:
                return new CorrelationAttributePartitioner(partitioningAttribute);
            case ROUND_ROBIN:
                return new RoundRobinPartitioner();
            case SINGLE_NODE:
                return new FirstNodePartitioner();
            default:
                throw new IllegalArgumentException();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks the queue as offloaded and switches to a {@link NonLocalPartitionPartitioner}.
     *
     * <p>Under the partition write lock, the node set is narrowed to those partitions whose node the
     * coordinator reports as {@code CONNECTED}; if no such node exists the node set is left as it is.
     * Does nothing when there is no cluster coordinator.</p>
     */
    public void offloadQueue() {
        if (clusterCoordinator == null) {
            return;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Setting queue {} on node {} as offloaded. Current size: {}, Partition Sizes: {}", this, clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes());
        }

        offloaded = true;

        partitionWriteLock.lock();
        try {
            final Set<NodeIdentifier> connectedNodes = new HashSet<>();
            for (final QueuePartition partition : queuePartitions) {
                final Optional<NodeIdentifier> candidate = partition.getNodeIdentifier();
                if (!candidate.isPresent()) {
                    continue;
                }

                final NodeIdentifier candidateId = candidate.get();
                final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(candidateId);
                if (connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED) {
                    connectedNodes.add(candidateId);
                }
            }

            if (!connectedNodes.isEmpty()) {
                setNodeIdentifiers(connectedNodes, false);
            }

            setFlowFilePartitioner(new NonLocalPartitionPartitioner());

            if (logger.isDebugEnabled()) {
                logger.debug("Queue {} has now updated Partition on node {} for offload. Current size: {}, Partition Sizes: {}", this, clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes());
            }
        } finally {
            partitionWriteLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Takes a snapshot of every partition's size while holding the partition read lock.
     *
     * @return a new map from each current partition to the size it reported
     */
    private Map<QueuePartition, QueueSize> getPartitionSizes() {
        partitionReadLock.lock();
        try {
            final Map<QueuePartition, QueueSize> sizesByPartition = new HashMap<>();
            for (final QueuePartition current : queuePartitions) {
                sizesByPartition.put(current, current.size());
            }
            return sizesByPartition;
        } finally {
            partitionReadLock.unlock();
        }
    }

    /**
     * Clears the offloaded flag and re-installs the partitioner that matches the currently configured
     * load balance strategy. Does nothing when there is no cluster coordinator or the queue is not offloaded.
     */
    public void resetOffloadedQueue() {
        if (clusterCoordinator == null || !offloaded) {
            return;
        }

        offloaded = false;
        logger.debug("Queue {} on node {} was previously offloaded, resetting offloaded status to {}", this, clusterCoordinator.getLocalNodeIdentifier(), offloaded);

        final FlowFilePartitioner restored = getPartitionerForLoadBalancingStrategy(getLoadBalanceStrategy(), getPartitioningAttribute());
        setFlowFilePartitioner(restored);
        logger.debug("Queue {} is no longer offloaded, restored load balance strategy to {} and partitioning attribute to \"{}\"", this, getLoadBalanceStrategy(), getPartitioningAttribute());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the rebalancing partition and every queue partition with the current partitioner.
     * The debug message is logged on every call; the partitions are started only if the queue was stopped.
     */
    public synchronized void startLoadBalancing() {
        logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this);

        if (stopped) {
            stopped = false;

            partitionReadLock.lock();
            try {
                rebalancingPartition.start(partitioner);
                for (final QueuePartition each : queuePartitions) {
                    each.start(partitioner);
                }
            } finally {
                partitionReadLock.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops the rebalancing partition and every queue partition.
     * The debug message is logged on every call; the partitions are stopped only if the queue was running.
     */
    public synchronized void stopLoadBalancing() {
        logger.debug("{} stopped. Will no longer distribute FlowFiles across the cluster", this);

        if (!stopped) {
            stopped = true;

            partitionReadLock.lock();
            try {
                rebalancingPartition.stop();
                for (final QueuePartition each : queuePartitions) {
                    each.stop();
                }
            } finally {
                partitionReadLock.unlock();
            }
        }
    }

    /**
     * Reports whether the queue as a whole holds more FlowFiles than the local partition does.
     *
     * @return {@code true} when the total object count is non-zero and exceeds the local partition's object count
     */
    public boolean isActivelyLoadBalancing() {
        final int totalCount = size().getObjectCount();
        // The local partition is consulted only when the queue is non-empty.
        return totalCount != 0 && totalCount > localPartition.size().getObjectCount();
    }

    /**
     * Builds the partition that holds FlowFiles destined for the given remote node.
     *
     * <p>The partition gets its own swappable queue and a failure destination. On a failed transfer the
     * destination either hands the FlowFiles to the rebalancing partition or, in the collection variant,
     * puts them back on the node's own queue, depending on {@code isRebalanceOnFailure}.
     * The partition is started right away unless this queue is currently stopped.</p>
     *
     * @param nodeId the remote node the partition sends to
     * @return the newly created remote partition
     */
    private QueuePartition createRemotePartition(final NodeIdentifier nodeId) {
        // The literal 1000 is the same value as the class constant NODE_SWAP_THRESHOLD.
        final SwappablePriorityQueue remoteQueue = new SwappablePriorityQueue(this.swapManager, 1000, this.eventReporter, this, this::drop, nodeId.getId());

        final TransferFailureDestination onFailure = new TransferFailureDestination() {
            @Override
            public void putAll(Collection<FlowFileRecord> flowFiles, FlowFilePartitioner partitionerUsed) {
                if (flowFiles.isEmpty()) {
                    return;
                }

                if (!this.isRebalanceOnFailure(partitionerUsed)) {
                    logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", flowFiles.size(), nodeId, partitionerUsed);
                    remoteQueue.putAll(flowFiles);
                    return;
                }

                logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId);
                SocketLoadBalancedFlowFileQueue.this.rebalancingPartition.rebalance(flowFiles);
            }

            @Override
            public void putAll(Function<String, FlowFileQueueContents> queueContentsFunction, FlowFilePartitioner partitionerUsed) {
                if (!this.isRebalanceOnFailure(partitionerUsed)) {
                    logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", nodeId, partitionerUsed);
                    return;
                }

                // The contents are packaged under the rebalancing partition's swap partition name.
                final FlowFileQueueContents packaged = queueContentsFunction.apply(SocketLoadBalancedFlowFileQueue.this.rebalancingPartition.getSwapPartitionName());
                SocketLoadBalancedFlowFileQueue.this.rebalancingPartition.rebalance(packaged);
                logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition", packaged.getActiveFlowFiles().size(), packaged.getSwapLocations().size(), nodeId);
            }

            @Override
            public boolean isRebalanceOnFailure(FlowFilePartitioner partitionerUsed) {
                // Rebalance if the partitioner asks for it, or if it is no longer the queue's current partitioner.
                if (partitionerUsed.isRebalanceOnFailure()) {
                    return true;
                }
                return !partitionerUsed.equals(SocketLoadBalancedFlowFileQueue.this.partitioner);
            }
        };

        final RemoteQueuePartition remotePartition = new RemoteQueuePartition(nodeId, remoteQueue, onFailure, this.flowFileRepo, this.provRepo, this.contentRepo, this.clientRegistry, this);
        if (!this.stopped) {
            remotePartition.start(this.partitioner);
        }
        return remotePartition;
    }

    /**
     * Returns the configured prioritizers.
     *
     * @return a new list containing the current prioritizers; changes to it do not affect this queue
     */
    public synchronized List<FlowFilePrioritizer> getPriorities() {
        final List<FlowFilePrioritizer> snapshot = new ArrayList<>(prioritizers);
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the stored prioritizers and pushes the new list to every queue partition and then to the
     * rebalancing partition, while holding the partition read lock.
     *
     * @param newPriorities the prioritizers to use from now on
     */
    public synchronized void setPriorities(List<FlowFilePrioritizer> newPriorities) {
        prioritizers.clear();
        prioritizers.addAll(newPriorities);

        partitionReadLock.lock();
        try {
            for (final QueuePartition target : queuePartitions) {
                target.setPriorities(newPriorities);
            }
            rebalancingPartition.setPriorities(newPriorities);
        } finally {
            partitionReadLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Recovers swapped-out FlowFiles for every partition and returns one aggregated summary.
     *
     * <p>Steps, all performed under the partition read lock:</p>
     * <ol>
     *   <li>Ask the swap manager which partition names have swap files. On {@link IOException} the error is
     *       logged and reported, and recovery continues with no extra names.</li>
     *   <li>Recover each known partition and the rebalancing partition, removing their names from that set.</li>
     *   <li>For each remaining name (swap files with no matching partition), move every swap file to the
     *       rebalancing partition's name and hand it to the rebalancing partition.</li>
     *   <li>Fold all summaries into a total size, maximum FlowFile id, resource claims and queue dates, and
     *       add the total size to this queue's size.</li>
     * </ol>
     *
     * @return the aggregated swap summary
     */
    public SwapSummary recoverSwappedFlowFiles() {
        this.partitionReadLock.lock();
        try {
            final List<SwapSummary> summaries = new ArrayList<>(this.queuePartitions.length);

            Set<String> partitionNamesToRecover;
            try {
                partitionNamesToRecover = this.swapManager.getSwappedPartitionNames(this);
                logger.debug("For {}, partition names to recover are {}", this, partitionNamesToRecover);
            } catch (final IOException ioe) {
                logger.error("Failed to determine the names of the Partitions that have swapped FlowFiles for queue with ID {}.", this.getIdentifier(), ioe);
                if (this.eventReporter != null) {
                    this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine the names of Partitions that have swapped FlowFiles for queue with ID " + this.getIdentifier() + "; see logs for more details");
                }
                partitionNamesToRecover = Collections.emptySet();
            }

            // Partitions that exist recover their own swap files; what is left afterwards is orphaned.
            for (final QueuePartition partition : this.queuePartitions) {
                partitionNamesToRecover.remove(partition.getSwapPartitionName());
                summaries.add(partition.recoverSwappedFlowFiles());
            }

            partitionNamesToRecover.remove(this.rebalancingPartition.getSwapPartitionName());
            summaries.add(this.rebalancingPartition.recoverSwappedFlowFiles());

            for (final String partitionName : partitionNamesToRecover) {
                logger.info("Found Swap Files for FlowFile Queue with Identifier {} and Partition {} that has not been recovered yet. Will recover Swap Files for this Partition even though no partition exists with this name yet", this.getIdentifier(), partitionName);
                try {
                    // Typed as List<String>: a raw List cannot be iterated as String.
                    final List<String> swapLocations = this.swapManager.recoverSwapLocations(this, partitionName);
                    for (final String swapLocation : swapLocations) {
                        final SwapSummary swapSummary = this.swapManager.getSwapSummary(swapLocation);
                        summaries.add(swapSummary);

                        // Re-home the swap file under the rebalancing partition so it gets redistributed.
                        final String updatedSwapLocation = this.swapManager.changePartitionName(swapLocation, this.rebalancingPartition.getSwapPartitionName());
                        final FlowFileQueueContents queueContents = new FlowFileQueueContents(Collections.emptyList(), Collections.singletonList(updatedSwapLocation), swapSummary.getQueueSize());
                        this.rebalancingPartition.rebalance(queueContents);
                    }
                } catch (final IOException e) {
                    logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {} and Partition {}", this.getIdentifier(), partitionName, e);
                    if (this.eventReporter != null) {
                        this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + this.getIdentifier() + "; see logs for more details");
                    }
                }
            }

            Long maxId = null;
            QueueSize totalQueueSize = new QueueSize(0, 0L);
            final List<ResourceClaim> resourceClaims = new ArrayList<>();
            Long minLastQueueDate = null;
            long totalLastQueueDate = 0L;

            for (final SwapSummary summary : summaries) {
                final Long summaryMaxId = summary.getMaxFlowFileId();
                if (summaryMaxId != null && (maxId == null || summaryMaxId > maxId)) {
                    maxId = summaryMaxId;
                }

                totalQueueSize = totalQueueSize.add(summary.getQueueSize());
                resourceClaims.addAll(summary.getResourceClaims());

                final Long summaryMinLastQueueDate = summary.getMinLastQueueDate();
                if (summaryMinLastQueueDate != null) {
                    if (minLastQueueDate == null) {
                        minLastQueueDate = summaryMinLastQueueDate;
                    } else {
                        minLastQueueDate = Long.min(minLastQueueDate, summaryMinLastQueueDate);
                    }
                }

                // A missing total contributes nothing, mirroring the null handling of the min date above.
                final Long summaryTotalLastQueueDate = summary.getTotalLastQueueDate();
                if (summaryTotalLastQueueDate != null) {
                    totalLastQueueDate += summaryTotalLastQueueDate;
                }
            }

            this.adjustSize(totalQueueSize.getObjectCount(), totalQueueSize.getByteCount());
            return new StandardSwapSummary(totalQueueSize, maxId, resourceClaims, minLastQueueDate, totalLastQueueDate);
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    /**
     * Delegates to the swap manager's {@code purge()}.
     */
    public void purgeSwapFiles() {
        swapManager.purge();
    }

    /**
     * Returns the aggregate size tracked for the whole queue.
     *
     * @return the current value of the total-size reference
     */
    public QueueSize size() {
        final QueueSize currentTotal = totalSize.get();
        return currentTotal;
    }

    /**
     * Sums {@code getTotalActiveQueuedDuration(fromTimestamp)} over all current queue partitions.
     *
     * @param fromTimestamp timestamp passed through to each partition
     * @return the sum of the per-partition durations
     */
    public long getTotalQueuedDuration(long fromTimestamp) {
        long accumulated = 0L;
        for (final QueuePartition partition : queuePartitions) {
            accumulated += partition.getTotalActiveQueuedDuration(fromTimestamp);
        }
        return accumulated;
    }

    /**
     * Returns the smallest non-zero minimum last-queue date reported by any queue partition.
     *
     * <p>This method uses 0 as its own "nothing found yet" value. A partition that reports 0 is
     * therefore skipped rather than folded in; otherwise a 0 seen after a real value would reset the
     * running minimum and make the result depend on partition order.
     * NOTE(review): this presumes a partition reports 0 when it has no date to offer; confirm against
     * the partition implementations.</p>
     *
     * @return the minimum non-zero date across partitions, or 0 if every partition reported 0
     */
    public long getMinLastQueueDate() {
        long min = 0L;
        for (final QueuePartition queuePartition : this.queuePartitions) {
            final long partitionMin = queuePartition.getMinLastQueueDate();
            if (partitionMin == 0L) {
                continue;
            }
            min = (min == 0L) ? partitionMin : Long.min(min, partitionMin);
        }
        return min;
    }

    /**
     * Reports whether the aggregate queue size has an object count of zero.
     *
     * @return {@code true} if {@code size()} reports no objects
     */
    public boolean isEmpty() {
        final QueueSize currentSize = size();
        return currentSize.getObjectCount() == 0;
    }

    /**
     * Returns the FlowFile availability of the local partition only.
     *
     * @return whatever the local partition reports
     */
    public FlowFileAvailability getFlowFileAvailability() {
        final FlowFileAvailability localAvailability = localPartition.getFlowFileAvailability();
        return localAvailability;
    }

    /**
     * Reports whether the local partition's active queue is empty.
     *
     * @return whatever the local partition reports
     */
    public boolean isActiveQueueEmpty() {
        final boolean localActiveEmpty = localPartition.isActiveQueueEmpty();
        return localActiveEmpty;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Collects diagnostics for the local partition and for every remote partition, under the
     * partition read lock.
     *
     * @return diagnostics combining the local partition's and all remote partitions' diagnostics
     */
    public QueueDiagnostics getQueueDiagnostics() {
        partitionReadLock.lock();
        try {
            final LocalQueuePartitionDiagnostics localDiagnostics = localPartition.getQueueDiagnostics();

            // Initial capacity is the partition count minus one (the local partition).
            final List<RemoteQueuePartitionDiagnostics> remoteDiagnostics = new ArrayList<>(queuePartitions.length - 1);
            for (final QueuePartition candidate : queuePartitions) {
                if (candidate instanceof RemoteQueuePartition) {
                    remoteDiagnostics.add(((RemoteQueuePartition) candidate).getDiagnostics());
                }
            }

            return new StandardQueueDiagnostics(localDiagnostics, remoteDiagnostics);
        } finally {
            partitionReadLock.unlock();
        }
    }

    /**
     * Gives subclasses access to the local partition.
     *
     * @return the local partition created in the constructor
     */
    protected LocalQueuePartition getLocalPartition() {
        return localPartition;
    }

    /**
     * Returns how many queue partitions currently exist, read under the partition read lock.
     *
     * @return the length of the current partition array
     */
    protected int getPartitionCount() {
        partitionReadLock.lock();
        try {
            return queuePartitions.length;
        } finally {
            partitionReadLock.unlock();
        }
    }

    /**
     * Returns the partition at the given position, read under the partition read lock.
     *
     * @param index zero-based position in the current partition array
     * @return the partition at that position
     * @throws IndexOutOfBoundsException if {@code index} is negative or not less than the partition count
     */
    protected QueuePartition getPartition(int index) {
        partitionReadLock.lock();
        try {
            final boolean withinBounds = index >= 0 && index < queuePartitions.length;
            if (!withinBounds) {
                throw new IndexOutOfBoundsException();
            }
            return queuePartitions[index];
        } finally {
            partitionReadLock.unlock();
        }
    }

    /**
     * Adds the given deltas to the aggregate queue size, retrying the compare-and-set until it succeeds.
     *
     * @param countToAdd change in object count (may be negative)
     * @param bytesToAdd change in byte count (may be negative)
     */
    private void adjustSize(int countToAdd, long bytesToAdd) {
        QueueSize observed;
        QueueSize adjusted;
        do {
            observed = totalSize.get();
            adjusted = observed.add(countToAdd, bytesToAdd);
        } while (!totalSize.compareAndSet(observed, adjusted));
    }

    /**
     * Subtracts the given FlowFiles' count and total byte size from the aggregate queue size.
     *
     * <p>A {@code null} or empty collection is ignored, matching {@code onAbort}; there is nothing to
     * subtract in that case.</p>
     *
     * @param flowFiles the FlowFiles that were transferred; may be {@code null} or empty
     */
    public void onTransfer(Collection<FlowFileRecord> flowFiles) {
        if (flowFiles == null || flowFiles.isEmpty()) {
            return;
        }
        this.adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFile::getSize).sum());
    }

    /**
     * Subtracts the given FlowFiles' count and total byte size from the aggregate queue size.
     * A {@code null} or empty collection is ignored.
     *
     * @param flowFiles the FlowFiles whose transfer was aborted; may be {@code null} or empty
     */
    public void onAbort(Collection<FlowFileRecord> flowFiles) {
        final boolean nothingToRemove = flowFiles == null || flowFiles.isEmpty();
        if (nothingToRemove) {
            return;
        }

        final int removedCount = flowFiles.size();
        final long removedBytes = flowFiles.stream().mapToLong(FlowFile::getSize).sum();
        adjustSize(-removedCount, -removedBytes);
    }

    /**
     * Applies this queue's {@code isFull} check to the size of the local partition alone.
     *
     * @return {@code true} if {@code isFull} accepts the local partition's size as full
     */
    public boolean isLocalPartitionFull() {
        final QueueSize localSize = localPartition.size();
        return isFull(localSize);
    }

    /**
     * Asks the current partitioner which partition the given FlowFile belongs to and logs the choice at debug level.
     *
     * @param flowFile the FlowFile to place
     * @return the partition selected by the partitioner
     */
    private QueuePartition getPartition(FlowFileRecord flowFile) {
        final QueuePartition chosen = partitioner.getPartition(flowFile, queuePartitions, localPartition);
        logger.debug("{} Assigning {} to Partition: {}", this, flowFile, chosen);
        return chosen;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Rebuilds the partition array for a new set of cluster nodes, under the partition write lock.
     *
     * <p>Unless {@code forceUpdate} is set, nothing happens when the new set equals the current one.
     * Otherwise the method:</p>
     * <ol>
     *   <li>stops every current partition;</li>
     *   <li>builds one slot per node, ordered by API address and port: the local partition for the local
     *       node, a reused partition when one already exists for the node's load-balance endpoint, or a
     *       new remote partition;</li>
     *   <li>appends the local partition if the local node is not in the new set. This includes the empty
     *       set, which therefore yields exactly one partition, the local one;</li>
     *   <li>rebalances either all old partitions (if the partitioner rebalances on cluster resize) or only
     *       those belonging to removed nodes, and notifies removed remote partitions;</li>
     *   <li>stores the new node set and partition array, and restarts the partitions unless this queue is stopped.</li>
     * </ol>
     *
     * @param updatedNodeIdentifiers the nodes that should now have a partition
     * @param forceUpdate when {@code true}, rebuild even if the node set is unchanged
     */
    public void setNodeIdentifiers(Set<NodeIdentifier> updatedNodeIdentifiers, boolean forceUpdate) {
        this.partitionWriteLock.lock();
        try {
            if (!forceUpdate && this.nodeIdentifiers.equals(updatedNodeIdentifiers)) {
                logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the new set of Node Identifiers is the same as the existing set", this);
                return;
            }

            logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, this.queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers);
            for (final QueuePartition queuePartition : this.queuePartitions) {
                queuePartition.stop();
            }

            // Nodes present before but absent now, compared by load-balance endpoint.
            final Set<NodeIdentifier> removedNodeIds = new TreeSet<>(loadBalanceEndpointComparator);
            removedNodeIds.addAll(this.nodeIdentifiers);
            removedNodeIds.removeAll(updatedNodeIdentifiers);
            logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, removedNodeIds);

            // Existing partitions are looked up by "loadBalanceAddress:loadBalancePort".
            final Function<NodeIdentifier, String> mapKeyTransform = nodeId -> nodeId.getLoadBalanceAddress() + ":" + nodeId.getLoadBalancePort();
            final Map<String, QueuePartition> partitionMap = new HashMap<>();
            for (final QueuePartition partition : this.queuePartitions) {
                final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier();
                nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(mapKeyTransform.apply(nodeIdentifier), partition));
            }

            final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers);
            sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort()));

            // One slot per node. The local partition is appended below whenever it was not placed in the
            // loop, which also covers the empty set. Pre-seeding the array with the local partition for
            // the empty set would add it twice.
            QueuePartition[] updatedQueuePartitions = new QueuePartition[sortedNodeIdentifiers.size()];

            boolean localPartitionIncluded = false;
            for (int i = 0; i < sortedNodeIdentifiers.size(); ++i) {
                final NodeIdentifier updatedNodeId = sortedNodeIdentifiers.get(i);
                final String nodeIdMapKey = mapKeyTransform.apply(updatedNodeId);

                if (updatedNodeId.equals(this.clusterCoordinator.getLocalNodeIdentifier())) {
                    updatedQueuePartitions[i] = this.localPartition;
                    localPartitionIncluded = true;

                    // If a different partition was registered under the local node's endpoint, fold its
                    // contents into the local partition.
                    final QueuePartition previousPartition = partitionMap.get(nodeIdMapKey);
                    if (previousPartition != null && previousPartition != this.localPartition) {
                        final FlowFileQueueContents partitionContents = previousPartition.packageForRebalance(this.localPartition.getSwapPartitionName());
                        logger.debug("Transferred data from {} to {}", previousPartition, this.localPartition);
                        this.localPartition.inheritQueueContents(partitionContents);
                    }
                    continue;
                }

                final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey);
                updatedQueuePartitions[i] = existingPartition == null ? this.createRemotePartition(updatedNodeId) : existingPartition;
            }

            if (!localPartitionIncluded) {
                final QueuePartition[] withLocal = new QueuePartition[updatedQueuePartitions.length + 1];
                System.arraycopy(updatedQueuePartitions, 0, withLocal, 0, updatedQueuePartitions.length);
                withLocal[withLocal.length - 1] = this.localPartition;
                updatedQueuePartitions = withLocal;
            }

            if (this.partitioner.isRebalanceOnClusterResize()) {
                for (final QueuePartition queuePartition : this.queuePartitions) {
                    logger.debug("Rebalancing {}", queuePartition);
                    this.rebalance(queuePartition);
                }
            } else {
                // Only data queued for nodes that went away needs a new home.
                for (final NodeIdentifier removedNodeId : removedNodeIds) {
                    final QueuePartition removedPartition = partitionMap.get(mapKeyTransform.apply(removedNodeId));
                    if (removedPartition == null) {
                        continue;
                    }
                    logger.debug("Rebalancing {}", removedPartition);
                    this.rebalance(removedPartition);
                }
            }

            for (final NodeIdentifier removedNodeId : removedNodeIds) {
                final QueuePartition removedPartition = partitionMap.get(mapKeyTransform.apply(removedNodeId));
                if (removedPartition instanceof RemoteQueuePartition) {
                    ((RemoteQueuePartition) removedPartition).onRemoved();
                }
            }

            this.nodeIdentifiers.clear();
            this.nodeIdentifiers.addAll(updatedNodeIdentifiers);
            this.queuePartitions = updatedQueuePartitions;

            logger.debug("{} Restarting the {} queue partitions now that node identifiers have been updated", this, this.queuePartitions.length);
            if (!this.stopped) {
                for (final QueuePartition queuePartition : updatedQueuePartitions) {
                    queuePartition.start(this.partitioner);
                }
            }
        } finally {
            this.partitionWriteLock.unlock();
        }
    }

    /**
     * Packages up the contents of the given partition and hands them to the rebalancing partition.
     *
     * @param partition the partition whose contents are packaged (using the rebalancing partition's
     *                  swap partition name) and passed to the rebalancing partition
     */
    protected void rebalance(QueuePartition partition) {
        logger.debug("Rebalancing Partition {}", partition);
        final FlowFileQueueContents packagedContents =
                partition.packageForRebalance(rebalancingPartition.getSwapPartitionName());
        rebalancingPartition.rebalance(packagedContents);
    }

    /**
     * Adds a single FlowFile to the queue by delegating to {@link #putAndGetPartition(FlowFileRecord)}
     * and discarding the partition it returns.
     *
     * @param flowFile the FlowFile to enqueue
     */
    public void put(FlowFileRecord flowFile) {
        putAndGetPartition(flowFile);
    }

    /**
     * Adds a single FlowFile to the partition selected for it and reports which partition that was.
     * The queue size is adjusted before the FlowFile is placed in the partition, all while holding
     * the partition read lock; the destination event is triggered after the lock is released.
     *
     * @param flowFile the FlowFile to enqueue
     * @return the partition the FlowFile was placed in
     */
    protected QueuePartition putAndGetPartition(FlowFileRecord flowFile) {
        final QueuePartition target;

        partitionReadLock.lock();
        try {
            adjustSize(1, flowFile.getSize());
            target = getPartition(flowFile);
            target.put(flowFile);
        } finally {
            partitionReadLock.unlock();
        }

        // Only reached when the put succeeded; an exception above skips the notification.
        eventListener.triggerDestinationEvent();
        return target;
    }

    /**
     * Accepts FlowFiles that were sent to this node by a peer over the load-balanced connection.
     * <p>
     * When the partitioner rebalances on cluster resize, the FlowFiles are re-partitioned through
     * {@link #putAll(Collection)}; otherwise they are placed directly into the local partition.
     *
     * @param flowFiles the FlowFiles received from the peer
     * @throws IllegalClusterStateException if this queue is offloaded or the cluster coordinator
     *                                      reports that the node is not connected
     */
    public void receiveFromPeer(Collection<FlowFileRecord> flowFiles) throws IllegalClusterStateException {
        partitionReadLock.lock();
        try {
            if (offloaded) {
                throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is in the process of offloading");
            }
            if (!clusterCoordinator.isConnected()) {
                throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is not connected to cluster");
            }

            if (!partitioner.isRebalanceOnClusterResize()) {
                logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles);
                // The size is adjusted before the FlowFiles are handed to the local partition.
                final int receivedCount = flowFiles.size();
                final long receivedBytes = flowFiles.stream().mapToLong(FlowFile::getSize).sum();
                adjustSize(receivedCount, receivedBytes);
                localPartition.putAll(flowFiles);
                return;
            }

            logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", flowFiles);
            putAll(flowFiles);
        } finally {
            partitionReadLock.unlock();
        }
    }

    /**
     * Adds all of the given FlowFiles to the queue by delegating to
     * {@link #putAllAndGetPartitions(Collection)} and discarding the resulting mapping.
     *
     * @param flowFiles the FlowFiles to enqueue
     */
    public void putAll(Collection<FlowFileRecord> flowFiles) {
        putAllAndGetPartitions(flowFiles);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Adds all of the given FlowFiles to the queue and reports how they were spread over partitions.
     * The queue size is adjusted first, then the FlowFiles are distributed, all under the partition
     * read lock. The destination event is triggered from the {@code finally} block, so it fires
     * whether or not distribution succeeded.
     *
     * @param flowFiles the FlowFiles to enqueue
     * @return the mapping of partition to the FlowFiles placed in it, as returned by
     *         {@link #distributeToPartitionsAndGet(Collection)}
     */
    protected Map<QueuePartition, List<FlowFileRecord>> putAllAndGetPartitions(Collection<FlowFileRecord> flowFiles) {
        partitionReadLock.lock();
        try {
            final long totalBytes = flowFiles.stream().mapToLong(FlowFile::getSize).sum();
            adjustSize(flowFiles.size(), totalBytes);
            return distributeToPartitionsAndGet(flowFiles);
        } finally {
            partitionReadLock.unlock();
            eventListener.triggerDestinationEvent();
        }
    }

    /**
     * Distributes the given FlowFiles to partitions by delegating to
     * {@link #distributeToPartitionsAndGet(Collection)} and discarding the resulting mapping.
     *
     * @param flowFiles the FlowFiles to distribute
     */
    public void distributeToPartitions(Collection<FlowFileRecord> flowFiles) {
        distributeToPartitionsAndGet(flowFiles);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Places the given FlowFiles into their partitions and reports which FlowFiles went where.
     * This method does not adjust the queue size itself.
     * <p>
     * When the partitioner reports a static partition, the partition is looked up once (using the
     * first FlowFile) and the whole collection is placed there. Otherwise the FlowFiles are grouped
     * by the partition chosen for each one and each group is placed in its partition.
     *
     * @param flowFiles the FlowFiles to distribute; may be {@code null} or empty
     * @return a mapping of partition to the FlowFiles placed in it; an empty map when
     *         {@code flowFiles} is {@code null} or empty
     */
    public Map<QueuePartition, List<FlowFileRecord>> distributeToPartitionsAndGet(Collection<FlowFileRecord> flowFiles) {
        if (flowFiles == null || flowFiles.isEmpty()) {
            return Collections.emptyMap();
        }

        partitionReadLock.lock();
        try {
            if (partitioner.isPartitionStatic()) {
                final QueuePartition partition = getPartition(flowFiles.iterator().next());
                partition.putAll(flowFiles);

                // Reuse the caller's List when possible; otherwise copy so the map value is a List.
                final List<FlowFileRecord> flowFileList = flowFiles instanceof List
                        ? (List<FlowFileRecord>) flowFiles
                        : new ArrayList<FlowFileRecord>(flowFiles);
                final Map<QueuePartition, List<FlowFileRecord>> staticPartitionMap = Collections.singletonMap(partition, flowFileList);
                logger.debug("Partitioner {} is static so Partitioned FlowFiles as: {}", partitioner, staticPartitionMap);
                return staticPartitionMap;
            }

            final Map<QueuePartition, List<FlowFileRecord>> partitionMap = flowFiles.stream()
                    .collect(Collectors.groupingBy(this::getPartition));
            logger.debug("Partitioned FlowFiles as: {}", partitionMap);

            for (final Map.Entry<QueuePartition, List<FlowFileRecord>> entry : partitionMap.entrySet()) {
                entry.getKey().putAll(entry.getValue());
            }
            return partitionMap;
        } finally {
            partitionReadLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the partitioner used by this queue. If the new partitioner differs from the current
     * one (per {@code equals}), it is installed and every current queue partition is rebalanced,
     * all while holding the partition write lock. If it is equal, nothing happens.
     *
     * @param partitioner the partitioner to use from now on
     */
    protected void setFlowFilePartitioner(FlowFilePartitioner partitioner) {
        partitionWriteLock.lock();
        try {
            final boolean unchanged = this.partitioner.equals(partitioner);
            if (!unchanged) {
                this.partitioner = partitioner;
                for (final QueuePartition queuePartition : queuePartitions) {
                    rebalance(queuePartition);
                }
            }
        } finally {
            partitionWriteLock.unlock();
        }
    }

    /**
     * Polls a single FlowFile from the local partition, then passes {@code expiredRecords} to
     * {@code onAbort}.
     *
     * @param expiredRecords set handed to the local partition's poll and afterwards to {@code onAbort}
     * @param pollStrategy   the strategy forwarded to the local partition
     * @return whatever the local partition's poll returned
     */
    public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
        final FlowFileRecord polled = localPartition.poll(expiredRecords, pollStrategy);
        onAbort(expiredRecords);
        return polled;
    }

    /**
     * Polls FlowFiles from the local partition, forwarding {@code maxResults}, then passes
     * {@code expiredRecords} to {@code onAbort}.
     *
     * @param maxResults     the limit forwarded to the local partition
     * @param expiredRecords set handed to the local partition's poll and afterwards to {@code onAbort}
     * @param pollStrategy   the strategy forwarded to the local partition
     * @return whatever the local partition's poll returned
     */
    public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
        final List<FlowFileRecord> polled = localPartition.poll(maxResults, expiredRecords, pollStrategy);
        onAbort(expiredRecords);
        return polled;
    }

    /**
     * Polls FlowFiles from the local partition using the given filter, then passes
     * {@code expiredRecords} to {@code onAbort}.
     *
     * @param filter         the filter forwarded to the local partition
     * @param expiredRecords set handed to the local partition's poll and afterwards to {@code onAbort}
     * @param pollStrategy   the strategy forwarded to the local partition
     * @return whatever the local partition's poll returned
     */
    public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, PollStrategy pollStrategy) {
        final List<FlowFileRecord> polled = localPartition.poll(filter, expiredRecords, pollStrategy);
        onAbort(expiredRecords);
        return polled;
    }

    /**
     * Acknowledges a single FlowFile on the local partition, reduces the queue size by one object
     * and the FlowFile's byte size, and triggers the source event.
     *
     * @param flowFile the FlowFile being acknowledged
     */
    public void acknowledge(FlowFileRecord flowFile) {
        localPartition.acknowledge(flowFile);
        final long acknowledgedBytes = flowFile.getSize();
        adjustSize(-1, -acknowledgedBytes);
        eventListener.triggerSourceEvent();
    }

    /**
     * Acknowledges a batch of FlowFiles on the local partition. When the batch is non-empty the
     * queue size is reduced by its count and total bytes. The source event is triggered either way.
     *
     * @param flowFiles the FlowFiles being acknowledged
     */
    public void acknowledge(Collection<FlowFileRecord> flowFiles) {
        localPartition.acknowledge(flowFiles);

        final boolean anyAcknowledged = !flowFiles.isEmpty();
        if (anyAcknowledged) {
            final long acknowledgedBytes = flowFiles.stream().mapToLong(FlowFile::getSize).sum();
            adjustSize(-flowFiles.size(), -acknowledgedBytes);
        }

        eventListener.triggerSourceEvent();
    }

    /**
     * Reports the local partition's answer to {@code isUnacknowledgedFlowFile()}.
     *
     * @return the value returned by the local partition
     */
    public boolean isUnacknowledgedFlowFile() {
        return localPartition.isUnacknowledgedFlowFile();
    }

    /**
     * Looks up a FlowFile by UUID in the local partition only.
     *
     * @param flowFileUuid the UUID to look up
     * @return whatever the local partition returns for that UUID
     * @throws IOException if the local partition's lookup throws it
     */
    public FlowFileRecord getFlowFile(String flowFileUuid) throws IOException {
        return localPartition.getFlowFile(flowFileUuid);
    }

    /**
     * Indicates whether backpressure should be propagated across nodes.
     *
     * @return {@code false} while this queue is flagged as offloaded, {@code true} otherwise
     */
    public boolean isPropagateBackpressureAcrossNodes() {
        return !offloaded;
    }

    /**
     * Removes expired FlowFiles: for each one a delete-marked repository record and an
     * {@code EXPIRE} provenance event are built, then the FlowFile repository is updated, the
     * provenance events are registered and the queue size is reduced.
     * <p>
     * If the repository update throws an {@link IOException}, a warning is logged and neither the
     * provenance registration nor the size adjustment takes place.
     *
     * @param expired the expired FlowFiles; {@code null} or empty is a no-op
     */
    public void handleExpiredRecords(Collection<FlowFileRecord> expired) {
        if (expired == null || expired.isEmpty()) {
            return;
        }

        logger.info("{} {} FlowFiles have expired and will be removed", this, expired.size());

        final List<StandardRepositoryRecord> expiredRecords = new ArrayList<>(expired.size());
        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(expired.size());

        for (final FlowFileRecord flowFile : expired) {
            // Repository record that deletes the FlowFile from this queue.
            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) this, flowFile);
            record.markForDelete();
            expiredRecords.add(record);

            // Provenance event describing the expiration.
            final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
                    .fromFlowFile((FlowFile) flowFile)
                    .setEventType(ProvenanceEventType.EXPIRE)
                    .setDetails("Expiration Threshold = " + getFlowFileExpiration())
                    .setComponentType("Load-Balanced Connection")
                    .setComponentId(getIdentifier())
                    .setEventTime(System.currentTimeMillis());

            final ContentClaim contentClaim = flowFile.getContentClaim();
            if (contentClaim != null) {
                // Current and previous claims are both set to the same claim and offset.
                final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
                builder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
                        Long.valueOf(contentClaim.getOffset() + flowFile.getContentClaimOffset()), flowFile.getSize());
                builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
                        Long.valueOf(contentClaim.getOffset() + flowFile.getContentClaimOffset()), flowFile.getSize());
            }

            provenanceEvents.add(builder.build());

            final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
            logger.debug("{} terminated due to FlowFile expiration; life of FlowFile = {} ms", flowFile, flowFileLife);
        }

        try {
            flowFileRepo.updateRepository(expiredRecords);
            provRepo.registerEvents(provenanceEvents);

            final long expiredBytes = expired.stream().mapToLong(FlowFile::getSize).sum();
            adjustSize(-expired.size(), -expiredBytes);
        } catch (IOException e) {
            logger.warn("Encountered {} expired FlowFiles but failed to update FlowFile Repository. This FlowFiles may re-appear in the queue after NiFi is restarted and will be expired again at that point.", expiredRecords.size(), e);
        }
    }

    /**
     * Supplies the FlowFiles available for listing, taken from the local partition only.
     *
     * @return the local partition's listable FlowFiles
     */
    @Override
    protected List<FlowFileRecord> getListableFlowFiles() {
        return localPartition.getListableFlowFiles();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drops the FlowFiles of every queue partition, one partition at a time, while holding the
     * partition read lock, and keeps {@code dropRequest} updated with the running totals.
     * <p>
     * Processing stops early if a partition's request ends up {@code CANCELED} (the overall request
     * is cancelled) or {@code FAILURE} (the overall request fails with the partition's reason). If
     * the loop finishes with the request still in {@code DROPPING_FLOWFILES}, it is marked
     * {@code COMPLETE}. Any exception is logged and turns the request into a {@code FAILURE}.
     *
     * @param dropRequest the request to update with progress and final state
     * @param requestor   the requestor forwarded to each partition's drop
     */
    @Override
    protected void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor) {
        partitionReadLock.lock();
        try {
            dropRequest.setOriginalSize(size());
            dropRequest.setState(DropFlowFileState.DROPPING_FLOWFILES);

            int droppedCount = 0;
            long droppedBytes = 0L;

            try {
                for (final QueuePartition partition : queuePartitions) {
                    // NOTE(review): the identifier suffix is the LOCAL partition's node identifier for every
                    // partition, so all per-partition requests share one id - confirm this is intended.
                    final DropFlowFileRequest partitionRequest =
                            new DropFlowFileRequest(dropRequest.getRequestIdentifier() + "-" + localPartition.getNodeIdentifier());
                    partition.dropFlowFiles(partitionRequest, requestor);

                    final QueueSize partitionDropped = partitionRequest.getDroppedSize();
                    adjustSize(-partitionDropped.getObjectCount(), -partitionDropped.getByteCount());

                    // The running totals are the single source of truth for the reported dropped size.
                    droppedCount += partitionDropped.getObjectCount();
                    droppedBytes += partitionDropped.getByteCount();
                    dropRequest.setDroppedSize(new QueueSize(droppedCount, droppedBytes));
                    dropRequest.setCurrentSize(size());

                    final DropFlowFileState partitionState = partitionRequest.getState();
                    if (partitionState == DropFlowFileState.CANCELED) {
                        dropRequest.cancel();
                        break;
                    }
                    if (partitionState == DropFlowFileState.FAILURE) {
                        dropRequest.setState(DropFlowFileState.FAILURE, partitionRequest.getFailureReason());
                        break;
                    }
                }

                if (dropRequest.getState() == DropFlowFileState.DROPPING_FLOWFILES) {
                    dropRequest.setState(DropFlowFileState.COMPLETE);
                }
            } catch (Exception e) {
                logger.error("Failed to drop FlowFiles for {}", this, e);
                dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.getMessage() + ". See log for more details.");
            }
        } finally {
            partitionReadLock.unlock();
        }
    }

    /**
     * Acquires the partition read lock. Callers are expected to release it via {@link #unlock()}.
     */
    public void lock() {
        partitionReadLock.lock();
    }

    /**
     * Releases the partition read lock, the counterpart to {@link #lock()}.
     */
    public void unlock() {
        partitionReadLock.unlock();
    }

    /**
     * Describes this queue by identifier, load balance strategy and current size.
     *
     * @return a string of the form {@code FlowFileQueue[id=..., Load Balance Strategy=..., size=...]}
     */
    public String toString() {
        return "FlowFileQueue[id=" + getIdentifier()
                + ", Load Balance Strategy=" + getLoadBalanceStrategy()
                + ", size=" + size()
                + "]";
    }

    /**
     * Listens for cluster topology events and updates the enclosing queue's node identifiers
     * accordingly. Every callback runs while holding the enclosing queue's partition write lock.
     */
    private class ClusterEventListener
    implements ClusterTopologyEventListener {
        private ClusterEventListener() {
        }

        /**
         * Adds the given node to the queue's node identifiers unless it is already present. Any
         * known identifier with the same {@code getId()} value is replaced by the new one.
         *
         * @param nodeId the node that was added
         */
        public void onNodeAdded(NodeIdentifier nodeId) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                if (SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers.contains(nodeId)) {
                    logger.debug("Node Identifier {} added to cluster but already known in set: {}", nodeId, SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                    return;
                }

                final HashSet<NodeIdentifier> updatedNodeIds = new HashSet<>(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                // Drop any entry that shares the same id before adding the new identifier.
                updatedNodeIds.removeIf(known -> known.getId().equals(nodeId.getId()));
                updatedNodeIds.add(nodeId);

                logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", nodeId, SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, updatedNodeIds);
                SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(updatedNodeIds, false);
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }

        /**
         * Removes the given node from the queue's node identifiers. Does nothing when the node is
         * not currently known.
         *
         * @param nodeId the node that was removed
         */
        public void onNodeRemoved(NodeIdentifier nodeId) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                final HashSet<NodeIdentifier> updatedNodeIds = new HashSet<>(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                if (!updatedNodeIds.remove(nodeId)) {
                    return;
                }

                logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", nodeId, SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, updatedNodeIds);
                SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(updatedNodeIds, false);
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }

        /**
         * Reacts to the local node identifier becoming known. The identifier is added to the
         * queue's node identifiers if missing. Then, if a partition that is not a
         * {@code LocalQueuePartition} carries that identifier, a forced update of the node
         * identifiers is requested.
         *
         * @param localNodeId the local node's identifier; {@code null} is ignored
         */
        public void onLocalNodeIdentifierSet(NodeIdentifier localNodeId) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                if (localNodeId == null) {
                    return;
                }

                if (!SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers.contains(localNodeId)) {
                    final HashSet<NodeIdentifier> withLocalNode = new HashSet<>(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                    withLocalNode.add(localNodeId);
                    logger.debug("Local Node Identifier has now been determined to be {}. Adding to set of Node Identifiers for {}", localNodeId, SocketLoadBalancedFlowFileQueue.this);
                    SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(withLocalNode, false);
                }

                // The (Object) cast keeps the array a single log argument instead of a varargs array.
                logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, (Object) SocketLoadBalancedFlowFileQueue.this.queuePartitions);

                for (final QueuePartition partition : SocketLoadBalancedFlowFileQueue.this.queuePartitions) {
                    final Optional<NodeIdentifier> partitionNodeId = partition.getNodeIdentifier();
                    if (!partitionNodeId.isPresent() || !partitionNodeId.get().equals(localNodeId)) {
                        continue;
                    }

                    if (partition instanceof LocalQueuePartition) {
                        logger.debug("{} Local Node Identifier set to {} and QueuePartition with this identifier is already a Local Queue Partition", SocketLoadBalancedFlowFileQueue.this, localNodeId);
                        // Return rather than break: falling through would log "found no Queue Partition",
                        // which contradicts the message just logged.
                        return;
                    }

                    logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions", SocketLoadBalancedFlowFileQueue.this, localNodeId, partition);
                    final HashSet<NodeIdentifier> forcedNodeIds = new HashSet<>(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                    forcedNodeIds.add(localNodeId);
                    SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(forcedNodeIds, true);
                    return;
                }

                logger.debug("{} Local Node Identifier set to {} but found no Queue Partition with that Node Identifier.", SocketLoadBalancedFlowFileQueue.this, localNodeId);
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }

        /**
         * Reacts to a node's connection state changing.
         * <p>
         * While the queue is not offloaded: {@code OFFLOADING} removes the node and
         * {@code CONNECTED} adds it; other states are ignored. While the queue is offloaded:
         * {@code CONNECTED} for the local node resets the offloaded queue, and {@code OFFLOADING},
         * {@code OFFLOADED}, {@code DISCONNECTED} and {@code DISCONNECTING} remove the node.
         *
         * @param nodeId   the node whose state changed
         * @param newState the node's new connection state
         */
        public void onNodeStateChange(NodeIdentifier nodeId, NodeConnectionState newState) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                if (!SocketLoadBalancedFlowFileQueue.this.offloaded) {
                    switch (newState) {
                        case OFFLOADING:
                            this.onNodeRemoved(nodeId);
                            break;
                        case CONNECTED:
                            this.onNodeAdded(nodeId);
                            break;
                        default:
                            break;
                    }
                    return;
                }

                switch (newState) {
                    case CONNECTED:
                        // Only the local node reconnecting resets the offloaded queue.
                        if (nodeId != null && nodeId.equals(SocketLoadBalancedFlowFileQueue.this.clusterCoordinator.getLocalNodeIdentifier())) {
                            SocketLoadBalancedFlowFileQueue.this.resetOffloadedQueue();
                        }
                        break;
                    case OFFLOADING:
                    case OFFLOADED:
                    case DISCONNECTED:
                    case DISCONNECTING:
                        this.onNodeRemoved(nodeId);
                        break;
                    default:
                        break;
                }
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }
    }
}

