package org.apache.nifi.processors.standard.merge;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.stream.io.ByteCountingOutputStream;

/* loaded from: input_file:org/apache/nifi/processors/standard/merge/RecordBin.class */
public class RecordBin {
    private final ComponentLog logger;
    private final ProcessSession session;
    private final RecordSetWriterFactory writerFactory;
    private final RecordBinThresholds thresholds;
    private final ProcessContext context;
    private FlowFile merged;
    private RecordSetWriter recordWriter;
    private ByteCountingOutputStream out;
    private static final AtomicLong idGenerator = new AtomicLong(0);
    private final List<FlowFile> flowFiles = new ArrayList();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();
    private final long creationNanos = System.nanoTime();
    private int recordCount = 0;
    private int fragmentCount = 0;
    private volatile boolean complete = false;
    private final long id = idGenerator.getAndIncrement();
    private volatile int requiredRecordCount = -1;

    public RecordBin(ProcessContext processContext, ProcessSession processSession, ComponentLog componentLog, RecordBinThresholds recordBinThresholds) {
        this.session = processSession;
        this.writerFactory = processContext.getProperty(MergeRecord.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        this.logger = componentLog;
        this.context = processContext;
        this.merged = processSession.create();
        this.thresholds = recordBinThresholds;
    }

    public boolean isOlderThan(RecordBin recordBin) {
        return this.creationNanos < recordBin.creationNanos;
    }

    public boolean isOlderThan(long j, TimeUnit timeUnit) {
        return this.creationNanos < System.nanoTime() - timeUnit.toNanos(j);
    }

    public boolean isComplete() {
        return this.complete;
    }

    /* JADX WARN: Finally extract failed */
    public boolean offer(FlowFile flowFile, RecordReader recordReader, ProcessSession processSession, boolean z) throws IOException {
        boolean tryLock;
        if (isComplete()) {
            this.logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[]{Long.valueOf(flowFile.getId()), this});
            return false;
        }
        if (z) {
            this.writeLock.lock();
            tryLock = true;
        } else {
            tryLock = this.writeLock.tryLock();
        }
        if (!tryLock) {
            this.logger.debug("RecordBin.offer for id={} returning false because failed to get lock for {}", new Object[]{Long.valueOf(flowFile.getId()), this});
            return false;
        }
        this.fragmentCount++;
        try {
            try {
                if (isComplete()) {
                    this.logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[]{Long.valueOf(flowFile.getId()), this});
                    this.writeLock.unlock();
                    return false;
                }
                this.logger.debug("Migrating id={} to {}", new Object[]{Long.valueOf(flowFile.getId()), this});
                if (this.recordWriter == null) {
                    OutputStream write = this.session.write(this.merged);
                    this.logger.debug("Created OutputStream using session {} for {}", new Object[]{this.session, this});
                    this.out = new ByteCountingOutputStream(write);
                    this.recordWriter = this.writerFactory.createWriter(this.logger, this.writerFactory.getSchema(flowFile.getAttributes(), recordReader.getSchema()), this.out, flowFile);
                    this.recordWriter.beginRecordSet();
                }
                while (true) {
                    Record nextRecord = recordReader.nextRecord();
                    if (nextRecord == null) {
                        break;
                    }
                    this.recordWriter.write(nextRecord);
                    this.recordCount++;
                }
                recordReader.close();
                processSession.migrate(this.session, Collections.singleton(flowFile));
                this.flowFiles.add(flowFile);
                this.thresholds.getFragmentCountAttribute().ifPresent(this::validateFragmentCount);
                if (this.recordCount >= getMinimumRecordCount()) {
                    this.recordWriter.flush();
                }
                if (isFull()) {
                    this.logger.debug("{} is now full. Completing bin.", new Object[]{this});
                    complete("Bin is full");
                } else if (isOlderThan(this.thresholds.getMaxBinMillis(), TimeUnit.MILLISECONDS)) {
                    this.logger.debug("{} is now expired. Completing bin.", new Object[]{this});
                    complete("Bin is older than " + this.thresholds.getMaxBinAge());
                }
                this.writeLock.unlock();
                return true;
            } catch (Exception e) {
                this.logger.error("Failed to create merged FlowFile from {} input FlowFiles; routing originals to failure", new Object[]{Integer.valueOf(this.flowFiles.size() + 1), e});
                try {
                    recordReader.close();
                    if (this.recordWriter != null) {
                        this.recordWriter.close();
                    }
                    if (this.out != null) {
                        this.out.close();
                    }
                    if (0 == 0) {
                        processSession.migrate(this.session, Collections.singleton(flowFile));
                        this.flowFiles.add(flowFile);
                    }
                    this.complete = true;
                    this.session.remove(this.merged);
                    this.session.transfer(this.flowFiles, MergeRecord.REL_FAILURE);
                    this.session.commitAsync();
                    this.writeLock.unlock();
                    return true;
                } catch (Throwable th) {
                    this.complete = true;
                    this.session.remove(this.merged);
                    this.session.transfer(this.flowFiles, MergeRecord.REL_FAILURE);
                    this.session.commitAsync();
                    throw th;
                }
            }
        } catch (Throwable th2) {
            this.writeLock.unlock();
            throw th2;
        }
    }

    public boolean isFull() {
        this.readLock.lock();
        try {
            if (!isFullEnough()) {
                return false;
            }
            if (this.thresholds.getFragmentCountAttribute().isPresent()) {
                return this.fragmentCount == this.thresholds.getFragmentCount().intValue();
            }
            if (this.recordCount >= this.thresholds.getMaxRecords()) {
                return true;
            }
            return this.out.getBytesWritten() >= this.thresholds.getMaxBytes();
        } finally {
            this.readLock.unlock();
        }
    }

    private int getMinimumRecordCount() {
        int i = this.requiredRecordCount;
        if (i > -1) {
            return i;
        }
        int minRecords = this.thresholds.getMinRecords();
        this.requiredRecordCount = minRecords;
        return minRecords;
    }

    public boolean isFullEnough() {
        this.readLock.lock();
        try {
            if (this.flowFiles.isEmpty()) {
                return false;
            }
            if (this.thresholds.getFragmentCountAttribute().isPresent()) {
                return this.fragmentCount == this.thresholds.getFragmentCount().intValue();
            }
            return this.recordCount >= getMinimumRecordCount() && this.out.getBytesWritten() >= this.thresholds.getMinBytes();
        } finally {
            this.readLock.unlock();
        }
    }

    public void rollback() {
        this.complete = true;
        this.logger.debug("Marked {} as complete because rollback() was called", new Object[]{this});
        this.writeLock.lock();
        try {
            if (this.recordWriter != null) {
                try {
                    this.recordWriter.close();
                } catch (IOException e) {
                    this.logger.warn("Failed to close Record Writer", e);
                }
            }
            this.session.rollback();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Rolled back bin {} containing input FlowFiles {}", new Object[]{this, (List) this.flowFiles.stream().map(flowFile -> {
                    return " id=" + flowFile.getId() + ",";
                }).collect(Collectors.toList())});
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    private long getBinAge() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.creationNanos);
    }

    private void fail() {
        this.complete = true;
        this.logger.debug("Marked {} as complete because fail() was called", new Object[]{this});
        this.writeLock.lock();
        try {
            if (this.recordWriter != null) {
                try {
                    this.recordWriter.close();
                } catch (IOException e) {
                    this.logger.warn("Failed to close Record Writer", e);
                }
            }
            this.session.remove(this.merged);
            this.session.transfer(this.flowFiles, MergeRecord.REL_FAILURE);
            this.session.commitAsync();
        } finally {
            this.writeLock.unlock();
        }
    }

    private void validateFragmentCount(String str) {
        Integer fragmentCount = this.thresholds.getFragmentCount();
        for (FlowFile flowFile : this.flowFiles) {
            String attribute = flowFile.getAttribute(str);
            if (attribute != null) {
                try {
                    int parseInt = Integer.parseInt(attribute);
                    if (fragmentCount != null && parseInt != fragmentCount.intValue()) {
                        this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}", new Object[]{Integer.valueOf(this.flowFiles.size()), str, attribute, flowFile, fragmentCount});
                        fail();
                        return;
                    } else if (fragmentCount == null) {
                        fragmentCount = Integer.valueOf(parseInt);
                        this.thresholds.setFragmentCount(Integer.valueOf(parseInt));
                    }
                } catch (NumberFormatException e) {
                    this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number", new Object[]{Integer.valueOf(this.flowFiles.size()), str, attribute, flowFile});
                    fail();
                    return;
                }
            }
        }
        if (fragmentCount == null) {
            this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles", new Object[]{Integer.valueOf(this.flowFiles.size()), str});
            fail();
        }
    }

    public void complete(String str) throws IOException {
        this.writeLock.lock();
        try {
            try {
                if (isComplete()) {
                    this.logger.debug("Cannot complete {} because it is already completed", new Object[]{this});
                    this.writeLock.unlock();
                    return;
                }
                this.complete = true;
                this.logger.debug("Marked {} as complete because complete() was called", new Object[]{this});
                WriteResult finishRecordSet = this.recordWriter.finishRecordSet();
                this.recordWriter.close();
                this.logger.debug("Closed Record Writer using session {} for {}", new Object[]{this.session, this});
                if (this.flowFiles.isEmpty()) {
                    this.session.remove(this.merged);
                    this.writeLock.unlock();
                    return;
                }
                Optional<String> fragmentCountAttribute = this.thresholds.getFragmentCountAttribute();
                if (fragmentCountAttribute.isPresent()) {
                    validateFragmentCount(fragmentCountAttribute.get());
                    Integer fragmentCount = this.thresholds.getFragmentCount();
                    if (fragmentCount.intValue() != this.flowFiles.size()) {
                        this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted (due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).", new Object[]{Integer.valueOf(this.flowFiles.size()), fragmentCountAttribute.get(), fragmentCount, Integer.valueOf(this.flowFiles.size()), fragmentCount});
                        fail();
                        this.writeLock.unlock();
                        return;
                    }
                }
                HashMap hashMap = new HashMap();
                hashMap.putAll(AttributeStrategyUtil.strategyFor(this.context).getMergedAttributes(this.flowFiles));
                hashMap.putAll(finishRecordSet.getAttributes());
                hashMap.put("record.count", String.valueOf(finishRecordSet.getRecordCount()));
                hashMap.put(CoreAttributes.MIME_TYPE.key(), this.recordWriter.getMimeType());
                hashMap.put("merge.count", Integer.toString(this.flowFiles.size()));
                hashMap.put("merge.bin.age", Long.toString(getBinAge()));
                this.merged = this.session.putAllAttributes(this.merged, hashMap);
                this.flowFiles.forEach(flowFile -> {
                    this.session.putAttribute(flowFile, "merge.uuid", this.merged.getAttribute(CoreAttributes.UUID.key()));
                });
                this.session.getProvenanceReporter().join(this.flowFiles, this.merged, "Records Merged due to: " + str);
                this.session.transfer(this.merged, MergeRecord.REL_MERGED);
                this.session.transfer(this.flowFiles, MergeRecord.REL_ORIGINAL);
                this.session.adjustCounter("Records Merged", finishRecordSet.getRecordCount(), false);
                this.session.commitAsync();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Completed bin {} with {} records with Merged FlowFile {} using input FlowFiles {}", new Object[]{this, Integer.valueOf(finishRecordSet.getRecordCount()), this.merged, (List) this.flowFiles.stream().map(flowFile2 -> {
                        return "id=" + flowFile2.getId();
                    }).collect(Collectors.toList())});
                }
            } catch (Exception e) {
                this.session.rollback(true);
                throw e;
            }
        } finally {
            this.writeLock.unlock();
        }
    }

    public String toString() {
        this.readLock.lock();
        try {
            return "RecordBin[size=" + this.flowFiles.size() + ", full=" + isFull() + ", isComplete=" + isComplete() + ", id=" + this.id + "]";
        } finally {
            this.readLock.unlock();
        }
    }
}
