/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.BlockingSwappablePriorityQueue;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;

public class StandardRebalancingPartition
implements RebalancingPartition {
    private final String SWAP_PARTITION_NAME = "rebalance";
    private final String queueIdentifier;
    private final BlockingSwappablePriorityQueue queue;
    private final LoadBalancedFlowFileQueue flowFileQueue;
    private final String description;
    private volatile boolean stopped = true;
    private RebalanceTask rebalanceTask;

    public StandardRebalancingPartition(FlowFileSwapManager swapManager, int swapThreshold, EventReporter eventReporter, LoadBalancedFlowFileQueue flowFileQueue, DropFlowFileAction dropAction) {
        this.queue = new BlockingSwappablePriorityQueue(swapManager, swapThreshold, eventReporter, (FlowFileQueue)flowFileQueue, dropAction, "rebalance");
        this.queueIdentifier = flowFileQueue.getIdentifier();
        this.flowFileQueue = flowFileQueue;
        this.description = "RebalancingPartition[queueId=" + this.queueIdentifier + "]";
    }

    @Override
    public Optional<NodeIdentifier> getNodeIdentifier() {
        return Optional.empty();
    }

    @Override
    public QueueSize size() {
        return this.queue.size();
    }

    @Override
    public long getTotalActiveQueuedDuration(long fromTimestamp) {
        return this.queue.getTotalQueuedDuration(fromTimestamp);
    }

    @Override
    public long getMinLastQueueDate() {
        return this.queue.getMinLastQueueDate();
    }

    @Override
    public SwapSummary recoverSwappedFlowFiles() {
        return this.queue.recoverSwappedFlowFiles();
    }

    @Override
    public String getSwapPartitionName() {
        return "rebalance";
    }

    @Override
    public void put(FlowFileRecord flowFile) {
        this.queue.put(flowFile);
    }

    @Override
    public void putAll(Collection<FlowFileRecord> flowFiles) {
        this.queue.putAll(flowFiles);
    }

    @Override
    public void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor) {
        this.queue.dropFlowFiles(dropRequest, requestor);
    }

    @Override
    public void setPriorities(List<FlowFilePrioritizer> newPriorities) {
        this.queue.setPriorities(newPriorities);
    }

    @Override
    public synchronized void start(FlowFilePartitioner partitionerUsed) {
        this.stopped = false;
        this.rebalanceFromQueue();
    }

    @Override
    public synchronized void stop() {
        this.stopped = true;
        if (this.rebalanceTask != null) {
            this.rebalanceTask.stop();
        }
        this.rebalanceTask = null;
    }

    private synchronized void rebalanceFromQueue() {
        if (this.stopped) {
            return;
        }
        if (this.rebalanceTask != null) {
            return;
        }
        this.rebalanceTask = new RebalanceTask();
        Thread rebalanceThread = new Thread(this.rebalanceTask);
        rebalanceThread.setName("Rebalance queued data for Connection " + this.queueIdentifier);
        rebalanceThread.start();
    }

    @Override
    public void rebalance(FlowFileQueueContents queueContents) {
        if (queueContents.getActiveFlowFiles().isEmpty() && queueContents.getSwapLocations().isEmpty()) {
            return;
        }
        this.queue.inheritQueueContents(queueContents);
        this.rebalanceFromQueue();
    }

    @Override
    public void rebalance(Collection<FlowFileRecord> flowFiles) {
        this.queue.putAll(flowFiles);
        this.rebalanceFromQueue();
    }

    @Override
    public FlowFileQueueContents packageForRebalance(String newPartitionName) {
        return this.queue.packageForRebalance(newPartitionName);
    }

    private synchronized boolean complete() {
        if (!this.queue.isEmpty()) {
            return false;
        }
        this.rebalanceTask = null;
        return true;
    }

    public String toString() {
        return this.description;
    }

    private class RebalanceTask
    implements Runnable {
        private volatile boolean stopped = false;
        private final Set<FlowFileRecord> expiredRecords = new HashSet<FlowFileRecord>();
        private final long pollWaitMillis = 100L;

        private RebalanceTask() {
        }

        public void stop() {
            this.stopped = true;
        }

        @Override
        public void run() {
            while (!this.stopped) {
                FlowFileRecord polled;
                this.expiredRecords.clear();
                try {
                    polled = StandardRebalancingPartition.this.queue.poll(this.expiredRecords, -1L, 100L, PollStrategy.ALL_FLOWFILES);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    continue;
                }
                if (polled == null) {
                    StandardRebalancingPartition.this.flowFileQueue.handleExpiredRecords(this.expiredRecords);
                    if (!StandardRebalancingPartition.this.complete()) continue;
                    return;
                }
                ArrayList<FlowFileRecord> toDistribute = new ArrayList<FlowFileRecord>();
                toDistribute.add(polled);
                List<FlowFileRecord> additionalRecords = StandardRebalancingPartition.this.queue.poll(999, this.expiredRecords, -1L, PollStrategy.ALL_FLOWFILES);
                toDistribute.addAll(additionalRecords);
                StandardRebalancingPartition.this.flowFileQueue.handleExpiredRecords(this.expiredRecords);
                StandardRebalancingPartition.this.flowFileQueue.distributeToPartitions(toDistribute);
                StandardRebalancingPartition.this.queue.acknowledge(toDistribute);
            }
        }
    }
}

