package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Enforces expected ordering of FlowFiles that belong to the same data group within a single node.  Although PriorityAttributePrioritizer can be used on a connection to ensure that flow files going through that connection are in priority order, depending on error-handling, branching, and other flow designs, it is possible for FlowFiles to get out-of-order. EnforceOrder can be used to enforce original ordering for those FlowFiles. [IMPORTANT] In order to take effect of EnforceOrder, FirstInFirstOutPrioritizer should be used at EVERY downstream relationship UNTIL the order of FlowFiles physically get FIXED by operation such as MergeContent or being stored to the final destination.")
@WritesAttributes({@WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT, description = "All FlowFiles going through this processor will have this attribute. This value is used to determine wait timeout."), @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT, description = "All FlowFiles going through this processor will have this attribute denoting which relationship it was routed to."), @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL, description = "FlowFiles routed to 'failure' or 'skipped' relationship will have this attribute describing details."), @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER, description = "FlowFiles routed to 'wait' or 'skipped' relationship will have this attribute denoting expected order when the FlowFile was processed.")})
@Stateful(scopes = {Scope.LOCAL}, description = "EnforceOrder uses following states per ordering group: '<groupId>.target' is a order number which is being waited to arrive next. When a FlowFile with a matching order arrives, or a FlowFile overtakes the FlowFile being waited for because of wait timeout, target order will be updated to (FlowFile.order + 1). '<groupId>.max is the maximum order number for a group. '<groupId>.updatedAt' is a timestamp when the order of a group was updated last time. These managed states will be removed automatically once a group is determined as inactive, see 'Inactive Timeout' for detail.")
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@TriggerSerially
@Tags({"sort", "order"})
/* loaded from: input_file:org/apache/nifi/processors/standard/EnforceOrder.class */
public class EnforceOrder extends AbstractProcessor {
    // Names of the attributes this processor writes onto FlowFiles.
    public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt";
    public static final String ATTR_EXPECTED_ORDER = "EnforceOrder.expectedOrder";
    public static final String ATTR_RESULT = "EnforceOrder.result";
    public static final String ATTR_DETAIL = "EnforceOrder.detail";

    // Suffix of the per-group state key that holds the last-update timestamp.
    private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt";

    // Relationships returned by getRelationships(); assigned once in the constructor.
    private final Set<Relationship> relationships;

    // Builders for per-group state keys: each one maps a group identifier to a state key.
    private static final Function<String, String> STATE_TARGET_ORDER = groupId -> groupId + ".target";
    private static final Function<String, String> STATE_UPDATED_AT = groupId -> groupId + STATE_SUFFIX_UPDATED_AT;
    private static final Function<String, String> STATE_MAX_ORDER = groupId -> groupId + ".max";

    public static final PropertyDescriptor GROUP_IDENTIFIER = new PropertyDescriptor.Builder()
            .name("group-id")
            .displayName("Group Identifier")
            .description("EnforceOrder is capable of multiple ordering groups. 'Group Identifier' is used to determine which group a FlowFile belongs to. This property will be evaluated with each incoming FlowFile. If evaluated result is empty, the FlowFile will be routed to failure.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("${filename}")
            .build();

    public static final PropertyDescriptor ORDER_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("order-attribute")
            .displayName("Order Attribute")
            .description("A name of FlowFile attribute whose value will be used to enforce order of FlowFiles within a group. If a FlowFile does not have this attribute, or its value is not an integer, the FlowFile will be routed to failure.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final PropertyDescriptor INITIAL_ORDER = new PropertyDescriptor.Builder()
            .name("initial-order")
            .displayName("Initial Order")
            .description("When the first FlowFile of a group arrives, initial target order will be computed and stored in the managed state. After that, target order will start being tracked by EnforceOrder and stored in the state management store. If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure, and initial order will be left unknown until consecutive FlowFiles provide a valid initial order.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue("0")
            .build();

    public static final PropertyDescriptor MAX_ORDER = new PropertyDescriptor.Builder()
            .name("maximum-order")
            .displayName("Maximum Order")
            .description("If specified, any FlowFiles that have larger order will be routed to failure. This property is computed only once for a given group. After a maximum order is computed, it will be persisted in the state management store and used for other FlowFiles belonging to the same group. If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure, and maximum order will be left unknown until consecutive FlowFiles provide a valid maximum order.")
            .required(false)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    public static final PropertyDescriptor WAIT_TIMEOUT = new PropertyDescriptor.Builder()
            .name("wait-timeout")
            .displayName("Wait Timeout")
            .description("Indicates the duration after which waiting FlowFiles will be routed to the 'overtook' relationship.")
            .required(true)
            .defaultValue("10 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final PropertyDescriptor INACTIVE_TIMEOUT = new PropertyDescriptor.Builder()
            .name("inactive-timeout")
            .displayName("Inactive Timeout")
            .description("Indicates the duration after which state for an inactive group will be cleared from managed state. Group is determined as inactive if any new incoming FlowFile has not seen for a group for specified duration. Inactive Timeout must be longer than Wait Timeout. If a FlowFile arrives late after its group is already cleared, it will be treated as a brand new group, but will never match the order since expected preceding FlowFiles are already gone. The FlowFile will eventually timeout for waiting and routed to 'overtook'. To avoid this, group states should be kept long enough, however, shorter duration would be helpful for reusing the same group identifier again.")
            .required(true)
            .defaultValue("30 min")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder()
            .name("batch-count")
            .displayName("Batch Count")
            .description("The maximum number of FlowFiles that EnforceOrder can process at an execution.")
            .required(true)
            .defaultValue("1000")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile with a matching order number will be routed to this relationship.")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("A FlowFiles which does not have required attributes, or fails to compute those will be routed to this relationship")
            .build();

    public static final Relationship REL_WAIT = new Relationship.Builder()
            .name("wait")
            .description("A FlowFile with non matching order will be routed to this relationship")
            .build();

    public static final Relationship REL_OVERTOOK = new Relationship.Builder()
            .name("overtook")
            .description("A FlowFile that waited for preceding FlowFiles longer than Wait Timeout and overtook those FlowFiles, will be routed to this relationship.")
            .build();

    public static final Relationship REL_SKIPPED = new Relationship.Builder()
            .name("skipped")
            .description("A FlowFile that has an order younger than current, which means arrived too late and skipped, will be routed to this relationship.")
            .build();

    /* loaded from: input_file:org/apache/nifi/processors/standard/EnforceOrder$OrderingContext.class */
    private class OrderingContext {
        // Logger of the enclosing processor, captured when the context is constructed.
        private final ComponentLog logger;
        // Session and context of the onTrigger invocation this ordering context serves.
        private final ProcessSession processSession;
        private final ProcessContext processContext;
        // Name of the FlowFile attribute carrying the order number (Order Attribute property).
        private final String orderAttribute;
        // Wait Timeout property converted to milliseconds.
        private final Long waitTimeoutMillis;
        // Parses a FlowFile's order attribute as an int; throws NumberFormatException when it cannot be parsed.
        private final Function<FlowFile, Integer> getOrder;
        // Working copy of the managed state; keys are built by the STATE_* key functions.
        private final Map<String, String> groupStates;
        // Wall-clock timestamp (System.currentTimeMillis) taken when the context was constructed.
        private final long now;
        // Property handles looked up once in the constructor and evaluated per FlowFile.
        private final PropertyValue groupIdentifierProperty;
        private final PropertyValue initOrderProperty;
        private final PropertyValue maxOrderProperty;
        // Validated FlowFiles collected per group id (TreeMap, so groups iterate in key order).
        private final Map<String, List<FlowFile>> flowFileGroups;
        // The FlowFile currently being evaluated, with its group id and order; the latter two are reset by setFlowFile.
        private FlowFile flowFile;
        private String groupId;
        private Integer order;

        /**
         * Creates the per-trigger working context: captures the logger and the current
         * time, creates empty state/group maps, and looks up the properties that are
         * evaluated for each FlowFile.
         *
         * @param processContext context of the current trigger
         * @param processSession session of the current trigger
         */
        private OrderingContext(ProcessContext processContext, ProcessSession processSession) {
            this.logger = EnforceOrder.this.getLogger();
            this.groupStates = new HashMap<>();
            this.now = System.currentTimeMillis();
            this.flowFileGroups = new TreeMap<>();

            this.processContext = processContext;
            this.processSession = processSession;

            this.orderAttribute = processContext.getProperty(EnforceOrder.ORDER_ATTRIBUTE).getValue();
            this.waitTimeoutMillis = processContext.getProperty(EnforceOrder.WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
            // Integer.parseInt raises NumberFormatException for a missing or non-numeric attribute.
            this.getOrder = candidate -> Integer.parseInt(candidate.getAttribute(this.orderAttribute));

            this.groupIdentifierProperty = processContext.getProperty(EnforceOrder.GROUP_IDENTIFIER);
            this.initOrderProperty = processContext.getProperty(EnforceOrder.INITIAL_ORDER);
            this.maxOrderProperty = processContext.getProperty(EnforceOrder.MAX_ORDER);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Makes the given FlowFile the current one and clears the group id and order
         * computed for the previous FlowFile.
         *
         * @param flowFile the FlowFile to evaluate next
         */
        public void setFlowFile(FlowFile flowFile) {
            this.order = null;
            this.groupId = null;
            this.flowFile = flowFile;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Evaluates the Group Identifier property against the current FlowFile.
         *
         * @return {@code true} when a non-blank group id was obtained; {@code false}
         *         after routing the FlowFile to failure
         */
        public boolean computeGroupId() {
            this.groupId = this.groupIdentifierProperty.evaluateAttributeExpressions(this.flowFile).getValue();
            if (StringUtils.isBlank(this.groupId)) {
                transferToFailure(this.flowFile, "Failed to get Group Identifier.");
                return false;
            }
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reads the order number of the current FlowFile from its order attribute.
         *
         * @return {@code true} when the order was parsed; {@code false} after routing
         *         the FlowFile to failure because the attribute is not a valid integer
         */
        public boolean computeOrder() {
            final Integer parsedOrder;
            try {
                parsedOrder = this.getOrder.apply(this.flowFile);
            } catch (NumberFormatException e) {
                transferToFailure(this.flowFile, "Failed to parse order attribute due to " + e, e);
                return false;
            }
            this.order = parsedOrder;
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Multi-variable type inference failed */
        /**
         * Resolves the maximum order of the current group and checks the current
         * FlowFile's order against it.
         *
         * <p>When the Maximum Order property is not set, no check is performed. Otherwise
         * the value already tracked for the group is used, or the property is evaluated
         * against the current FlowFile when none is tracked yet. Only a value that parses
         * as an integer is kept in the group state: a blank or non-integer result routes
         * the FlowFile to failure and leaves the maximum order unknown, so a later
         * FlowFile of the same group can still provide a valid one.
         *
         * @return {@code true} when no maximum applies or the order does not exceed it;
         *         {@code false} after routing the FlowFile to failure
         */
        public boolean computeMaxOrder() {
            if (!this.maxOrderProperty.isSet()) {
                return true;
            }

            final String maxOrderKey = EnforceOrder.STATE_MAX_ORDER.apply(this.groupId);
            String maxOrderText = this.groupStates.get(maxOrderKey);
            if (maxOrderText == null) {
                // Not tracked yet for this group: compute it from the current FlowFile.
                maxOrderText = this.maxOrderProperty.evaluateAttributeExpressions(this.flowFile).getValue();
            }

            if (StringUtils.isBlank(maxOrderText)) {
                // Never keep an unusable value; it would fail every later FlowFile of the group.
                this.groupStates.remove(maxOrderKey);
                transferToFailure(this.flowFile, String.format("%s was specified but result was empty.", EnforceOrder.MAX_ORDER.getDisplayName()));
                return false;
            }

            final int maxOrder;
            try {
                maxOrder = Integer.parseInt(maxOrderText);
            } catch (NumberFormatException e) {
                // Same reasoning as above: drop the invalid value instead of persisting it.
                this.groupStates.remove(maxOrderKey);
                transferToFailure(this.flowFile, String.format("Failed to get Maximum Order for group [%s] due to %s", this.groupId, e), e);
                return false;
            }

            // The value is valid: track it for the group, even if this FlowFile exceeds it.
            this.groupStates.put(maxOrderKey, maxOrderText);

            if (this.order.intValue() <= maxOrder) {
                return true;
            }
            transferToFailure(this.flowFile, String.format("Order (%d) is greater than the Maximum Order (%d) for Group [%s]", this.order, maxOrder, this.groupId));
            return false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Multi-variable type inference failed */
        /**
         * Ensures the current group has a target order in the group state.
         *
         * <p>When none is tracked yet, the Initial Order property is evaluated against the
         * current FlowFile, validated as an integer, stored as the target order, and the
         * group's updated-at timestamp is set to the context's {@code now}.
         *
         * @return {@code true} when a target order is available; {@code false} after
         *         routing the FlowFile to failure because the value is not an integer
         */
        public boolean computeInitialOrder() {
            final String targetOrderKey = EnforceOrder.STATE_TARGET_ORDER.apply(this.groupId);
            if (this.groupStates.get(targetOrderKey) != null) {
                // A target order is already tracked for this group.
                return true;
            }

            final String initialOrder;
            try {
                initialOrder = this.initOrderProperty.evaluateAttributeExpressions(this.flowFile).getValue();
                // Validation only; the textual form is what gets stored.
                Integer.parseInt(initialOrder);
            } catch (NumberFormatException e) {
                transferToFailure(this.flowFile, String.format("Failed to get Initial Order for Group [%s] due to %s", this.groupId, e), e);
                return false;
            }

            this.groupStates.put(targetOrderKey, initialOrder);
            if (!StringUtils.isBlank(initialOrder)) {
                this.groupStates.put(EnforceOrder.STATE_UPDATED_AT.apply(this.groupId), String.valueOf(this.now));
            }
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Adds the current FlowFile to its group's list of validated FlowFiles. A FlowFile
         * without a started-at attribute is first stamped with the context's {@code now}.
         */
        public void markFlowFileValid() {
            final List<FlowFile> groupMembers = this.flowFileGroups.computeIfAbsent(this.groupId, key -> new ArrayList<>());

            final FlowFile validated;
            if (StringUtils.isBlank(this.flowFile.getAttribute(EnforceOrder.ATTR_STARTED_AT))) {
                validated = this.processSession.putAttribute(this.flowFile, EnforceOrder.ATTR_STARTED_AT, String.valueOf(this.now));
            } else {
                validated = this.flowFile;
            }
            groupMembers.add(validated);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Routes every validated FlowFile, group by group, in ascending order number.
         *
         * <p>For each FlowFile, compared against the group's current target order:
         * <ul>
         *   <li>equal: routed to success; the target advances by one unless the FlowFile
         *       carries the group's maximum order;</li>
         *   <li>smaller: routed to skipped (and logged as a warning);</li>
         *   <li>larger and still within Wait Timeout: routed to wait;</li>
         *   <li>larger and timed out: routed to overtook; the target jumps past this
         *       FlowFile (or onto it when it carries the maximum order).</li>
         * </ul>
         * When a group's target changed, the new target and the updated-at timestamp are
         * written back to the group state.
         */
        public void transferFlowFiles() {
            for (Map.Entry<String, List<FlowFile>> groupEntry : this.flowFileGroups.entrySet()) {
                final List<FlowFile> groupFlowFiles = groupEntry.getValue();
                if (groupFlowFiles.isEmpty()) {
                    continue;
                }
                groupFlowFiles.sort(Comparator.comparing(this.getOrder));

                final String group = groupEntry.getKey();
                final String targetOrderKey = EnforceOrder.STATE_TARGET_ORDER.apply(group);
                final int initialTarget = Integer.parseInt(this.groupStates.get(targetOrderKey));
                final String maxOrderText = this.groupStates.get(EnforceOrder.STATE_MAX_ORDER.apply(group));

                int target = initialTarget;
                for (FlowFile candidate : groupFlowFiles) {
                    final Integer candidateOrder = this.getOrder.apply(candidate);
                    final boolean atMaxOrder = !StringUtils.isBlank(maxOrderText)
                            && candidateOrder.equals(Integer.valueOf(Integer.parseInt(maxOrderText)));
                    final int orderValue = candidateOrder.intValue();

                    if (orderValue == target) {
                        transferResult(candidate, EnforceOrder.REL_SUCCESS, null, null);
                        if (!atMaxOrder) {
                            target++;
                        }
                    } else if (orderValue < target) {
                        final String detail = String.format("Skipped, FlowFile order was %d but current target is %d", candidateOrder, Integer.valueOf(target));
                        this.logger.warn(detail + ". {}", new Object[]{candidate});
                        transferResult(candidate, EnforceOrder.REL_SKIPPED, detail, Integer.valueOf(target));
                    } else if (this.now - Long.parseLong(candidate.getAttribute(EnforceOrder.ATTR_STARTED_AT)) <= this.waitTimeoutMillis.longValue()) {
                        transferResult(candidate, EnforceOrder.REL_WAIT, null, Integer.valueOf(target));
                    } else {
                        // Report the target that was expected before it is moved.
                        transferResult(candidate, EnforceOrder.REL_OVERTOOK, null, Integer.valueOf(target));
                        target = atMaxOrder ? orderValue : orderValue + 1;
                    }
                }

                if (initialTarget != target) {
                    this.groupStates.put(targetOrderKey, String.valueOf(target));
                    this.groupStates.put(EnforceOrder.STATE_UPDATED_AT.apply(group), String.valueOf(this.now));
                }
            }
        }

        /**
         * Stamps the routing result onto a FlowFile and transfers it.
         *
         * <p>The result attribute is always set to the relationship name. The expected-order
         * attribute is set when {@code num} is non-null and removed when it is null. The
         * detail attribute is set when {@code str} is non-blank and removed when it is null.
         *
         * @param flowFile     the FlowFile to route
         * @param relationship the destination relationship
         * @param str          detail message, may be {@code null}
         * @param num          expected order at processing time, may be {@code null}
         */
        private void transferResult(FlowFile flowFile, Relationship relationship, String str, Integer num) {
            final boolean hasExpectedOrder = num != null;

            final Map<String, String> resultAttributes = new HashMap<>();
            resultAttributes.put(EnforceOrder.ATTR_RESULT, relationship.getName());
            if (hasExpectedOrder) {
                resultAttributes.put(EnforceOrder.ATTR_EXPECTED_ORDER, num.toString());
            }
            if (!StringUtils.isBlank(str)) {
                resultAttributes.put(EnforceOrder.ATTR_DETAIL, str);
            }

            FlowFile updated = this.processSession.putAllAttributes(flowFile, resultAttributes);
            if (!hasExpectedOrder) {
                updated = this.processSession.removeAttribute(updated, EnforceOrder.ATTR_EXPECTED_ORDER);
            }
            if (str == null) {
                updated = this.processSession.removeAttribute(updated, EnforceOrder.ATTR_DETAIL);
            }
            this.processSession.transfer(updated, relationship);
        }

        /**
         * Logs a warning and routes the FlowFile to failure with {@code str} as detail.
         *
         * @param flowFile the FlowFile to fail
         * @param str      the failure message
         */
        private void transferToFailure(FlowFile flowFile, String str) {
            transferToFailure(flowFile, str, null);
        }

        /**
         * Logs a warning (including {@code th} when given) and routes the FlowFile to
         * failure with {@code str} as detail.
         *
         * @param flowFile the FlowFile to fail
         * @param str      the failure message
         * @param th       the cause, may be {@code null}
         */
        private void transferToFailure(FlowFile flowFile, String str, Throwable th) {
            final Object[] logArguments = (th == null)
                    ? new Object[]{flowFile}
                    : new Object[]{flowFile, th};
            EnforceOrder.this.getLogger().warn(str + " {}", logArguments);
            transferResult(flowFile, EnforceOrder.REL_FAILURE, str, null);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Removes the target, updated-at and max entries of every group whose updated-at
         * timestamp is older than the Inactive Timeout, measured from the context's
         * {@code now}.
         */
        public void cleanupInactiveStates() {
            final Long inactiveTimeoutMillis = this.processContext.getProperty(EnforceOrder.INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
            final int suffixLength = EnforceOrder.STATE_SUFFIX_UPDATED_AT.length();

            // Collect first, remove afterwards: the map must not be modified while iterating it.
            final List<String> inactiveGroups = new ArrayList<>();
            for (Map.Entry<String, String> stateEntry : this.groupStates.entrySet()) {
                final String stateKey = stateEntry.getKey();
                if (!stateKey.endsWith(EnforceOrder.STATE_SUFFIX_UPDATED_AT)) {
                    continue;
                }
                if (this.now - Long.parseLong(stateEntry.getValue()) > inactiveTimeoutMillis.longValue()) {
                    inactiveGroups.add(stateKey.substring(0, stateKey.length() - suffixLength));
                }
            }

            for (String inactiveGroup : inactiveGroups) {
                this.groupStates.remove(EnforceOrder.STATE_TARGET_ORDER.apply(inactiveGroup));
                this.groupStates.remove(EnforceOrder.STATE_UPDATED_AT.apply(inactiveGroup));
                this.groupStates.remove(EnforceOrder.STATE_MAX_ORDER.apply(inactiveGroup));
            }
        }
    }

    public EnforceOrder() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_WAIT);
        hashSet.add(REL_OVERTOOK);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_SKIPPED);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    /**
     * Lists the properties of this processor in display order.
     *
     * @return a new list containing the supported property descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                GROUP_IDENTIFIER,
                ORDER_ATTRIBUTE,
                INITIAL_ORDER,
                MAX_ORDER,
                BATCH_COUNT,
                WAIT_TIMEOUT,
                INACTIVE_TIMEOUT);
        return descriptors;
    }

    /**
     * Returns the relationships of this processor.
     *
     * @return the unmodifiable relationship set built in the constructor
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Adds a cross-property check to the inherited validation: Inactive Timeout must be
     * strictly longer than Wait Timeout.
     *
     * @param validationContext the context providing the configured property values
     * @return the inherited validation results, plus an invalid result when the
     *         timeouts are inconsistent
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));

        // Both durations are read in the same unit, so they compare directly.
        final long waitTimeout = validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS).longValue();
        final long inactiveTimeout = validationContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MICROSECONDS).longValue();
        if (waitTimeout < inactiveTimeout) {
            return results;
        }

        final String explanation = String.format("%s should be longer than %s", INACTIVE_TIMEOUT.getDisplayName(), WAIT_TIMEOUT.getDisplayName());
        results.add(new ValidationResult.Builder()
                .input(validationContext.getProperty(INACTIVE_TIMEOUT).getValue())
                .subject(INACTIVE_TIMEOUT.getDisplayName())
                .explanation(explanation)
                .valid(false)
                .build());
        return results;
    }

    /**
     * Pulls up to Batch Count FlowFiles, validates each one (group id, order, initial
     * order, maximum order), routes the valid ones by order, drops the state of
     * inactive groups, and stores the resulting group state in LOCAL scope.
     *
     * @param processContext context providing the configured properties
     * @param processSession session used to fetch, update and transfer FlowFiles
     * @throws RuntimeException when the updated state cannot be stored
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final List<FlowFile> flowFiles = processSession.get(processContext.getProperty(BATCH_COUNT).asInteger().intValue());
        if (flowFiles == null || flowFiles.isEmpty()) {
            return;
        }

        final StateMap state;
        try {
            state = processSession.getState(Scope.LOCAL);
        } catch (IOException e) {
            // Plain concatenation: the previous message mixed a "{}" placeholder with
            // string concatenation, which produced a malformed log line.
            getLogger().error("Failed to retrieve state from StateManager due to " + e, e);
            // NOTE(review): the FlowFiles pulled above are neither transferred nor rolled
            // back on this path - confirm the framework rolls the session back.
            processContext.yield();
            return;
        }

        final OrderingContext orderingContext = new OrderingContext(processContext, processSession);
        orderingContext.groupStates.putAll(state.toMap());

        for (FlowFile flowFile : flowFiles) {
            orderingContext.setFlowFile(flowFile);
            if (orderingContext.flowFile == null) {
                break;
            }
            // Each step routes the FlowFile to failure itself when it returns false.
            if (orderingContext.computeGroupId()
                    && orderingContext.computeOrder()
                    && orderingContext.computeInitialOrder()
                    && orderingContext.computeMaxOrder()) {
                orderingContext.markFlowFileValid();
            }
        }

        orderingContext.transferFlowFiles();
        orderingContext.cleanupInactiveStates();

        try {
            processSession.setState(orderingContext.groupStates, Scope.LOCAL);
        } catch (IOException e) {
            throw new RuntimeException("Failed to update state due to " + e + ". Session will be rollback and processor will be yielded for a while.", e);
        }
    }
}
