/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.timebuffer.EntityAccess;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;

@SideEffectFree
@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"rate control", "throttle", "rate", "throughput"})
@CapabilityDescription(value="Controls the rate at which data is transferred to follow-on processors. If you configure a very small Time Duration, then the accuracy of the throttle gets worse. You can improve this accuracy by decreasing the Yield Duration, at the expense of more Tasks given to the processor.")
public class ControlRate
extends AbstractProcessor {
    public static final String DATA_RATE = "data rate";
    public static final String FLOWFILE_RATE = "flowfile count";
    public static final String ATTRIBUTE_RATE = "attribute value";
    public static final AllowableValue DATA_RATE_VALUE = new AllowableValue("data rate", "data rate", "Rate is controlled by counting bytes transferred per time duration.");
    public static final AllowableValue FLOWFILE_RATE_VALUE = new AllowableValue("flowfile count", "flowfile count", "Rate is controlled by counting flowfiles transferred per time duration");
    public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue("attribute value", "attribute value", "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");
    public static final int MAX_FLOW_FILES_PER_BATCH = 1000;
    public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder().name("Rate Control Criteria").description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.").required(true).allowableValues(new AllowableValue[]{DATA_RATE_VALUE, FLOWFILE_RATE_VALUE, ATTRIBUTE_RATE_VALUE}).defaultValue("data rate").build();
    public static final PropertyDescriptor MAX_RATE = new PropertyDescriptor.Builder().name("Maximum Rate").description("The maximum rate at which data should pass through this processor. The format of this property is expected to be a positive integer, or a Data Size (such as '1 MB') if Rate Control Criteria is set to 'data rate'.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor RATE_CONTROL_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("Rate Controlled Attribute").description("The name of an attribute whose values build toward the rate limit if Rate Control Criteria is set to 'attribute value'. The value of the attribute referenced by this property must be a positive long, or the FlowFile will be routed to failure. This value is ignored if Rate Control Criteria is not set to 'attribute value'. Changing this value resets the rate counters.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).build();
    public static final PropertyDescriptor TIME_PERIOD = new PropertyDescriptor.Builder().name("Time Duration").description("The amount of time to which the Maximum Rate pertains. Changing this value resets the rate counters.").required(true).addValidator(StandardValidators.createTimePeriodValidator((long)1L, (TimeUnit)TimeUnit.SECONDS, (long)Integer.MAX_VALUE, (TimeUnit)TimeUnit.SECONDS)).defaultValue("1 min").build();
    public static final PropertyDescriptor GROUPING_ATTRIBUTE_NAME = new PropertyDescriptor.Builder().name("Grouping Attribute").description("By default, a single \"throttle\" is used for all FlowFiles. If this value is specified, a separate throttle is used for each value specified by the attribute with this name. Changing this value resets the rate counters.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles are transferred to this relationship under normal conditions").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if they are missing a necessary Rate Controlled Attribute or the attribute is not in the expected format").build();
    private static final Pattern POSITIVE_LONG_PATTERN = Pattern.compile("0*[1-9][0-9]*");
    private static final String DEFAULT_GROUP_ATTRIBUTE = ControlRate.class.getName() + "###____DEFAULT_GROUP_ATTRIBUTE___###";
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private final ConcurrentMap<String, Throttle> throttleMap = new ConcurrentHashMap<String, Throttle>();
    private final AtomicLong lastThrottleClearTime = new AtomicLong(System.currentTimeMillis());
    private volatile String rateControlCriteria = null;
    private volatile String rateControlAttribute = null;
    private volatile String maximumRateStr = null;
    private volatile String groupingAttributeName = null;
    private volatile int timePeriodSeconds = 1;

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(RATE_CONTROL_CRITERIA);
        properties.add(MAX_RATE);
        properties.add(RATE_CONTROL_ATTRIBUTE_NAME);
        properties.add(TIME_PERIOD);
        properties.add(GROUPING_ATTRIBUTE_NAME);
        this.properties = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        Validator rateValidator;
        ArrayList<ValidationResult> validationResults = new ArrayList<ValidationResult>(super.customValidate(context));
        switch (context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase()) {
            case "data rate": {
                rateValidator = StandardValidators.DATA_SIZE_VALIDATOR;
                break;
            }
            case "attribute value": {
                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
                String rateAttr = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
                if (rateAttr != null) break;
                validationResults.add(new ValidationResult.Builder().subject(RATE_CONTROL_ATTRIBUTE_NAME.getName()).explanation("<Rate Controlled Attribute> property must be set if using <Rate Control Criteria> of 'attribute value'").build());
                break;
            }
            default: {
                rateValidator = StandardValidators.POSITIVE_LONG_VALIDATOR;
            }
        }
        ValidationResult rateResult = rateValidator.validate("Maximum Rate", context.getProperty(MAX_RATE).getValue(), context);
        if (!rateResult.isValid()) {
            validationResults.add(rateResult);
        }
        return validationResults;
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        super.onPropertyModified(descriptor, oldValue, newValue);
        if (descriptor.equals((Object)RATE_CONTROL_CRITERIA) || descriptor.equals((Object)RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals((Object)GROUPING_ATTRIBUTE_NAME) || descriptor.equals((Object)TIME_PERIOD)) {
            this.throttleMap.clear();
        } else if (descriptor.equals((Object)MAX_RATE)) {
            long newRate = DataUnit.DATA_SIZE_PATTERN.matcher(newValue.toUpperCase()).matches() ? DataUnit.parseDataSize((String)newValue, (DataUnit)DataUnit.B).longValue() : Long.parseLong(newValue);
            for (Throttle throttle : this.throttleMap.values()) {
                throttle.setMaxRate(newRate);
            }
        }
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.rateControlCriteria = context.getProperty(RATE_CONTROL_CRITERIA).getValue().toLowerCase();
        this.rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue();
        this.maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase();
        this.groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue();
        this.timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        long throttleExpirationMillis;
        List flowFiles = session.get((FlowFileFilter)new ThrottleFilter(1000));
        if (flowFiles.isEmpty()) {
            context.yield();
            return;
        }
        long lastClearTime = this.lastThrottleClearTime.get();
        if (lastClearTime < (throttleExpirationMillis = System.currentTimeMillis() - 2L * context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS)) && this.lastThrottleClearTime.compareAndSet(lastClearTime, System.currentTimeMillis())) {
            Iterator itr = this.throttleMap.entrySet().iterator();
            while (itr.hasNext()) {
                Map.Entry entry = itr.next();
                Throttle throttle = (Throttle)entry.getValue();
                if (!throttle.tryLock()) continue;
                try {
                    if (throttle.lastUpdateTime() >= lastClearTime) continue;
                    itr.remove();
                }
                finally {
                    throttle.unlock();
                }
            }
        }
        ProcessorLog logger = this.getLogger();
        for (FlowFile flowFile : flowFiles) {
            long accrualAmount = this.getFlowFileAccrual(flowFile);
            if (accrualAmount < 0L) {
                logger.error("Routing {} to 'failure' due to missing or invalid attribute", new Object[]{flowFile});
                session.transfer(flowFile, REL_FAILURE);
                continue;
            }
            logger.info("transferring {} to 'success'", new Object[]{flowFile});
            session.transfer(flowFile, REL_SUCCESS);
        }
    }

    private long getFlowFileAccrual(FlowFile flowFile) {
        long rateValue;
        switch (this.rateControlCriteria) {
            case "data rate": {
                rateValue = flowFile.getSize();
                break;
            }
            case "flowfile count": {
                rateValue = 1L;
                break;
            }
            case "attribute value": {
                String attributeValue = flowFile.getAttribute(this.rateControlAttribute);
                if (attributeValue == null) {
                    return -1L;
                }
                if (!POSITIVE_LONG_PATTERN.matcher(attributeValue).matches()) {
                    return -1L;
                }
                rateValue = Long.parseLong(attributeValue);
                break;
            }
            default: {
                throw new AssertionError((Object)("<Rate Control Criteria> property set to illegal value of " + this.rateControlCriteria));
            }
        }
        return rateValue;
    }

    private class ThrottleFilter
    implements FlowFileFilter {
        private final int flowFilesPerBatch;
        private int flowFilesInBatch = 0;

        ThrottleFilter(int maxFFPerBatch) {
            this.flowFilesPerBatch = maxFFPerBatch;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
            long accrual = ControlRate.this.getFlowFileAccrual(flowFile);
            if (accrual < 0L) {
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
            }
            String groupName = ControlRate.this.groupingAttributeName == null ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(ControlRate.this.groupingAttributeName);
            Throttle throttle = (Throttle)ControlRate.this.throttleMap.get(groupName);
            if (throttle == null) {
                throttle = new Throttle(ControlRate.this.timePeriodSeconds, TimeUnit.SECONDS, ControlRate.this.getLogger());
                long newRate = DataUnit.DATA_SIZE_PATTERN.matcher(ControlRate.this.maximumRateStr).matches() ? DataUnit.parseDataSize((String)ControlRate.this.maximumRateStr, (DataUnit)DataUnit.B).longValue() : Long.parseLong(ControlRate.this.maximumRateStr);
                throttle.setMaxRate(newRate);
                ControlRate.this.throttleMap.put(groupName, throttle);
            }
            throttle.lock();
            try {
                if (throttle.tryAdd(accrual)) {
                    ++this.flowFilesInBatch;
                    if (this.flowFilesInBatch >= this.flowFilesPerBatch) {
                        this.flowFilesInBatch = 0;
                        FlowFileFilter.FlowFileFilterResult flowFileFilterResult = FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
                        return flowFileFilterResult;
                    }
                    FlowFileFilter.FlowFileFilterResult flowFileFilterResult = FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
                    return flowFileFilterResult;
                }
            }
            finally {
                throttle.unlock();
            }
            return FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
        }
    }

    private static class Throttle
    extends ReentrantLock {
        private final AtomicLong maxRate = new AtomicLong(1L);
        private final long timePeriodMillis;
        private final TimedBuffer<TimestampedLong> timedBuffer;
        private final ProcessorLog logger;
        private volatile long penalizationPeriod = 0L;
        private volatile long penalizationExpired = 0L;
        private volatile long lastUpdateTime;

        public Throttle(int timePeriod, TimeUnit unit, ProcessorLog logger) {
            this.timePeriodMillis = TimeUnit.MILLISECONDS.convert(timePeriod, unit);
            this.timedBuffer = new TimedBuffer(unit, timePeriod, (EntityAccess)new LongEntityAccess());
            this.logger = logger;
        }

        public void setMaxRate(long maxRate) {
            this.maxRate.set(maxRate);
        }

        public long lastUpdateTime() {
            return this.lastUpdateTime;
        }

        public boolean tryAdd(long value) {
            long transferred;
            long now = System.currentTimeMillis();
            if (this.penalizationExpired > now) {
                return false;
            }
            long maxRateValue = this.maxRate.get();
            TimestampedLong sum = (TimestampedLong)this.timedBuffer.getAggregateValue(this.timePeriodMillis);
            if (sum != null && sum.getValue() >= maxRateValue) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("current sum for throttle is {} at time {}, so not allowing rate of {} through", new Object[]{sum.getValue(), sum.getTimestamp(), value});
                }
                return false;
            }
            if (this.penalizationPeriod > 0L) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Starting Throttle penalization, expiring {} milliseconds from now", new Object[]{this.penalizationPeriod});
                }
                this.penalizationExpired = now + this.penalizationPeriod;
                this.penalizationPeriod = 0L;
                return false;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("current sum for throttle is {} at time {}, so allowing rate of {} through", new Object[]{sum == null ? 0L : sum.getValue(), sum == null ? 0L : sum.getTimestamp(), value});
            }
            if ((transferred = ((TimestampedLong)this.timedBuffer.add((Object)new TimestampedLong(Long.valueOf(value)))).getValue().longValue()) > maxRateValue) {
                long amountOver = transferred - maxRateValue;
                double pct = (double)amountOver / (double)maxRateValue;
                this.penalizationPeriod = (long)((double)this.timePeriodMillis * pct);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("allowing rate of {} through but penalizing Throttle for {} milliseconds", new Object[]{value, this.penalizationPeriod});
                }
            }
            this.lastUpdateTime = now;
            return true;
        }
    }
}

