/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.ConnectionUtils;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.RepositoryRecordType;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StatelessGroupNode;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.DataflowTriggerContext;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.FlowFileSupplier;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;

public class StatelessFlowTask {
    private static final Set<ProvenanceEventType> eventTypesToKeepOnFailure = EnumSet.of(ProvenanceEventType.SEND, ProvenanceEventType.REMOTE_INVOCATION);
    private final StatelessGroupNode statelessGroupNode;
    private final StatelessDataflow flow;
    private final FlowFileRepository nifiFlowFileRepository;
    private final ProvenanceEventRepository nifiProvenanceEventRepository;
    private final ContentRepository nifiContentRepository;
    private final FlowFileEventRepository flowFileEventRepository;
    private final ComponentLog logger;
    private final Map<String, Port> outputPorts;
    private final long timeoutMillis;
    private final boolean allowBatch;
    private Long maxProvenanceEventId;
    private List<ConnectionUtils.FlowFileCloneResult> cloneResults;
    private List<RepositoryRecord> outputRepositoryRecords;
    private List<ProvenanceEventRecord> cloneProvenanceEvents;

    private StatelessFlowTask(Builder builder) {
        this.statelessGroupNode = builder.statelessGroupNode;
        this.nifiFlowFileRepository = builder.flowFileRepository;
        this.nifiContentRepository = builder.contentRepository;
        this.nifiProvenanceEventRepository = builder.provenanceEventRepository;
        this.flowFileEventRepository = builder.flowFileEventRepository;
        this.flow = builder.statelessFlow;
        this.allowBatch = this.isAllowBatch(this.statelessGroupNode.getProcessGroup());
        this.logger = builder.logger;
        this.timeoutMillis = builder.timeoutMillis;
        ProcessGroup processGroup = this.statelessGroupNode.getProcessGroup();
        this.outputPorts = new HashMap<String, Port>();
        for (Port outputPort : processGroup.getOutputPorts()) {
            this.outputPorts.put(outputPort.getName(), outputPort);
        }
    }

    private boolean isAllowBatch(ProcessGroup group) {
        return group.findAllProcessors().stream().noneMatch(this::isPreventBatch);
    }

    private boolean isPreventBatch(ProcessorNode procNode) {
        if (procNode.isTriggeredSerially()) {
            return true;
        }
        if (procNode.hasIncomingConnection()) {
            return false;
        }
        return !this.isRunAsFastAsPossible(procNode);
    }

    private boolean isRunAsFastAsPossible(ProcessorNode procNode) {
        SchedulingStrategy schedulingStrategy = procNode.getSchedulingStrategy();
        if (schedulingStrategy != SchedulingStrategy.TIMER_DRIVEN) {
            return false;
        }
        return procNode.getSchedulingPeriod(TimeUnit.NANOSECONDS) <= 1L;
    }

    public void shutdown() {
        this.flow.shutdown(false, true);
    }

    private boolean isAbort() {
        ScheduledState desiredState = this.statelessGroupNode.getDesiredState();
        return desiredState != ScheduledState.RUNNING && desiredState != ScheduledState.RUN_ONCE;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public synchronized void trigger() {
        Invocation invocation;
        long startTime = System.currentTimeMillis();
        long endTime = startTime + 100L;
        if (this.allowBatch) {
            this.logger.debug("Will run in batch mode for 100 milliseconds until {}", new Object[]{endTime});
        }
        ArrayList<Invocation> allInvocations = new ArrayList<Invocation>();
        ArrayList<Invocation> successfulInvocations = new ArrayList<Invocation>();
        ProvenanceEventRepository statelessProvRepo = this.flow.getProvenanceRepository();
        this.maxProvenanceEventId = statelessProvRepo.getMaxEventId();
        try {
            for (int invocationCount = 0; (invocationCount == 0 || this.allowBatch) && System.currentTimeMillis() < endTime; ++invocationCount) {
                invocation = new Invocation();
                BridgingFlowFileSupplier flowFileSupplier = new BridgingFlowFileSupplier(invocation);
                StatelessFlowTaskTriggerContext triggerContext = new StatelessFlowTaskTriggerContext(flowFileSupplier);
                TriggerResult triggerResult = this.triggerFlow(triggerContext);
                invocation.setTriggerResult(triggerResult);
                allInvocations.add(invocation);
                if (triggerResult.isSuccessful()) {
                    successfulInvocations.add(invocation);
                    if (invocation.getPolledFlowFiles().size() <= 1) continue;
                    break;
                }
                this.logger.debug("Failed to trigger", (Throwable)triggerResult.getFailureCause().orElse(null));
                this.fail(invocation);
                break;
            }
            this.logger.debug("Finished triggering");
        }
        catch (Throwable throwable) {
            try {
                this.completeInvocations(successfulInvocations);
            }
            catch (Exception e) {
                this.logger.error("Failed to complete Stateless Flow", (Throwable)e);
                this.statelessGroupNode.yield();
                this.fail(successfulInvocations, (Throwable)e);
            }
            this.logger.debug("Acknowledging FlowFiles from {} invocations", new Object[]{allInvocations.size()});
            Iterator iterator = allInvocations.iterator();
            block7: while (true) {
                if (!iterator.hasNext()) {
                    throw throwable;
                }
                Invocation invocation2 = (Invocation)iterator.next();
                Iterator<PolledFlowFile> iterator2 = invocation2.getPolledFlowFiles().iterator();
                while (true) {
                    if (!iterator2.hasNext()) continue block7;
                    PolledFlowFile polledFlowFile = iterator2.next();
                    polledFlowFile.getOriginalQueue().acknowledge(polledFlowFile.getInputFlowFile());
                }
                break;
            }
        }
        try {
            this.completeInvocations(successfulInvocations);
        }
        catch (Exception e) {
            this.logger.error("Failed to complete Stateless Flow", (Throwable)e);
            this.statelessGroupNode.yield();
            this.fail(successfulInvocations, (Throwable)e);
        }
        this.logger.debug("Acknowledging FlowFiles from {} invocations", new Object[]{allInvocations.size()});
        Iterator iterator = allInvocations.iterator();
        block9: while (iterator.hasNext()) {
            invocation = (Invocation)iterator.next();
            Iterator<PolledFlowFile> iterator3 = invocation.getPolledFlowFiles().iterator();
            while (true) {
                if (!iterator3.hasNext()) continue block9;
                PolledFlowFile polledFlowFile = iterator3.next();
                polledFlowFile.getOriginalQueue().acknowledge(polledFlowFile.getInputFlowFile());
            }
            break;
        }
        return;
    }

    private void fail(List<Invocation> invocations, Throwable cause) {
        invocations.forEach(invocation -> this.fail((Invocation)invocation, cause));
    }

    private void fail(Invocation invocation) {
        Throwable cause = invocation.getTriggerResult().isCanceled() ? new TerminatedTaskException() : (Throwable)invocation.getTriggerResult().getFailureCause().orElse(null);
        this.fail(invocation, cause);
    }

    private void fail(Invocation invocation, Throwable cause) {
        Port destinationPort = this.getDestinationPort(cause);
        try {
            this.failInvocation(invocation, destinationPort, cause);
        }
        catch (Exception e) {
            if (cause != null) {
                cause.addSuppressed(e);
            }
            this.logger.error("Failed to trigger Stateless Flow and failed to properly handle failure", cause);
        }
    }

    private Port getDestinationPort(Throwable failureCause) {
        if (!(failureCause instanceof FailurePortEncounteredException)) {
            return null;
        }
        FailurePortEncounteredException fpee = (FailurePortEncounteredException)failureCause;
        Port port = this.outputPorts.get(fpee.getPortName());
        if (port == null) {
            this.logger.error("FlowFile was routed to Failure Port {} but no such port exists in the dataflow", new Object[]{fpee.getPortName()});
        }
        return port;
    }

    private TriggerResult triggerFlow(DataflowTriggerContext triggerContext) {
        DataflowTrigger trigger = this.flow.trigger(triggerContext);
        try {
            Optional optionalResult = trigger.getResult(this.timeoutMillis, TimeUnit.MILLISECONDS);
            if (optionalResult.isEmpty()) {
                trigger.cancel();
                return (TriggerResult)trigger.getResult(5L, TimeUnit.SECONDS).orElseThrow(() -> new ProcessException("Stateless Flow " + String.valueOf(this) + " timed out and failed to cancel within the allotted amount of time"));
            }
            return (TriggerResult)optionalResult.get();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void completeInvocations(List<Invocation> invocations) throws IOException {
        this.logger.debug("Completing transactions from {} invocations", new Object[]{invocations.size()});
        if (invocations.isEmpty()) {
            return;
        }
        this.resetState();
        for (Invocation invocation : invocations) {
            this.validateDestinations(invocation.getTriggerResult());
        }
        for (Invocation invocation : invocations) {
            this.dropInputFlowFiles(invocation);
        }
        for (Invocation invocation : invocations) {
            this.createOutputRecords(invocation.getTriggerResult().getOutputFlowFiles());
        }
        this.updateClaimantCounts();
        try {
            this.updateFlowFileRepository();
        }
        catch (Exception e) {
            throw new IOException("Failed to update FlowFile Repository after triggering " + String.valueOf(this), e);
        }
        this.updateProvenanceRepository(event -> true);
        for (Invocation invocation : invocations) {
            this.acknowledge(invocation);
        }
        this.updateEventRepository(invocations);
        this.distributeFlowFiles();
    }

    void resetState() {
        this.cloneResults = new ArrayList<ConnectionUtils.FlowFileCloneResult>();
        this.outputRepositoryRecords = new ArrayList<RepositoryRecord>();
        this.cloneProvenanceEvents = new ArrayList<ProvenanceEventRecord>();
    }

    private void failInvocation(Invocation invocation, Port destinationPort, Throwable cause) throws IOException {
        List<PolledFlowFile> inputFlowFiles = invocation.getPolledFlowFiles();
        boolean stopped = false;
        if (cause instanceof TerminatedTaskException) {
            input = inputFlowFiles.isEmpty() ? "no input FlowFile" : inputFlowFiles.toString();
            ScheduledState desiredState = this.statelessGroupNode.getDesiredState();
            if (desiredState == ScheduledState.STOPPED) {
                this.logger.info("Stateless Flow canceled while running with input {}", new Object[]{input});
                stopped = true;
            } else {
                this.logger.error("Stateless Flow timed out while running with input {}", new Object[]{input});
            }
        } else {
            input = inputFlowFiles.isEmpty() ? "with no input FlowFile" : " for input " + String.valueOf(inputFlowFiles);
            this.logger.error("Failed to trigger Stateless Flow {}", new Object[]{input, cause});
        }
        this.resetState();
        if (!inputFlowFiles.isEmpty()) {
            if (destinationPort == null) {
                for (PolledFlowFile polledFlowFile : inputFlowFiles) {
                    if (stopped) {
                        polledFlowFile.getOriginalQueue().put(polledFlowFile.getInputFlowFile());
                        continue;
                    }
                    FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(polledFlowFile.getInputFlowFile()).penaltyExpirationTime(System.currentTimeMillis() + 30000L).build();
                    polledFlowFile.getOriginalQueue().put(newFile);
                }
            } else {
                this.dropInputFlowFiles(invocation);
                Map<String, List<FlowFile>> outputRecords = Collections.singletonMap(destinationPort.getName(), inputFlowFiles.stream().map(PolledFlowFile::getInputFlowFile).collect(Collectors.toList()));
                this.createOutputRecords(outputRecords);
                this.updateClaimantCounts();
            }
        }
        try {
            this.updateFlowFileRepository();
        }
        catch (Exception e) {
            throw new IOException("Failed to update FlowFile Repository after triggering " + String.valueOf(this), e);
        }
        this.updateProvenanceRepository(event -> eventTypesToKeepOnFailure.contains(event.getEventType()));
        this.abort(invocation, cause);
        this.updateEventRepository(Collections.singletonList(invocation));
        this.distributeFlowFiles();
    }

    private void validateDestinations(TriggerResult result) {
        Map outputFlowFiles = result.getOutputFlowFiles();
        for (Map.Entry entry : outputFlowFiles.entrySet()) {
            String portName = (String)entry.getKey();
            Port outputPort = this.outputPorts.get(portName);
            if (outputPort != null) continue;
            this.logger.error("Transferred FlowFile to Output Port {} but no port is known with that name", new Object[]{portName});
            throw new IllegalStateException("FlowFile was transferred to nonexistent Port " + portName);
        }
    }

    void dropInputFlowFiles(Invocation invocation) {
        for (PolledFlowFile polledFlowFile : invocation.getPolledFlowFiles()) {
            FlowFileRecord inputFlowFile = polledFlowFile.getInputFlowFile();
            if (inputFlowFile == null) continue;
            StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(polledFlowFile.getOriginalQueue(), polledFlowFile.getInputFlowFile());
            repoRecord.markForDelete();
            this.outputRepositoryRecords.add((RepositoryRecord)repoRecord);
        }
    }

    List<RepositoryRecord> getOutputRepositoryRecords() {
        return this.outputRepositoryRecords;
    }

    public List<ConnectionUtils.FlowFileCloneResult> getCloneResults() {
        return this.cloneResults;
    }

    public List<ProvenanceEventRecord> getCloneProvenanceEvents() {
        return this.cloneProvenanceEvents;
    }

    void createOutputRecords(Map<String, List<FlowFile>> outputFlowFiles) {
        for (Map.Entry<String, List<FlowFile>> entry : outputFlowFiles.entrySet()) {
            String portName = entry.getKey();
            Port outputPort = this.outputPorts.get(portName);
            List<FlowFile> portFlowFiles = entry.getValue();
            Set outputConnections = outputPort.getConnections();
            for (FlowFileRecord flowFileRecord : portFlowFiles) {
                ConnectionUtils.FlowFileCloneResult cloneResult = ConnectionUtils.clone((FlowFileRecord)flowFileRecord, (Collection)outputConnections, (FlowFileRepository)this.nifiFlowFileRepository, null);
                this.cloneResults.add(cloneResult);
                List repoRecords = cloneResult.getRepositoryRecords();
                this.outputRepositoryRecords.addAll(repoRecords);
                this.createCloneProvenanceEvent(flowFileRecord, repoRecords, outputPort).ifPresent(this.cloneProvenanceEvents::add);
            }
        }
    }

    void updateProvenanceRepository(Predicate<ProvenanceEventRecord> eventFilter) {
        long firstProvEventId = this.maxProvenanceEventId == null ? 0L : this.maxProvenanceEventId + 1L;
        ProvenanceEventRepository statelessProvRepo = this.flow.getProvenanceRepository();
        if (!this.cloneProvenanceEvents.isEmpty()) {
            this.nifiProvenanceEventRepository.registerEvents(this.cloneProvenanceEvents);
        }
        block2: while (true) {
            try {
                while (true) {
                    List statelessProvEvents;
                    if ((statelessProvEvents = statelessProvRepo.getEvents(firstProvEventId, 1000)).isEmpty()) {
                        return;
                    }
                    ArrayList<StandardProvenanceEventRecord> provenanceEvents = new ArrayList<StandardProvenanceEventRecord>();
                    for (ProvenanceEventRecord eventRecord : statelessProvEvents) {
                        if (!eventFilter.test(eventRecord)) continue;
                        provenanceEvents.add(new StandardProvenanceEventRecord.Builder().fromEvent(eventRecord).build());
                    }
                    this.nifiProvenanceEventRepository.registerEvents(provenanceEvents);
                    if (provenanceEvents.size() != 1000) break block2;
                    firstProvEventId += 1000L;
                }
            }
            catch (IOException e) {
                this.logger.warn("Failed to obtain Provenance Events from Stateless Dataflow. These events will not be added to the NiFi Provenance Repository", (Throwable)e);
                continue;
            }
            break;
        }
    }

    void updateClaimantCounts() {
        for (RepositoryRecord outputRepoRecord : this.outputRepositoryRecords) {
            if (outputRepoRecord.getType() == RepositoryRecordType.DELETE) continue;
            this.nifiContentRepository.incrementClaimaintCount(outputRepoRecord.getCurrentClaim());
        }
    }

    private void updateFlowFileRepository() throws IOException {
        this.nifiFlowFileRepository.updateRepository(this.outputRepositoryRecords);
    }

    private void acknowledge(Invocation invocation) {
        invocation.getTriggerResult().acknowledge();
    }

    private void abort(Invocation invocation, Throwable cause) {
        invocation.getTriggerResult().abort(cause);
    }

    private void distributeFlowFiles() {
        int enqueued = 0;
        for (ConnectionUtils.FlowFileCloneResult result : this.cloneResults) {
            enqueued += result.distributeFlowFiles();
        }
        this.logger.debug("Distributed {} FlowFiles to output queues", new Object[]{enqueued});
    }

    void updateEventRepository(List<Invocation> invocations) {
        HashMap<String, StandardFlowFileEvent> eventsByComponentId = new HashMap<String, StandardFlowFileEvent>();
        for (Invocation invocation : invocations) {
            List<PolledFlowFile> polledFlowFiles = invocation.getPolledFlowFiles();
            for (PolledFlowFile polledFlowFile : polledFlowFiles) {
                long bytes = polledFlowFile.getInputFlowFile().getSize();
                int numOutputConnections = polledFlowFile.getInputPort().getConnections().size();
                StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
                flowFileEvent.setFlowFilesIn(1);
                flowFileEvent.setFlowFilesOut(numOutputConnections);
                flowFileEvent.setContentSizeIn(bytes);
                flowFileEvent.setContentSizeOut((long)numOutputConnections * bytes);
                StandardFlowFileEvent cumulativeEvent = eventsByComponentId.computeIfAbsent(polledFlowFile.getInputPort().getIdentifier(), key -> new StandardFlowFileEvent());
                cumulativeEvent.add((FlowFileEvent)flowFileEvent);
            }
            Map outputFlowFiles = invocation.getTriggerResult().getOutputFlowFiles();
            for (Map.Entry entry : outputFlowFiles.entrySet()) {
                String portName = (String)entry.getKey();
                List flowFiles = (List)entry.getValue();
                int flowFileCount = flowFiles.size();
                long byteCount = 0L;
                for (FlowFile flowFile : flowFiles) {
                    byteCount += flowFile.getSize();
                }
                Port port = this.outputPorts.get(portName);
                int outputConnectionCount = port.getConnections().size();
                StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
                flowFileEvent.setFlowFilesIn(flowFileCount);
                flowFileEvent.setFlowFilesOut(flowFileCount * outputConnectionCount);
                flowFileEvent.setContentSizeIn(byteCount);
                flowFileEvent.setContentSizeOut(byteCount * (long)outputConnectionCount);
                StandardFlowFileEvent cumulativeEvent = eventsByComponentId.computeIfAbsent(port.getIdentifier(), key -> new StandardFlowFileEvent());
                cumulativeEvent.add((FlowFileEvent)flowFileEvent);
            }
        }
        for (Map.Entry entry : eventsByComponentId.entrySet()) {
            String componentId = (String)entry.getKey();
            FlowFileEvent event = (FlowFileEvent)entry.getValue();
            try {
                this.flowFileEventRepository.updateRepository(event, componentId);
            }
            catch (Exception exception) {
                this.logger.warn("Failed to update FlowFile Event Repository", (Throwable)exception);
            }
        }
    }

    private void expireRecords(final FlowFileQueue sourceQueue, final Set<FlowFileRecord> expiredRecords) throws IOException {
        if (expiredRecords.isEmpty()) {
            return;
        }
        ArrayList<StandardRepositoryRecord> repositoryRecords = new ArrayList<StandardRepositoryRecord>();
        final long time = System.currentTimeMillis();
        final String expirationDetails = "Expiration Threshold = " + sourceQueue.getFlowFileExpiration();
        for (FlowFileRecord expired : expiredRecords) {
            StandardRepositoryRecord record = new StandardRepositoryRecord(sourceQueue, expired);
            record.markForDelete();
            repositoryRecords.add(record);
        }
        Iterable<ProvenanceEventRecord> provenanceEventIterable = new Iterable<ProvenanceEventRecord>(this){

            @Override
            public Iterator<ProvenanceEventRecord> iterator() {
                final Iterator expiredItr = expiredRecords.iterator();
                return new Iterator<ProvenanceEventRecord>(){

                    @Override
                    public boolean hasNext() {
                        return expiredItr.hasNext();
                    }

                    @Override
                    public ProvenanceEventRecord next() {
                        FlowFileRecord expired = (FlowFileRecord)expiredItr.next();
                        ProvenanceEventRecord provenanceEvent = new StandardProvenanceEventRecord.Builder().fromFlowFile((FlowFile)expired).setEventTime(time).setEventType(ProvenanceEventType.EXPIRE).setDetails(expirationDetails).setComponentId(sourceQueue.getIdentifier()).setComponentType("Connection").build();
                        return provenanceEvent;
                    }
                };
            }
        };
        this.nifiFlowFileRepository.updateRepository(repositoryRecords);
        this.nifiProvenanceEventRepository.registerEvents((Iterable)provenanceEventIterable);
        expiredRecords.clear();
    }

    private Optional<ProvenanceEventRecord> createCloneProvenanceEvent(FlowFileRecord outputFlowFile, List<RepositoryRecord> cloneRecords, Port outputPort) {
        if (outputFlowFile == null || cloneRecords == null || cloneRecords.size() < 2) {
            return Optional.empty();
        }
        ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder().setEventType(ProvenanceEventType.CLONE).fromFlowFile((FlowFile)outputFlowFile).setLineageStartDate(outputFlowFile.getLineageStartDate()).setComponentId(outputPort.getIdentifier()).setComponentType("Output Port").addParentFlowFile((FlowFile)outputFlowFile);
        for (RepositoryRecord clone : cloneRecords) {
            FlowFileRecord childFlowFile = clone.getCurrent();
            if (outputFlowFile.equals((Object)childFlowFile)) continue;
            builder.addChildFlowFile((FlowFile)childFlowFile);
        }
        ProvenanceEventRecord cloneEvent = builder.build();
        return Optional.of(cloneEvent);
    }

    public String toString() {
        return "StatelessFlowTask[Group=" + String.valueOf(this.statelessGroupNode.getProcessGroup()) + "]";
    }

    public static class Builder {
        private StatelessGroupNode statelessGroupNode;
        private FlowFileRepository flowFileRepository;
        private ContentRepository contentRepository;
        private ProvenanceEventRepository provenanceEventRepository;
        private FlowFileEventRepository flowFileEventRepository;
        private StatelessDataflow statelessFlow;
        private long timeoutMillis = TimeUnit.MINUTES.toMillis(1L);
        private ComponentLog logger;

        public Builder statelessGroupNode(StatelessGroupNode statelessGroupNode) {
            this.statelessGroupNode = statelessGroupNode;
            return this;
        }

        public Builder nifiFlowFileRepository(FlowFileRepository flowFileRepository) {
            this.flowFileRepository = flowFileRepository;
            return this;
        }

        public Builder nifiContentRepository(ContentRepository contentRepository) {
            this.contentRepository = contentRepository;
            return this;
        }

        public Builder nifiProvenanceRepository(ProvenanceEventRepository provenanceEventRepository) {
            this.provenanceEventRepository = provenanceEventRepository;
            return this;
        }

        public Builder flowFileEventRepository(FlowFileEventRepository flowFileEventRepository) {
            this.flowFileEventRepository = flowFileEventRepository;
            return this;
        }

        public Builder statelessFlow(StatelessDataflow statelessFlow) {
            this.statelessFlow = statelessFlow;
            return this;
        }

        public Builder timeout(long value, TimeUnit unit) {
            long millis = unit.toMillis(value);
            this.timeoutMillis = Math.max(millis, 1L);
            return this;
        }

        public Builder logger(ComponentLog logger) {
            this.logger = logger;
            return this;
        }

        public StatelessFlowTask build() {
            return new StatelessFlowTask(this);
        }
    }

    static class Invocation {
        private List<PolledFlowFile> polledFlowFiles;
        private TriggerResult triggerResult;

        Invocation() {
        }

        public void setTriggerResult(TriggerResult triggerResult) {
            this.triggerResult = triggerResult;
        }

        public List<PolledFlowFile> getPolledFlowFiles() {
            if (this.polledFlowFiles == null) {
                return Collections.emptyList();
            }
            return this.polledFlowFiles;
        }

        public TriggerResult getTriggerResult() {
            return this.triggerResult;
        }

        public void addPolledFlowFile(PolledFlowFile polledFlowFile) {
            if (polledFlowFile == null) {
                return;
            }
            if (this.polledFlowFiles == null) {
                this.polledFlowFiles = new ArrayList<PolledFlowFile>();
            }
            this.polledFlowFiles.add(polledFlowFile);
        }
    }

    private class BridgingFlowFileSupplier
    implements FlowFileSupplier {
        private final Map<String, Port> portsByName;
        private final Set<FlowFileRecord> expiredRecords = new HashSet<FlowFileRecord>();
        private final Invocation invocation;
        private int zeroFlowFileInvocations = 0;

        public BridgingFlowFileSupplier(Invocation invocation) {
            this.invocation = invocation;
            Set inputPorts = StatelessFlowTask.this.statelessGroupNode.getProcessGroup().getInputPorts();
            this.portsByName = inputPorts.stream().collect(Collectors.toMap(Connectable::getName, port -> port));
        }

        public Optional<FlowFile> getFlowFile(String portName) {
            Port port = this.portsByName.get(portName);
            if (port == null) {
                return Optional.empty();
            }
            for (Connection sourceConnection : port.getIncomingConnections()) {
                FlowFileQueue sourceQueue = sourceConnection.getFlowFileQueue();
                FlowFileRecord flowFile = sourceQueue.poll(this.expiredRecords);
                if (!this.expiredRecords.isEmpty()) {
                    try {
                        StatelessFlowTask.this.expireRecords(sourceQueue, this.expiredRecords);
                    }
                    catch (Exception e) {
                        StatelessFlowTask.this.logger.error("Failed to expire FlowFile Records when consuming from input queue {}", new Object[]{sourceQueue, e});
                    }
                    this.expiredRecords.clear();
                }
                if (flowFile == null) continue;
                this.zeroFlowFileInvocations = 0;
                StatelessFlowTask.this.nifiContentRepository.incrementClaimaintCount(flowFile.getContentClaim());
                this.invocation.addPolledFlowFile(new PolledFlowFile(flowFile, sourceQueue, port));
                return Optional.of(flowFile);
            }
            if (++this.zeroFlowFileInvocations > 1) {
                long yieldMillis = StatelessFlowTask.this.statelessGroupNode.getBoredYieldDuration(TimeUnit.MILLISECONDS);
                try {
                    Thread.sleep(yieldMillis);
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            return Optional.empty();
        }
    }

    private class StatelessFlowTaskTriggerContext
    implements DataflowTriggerContext {
        private final FlowFileSupplier flowFileSupplier;

        public StatelessFlowTaskTriggerContext(FlowFileSupplier flowFileSupplier) {
            this.flowFileSupplier = flowFileSupplier;
        }

        public boolean isAbort() {
            return StatelessFlowTask.this.isAbort();
        }

        public FlowFileSupplier getFlowFileSupplier() {
            return this.flowFileSupplier;
        }
    }

    static class PolledFlowFile {
        private final FlowFileRecord inputFlowFile;
        private final FlowFileQueue originalQueue;
        private final Port inputPort;

        public PolledFlowFile(FlowFileRecord inputFlowFile, FlowFileQueue originalQueue, Port inputPort) {
            this.inputFlowFile = inputFlowFile;
            this.originalQueue = originalQueue;
            this.inputPort = inputPort;
        }

        public FlowFileRecord getInputFlowFile() {
            return this.inputFlowFile;
        }

        public FlowFileQueue getOriginalQueue() {
            return this.originalQueue;
        }

        public Port getInputPort() {
            return this.inputPort;
        }

        public String toString() {
            return this.inputFlowFile.toString();
        }
    }
}

