/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.repository;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.ProcessContext;
import org.apache.nifi.controller.repository.ProvenanceEventEnricher;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardProvenanceReporter;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.TransientClaimRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.FlowFileHandlingException;
import org.apache.nifi.processor.exception.MissingFlowFileException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StandardProcessSession
implements ProcessSession,
ProvenanceEventEnricher {
    // Source of the per-session identifier assigned to sessionId in the constructor.
    private static final AtomicLong idGenerator = new AtomicLong(0L);
    private static final AtomicLong enqueuedIndex = new AtomicLong(0L);
    public static final int VERBOSE_LOG_THRESHOLD = 10;
    public static final String DEFAULT_FLOWFILE_PATH = "./";
    private static final Logger LOG = LoggerFactory.getLogger(StandardProcessSession.class);
    // Separate logger whose name is the class's simple name suffixed with ".claims".
    private static final Logger claimLog = LoggerFactory.getLogger((String)(StandardProcessSession.class.getSimpleName() + ".claims"));
    private static final int MAX_ROLLBACK_FLOWFILES_TO_LOG = 5;
    // Repository records of this session, keyed by FlowFile record; checkpoint() walks these to assign destinations.
    private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<FlowFileRecord, StandardRepositoryRecord>();
    private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<String, StandardFlowFileEvent>();
    private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<FlowFileQueue, Set<FlowFileRecord>>();
    private final Map<String, Long> counters = new HashMap<String, Long>();
    private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<ContentClaim, ByteCountingOutputStream>();
    private final ProcessContext context;
    // checkpoint() throws IllegalStateException while this set is non-empty.
    private final Set<FlowFile> recursionSet = new HashSet<FlowFile>();
    private final Map<FlowFile, Path> deleteOnCommit = new HashMap<FlowFile, Path>();
    private final long sessionId;
    // Human-readable description of the owning component, computed in the constructor and used in log messages.
    private final String connectableDescription;
    private final Set<String> removedFlowFiles = new HashSet<String>();
    private final Set<String> createdFlowFiles = new HashSet<String>();
    private final StandardProvenanceReporter provenanceReporter;
    private int removedCount = 0;
    private long removedBytes = 0L;
    private long bytesRead = 0L;
    private long bytesWritten = 0L;
    private int flowFilesIn = 0;
    private int flowFilesOut = 0;
    private long contentSizeIn = 0L;
    private long contentSizeOut = 0L;
    private ContentClaim currentReadClaim = null;
    private ByteCountingInputStream currentReadClaimStream = null;
    // Set to System.nanoTime() at the end of the constructor.
    private long processingStartTime;
    // Streams still present here when checkpoint() runs are closed by it with a warning.
    private final Map<FlowFile, InputStream> openInputStreams = new HashMap<FlowFile, InputStream>();
    private final Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<FlowFile, List<ProvenanceEventRecord>>();
    private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<FlowFile, ProvenanceEventBuilder>();
    // Accumulated checkpoint state; commit() sets this to null and checkpoint() re-creates it on demand.
    private Checkpoint checkpoint = new Checkpoint();
    // Write cache created over the context's content repository; commit(Checkpoint) flushes and then resets it.
    private final ContentClaimWriteCache claimCache;

    /**
     * Creates a session for the component exposed by the given context.
     * <p>
     * The component type name handed to the provenance reporter is derived from the
     * connectable's type. For processors the session description is the wrapped
     * processor's {@code toString()}; for every other type it is the connectable's own.
     *
     * @param context the process context this session operates against
     * @throws AssertionError if the connectable reports a type not handled here
     */
    public StandardProcessSession(ProcessContext context) {
        this.context = context;

        final Connectable component = context.getConnectable();
        String label = component.toString();
        String typeName;
        switch (component.getConnectableType()) {
            case PROCESSOR: {
                final ProcessorNode processorNode = (ProcessorNode) component;
                typeName = processorNode.getComponentType();
                // Processors are described by the underlying Processor rather than the node wrapper.
                label = processorNode.getProcessor().toString();
                break;
            }
            case INPUT_PORT:
                typeName = "Input Port";
                break;
            case OUTPUT_PORT:
                typeName = "Output Port";
                break;
            case REMOTE_INPUT_PORT:
                typeName = "Remote Input Port";
                break;
            case REMOTE_OUTPUT_PORT:
                typeName = "Remote Output Port";
                break;
            case FUNNEL:
                typeName = "Funnel";
                break;
            default:
                throw new AssertionError((Object) ("Connectable type is " + component.getConnectableType()));
        }

        this.provenanceReporter = new StandardProvenanceReporter(
                this, component.getIdentifier(), typeName, context.getProvenanceRepository(), this);
        this.sessionId = idGenerator.getAndIncrement();
        this.connectableDescription = label;
        this.claimCache = new ContentClaimWriteCache(context.getContentRepository());
        LOG.trace("Session {} created for {}", this, connectableDescription);
        this.processingStartTime = System.nanoTime();
    }

    /**
     * Captures the work performed so far into the session's checkpoint without committing it.
     * <p>
     * Any input stream that is still open is closed with a warning. Every record that is not
     * already marked for deletion is then given a destination based on its transfer relationship:
     * <ul>
     *   <li>no connections and the relationship is {@code Relationship.SELF}: the record goes back
     *       to its original queue;</li>
     *   <li>no connections and the relationship is auto-terminated: the record is marked for
     *       deletion and a drop event is generated;</li>
     *   <li>one or more connections: the record goes to the last connection and a clone (new id,
     *       new UUID, incremented claimant count) is created for each remaining connection.</li>
     * </ul>
     *
     * @throws IllegalStateException if the recursion set is not empty
     * @throws FlowFileHandlingException if a record has no transfer relationship, or its
     *         relationship has no destination and is neither auto-terminated nor SELF; the session
     *         is rolled back before the exception is thrown
     */
    public void checkpoint() {
        resetWriteClaims(false);

        // Iterate over a copy of the open-stream map, exactly as the original does.
        // NOTE(review): presumably closing a stream mutates openInputStreams - confirm.
        final Map<FlowFile, InputStream> stillOpen = new HashMap<FlowFile, InputStream>(openInputStreams);
        for (final Map.Entry<FlowFile, InputStream> open : stillOpen.entrySet()) {
            final FlowFile owner = open.getKey();
            final InputStream stream = open.getValue();
            LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", new Object[]{this, stream, owner});
            try {
                stream.close();
            } catch (final Exception closeFailure) {
                LOG.warn("{} Attempted to close {} for {} due to session commit but close failed", new Object[]{this, stream, connectableDescription});
                LOG.warn("", (Throwable) closeFailure);
            }
        }

        if (!recursionSet.isEmpty()) {
            throw new IllegalStateException();
        }
        if (this.checkpoint == null) {
            this.checkpoint = new Checkpoint();
        }
        if (records.isEmpty()) {
            LOG.trace("{} checkpointed, but no events were performed by this ProcessSession", this);
            return;
        }

        ArrayList<ProvenanceEventRecord> dropEvents = null;
        final Map<FlowFileRecord, StandardRepositoryRecord> clonedRecords = new HashMap<FlowFileRecord, StandardRepositoryRecord>();

        for (final StandardRepositoryRecord record : records.values()) {
            if (record.isMarkedForDelete()) {
                continue;
            }

            final Relationship relationship = record.getTransferRelationship();
            if (relationship == null) {
                rollback();
                throw new FlowFileHandlingException(record.getCurrent() + " transfer relationship not specified");
            }

            final ArrayList<Connection> destinations = new ArrayList<Connection>(context.getConnections(relationship));
            if (destinations.isEmpty()) {
                if (!context.getConnectable().isAutoTerminated(relationship) && relationship != Relationship.SELF) {
                    rollback();
                    throw new FlowFileHandlingException(relationship + " does not have any destinations for " + context.getConnectable());
                }

                if (relationship == Relationship.SELF) {
                    // Nothing connected for SELF: the FlowFile returns to the queue it came from.
                    record.setDestination(record.getOriginalQueue());
                } else {
                    // Auto-terminated relationship: drop the FlowFile and remember the drop event.
                    record.markForDelete();
                    if (dropEvents == null) {
                        dropEvents = new ArrayList<ProvenanceEventRecord>();
                    }
                    try {
                        final ProvenanceEventRecord dropEvent = provenanceReporter.generateDropEvent(
                                (FlowFile) record.getCurrent(), "Auto-Terminated by " + relationship.getName() + " Relationship");
                        dropEvents.add(dropEvent);
                    } catch (final Exception e) {
                        LOG.warn("Unable to generate Provenance Event for {} on behalf of {} due to {}", new Object[]{record.getCurrent(), connectableDescription, e});
                        if (LOG.isDebugEnabled()) {
                            LOG.warn("", (Throwable) e);
                        }
                    }
                }
                continue;
            }

            // The record itself is routed to the last connection; all others receive clones.
            final Connection lastDestination = destinations.remove(destinations.size() - 1);
            record.setDestination(lastDestination.getFlowFileQueue());
            incrementConnectionInputCounts(lastDestination, (RepositoryRecord) record);

            for (final Connection extra : destinations) {
                incrementConnectionInputCounts(extra, (RepositoryRecord) record);

                final FlowFileRecord source = record.getCurrent();
                final StandardFlowFileRecord.Builder cloneBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(source);
                cloneBuilder.id(context.getNextFlowFileSequence());
                final String cloneUuid = UUID.randomUUID().toString();
                cloneBuilder.addAttribute(CoreAttributes.UUID.key(), cloneUuid);
                final FlowFileRecord clone = cloneBuilder.build();

                final StandardRepositoryRecord cloneRecord = new StandardRepositoryRecord(extra.getFlowFileQueue());
                provenanceReporter.clone((FlowFile) source, (FlowFile) clone, false);

                // The clone shares the source's content, so the claim gains one more claimant.
                final ContentClaim sharedClaim = clone.getContentClaim();
                if (sharedClaim != null) {
                    context.getContentRepository().incrementClaimaintCount(sharedClaim);
                }

                cloneRecord.setWorking(clone, Collections.emptyMap());
                cloneRecord.setDestination(extra.getFlowFileQueue());
                cloneRecord.setTransferRelationship(record.getTransferRelationship());
                clonedRecords.put(clone, cloneRecord);
            }
        }

        // Clones are added only after the loop so that records is not modified while being iterated.
        records.putAll(clonedRecords);
        clonedRecords.clear();
        this.checkpoint.checkpoint(this, dropEvents);
        resetState();
    }

    /**
     * Checkpoints the session and then commits the resulting checkpoint.
     * The checkpoint reference is cleared once the commit call returns normally.
     */
    public void commit() {
        checkpoint();
        // Read the field only after checkpoint(), which may have replaced a null checkpoint.
        final Checkpoint pending = this.checkpoint;
        commit(pending);
        this.checkpoint = null;
    }

    /**
     * Commits everything captured in the given checkpoint.
     * <p>
     * Steps, in order: reset the read claim and flush/reset the claim write cache; register
     * provenance events; decrement claimant counts for deleted or replaced content; persist the
     * repository records to the FlowFile repository; update the FlowFile event repository; put the
     * surviving FlowFiles on their destination queues; delete the files registered in
     * {@code deleteOnCommit}; log a summary; apply counter adjustments; call
     * {@code acknowledgeRecords()} and {@code resetState()}.
     * <p>
     * Any exception raised along the way triggers {@code rollback(false, true)}; a failure of that
     * rollback is attached to the original exception as a suppressed exception.
     * <p>
     * Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param checkpoint the accumulated session state to commit
     * @throws ProcessException if the FlowFile repository update fails with an IOException, or if a
     *         checked exception is raised during the commit
     */
    private void commit(Checkpoint checkpoint) {
        try {
            Object sessionSummary;
            long commitStartNanos = System.nanoTime();
            this.resetReadClaim();
            // The cache is reset even when the flush fails.
            try {
                this.claimCache.flush();
            }
            finally {
                this.claimCache.reset();
            }
            long updateProvenanceStart = System.nanoTime();
            this.updateProvenanceRepo(checkpoint);
            long claimRemovalStart = System.nanoTime();
            long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
            // Release content claims that are no longer referenced by the committed records.
            for (Map.Entry entry : checkpoint.records.entrySet()) {
                FlowFile flowFile = (FlowFile)entry.getKey();
                StandardRepositoryRecord record = (StandardRepositoryRecord)entry.getValue();
                if (record.isMarkedForDelete()) {
                    // Deleted FlowFile: release the working claim and, if it differs, the original claim too.
                    this.decrementClaimCount(record.getWorkingClaim());
                    if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
                        this.decrementClaimCount(record.getOriginalClaim());
                    }
                    long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
                    Connectable connectable = this.context.getConnectable();
                    Connectable terminator = connectable instanceof ProcessorNode ? ((ProcessorNode)connectable).getProcessor() : connectable;
                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
                    continue;
                }
                // Updated record whose working claim is a different object than the original: release the original.
                if (!record.isWorking() || record.getWorkingClaim() == record.getOriginalClaim()) continue;
                this.decrementClaimCount(record.getOriginalClaim());
            }
            long claimRemovalFinishNanos = System.nanoTime();
            long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
            try {
                Collection repoRecords = checkpoint.records.values();
                this.context.getFlowFileRepository().updateRepository(repoRecords);
            }
            catch (IOException ioe) {
                // NOTE(review): the ProcessException thrown here is also caught by the outer
                // catch (Exception) below, which calls rollback(false, true) a second time.
                this.rollback(false, true);
                throw new ProcessException("FlowFile Repository failed to update", (Throwable)ioe);
            }
            long flowFileRepoUpdateFinishNanos = System.nanoTime();
            long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
            this.updateEventRepository(checkpoint);
            long updateEventRepositoryFinishNanos = System.nanoTime();
            long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
            // Group the surviving FlowFiles by destination queue so each queue gets a single putAll call.
            HashMap<FlowFileQueue, ArrayList<FlowFileRecord>> recordMap = new HashMap<FlowFileQueue, ArrayList<FlowFileRecord>>();
            for (StandardRepositoryRecord standardRepositoryRecord : checkpoint.records.values()) {
                if (standardRepositoryRecord.isMarkedForAbort() || standardRepositoryRecord.isMarkedForDelete() || standardRepositoryRecord.getCurrent() == null) continue;
                ArrayList<FlowFileRecord> collection = (ArrayList<FlowFileRecord>)recordMap.get(standardRepositoryRecord.getDestination());
                if (collection == null) {
                    collection = new ArrayList<FlowFileRecord>();
                    recordMap.put(standardRepositoryRecord.getDestination(), collection);
                }
                collection.add(standardRepositoryRecord.getCurrent());
            }
            for (Map.Entry entry : recordMap.entrySet()) {
                ((FlowFileQueue)entry.getKey()).putAll((Collection)entry.getValue());
            }
            long enqueueFlowFileFinishNanos = System.nanoTime();
            long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
            // Remove the files registered for deletion on commit; a failure aborts the commit via the outer catch.
            for (Path path : checkpoint.deleteOnCommit.values()) {
                try {
                    Files.deleteIfExists(path);
                }
                catch (IOException e) {
                    throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), (Throwable)e);
                }
            }
            checkpoint.deleteOnCommit.clear();
            // The summary is only computed when INFO logging is enabled, and only logged when non-empty.
            if (LOG.isInfoEnabled() && !((String)(sessionSummary = this.summarizeEvents(checkpoint))).isEmpty()) {
                LOG.info("{} for {}, committed the following events: {}", new Object[]{this, this.connectableDescription, sessionSummary});
            }
            for (Map.Entry entry : checkpoint.counters.entrySet()) {
                this.adjustCounter((String)entry.getKey(), (long)((Long)entry.getValue()), true);
            }
            this.acknowledgeRecords();
            this.resetState();
            // Per-phase timing breakdown, built only when DEBUG logging is enabled.
            if (LOG.isDebugEnabled()) {
                StringBuilder timingInfo = new StringBuilder();
                timingInfo.append("Session commit for ").append(this).append(" [").append(this.connectableDescription).append("]").append(" took ");
                long l = System.nanoTime() - commitStartNanos;
                this.formatNanos(l, timingInfo);
                timingInfo.append("; FlowFile Repository Update took ");
                this.formatNanos(flowFileRepoUpdateNanos, timingInfo);
                timingInfo.append("; Claim Removal took ");
                this.formatNanos(claimRemovalNanos, timingInfo);
                timingInfo.append("; FlowFile Event Update took ");
                this.formatNanos(updateEventRepositoryNanos, timingInfo);
                timingInfo.append("; Enqueuing FlowFiles took ");
                this.formatNanos(enqueueFlowFileNanos, timingInfo);
                timingInfo.append("; Updating Provenance Event Repository took ");
                this.formatNanos(updateProvenanceNanos, timingInfo);
                LOG.debug(timingInfo.toString());
            }
        }
        catch (Exception e) {
            // Roll back on any failure; a rollback failure is recorded on the original exception instead of replacing it.
            try {
                this.rollback(false, true);
            }
            catch (Exception e1) {
                e.addSuppressed(e1);
            }
            // Unchecked exceptions propagate unchanged; checked ones are wrapped in ProcessException.
            if (e instanceof RuntimeException) {
                throw (RuntimeException)e;
            }
            throw new ProcessException((Throwable)e);
        }
    }

    /**
     * Publishes the statistics gathered in the given checkpoint to the FlowFile event repository.
     * <p>
     * Sent and received counts/bytes are derived from the reported SEND and RECEIVE/FETCH
     * provenance events, ignoring spurious fork events. One event is published for the component
     * itself, followed by one per entry in the checkpoint's connection counts. An
     * {@link IOException} from the repository is logged and not propagated.
     *
     * @param checkpoint the checkpoint whose statistics are published
     */
    private void updateEventRepository(Checkpoint checkpoint) {
        long sentBytes = 0L;
        long receivedBytes = 0L;
        int sentCount = 0;
        int receivedCount = 0;

        for (final ProvenanceEventRecord reported : checkpoint.reportedEvents) {
            if (isSpuriousForkEvent(reported, checkpoint.removedFlowFiles)) {
                continue;
            }
            switch (reported.getEventType()) {
                case RECEIVE:
                case FETCH:
                    receivedCount++;
                    receivedBytes += reported.getFileSize();
                    break;
                case SEND:
                    sentCount++;
                    sentBytes += reported.getFileSize();
                    break;
                default:
                    // Other event types do not contribute to the sent/received statistics.
                    break;
            }
        }

        try {
            final Connectable component = context.getConnectable();
            final StandardFlowFileEvent stats = new StandardFlowFileEvent(component.getIdentifier());
            stats.setBytesRead(checkpoint.bytesRead);
            stats.setBytesWritten(checkpoint.bytesWritten);
            stats.setContentSizeIn(checkpoint.contentSizeIn);
            stats.setContentSizeOut(checkpoint.contentSizeOut);
            stats.setContentSizeRemoved(checkpoint.removedBytes);
            stats.setFlowFilesIn(checkpoint.flowFilesIn);
            stats.setFlowFilesOut(checkpoint.flowFilesOut);
            stats.setFlowFilesRemoved(checkpoint.removedCount);
            stats.setFlowFilesReceived(receivedCount);
            stats.setBytesReceived(receivedBytes);
            stats.setFlowFilesSent(sentCount);
            stats.setBytesSent(sentBytes);

            // Sum, over every FlowFile in the checkpoint, the time elapsed since its lineage started.
            long totalLineageMillis = 0L;
            for (final FlowFile flowFile : checkpoint.records.keySet()) {
                totalLineageMillis += System.currentTimeMillis() - flowFile.getLineageStartDate();
            }
            stats.setAggregateLineageMillis(totalLineageMillis);

            context.getFlowFileEventRepository().updateRepository((FlowFileEvent) stats);
            for (final FlowFileEvent perConnection : checkpoint.connectionCounts.values()) {
                context.getFlowFileEventRepository().updateRepository(perConnection);
            }
        } catch (final IOException ioe) {
            LOG.error("FlowFile Event Repository failed to update", (Throwable) ioe);
        }
    }

    /**
     * Records that an event of the given type was seen for the given FlowFile id, creating the
     * id's set of event types on first use.
     *
     * @param map       event types seen so far, keyed by FlowFile id
     * @param id        the FlowFile id the event belongs to
     * @param eventType the event type to record
     */
    private void addEventType(Map<String, Set<ProvenanceEventType>> map, String id, ProvenanceEventType eventType) {
        map.computeIfAbsent(id, key -> new HashSet<ProvenanceEventType>()).add(eventType);
    }

    /**
     * Collects every provenance event produced by the checkpointed work and registers them with
     * the provenance repository.
     * <p>
     * Events are gathered in this order (duplicates are dropped, first occurrence wins):
     * <ol>
     *   <li>events built from the checkpoint's fork event builders, unless they have no children,
     *       are spurious, or were already reported by the processor;</li>
     *   <li>events reported by the processor, minus spurious fork and route events;</li>
     *   <li>events generated per FlowFile, minus spurious fork events;</li>
     *   <li>events synthesized here for each record that was not removed: CONTENT_MODIFIED when an
     *       existing FlowFile's claim changed, CREATE when a FlowFile created in this session has
     *       no CREATE/FORK/JOIN/RECEIVE/FETCH event yet, and ATTRIBUTES_MODIFIED when attributes
     *       changed and no other event exists for the FlowFile.</li>
     * </ol>
     * The checkpoint's auto-terminated events, if any, are appended after those. Events are
     * enriched lazily, one at a time, while the repository consumes the iterable.
     *
     * @param checkpoint the checkpoint whose provenance events are registered
     */
    private void updateProvenanceRepo(final Checkpoint checkpoint) {
        final ProvenanceEventRepository provenanceRepo = this.context.getProvenanceRepository();

        // LinkedHashSet: keeps submission order and silently drops duplicate events.
        final Set<ProvenanceEventRecord> recordsToSubmit = new LinkedHashSet<ProvenanceEventRecord>();
        final Map<String, Set<ProvenanceEventType>> eventTypesPerFlowFileId = new HashMap<String, Set<ProvenanceEventType>>();
        final Set<ProvenanceEventRecord> processorGenerated = checkpoint.reportedEvents;

        // 1. Fork/join/clone events accumulated by the session on behalf of the processor.
        for (final Map.Entry<FlowFile, ProvenanceEventBuilder> entry : checkpoint.forkEventBuilders.entrySet()) {
            final ProvenanceEventBuilder builder = entry.getValue();
            final FlowFile flowFile = entry.getKey();
            this.updateEventContentClaims(builder, flowFile, checkpoint.records.get(flowFile));
            final ProvenanceEventRecord event = builder.build();
            if (event.getChildUuids().isEmpty() || this.isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                continue;
            }
            // An identical event reported by the processor is submitted in step 2 instead.
            if (!processorGenerated.contains(event)) {
                recordsToSubmit.add(event);
            }
            for (final String childUuid : event.getChildUuids()) {
                this.addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
            }
            for (final String parentUuid : event.getParentUuids()) {
                this.addEventType(eventTypesPerFlowFileId, parentUuid, event.getEventType());
            }
        }

        // 2. Events explicitly reported by the processor.
        for (final ProvenanceEventRecord event : processorGenerated) {
            if (this.isSpuriousForkEvent(event, checkpoint.removedFlowFiles) || this.isSpuriousRouteEvent(event, checkpoint.records)) {
                continue;
            }
            recordsToSubmit.add(event);
            this.addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
        }

        // 3. Events generated per FlowFile.
        for (final List<ProvenanceEventRecord> eventList : checkpoint.generatedProvenanceEvents.values()) {
            for (final ProvenanceEventRecord event : eventList) {
                if (this.isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                    continue;
                }
                recordsToSubmit.add(event);
                this.addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
            }
        }

        // 4. Events implied by the state of each record but not reported by anyone.
        for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
            final ContentClaim original = repoRecord.getOriginalClaim();
            final ContentClaim current = repoRecord.getCurrentClaim();
            // Changed when exactly one side is null, or both are set and not equal.
            final boolean contentChanged = original == null ? current != null : (current == null || !original.equals(current));

            final FlowFileRecord curFlowFile = repoRecord.getCurrent();
            final String flowFileId = curFlowFile.getAttribute(CoreAttributes.UUID.key());
            if (checkpoint.removedFlowFiles.contains(flowFileId)) {
                continue;
            }

            boolean eventAdded = false;
            final boolean newFlowFile = repoRecord.getOriginal() == null;
            if (contentChanged && !newFlowFile) {
                recordsToSubmit.add(this.provenanceReporter.build((FlowFile) curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
                this.addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
                eventAdded = true;
            }

            if (checkpoint.createdFlowFiles.contains(flowFileId)) {
                final Set<ProvenanceEventType> registeredTypes = eventTypesPerFlowFileId.get(flowFileId);
                final boolean creationEventRegistered = registeredTypes != null
                        && (registeredTypes.contains(ProvenanceEventType.CREATE)
                        || registeredTypes.contains(ProvenanceEventType.FORK)
                        || registeredTypes.contains(ProvenanceEventType.JOIN)
                        || registeredTypes.contains(ProvenanceEventType.RECEIVE)
                        || registeredTypes.contains(ProvenanceEventType.FETCH));
                if (!creationEventRegistered) {
                    recordsToSubmit.add(this.provenanceReporter.build((FlowFile) curFlowFile, ProvenanceEventType.CREATE).build());
                    eventAdded = true;
                }
            }

            if (eventAdded || repoRecord.getUpdatedAttributes().isEmpty() || eventTypesPerFlowFileId.containsKey(flowFileId)) {
                continue;
            }
            recordsToSubmit.add(this.provenanceReporter.build((FlowFile) curFlowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).build());
            this.addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.ATTRIBUTES_MODIFIED);
        }

        // UUID -> current FlowFile lookup used by enrich(); includes removed FlowFiles as well.
        final Map<String, FlowFileRecord> flowFileRecordMap = new HashMap<String, FlowFileRecord>();
        for (final StandardRepositoryRecord repoRecord : checkpoint.records.values()) {
            final FlowFileRecord flowFile = repoRecord.getCurrent();
            flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
        }

        final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;

        // Enrichment happens lazily as the repository pulls events. The two underlying iterators
        // are created once, so this Iterable supports a single pass only.
        final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
            final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
            final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator();

            @Override
            public Iterator<ProvenanceEventRecord> iterator() {
                return new Iterator<ProvenanceEventRecord>() {

                    @Override
                    public boolean hasNext() {
                        return recordsToSubmitIterator.hasNext() || autoTermIterator != null && autoTermIterator.hasNext();
                    }

                    @Override
                    public ProvenanceEventRecord next() {
                        if (recordsToSubmitIterator.hasNext()) {
                            final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next();
                            // SEND is the only event type enriched with updateAttributes == false.
                            return StandardProcessSession.this.enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
                        }
                        if (autoTermIterator != null && autoTermIterator.hasNext()) {
                            return StandardProcessSession.this.enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
                        }
                        throw new NoSuchElementException();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };

        provenanceRepo.registerEvents(iterable);
    }

    /**
     * Copies the repository record's original content claim onto the event builder.
     * <p>
     * When the record has an original claim, both the builder's current and previous content
     * claims are set to it; otherwise only the current claim is set, to an empty claim of size 0.
     *
     * @param builder    the event builder to update
     * @param flowFile   the FlowFile the event belongs to (not read by this method)
     * @param repoRecord the repository record supplying the original claim
     */
    private void updateEventContentClaims(ProvenanceEventBuilder builder, FlowFile flowFile, StandardRepositoryRecord repoRecord) {
        final ContentClaim claim = repoRecord.getOriginalClaim();
        if (claim == null) {
            builder.setCurrentContentClaim(null, null, null, null, 0L);
            return;
        }

        final ResourceClaim resource = claim.getResourceClaim();
        final FlowFileRecord originalFlowFile = repoRecord.getOriginal();
        // Absolute offset = FlowFile's offset within its claim + the claim's offset within the resource.
        final Long absoluteOffset = Long.valueOf(originalFlowFile.getContentClaimOffset() + claim.getOffset());
        final long length = originalFlowFile.getSize();

        builder.setCurrentContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), absoluteOffset, length);
        builder.setPreviousContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), absoluteOffset, length);
    }

    /**
     * Builds a copy of the given provenance event enriched with what this session knows about the
     * FlowFile: its current and previous content claims, its source queue, and its original and
     * updated attributes.
     *
     * @param rawEvent the event to enrich
     * @param flowFile the FlowFile the event refers to; must be known to this session
     * @return the enriched event
     * @throws FlowFileHandlingException if the session holds no record for the FlowFile
     */
    public StandardProvenanceEventRecord enrich(ProvenanceEventRecord rawEvent, FlowFile flowFile) {
        final StandardRepositoryRecord sessionRecord = records.get(flowFile);
        if (sessionRecord == null) {
            throw new FlowFileHandlingException(flowFile + " is not known in this session (" + this.toString() + ")");
        }

        final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);

        // Current claim: where the FlowFile's content lives now.
        if (sessionRecord.getCurrent() != null && sessionRecord.getCurrentClaim() != null) {
            final ContentClaim claimNow = sessionRecord.getCurrentClaim();
            final long offsetNow = sessionRecord.getCurrentClaimOffset();
            final long sizeNow = flowFile.getSize();
            final ResourceClaim resourceNow = claimNow.getResourceClaim();
            enriched.setCurrentContentClaim(
                    resourceNow.getContainer(), resourceNow.getSection(), resourceNow.getId(),
                    Long.valueOf(offsetNow + claimNow.getOffset()), sizeNow);
        }

        // Previous claim: where the content lived when the session first saw the FlowFile.
        if (sessionRecord.getOriginal() != null && sessionRecord.getOriginalClaim() != null) {
            final ContentClaim claimBefore = sessionRecord.getOriginalClaim();
            final long offsetBefore = sessionRecord.getOriginal().getContentClaimOffset();
            final long sizeBefore = sessionRecord.getOriginal().getSize();
            final ResourceClaim resourceBefore = claimBefore.getResourceClaim();
            enriched.setPreviousContentClaim(
                    resourceBefore.getContainer(), resourceBefore.getSection(), resourceBefore.getId(),
                    Long.valueOf(offsetBefore + claimBefore.getOffset()), sizeBefore);
        }

        final FlowFileQueue sourceQueue = sessionRecord.getOriginalQueue();
        if (sourceQueue != null) {
            enriched.setSourceQueueIdentifier(sourceQueue.getIdentifier());
        }

        enriched.setAttributes(sessionRecord.getOriginalAttributes(), sessionRecord.getUpdatedAttributes());
        return enriched.build();
    }

    /**
     * Enriches a raw provenance event using the supplied lookup maps rather than this session's
     * live state.
     *
     * <p>The FlowFile is resolved by the event's FlowFile UUID, and its repository record is then
     * looked up in {@code records}. If either lookup yields nothing, the event is returned built
     * from {@code rawEvent} alone. Previously a missing repository record caused a
     * {@link NullPointerException} while reading the content claims, even though the attribute
     * step of this same method already tolerated a missing record.
     *
     * @param rawEvent          the event used as the starting point for the enriched record
     * @param flowFileRecordMap FlowFile records keyed by FlowFile UUID
     * @param records           repository records keyed by FlowFile record
     * @param updateAttributes  whether to copy the original/updated attributes onto the event
     * @return the enriched provenance event
     */
    private StandardProvenanceEventRecord enrich(ProvenanceEventRecord rawEvent, Map<String, FlowFileRecord> flowFileRecordMap, Map<FlowFileRecord, StandardRepositoryRecord> records, boolean updateAttributes) {
        final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);

        // Resolve the FlowFile and its repository record once; both enrichment steps share them.
        final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
        final StandardRepositoryRecord repoRecord = eventFlowFile == null ? null : records.get(eventFlowFile);
        if (repoRecord == null) {
            // Nothing known about this FlowFile: there is nothing to enrich the event with.
            return recordBuilder.build();
        }

        if (repoRecord.getCurrent() != null && repoRecord.getCurrentClaim() != null) {
            final ContentClaim currentClaim = repoRecord.getCurrentClaim();
            final long currentOffset = repoRecord.getCurrentClaimOffset();
            final long size = eventFlowFile.getSize();
            final ResourceClaim resourceClaim = currentClaim.getResourceClaim();
            recordBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(currentOffset + currentClaim.getOffset()), size);
        }

        if (repoRecord.getOriginal() != null && repoRecord.getOriginalClaim() != null) {
            final ContentClaim originalClaim = repoRecord.getOriginalClaim();
            final long originalOffset = repoRecord.getOriginal().getContentClaimOffset();
            final long originalSize = repoRecord.getOriginal().getSize();
            final ResourceClaim resourceClaim = originalClaim.getResourceClaim();
            recordBuilder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(originalOffset + originalClaim.getOffset()), originalSize);
        }

        final FlowFileQueue originalQueue = repoRecord.getOriginalQueue();
        if (originalQueue != null) {
            recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
        }

        if (updateAttributes) {
            recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
        }
        return recordBuilder.build();
    }

    /**
     * Tells whether a FORK event is spurious: it has exactly one child and that child's UUID is
     * in the set of removed FlowFiles.
     *
     * @param event            the provenance event to inspect
     * @param removedFlowFiles UUIDs of FlowFiles that were removed
     * @return {@code true} only for a FORK event whose single child has been removed
     */
    private boolean isSpuriousForkEvent(ProvenanceEventRecord event, Set<String> removedFlowFiles) {
        if (event.getEventType() != ProvenanceEventType.FORK) {
            return false;
        }
        final List<String> children = event.getChildUuids();
        if (children == null || children.size() != 1) {
            return false;
        }
        return removedFlowFiles.contains(children.get(0));
    }

    /**
     * Tells whether a ROUTE event is spurious: the relationship it names has exactly one
     * connection, and that connection's queue is the same queue the FlowFile originally came from.
     *
     * @param event   the provenance event to inspect
     * @param records repository records keyed by FlowFile record; the first entry whose UUID
     *                attribute matches the event's FlowFile UUID decides the result
     * @return {@code true} if the event routes the FlowFile back to the queue it was pulled from;
     *         {@code false} otherwise, including when no matching record is found or the record
     *         has no original queue
     */
    private boolean isSpuriousRouteEvent(ProvenanceEventRecord event, Map<FlowFileRecord, StandardRepositoryRecord> records) {
        if (event.getEventType() != ProvenanceEventType.ROUTE) {
            return false;
        }

        final String relationshipName = event.getRelationship();
        final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
        final Collection<Connection> destinations = this.context.getConnections(relationship);
        if (destinations.size() != 1) {
            return false;
        }

        final String uuidKey = CoreAttributes.UUID.key();
        for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> candidate : records.entrySet()) {
            final boolean matchesEvent = event.getFlowFileUuid().equals(candidate.getKey().getAttribute(uuidKey));
            if (matchesEvent) {
                final StandardRepositoryRecord repoRecord = candidate.getValue();
                if (repoRecord.getOriginalQueue() == null) {
                    return false;
                }
                final String sourceQueueId = repoRecord.getOriginalQueue().getIdentifier();
                final Connection onlyDestination = destinations.iterator().next();
                final String destinationQueueId = onlyDestination.getFlowFileQueue().getIdentifier();
                return sourceQueueId.equals(destinationQueueId);
            }
        }
        return false;
    }

    /**
     * Rolls this session back without penalizing the FlowFiles that are returned to their queues.
     */
    public void rollback() {
        final boolean penalize = false;
        this.rollback(penalize);
    }

    /**
     * Rolls this session back, leaving any existing checkpoint untouched.
     *
     * @param penalize whether FlowFiles returned to their original queues should be penalized
     */
    public void rollback(boolean penalize) {
        final boolean rollbackCheckpoint = false;
        this.rollback(penalize, rollbackCheckpoint);
    }

    /**
     * Rolls back the work tracked by this session.
     *
     * <p>In order, this method: clears the delete-on-commit set; closes every input stream still
     * open in the session; resets the claim cache and the read/write claims; removes temporary
     * claims; decrements claimant counts for records marked for abort and persists those records;
     * puts every other record's original FlowFile back on the queue it came from; reports
     * transient claims to the FlowFile repository; publishes bytes read/written to the FlowFile
     * event repository; and finally acknowledges the dequeued records and resets session state.
     *
     * <p>Failures while updating a repository are logged and do not stop the rollback. The stack
     * trace is additionally logged only when debug logging is enabled.
     *
     * @param penalize           if {@code true}, FlowFiles returned to their queues get a penalty
     *                           expiration of "now" plus the connectable's penalization period
     * @param rollbackCheckpoint if {@code true}, the current checkpoint is discarded and its
     *                           records are rolled back together with the session's own records
     */
    private void rollback(boolean penalize, boolean rollbackCheckpoint) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} session rollback called, FlowFile records are {} {}", new Object[]{this, this.loggableFlowfileInfo(), new Throwable("Stack Trace on rollback")});
        }
        this.deleteOnCommit.clear();

        // Iterate over a snapshot of the open streams rather than the live map.
        // NOTE(review): presumably close() can modify openInputStreams — confirm.
        final Map<FlowFile, InputStream> openStreamCopy = new HashMap<>(this.openInputStreams);
        for (final Map.Entry<FlowFile, InputStream> entry : openStreamCopy.entrySet()) {
            final FlowFile flowFile = entry.getKey();
            final InputStream openStream = entry.getValue();
            LOG.debug("{} closing {} for {} due to session rollback", new Object[]{this, openStream, flowFile});
            try {
                openStream.close();
            } catch (final Exception e) {
                LOG.warn("{} Attempted to close {} for {} due to session rollback but close failed", new Object[]{this, openStream, this.connectableDescription});
                LOG.warn("", (Throwable) e);
            }
        }

        try {
            this.claimCache.reset();
        } catch (final IOException e1) {
            LOG.warn("{} Attempted to close Output Stream for {} due to session rollback but close failed", new Object[]{this, this.connectableDescription, e1});
        }

        // Collect every record affected: the session's own, plus the checkpoint's when requested.
        final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>(this.records.values());
        if (rollbackCheckpoint) {
            final Checkpoint existingCheckpoint = this.checkpoint;
            this.checkpoint = null;
            if (existingCheckpoint != null && existingCheckpoint.records != null) {
                recordsToHandle.addAll(existingCheckpoint.records.values());
            }
        }

        this.resetWriteClaims();
        this.resetReadClaim();

        if (recordsToHandle.isEmpty()) {
            LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", (Object) this);
            this.acknowledgeRecords();
            this.resetState();
            return;
        }

        for (final StandardRepositoryRecord record : recordsToHandle) {
            this.removeTemporaryClaim(record);
        }

        // Split into records that were aborted and records that go back to their source queue.
        final Set<RepositoryRecord> abortedRecords = new HashSet<>();
        final Set<StandardRepositoryRecord> recordsToRequeue = new HashSet<>();
        for (final StandardRepositoryRecord record : recordsToHandle) {
            if (record.isMarkedForAbort()) {
                this.decrementClaimCount(record.getWorkingClaim());
                // Avoid decrementing the same claim twice when current and working claim are equal.
                if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
                    this.decrementClaimCount(record.getCurrentClaim());
                }
                abortedRecords.add(record);
            } else {
                recordsToRequeue.add(record);
            }
        }

        for (final StandardRepositoryRecord record : recordsToRequeue) {
            if (record.getOriginal() == null) {
                continue;
            }
            final FlowFileQueue originalQueue = record.getOriginalQueue();
            if (originalQueue == null) {
                continue;
            }
            if (penalize) {
                final long expirationEpochMillis = System.currentTimeMillis() + this.context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
                final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getOriginal()).penaltyExpirationTime(expirationEpochMillis).build();
                originalQueue.put(newFile);
            } else {
                originalQueue.put(record.getOriginal());
            }
        }

        if (!abortedRecords.isEmpty()) {
            try {
                this.context.getFlowFileRepository().updateRepository(abortedRecords);
            } catch (final IOException ioe) {
                LOG.error("Unable to update FlowFile repository for aborted records due to {}", (Object) ioe.toString());
                if (LOG.isDebugEnabled()) {
                    LOG.error("", (Throwable) ioe);
                }
            }
        }

        final List<ContentClaim> transientClaims = recordsToHandle.stream()
                .flatMap(record -> record.getTransientClaims().stream())
                .collect(Collectors.toList());
        if (!transientClaims.isEmpty()) {
            final RepositoryRecord transientClaimRecord = new TransientClaimRepositoryRecord(transientClaims);
            try {
                this.context.getFlowFileRepository().updateRepository(Collections.singletonList(transientClaimRecord));
            } catch (final IOException ioe) {
                LOG.error("Unable to update FlowFile repository to cleanup transient claims due to {}", (Object) ioe.toString());
                if (LOG.isDebugEnabled()) {
                    LOG.error("", (Throwable) ioe);
                }
            }
        }

        // Bytes read/written are still reported even though the session's work is being undone.
        final Connectable connectable = this.context.getConnectable();
        final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
        flowFileEvent.setBytesRead(this.bytesRead);
        flowFileEvent.setBytesWritten(this.bytesWritten);
        try {
            this.context.getFlowFileEventRepository().updateRepository((FlowFileEvent) flowFileEvent);
        } catch (final Exception e) {
            LOG.error("Failed to update FlowFileEvent Repository due to " + e);
            if (LOG.isDebugEnabled()) {
                LOG.error("", (Throwable) e);
            }
        }

        this.acknowledgeRecords();
        this.resetState();
    }

    /**
     * Builds a short, bracketed description of the FlowFiles in this session for log output.
     *
     * <p>At most five FlowFiles are described (source queue identifier when available, filename
     * and UUID). When the session holds more than five, a count of the ones not listed is
     * appended; when it holds none, the text {@code none} is used.
     *
     * @return the description, always wrapped in square brackets
     */
    private String loggableFlowfileInfo() {
        final int maxListed = 5;
        final StringBuilder details = new StringBuilder(1024).append("[");
        final int lengthWhenEmpty = details.length();

        int listed = 0;
        final Iterator<Map.Entry<FlowFileRecord, StandardRepositoryRecord>> entries = this.records.entrySet().iterator();
        while (listed < maxListed && entries.hasNext()) {
            final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry = entries.next();
            listed++;
            final FlowFileRecord flowFile = entry.getKey();
            final StandardRepositoryRecord repoRecord = entry.getValue();

            if (details.length() > lengthWhenEmpty) {
                details.append(", ");
            }
            final FlowFileQueue sourceQueue = repoRecord.getOriginalQueue();
            if (sourceQueue != null && sourceQueue.getIdentifier() != null) {
                details.append("queue=").append(sourceQueue.getIdentifier()).append("/");
            }
            details.append("filename=").append(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
            details.append("/uuid=").append(flowFile.getAttribute(CoreAttributes.UUID.key()));
        }

        final int totalRecords = this.records.entrySet().size();
        if (totalRecords > maxListed) {
            if (details.length() > lengthWhenEmpty) {
                details.append(", ");
            }
            details.append(totalRecords - maxListed).append(" additional Flowfiles not listed");
        } else if (listed == 0) {
            details.append("none");
        }

        return details.append("]").toString();
    }

    /**
     * Decrements the claimant count of the given claim in the content repository.
     *
     * @param claim the claim to release; {@code null} is ignored
     */
    private void decrementClaimCount(ContentClaim claim) {
        if (claim != null) {
            this.context.getContentRepository().decrementClaimantCount(claim);
        }
    }

    /**
     * Decrements the claimant count of the given claim and, once the count is zero or below,
     * resets the session's write claims and removes the claim from the content repository.
     *
     * @param claim the claim to release; {@code null} is ignored
     */
    private void destroyContent(ContentClaim claim) {
        if (claim == null) {
            return;
        }
        final int remainingClaimants = this.context.getContentRepository().decrementClaimantCount(claim);
        final boolean stillReferenced = remainingClaimants > 0;
        if (stillReferenced) {
            return;
        }
        this.resetWriteClaims();
        this.context.getContentRepository().remove(claim);
    }

    /**
     * Returns the session's bookkeeping to its initial, empty condition: zeroes all counters,
     * clears all tracking collections and the provenance reporter, and restarts the
     * processing-time clock.
     */
    private void resetState() {
        // Scalar statistics.
        this.flowFilesIn = 0;
        this.flowFilesOut = 0;
        this.removedCount = 0;
        this.contentSizeIn = 0L;
        this.contentSizeOut = 0L;
        this.removedBytes = 0L;
        this.bytesRead = 0L;
        this.bytesWritten = 0L;

        // Per-FlowFile and per-connection tracking.
        this.records.clear();
        this.recursionSet.clear();
        this.connectionCounts.clear();
        this.createdFlowFiles.clear();
        this.removedFlowFiles.clear();
        this.counters.clear();

        // Provenance state.
        this.generatedProvenanceEvents.clear();
        this.forkEventBuilders.clear();
        this.provenanceReporter.clear();

        this.processingStartTime = System.nanoTime();
    }

    /**
     * Acknowledges, on each source queue, the FlowFiles this session has dequeued from it, then
     * forgets them.
     */
    private void acknowledgeRecords() {
        this.unacknowledgedFlowFiles.forEach((queue, dequeuedFlowFiles) -> queue.acknowledge(dequeuedFlowFiles));
        this.unacknowledgedFlowFiles.clear();
    }

    /**
     * Hands the given FlowFiles over from this session to another session.
     *
     * @param newOwner  the session that takes ownership; must be a different
     *                  {@code StandardProcessSession}
     * @param flowFiles the FlowFiles to hand over; must contain at least one element
     * @throws NullPointerException     if {@code newOwner} is {@code null}
     * @throws IllegalArgumentException if {@code newOwner} is this session, if {@code flowFiles}
     *                                  is {@code null} or empty, or if {@code newOwner} is not a
     *                                  {@code StandardProcessSession}
     */
    public void migrate(ProcessSession newOwner, Collection<FlowFile> flowFiles) {
        Objects.requireNonNull(newOwner);
        if (newOwner == this) {
            throw new IllegalArgumentException("Cannot migrate FlowFiles from a Process Session to itself");
        }

        final boolean nothingToMigrate = flowFiles == null || flowFiles.isEmpty();
        if (nothingToMigrate) {
            throw new IllegalArgumentException("Must supply at least one FlowFile to migrate");
        }

        if (newOwner instanceof StandardProcessSession) {
            this.migrate((StandardProcessSession) newOwner, flowFiles);
            return;
        }
        throw new IllegalArgumentException("Cannot migrate from a StandardProcessSession to a " + newOwner.getClass());
    }

    /**
     * Moves the given FlowFiles, together with the bookkeeping this session keeps for them, into
     * {@code newOwner}.
     *
     * <p>All FlowFiles are validated before any state is changed. After validation, fork-event
     * children, repository records, unacknowledged-queue entries, in/out/removed statistics,
     * created/removed UUIDs, generated provenance events, appendable streams and delete-on-commit
     * paths are transferred, and finally the provenance reporter migrates its own events.
     *
     * @param newOwner  the session receiving the FlowFiles
     * @param flowFiles the FlowFiles to move
     * @throws IllegalStateException     if a FlowFile has an open input stream, is in the
     *                                   recursion set, or is a forked parent whose children are
     *                                   not all part of {@code flowFiles}
     * @throws FlowFileHandlingException if a FlowFile is unknown to this session or is not the
     *                                   current version held by its repository record
     */
    private void migrate(StandardProcessSession newOwner, Collection<FlowFile> flowFiles) {
        ProvenanceEventBuilder eventBuilder;
        FlowFile eventFlowFile;
        // Phase 1: validate every FlowFile up front so that a failure leaves both sessions untouched.
        for (FlowFile flowFile : flowFiles) {
            if (this.openInputStreams.containsKey(flowFile)) {
                throw new IllegalStateException(flowFile + " cannot be migrated to a new Process Session because this session currently " + "has an open Input Stream for the FlowFile, created by calling ProcessSession.read(FlowFile)");
            }
            if (this.recursionSet.contains(flowFile)) {
                throw new IllegalStateException(flowFile + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
            }
            StandardRepositoryRecord record = this.records.get(flowFile);
            if (record == null) {
                throw new FlowFileHandlingException(flowFile + " is not known in this session (" + this.toString() + ")");
            }
            // Identity comparison: the caller must pass the exact instance the record currently holds.
            if (record.getCurrent() == flowFile) continue;
            throw new FlowFileHandlingException(flowFile + " is not the most recent version of this FlowFile within this session (" + this.toString() + ")");
        }
        Set<String> flowFileIds = flowFiles.stream().map(ff -> ff.getAttribute(CoreAttributes.UUID.key())).collect(Collectors.toSet());
        // Phase 2: a forked parent may only be migrated when all of its children are migrated with it.
        for (Map.Entry<FlowFile, ProvenanceEventBuilder> entry : this.forkEventBuilders.entrySet()) {
            eventFlowFile = entry.getKey();
            if (!flowFiles.contains(eventFlowFile)) continue;
            eventBuilder = entry.getValue();
            for (String childId : eventBuilder.getChildFlowFileIds()) {
                if (flowFileIds.contains(childId)) continue;
                throw new IllegalStateException("Cannot migrate " + eventFlowFile + " to a new session because it was forked to create " + eventBuilder.getChildFlowFileIds().size() + " children and not all children are being migrated. If any FlowFile is forked, all of its children must also be migrated at the same time as the forked FlowFile");
            }
        }
        // Phase 3: for each fork builder, detach the children being migrated from this session's
        // builder and attach them to a copy of the builder that is registered with the new owner.
        for (Map.Entry<FlowFile, ProvenanceEventBuilder> entry : this.forkEventBuilders.entrySet()) {
            eventFlowFile = entry.getKey();
            eventBuilder = entry.getValue();
            // Snapshot of the child ids, taken before children are removed from the builder below.
            HashSet childrenIds = new HashSet(eventBuilder.getChildFlowFileIds());
            ProvenanceEventBuilder copy = null;
            for (FlowFile flowFile : flowFiles) {
                String flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key());
                if (!childrenIds.contains(flowFileId)) continue;
                eventBuilder.removeChildFlowFile(flowFile);
                // The copy is created lazily, on the first migrated child, and starts with no children.
                if (copy == null) {
                    copy = eventBuilder.copy();
                    copy.getChildFlowFileIds().clear();
                }
                copy.addChildFlowFile(flowFileId);
            }
            if (copy == null) continue;
            newOwner.forkEventBuilders.put(eventFlowFile, copy);
        }
        // The new owner keeps whichever processing start time is earlier.
        newOwner.processingStartTime = Math.min(newOwner.processingStartTime, this.processingStartTime);
        // Phase 4: move per-FlowFile state and adjust the statistics of both sessions.
        for (FlowFile flowFile : flowFiles) {
            Path toDelete;
            ByteCountingOutputStream appendableStream;
            ContentClaim currentClaim;
            List<ProvenanceEventRecord> events;
            String flowFileId;
            FlowFileRecord flowFileRecord = (FlowFileRecord)flowFile;
            StandardRepositoryRecord repoRecord = this.records.remove(flowFile);
            newOwner.records.put(flowFileRecord, repoRecord);
            FlowFileQueue inputQueue = repoRecord.getOriginalQueue();
            // Only FlowFiles that were pulled from a queue affect the "in" statistics and acknowledgements.
            if (inputQueue != null) {
                String connectionId = inputQueue.getIdentifier();
                this.incrementConnectionOutputCounts(connectionId, -1, -repoRecord.getOriginal().getSize());
                newOwner.incrementConnectionOutputCounts(connectionId, 1, repoRecord.getOriginal().getSize());
                // NOTE(review): assumes this session has an unacknowledged set for inputQueue; a missing entry would throw a NullPointerException here — confirm.
                this.unacknowledgedFlowFiles.get(inputQueue).remove(flowFile);
                newOwner.unacknowledgedFlowFiles.computeIfAbsent(inputQueue, queue -> new HashSet()).add(flowFileRecord);
                --this.flowFilesIn;
                this.contentSizeIn -= flowFile.getSize();
                ++newOwner.flowFilesIn;
                newOwner.contentSizeIn += flowFile.getSize();
            }
            // Removed FlowFiles take their removal statistics with them.
            if (this.removedFlowFiles.remove(flowFileId = flowFile.getAttribute(CoreAttributes.UUID.key()))) {
                newOwner.removedFlowFiles.add(flowFileId);
                ++newOwner.removedCount;
                newOwner.removedBytes += flowFile.getSize();
                --this.removedCount;
                this.removedBytes -= flowFile.getSize();
            }
            if (this.createdFlowFiles.remove(flowFileId)) {
                newOwner.createdFlowFiles.add(flowFileId);
            }
            // A FlowFile that already has a transfer relationship is counted in the "out" statistics.
            if (repoRecord.getTransferRelationship() != null) {
                --this.flowFilesOut;
                this.contentSizeOut -= flowFile.getSize();
                ++newOwner.flowFilesOut;
                newOwner.contentSizeOut += flowFile.getSize();
            }
            if ((events = this.generatedProvenanceEvents.remove(flowFile)) != null) {
                newOwner.generatedProvenanceEvents.put(flowFile, events);
            }
            // An appendable stream is keyed by content claim and follows the FlowFile's current claim.
            if ((currentClaim = repoRecord.getCurrentClaim()) != null && (appendableStream = this.appendableStreams.remove(currentClaim)) != null) {
                newOwner.appendableStreams.put(currentClaim, appendableStream);
            }
            if ((toDelete = this.deleteOnCommit.remove(flowFile)) == null) continue;
            newOwner.deleteOnCommit.put(flowFile, toDelete);
        }
        // Let the provenance reporter hand over the events it holds for the migrated FlowFile ids.
        this.provenanceReporter.migrate(newOwner.provenanceReporter, flowFileIds);
    }

    /*
     * WARNING - void declaration
     */
    private String summarizeEvents(Checkpoint checkpoint) {
        Relationship relationship;
        HashMap transferMap = new HashMap();
        HashSet<String> modifiedFlowFileIds = new HashSet<String>();
        int largestTransferSetSize = 0;
        for (Map.Entry entry : checkpoint.records.entrySet()) {
            void var10_14;
            FlowFile flowFile = (FlowFile)entry.getKey();
            StandardRepositoryRecord record = (StandardRepositoryRecord)entry.getValue();
            Relationship relationship2 = record.getTransferRelationship();
            if (Relationship.SELF.equals((Object)relationship2)) continue;
            Set set = (Set)transferMap.get(relationship2);
            if (set == null) {
                HashSet hashSet = new HashSet();
                transferMap.put(relationship2, hashSet);
            }
            var10_14.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
            largestTransferSetSize = Math.max(largestTransferSetSize, var10_14.size());
            ContentClaim workingClaim = record.getWorkingClaim();
            if (workingClaim == null || workingClaim == record.getOriginalClaim() || record.getTransferRelationship() == null) continue;
            modifiedFlowFileIds.add(flowFile.getAttribute(CoreAttributes.UUID.key()));
        }
        int numRemoved = checkpoint.removedFlowFiles.size();
        int numModified = modifiedFlowFileIds.size();
        int numCreated = checkpoint.createdFlowFiles.size();
        StringBuilder sb = new StringBuilder(512);
        if (!(LOG.isDebugEnabled() || largestTransferSetSize <= 10 && numModified <= 10 && numCreated <= 10 && numRemoved <= 10)) {
            if (numCreated > 0) {
                sb.append("created ").append(numCreated).append(" FlowFiles, ");
            }
            if (numModified > 0) {
                sb.append("modified ").append(modifiedFlowFileIds.size()).append(" FlowFiles, ");
            }
            if (numRemoved > 0) {
                sb.append("removed ").append(numRemoved).append(" FlowFiles, ");
            }
            for (Map.Entry entry : transferMap.entrySet()) {
                if (entry.getKey() == null) continue;
                sb.append("Transferred ").append(((Set)entry.getValue()).size()).append(" FlowFiles");
                relationship = (Relationship)entry.getKey();
                if (relationship == Relationship.ANONYMOUS) continue;
                sb.append(" to '").append(relationship.getName()).append("', ");
            }
        } else {
            if (numCreated > 0) {
                sb.append("created FlowFiles ").append(checkpoint.createdFlowFiles).append(", ");
            }
            if (numModified > 0) {
                sb.append("modified FlowFiles ").append(modifiedFlowFileIds).append(", ");
            }
            if (numRemoved > 0) {
                sb.append("removed FlowFiles ").append(checkpoint.removedFlowFiles).append(", ");
            }
            for (Map.Entry entry : transferMap.entrySet()) {
                if (entry.getKey() == null) continue;
                sb.append("Transferred FlowFiles ").append(entry.getValue());
                relationship = (Relationship)entry.getKey();
                if (relationship == Relationship.ANONYMOUS) continue;
                sb.append(" to '").append(relationship.getName()).append("', ");
            }
        }
        if (sb.length() > 2 && sb.subSequence(sb.length() - 2, sb.length()).equals(", ")) {
            sb.delete(sb.length() - 2, sb.length());
        }
        if (sb.length() > 0) {
            long processingNanos = checkpoint.processingTime;
            sb.append(", Processing Time = ");
            this.formatNanos(processingNanos, sb);
        }
        return sb.toString();
    }

    /**
     * Appends a human-readable rendering of a duration to {@code sb}, followed by the raw
     * nanosecond value in parentheses.
     *
     * <p>Examples: {@code 500} gives {@code "500 nanos (500 nanos)"}; {@code 1000000} gives
     * {@code "1 millis (1000000 nanos)"}; {@code 1500000000} gives
     * {@code "1 seconds, 500 millis (1500000000 nanos)"}.
     *
     * <p>The unit thresholds are inclusive. Earlier they were exclusive, so exactly one
     * millisecond was rendered as {@code "0 nanos"} and exactly one second as
     * {@code "1000 millis"}.
     *
     * @param nanos the duration in nanoseconds
     * @param sb    the builder to append to
     */
    private void formatNanos(long nanos, StringBuilder sb) {
        final long nanosPerSecond = 1000000000L;
        final long nanosPerMilli = 1000000L;

        // The guards keep negative durations out of the seconds/millis branches, as before.
        final long seconds = nanos >= nanosPerSecond ? nanos / nanosPerSecond : 0L;
        long millis = nanos >= nanosPerMilli ? nanos / nanosPerMilli : 0L;
        final long nanosLeft = nanos % nanosPerMilli;

        if (seconds > 0L) {
            sb.append(seconds).append(" seconds");
        }
        if (millis > 0L) {
            if (seconds > 0L) {
                sb.append(", ");
                // millis holds the total so far; keep only the part beyond the whole seconds.
                millis -= seconds * 1000L;
            }
            sb.append(millis).append(" millis");
        }
        if (seconds == 0L && millis == 0L) {
            sb.append(nanosLeft).append(" nanos");
        }
        sb.append(" (").append(nanos).append(" nanos)");
    }

    /**
     * Records that one FlowFile, sized as the record's current FlowFile, went into the given
     * connection.
     *
     * @param connection the connection receiving the FlowFile
     * @param record     the repository record whose current FlowFile supplies the byte count
     */
    private void incrementConnectionInputCounts(Connection connection, RepositoryRecord record) {
        final String connectionId = connection.getIdentifier();
        final long bytes = record.getCurrent().getSize();
        this.incrementConnectionInputCounts(connectionId, 1, bytes);
    }

    /**
     * Adds to the "in" statistics kept for a connection, creating the statistics entry on first
     * use.
     *
     * @param connectionId  identifier of the connection
     * @param flowFileCount number of FlowFiles to add (may be negative)
     * @param bytes         number of content bytes to add (may be negative)
     */
    private void incrementConnectionInputCounts(String connectionId, int flowFileCount, long bytes) {
        final StandardFlowFileEvent stats = this.connectionCounts.computeIfAbsent(connectionId, key -> new StandardFlowFileEvent(key));
        final long updatedBytes = stats.getContentSizeIn() + bytes;
        stats.setContentSizeIn(updatedBytes);
        stats.setFlowFilesIn(stats.getFlowFilesIn() + flowFileCount);
    }

    /**
     * Records that one FlowFile of the given record's size came out of the given connection.
     *
     * @param connection the connection the FlowFile was taken from
     * @param record     the FlowFile record supplying the byte count
     */
    private void incrementConnectionOutputCounts(Connection connection, FlowFileRecord record) {
        final String connectionId = connection.getIdentifier();
        final long bytes = record.getSize();
        this.incrementConnectionOutputCounts(connectionId, 1, bytes);
    }

    /**
     * Adds to the "out" statistics kept for a connection, creating the statistics entry on first
     * use.
     *
     * @param connectionId  identifier of the connection
     * @param flowFileCount number of FlowFiles to add (may be negative)
     * @param bytes         number of content bytes to add (may be negative)
     */
    private void incrementConnectionOutputCounts(String connectionId, int flowFileCount, long bytes) {
        final StandardFlowFileEvent stats = this.connectionCounts.computeIfAbsent(connectionId, key -> new StandardFlowFileEvent(key));
        final long updatedBytes = stats.getContentSizeOut() + bytes;
        stats.setContentSizeOut(updatedBytes);
        stats.setFlowFilesOut(stats.getFlowFilesOut() + flowFileCount);
    }

    /**
     * Starts tracking a FlowFile that was just pulled from a connection: creates its repository
     * record, bumps the "in" statistics, remembers it as unacknowledged for its queue, and counts
     * it as output of the connection.
     *
     * @param flowFile   the FlowFile that was dequeued
     * @param connection the connection it was dequeued from
     */
    private void registerDequeuedRecord(FlowFileRecord flowFile, Connection connection) {
        this.records.put(flowFile, new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile));

        this.flowFilesIn++;
        this.contentSizeIn += flowFile.getSize();

        // Track the FlowFile under its source queue until the queue is told it has been handled.
        this.unacknowledgedFlowFiles
                .computeIfAbsent(connection.getFlowFileQueue(), queue -> new HashSet<FlowFileRecord>())
                .add(flowFile);

        this.incrementConnectionOutputCounts(connection, flowFile);
    }

    /**
     * Adjusts a named counter by the given amount.
     *
     * @param name      the counter's name
     * @param delta     the amount to add (may be negative)
     * @param immediate if {@code true} the adjustment is passed straight to the process context;
     *                  otherwise it is accumulated in this session's own counters
     */
    public void adjustCounter(String name, long delta, boolean immediate) {
        if (!immediate) {
            this.adjustCounter(name, delta, this.counters);
        } else {
            this.context.adjustCounter(name, delta);
        }
    }

    /**
     * Adds {@code delta} to the value stored under {@code name} in the given map, treating a
     * missing (or {@code null}) value as zero.
     *
     * @param name  the counter's name
     * @param delta the amount to add (may be negative)
     * @param map   the counter map to update
     */
    private void adjustCounter(String name, long delta, Map<String, Long> map) {
        map.merge(name, delta, Long::sum);
    }

    /**
     * Pulls a single FlowFile from the pollable incoming connections.
     *
     * <p>Up to one attempt per connection is made. Each attempt asks the context for the next
     * connection index, polls that connection and removes any FlowFiles reported as expired.
     *
     * @return the first FlowFile obtained, or {@code null} if every attempt came back empty
     */
    public FlowFile get() {
        final List<Connection> pollable = this.context.getPollableConnections();
        final int connectionCount = pollable.size();

        int attemptsLeft = connectionCount;
        while (attemptsLeft-- > 0) {
            final int index = this.context.getNextIncomingConnectionIndex() % connectionCount;
            final Connection source = pollable.get(index);

            final HashSet<FlowFileRecord> expired = new HashSet<FlowFileRecord>();
            final FlowFileRecord polled = source.poll(expired);
            this.removeExpired(expired, source);

            if (polled != null) {
                this.registerDequeuedRecord(polled, source);
                return polled;
            }
        }
        return null;
    }

    /**
     * Pulls up to {@code maxResults} FlowFiles from a single incoming connection, chosen by the
     * context's next-connection index.
     *
     * @param maxResults the maximum number of FlowFiles to return; must not be negative
     * @return the FlowFiles obtained; empty when {@code maxResults} is zero or there are no
     *         pollable connections
     * @throws IllegalArgumentException if {@code maxResults} is negative
     */
    public List<FlowFile> get(final int maxResults) {
        if (maxResults < 0) {
            throw new IllegalArgumentException();
        }
        if (maxResults == 0) {
            return Collections.emptyList();
        }

        final List<Connection> pollable = this.context.getPollableConnections();
        if (pollable.isEmpty()) {
            return Collections.emptyList();
        }

        final int index = this.context.getNextIncomingConnectionIndex() % pollable.size();
        final Connection source = pollable.get(index);

        final ConnectionPoller boundedPoller = new ConnectionPoller(){

            @Override
            public List<FlowFileRecord> poll(Connection connection, Set<FlowFileRecord> expiredRecords) {
                // Accepts every FlowFile offered and stops the poll once maxResults have been taken.
                final FlowFileFilter limitFilter = new FlowFileFilter(){
                    int accepted = 0;

                    public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
                        this.accepted++;
                        final boolean limitReached = this.accepted >= maxResults;
                        return limitReached
                                ? FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE
                                : FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                    }
                };
                return connection.poll(limitFilter, expiredRecords);
            }
        };
        return this.get(source, boundedPoller, false);
    }

    /**
     * Pulls the FlowFiles selected by the given filter, polling the incoming connections with
     * all of their queues locked.
     *
     * @param filter decides which FlowFiles are accepted
     * @return the FlowFiles obtained; possibly empty
     */
    public List<FlowFile> get(final FlowFileFilter filter) {
        final ConnectionPoller filteringPoller = new ConnectionPoller(){

            @Override
            public List<FlowFileRecord> poll(Connection connection, Set<FlowFileRecord> expiredRecords) {
                return connection.poll(filter, expiredRecords);
            }
        };
        final boolean lockAllQueues = true;
        return this.get(filteringPoller, lockAllQueues);
    }

    /**
     * Polls one connection with the given poller, removes whatever the poll reported as expired,
     * and registers every selected FlowFile with this session.
     *
     * @param connection the connection to poll
     * @param poller     the strategy used to poll the connection
     * @param lockQueue  whether to hold the connection's lock for the duration of the poll
     * @return a new, mutable list of the FlowFiles selected; empty if none were selected
     */
    private List<FlowFile> get(Connection connection, ConnectionPoller poller, boolean lockQueue) {
        if (lockQueue) {
            connection.lock();
        }
        try {
            final Set<FlowFileRecord> expired = new HashSet<>();
            final List<FlowFileRecord> newlySelected = poller.poll(connection, expired);
            this.removeExpired(expired, connection);

            for (final FlowFileRecord flowFile : newlySelected) {
                this.registerDequeuedRecord(flowFile, connection);
            }
            // Copy into a List<FlowFile>; a List<FlowFileRecord> is not assignable to the return type.
            // An empty selection simply yields an empty list, so no special case is needed.
            return new ArrayList<FlowFile>(newlySelected);
        } finally {
            if (lockQueue) {
                connection.unlock();
            }
        }
    }

    /**
     * Polls the pollable connections one after another, starting at the context's next-connection
     * index, and returns the result of the first connection that yields selected or expired
     * FlowFiles. Expired FlowFiles are removed and selected ones are registered with this session.
     *
     * <p>Lock handling: locks are acquired inside the {@code try} and only connections that were
     * actually locked are unlocked afterwards, so a failure part-way through cannot leave queues
     * locked or unlock a queue this call never locked.
     *
     * @param poller        the strategy used to poll each connection
     * @param lockAllQueues whether to lock every pollable connection for the duration of the poll
     * @return a new, mutable list of the FlowFiles selected; empty if no connection yielded any
     */
    private List<FlowFile> get(ConnectionPoller poller, boolean lockAllQueues) {
        final List<Connection> connections = this.context.getPollableConnections();
        final List<Connection> lockedConnections = new ArrayList<>();
        try {
            if (lockAllQueues) {
                for (final Connection connection : connections) {
                    connection.lock();
                    lockedConnections.add(connection);
                }
            }

            final int startIndex = this.context.getNextIncomingConnectionIndex();
            for (int i = 0; i < connections.size(); ++i) {
                final int connectionIndex = (startIndex + i) % connections.size();
                final Connection conn = connections.get(connectionIndex);

                final Set<FlowFileRecord> expired = new HashSet<>();
                final List<FlowFileRecord> newlySelected = poller.poll(conn, expired);
                this.removeExpired(expired, conn);

                // Move on only when this connection produced nothing at all.
                if (newlySelected.isEmpty() && expired.isEmpty()) {
                    continue;
                }
                for (final FlowFileRecord flowFile : newlySelected) {
                    this.registerDequeuedRecord(flowFile, conn);
                }
                return new ArrayList<FlowFile>(newlySelected);
            }
            return new ArrayList<FlowFile>();
        } finally {
            for (final Connection connection : lockedConnections) {
                connection.unlock();
            }
        }
    }

    /**
     * Sums the object count and byte count of the FlowFile queues of all pollable connections.
     *
     * @return the combined queue size
     */
    public QueueSize getQueueSize() {
        int totalObjects = 0;
        long totalBytes = 0L;
        Iterator<Connection> connectionIter = this.context.getPollableConnections().iterator();
        while (connectionIter.hasNext()) {
            QueueSize current = connectionIter.next().getFlowFileQueue().size();
            totalObjects += current.getObjectCount();
            totalBytes += current.getByteCount();
        }
        return new QueueSize(totalObjects, totalBytes);
    }

    /**
     * Creates a new FlowFile with filename, path and UUID attributes and registers it with this session.
     * The filename is the current {@code System.nanoTime()} value and the UUID is randomly generated.
     *
     * @return the newly created FlowFile
     */
    public FlowFile create() {
        Map<String, String> initialAttributes = new HashMap<String, String>();
        initialAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
        initialAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
        initialAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());

        StandardFlowFileRecord.Builder recordBuilder = new StandardFlowFileRecord.Builder();
        recordBuilder.id(this.context.getNextFlowFileSequence());
        recordBuilder.addAttributes(initialAttributes);
        FlowFileRecord created = recordBuilder.build();

        StandardRepositoryRecord repoRecord = new StandardRepositoryRecord(null);
        repoRecord.setWorking(created, initialAttributes);
        this.records.put(created, repoRecord);

        String createdUuid = created.getAttribute(CoreAttributes.UUID.key());
        this.createdFlowFiles.add(createdUuid);
        return created;
    }

    /**
     * Clones the whole content of the given FlowFile (offset 0, full size).
     *
     * @param example the FlowFile to clone
     * @return the clone
     */
    public FlowFile clone(FlowFile example) {
        long fullLength = example.getSize();
        return this.clone(example, 0L, fullLength);
    }

    /**
     * Creates a new FlowFile that refers to a slice of the example's content and carries a fresh UUID.
     * A full-content clone is reported through the provenance reporter's {@code clone}; a partial one
     * is recorded as a fork event.
     *
     * @param example the FlowFile to copy from
     * @param offset offset into the example's content at which the clone starts
     * @param size number of bytes the clone covers
     * @return the cloned FlowFile
     * @throws FlowFileHandlingException if {@code offset + size} exceeds the size of the example
     */
    public FlowFile clone(FlowFile example, long offset, long size) {
        this.validateRecordState(example);
        StandardRepositoryRecord sourceRecord = this.records.get(example);
        FlowFileRecord sourceFlowFile = sourceRecord.getCurrent();
        ContentClaim sourceClaim = sourceRecord.getCurrentClaim();
        if (offset + size > example.getSize()) {
            throw new FlowFileHandlingException("Specified offset of " + offset + " and size " + size + " exceeds size of " + example.toString());
        }

        StandardFlowFileRecord.Builder cloneBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(sourceFlowFile);
        cloneBuilder.id(this.context.getNextFlowFileSequence());
        cloneBuilder.contentClaimOffset(sourceFlowFile.getContentClaimOffset() + offset);
        cloneBuilder.size(size);
        cloneBuilder.addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
        FlowFileRecord clone = cloneBuilder.build();

        // The clone shares the source claim, so the claim gains one more claimant.
        if (sourceClaim != null) {
            this.context.getContentRepository().incrementClaimaintCount(sourceClaim);
        }

        StandardRepositoryRecord cloneRecord = new StandardRepositoryRecord(null);
        cloneRecord.setWorking(clone, clone.getAttributes());
        this.records.put(clone, cloneRecord);

        boolean partialCopy = offset != 0L || size != example.getSize();
        if (partialCopy) {
            this.registerForkEvent(example, (FlowFile)clone);
        } else {
            this.provenanceReporter.clone(example, (FlowFile)clone);
        }
        return clone;
    }

    /**
     * Adds {@code child} to the FORK event builder kept for {@code parent}, creating and
     * initializing that builder on first use.
     *
     * @param parent the FlowFile that was forked
     * @param child the FlowFile produced from the parent
     */
    private void registerForkEvent(FlowFile parent, FlowFile child) {
        ProvenanceEventBuilder forkBuilder = this.forkEventBuilders.get(parent);
        if (forkBuilder != null) {
            forkBuilder.addChildFlowFile(child);
            return;
        }

        // First fork of this parent: populate a new builder and remember it.
        forkBuilder = this.context.getProvenanceRepository().eventBuilder();
        forkBuilder.setEventType(ProvenanceEventType.FORK);
        forkBuilder.setFlowFileEntryDate(parent.getEntryDate());
        forkBuilder.setLineageStartDate(parent.getLineageStartDate());
        forkBuilder.setFlowFileUUID(parent.getAttribute(CoreAttributes.UUID.key()));

        Connectable component = this.context.getConnectable();
        forkBuilder.setComponentId(component.getIdentifier());
        forkBuilder.setComponentType(component.getComponentType());
        forkBuilder.addParentFlowFile(parent);
        this.updateEventContentClaims(forkBuilder, parent, this.records.get(parent));
        this.forkEventBuilders.put(parent, forkBuilder);

        forkBuilder.addChildFlowFile(child);
    }

    /**
     * Generates a join event for the given parents and child and appends it to the list of
     * provenance events kept for the child, creating that list if none exists yet.
     *
     * @param child the FlowFile produced by the join
     * @param parents the FlowFiles that were joined
     */
    private void registerJoinEvent(FlowFile child, Collection<FlowFile> parents) {
        ProvenanceEventRecord joinEvent = this.provenanceReporter.generateJoinEvent(parents, child);
        List<ProvenanceEventRecord> eventsForChild = this.generatedProvenanceEvents.get(child);
        if (eventsForChild != null) {
            eventsForChild.add(joinEvent);
            return;
        }
        List<ProvenanceEventRecord> freshEvents = new ArrayList<ProvenanceEventRecord>();
        this.generatedProvenanceEvents.put(child, freshEvents);
        freshEvents.add(joinEvent);
    }

    /**
     * Sets the FlowFile's penalty expiration to the current time plus the connectable's
     * penalization period.
     *
     * @param flowFile the FlowFile to penalize
     * @return the updated FlowFile
     */
    public FlowFile penalize(FlowFile flowFile) {
        this.validateRecordState(flowFile);
        StandardRepositoryRecord repoRecord = this.records.get(flowFile);

        long now = System.currentTimeMillis();
        long penaltyMillis = this.context.getConnectable().getPenalizationPeriod(TimeUnit.MILLISECONDS);
        long penaltyExpiration = now + penaltyMillis;

        StandardFlowFileRecord.Builder penalizedBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(repoRecord.getCurrent());
        FlowFileRecord penalized = penalizedBuilder.penaltyExpirationTime(penaltyExpiration).build();
        repoRecord.setWorking(penalized);
        return penalized;
    }

    /**
     * Sets a single attribute on the FlowFile. Requests to set the UUID attribute are ignored
     * and the FlowFile is returned as passed in.
     *
     * @param flowFile the FlowFile to update
     * @param key the attribute name
     * @param value the attribute value
     * @return the updated FlowFile, or {@code flowFile} itself when {@code key} is the UUID key
     */
    public FlowFile putAttribute(FlowFile flowFile, String key, String value) {
        this.validateRecordState(flowFile);
        boolean targetsUuid = CoreAttributes.UUID.key().equals(key);
        if (targetsUuid) {
            return flowFile;
        }
        StandardRepositoryRecord repoRecord = this.records.get(flowFile);
        StandardFlowFileRecord.Builder updatedBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(repoRecord.getCurrent());
        FlowFileRecord updated = updatedBuilder.addAttribute(key, value).build();
        repoRecord.setWorking(updated, key, value);
        return updated;
    }

    /**
     * Adds all given attributes to the FlowFile, except the UUID attribute, which is dropped
     * from a copy of the map so that the caller's map is never modified.
     *
     * @param flowFile the FlowFile to update
     * @param attributes the attributes to add
     * @return the updated FlowFile
     */
    public FlowFile putAllAttributes(FlowFile flowFile, Map<String, String> attributes) {
        this.validateRecordState(flowFile);
        StandardRepositoryRecord repoRecord = this.records.get(flowFile);

        Map<String, String> sanitized = attributes;
        if (attributes.containsKey(CoreAttributes.UUID.key())) {
            sanitized = new HashMap<String, String>(attributes);
            sanitized.remove(CoreAttributes.UUID.key());
        }

        FlowFileRecord updated = new StandardFlowFileRecord.Builder()
                .fromFlowFile(repoRecord.getCurrent())
                .addAttributes(sanitized)
                .build();
        repoRecord.setWorking(updated, sanitized);
        return updated;
    }

    /**
     * Removes a single attribute from the FlowFile. Requests to remove the UUID attribute are
     * ignored and the FlowFile is returned as passed in.
     *
     * @param flowFile the FlowFile to update
     * @param key the attribute name to remove
     * @return the updated FlowFile, or {@code flowFile} itself when {@code key} is the UUID key
     */
    public FlowFile removeAttribute(FlowFile flowFile, String key) {
        this.validateRecordState(flowFile);
        boolean targetsUuid = CoreAttributes.UUID.key().equals(key);
        if (targetsUuid) {
            return flowFile;
        }
        StandardRepositoryRecord repoRecord = this.records.get(flowFile);
        StandardFlowFileRecord.Builder strippedBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(repoRecord.getCurrent());
        FlowFileRecord stripped = strippedBuilder.removeAttributes(new String[]{key}).build();
        // A null value is recorded for the removed key.
        repoRecord.setWorking(stripped, key, null);
        return stripped;
    }

    /**
     * Removes the named attributes from the FlowFile. The UUID attribute is never recorded as removed.
     *
     * @param flowFile the FlowFile to update
     * @param keys names of the attributes to remove; when {@code null} the FlowFile is returned unchanged
     * @return the updated FlowFile
     */
    public FlowFile removeAllAttributes(FlowFile flowFile, Set<String> keys) {
        this.validateRecordState(flowFile);
        if (keys == null) {
            return flowFile;
        }
        StandardRepositoryRecord record = this.records.get(flowFile);
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keys).build();
        // String-valued map so it matches the Map<String, String> form of setWorking used elsewhere
        // in this class; each removed key is recorded with a null value.
        Map<String, String> updatedAttrs = new HashMap<String, String>();
        for (String key : keys) {
            if (CoreAttributes.UUID.key().equals(key)) {
                continue;
            }
            updatedAttrs.put(key, null);
        }
        record.setWorking(newFile, updatedAttrs);
        return newFile;
    }

    /**
     * Removes every attribute whose name fully matches the given pattern. The UUID attribute is
     * never recorded as removed.
     *
     * @param flowFile the FlowFile to update
     * @param keyPattern pattern matched against attribute names; when {@code null} no removals are recorded
     * @return the updated FlowFile
     */
    public FlowFile removeAllAttributes(FlowFile flowFile, Pattern keyPattern) {
        this.validateRecordState(flowFile);
        StandardRepositoryRecord record = this.records.get(flowFile);
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).removeAttributes(keyPattern).build();
        if (keyPattern == null) {
            record.setWorking(newFile);
            return newFile;
        }
        // Typed maps replace the raw Map / HashMap<String, Object> so that iteration and the
        // setWorking call are type-correct. getCurrent() is read before setWorking, so these are
        // the attributes as they were before the removal.
        Map<String, String> curAttrs = record.getCurrent().getAttributes();
        Map<String, String> removed = new HashMap<String, String>();
        for (String key : curAttrs.keySet()) {
            if (CoreAttributes.UUID.key().equals(key) || !keyPattern.matcher(key).matches()) {
                continue;
            }
            removed.put(key, null);
        }
        record.setWorking(newFile, removed);
        return newFile;
    }

    /**
     * Replaces the record's working FlowFile with a copy whose last-queued values are the current
     * time and the next value of {@code enqueuedIndex}.
     *
     * @param record the repository record to update
     */
    private void updateLastQueuedDate(StandardRepositoryRecord record) {
        StandardFlowFileRecord.Builder requeuedBuilder = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent());
        requeuedBuilder.lastQueued(System.currentTimeMillis(), enqueuedIndex.getAndIncrement());
        FlowFileRecord requeued = requeuedBuilder.build();
        record.setWorking(requeued);
    }

    /**
     * Marks the FlowFile for transfer to the given relationship and updates the session counters.
     * With no destination connections, the relationship must be auto-terminated or be
     * {@code Relationship.SELF}; otherwise it is rejected as unknown.
     *
     * @param flowFile the FlowFile to transfer
     * @param relationship the relationship to transfer to
     * @throws IllegalArgumentException if the relationship has no connections, is not auto-terminated and is not SELF
     */
    public void transfer(FlowFile flowFile, Relationship relationship) {
        this.validateRecordState(flowFile);
        int numDestinations = this.context.getConnections(relationship).size();
        int multiplier = Math.max(1, numDestinations);

        boolean autoTerminated = false;
        boolean selfRelationship = false;
        if (numDestinations == 0) {
            if (this.context.getConnectable().isAutoTerminated(relationship)) {
                autoTerminated = true;
            } else if (relationship == Relationship.SELF) {
                selfRelationship = true;
            } else {
                throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
            }
        }

        StandardRepositoryRecord repoRecord = this.records.get(flowFile);
        repoRecord.setTransferRelationship(relationship);
        this.updateLastQueuedDate(repoRecord);

        if (autoTerminated) {
            this.removedCount += multiplier;
            this.removedBytes += flowFile.getSize();
            return;
        }
        if (selfRelationship) {
            // Transfers back to self do not count as output.
            return;
        }
        this.flowFilesOut += multiplier;
        this.contentSizeOut += flowFile.getSize() * (long)multiplier;
    }

    /**
     * Transfers the FlowFile back to {@code Relationship.SELF}.
     *
     * @param flowFile the FlowFile to transfer
     * @throws IllegalArgumentException if the FlowFile's record has no original queue
     */
    public void transfer(FlowFile flowFile) {
        this.validateRecordState(flowFile);
        StandardRepositoryRecord repoRecord = this.records.get(flowFile);
        boolean hasOriginalQueue = repoRecord.getOriginalQueue() != null;
        if (!hasOriginalQueue) {
            throw new IllegalArgumentException("Cannot transfer FlowFiles that are created in this Session back to self");
        }
        repoRecord.setTransferRelationship(Relationship.SELF);
        this.updateLastQueuedDate(repoRecord);
    }

    /**
     * Transfers each FlowFile in the collection back to self, one at a time and in iteration order.
     *
     * @param flowFiles the FlowFiles to transfer
     */
    public void transfer(Collection<FlowFile> flowFiles) {
        Iterator<FlowFile> remaining = flowFiles.iterator();
        while (remaining.hasNext()) {
            this.transfer(remaining.next());
        }
    }

    /**
     * Marks every FlowFile in the collection for transfer to the given relationship and updates
     * the session counters. With no destination connections, the relationship must be
     * auto-terminated or be {@code Relationship.SELF}; otherwise it is rejected as unknown.
     *
     * @param flowFiles the FlowFiles to transfer
     * @param relationship the relationship to transfer to
     * @throws IllegalArgumentException if the relationship has no connections, is not auto-terminated and is not SELF
     */
    public void transfer(Collection<FlowFile> flowFiles, Relationship relationship) {
        this.validateRecordState(flowFiles);
        boolean autoTerminated = false;
        boolean selfRelationship = false;
        int numDestinations = this.context.getConnections(relationship).size();
        if (numDestinations == 0 && this.context.getConnectable().isAutoTerminated(relationship)) {
            autoTerminated = true;
        } else if (numDestinations == 0 && relationship == Relationship.SELF) {
            selfRelationship = true;
        } else if (numDestinations == 0) {
            throw new IllegalArgumentException("Relationship '" + relationship.getName() + "' is not known");
        }
        int multiplier = Math.max(1, numDestinations);
        // Accumulate the raw size; the multiplier is applied exactly once below. Applying it here
        // as well would count outgoing bytes multiplier-squared times, unlike the single-FlowFile
        // transfer, which adds size * multiplier.
        long contentSize = 0L;
        for (FlowFile flowFile : flowFiles) {
            StandardRepositoryRecord record = this.records.get(flowFile);
            record.setTransferRelationship(relationship);
            this.updateLastQueuedDate(record);
            contentSize += flowFile.getSize();
        }
        if (autoTerminated) {
            // Auto-termination implies zero destinations, so multiplier is 1 here.
            this.removedCount += multiplier * flowFiles.size();
            this.removedBytes += contentSize;
        } else if (!selfRelationship) {
            this.flowFilesOut += multiplier * flowFiles.size();
            this.contentSizeOut += (long)multiplier * contentSize;
        }
    }

    /**
     * Marks the FlowFile's record for deletion and remembers its UUID as removed.
     * When the record has no original queue, the provenance events generated for the FlowFile are
     * discarded and it is removed from fork events; otherwise the removal counters are updated
     * and a drop event is reported.
     *
     * @param flowFile the FlowFile to remove
     */
    public void remove(FlowFile flowFile) {
        this.validateRecordState(flowFile);
        StandardRepositoryRecord repoRecord = this.records.get(flowFile);
        repoRecord.markForDelete();
        this.removedFlowFiles.add(flowFile.getAttribute(CoreAttributes.UUID.key()));

        boolean hasOriginalQueue = repoRecord.getOriginalQueue() != null;
        if (hasOriginalQueue) {
            ++this.removedCount;
            this.removedBytes += flowFile.getSize();
            this.provenanceReporter.drop(flowFile, flowFile.getAttribute(CoreAttributes.DISCARD_REASON.key()));
            return;
        }
        this.generatedProvenanceEvents.remove(flowFile);
        this.removeForkEvents(flowFile);
    }

    /**
     * Removes every FlowFile in the collection. The collection is validated once up front; each
     * FlowFile is then handled the same way as in the single-FlowFile removal.
     *
     * @param flowFiles the FlowFiles to remove
     */
    public void remove(Collection<FlowFile> flowFiles) {
        this.validateRecordState(flowFiles);
        for (FlowFile candidate : flowFiles) {
            StandardRepositoryRecord repoRecord = this.records.get(candidate);
            repoRecord.markForDelete();
            this.removedFlowFiles.add(candidate.getAttribute(CoreAttributes.UUID.key()));

            if (repoRecord.getOriginalQueue() != null) {
                ++this.removedCount;
                this.removedBytes += candidate.getSize();
                this.provenanceReporter.drop(candidate, candidate.getAttribute(CoreAttributes.DISCARD_REASON.key()));
            } else {
                this.generatedProvenanceEvents.remove(candidate);
                this.removeForkEvents(candidate);
            }
        }
    }

    /**
     * Removes the FlowFile as a child from every registered fork event builder whose built event
     * has type FORK.
     *
     * @param flowFile the child FlowFile to remove
     */
    private void removeForkEvents(FlowFile flowFile) {
        for (ProvenanceEventBuilder candidate : this.forkEventBuilders.values()) {
            // The builder is built only to inspect the resulting event type.
            boolean isFork = candidate.build().getEventType() == ProvenanceEventType.FORK;
            if (isFork) {
                candidate.removeChildFlowFile(flowFile);
            }
        }
    }

    /**
     * Repeatedly polls each incoming connection's queue with a filter that rejects every FlowFile,
     * removing whatever the queue reports as expired, until a poll reports no expired FlowFiles.
     */
    public void expireFlowFiles() {
        HashSet<FlowFileRecord> expiredBatch = new HashSet<FlowFileRecord>();
        FlowFileFilter rejectEverything = new FlowFileFilter(){

            public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }
        };
        for (Connection incoming : this.context.getConnectable().getIncomingConnections()) {
            while (true) {
                expiredBatch.clear();
                incoming.getFlowFileQueue().poll(rejectEverything, expiredBatch);
                this.removeExpired(expiredBatch, incoming);
                if (expiredBatch.isEmpty()) {
                    break;
                }
            }
        }
    }

    /**
     * Handles FlowFiles that expired while queued on the given connection: reports an expire
     * event for each, decrements its content claim count, registers the enriched provenance
     * events and updates the FlowFile repository with delete records.
     *
     * @param flowFiles the expired FlowFiles; nothing happens when the set is empty
     * @param connection the connection whose queue held the FlowFiles
     */
    private void removeExpired(Set<FlowFileRecord> flowFiles, Connection connection) {
        if (flowFiles.isEmpty()) {
            return;
        }
        LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()});
        ArrayList<StandardRepositoryRecord> expiredRecords = new ArrayList<StandardRepositoryRecord>(flowFiles.size());
        Connectable connectable = this.context.getConnectable();
        String processorType = connectable.getComponentType();
        final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), processorType, this.context.getProvenanceRepository(), this);
        final HashMap<String, FlowFileRecord> recordIdMap = new HashMap<String, FlowFileRecord>();
        // Only used for logging. Declared as Object because the processor returned by a
        // ProcessorNode is not itself a Connectable. Loop-invariant, so computed once.
        Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode)connectable).getProcessor() : connectable;
        for (FlowFileRecord flowFile : flowFiles) {
            recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
            StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
            record.markForDelete();
            expiredRecords.add(record);
            expiredReporter.expire((FlowFile)flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration());
            this.decrementClaimCount(flowFile.getContentClaim());
            long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
            LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
        }
        try {
            // Wraps the reporter's events so each one is enriched with the expired FlowFile's
            // content claim and attributes as it is iterated.
            Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>(){

                @Override
                public Iterator<ProvenanceEventRecord> iterator() {
                    final Iterator<ProvenanceEventRecord> expiredEventIterator = expiredReporter.getEvents().iterator();
                    Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>(){

                        @Override
                        public boolean hasNext() {
                            return expiredEventIterator.hasNext();
                        }

                        @Override
                        public ProvenanceEventRecord next() {
                            ProvenanceEventRecord event = (ProvenanceEventRecord)expiredEventIterator.next();
                            StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
                            FlowFileRecord record = (FlowFileRecord)recordIdMap.get(event.getFlowFileUuid());
                            if (record == null) {
                                // No expired FlowFile is known for this event's UUID.
                                return null;
                            }
                            ContentClaim claim = record.getContentClaim();
                            if (claim != null) {
                                // Current and previous claim are set to the same values.
                                ResourceClaim resourceClaim = claim.getResourceClaim();
                                enriched.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(record.getContentClaimOffset() + claim.getOffset()), record.getSize());
                                enriched.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(record.getContentClaimOffset() + claim.getOffset()), record.getSize());
                            }
                            enriched.setAttributes(record.getAttributes(), Collections.emptyMap());
                            return enriched.build();
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                    return enrichingIterator;
                }
            };
            this.context.getProvenanceRepository().registerEvents((Iterable)iterable);
            this.context.getFlowFileRepository().updateRepository(expiredRecords);
        }
        catch (IOException e) {
            // Pass the description for the {} placeholder and the exception itself as the trailing
            // argument, so the message is filled in and the stack trace is still logged.
            LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e.toString(), e);
        }
    }

    /**
     * Opens an input stream over the given content claim, positioned {@code offset} bytes in.
     * <p>
     * A zero-length FlowFile yields an empty in-memory stream without touching the content
     * repository. When caching is allowed and {@code recursionSet} is empty, the session-level
     * {@code currentReadClaimStream} is reused or replaced and the result is wrapped in a
     * {@code DisableOnCloseInputStream}. Otherwise a fresh stream is read from the content repository.
     *
     * @param flowFile the FlowFile being read; only its size and string form are used here
     * @param claim the content claim to read from
     * @param offset number of bytes into the claim's stream at which reading starts
     * @param allowCachingOfStream whether the session-level cached stream may be used
     * @return a stream positioned at {@code offset}
     * @throws ContentNotFoundException if one is raised while reading, or if an {@code EOFException} occurs
     * @throws FlowFileAccessException if any other {@code IOException} occurs
     */
    private InputStream getInputStream(FlowFile flowFile, ContentClaim claim, long offset, boolean allowCachingOfStream) throws ContentNotFoundException {
        if (flowFile.getSize() == 0L) {
            return new ByteArrayInputStream(new byte[0]);
        }
        try {
            if (allowCachingOfStream && this.recursionSet.isEmpty()) {
                // Reuse the cached stream only when it belongs to this very claim (identity
                // comparison) and has not yet consumed past the requested offset.
                if (this.currentReadClaim == claim && this.currentReadClaimStream != null && this.currentReadClaimStream.getBytesConsumed() <= offset) {
                    long bytesToSkip = offset - this.currentReadClaimStream.getBytesConsumed();
                    if (bytesToSkip > 0L) {
                        StreamUtils.skip((InputStream)this.currentReadClaimStream, (long)bytesToSkip);
                    }
                    // NOTE(review): presumably the wrapper keeps callers from closing the cached stream - confirm.
                    return new DisableOnCloseInputStream((InputStream)this.currentReadClaimStream);
                }
                // Cache miss: flush the claim cache for this claim, open a new stream and
                // replace (closing) any previously cached one.
                this.claimCache.flush(claim);
                InputStream rawInStream = this.context.getContentRepository().read(claim);
                if (this.currentReadClaimStream != null) {
                    this.currentReadClaimStream.close();
                }
                this.currentReadClaim = claim;
                this.currentReadClaimStream = new ByteCountingInputStream(rawInStream);
                StreamUtils.skip((InputStream)this.currentReadClaimStream, (long)offset);
                return new DisableOnCloseInputStream((InputStream)this.currentReadClaimStream);
            }
            // Uncached path: the caller receives the raw repository stream.
            this.claimCache.flush(claim);
            InputStream rawInStream = this.context.getContentRepository().read(claim);
            try {
                StreamUtils.skip((InputStream)rawInStream, (long)offset);
            }
            catch (IOException ioe) {
                // Close the stream before propagating so a failed skip does not leak it.
                IOUtils.closeQuietly((InputStream)rawInStream);
                throw ioe;
            }
            return rawInStream;
        }
        catch (ContentNotFoundException cnfe) {
            // Passed through untouched.
            throw cnfe;
        }
        catch (EOFException eof) {
            // Hitting end-of-stream while skipping is reported as missing content.
            throw new ContentNotFoundException(claim, (Throwable)eof);
        }
        catch (IOException ioe) {
            throw new FlowFileAccessException("Failed to read content of " + flowFile, (Throwable)ioe);
        }
    }

    /**
     * Reads the FlowFile's content through the callback with session stream management disabled.
     *
     * @param source the FlowFile to read
     * @param reader callback that consumes the content stream
     */
    public void read(FlowFile source, InputStreamCallback reader) {
        boolean allowSessionStreamManagement = false;
        this.read(source, allowSessionStreamManagement, reader);
    }

    /**
     * Reads the FlowFile's content by handing a size-limited, byte-counting stream to the callback.
     *
     * @param source the FlowFile to read
     * @param allowSessionStreamManagement when {@code false}, the session's cached read stream is
     *        closed and cleared after the callback returns normally
     * @param reader callback that consumes the content stream
     * @throws FlowFileAccessException if the current claim cannot be prepared for reading
     * @throws ProcessException if an {@code IOException} escapes the read
     */
    public void read(FlowFile source, boolean allowSessionStreamManagement, InputStreamCallback reader) {
        this.validateRecordState(source);
        StandardRepositoryRecord record = this.records.get(source);
        try {
            // Close any append stream open on the claim and flush the claim cache before reading.
            this.ensureNotAppending(record.getCurrentClaim());
            this.claimCache.flush(record.getCurrentClaim());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), (Throwable)e);
        }
        // Stream stack: raw (possibly cached) claim stream -> limited to the FlowFile's size ->
        // close-disabled -> byte counting.
        try (InputStream rawIn = this.getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
             LimitedInputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
             DisableOnCloseInputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
             ByteCountingInputStream countingStream = new ByteCountingInputStream((InputStream)disableOnCloseIn, this.bytesRead);){
            FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingStream, source, record.getCurrentClaim());
            boolean cnfeThrown = false;
            try {
                // Marks the source as being read for the duration of the callback;
                // getInputStream only uses the cached stream while this set is empty.
                this.recursionSet.add(source);
                reader.process((InputStream)ffais);
                if (this.currentReadClaimStream != null) {
                    if (!allowSessionStreamManagement) {
                        this.currentReadClaimStream.close();
                        this.currentReadClaimStream = null;
                    }
                }
            }
            catch (ContentNotFoundException cnfe) {
                cnfeThrown = true;
                throw cnfe;
            }
            finally {
                this.recursionSet.remove(source);
                // NOTE(review): countingStream was constructed with this.bytesRead as its second
                // argument; if getBytesRead() already includes that value, this += counts it twice - confirm.
                this.bytesRead += countingStream.getBytesRead();
                // Surface a ContentNotFoundException recorded by the access stream when none was thrown directly.
                if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
                    throw ffais.getContentNotFoundException();
                }
            }
        }
        catch (ContentNotFoundException nfe) {
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ex) {
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ex.toString(), (Throwable)ex);
        }
    }

    /**
     * Opens a stream over the FlowFile's content that the caller reads and closes itself.
     * The returned stream is registered in {@code openInputStreams} until it is closed, and the
     * bytes read through it are added to the session's {@code bytesRead} on first close.
     *
     * @param source the FlowFile to read
     * @return a stream limited to the FlowFile's size
     * @throws FlowFileAccessException if the current claim cannot be prepared for reading
     */
    public InputStream read(final FlowFile source) {
        this.validateRecordState(source);
        final StandardRepositoryRecord record = this.records.get(source);
        try {
            this.ensureNotAppending(record.getCurrentClaim());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), (Throwable)e);
        }
        // Caching is disabled here (last argument false), so this is a fresh repository stream.
        InputStream rawIn = this.getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
        LimitedInputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
        final ByteCountingInputStream countingStream = new ByteCountingInputStream((InputStream)limitedIn);
        final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingStream, source, record.getCurrentClaim());
        // Wrapper that delegates to ffais and reacts to content errors raised while reading.
        InputStream errorHandlingStream = new InputStream(){
            // Guards the bytesRead accounting so it happens only on the first close.
            private boolean closed = false;

            @Override
            public int read() throws IOException {
                try {
                    return ffais.read();
                }
                catch (ContentNotFoundException cnfe) {
                    StandardProcessSession.this.handleContentNotFound(cnfe, record);
                    this.close();
                    throw cnfe;
                }
                catch (FlowFileAccessException ffae) {
                    LOG.error("Failed to read content from " + source + "; rolling back session", (Throwable)ffae);
                    StandardProcessSession.this.rollback(true);
                    this.close();
                    throw ffae;
                }
            }

            @Override
            public int read(byte[] b) throws IOException {
                return this.read(b, 0, b.length);
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                // Same error handling as the single-byte read.
                try {
                    return ffais.read(b, off, len);
                }
                catch (ContentNotFoundException cnfe) {
                    StandardProcessSession.this.handleContentNotFound(cnfe, record);
                    this.close();
                    throw cnfe;
                }
                catch (FlowFileAccessException ffae) {
                    LOG.error("Failed to read content from " + source + "; rolling back session", (Throwable)ffae);
                    StandardProcessSession.this.rollback(true);
                    this.close();
                    throw ffae;
                }
            }

            @Override
            public void close() throws IOException {
                if (!this.closed) {
                    StandardProcessSession standardProcessSession = StandardProcessSession.this;
                    standardProcessSession.bytesRead = standardProcessSession.bytesRead + countingStream.getBytesRead();
                    this.closed = true;
                }
                // The delegate close and the deregistration run on every call, not only the first.
                ffais.close();
                StandardProcessSession.this.openInputStreams.remove(source);
            }

            @Override
            public int available() throws IOException {
                return ffais.available();
            }

            @Override
            public long skip(long n) throws IOException {
                return ffais.skip(n);
            }

            @Override
            public boolean markSupported() {
                return ffais.markSupported();
            }

            @Override
            public synchronized void mark(int readlimit) {
                ffais.mark(readlimit);
            }

            @Override
            public synchronized void reset() throws IOException {
                ffais.reset();
            }

            public String toString() {
                return "ErrorHandlingInputStream[FlowFile=" + source + "]";
            }
        };
        this.openInputStreams.put(source, errorHandlingStream);
        return errorHandlingStream;
    }

    /**
     * Merges the content of the sources into the destination without header, footer or demarcator.
     *
     * @param sources the FlowFiles whose content is concatenated
     * @param destination the FlowFile that receives the merged content
     * @return the updated destination FlowFile
     */
    public FlowFile merge(Collection<FlowFile> sources, FlowFile destination) {
        final byte[] header = null;
        final byte[] footer = null;
        final byte[] demarcator = null;
        return this.merge(sources, destination, header, footer, demarcator);
    }

    /**
     * Writes the content of all sources into a newly created content claim, optionally framed by
     * a header and footer and separated by a demarcator, and makes that claim the destination's content.
     *
     * @param sources the FlowFiles whose content is concatenated, in iteration order
     * @param destination the FlowFile that receives the merged content; must not be one of the sources
     * @param header bytes written before the first source; skipped when {@code null} or empty
     * @param footer bytes written after the last source; skipped when {@code null} or empty
     * @param demarcator bytes written between consecutive sources; skipped when {@code null} or empty
     * @return the updated destination FlowFile
     * @throws IllegalArgumentException if {@code destination} is contained in {@code sources}
     * @throws FlowFileAccessException if a source cannot be prepared, the claim cannot be created, or writing fails
     */
    public FlowFile merge(Collection<FlowFile> sources, FlowFile destination, byte[] header, byte[] footer, byte[] demarcator) {
        ContentClaim newClaim;
        this.validateRecordState(sources);
        this.validateRecordState(destination);
        if (sources.contains(destination)) {
            throw new IllegalArgumentException("Destination cannot be within sources");
        }
        ArrayList<StandardRepositoryRecord> sourceRecords = new ArrayList<StandardRepositoryRecord>();
        for (FlowFile source : sources) {
            StandardRepositoryRecord record = this.records.get(source);
            sourceRecords.add(record);
            try {
                // Close any append stream on the source's claim and flush the claim cache before it is read.
                this.ensureNotAppending(record.getCurrentClaim());
                this.claimCache.flush(record.getCurrentClaim());
            }
            catch (IOException e) {
                throw new FlowFileAccessException("Unable to read from source " + source + " due to " + e.toString(), (Throwable)e);
            }
        }
        StandardRepositoryRecord destinationRecord = this.records.get(destination);
        ContentRepository contentRepo = this.context.getContentRepository();
        try {
            newClaim = contentRepo.create(this.context.getConnectable().isLossTolerant());
            claimLog.debug("Creating ContentClaim {} for 'merge' for {}", (Object)newClaim, (Object)destinationRecord.getCurrent());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), (Throwable)e);
        }
        long readCount = 0L;
        long writtenCount = 0L;
        try {
            try (OutputStream rawOut = contentRepo.write(newClaim);
                 BufferedOutputStream out = new BufferedOutputStream(rawOut);){
                if (header != null && header.length > 0) {
                    ((OutputStream)out).write(header);
                    writtenCount += (long)header.length;
                }
                int objectIndex = 0;
                boolean useDemarcator = demarcator != null && demarcator.length > 0;
                int numSources = sources.size();
                for (FlowFile source : sources) {
                    StandardRepositoryRecord sourceRecord = this.records.get(source);
                    long copied = contentRepo.exportTo(sourceRecord.getCurrentClaim(), (OutputStream)out, sourceRecord.getCurrentClaimOffset(), source.getSize());
                    writtenCount += copied;
                    readCount += copied;
                    // The demarcator goes between sources only, never after the last one.
                    if (!useDemarcator || ++objectIndex >= numSources) continue;
                    ((OutputStream)out).write(demarcator);
                    writtenCount += (long)demarcator.length;
                }
                if (footer != null && footer.length > 0) {
                    ((OutputStream)out).write(footer);
                    writtenCount += (long)footer.length;
                }
            }
            finally {
                // Session byte counters are updated even when the write fails part-way.
                this.bytesWritten += writtenCount;
                this.bytesRead += readCount;
            }
        }
        catch (ContentNotFoundException nfe) {
            // Each failure path destroys the partially written claim first.
            this.destroyContent(newClaim);
            this.handleContentNotFound(nfe, destinationRecord);
            this.handleContentNotFound(nfe, sourceRecords);
        }
        catch (IOException ioe) {
            this.destroyContent(newClaim);
            throw new FlowFileAccessException("Failed to merge " + sources.size() + " into " + destination + " due to " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.destroyContent(newClaim);
            throw t;
        }
        this.removeTemporaryClaim(destinationRecord);
        // Point the destination at the new claim: offset 0, size equal to the bytes written.
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(destinationRecord.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(writtenCount).build();
        destinationRecord.setWorking(newFile);
        this.records.put(newFile, destinationRecord);
        return newFile;
    }

    /**
     * Flushes and closes the appendable stream registered for the given claim, if there is one,
     * and removes it from {@code appendableStreams}.
     *
     * @param claim the claim whose pending append stream should be finished; {@code null} is a no-op
     * @throws IOException if flushing or closing the stream fails
     */
    private void ensureNotAppending(ContentClaim claim) throws IOException {
        if (claim == null) {
            return;
        }
        ByteCountingOutputStream outStream = this.appendableStreams.remove(claim);
        if (outStream == null) {
            return;
        }
        try {
            outStream.flush();
        }
        finally {
            // The stream is no longer tracked in appendableStreams, so it must be closed here
            // even when the flush fails; otherwise nothing else would ever close it.
            outStream.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the content of {@code source} with whatever the callback writes to the supplied stream.
     *
     * <p>The content claim is obtained from {@code claimCache}; the callback writes through a wrapper that
     * counts the bytes written. While the callback runs, {@code source} is held in {@code recursionSet}.</p>
     *
     * @param source the FlowFile whose content is being replaced; must pass {@code validateRecordState}
     * @param writer the callback that produces the new content
     * @return a new FlowFile record that points at the new claim, sized to the number of bytes written
     * @throws FlowFileAccessException rethrown unchanged if raised while writing
     * @throws ProcessException if an {@link IOException} occurs while writing
     */
    public FlowFile write(FlowFile source, OutputStreamCallback writer) {
        this.validateRecordState(source);
        StandardRepositoryRecord record = this.records.get(source);
        long writtenToFlowFile = 0L;
        ContentClaim newClaim = null;
        try {
            newClaim = this.claimCache.getContentClaim();
            claimLog.debug("Creating ContentClaim {} for 'write' for {}", (Object)newClaim, (Object)source);
            // Finish any append stream that is still registered for this claim before writing to it.
            this.ensureNotAppending(newClaim);
            try (OutputStream stream = this.claimCache.write(newClaim);
                 DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
                 ByteCountingOutputStream countingOut = new ByteCountingOutputStream((OutputStream)disableOnClose);){
                try {
                    this.recursionSet.add(source);
                    writer.process((OutputStream)new FlowFileAccessOutputStream((OutputStream)countingOut, source));
                }
                finally {
                    // Record the byte count even when the callback fails part-way through.
                    writtenToFlowFile = countingOut.getBytesWritten();
                    this.bytesWritten += countingOut.getBytesWritten();
                }
            }
            finally {
                this.recursionSet.remove(source);
            }
        }
        catch (ContentNotFoundException nfe) {
            this.resetWriteClaims();
            this.destroyContent(newClaim);
            // handleContentNotFound may return normally; execution then continues after the try statement.
            this.handleContentNotFound(nfe, record);
        }
        catch (FlowFileAccessException ffae) {
            this.resetWriteClaims();
            this.destroyContent(newClaim);
            throw ffae;
        }
        catch (IOException ioe) {
            this.resetWriteClaims();
            this.destroyContent(newClaim);
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.resetWriteClaims();
            this.destroyContent(newClaim);
            throw t;
        }
        this.removeTemporaryClaim(record);
        // The offset is the claim length minus the bytes just written, floored at zero.
        // NOTE(review): presumably the claim can already hold earlier content, with the new bytes at its tail - confirm.
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(Math.max(0L, newClaim.getLength() - writtenToFlowFile)).size(writtenToFlowFile).build();
        record.setWorking(newFile);
        return newFile;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Appends the callback's output to the existing content of {@code source}.
     *
     * <p>If no stream is registered in {@code appendableStreams} for the current claim, a new claim is
     * created, the old content is copied into it, and the new stream is registered so later appends can
     * reuse it. Otherwise the already-registered stream and the current claim are reused. The underlying
     * stream is not closed here; the callback only sees it through a {@code DisableOnCloseOutputStream}.</p>
     *
     * @param source the FlowFile to append to; must pass {@code validateRecordState}
     * @param writer the callback that produces the bytes to append
     * @return a new FlowFile record whose size is the total number of bytes written to the stream
     * @throws ProcessException if an {@link IOException} occurs while copying or writing
     */
    public FlowFile append(FlowFile source, OutputStreamCallback writer) {
        this.validateRecordState(source);
        StandardRepositoryRecord record = this.records.get(source);
        long newSize = 0L;
        ContentClaim oldClaim = record.getCurrentClaim();
        ByteCountingOutputStream outStream = this.appendableStreams.get(oldClaim);
        long originalByteWrittenCount = 0L;
        ContentClaim newClaim = null;
        try {
            block68: {
                if (outStream == null) {
                    // First append for this claim: start a fresh claim seeded with the existing content.
                    this.claimCache.flush(oldClaim);
                    try (InputStream oldClaimIn = this.context.getContentRepository().read(oldClaim);){
                        newClaim = this.context.getContentRepository().create(this.context.getConnectable().isLossTolerant());
                        claimLog.debug("Creating ContentClaim {} for 'append' for {}", (Object)newClaim, (Object)source);
                        OutputStream rawOutStream = this.context.getContentRepository().write(newClaim);
                        BufferedOutputStream bufferedOutStream = new BufferedOutputStream(rawOutStream);
                        outStream = new ByteCountingOutputStream((OutputStream)bufferedOutStream);
                        originalByteWrittenCount = 0L;
                        this.appendableStreams.put(newClaim, outStream);
                        // Copy the old content first so the callback's output lands after it.
                        StreamUtils.copy((InputStream)oldClaimIn, (OutputStream)outStream);
                        try (DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream((OutputStream)outStream);){
                            this.recursionSet.add(source);
                            writer.process((OutputStream)new FlowFileAccessOutputStream(disableOnClose, source));
                            break block68;
                        }
                        finally {
                            this.recursionSet.remove(source);
                        }
                    }
                }
                // A stream is already open for this claim: keep appending to the same claim.
                newClaim = oldClaim;
                originalByteWrittenCount = outStream.getBytesWritten();
                try (DisableOnCloseOutputStream disableOnClose = new DisableOnCloseOutputStream((OutputStream)outStream);
                     FlowFileAccessOutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source);){
                    this.recursionSet.add(source);
                    writer.process((OutputStream)flowFileAccessOutStream);
                }
                finally {
                    this.recursionSet.remove(source);
                }
            }
            newSize = outStream.getBytesWritten();
        }
        catch (ContentNotFoundException nfe) {
            this.resetWriteClaims();
            // Only a claim created by this call is destroyed; a reused claim is left alone.
            if (newClaim != oldClaim) {
                this.destroyContent(newClaim);
            }
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ioe) {
            this.resetWriteClaims();
            if (newClaim != oldClaim) {
                this.destroyContent(newClaim);
            }
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (Throwable t) {
            this.resetWriteClaims();
            if (newClaim != oldClaim) {
                this.destroyContent(newClaim);
            }
            throw t;
        }
        finally {
            if (outStream != null) {
                // Count only what this call wrote. For a freshly created claim the baseline is 0,
                // so the bytes copied from the old claim are included in the total.
                long bytesWrittenThisIteration = outStream.getBytesWritten() - originalByteWrittenCount;
                this.bytesWritten += bytesWrittenThisIteration;
            }
        }
        if (newClaim != oldClaim) {
            this.removeTemporaryClaim(record);
        }
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(newSize).build();
        record.setWorking(newFile);
        return newFile;
    }

    /**
     * Releases the record's working claim when it differs from the original claim: the claimant count
     * is decremented in the content repository and the claim is added to the record's transient claims.
     *
     * @param record the repository record whose working claim is about to be replaced
     */
    private void removeTemporaryClaim(StandardRepositoryRecord record) {
        // Nothing to release when there is no working claim or it is still the original one.
        if (record.getWorkingClaim() == null || record.getWorkingClaim() == record.getOriginalClaim()) {
            return;
        }
        this.context.getContentRepository().decrementClaimantCount(record.getWorkingClaim());
        record.addTransientClaim(record.getWorkingClaim());
    }

    /**
     * Resets the write claims with exception suppression enabled; delegates to
     * {@code resetWriteClaims(true)}.
     */
    private void resetWriteClaims() {
        this.resetWriteClaims(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Flushes and closes every stream registered in {@code appendableStreams}, then empties the map.
     *
     * @param suppressExceptions when {@code true}, an {@link IOException} raised while flushing or closing
     *        a stream is ignored and the remaining streams are still processed; when {@code false}, the
     *        first such failure is rethrown as a {@code FlowFileAccessException}
     * @throws FlowFileAccessException if a stream cannot be flushed or closed and exceptions are not suppressed
     */
    private void resetWriteClaims(boolean suppressExceptions) {
        for (ByteCountingOutputStream out : this.appendableStreams.values()) {
            try {
                try {
                    out.flush();
                }
                finally {
                    out.close();
                }
            }
            catch (IOException e) {
                if (!suppressExceptions) {
                    // Keep the underlying I/O failure as the cause so the root problem is not lost.
                    throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository", e);
                }
            }
        }
        this.appendableStreams.clear();
    }

    /**
     * Closes the cached read stream, if any, and clears both the cached stream and the cached claim.
     * A failure to close is deliberately ignored.
     */
    private void resetReadClaim() {
        if (this.currentReadClaimStream != null) {
            try {
                this.currentReadClaimStream.close();
            }
            catch (Exception ignored) {
                // Best effort: the references are dropped below regardless of whether close succeeded.
            }
        }
        this.currentReadClaimStream = null;
        this.currentReadClaim = null;
    }

    /**
     * Replaces the content of {@code source} by streaming its current content through the callback.
     *
     * <p>The callback reads the existing content, limited to {@code source.getSize()} bytes, and writes the
     * replacement to a claim obtained from {@code claimCache}. While the callback runs, {@code source} is
     * held in {@code recursionSet}.</p>
     *
     * @param source the FlowFile whose content is being rewritten; must pass {@code validateRecordState}
     * @param writer the callback that transforms the input stream into the output stream
     * @return a new FlowFile record that points at the new claim, sized to the number of bytes written
     * @throws ProcessException if an {@link IOException} occurs while reading or writing
     * @throws FlowFileAccessException rethrown unchanged if raised while reading or writing
     */
    public FlowFile write(FlowFile source, StreamCallback writer) {
        this.validateRecordState(source);
        StandardRepositoryRecord record = this.records.get(source);
        ContentClaim currClaim = record.getCurrentClaim();
        long writtenToFlowFile = 0L;
        ContentClaim newClaim = null;
        try {
            newClaim = this.claimCache.getContentClaim();
            claimLog.debug("Creating ContentClaim {} for 'write' for {}", (Object)newClaim, (Object)source);
            this.ensureNotAppending(newClaim);
            if (currClaim != null) {
                // Flush cached writes for the current content before it is read back.
                this.claimCache.flush(currClaim.getResourceClaim());
            }
            try (InputStream is = this.getInputStream(source, currClaim, record.getCurrentClaimOffset(), true);
                 LimitedInputStream limitedIn = new LimitedInputStream(is, source.getSize());
                 DisableOnCloseInputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
                 ByteCountingInputStream countingIn = new ByteCountingInputStream((InputStream)disableOnCloseIn, this.bytesRead);
                 OutputStream os = this.claimCache.write(newClaim);
                 DisableOnCloseOutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
                 ByteCountingOutputStream countingOut = new ByteCountingOutputStream((OutputStream)disableOnCloseOut);){
                this.recursionSet.add(source);
                FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingIn, source, currClaim);
                boolean cnfeThrown = false;
                try {
                    writer.process((InputStream)ffais, (OutputStream)new FlowFileAccessOutputStream((OutputStream)countingOut, source));
                }
                catch (ContentNotFoundException cnfe) {
                    cnfeThrown = true;
                    throw cnfe;
                }
                finally {
                    writtenToFlowFile = countingOut.getBytesWritten();
                    this.bytesWritten += writtenToFlowFile;
                    // NOTE(review): countingIn was constructed with this.bytesRead as its second argument; if that
                    // argument seeds the counter, adding getBytesRead() here counts the earlier total twice - confirm
                    // against ByteCountingInputStream.
                    this.bytesRead += countingIn.getBytesRead();
                    this.recursionSet.remove(source);
                    // Surface a ContentNotFoundException recorded by the input stream that the callback did not throw itself.
                    if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
                        throw ffais.getContentNotFoundException();
                    }
                }
            }
        }
        // NOTE(review): unlike write(FlowFile, OutputStreamCallback), none of these handlers calls
        // resetWriteClaims() before destroying the claim - confirm this difference is intended.
        catch (ContentNotFoundException nfe) {
            this.destroyContent(newClaim);
            // handleContentNotFound may return normally; execution then continues after the try statement.
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ioe) {
            this.destroyContent(newClaim);
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ioe.toString(), (Throwable)ioe);
        }
        catch (FlowFileAccessException ffae) {
            this.destroyContent(newClaim);
            throw ffae;
        }
        catch (Throwable t) {
            this.destroyContent(newClaim);
            throw t;
        }
        this.removeTemporaryClaim(record);
        // The offset is the claim length minus the bytes just written, floored at zero.
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(Math.max(0L, newClaim.getLength() - writtenToFlowFile)).size(writtenToFlowFile).build();
        record.setWorking(newFile);
        return newFile;
    }

    /**
     * Imports the file at {@code source} as the new content of {@code destination} and sets the
     * FlowFile's filename attribute to the name of the imported file.
     *
     * <p>When {@code keepSourceFile} is {@code false} the source is registered in {@code deleteOnCommit},
     * so the directory containing it must be writable; this is checked before anything is imported.</p>
     *
     * @param source the file to import
     * @param keepSourceFile {@code false} to have the source file registered for deletion on commit
     * @param destination the FlowFile receiving the content; must pass {@code validateRecordState}
     * @return a new FlowFile record pointing at the imported content
     * @throws FlowFileAccessException if the containing directory is not writable or cannot be determined
     *         (only checked when the source is to be deleted), if a content claim cannot be created, or
     *         if the import itself fails
     */
    public FlowFile importFrom(Path source, boolean keepSourceFile, FlowFile destination) {
        this.validateRecordState(destination);
        if (!keepSourceFile) {
            // Path.getParent() returns null for a single-element relative path such as "data.bin";
            // resolve against the working directory in that case instead of failing with a NullPointerException.
            Path parentDir = source.getParent();
            if (parentDir == null) {
                parentDir = source.toAbsolutePath().getParent();
            }
            if (parentDir == null) {
                throw new FlowFileAccessException("Cannot determine the parent directory of " + source + " so cannot delete file; will not import.");
            }
            if (!Files.isWritable(parentDir) && !parentDir.toFile().canWrite()) {
                throw new FlowFileAccessException("Cannot write to path " + parentDir.toFile().getAbsolutePath() + " so cannot delete file; will not import.");
            }
        }
        StandardRepositoryRecord record = this.records.get(destination);
        ContentClaim newClaim;
        try {
            newClaim = this.context.getContentRepository().create(this.context.getConnectable().isLossTolerant());
            claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", (Object)newClaim, (Object)destination);
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), (Throwable)e);
        }
        long newSize;
        try {
            newSize = this.context.getContentRepository().importFrom(source, newClaim);
            // The import both reads the source file and writes the claim, so it counts toward both totals.
            this.bytesWritten += newSize;
            this.bytesRead += newSize;
        }
        catch (Throwable t) {
            this.destroyContent(newClaim);
            throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
        }
        this.removeTemporaryClaim(record);
        String importedFileName = source.toFile().getName();
        FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(0L).size(newSize).addAttribute(CoreAttributes.FILENAME.key(), importedFileName).build();
        record.setWorking(newFile, CoreAttributes.FILENAME.key(), importedFileName);
        if (!keepSourceFile) {
            this.deleteOnCommit.put((FlowFile)newFile, source);
        }
        return newFile;
    }

    /**
     * Imports everything readable from {@code source} as the new content of {@code destination}.
     *
     * @param source the stream to import
     * @param destination the FlowFile receiving the content; must pass {@code validateRecordState}
     * @return a new FlowFile record pointing at the imported content
     * @throws FlowFileAccessException if the claim cannot be created or the import fails; an
     *         {@link IOException} is first wrapped in its own {@code FlowFileAccessException}, which
     *         then becomes the cause of the one that is thrown
     */
    public FlowFile importFrom(InputStream source, FlowFile destination) {
        this.validateRecordState(destination);
        StandardRepositoryRecord destinationRecord = this.records.get(destination);
        ContentClaim importedClaim = null;
        long importedSize;
        try {
            importedClaim = this.context.getContentRepository().create(this.context.getConnectable().isLossTolerant());
            claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", (Object)importedClaim, (Object)destination);
            importedSize = this.context.getContentRepository().importFrom(source, importedClaim);
            this.bytesWritten += importedSize;
        }
        catch (Throwable t) {
            // I/O failures get an intermediate wrapper so the reported cause chain stays the same.
            Throwable failure = t;
            if (t instanceof IOException) {
                failure = new FlowFileAccessException("Unable to create ContentClaim due to " + t.toString(), t);
            }
            if (importedClaim != null) {
                this.destroyContent(importedClaim);
            }
            throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + failure.toString(), failure);
        }
        this.removeTemporaryClaim(destinationRecord);
        FlowFileRecord importedFile = new StandardFlowFileRecord.Builder()
                .fromFlowFile(destinationRecord.getCurrent())
                .contentClaim(importedClaim)
                .contentClaimOffset(0L)
                .size(importedSize)
                .build();
        destinationRecord.setWorking(importedFile);
        return importedFile;
    }

    /**
     * Exports the content of {@code source} to a file through the content repository.
     *
     * @param source the FlowFile to export; must pass {@code validateRecordState}
     * @param destination the file to write to
     * @param append passed through to the content repository's export call
     * @throws FlowFileAccessException if the export fails for any reason other than missing content
     */
    public void exportTo(FlowFile source, Path destination, boolean append) {
        this.validateRecordState(source);
        StandardRepositoryRecord sourceRecord = this.records.get(source);
        try {
            // Pending appends and cached writes must be flushed before the repository reads the claim.
            this.ensureNotAppending(sourceRecord.getCurrentClaim());
            this.claimCache.flush(sourceRecord.getCurrentClaim());
            ContentRepository contentRepo = this.context.getContentRepository();
            long exportedBytes = contentRepo.exportTo(
                    sourceRecord.getCurrentClaim(),
                    destination,
                    append,
                    sourceRecord.getCurrentClaimOffset(),
                    source.getSize());
            this.bytesRead += exportedBytes;
            this.bytesWritten += exportedBytes;
        }
        catch (ContentNotFoundException missing) {
            this.handleContentNotFound(missing, sourceRecord);
        }
        catch (Throwable failure) {
            throw new FlowFileAccessException("Failed to export " + source + " to " + destination + " due to " + failure.toString(), failure);
        }
    }

    /**
     * Copies the content of {@code source} to the given output stream.
     *
     * <p>Returns without writing anything when the FlowFile's record has no current claim. While the copy
     * runs, {@code source} is held in {@code recursionSet}.</p>
     *
     * @param source the FlowFile to export; must pass {@code validateRecordState}
     * @param destination the stream that receives up to {@code source.getSize()} bytes
     * @throws FlowFileAccessException if pending writes for the claim cannot be flushed
     * @throws ProcessException if an {@link IOException} occurs while copying
     */
    public void exportTo(FlowFile source, OutputStream destination) {
        this.validateRecordState(source);
        StandardRepositoryRecord record = this.records.get(source);
        if (record.getCurrentClaim() == null) {
            return;
        }
        try {
            // Pending appends and cached writes must be flushed before the claim is read back.
            this.ensureNotAppending(record.getCurrentClaim());
            this.claimCache.flush(record.getCurrentClaim());
        }
        catch (IOException e) {
            throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), (Throwable)e);
        }
        // NOTE(review): countingStream is constructed with this.bytesRead, but this method never reads its
        // count back nor updates this.bytesRead - confirm whether bytes read here are accounted for elsewhere.
        try (InputStream rawIn = this.getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true);
             LimitedInputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
             DisableOnCloseInputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
             ByteCountingInputStream countingStream = new ByteCountingInputStream((InputStream)disableOnCloseIn, this.bytesRead);){
            FlowFileAccessInputStream ffais = new FlowFileAccessInputStream((InputStream)countingStream, source, record.getCurrentClaim());
            boolean cnfeThrown = false;
            try {
                this.recursionSet.add(source);
                StreamUtils.copy((InputStream)ffais, (OutputStream)destination, (long)source.getSize());
            }
            catch (ContentNotFoundException cnfe) {
                cnfeThrown = true;
                throw cnfe;
            }
            finally {
                this.recursionSet.remove(source);
                IOUtils.closeQuietly((InputStream)ffais);
                // Surface a ContentNotFoundException recorded by the stream that the copy did not throw itself.
                if (!cnfeThrown && ffais.getContentNotFoundException() != null) {
                    throw ffais.getContentNotFoundException();
                }
            }
        }
        catch (ContentNotFoundException nfe) {
            this.handleContentNotFound(nfe, record);
        }
        catch (IOException ex) {
            throw new ProcessException("IOException thrown from " + this.connectableDescription + ": " + ex.toString(), (Throwable)ex);
        }
    }

    /**
     * Applies the single-record content-not-found handling to each suspect record in iteration order.
     * Iteration stops at the first record for which that handling throws.
     *
     * @param nfe the exception describing the missing claim
     * @param suspectRecords the records that may reference the missing claim
     */
    private void handleContentNotFound(ContentNotFoundException nfe, Collection<StandardRepositoryRecord> suspectRecords) {
        Iterator<StandardRepositoryRecord> suspects = suspectRecords.iterator();
        while (suspects.hasNext()) {
            StandardRepositoryRecord suspect = suspects.next();
            this.handleContentNotFound(nfe, suspect);
        }
    }

    /**
     * Reacts to missing content for one record. A provenance drop event is always requested and, when one
     * is produced, registered. If the missing claim is the record's original or working claim, the session
     * is rolled back and a {@code MissingFlowFileException} is thrown; the record is additionally marked
     * for abort when the original claim is the one missing. Otherwise the method returns normally.
     *
     * @param nfe the exception describing the missing claim
     * @param suspectRecord the record that may reference the missing claim
     * @throws MissingFlowFileException if the missing claim is the record's original or working claim
     */
    private void handleContentNotFound(ContentNotFoundException nfe, StandardRepositoryRecord suspectRecord) {
        ContentClaim originalClaim = suspectRecord.getOriginalClaim();
        ContentClaim workingClaim = suspectRecord.getWorkingClaim();
        ContentClaim missingClaim = nfe.getMissingClaim();

        FlowFile droppedFlowFile = (FlowFile)suspectRecord.getCurrent();
        String dropReason = nfe.getMessage() == null ? "Content Not Found" : nfe.getMessage();
        ProvenanceEventRecord dropEvent = this.provenanceReporter.drop(droppedFlowFile, dropReason);
        if (dropEvent != null) {
            this.context.getProvenanceRepository().registerEvent(dropEvent);
        }

        boolean originalIsMissing = missingClaim == originalClaim;
        boolean workingIsMissing = missingClaim == workingClaim;
        if (!originalIsMissing && !workingIsMissing) {
            return;
        }
        if (originalIsMissing) {
            // Only a record whose original content is gone is flagged for abort before the rollback.
            suspectRecord.markForAbort();
        }
        this.rollback();
        throw new MissingFlowFileException("Unable to find content for FlowFile", (Throwable)nfe);
    }

    /**
     * Verifies that each FlowFile may be operated on in this session. Apart from the in-use check,
     * every failure rolls the session back before throwing.
     *
     * @param flowFiles the FlowFiles to check
     * @throws IllegalStateException if a FlowFile is currently held in {@code recursionSet}
     * @throws FlowFileHandlingException if a FlowFile is unknown to the session, is not the record's current
     *         version, is already marked for transfer, or is marked for removal
     */
    private void validateRecordState(FlowFile ... flowFiles) {
        for (FlowFile candidate : flowFiles) {
            if (this.recursionSet.contains(candidate)) {
                throw new IllegalStateException(candidate + " already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed");
            }
            StandardRepositoryRecord known = this.records.get(candidate);
            if (known == null) {
                this.rollback();
                throw new FlowFileHandlingException(candidate + " is not known in this session (" + this.toString() + ")");
            }
            else if (known.getCurrent() != candidate) {
                this.rollback();
                throw new FlowFileHandlingException(candidate + " is not the most recent version of this FlowFile within this session (" + this.toString() + ")");
            }
            else if (known.getTransferRelationship() != null) {
                this.rollback();
                throw new FlowFileHandlingException(candidate + " is already marked for transfer");
            }
            else if (known.isMarkedForDelete()) {
                this.rollback();
                throw new FlowFileHandlingException(candidate + " has already been marked for removal");
            }
        }
    }

    /**
     * Runs the per-FlowFile state validation on every element of the collection, in iteration order.
     *
     * @param flowFiles the FlowFiles to check
     */
    private void validateRecordState(Collection<FlowFile> flowFiles) {
        Iterator<FlowFile> remaining = flowFiles.iterator();
        while (remaining.hasNext()) {
            FlowFile next = remaining.next();
            this.validateRecordState(next);
        }
    }

    /**
     * Reports whether this session's {@code records} map contains an entry keyed by the given FlowFile.
     *
     * @param flowFile the FlowFile to look up
     * @return {@code true} if the FlowFile is a key in {@code records}
     */
    boolean isFlowFileKnown(FlowFile flowFile) {
        return this.records.containsKey(flowFile);
    }

    /**
     * Creates a new FlowFile derived from a single parent. The child starts with a generated filename,
     * the default path and a fresh UUID, then takes over every parent attribute except the alternate
     * identifier, the discard reason and the UUID. The parent's lineage start date and index are reused,
     * and a fork event from parent to child is registered.
     *
     * @param parent the FlowFile the new one is derived from
     * @return the newly created FlowFile, already tracked by this session
     */
    public FlowFile create(FlowFile parent) {
        Map<String, String> childAttributes = new HashMap<String, String>(3);
        childAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
        childAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
        childAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());

        StandardFlowFileRecord.Builder childBuilder = new StandardFlowFileRecord.Builder().id(this.context.getNextFlowFileSequence());

        Map<String, String> parentAttributes = parent.getAttributes();
        for (Map.Entry<String, String> attribute : parentAttributes.entrySet()) {
            String name = attribute.getKey();
            String value = attribute.getValue();
            boolean inherited = !CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(name)
                    && !CoreAttributes.DISCARD_REASON.key().equals(name)
                    && !CoreAttributes.UUID.key().equals(name);
            if (inherited) {
                // Parent values win over the generated defaults (e.g. filename and path).
                childAttributes.put(name, value);
            }
        }

        childBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
        childBuilder.addAttributes(childAttributes);
        FlowFileRecord child = childBuilder.build();

        StandardRepositoryRecord childRecord = new StandardRepositoryRecord(null);
        childRecord.setWorking(child, childAttributes);
        this.records.put(child, childRecord);
        this.createdFlowFiles.add(child.getAttribute(CoreAttributes.UUID.key()));
        this.registerForkEvent(parent, (FlowFile)child);
        return child;
    }

    /**
     * Creates a new FlowFile derived from several parents. The child carries the attributes that all
     * parents share with identical values (minus UUID, alternate identifier and discard reason), plus a
     * generated filename, the default path and a fresh UUID. Its lineage starts at the earliest parent
     * lineage start date and, among the parents with that date, the smallest lineage start index.
     * A join event from the parents to the child is registered.
     *
     * @param parents the FlowFiles the new one is derived from
     * @return the newly created FlowFile, already tracked by this session
     */
    public FlowFile create(Collection<FlowFile> parents) {
        Map<String, String> newAttributes = StandardProcessSession.intersectAttributes(parents);
        newAttributes.remove(CoreAttributes.UUID.key());
        newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
        newAttributes.remove(CoreAttributes.DISCARD_REASON.key());

        // Earliest lineage start date among the parents; 0 doubles as the "not chosen yet" marker.
        long lineageStartDate = 0L;
        for (FlowFile parent : parents) {
            long parentLineageStartDate = parent.getLineageStartDate();
            if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
                lineageStartDate = parentLineageStartDate;
            }
        }

        // Smallest lineage start index among the parents that share the chosen date. An explicit flag is
        // needed: starting from 0 and only accepting smaller values would never pick up a real index.
        long lineageStartIndex = 0L;
        boolean indexChosen = false;
        for (FlowFile parent : parents) {
            if (parent.getLineageStartDate() != lineageStartDate) {
                continue;
            }
            long parentLineageStartIndex = parent.getLineageStartIndex();
            if (!indexChosen || parentLineageStartIndex < lineageStartIndex) {
                lineageStartIndex = parentLineageStartIndex;
                indexChosen = true;
            }
        }

        newAttributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()));
        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
        newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
        FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(this.context.getNextFlowFileSequence()).addAttributes(newAttributes).lineageStart(lineageStartDate, lineageStartIndex).build();
        StandardRepositoryRecord record = new StandardRepositoryRecord(null);
        record.setWorking(fFile, newAttributes);
        this.records.put(fFile, record);
        this.createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
        this.registerJoinEvent((FlowFile)fFile, parents);
        return fFile;
    }

    /**
     * Computes the attributes that every FlowFile in the collection carries with the same value.
     *
     * @param flowFileList the FlowFiles to compare; may be {@code null} or empty
     * @return a new mutable map: empty for {@code null} or empty input, a copy of the attributes for a
     *         single FlowFile, otherwise the key/value pairs of the first FlowFile that every FlowFile
     *         contains with an equal, non-null value
     */
    private static Map<String, String> intersectAttributes(Collection<FlowFile> flowFileList) {
        Map<String, String> result = new HashMap<String, String>();
        if (flowFileList == null || flowFileList.isEmpty()) {
            return result;
        }
        Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
        if (flowFileList.size() == 1) {
            // A single FlowFile trivially agrees with itself, so no comparison pass is needed.
            result.putAll(firstMap);
            return result;
        }
        for (Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
            String key = mapEntry.getKey();
            String value = mapEntry.getValue();
            boolean sharedByAll = true;
            for (FlowFile flowFile : flowFileList) {
                Map<String, String> currMap = flowFile.getAttributes();
                String curVal = currMap.get(key);
                if (curVal == null || !curVal.equals(value)) {
                    sharedByAll = false;
                    break;
                }
            }
            if (sharedByAll) {
                result.put(key, value);
            }
        }
        return result;
    }

    /**
     * Rolls the session back when the object is finalized, then runs the superclass finalizer.
     *
     * @throws Throwable whatever the rollback or the superclass finalizer throws
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            this.rollback();
        }
        finally {
            // The superclass finalizer must run even when the rollback fails.
            super.finalize();
        }
    }

    /**
     * Returns the provenance reporter held by this session.
     *
     * @return the session's {@code provenanceReporter} field
     */
    public ProvenanceReporter getProvenanceReporter() {
        return this.provenanceReporter;
    }

    /**
     * Returns a short description of this session that includes its {@code sessionId}.
     */
    public String toString() {
        return "StandardProcessSession[id=" + this.sessionId + "]";
    }

    /**
     * Accumulates the state of a session across one or more calls to {@link #checkpoint}: provenance
     * events, repository records, connection counts, counters, files to delete on commit and the
     * running byte / FlowFile totals.
     */
    private static class Checkpoint {
        private long processingTime = 0L;
        private final Map<FlowFile, List<ProvenanceEventRecord>> generatedProvenanceEvents = new HashMap<>();
        private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
        private final List<ProvenanceEventRecord> autoTerminatedEvents = new ArrayList<>();
        private final Set<ProvenanceEventRecord> reportedEvents = new LinkedHashSet<>();
        private final Map<FlowFileRecord, StandardRepositoryRecord> records = new HashMap<>();
        private final Map<String, StandardFlowFileEvent> connectionCounts = new HashMap<>();
        private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new HashMap<>();
        private final Map<String, Long> counters = new HashMap<>();
        private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
        private final Set<String> removedFlowFiles = new HashSet<>();
        private final Set<String> createdFlowFiles = new HashSet<>();
        private int removedCount = 0;
        private long removedBytes = 0L;
        private long bytesRead = 0L;
        private long bytesWritten = 0L;
        private int flowFilesIn = 0;
        private int flowFilesOut = 0;
        private long contentSizeIn = 0L;
        private long contentSizeOut = 0L;

        private Checkpoint() {
        }

        /**
         * Folds the current state of {@code session} into this checkpoint.
         *
         * @param session the session whose state is being captured
         * @param autoTerminatedEvents events for auto-terminated FlowFiles to add; may be {@code null}
         */
        private void checkpoint(StandardProcessSession session, List<ProvenanceEventRecord> autoTerminatedEvents) {
            this.processingTime += System.nanoTime() - session.processingStartTime;
            this.generatedProvenanceEvents.putAll(session.generatedProvenanceEvents);
            this.forkEventBuilders.putAll(session.forkEventBuilders);
            if (autoTerminatedEvents != null) {
                this.autoTerminatedEvents.addAll(autoTerminatedEvents);
            }
            this.reportedEvents.addAll(session.provenanceReporter.getEvents());
            this.records.putAll(session.records);
            // NOTE(review): connectionCounts and unacknowledgedFlowFiles still replace an existing entry for
            // the same key rather than merging it - confirm whether repeated checkpoints can hit the same key.
            this.connectionCounts.putAll(session.connectionCounts);
            this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
            // Counters are added up per name, like the scalar totals below, so a value carried from an
            // earlier checkpoint is not overwritten by a later one.
            session.counters.forEach((name, delta) -> {
                if (delta != null) {
                    this.counters.merge(name, delta, Long::sum);
                }
            });
            this.deleteOnCommit.putAll(session.deleteOnCommit);
            this.removedFlowFiles.addAll(session.removedFlowFiles);
            this.createdFlowFiles.addAll(session.createdFlowFiles);
            this.removedCount += session.removedCount;
            this.removedBytes += session.removedBytes;
            this.bytesRead += session.bytesRead;
            this.bytesWritten += session.bytesWritten;
            this.flowFilesIn += session.flowFilesIn;
            this.flowFilesOut += session.flowFilesOut;
            this.contentSizeIn += session.contentSizeIn;
            this.contentSizeOut += session.contentSizeOut;
        }
    }

    /**
     * Callback that polls one connection and returns the FlowFile records obtained from it.
     */
    private static interface ConnectionPoller {
        // var1 is the connection to poll.
        // NOTE(review): parameter names were lost in decompilation; the role of the var2 set is not
        // visible from this declaration - confirm against the implementations.
        public List<FlowFileRecord> poll(Connection var1, Set<FlowFileRecord> var2);
    }
}

