/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;

@SideEffectFree
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"monitor", "flow", "active", "inactive", "activity", "detection"})
@CapabilityDescription(value="Monitors the flow for activity and sends out an indicator when the flow has not had any data for some specified amount of time and again when the flow's activity is restored")
@WritesAttributes(value={@WritesAttribute(attribute="inactivityStartMillis", description="The time at which Inactivity began, in the form of milliseconds since Epoch"), @WritesAttribute(attribute="inactivityDurationMillis", description="The number of milliseconds that the inactivity has spanned")})
@Stateful(scopes={Scope.CLUSTER}, description="MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide.If 'Copy Attribute' is set to true, then flow file attributes are also persisted.")
public class MonitorActivity
extends AbstractProcessor {
    public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
    public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster");
    public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all");
    public static final AllowableValue REPORT_NODE_PRIMARY = new AllowableValue("primary");
    public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder().name("Threshold Duration").description("Determines how much time must elapse before considering the flow to be inactive").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("5 min").build();
    public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new PropertyDescriptor.Builder().name("Continually Send Messages").description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; if false, will send an indicator only when the flow first becomes inactive").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new PropertyDescriptor.Builder().name("Activity Restored Message").description("The message that will be the content of FlowFiles that are sent to 'activity.restored' relationship").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes").build();
    public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder().name("Inactivity Message").description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship").required(true).expressionLanguageSupported(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("Lacking activity as of time: ${now():format('yyyy/MM/dd HH:mm:ss')}; flow has been inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes").build();
    public static final PropertyDescriptor COPY_ATTRIBUTES = new PropertyDescriptor.Builder().name("Copy Attributes").description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file").required(false).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final PropertyDescriptor MONITORING_SCOPE = new PropertyDescriptor.Builder().name("Monitoring Scope").description("Specify how to determine activeness of the flow. 'node' means that activeness is examined at individual node separately. It can be useful if DFM expects each node should receive flow files in a distributed manner. With 'cluster', it defines the flow is active while at least one node receives flow files actively. If NiFi is running as standalone mode, this should be set as 'node', if it's 'cluster', NiFi logs a warning message and act as 'node' scope.").required(true).allowableValues(new AllowableValue[]{SCOPE_NODE, SCOPE_CLUSTER}).defaultValue(SCOPE_NODE.getValue()).build();
    public static final PropertyDescriptor REPORTING_NODE = new PropertyDescriptor.Builder().name("Reporting Node").description("Specify which node should send notification flow-files to inactive and activity.restored relationships. With 'all', every node in this cluster send notification flow-files. 'primary' means flow-files will be sent only from a primary node. If NiFi is running as standalone mode, this should be set as 'all', even if it's 'primary', NiFi act as 'all'.").required(true).allowableValues(new AllowableValue[]{REPORT_NODE_ALL, REPORT_NODE_PRIMARY}).addValidator((subject, input, context) -> {
        boolean invalid = REPORT_NODE_PRIMARY.equals((Object)input) && SCOPE_NODE.equals((Object)context.getProperty(MONITORING_SCOPE).getValue());
        return new ValidationResult.Builder().subject(subject).input(input).explanation("'" + REPORT_NODE_PRIMARY + "' is only available with '" + SCOPE_CLUSTER + "' scope.").valid(!invalid).build();
    }).defaultValue(REPORT_NODE_ALL.getValue()).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All incoming FlowFiles are routed to success").build();
    public static final Relationship REL_INACTIVE = new Relationship.Builder().name("inactive").description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold Duration amount of time").build();
    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder().name("activity.restored").description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a period of inactivity").build();
    public static final Charset UTF8 = Charset.forName("UTF-8");
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private final AtomicLong latestSuccessTransfer = new AtomicLong(System.currentTimeMillis());
    private final AtomicLong latestReportedNodeState = new AtomicLong(System.currentTimeMillis());
    private final AtomicBoolean inactive = new AtomicBoolean(false);
    private final AtomicLong lastInactiveMessage = new AtomicLong(System.currentTimeMillis());
    public static final String STATE_KEY_LATEST_SUCCESS_TRANSFER = "MonitorActivity.latestSuccessTransfer";

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(THRESHOLD);
        properties.add(CONTINUALLY_SEND_MESSAGES);
        properties.add(INACTIVITY_MESSAGE);
        properties.add(ACTIVITY_RESTORED_MESSAGE);
        properties.add(COPY_ATTRIBUTES);
        properties.add(MONITORING_SCOPE);
        properties.add(REPORTING_NODE);
        this.properties = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_INACTIVE);
        relationships.add(REL_ACTIVITY_RESTORED);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.isClusterScope(context, true);
        this.resetLastSuccessfulTransfer();
        this.inactive.set(false);
    }

    protected void resetLastSuccessfulTransfer() {
        this.setLastSuccessfulTransfer(System.currentTimeMillis());
    }

    protected final void setLastSuccessfulTransfer(long timestamp) {
        this.latestSuccessTransfer.set(timestamp);
        this.latestReportedNodeState.set(timestamp);
    }

    private boolean isClusterScope(ProcessContext context, boolean logInvalidConfig) {
        if (SCOPE_CLUSTER.equals((Object)context.getProperty(MONITORING_SCOPE).getValue())) {
            if (this.getNodeTypeProvider().isClustered()) {
                return true;
            }
            if (logInvalidConfig) {
                this.getLogger().warn("NiFi is running as a Standalone mode, but 'cluster' scope is set. Fallback to 'node' scope. Fix configuration to stop this message.");
            }
        }
        return false;
    }

    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, ProcessContext context) {
        return REPORT_NODE_PRIMARY.equals((Object)context.getProperty(REPORTING_NODE).getValue()) && isClusterScope;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        long thresholdMillis = context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
        long now = System.currentTimeMillis();
        ComponentLog logger = this.getLogger();
        boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
        boolean isClusterScope = this.isClusterScope(context, false);
        boolean shouldReportOnlyOnPrimary = this.shouldReportOnlyOnPrimary(isClusterScope, context);
        List flowFiles = session.get(50);
        boolean isInactive = false;
        long updatedLatestSuccessTransfer = -1L;
        StateMap clusterState = null;
        if (flowFiles.isEmpty()) {
            long previousSuccessMillis = this.latestSuccessTransfer.get();
            boolean sendInactiveMarker = false;
            isInactive = now >= previousSuccessMillis + thresholdMillis;
            logger.debug("isInactive={}, previousSuccessMillis={}, now={}", new Object[]{isInactive, previousSuccessMillis, now});
            if (isInactive && isClusterScope) {
                try {
                    clusterState = context.getStateManager().getState(Scope.CLUSTER);
                    if (clusterState != null && !StringUtils.isEmpty((String)clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER))) {
                        long latestReportedClusterActivity = Long.valueOf(clusterState.get(STATE_KEY_LATEST_SUCCESS_TRANSFER));
                        boolean bl = isInactive = now >= latestReportedClusterActivity + thresholdMillis;
                        if (!isInactive) {
                            updatedLatestSuccessTransfer = latestReportedClusterActivity;
                        }
                        logger.debug("isInactive={}, latestReportedClusterActivity={}", new Object[]{isInactive, latestReportedClusterActivity});
                    }
                }
                catch (IOException e) {
                    logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", (Throwable)e);
                }
            }
            if (isInactive) {
                boolean continual = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
                boolean bl = sendInactiveMarker = !this.inactive.getAndSet(true) || continual && now > this.lastInactiveMessage.get() + thresholdMillis;
            }
            if (sendInactiveMarker && this.shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
                this.lastInactiveMessage.set(System.currentTimeMillis());
                FlowFile inactiveFlowFile = session.create();
                inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityStartMillis", String.valueOf(previousSuccessMillis));
                inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
                final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
                inactiveFlowFile = session.write(inactiveFlowFile, new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        out.write(outBytes);
                    }
                });
                session.getProvenanceReporter().create(inactiveFlowFile);
                session.transfer(inactiveFlowFile, REL_INACTIVE);
                logger.info("Transferred {} to 'inactive'", new Object[]{inactiveFlowFile});
            } else {
                context.yield();
            }
        } else {
            session.transfer((Collection)flowFiles, REL_SUCCESS);
            updatedLatestSuccessTransfer = now;
            logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
            long latestStateReportTimestamp = this.latestReportedNodeState.get();
            if (isClusterScope && now - latestStateReportTimestamp > thresholdMillis / 3L) {
                try {
                    StateManager stateManager = context.getStateManager();
                    StateMap state = stateManager.getState(Scope.CLUSTER);
                    HashMap<String, String> newValues = new HashMap<String, String>();
                    if (copyAttributes) {
                        newValues.putAll(((FlowFile)flowFiles.get(0)).getAttributes());
                    }
                    newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now));
                    if (state == null || state.getVersion() == -1L) {
                        stateManager.setState(newValues, Scope.CLUSTER);
                    } else {
                        String existingTimestamp = state.get(STATE_KEY_LATEST_SUCCESS_TRANSFER);
                        if (StringUtils.isEmpty((String)existingTimestamp) || Long.parseLong(existingTimestamp) < now) {
                            stateManager.replace(state, newValues, Scope.CLUSTER);
                        } else {
                            logger.debug("Existing state has more recent timestamp, didn't update state.");
                        }
                    }
                    this.latestReportedNodeState.set(now);
                }
                catch (IOException e) {
                    logger.error("Failed to access cluster state. Activity will not be monitored properly until this is addressed.", (Throwable)e);
                }
            }
        }
        if (!isInactive) {
            long inactivityStartMillis = this.latestSuccessTransfer.get();
            if (updatedLatestSuccessTransfer > -1L) {
                this.latestSuccessTransfer.set(updatedLatestSuccessTransfer);
            }
            if (this.inactive.getAndSet(false) && this.shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) {
                FlowFile activityRestoredFlowFile = session.create();
                if (copyAttributes) {
                    HashMap attributes = new HashMap();
                    if (flowFiles.size() > 0) {
                        attributes.putAll(((FlowFile)flowFiles.get(0)).getAttributes());
                    } else if (clusterState != null) {
                        attributes.putAll(clusterState.toMap());
                        attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER);
                    }
                    attributes.remove(CoreAttributes.UUID.key());
                    activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes);
                }
                activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(inactivityStartMillis));
                activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(now - inactivityStartMillis));
                byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
                activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes));
                session.getProvenanceReporter().create(activityRestoredFlowFile);
                session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
                logger.info("Transferred {} to 'activity.restored'", new Object[]{activityRestoredFlowFile});
            }
        }
    }

    @OnStopped
    public void onStopped(ProcessContext context) {
        if (this.getNodeTypeProvider().isPrimary()) {
            StateManager stateManager = context.getStateManager();
            try {
                stateManager.clear(Scope.CLUSTER);
            }
            catch (IOException e) {
                this.getLogger().error("Failed to clear cluster state due to " + e, (Throwable)e);
            }
        }
    }

    private boolean shouldThisNodeReport(boolean isClusterScope, boolean isReportOnlyOnPrimary) {
        return !isClusterScope || !isReportOnlyOnPrimary || this.getNodeTypeProvider().isPrimary();
    }
}

