/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard.merge;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.processors.standard.merge.AttributeStrategy;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.processors.standard.merge.RecordBinThresholds;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.stream.io.ByteCountingOutputStream;

public class RecordBin {
    private final ComponentLog logger;
    private final ProcessSession session;
    private final RecordSetWriterFactory writerFactory;
    private final RecordBinThresholds thresholds;
    private final ProcessContext context;
    private final List<FlowFile> flowFiles = new ArrayList<FlowFile>();
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();
    private final long creationNanos = System.nanoTime();
    private FlowFile merged;
    private RecordSetWriter recordWriter;
    private ByteCountingOutputStream out;
    private int recordCount = 0;
    private int fragmentCount = 0;
    private volatile boolean complete = false;
    private static final AtomicLong idGenerator = new AtomicLong(0L);
    private final long id = idGenerator.getAndIncrement();
    private volatile int requiredRecordCount = -1;

    public RecordBin(ProcessContext context, ProcessSession session, ComponentLog logger, RecordBinThresholds thresholds) {
        this.session = session;
        this.writerFactory = (RecordSetWriterFactory)context.getProperty(MergeRecord.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        this.logger = logger;
        this.context = context;
        this.merged = session.create();
        this.thresholds = thresholds;
    }

    public boolean isOlderThan(RecordBin other) {
        return this.creationNanos < other.creationNanos;
    }

    public boolean isOlderThan(long period, TimeUnit unit) {
        long nanos = unit.toNanos(period);
        return this.creationNanos < System.nanoTime() - nanos;
    }

    public boolean isComplete() {
        return this.complete;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean offer(FlowFile flowFile, RecordReader recordReader, ProcessSession flowFileSession, boolean block) throws IOException {
        boolean locked;
        if (this.isComplete()) {
            this.logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[]{flowFile.getId(), this});
            return false;
        }
        if (block) {
            this.writeLock.lock();
            locked = true;
        } else {
            locked = this.writeLock.tryLock();
        }
        if (!locked) {
            this.logger.debug("RecordBin.offer for id={} returning false because failed to get lock for {}", new Object[]{flowFile.getId(), this});
            return false;
        }
        boolean flowFileMigrated = false;
        ++this.fragmentCount;
        try {
            Record record;
            if (this.isComplete()) {
                this.logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[]{flowFile.getId(), this});
                boolean bl = false;
                return bl;
            }
            this.logger.debug("Migrating id={} to {}", new Object[]{flowFile.getId(), this});
            while ((record = recordReader.nextRecord()) != null) {
                if (this.recordWriter == null) {
                    OutputStream rawOut = this.session.write(this.merged);
                    this.logger.debug("Created OutputStream using session {} for {}", new Object[]{this.session, this});
                    this.out = new ByteCountingOutputStream(rawOut);
                    this.recordWriter = this.writerFactory.createWriter(this.logger, record.getSchema(), (OutputStream)this.out);
                    this.recordWriter.beginRecordSet();
                }
                this.recordWriter.write(record);
                ++this.recordCount;
            }
            recordReader.close();
            flowFileSession.migrate(this.session, Collections.singleton(flowFile));
            flowFileMigrated = true;
            this.flowFiles.add(flowFile);
            if (this.recordCount >= this.getMinimumRecordCount()) {
                this.recordWriter.flush();
            }
            if (this.isFull()) {
                this.logger.debug(this + " is now full. Completing bin.");
                this.complete("Bin is full");
            } else if (this.isOlderThan(this.thresholds.getMaxBinMillis(), TimeUnit.MILLISECONDS)) {
                this.logger.debug(this + " is now expired. Completing bin.");
                this.complete("Bin is older than " + this.thresholds.getMaxBinAge());
            }
            boolean bl = true;
            return bl;
        }
        catch (Exception e) {
            this.logger.error("Failed to create merged FlowFile from " + (this.flowFiles.size() + 1) + " input FlowFiles; routing originals to failure", (Throwable)e);
            try {
                recordReader.close();
                if (this.recordWriter != null) {
                    this.recordWriter.close();
                }
                if (this.out != null) {
                    this.out.close();
                }
                if (!flowFileMigrated) {
                    flowFileSession.migrate(this.session, Collections.singleton(flowFile));
                    this.flowFiles.add(flowFile);
                }
            }
            finally {
                this.complete = true;
                this.session.remove(this.merged);
                this.session.transfer(this.flowFiles, MergeRecord.REL_FAILURE);
                this.session.commit();
            }
            boolean bl = true;
            return bl;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isFull() {
        this.readLock.lock();
        try {
            Optional<String> fragmentCountValue;
            if (!this.isFullEnough()) {
                boolean bl = false;
                return bl;
            }
            int maxRecords = this.thresholds.getMaxRecords();
            if (this.recordCount >= maxRecords) {
                boolean bl = true;
                return bl;
            }
            if (this.out.getBytesWritten() >= this.thresholds.getMaxBytes()) {
                boolean bl = true;
                return bl;
            }
            Optional<String> fragmentCountAttribute = this.thresholds.getFragmentCountAttribute();
            if (fragmentCountAttribute != null && fragmentCountAttribute.isPresent() && (fragmentCountValue = this.flowFiles.stream().filter(ff -> ff.getAttribute((String)fragmentCountAttribute.get()) != null).map(ff -> ff.getAttribute((String)fragmentCountAttribute.get())).findFirst()).isPresent()) {
                try {
                    int expectedFragments = Integer.parseInt(fragmentCountValue.get());
                    if (this.fragmentCount == expectedFragments) {
                        boolean bl = true;
                        return bl;
                    }
                }
                catch (NumberFormatException nfe) {
                    this.logger.error(nfe.getMessage(), (Throwable)nfe);
                }
            }
            boolean bl = false;
            return bl;
        }
        finally {
            this.readLock.unlock();
        }
    }

    private int getMinimumRecordCount() {
        int requiredCount;
        int currentCount = this.requiredRecordCount;
        if (currentCount > -1) {
            return currentCount;
        }
        this.requiredRecordCount = requiredCount = this.thresholds.getMinRecords();
        return requiredCount;
    }

    public boolean isFullEnough() {
        this.readLock.lock();
        try {
            if (this.flowFiles.isEmpty()) {
                boolean bl = false;
                return bl;
            }
            int requiredRecordCount = this.getMinimumRecordCount();
            boolean bl = this.recordCount >= requiredRecordCount && this.out.getBytesWritten() >= this.thresholds.getMinBytes();
            return bl;
        }
        finally {
            this.readLock.unlock();
        }
    }

    public void rollback() {
        this.complete = true;
        this.logger.debug("Marked {} as complete because rollback() was called", new Object[]{this});
        this.writeLock.lock();
        try {
            if (this.recordWriter != null) {
                try {
                    this.recordWriter.close();
                }
                catch (IOException e) {
                    this.logger.warn("Failed to close Record Writer", (Throwable)e);
                }
            }
            this.session.rollback();
            if (this.logger.isDebugEnabled()) {
                List ids = this.flowFiles.stream().map(ff -> " id=" + ff.getId() + ",").collect(Collectors.toList());
                this.logger.debug("Rolled back bin {} containing input FlowFiles {}", new Object[]{this, ids});
            }
        }
        finally {
            this.writeLock.unlock();
        }
    }

    private long getBinAge() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.creationNanos);
    }

    private void fail() {
        this.complete = true;
        this.logger.debug("Marked {} as complete because fail() was called", new Object[]{this});
        this.writeLock.lock();
        try {
            if (this.recordWriter != null) {
                try {
                    this.recordWriter.close();
                }
                catch (IOException e) {
                    this.logger.warn("Failed to close Record Writer", (Throwable)e);
                }
            }
            this.session.remove(this.merged);
            this.session.transfer(this.flowFiles, MergeRecord.REL_FAILURE);
            this.session.commit();
        }
        finally {
            this.writeLock.unlock();
        }
    }

    public void complete(String completionReason) throws IOException {
        this.writeLock.lock();
        try {
            if (this.isComplete()) {
                this.logger.debug("Cannot complete {} because it is already completed", new Object[]{this});
                return;
            }
            this.complete = true;
            this.logger.debug("Marked {} as complete because complete() was called", new Object[]{this});
            WriteResult writeResult = this.recordWriter.finishRecordSet();
            this.recordWriter.close();
            this.logger.debug("Closed Record Writer using session {} for {}", new Object[]{this.session, this});
            if (this.flowFiles.isEmpty()) {
                this.session.remove(this.merged);
                return;
            }
            Optional<String> countAttr = this.thresholds.getFragmentCountAttribute();
            if (countAttr.isPresent()) {
                Integer expectedBinCount = null;
                for (FlowFile flowFile : this.flowFiles) {
                    int count;
                    String countVal = flowFile.getAttribute(countAttr.get());
                    if (countVal == null) continue;
                    try {
                        count = Integer.parseInt(countVal);
                    }
                    catch (NumberFormatException nfe) {
                        this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number", new Object[]{this.flowFiles.size(), countAttr.get(), countVal, flowFile});
                        this.fail();
                        this.writeLock.unlock();
                        return;
                    }
                    if (expectedBinCount != null && count != expectedBinCount) {
                        this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}", new Object[]{this.flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount});
                        this.fail();
                        return;
                    }
                    expectedBinCount = count;
                }
                if (expectedBinCount == null) {
                    this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles", new Object[]{this.flowFiles.size(), countAttr.get()});
                    this.fail();
                    return;
                }
                if (expectedBinCount.intValue() != this.flowFiles.size()) {
                    this.logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted (due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).", new Object[]{this.flowFiles.size(), countAttr.get(), expectedBinCount, this.flowFiles.size(), expectedBinCount});
                    this.fail();
                    return;
                }
            }
            HashMap<String, String> attributes = new HashMap<String, String>();
            AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(this.context);
            Map<String, String> mergedAttributes = attributeStrategy.getMergedAttributes(this.flowFiles);
            attributes.putAll(mergedAttributes);
            attributes.putAll(writeResult.getAttributes());
            attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
            attributes.put(CoreAttributes.MIME_TYPE.key(), this.recordWriter.getMimeType());
            attributes.put("merge.count", Integer.toString(this.flowFiles.size()));
            attributes.put("merge.bin.age", Long.toString(this.getBinAge()));
            this.merged = this.session.putAllAttributes(this.merged, attributes);
            this.flowFiles.forEach(ff -> this.session.putAttribute(ff, "merge.uuid", this.merged.getAttribute(CoreAttributes.UUID.key())));
            this.session.getProvenanceReporter().join(this.flowFiles, this.merged, "Records Merged due to: " + completionReason);
            this.session.transfer(this.merged, MergeRecord.REL_MERGED);
            this.session.transfer(this.flowFiles, MergeRecord.REL_ORIGINAL);
            this.session.adjustCounter("Records Merged", (long)writeResult.getRecordCount(), false);
            this.session.commit();
            if (this.logger.isDebugEnabled()) {
                List ids = this.flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
                this.logger.debug("Completed bin {} with {} records with Merged FlowFile {} using input FlowFiles {}", new Object[]{this, writeResult.getRecordCount(), this.merged, ids});
            }
        }
        catch (Exception e) {
            this.session.rollback(true);
            throw e;
        }
        finally {
            this.writeLock.unlock();
        }
    }

    public String toString() {
        this.readLock.lock();
        try {
            String string = "RecordBin[size=" + this.flowFiles.size() + ", full=" + this.isFull() + ", isComplete=" + this.isComplete() + ", id=" + this.id + "]";
            return string;
        }
        finally {
            this.readLock.unlock();
        }
    }
}

