/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@TriggerSerially
@TriggerWhenEmpty
@Tags(value={"monitor", "flow", "active", "inactive", "activity", "detection"})
@CapabilityDescription(value="Monitors the flow for activity and sends out an indicator when the flow has not had any data for some specified amount of time and again when the flow's activity is restored")
@WritesAttributes(value={@WritesAttribute(attribute="inactivityStartMillis", description="The time at which Inactivity began, in the form of milliseconds since Epoch"), @WritesAttribute(attribute="inactivityDurationMillis", description="The number of milliseconds that the inactivity has spanned")})
public class MonitorActivity
extends AbstractProcessor {
    public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder().name("Threshold Duration").description("Determines how much time must elapse before considering the flow to be inactive").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("5 min").build();
    public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new PropertyDescriptor.Builder().name("Continually Send Messages").description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; if false, will send an indicator only when the flow first becomes inactive").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new PropertyDescriptor.Builder().name("Activity Restored Message").description("The message that will be the content of FlowFiles that are sent to 'activity.restored' relationship").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes").build();
    public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder().name("Inactivity Message").description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("Lacking activity as of time: ${now():format('yyyy/MM/dd HH:mm:ss')}; flow has been inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes").build();
    public static final PropertyDescriptor COPY_ATTRIBUTES = new PropertyDescriptor.Builder().name("Copy Attributes").description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file").required(false).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All incoming FlowFiles are routed to success").build();
    public static final Relationship REL_INACTIVE = new Relationship.Builder().name("inactive").description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold Duration amount of time").build();
    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder().name("activity.restored").description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a period of inactivity").build();
    public static final Charset UTF8 = Charset.forName("UTF-8");
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
    private final AtomicBoolean inactive = new AtomicBoolean(false);
    private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(THRESHOLD);
        properties.add(CONTINUALLY_SEND_MESSAGES);
        properties.add(INACTIVITY_MESSAGE);
        properties.add(ACTIVITY_RESTORED_MESSAGE);
        properties.add(COPY_ATTRIBUTES);
        this.properties = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_INACTIVE);
        relationships.add(REL_ACTIVITY_RESTORED);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        long thresholdMillis = context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
        long now = System.currentTimeMillis();
        ProcessorLog logger = this.getLogger();
        List flowFiles = session.get(50);
        if (flowFiles.isEmpty()) {
            long previousSuccessMillis = this.latestSuccessTransfer.get();
            boolean sendInactiveMarker = false;
            if (now >= previousSuccessMillis + thresholdMillis) {
                boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
                boolean bl = sendInactiveMarker = !this.inactive.getAndSet(true) || continual && now > this.lastInactiveMessage.get() + thresholdMillis;
            }
            if (sendInactiveMarker) {
                this.lastInactiveMessage.set(System.currentTimeMillis());
                FlowFile inactiveFlowFile = session.create();
                inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityStartMillis", String.valueOf(previousSuccessMillis));
                inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
                final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
                inactiveFlowFile = session.write(inactiveFlowFile, new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        out.write(outBytes);
                    }
                });
                session.getProvenanceReporter().create(inactiveFlowFile);
                session.transfer(inactiveFlowFile, REL_INACTIVE);
                logger.info("Transferred {} to 'inactive'", new Object[]{inactiveFlowFile});
            } else {
                context.yield();
            }
        } else {
            session.transfer((Collection)flowFiles, REL_SUCCESS);
            logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
            long inactivityStartMillis = this.latestSuccessTransfer.getAndSet(now);
            if (this.inactive.getAndSet(false)) {
                FlowFile activityRestoredFlowFile = session.create();
                boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
                if (copyAttributes) {
                    HashMap attributes = new HashMap(((FlowFile)flowFiles.get(0)).getAttributes());
                    attributes.remove(CoreAttributes.UUID.key());
                    activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes);
                }
                activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(inactivityStartMillis));
                activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(now - inactivityStartMillis));
                final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
                activityRestoredFlowFile = session.write(activityRestoredFlowFile, new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        out.write(outBytes);
                    }
                });
                session.getProvenanceReporter().create(activityRestoredFlowFile);
                session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
                logger.info("Transferred {} to 'activity.restored'", new Object[]{activityRestoredFlowFile});
            }
        }
    }
}

