/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.reporting;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask;
import org.apache.nifi.reporting.ReportingContext;

@Tags(value={"status", "metrics", "history", "site", "site to site"})
@CapabilityDescription(value="Publishes Status events using the Site To Site protocol.  The component type and name filter regexes form a union: only components matching both regexes will be reported.  However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.")
public class SiteToSiteStatusReportingTask
extends AbstractSiteToSiteReportingTask {
    /** Property supplying the value written to the "platform" field of every status record. */
    static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
            .name("Platform")
            .description("The value to use for the platform field in each status record.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .defaultValue("nifi")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Property holding the regex that a component's type must fully match for it to be reported. */
    static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder()
            .name("Component Type Filter Regex")
            .description("A regex specifying which component types to report.  Any component type matching this regex will be included.  Component types are: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup, Connection, InputPort, OutputPort")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
            .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
            .build();

    /** Property holding the regex that a component's name must fully match for it to be reported. */
    static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder()
            .name("Component Name Filter Regex")
            .description("A regex specifying which component names to report.  Any component name matching this regex will be included.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .defaultValue(".*")
            .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true))
            .build();

    /** Compiled form of the component type regex; reassigned on every call to onTrigger. */
    private volatile Pattern componentTypeFilter;

    /** Compiled form of the component name regex; reassigned on every call to onTrigger. */
    private volatile Pattern componentNameFilter;

    /**
     * Creates the reporting task and loads the Avro schema used for status records.
     *
     * <p>The schema is read from the {@code schema-status.avsc} classpath resource, parsed with
     * an Avro {@code Schema.Parser}, converted through {@code AvroTypeUtil.createSchema} and
     * stored in {@code recordSchema}.</p>
     *
     * @throws IOException if the schema resource cannot be found or cannot be read
     */
    public SiteToSiteStatusReportingTask() throws IOException {
        // try-with-resources guarantees the classpath stream is closed, even when parsing fails.
        try (InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-status.avsc")) {
            if (schema == null) {
                // getResourceAsStream signals a missing resource with null; fail with a clear message
                // instead of letting the parser trip over the null stream.
                throw new IOException("Unable to find schema resource 'schema-status.avsc' on the classpath");
            }
            this.recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
        }
    }

    /**
     * Returns the property descriptors supported by this task.
     *
     * @return a new mutable list containing the superclass descriptors followed by
     *         {@code PLATFORM}, {@code COMPONENT_TYPE_FILTER_REGEX} and
     *         {@code COMPONENT_NAME_FILTER_REGEX}, in that order
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
        Collections.addAll(descriptors, PLATFORM, COMPONENT_TYPE_FILTER_REGEX, COMPONENT_NAME_FILTER_REGEX);
        return descriptors;
    }

    /**
     * Serializes the current controller status into JSON status records and sends them to the
     * Site-to-Site destination in batches.
     *
     * <p>Steps performed on each invocation:</p>
     * <ol>
     *   <li>Return early if the instance is clustered but has no node identifier yet.</li>
     *   <li>Compile the component type and name filter regexes from the configured properties.</li>
     *   <li>Walk the process group status tree, starting at the controller status, and build one
     *       JSON object per component that matches both filters.</li>
     *   <li>Send the resulting records in slices of at most {@code BATCH_SIZE} entries, one
     *       Site-to-Site transaction per slice.</li>
     * </ol>
     *
     * <p>If no transaction can be created, the method returns without sending the remaining
     * batches.</p>
     *
     * @param context the reporting context providing properties, cluster state and event access
     * @throws ProcessException if an {@link IOException} occurs while sending a batch
     * @throws AssertionError if the configured instance URL cannot be parsed as a URL
     */
    public void onTrigger(ReportingContext context) {
        final boolean isClustered = context.isClustered();
        final String nodeId = context.getClusterNodeIdentifier();
        if (nodeId == null && isClustered) {
            this.getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. Will wait for Node Identifier to be established.");
            return;
        }

        this.componentTypeFilter = Pattern.compile(context.getProperty(COMPONENT_TYPE_FILTER_REGEX).evaluateAttributeExpressions().getValue());
        this.componentNameFilter = Pattern.compile(context.getProperty(COMPONENT_NAME_FILTER_REGEX).evaluateAttributeExpressions().getValue());

        final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
        if (procGroupStatus == null) {
            // Without a controller status there is nothing to serialize; walking a null status
            // would only produce a NullPointerException.
            this.getLogger().warn("Controller status is not available; no Status Records will be sent");
            return;
        }
        final String rootGroupName = procGroupStatus.getName();

        final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue();
        final URL url;
        try {
            url = new URL(nifiUrl);
        }
        catch (MalformedURLException e) {
            // Keep the original error type, but preserve the offending value and the cause.
            throw new AssertionError("Invalid NiFi instance URL: " + nifiUrl, e);
        }
        final String hostname = url.getHost();
        final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue();

        final Map<String, Object> config = Collections.emptyMap();
        final JsonBuilderFactory factory = Json.createBuilderFactory(config);

        // The pattern ends with a literal 'Z', so the formatter must render times in UTC.
        final SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        df.setTimeZone(TimeZone.getTimeZone("UTC"));

        final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
        this.serializeProcessGroupStatus(arrayBuilder, factory, procGroupStatus, df, hostname, rootGroupName, platform, null, new Date());
        final JsonArray jsonArray = arrayBuilder.build();

        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        int fromIndex = 0;
        int toIndex = Math.min(batchSize, jsonArray.size());
        List<JsonValue> jsonBatch = jsonArray.subList(fromIndex, toIndex);
        while (!jsonBatch.isEmpty()) {
            try {
                final long start = System.nanoTime();
                final Transaction transaction = this.getClient().createTransaction(TransferDirection.SEND);
                if (transaction == null) {
                    this.getLogger().debug("All destination nodes are penalized; will attempt to send data later");
                    return;
                }

                final Map<String, String> attributes = new HashMap<>();
                final String transactionId = UUID.randomUUID().toString();
                attributes.put("reporting.task.transaction.id", transactionId);
                attributes.put("reporting.task.name", this.getName());
                attributes.put("reporting.task.uuid", this.getIdentifier());
                attributes.put("reporting.task.type", getClass().getSimpleName());
                attributes.put("mime.type", "application/json");

                // Copy the current slice into its own JSON array so it can be sent as one payload.
                final JsonArrayBuilder jsonBatchArrayBuilder = factory.createArrayBuilder();
                for (final JsonValue jsonValue : jsonBatch) {
                    jsonBatchArrayBuilder.add(jsonValue);
                }
                final JsonArray jsonBatchArray = jsonBatchArrayBuilder.build();

                this.sendData(context, transaction, attributes, jsonBatchArray);
                transaction.confirm();
                transaction.complete();

                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                // Report the size of this batch, not the total number of records collected.
                this.getLogger().info("Successfully sent {} Status Records to destination in {} ms; Transaction ID = {}", new Object[]{jsonBatch.size(), transferMillis, transactionId});

                fromIndex = toIndex;
                toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
                jsonBatch = jsonArray.subList(fromIndex, toIndex);
            }
            catch (IOException e) {
                throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), (Throwable)e);
            }
        }
    }

    /**
     * Tests a component against the currently compiled type and name filters.
     *
     * @param componentType the component type string to match against the type filter
     * @param componentName the component name to match against the name filter
     * @return {@code true} only if the type fully matches the type filter and the name fully
     *         matches the name filter
     */
    private boolean componentMatchesFilters(String componentType, String componentName) {
        // The name filter is only consulted once the type filter has matched.
        if (!this.componentTypeFilter.matcher(componentType).matches()) {
            return false;
        }
        return this.componentNameFilter.matcher(componentName).matches();
    }

    /**
     * Appends a status record for a process group, when it matches the filters, and then
     * recursively appends records for everything the group contains.
     *
     * <p>The group is reported as {@code RootProcessGroup} when {@code parentId} is
     * {@code null} and as {@code ProcessGroup} otherwise. Children are always visited, whether
     * or not the group itself matched, in this order: child process groups, processors,
     * connections, input ports, output ports, remote process groups.</p>
     *
     * @param arrayBuilder    the JSON array builder receiving the records
     * @param factory         the factory used to create JSON object builders
     * @param status          the status of the process group to serialize
     * @param df              the formatter used for the record timestamp
     * @param hostname        the value for the {@code actorHostname} field
     * @param applicationName the value for the {@code application} field
     * @param platform        the value for the {@code platform} field
     * @param parentId        the id of the enclosing group, or {@code null} for the root group
     * @param currentDate     the timestamp shared by all records of this run
     */
    private void serializeProcessGroupStatus(JsonArrayBuilder arrayBuilder, JsonBuilderFactory factory, ProcessGroupStatus status, DateFormat df, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final JsonObjectBuilder record = factory.createObjectBuilder();

        final String componentType;
        if (parentId == null) {
            componentType = "RootProcessGroup";
        } else {
            componentType = "ProcessGroup";
        }
        final String componentName = status.getName();
        // This group's id is the parent id of every component it contains.
        final String groupId = status.getId();

        if (componentMatchesFilters(componentType, componentName)) {
            addCommonFields(record, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
            addField(record, "componentId", groupId);
            addField(record, "bytesRead", status.getBytesRead());
            addField(record, "bytesWritten", status.getBytesWritten());
            addField(record, "bytesReceived", status.getBytesReceived());
            addField(record, "bytesSent", status.getBytesSent());
            addField(record, "bytesTransferred", status.getBytesTransferred());
            addField(record, "flowFilesReceived", status.getFlowFilesReceived());
            addField(record, "flowFilesSent", status.getFlowFilesSent());
            addField(record, "flowFilesTransferred", status.getFlowFilesTransferred());
            addField(record, "inputContentSize", status.getInputContentSize());
            addField(record, "inputCount", status.getInputCount());
            addField(record, "outputContentSize", status.getOutputContentSize());
            addField(record, "outputCount", status.getOutputCount());
            addField(record, "queuedContentSize", status.getQueuedContentSize());
            addField(record, "activeThreadCount", status.getActiveThreadCount());
            addField(record, "queuedCount", status.getQueuedCount());
            arrayBuilder.add((JsonValue) record.build());
        }

        for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
            serializeProcessGroupStatus(arrayBuilder, factory, childGroup, df, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final ProcessorStatus processor : status.getProcessorStatus()) {
            serializeProcessorStatus(arrayBuilder, factory, processor, df, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final ConnectionStatus connection : status.getConnectionStatus()) {
            serializeConnectionStatus(arrayBuilder, factory, connection, df, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final PortStatus inputPort : status.getInputPortStatus()) {
            serializePortStatus("InputPort", arrayBuilder, factory, inputPort, df, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final PortStatus outputPort : status.getOutputPortStatus()) {
            serializePortStatus("OutputPort", arrayBuilder, factory, outputPort, df, hostname, applicationName, platform, groupId, currentDate);
        }
        for (final RemoteProcessGroupStatus remoteGroup : status.getRemoteProcessGroupStatus()) {
            serializeRemoteProcessGroupStatus(arrayBuilder, factory, remoteGroup, df, hostname, applicationName, platform, groupId, currentDate);
        }
    }

    /**
     * Appends a {@code RemoteProcessGroup} status record if the remote process group matches
     * the component filters; otherwise does nothing.
     *
     * @param arrayBuilder    the JSON array builder receiving the record
     * @param factory         the factory used to create the JSON object builder
     * @param status          the remote process group status to serialize
     * @param df              the formatter used for the record timestamp
     * @param hostname        the value for the {@code actorHostname} field
     * @param applicationName the value for the {@code application} field
     * @param platform        the value for the {@code platform} field
     * @param parentId        the id of the enclosing process group
     * @param currentDate     the timestamp shared by all records of this run
     */
    private void serializeRemoteProcessGroupStatus(JsonArrayBuilder arrayBuilder, JsonBuilderFactory factory, RemoteProcessGroupStatus status, DateFormat df, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final JsonObjectBuilder record = factory.createObjectBuilder();
        final String componentType = "RemoteProcessGroup";
        final String componentName = status.getName();
        if (!componentMatchesFilters(componentType, componentName)) {
            return;
        }

        addCommonFields(record, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
        addField(record, "componentId", status.getId());
        addField(record, "activeRemotePortCount", status.getActiveRemotePortCount());
        addField(record, "activeThreadCount", status.getActiveThreadCount());
        addField(record, "inactiveRemotePortCount", status.getInactiveRemotePortCount());
        addField(record, "receivedContentSize", status.getReceivedContentSize());
        addField(record, "receivedCount", status.getReceivedCount());
        addField(record, "sentContentSize", status.getSentContentSize());
        addField(record, "sentCount", status.getSentCount());
        addField(record, "averageLineageDuration", status.getAverageLineageDuration());
        arrayBuilder.add((JsonValue) record.build());
    }

    /**
     * Appends a status record for an input or output port if the port matches the component
     * filters; otherwise does nothing.
     *
     * @param componentType   the component type to report and filter on; callers in this class
     *                        pass {@code "InputPort"} or {@code "OutputPort"}
     * @param arrayBuilder    the JSON array builder receiving the record
     * @param factory         the factory used to create the JSON object builder
     * @param status          the port status to serialize
     * @param df              the formatter used for the record timestamp
     * @param hostname        the value for the {@code actorHostname} field
     * @param applicationName the value for the {@code application} field
     * @param platform        the value for the {@code platform} field
     * @param parentId        the id of the enclosing process group
     * @param currentDate     the timestamp shared by all records of this run
     */
    private void serializePortStatus(String componentType, JsonArrayBuilder arrayBuilder, JsonBuilderFactory factory, PortStatus status, DateFormat df, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final JsonObjectBuilder record = factory.createObjectBuilder();
        final String portName = status.getName();
        if (!componentMatchesFilters(componentType, portName)) {
            return;
        }

        addCommonFields(record, df, hostname, applicationName, platform, parentId, currentDate, componentType, portName);
        addField(record, "componentId", status.getId());
        addField(record, "activeThreadCount", status.getActiveThreadCount());
        addField(record, "bytesReceived", status.getBytesReceived());
        addField(record, "bytesSent", status.getBytesSent());
        addField(record, "flowFilesReceived", status.getFlowFilesReceived());
        addField(record, "flowFilesSent", status.getFlowFilesSent());
        addField(record, "inputBytes", status.getInputBytes());
        addField(record, "inputCount", status.getInputCount());
        addField(record, "outputBytes", status.getOutputBytes());
        addField(record, "outputCount", status.getOutputCount());
        arrayBuilder.add((JsonValue) record.build());
    }

    /**
     * Appends a {@code Connection} status record if the connection matches the component
     * filters; otherwise does nothing.
     *
     * <p>The {@code isBackPressureEnabled} field is written as the string {@code "true"} when
     * either configured threshold is positive and the corresponding current queue value (queued
     * object count or queued bytes) has reached it, and {@code "false"} otherwise.</p>
     *
     * @param arrayBuilder    the JSON array builder receiving the record
     * @param factory         the factory used to create the JSON object builder
     * @param status          the connection status to serialize
     * @param df              the formatter used for the record timestamp
     * @param hostname        the value for the {@code actorHostname} field
     * @param applicationName the value for the {@code application} field
     * @param platform        the value for the {@code platform} field
     * @param parentId        the id of the enclosing process group
     * @param currentDate     the timestamp shared by all records of this run
     */
    private void serializeConnectionStatus(JsonArrayBuilder arrayBuilder, JsonBuilderFactory factory, ConnectionStatus status, DateFormat df, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final String componentType = "Connection";
        final String componentName = status.getName();
        if (!this.componentMatchesFilters(componentType, componentName)) {
            return;
        }

        final JsonObjectBuilder builder = factory.createObjectBuilder();
        this.addCommonFields(builder, df, hostname, applicationName, platform, parentId, currentDate, componentType, componentName);
        this.addField(builder, "componentId", status.getId());
        this.addField(builder, "sourceId", status.getSourceId());
        this.addField(builder, "sourceName", status.getSourceName());
        this.addField(builder, "destinationId", status.getDestinationId());
        this.addField(builder, "destinationName", status.getDestinationName());
        this.addField(builder, "maxQueuedBytes", status.getMaxQueuedBytes());
        this.addField(builder, "maxQueuedCount", status.getMaxQueuedCount());
        this.addField(builder, "queuedBytes", status.getQueuedBytes());
        this.addField(builder, "queuedCount", status.getQueuedCount());
        this.addField(builder, "inputBytes", status.getInputBytes());
        this.addField(builder, "inputCount", status.getInputCount());
        this.addField(builder, "outputBytes", status.getOutputBytes());
        this.addField(builder, "outputCount", status.getOutputCount());
        this.addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
        this.addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());

        // Both checks use the CURRENT queue values. The byte check previously used
        // getMaxQueuedBytes(), which was inconsistent with the object-count check.
        final boolean objectThresholdReached = status.getBackPressureObjectThreshold() > 0L
                && status.getBackPressureObjectThreshold() <= status.getQueuedCount();
        final boolean bytesThresholdReached = status.getBackPressureBytesThreshold() > 0L
                && status.getBackPressureBytesThreshold() <= status.getQueuedBytes();
        this.addField(builder, "isBackPressureEnabled", Boolean.toString(objectThresholdReached || bytesThresholdReached));

        arrayBuilder.add((JsonValue) builder.build());
    }

    /**
     * Appends a {@code Processor} status record if the processor matches the component
     * filters; otherwise does nothing.
     *
     * @param arrayBuilder    the JSON array builder receiving the record
     * @param factory         the factory used to create the JSON object builder
     * @param status          the processor status to serialize
     * @param df              the formatter used for the record timestamp
     * @param hostname        the value for the {@code actorHostname} field
     * @param applicationName the value for the {@code application} field
     * @param platform        the value for the {@code platform} field
     * @param parentId        the id of the enclosing process group
     * @param currentDate     the timestamp shared by all records of this run
     */
    private void serializeProcessorStatus(JsonArrayBuilder arrayBuilder, JsonBuilderFactory factory, ProcessorStatus status, DateFormat df, String hostname, String applicationName, String platform, String parentId, Date currentDate) {
        final JsonObjectBuilder record = factory.createObjectBuilder();
        final String componentType = "Processor";
        final String processorName = status.getName();
        if (!componentMatchesFilters(componentType, processorName)) {
            return;
        }

        addCommonFields(record, df, hostname, applicationName, platform, parentId, currentDate, componentType, processorName);
        addField(record, "componentId", status.getId());
        addField(record, "processorType", status.getType());
        addField(record, "averageLineageDurationMS", status.getAverageLineageDuration());
        addField(record, "bytesRead", status.getBytesRead());
        addField(record, "bytesWritten", status.getBytesWritten());
        addField(record, "bytesReceived", status.getBytesReceived());
        addField(record, "bytesSent", status.getBytesSent());
        addField(record, "flowFilesRemoved", status.getFlowFilesRemoved());
        addField(record, "flowFilesReceived", status.getFlowFilesReceived());
        addField(record, "flowFilesSent", status.getFlowFilesSent());
        addField(record, "inputCount", status.getInputCount());
        addField(record, "inputBytes", status.getInputBytes());
        addField(record, "outputCount", status.getOutputCount());
        addField(record, "outputBytes", status.getOutputBytes());
        addField(record, "activeThreadCount", status.getActiveThreadCount());
        addField(record, "invocations", status.getInvocations());
        addField(record, "processingNanos", status.getProcessingNanos());
        arrayBuilder.add((JsonValue) record.build());
    }

    /**
     * Writes the fields shared by every status record into the given builder.
     *
     * <p>A fresh random UUID is generated for {@code statusId} on each call. The timestamp is
     * written both as epoch milliseconds and as text formatted with {@code df}.</p>
     *
     * @param builder         the JSON object builder to populate
     * @param df              the formatter used for the {@code timestamp} field
     * @param hostname        the value for the {@code actorHostname} field
     * @param applicationName the value for the {@code application} field
     * @param platform        the value for the {@code platform} field
     * @param parentId        the value for the {@code parentId} field
     * @param currentDate     the date used for {@code timestampMillis} and {@code timestamp}
     * @param componentType   the value for the {@code componentType} field
     * @param componentName   the value for the {@code componentName} field
     */
    private void addCommonFields(JsonObjectBuilder builder, DateFormat df, String hostname, String applicationName, String platform, String parentId, Date currentDate, String componentType, String componentName) {
        final String statusId = UUID.randomUUID().toString();
        final long timestampMillis = currentDate.getTime();
        final String formattedTimestamp = df.format(currentDate);

        addField(builder, "statusId", statusId);
        addField(builder, "timestampMillis", timestampMillis);
        addField(builder, "timestamp", formattedTimestamp);
        addField(builder, "actorHostname", hostname);
        addField(builder, "componentType", componentType);
        addField(builder, "componentName", componentName);
        addField(builder, "parentId", parentId);
        addField(builder, "platform", platform);
        addField(builder, "application", applicationName);
    }
}

