package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.Range;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StringUtils;

@CapabilityDescription("Samples the records of a FlowFile based on a specified sampling strategy (such as Reservoir Sampling). The resulting FlowFile may be of a fixed number of records (in the case of reservoir-based algorithms) or some subset of the total number of records (in the case of probabilistic sampling), or a deterministic number of records (in the case of interval sampling).")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@SupportsBatching
@WritesAttributes({@WritesAttribute(attribute = "mime.type", description = "The MIME type indicated by the record writer"), @WritesAttribute(attribute = "record.count", description = "The number of records in the resulting flow file")})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"record", "sample", SampleRecord.RESERVOIR_SAMPLING_KEY, SampleRecord.RANGE_SAMPLING_KEY, SampleRecord.INTERVAL_SAMPLING_KEY})
@SideEffectFree
/* loaded from: input_file:org/apache/nifi/processors/standard/SampleRecord.class */
public class SampleRecord extends AbstractProcessor {
    /** Separator between the individual intervals of a range expression (the commas in {@code 3,6-8,20-}). */
    private static final String RANGE_SEPARATOR = ",";

    /** Supported property descriptors; assigned exactly once by the static initializer of this class. */
    private static final List<PropertyDescriptor> properties;

    /** Relationships exposed by this processor; assigned exactly once by the static initializer of this class. */
    private static final Set<Relationship> relationships;

    // Each sampling strategy is identified by a key (the stored property value, also reused as a tag)
    // and an AllowableValue that carries the display name and description shown to the user.
    // The descriptions refer to the companion properties by their display names.
    static final String INTERVAL_SAMPLING_KEY = "interval";
    static final AllowableValue INTERVAL_SAMPLING = new AllowableValue(INTERVAL_SAMPLING_KEY, "Interval Sampling", "Selects every Nth record where N is the value of the 'Sampling Interval' property");
    static final String RANGE_SAMPLING_KEY = "range";
    static final AllowableValue RANGE_SAMPLING = new AllowableValue(RANGE_SAMPLING_KEY, "Range Sampling", "Creates a sample of records based on the index (i.e. record number) of the records using the specified range. An example is '3,6-8,20-' which includes the third record, the sixth, seventh and eighth record, and all records from the twentieth record on. Commas separate intervals that don't overlap, and an interval can be between two numbers (i.e. 6-8) or up to a given number (i.e. -5), or from a number to the number of the last record (i.e. 20-).");
    static final String PROBABILISTIC_SAMPLING_KEY = "probabilistic";
    static final AllowableValue PROBABILISTIC_SAMPLING = new AllowableValue(PROBABILISTIC_SAMPLING_KEY, "Probabilistic Sampling", "Selects each record with probability P where P is the value of the 'Sampling Probability' property");
    static final String RESERVOIR_SAMPLING_KEY = "reservoir";
    static final AllowableValue RESERVOIR_SAMPLING = new AllowableValue(RESERVOIR_SAMPLING_KEY, "Reservoir Sampling", "Creates a sample of K records where each record has equal probability of being included, where K is the value of the 'Reservoir Size' property. Note that if the value is very large it may cause memory issues as the reservoir is kept in-memory.");

    /**
     * Validates ONE comma-separated segment of a range expression via {@code matches()}:
     * an optional lower bound, an optional dash and an optional upper bound (all parts optional,
     * so the empty string is accepted as well).
     */
    private static final Pattern RANGE_PATTERN = Pattern.compile("^([0-9]+)?(-)?([0-9]+)?");

    /**
     * Extracts the segments of a whole range expression via repeated {@code find()} calls:
     * group 1 is the lower bound, group 2 the dash and group 3 the upper bound; every segment
     * must be followed by a comma or the end of the input.
     */
    private static final Pattern INTERVAL_PATTERN = Pattern.compile("([0-9]+)?(-)?([0-9]+)?(?:,|$)");
    /** Controller service used to parse the incoming FlowFile content into records. */
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();

    /** Controller service used to serialize the sampled records into the outgoing FlowFile. */
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing results to a FlowFile")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    /** Selects the sampling algorithm; the strategy-specific properties below depend on this value. */
    static final PropertyDescriptor SAMPLING_STRATEGY = new PropertyDescriptor.Builder()
            .name("sample-record-sampling-strategy")
            .displayName("Sampling Strategy")
            .description("Specifies which method to use for sampling records from the incoming FlowFile")
            .allowableValues(new DescribedValue[]{INTERVAL_SAMPLING, RANGE_SAMPLING, PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING})
            .required(true)
            .defaultValue(RESERVOIR_SAMPLING.getValue())
            .addValidator(Validator.VALID)
            .build();

    /** Interval N for Interval Sampling; only applicable when that strategy is selected. */
    static final PropertyDescriptor SAMPLING_INTERVAL = new PropertyDescriptor.Builder()
            .name("sample-record-interval")
            .displayName("Sampling Interval")
            .description("Specifies the number of records to skip before writing a record to the outgoing FlowFile. This property is only used if Sampling Strategy is set to Interval Sampling. A value of zero (0) will cause no records to be included in the outgoing FlowFile, a value of one (1) will cause all records to be included, and a value of two (2) will cause half the records to be included, and so on.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dependsOn(SAMPLING_STRATEGY, new AllowableValue[]{INTERVAL_SAMPLING})
            .build();

    /**
     * Range expression for Range Sampling; only applicable when that strategy is selected.
     * No syntax validation happens at configuration time (Validator.VALID); the expression is
     * checked when the strategy is initialized for a FlowFile.
     */
    static final PropertyDescriptor SAMPLING_RANGE = new PropertyDescriptor.Builder()
            .name("sample-record-range")
            .displayName("Sampling Range")
            .description("Specifies the range of records to include in the sample, from 1 to the total number of records. An example is '3,6-8,20-' which includes the third record, the sixth, seventh and eighth records, and all records from the twentieth record on. Commas separate intervals that don't overlap, and an interval can be between two numbers (i.e. 6-8) or up to a given number (i.e. -5), or from a number to the number of the last record (i.e. 20-). If this property is unset, all records will be included.")
            .required(true)
            .addValidator(Validator.VALID)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dependsOn(SAMPLING_STRATEGY, new AllowableValue[]{RANGE_SAMPLING})
            .build();

    /** Inclusion probability (percent, 0-100) for Probabilistic Sampling; only applicable when that strategy is selected. */
    static final PropertyDescriptor SAMPLING_PROBABILITY = new PropertyDescriptor.Builder()
            .name("sample-record-probability")
            .displayName("Sampling Probability")
            .description("Specifies the probability (as a percent from 0-100) of a record being included in the outgoing FlowFile. This property is only used if Sampling Strategy is set to Probabilistic Sampling. A value of zero (0) will cause no records to be included in the outgoing FlowFile, and a value of 100 will cause all records to be included in the outgoing FlowFile.")
            .required(true)
            .addValidator(StandardValidators.createLongValidator(0, 100, true))
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dependsOn(SAMPLING_STRATEGY, new AllowableValue[]{PROBABILISTIC_SAMPLING})
            .build();

    /** Number of records K kept by Reservoir Sampling; only applicable when that strategy is selected. */
    static final PropertyDescriptor RESERVOIR_SIZE = new PropertyDescriptor.Builder()
            .name("sample-record-reservoir")
            .displayName("Reservoir Size")
            .description("Specifies the number of records to write to the outgoing FlowFile. This property is only used if Sampling Strategy is set to reservoir-based strategies such as Reservoir Sampling.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dependsOn(SAMPLING_STRATEGY, new AllowableValue[]{RESERVOIR_SAMPLING})
            .build();

    /** Optional seed for the random number generator used by the Probabilistic and Reservoir strategies. */
    static final PropertyDescriptor RANDOM_SEED = new PropertyDescriptor.Builder()
            .name("sample-record-random-seed")
            .displayName("Random Seed")
            .description("Specifies a particular number to use as the seed for the random number generator (used by probabilistic strategies). Setting this property will ensure the same records are selected even when using probabilistic strategies.")
            .required(false)
            .addValidator(StandardValidators.LONG_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dependsOn(SAMPLING_STRATEGY, new AllowableValue[]{PROBABILISTIC_SAMPLING, RESERVOIR_SAMPLING})
            .build();
    /** Receives the unmodified incoming FlowFile after successful sampling; auto-terminated by default. */
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile is routed to this relationship if sampling is successful").autoTerminateDefault(true).build();
    /** Receives the newly created FlowFile that holds the sampled records; auto-terminated by default. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile is routed to this relationship if the sampling completed successfully").autoTerminateDefault(true).build();
    /** Receives the incoming FlowFile when processing fails; not auto-terminated by default. */
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, any record is not valid), the original FlowFile will be routed to this relationship").build();

    /**
     * Sampling strategy that writes every Nth record, where N is the configured interval.
     * A counter is advanced for each record; once it reaches the interval the record is written
     * and the counter starts over. An interval of zero or less never writes a record.
     */
    static class IntervalSamplingStrategy implements SamplingStrategy {
        final RecordSetWriter writer;
        final int interval;
        int currentCount = 0;

        /**
         * @param recordSetWriter writer that receives the selected records
         * @param i               the sampling interval N
         */
        IntervalSamplingStrategy(RecordSetWriter recordSetWriter, int i) {
            writer = recordSetWriter;
            interval = i;
        }

        /** Resets the record counter and begins the record set on the writer. */
        @Override
        public void init() throws IOException {
            currentCount = 0;
            writer.beginRecordSet();
        }

        /** Counts the record and writes it when the counter has reached a positive interval. */
        @Override
        public void sample(Record record) throws IOException {
            currentCount++;
            final boolean intervalReached = interval > 0 && currentCount >= interval;
            if (intervalReached) {
                writer.write(record);
                // start counting towards the next selected record
                currentCount = 0;
            }
        }

        /** Finishes the record set and returns the writer's result. */
        @Override
        public WriteResult finish() throws IOException {
            return writer.finishRecordSet();
        }
    }

    /**
     * Sampling strategy that writes each record independently with a fixed probability,
     * given as a percentage. For every record a random integer in [0, 100) is drawn and the
     * record is written when the drawn value is below the configured percentage.
     */
    static class ProbabilisticSamplingStrategy implements SamplingStrategy {
        final RecordSetWriter writer;
        final int probabilityValue;
        final Random randomNumberGenerator;

        /**
         * @param recordSetWriter writer that receives the selected records
         * @param i               inclusion probability as a percentage
         * @param l               seed for the random number generator, or {@code null} for an unseeded generator
         */
        ProbabilisticSamplingStrategy(RecordSetWriter recordSetWriter, int i, Long l) {
            writer = recordSetWriter;
            probabilityValue = i;
            if (l == null) {
                randomNumberGenerator = new Random();
            } else {
                randomNumberGenerator = new Random(l.longValue());
            }
        }

        /** Begins the record set on the writer. */
        @Override
        public void init() throws IOException {
            writer.beginRecordSet();
        }

        /** Draws one random number for the record and writes the record if the draw selects it. */
        @Override
        public void sample(Record record) throws IOException {
            final int draw = randomNumberGenerator.nextInt(100);
            if (draw >= probabilityValue) {
                return;
            }
            writer.write(record);
        }

        /** Finishes the record set and returns the writer's result. */
        @Override
        public WriteResult finish() throws IOException {
            return writer.finishRecordSet();
        }
    }

    /**
     * Sampling strategy that writes the records whose 1-based record number falls into one of
     * the intervals of a range expression such as {@code 3,6-8,20-}.
     * <ul>
     *   <li>{@code N} selects the single record N,</li>
     *   <li>{@code A-B} selects records A through B (inclusive),</li>
     *   <li>{@code -B} selects all records up to and including B,</li>
     *   <li>{@code A-} selects record A and everything after it.</li>
     * </ul>
     * A {@code null} or empty expression selects every record.
     */
    static class RangeSamplingStrategy implements SamplingStrategy {
        final RecordSetWriter writer;
        final String rangeExpression;
        int currentCount = 1;
        final List<Range<Integer>> ranges = new ArrayList<>();

        /**
         * @param recordSetWriter writer that receives the selected records
         * @param str             the range expression; may be {@code null} or empty to select all records
         */
        RangeSamplingStrategy(RecordSetWriter recordSetWriter, String str) {
            this.writer = recordSetWriter;
            this.rangeExpression = str;
        }

        /**
         * Checks every comma-separated segment of the (non-null) expression against the segment pattern.
         *
         * @return {@code true} if at least one segment is malformed
         */
        private boolean isRangeExpressionInvalid() {
            for (String segment : this.rangeExpression.split(SampleRecord.RANGE_SEPARATOR)) {
                if (!SampleRecord.RANGE_PATTERN.matcher(segment).matches()) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Parses one numeric bound of the expression.
         *
         * @throws IOException if the digits do not fit into an {@code int}
         */
        private int parseBound(String digits) throws IOException {
            try {
                return Integer.parseInt(digits);
            } catch (NumberFormatException e) {
                // Only reachable for values beyond the int range; report it like any other bad expression.
                throw new IOException(this.rangeExpression + " is not a valid range expression", e);
            }
        }

        /**
         * Resets the record counter, begins the record set and translates the range expression
         * into the list of ranges used by {@link #sample(Record)}.
         *
         * @throws IOException if the writer fails or the range expression is not valid
         */
        @Override
        public void init() throws IOException {
            this.currentCount = 1;
            this.ranges.clear();
            this.writer.beginRecordSet();

            // Handle the "no expression" case first so that a null expression selects all
            // records instead of failing with a NullPointerException during validation.
            if (StringUtils.isEmpty(this.rangeExpression)) {
                this.ranges.add(Range.of(0, Integer.MAX_VALUE));
                return;
            }
            if (isRangeExpressionInvalid()) {
                throw new IOException(this.rangeExpression + " is not a valid range expression");
            }

            Matcher matcher = SampleRecord.INTERVAL_PATTERN.matcher(this.rangeExpression);
            while (matcher.find()) {
                String lowerDigits = matcher.group(1);
                boolean hasDash = matcher.group(2) != null;
                String upperDigits = matcher.group(3);
                if (lowerDigits == null && !hasDash && upperDigits == null) {
                    // empty segment (e.g. between two commas or the empty match at the end of input)
                    continue;
                }

                int lower;
                int upper;
                if (hasDash) {
                    // open-ended bounds default to "from the start" / "to the end"
                    lower = lowerDigits == null ? 0 : parseBound(lowerDigits);
                    upper = upperDigits == null ? Integer.MAX_VALUE : parseBound(upperDigits);
                } else {
                    // a lone number selects exactly that record
                    lower = parseBound(lowerDigits != null ? lowerDigits : upperDigits);
                    upper = lower;
                }
                this.ranges.add(Range.of(lower, upper));
            }
        }

        /** Writes the record if its record number lies in any configured range, then advances the counter. */
        @Override
        public void sample(Record record) throws IOException {
            for (Range<Integer> range : this.ranges) {
                if (range.contains(this.currentCount)) {
                    this.writer.write(record);
                    // write at most once even if ranges overlap
                    break;
                }
            }
            this.currentCount++;
        }

        /** Finishes the record set and returns the writer's result. */
        @Override
        public WriteResult finish() throws IOException {
            return this.writer.finishRecordSet();
        }
    }

    /**
     * Sampling strategy that keeps a fixed-size, in-memory sample ("reservoir") of the records seen.
     * The first {@code reservoirSize} records fill the reservoir; for each later record a random
     * index in [0, number of records seen so far] is drawn and, if it falls inside the reservoir,
     * the record replaces the entry at that index. Nothing is written until {@link #finish()}.
     */
    static class ReservoirSamplingStrategy implements SamplingStrategy {
        /**
         * Upper bound for the eagerly allocated capacity. The reservoir still grows up to
         * {@code reservoirSize} on demand, but a very large configured size no longer forces a
         * huge allocation for FlowFiles that contain only a few records.
         */
        private static final int MAX_INITIAL_CAPACITY = 1024;

        final RecordSetWriter writer;
        final int reservoirSize;
        final ArrayList<Record> reservoir;
        int currentCount = 0;
        final Random randomNumberGenerator;

        /**
         * @param recordSetWriter writer that receives the sampled records on {@link #finish()}
         * @param i               maximum number of records kept in the reservoir
         * @param l               seed for the random number generator, or {@code null} for an unseeded generator
         */
        ReservoirSamplingStrategy(RecordSetWriter recordSetWriter, int i, Long l) {
            this.writer = recordSetWriter;
            this.reservoirSize = i;
            this.reservoir = new ArrayList<>(Math.min(i, MAX_INITIAL_CAPACITY));
            this.randomNumberGenerator = l == null ? new Random() : new Random(l.longValue());
        }

        /** Resets the counter and the reservoir and begins the record set on the writer. */
        @Override
        public void init() throws IOException {
            this.currentCount = 0;
            // Drop any records left over from a previous run so that the reservoir never
            // exceeds its configured size when the strategy is initialized again.
            this.reservoir.clear();
            this.writer.beginRecordSet();
        }

        /** Adds the record to the reservoir or lets it randomly replace an existing entry. */
        @Override
        public void sample(Record record) {
            if (this.currentCount < this.reservoirSize) {
                // still filling the reservoir
                this.reservoir.add(record);
            } else {
                int candidateIndex = this.randomNumberGenerator.nextInt(this.currentCount + 1);
                if (candidateIndex < this.reservoirSize) {
                    this.reservoir.set(candidateIndex, record);
                }
            }
            this.currentCount++;
        }

        /** Writes all records held in the reservoir, finishes the record set and returns the writer's result. */
        @Override
        public WriteResult finish() throws IOException {
            for (Record sampled : this.reservoir) {
                this.writer.write(sampled);
            }
            return this.writer.finishRecordSet();
        }
    }

    /**
     * Contract shared by all sampling algorithms of this processor. The processor calls
     * {@link #init()} once, then {@link #sample(Record)} for every record read from the
     * incoming FlowFile, and finally {@link #finish()}.
     */
    interface SamplingStrategy {
        /**
         * Prepares the strategy before the first record is offered.
         *
         * @throws IOException if preparation fails
         */
        void init() throws IOException;

        /**
         * Offers one record to the strategy, which decides whether it becomes part of the sample.
         *
         * @param record the next record read from the incoming FlowFile
         * @throws IOException if the record cannot be processed
         */
        void sample(Record record) throws IOException;

        /**
         * Completes the sample after the last record has been offered.
         *
         * @return the result reported for the written record set
         * @throws IOException if the sample cannot be completed
         */
        WriteResult finish() throws IOException;
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return the unmodifiable relationship set built in the static initializer
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the properties supported by this processor.
     *
     * @return the unmodifiable property list built in the static initializer
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Samples the records of one incoming FlowFile into a newly created child FlowFile.
     * <p>
     * On success the incoming FlowFile goes to {@link #REL_ORIGINAL} and the child, carrying the
     * {@code record.count} and MIME type attributes plus any attributes reported by the writer,
     * goes to {@link #REL_SUCCESS}. If any exception occurs, the incoming FlowFile goes to
     * {@link #REL_FAILURE} and the child is removed.
     *
     * @param processContext provides the configured property values and controller services
     * @param processSession session used to read, create, write and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        FlowFile sampledFlowFile = processSession.create(flowFile);
        HashMap<String, String> attributes = new HashMap<>();
        try {
            // try-with-resources closes BOTH streams on every path (output first, then input),
            // so neither is still open when the FlowFiles are transferred or removed below.
            try (InputStream in = processSession.read(flowFile);
                 OutputStream out = processSession.write(sampledFlowFile)) {
                RecordReaderFactory readerFactory = processContext.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
                RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                RecordSetWriterFactory writerFactory = processContext.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
                RecordSetWriter writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema()), out, sampledFlowFile);

                SamplingStrategy strategy = createSamplingStrategy(processContext, sampledFlowFile, writer);
                strategy.init();
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    strategy.sample(record);
                }
                WriteResult writeResult = strategy.finish();

                try {
                    writer.flush();
                    writer.close();
                } catch (IOException e) {
                    // Best effort: a failure here does not fail the FlowFile, but keep the cause in the log.
                    getLogger().warn("Failed to close Writer for {}", sampledFlowFile, e);
                }

                attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.putAll(writeResult.getAttributes());
            }
            processSession.transfer(flowFile, REL_ORIGINAL);
            processSession.transfer(processSession.putAllAttributes(sampledFlowFile, attributes), REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("Error during transmission of records, routing to failure", e);
            processSession.transfer(flowFile, REL_FAILURE);
            processSession.remove(sampledFlowFile);
        }
    }

    /**
     * Builds the sampling strategy selected by the Sampling Strategy property. Any value other
     * than the interval, range or probabilistic key results in reservoir sampling.
     *
     * @param processContext  provides the configured property values
     * @param attributeSource FlowFile whose attributes are used to evaluate Expression Language
     * @param writer          writer handed to the strategy
     * @return the strategy instance; {@code init()} has not been called on it yet
     */
    private SamplingStrategy createSamplingStrategy(ProcessContext processContext, FlowFile attributeSource, RecordSetWriter writer) {
        String strategyKey = processContext.getProperty(SAMPLING_STRATEGY).getValue();
        if (INTERVAL_SAMPLING_KEY.equals(strategyKey)) {
            int interval = processContext.getProperty(SAMPLING_INTERVAL).evaluateAttributeExpressions(attributeSource).asInteger().intValue();
            return new IntervalSamplingStrategy(writer, interval);
        }
        if (RANGE_SAMPLING_KEY.equals(strategyKey)) {
            String rangeExpression = processContext.getProperty(SAMPLING_RANGE).evaluateAttributeExpressions(attributeSource).getValue();
            return new RangeSamplingStrategy(writer, rangeExpression);
        }
        if (PROBABILISTIC_SAMPLING_KEY.equals(strategyKey)) {
            int probability = processContext.getProperty(SAMPLING_PROBABILITY).evaluateAttributeExpressions(attributeSource).asInteger().intValue();
            return new ProbabilisticSamplingStrategy(writer, probability, evaluateRandomSeed(processContext, attributeSource));
        }
        int reservoirSize = processContext.getProperty(RESERVOIR_SIZE).evaluateAttributeExpressions(attributeSource).asInteger().intValue();
        return new ReservoirSamplingStrategy(writer, reservoirSize, evaluateRandomSeed(processContext, attributeSource));
    }

    /**
     * Evaluates the optional Random Seed property.
     *
     * @return the configured seed, or {@code null} if the property is not set
     */
    private Long evaluateRandomSeed(ProcessContext processContext, FlowFile attributeSource) {
        if (!processContext.getProperty(RANDOM_SEED).isSet()) {
            return null;
        }
        return processContext.getProperty(RANDOM_SEED).evaluateAttributeExpressions(attributeSource).asLong();
    }

    static {
        // Properties in the order they are registered: reader/writer services first,
        // then the strategy selector followed by the strategy-specific settings.
        List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(RECORD_READER_FACTORY);
        descriptors.add(RECORD_WRITER_FACTORY);
        descriptors.add(SAMPLING_STRATEGY);
        descriptors.add(SAMPLING_INTERVAL);
        descriptors.add(SAMPLING_RANGE);
        descriptors.add(SAMPLING_PROBABILITY);
        descriptors.add(RESERVOIR_SIZE);
        descriptors.add(RANDOM_SEED);
        properties = Collections.unmodifiableList(descriptors);

        // Relationships are exposed as an unmodifiable set.
        Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        rels.add(REL_ORIGINAL);
        relationships = Collections.unmodifiableSet(rels);
    }
}
