/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.queue.clustered.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.queue.clustered.TransferFailureDestination;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link QueuePartition} bound to a specific cluster node, identified by a {@link NodeIdentifier}.
 *
 * <p>Queue bookkeeping (size, put, drop, swap recovery, prioritizers) is delegated to a
 * {@link SwappablePriorityQueue}. Once {@link #start(FlowFilePartitioner)} is called, the partition
 * registers itself with the {@link AsyncLoadBalanceClientRegistry}, supplying FlowFiles polled from
 * that queue together with callbacks that are invoked when a transfer transaction completes or fails.
 *
 * <p>{@link #start(FlowFilePartitioner)} and {@link #stop()} are synchronized on this instance; the
 * remaining methods perform no locking of their own.
 */
public class RemoteQueuePartition implements QueuePartition {
    private static final Logger logger = LoggerFactory.getLogger(RemoteQueuePartition.class);

    private final NodeIdentifier nodeIdentifier;
    private final SwappablePriorityQueue priorityQueue;
    private final LoadBalancedFlowFileQueue flowFileQueue;
    private final TransferFailureDestination failureDestination;
    private final FlowFileRepository flowFileRepo;
    private final ProvenanceEventRepository provRepo;
    // Not read anywhere in this class; kept because it is part of the constructor signature.
    private final ContentRepository contentRepo;
    private final AsyncLoadBalanceClientRegistry clientRegistry;
    // Only read and written inside the synchronized start()/stop() methods.
    private boolean running = false;
    private final String description;

    /**
     * Creates a partition for the given node.
     *
     * @param nodeId the node this partition is bound to
     * @param priorityQueue the queue that holds this partition's FlowFiles
     * @param failureDestination where FlowFiles are handed when a transfer transaction fails
     * @param flowFileRepo repository updated when FlowFiles are transferred or aborted
     * @param provRepo repository that receives the SEND/DROP provenance events
     * @param contentRepository the content repository (stored but not otherwise used by this class)
     * @param clientRegistry registry this partition registers with in {@link #start(FlowFilePartitioner)}
     * @param flowFileQueue the load-balanced queue that owns this partition
     */
    public RemoteQueuePartition(NodeIdentifier nodeId, SwappablePriorityQueue priorityQueue, TransferFailureDestination failureDestination, FlowFileRepository flowFileRepo, ProvenanceEventRepository provRepo, ContentRepository contentRepository, AsyncLoadBalanceClientRegistry clientRegistry, LoadBalancedFlowFileQueue flowFileQueue) {
        this.nodeIdentifier = nodeId;
        this.priorityQueue = priorityQueue;
        this.flowFileQueue = flowFileQueue;
        this.failureDestination = failureDestination;
        this.flowFileRepo = flowFileRepo;
        this.provRepo = provRepo;
        this.contentRepo = contentRepository;
        this.clientRegistry = clientRegistry;
        this.description = "RemoteQueuePartition[queueId=" + flowFileQueue.getIdentifier() + ", nodeId=" + this.nodeIdentifier + "]";
    }

    /** @return the size reported by the underlying priority queue */
    @Override
    public QueueSize size() {
        return this.priorityQueue.size();
    }

    /**
     * @param fromTimestamp timestamp passed through to the underlying priority queue
     * @return the total queued duration reported by the underlying priority queue
     */
    @Override
    public long getTotalActiveQueuedDuration(long fromTimestamp) {
        return this.priorityQueue.getTotalQueuedDuration(fromTimestamp);
    }

    /** @return the minimum last-queue date reported by the underlying priority queue */
    @Override
    public long getMinLastQueueDate() {
        return this.priorityQueue.getMinLastQueueDate();
    }

    /** @return the id of the node this partition is bound to, used as the swap partition name */
    @Override
    public String getSwapPartitionName() {
        return this.nodeIdentifier.getId();
    }

    /** @return the node this partition is bound to, or an empty Optional if none was supplied */
    @Override
    public Optional<NodeIdentifier> getNodeIdentifier() {
        return Optional.ofNullable(this.nodeIdentifier);
    }

    /**
     * Adds a single FlowFile to the underlying priority queue.
     *
     * @param flowFile the FlowFile to enqueue
     */
    @Override
    public void put(FlowFileRecord flowFile) {
        this.priorityQueue.put(flowFile);
    }

    /**
     * Adds all given FlowFiles to the underlying priority queue.
     *
     * @param flowFiles the FlowFiles to enqueue
     */
    @Override
    public void putAll(Collection<FlowFileRecord> flowFiles) {
        this.priorityQueue.putAll(flowFiles);
    }

    /**
     * Delegates a drop request to the underlying priority queue.
     *
     * @param dropRequest the request describing the drop
     * @param requestor identity of whoever asked for the drop
     */
    @Override
    public void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor) {
        this.priorityQueue.dropFlowFiles(dropRequest, requestor);
    }

    /** @return the swap summary produced by the underlying priority queue's recovery */
    @Override
    public SwapSummary recoverSwappedFlowFiles() {
        return this.priorityQueue.recoverSwappedFlowFiles();
    }

    /**
     * Delegates to the underlying priority queue to package its contents for a rebalance.
     *
     * @param newPartitionName name of the partition the contents are being packaged for
     * @return the packaged queue contents
     */
    @Override
    public FlowFileQueueContents packageForRebalance(String newPartitionName) {
        return this.priorityQueue.packageForRebalance(newPartitionName);
    }

    /**
     * Replaces the prioritizers on the underlying priority queue.
     *
     * @param newPriorities the prioritizers to apply
     */
    @Override
    public void setPriorities(List<FlowFilePrioritizer> newPriorities) {
        this.priorityQueue.setPriorities(newPriorities);
    }

    /**
     * Polls the next FlowFile from the priority queue using the owning queue's expiration
     * (in milliseconds). Any records the poll reports as expired are handed to the owning queue.
     *
     * @return the polled FlowFile, as returned by the priority queue
     */
    private FlowFileRecord getFlowFile() {
        final HashSet<FlowFileRecord> expired = new HashSet<>();
        final long expirationMillis = this.flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS);
        final FlowFileRecord flowFile = this.priorityQueue.poll(expired, expirationMillis);
        this.flowFileQueue.handleExpiredRecords(expired);
        return flowFile;
    }

    /**
     * Registers this partition with the client registry so that its FlowFiles can be transferred.
     * Does nothing if the partition is already running.
     *
     * @param partitioner the partitioner handed to the failure destination when a transaction fails
     */
    @Override
    public synchronized void start(final FlowFilePartitioner partitioner) {
        if (this.running) {
            return;
        }

        final TransactionFailureCallback failureCallback = new TransactionFailureCallback() {
            @Override
            public void onTransactionFailed(List<FlowFileRecord> flowFiles, Exception cause, TransactionFailureCallback.TransactionPhase phase) {
                handleTransactionFailure(flowFiles, cause, phase, partitioner);
            }

            @Override
            public boolean isRebalanceOnFailure() {
                return RemoteQueuePartition.this.failureDestination.isRebalanceOnFailure(partitioner);
            }
        };

        final TransactionCompleteCallback successCallback = new TransactionCompleteCallback() {
            @Override
            public void onTransactionComplete(List<FlowFileRecord> flowFilesSent, NodeIdentifier nodeIdentifier) {
                RemoteQueuePartition.this.priorityQueue.acknowledge(flowFilesSent);
                RemoteQueuePartition.this.flowFileQueue.onTransfer(flowFilesSent);
                updateRepositories(flowFilesSent, Collections.emptyList(), nodeIdentifier);
            }
        };

        final BooleanSupplier emptySupplier = this::isQueueEmpty;
        this.clientRegistry.register(
                this.flowFileQueue.getIdentifier(),
                this.nodeIdentifier,
                emptySupplier,
                this::getFlowFile,
                failureCallback,
                successCallback,
                () -> this.flowFileQueue.getLoadBalanceCompression(),
                () -> this.flowFileQueue.isPropagateBackpressureAcrossNodes());
        this.running = true;
    }

    /**
     * Handles a failed transfer transaction.
     *
     * <p>The batch is always acknowledged with the priority queue first. If the failure is a
     * {@link ContentNotFoundException} that names a FlowFile, that FlowFile is aborted in the
     * repositories and reported to the owning queue via {@code onTransfer}, while the rest of the
     * batch is handed to the failure destination. Otherwise the whole batch is handed to the
     * failure destination.
     *
     * @param flowFiles the FlowFiles that were part of the failed transaction
     * @param cause the failure
     * @param phase the phase of the transaction in which the failure occurred
     * @param partitioner the partitioner passed to the failure destination
     */
    private void handleTransactionFailure(final List<FlowFileRecord> flowFiles, final Exception cause,
            final TransactionFailureCallback.TransactionPhase phase, final FlowFilePartitioner partitioner) {
        this.priorityQueue.acknowledge(flowFiles);

        final Optional<FlowFileRecord> missingContent = findFlowFileWithMissingContent(cause);
        if (!missingContent.isPresent()) {
            rerouteToFailureDestination(flowFiles, phase, partitioner);
            return;
        }

        final FlowFileRecord flowFile = missingContent.get();
        final List<FlowFileRecord> remaining = new ArrayList<>(flowFiles);
        remaining.remove(flowFile);

        final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(this.flowFileQueue, flowFile);
        repoRecord.markForAbort();
        // No FlowFiles were sent here, so the node identifier is not consulted for SEND events;
        // the partition's own identifier is passed rather than null.
        updateRepositories(Collections.emptyList(), Collections.singleton(repoRecord), this.nodeIdentifier);

        rerouteToFailureDestination(remaining, phase, partitioner);
        this.flowFileQueue.onTransfer(Collections.singleton(flowFile));
    }

    /**
     * Extracts the FlowFile named by a {@link ContentNotFoundException}, if the cause is one.
     *
     * @param cause the failure to inspect
     * @return the FlowFile carried by the exception, or empty if the cause is a different
     *         exception type or carries no FlowFile
     */
    private static Optional<FlowFileRecord> findFlowFileWithMissingContent(final Exception cause) {
        if (cause instanceof ContentNotFoundException) {
            return ((ContentNotFoundException) cause).getFlowFile();
        }
        return Optional.empty();
    }

    /**
     * Hands FlowFiles to the failure destination. When the failure happened in the CONNECTING
     * phase, the contents of the priority queue are packaged for rebalance and handed over as
     * well, before the given FlowFiles.
     *
     * @param flowFiles the FlowFiles to hand over
     * @param phase the phase in which the transaction failed
     * @param partitioner the partitioner passed to the failure destination
     */
    private void rerouteToFailureDestination(final Collection<FlowFileRecord> flowFiles,
            final TransactionFailureCallback.TransactionPhase phase, final FlowFilePartitioner partitioner) {
        if (phase == TransactionFailureCallback.TransactionPhase.CONNECTING) {
            this.failureDestination.putAll(this.priorityQueue::packageForRebalance, partitioner);
        }
        this.failureDestination.putAll(flowFiles, partitioner);
    }

    /** @return {@code true} when the priority queue reports that no FlowFile is available */
    private boolean isQueueEmpty() {
        return !this.priorityQueue.isFlowFileAvailable();
    }

    /**
     * Unregisters this partition from the client registry. Unlike {@link #stop()}, this does not
     * take the instance lock and does not reset the running flag.
     */
    public void onRemoved() {
        this.clientRegistry.unregister(this.flowFileQueue.getIdentifier(), this.nodeIdentifier);
    }

    /**
     * Records the outcome of a transaction in the provenance and FlowFile repositories.
     *
     * <p>Each sent FlowFile produces a SEND event followed by a DROP event and a repository record
     * marked for delete. Each aborted record produces a DROP event with details
     * "Content Not Found" and is passed to the FlowFile repository as given. A failure to update
     * the FlowFile repository is logged and not rethrown.
     *
     * @param flowFilesSent FlowFiles that were successfully transferred
     * @param abortedRecords repository records for FlowFiles that are being aborted
     * @param nodeIdentifier node the FlowFiles were sent to; dereferenced only when
     *        {@code flowFilesSent} is non-empty
     */
    private void updateRepositories(List<FlowFileRecord> flowFilesSent, Collection<RepositoryRecord> abortedRecords, NodeIdentifier nodeIdentifier) {
        final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(flowFilesSent.size() * 2 + abortedRecords.size());
        for (final FlowFileRecord sent : flowFilesSent) {
            provenanceEvents.add(createSendEvent(sent, nodeIdentifier));
            provenanceEvents.add(createDropEvent(sent));
        }
        for (final RepositoryRecord abortedRecord : abortedRecords) {
            provenanceEvents.add(createDropEvent(abortedRecord.getCurrent(), "Content Not Found"));
        }
        this.provRepo.registerEvents(provenanceEvents);

        final List<RepositoryRecord> flowFileRepoRecords = new ArrayList<>(flowFilesSent.size() + abortedRecords.size());
        for (final FlowFileRecord sent : flowFilesSent) {
            flowFileRepoRecords.add(createRepositoryRecord(sent));
        }
        flowFileRepoRecords.addAll(abortedRecords);

        try {
            this.flowFileRepo.updateRepository(flowFileRepoRecords);
        } catch (Exception e) {
            // Log the node itself rather than the Optional wrapper returned by getNodeIdentifier().
            logger.error("Unable to update FlowFile repository to indicate that {} FlowFiles have been transferred to {}. It is possible that these FlowFiles will be duplicated upon restart of NiFi.", flowFilesSent.size(), this.nodeIdentifier, e);
        }
    }

    /**
     * Builds a repository record, marked for delete, for a FlowFile of the owning queue.
     *
     * @param flowFile the FlowFile the record is for
     * @return the delete-marked record
     */
    private RepositoryRecord createRepositoryRecord(FlowFileRecord flowFile) {
        final StandardRepositoryRecord record = new StandardRepositoryRecord(this.flowFileQueue, flowFile);
        record.markForDelete();
        return record;
    }

    /**
     * Builds the SEND provenance event for a FlowFile transferred to another node.
     *
     * @param flowFile the FlowFile that was sent
     * @param nodeIdentifier the receiving node; its API address forms part of the transit URI
     * @return the SEND event
     */
    private ProvenanceEventRecord createSendEvent(FlowFileRecord flowFile, NodeIdentifier nodeIdentifier) {
        final String queueId = this.flowFileQueue.getIdentifier();
        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
                .fromFlowFile(flowFile)
                .setEventType(ProvenanceEventType.SEND)
                .setDetails("Re-distributed for Load-balanced connection")
                .setComponentId(queueId)
                .setComponentType("Connection")
                .setSourceQueueIdentifier(queueId)
                .setSourceSystemFlowFileIdentifier(flowFile.getAttribute(CoreAttributes.UUID.key()))
                .setTransitUri("nifi://" + nodeIdentifier.getApiAddress() + "/loadbalance/" + queueId);
        populateContentClaim(builder, flowFile);
        return builder.build();
    }

    /**
     * Builds a DROP provenance event with no details.
     *
     * @param flowFile the FlowFile being dropped
     * @return the DROP event
     */
    private ProvenanceEventRecord createDropEvent(FlowFileRecord flowFile) {
        return createDropEvent(flowFile, null);
    }

    /**
     * Builds a DROP provenance event for a FlowFile of the owning queue.
     *
     * @param flowFile the FlowFile being dropped
     * @param details text set as the event's details; may be {@code null}
     * @return the DROP event
     */
    private ProvenanceEventRecord createDropEvent(FlowFileRecord flowFile, String details) {
        final String queueId = this.flowFileQueue.getIdentifier();
        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder()
                .fromFlowFile(flowFile)
                .setEventType(ProvenanceEventType.DROP)
                .setDetails(details)
                .setComponentId(queueId)
                .setComponentType("Connection")
                .setSourceQueueIdentifier(queueId);
        populateContentClaim(builder, flowFile);
        return builder.build();
    }

    /**
     * Copies the FlowFile's content claim, when it has one, onto the builder as both the current
     * and the previous content claim. The offset used is the claim's offset plus the FlowFile's
     * own content-claim offset.
     *
     * @param builder the event builder to populate
     * @param flowFile the FlowFile whose content claim is read
     */
    private static void populateContentClaim(final ProvenanceEventBuilder builder, final FlowFileRecord flowFile) {
        final ContentClaim contentClaim = flowFile.getContentClaim();
        if (contentClaim == null) {
            return;
        }
        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
        final Long offset = Long.valueOf(contentClaim.getOffset() + flowFile.getContentClaimOffset());
        builder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, flowFile.getSize());
        builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, flowFile.getSize());
    }

    /**
     * Marks the partition as not running and unregisters it from the client registry.
     * The unregister call is made even if the partition was not running.
     */
    @Override
    public synchronized void stop() {
        this.running = false;
        this.clientRegistry.unregister(this.flowFileQueue.getIdentifier(), this.nodeIdentifier);
    }

    /** @return diagnostics built from the node identifier's string form and the FlowFile queue size */
    public RemoteQueuePartitionDiagnostics getDiagnostics() {
        return new StandardRemoteQueuePartitionDiagnostics(this.nodeIdentifier.toString(), this.priorityQueue.getFlowFileQueueSize());
    }

    /** @return a description containing the owning queue's identifier and the node identifier */
    @Override
    public String toString() {
        return this.description;
    }
}

