package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Processor that passes incoming FlowFiles straight through to {@link #REL_SUCCESS} while
 * tracking when data was last seen.
 *
 * <p>When a trigger finds no input and at least {@link #THRESHOLD} has elapsed since the last
 * successful transfer, an indicator FlowFile is sent to {@link #REL_INACTIVE}. When input shows
 * up again while the processor is flagged inactive, an indicator FlowFile is sent to
 * {@link #REL_ACTIVITY_RESTORED}. Both indicators carry the {@code inactivityStartMillis} and
 * {@code inactivityDurationMillis} attributes and a message body built from the corresponding
 * message property.
 *
 * <p>The tracking state is held in atomic fields on the instance; the class is annotated
 * {@code @TriggerSerially} and {@code @TriggerWhenEmpty}.
 */
@CapabilityDescription("Monitors the flow for activity and sends out an indicator when the flow has not had any data for some specified amount of time and again when the flow's activity is restored")
@WritesAttributes({
    @WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"),
    @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")
})
@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"monitor", "flow", "active", "inactive", "activity", "detection"})
@SideEffectFree
public class MonitorActivity extends AbstractProcessor {

    /** How long the flow may stay silent before it is considered inactive. */
    public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder()
            .name("Threshold Duration")
            .description("Determines how much time must elapse before considering the flow to be inactive")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("5 min")
            .build();

    /** Whether the inactivity indicator is repeated every threshold period or sent only once. */
    public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new PropertyDescriptor.Builder()
            .name("Continually Send Messages")
            .description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; if false, will send an indicator only when the flow first becomes inactive")
            .required(true)
            .allowableValues(new String[]{"true", "false"})
            .defaultValue("false")
            .build();

    /** Content template for indicators routed to {@link #REL_ACTIVITY_RESTORED}. */
    public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new PropertyDescriptor.Builder()
            .name("Activity Restored Message")
            .description("The message that will be the content of FlowFiles that are sent to 'activity.restored' relationship")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
            .build();

    /** Content template for indicators routed to {@link #REL_INACTIVE}. */
    public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder()
            .name("Inactivity Message")
            .description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship")
            .required(true)
            .expressionLanguageSupported(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("Lacking activity as of time: ${now():format('yyyy/MM/dd HH:mm:ss')}; flow has been inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes")
            .build();

    /** Whether the activity-restored indicator inherits the attributes of the first resuming FlowFile. */
    public static final PropertyDescriptor COPY_ATTRIBUTES = new PropertyDescriptor.Builder()
            .name("Copy Attributes")
            .description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file")
            .required(false)
            .allowableValues(new String[]{"true", "false"})
            .defaultValue("false")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All incoming FlowFiles are routed to success")
            .build();

    public static final Relationship REL_INACTIVE = new Relationship.Builder()
            .name("inactive")
            .description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold Duration amount of time")
            .build();

    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder()
            .name("activity.restored")
            .description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a period of inactivity")
            .build();

    /**
     * Charset used to encode indicator message bodies.
     *
     * <p>NOTE(review): the charset name comes from {@code EvaluateXQuery.UTF8}, which is not
     * defined in this file - presumably {@code "UTF-8"}; confirm, and consider
     * {@code StandardCharsets.UTF_8} to drop the dependency on an unrelated processor class.
     */
    public static final Charset UTF8 = Charset.forName(EvaluateXQuery.UTF8);

    /** Attribute holding the epoch-millisecond timestamp at which the inactivity began. */
    private static final String ATTR_INACTIVITY_START = "inactivityStartMillis";

    /** Attribute holding the length of the inactivity period in milliseconds. */
    private static final String ATTR_INACTIVITY_DURATION = "inactivityDurationMillis";

    /** Maximum number of FlowFiles pulled from the session per trigger. */
    private static final int MAX_BATCH_SIZE = 50;

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    /** Epoch millis of the most recent trigger that transferred FlowFiles to success. */
    private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());

    /** True from the first inactivity detection until FlowFiles are seen again. */
    private final AtomicBoolean inactive = new AtomicBoolean(false);

    /** Epoch millis at which the most recent inactivity indicator was emitted. */
    private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());

    /**
     * Builds the immutable property and relationship collections returned by
     * {@link #getSupportedPropertyDescriptors()} and {@link #getRelationships()}.
     *
     * @param processorInitializationContext initialization context; not used here
     */
    protected void init(final ProcessorInitializationContext processorInitializationContext) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(THRESHOLD);
        descriptors.add(CONTINUALLY_SEND_MESSAGES);
        descriptors.add(INACTIVITY_MESSAGE);
        descriptors.add(ACTIVITY_RESTORED_MESSAGE);
        descriptors.add(COPY_ATTRIBUTES);
        this.properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_INACTIVE);
        rels.add(REL_ACTIVITY_RESTORED);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * @return the unmodifiable set of relationships assembled in {@code init}
     */
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable list of property descriptors assembled in {@code init}
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    /**
     * Restarts the inactivity clock from the current time when the processor is scheduled.
     *
     * <p>Only the last-success timestamp is reset; the inactive flag and the last-indicator
     * timestamp keep whatever values they had.
     * NOTE(review): confirm whether the inactive flag should also be cleared here.
     */
    @OnScheduled
    public void resetLastSuccessfulTransfer() {
        setLastSuccessfulTransfer(System.currentTimeMillis());
    }

    /**
     * Overwrites the timestamp of the last successful transfer.
     *
     * @param j new timestamp, in milliseconds since the epoch
     */
    protected final void setLastSuccessfulTransfer(final long j) {
        this.latestSuccessTransfer.set(j);
    }

    /**
     * Pulls up to {@value #MAX_BATCH_SIZE} FlowFiles. With no input, emits an inactivity
     * indicator if one is due, otherwise yields; with input, forwards it to success and emits
     * an activity-restored indicator if the processor had been flagged inactive.
     *
     * @param processContext provides the configured property values
     * @param processSession session used to fetch, create and transfer FlowFiles
     */
    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) {
        final long thresholdMillis = processContext.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
        final long now = System.currentTimeMillis();
        final ProcessorLog logger = getLogger();

        final List<FlowFile> flowFiles = processSession.get(MAX_BATCH_SIZE);
        if (flowFiles.isEmpty()) {
            reportInactivityIfDue(processContext, processSession, logger, thresholdMillis, now);
        } else {
            forwardAndReportRestoredActivity(processContext, processSession, logger, flowFiles, now);
        }
    }

    /** Handles a trigger with no input: sends an inactivity indicator when one is due, else yields. */
    private void reportInactivityIfDue(final ProcessContext context, final ProcessSession session,
            final ProcessorLog logger, final long thresholdMillis, final long now) {
        final long lastSuccessMillis = this.latestSuccessTransfer.get();

        boolean sendIndicator = false;
        if (now >= lastSuccessMillis + thresholdMillis) {
            // The flag must be raised whenever the threshold has passed, so getAndSet runs first;
            // the continual-send check is only consulted when the flag was already raised.
            final boolean firstDetection = !this.inactive.getAndSet(true);
            sendIndicator = firstDetection
                    || (context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean()
                            && now > this.lastInactiveMessage.get() + thresholdMillis);
        }

        if (!sendIndicator) {
            context.yield();
            return;
        }

        this.lastInactiveMessage.set(System.currentTimeMillis());
        final FlowFile indicator = buildIndicator(context, session, session.create(),
                INACTIVITY_MESSAGE, lastSuccessMillis, now);
        session.transfer(indicator, REL_INACTIVE);
        logger.info("Transferred {} to 'inactive'", new Object[]{indicator});
    }

    /** Handles a trigger with input: forwards it and, if previously inactive, announces the recovery. */
    private void forwardAndReportRestoredActivity(final ProcessContext context, final ProcessSession session,
            final ProcessorLog logger, final List<FlowFile> flowFiles, final long now) {
        session.transfer(flowFiles, REL_SUCCESS);
        logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});

        // The previous success time marks the start of the inactivity period that just ended.
        final long inactivityStartMillis = this.latestSuccessTransfer.getAndSet(now);
        if (!this.inactive.getAndSet(false)) {
            return;
        }

        FlowFile indicator = session.create();
        if (context.getProperty(COPY_ATTRIBUTES).asBoolean()) {
            final Map<String, String> copied = new HashMap<>(flowFiles.get(0).getAttributes());
            // The UUID entry is dropped from the copy before it is applied to the new FlowFile.
            copied.remove(CoreAttributes.UUID.key());
            indicator = session.putAllAttributes(indicator, copied);
        }

        indicator = buildIndicator(context, session, indicator,
                ACTIVITY_RESTORED_MESSAGE, inactivityStartMillis, now);
        session.transfer(indicator, REL_ACTIVITY_RESTORED);
        logger.info("Transferred {} to 'activity.restored'", new Object[]{indicator});
    }

    /**
     * Stamps the inactivity attributes on an indicator FlowFile, writes the evaluated message as
     * its content and reports a provenance create event for it.
     *
     * @param context               source of the message property value
     * @param session               session that owns {@code indicator}
     * @param indicator             FlowFile to populate
     * @param messageProperty       property whose evaluated value becomes the content
     * @param inactivityStartMillis epoch millis at which the inactivity began
     * @param now                   epoch millis captured at the start of the trigger
     * @return the populated FlowFile, not yet transferred
     */
    private static FlowFile buildIndicator(final ProcessContext context, final ProcessSession session,
            final FlowFile indicator, final PropertyDescriptor messageProperty,
            final long inactivityStartMillis, final long now) {
        FlowFile result = session.putAttribute(indicator, ATTR_INACTIVITY_START,
                String.valueOf(inactivityStartMillis));
        result = session.putAttribute(result, ATTR_INACTIVITY_DURATION,
                String.valueOf(now - inactivityStartMillis));

        // The message is evaluated against the indicator so it can reference the attributes set above.
        final byte[] content = context.getProperty(messageProperty)
                .evaluateAttributeExpressions(result)
                .getValue()
                .getBytes(UTF8);
        result = session.write(result, new OutputStreamCallback() {
            public void process(final OutputStream outputStream) throws IOException {
                outputStream.write(content);
            }
        });

        session.getProvenanceReporter().create(result);
        return result;
    }
}
