/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.status.history;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
import org.apache.nifi.controller.status.history.StandardStatusHistory;
import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
import org.apache.nifi.controller.status.history.StatusHistory;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import org.apache.nifi.controller.status.history.ValueMapper;
import org.apache.nifi.controller.status.history.ValueReducer;
import org.apache.nifi.util.ComponentStatusReport;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolatileComponentStatusRepository
implements ComponentStatusRepository {
    /** Name of the NiFi property the constructor reads to size the capture ring buffer. */
    public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
    /** Default number of data points; the constructor supplies 288 as the default argument when reading {@link #NUM_DATA_POINTS_PROPERTY}. */
    public static final int DEFAULT_NUM_DATA_POINTS = 288;
    /** Ring buffer holding the captures recorded by {@code capture}; created once in the constructor. */
    private final RingBuffer<Capture> captures;
    /** Logger used for the debug message emitted after each capture. */
    private final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
    /** Largest timestamp (epoch milliseconds) passed to {@code capture} so far; stays 0 until the first capture. */
    private volatile long lastCaptureTime = 0L;
    // The four lists below are assigned exactly once, in the static initializer, as
    // unmodifiable lists built from the values of the corresponding descriptor enum.
    private static final List<MetricDescriptor<ProcessorStatus>> PROCESSOR_METRIC_DESCRIPTORS;
    private static final List<MetricDescriptor<ConnectionStatus>> CONNECTION_METRIC_DESCRIPTORS;
    private static final List<MetricDescriptor<ProcessGroupStatus>> PROCESS_GROUP_METRIC_DESCRIPTORS;
    private static final List<MetricDescriptor<RemoteProcessGroupStatus>> REMOTE_PROCESS_GROUP_METRIC_DESCRIPTORS;

    /**
     * Creates a repository whose ring buffer capacity is read from the
     * {@link #NUM_DATA_POINTS_PROPERTY} entry of the shared {@link NiFiProperties}
     * instance, passing {@link #DEFAULT_NUM_DATA_POINTS} as the default value.
     */
    public VolatileComponentStatusRepository() {
        final NiFiProperties properties = NiFiProperties.getInstance();
        // Use the named constant rather than repeating the literal 288.
        final int numDataPoints = properties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, Integer.valueOf(DEFAULT_NUM_DATA_POINTS));
        // Parameterized (not raw) so the buffer's element type is checked by the compiler.
        this.captures = new RingBuffer<Capture>(numDataPoints);
    }

    /**
     * Records the given root group status, stamping it with the current time.
     *
     * @param rootGroupStatus status of the root process group to record
     */
    public void capture(ProcessGroupStatus rootGroupStatus) {
        final Date now = new Date();
        capture(rootGroupStatus, now);
    }

    /**
     * Records the given root group status under the supplied timestamp.
     * <p>
     * A {@link ComponentStatusReport} covering processors, connections, process
     * groups and remote process groups is built from the status and added to the
     * ring buffer. The last-capture time is then advanced to the timestamp if the
     * timestamp is later than the value already stored.
     *
     * @param rootGroupStatus status of the root process group to record
     * @param timestamp       the time to associate with this capture
     */
    public synchronized void capture(ProcessGroupStatus rootGroupStatus, Date timestamp) {
        final ComponentStatusReport statusReport = ComponentStatusReport.fromProcessGroupStatus(
                rootGroupStatus,
                ComponentStatusReport.ComponentType.PROCESSOR,
                ComponentStatusReport.ComponentType.CONNECTION,
                ComponentStatusReport.ComponentType.PROCESS_GROUP,
                ComponentStatusReport.ComponentType.REMOTE_PROCESS_GROUP);
        // No cast: the buffer is a RingBuffer<Capture>, so the element is passed with its real type.
        this.captures.add(new Capture(timestamp, statusReport));
        this.logger.debug("Captured metrics for {}", this);
        // Math.max keeps the stored time from moving backwards when an older timestamp is supplied.
        this.lastCaptureTime = Math.max(this.lastCaptureTime, timestamp.getTime());
    }

    /**
     * Returns the latest capture timestamp seen so far as a new {@link Date}.
     * Before any capture has been recorded the stored value is 0, so the
     * returned date corresponds to epoch millisecond 0.
     *
     * @return a freshly allocated date built from the stored capture time
     */
    public Date getLastCaptureDate() {
        final long lastCaptureMillis = this.lastCaptureTime;
        return new Date(lastCaptureMillis);
    }

    /**
     * Builds the status history of one processor from the buffered captures.
     * <p>
     * Every capture in the ring buffer is visited. Captures that hold no status
     * for the processor are skipped; each remaining capture contributes one
     * snapshot containing a value for every {@link ProcessorStatusDescriptor}.
     * The "Group Id", "Name" and "Type" details are set again for every matching
     * capture.
     * <p>
     * Note: {@code start}, {@code end} and {@code preferredDataPoints} are not
     * consulted by this implementation; all buffered captures are considered.
     *
     * @param processorId         id of the processor whose history is requested
     * @param start               not used by this implementation
     * @param end                 not used by this implementation
     * @param preferredDataPoints not used by this implementation
     * @return the assembled history; it carries only the "Id" detail when no capture matches
     */
    public StatusHistory getProcessorStatusHistory(final String processorId, Date start, Date end, int preferredDataPoints) {
        final StandardStatusHistory history = new StandardStatusHistory();
        history.setComponentDetail("Id", processorId);

        // Typed evaluator: no raw cast, so the compiler checks the element type.
        this.captures.forEach(new RingBuffer.ForEachEvaluator<Capture>() {
            @Override
            public boolean evaluate(final Capture capture) {
                final ProcessorStatus status = capture.getStatusReport().getProcessorStatus(processorId);
                if (status != null) {
                    history.setComponentDetail("Group Id", status.getGroupId());
                    history.setComponentDetail("Name", status.getName());
                    history.setComponentDetail("Type", status.getType());

                    final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
                    snapshot.setTimestamp(capture.getCaptureDate());
                    for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
                        final MetricDescriptor<ProcessorStatus> metric = descriptor.getDescriptor();
                        // The status is passed with its static type; the value mapper is typed on ProcessorStatus.
                        snapshot.addStatusMetric(metric, metric.getValueFunction().getValue(status));
                    }
                    history.addStatusSnapshot(snapshot);
                }
                // Always true, matched or not (presumably tells the buffer to keep iterating).
                return true;
            }
        });
        return history;
    }

    /**
     * Builds the status history of one connection from the buffered captures.
     * <p>
     * Every capture in the ring buffer is visited. Captures that hold no status
     * for the connection are skipped; each remaining capture contributes one
     * snapshot containing a value for every {@link ConnectionStatusDescriptor}.
     * The "Group Id", "Name", "Source Name" and "Destination Name" details are
     * set again for every matching capture.
     * <p>
     * Note: {@code start}, {@code end} and {@code preferredDataPoints} are not
     * consulted by this implementation; all buffered captures are considered.
     *
     * @param connectionId        id of the connection whose history is requested
     * @param start               not used by this implementation
     * @param end                 not used by this implementation
     * @param preferredDataPoints not used by this implementation
     * @return the assembled history; it carries only the "Id" detail when no capture matches
     */
    public StatusHistory getConnectionStatusHistory(final String connectionId, Date start, Date end, int preferredDataPoints) {
        final StandardStatusHistory history = new StandardStatusHistory();
        history.setComponentDetail("Id", connectionId);

        // Typed evaluator: no raw cast, so the compiler checks the element type.
        this.captures.forEach(new RingBuffer.ForEachEvaluator<Capture>() {
            @Override
            public boolean evaluate(final Capture capture) {
                final ConnectionStatus status = capture.getStatusReport().getConnectionStatus(connectionId);
                if (status != null) {
                    history.setComponentDetail("Group Id", status.getGroupId());
                    history.setComponentDetail("Name", status.getName());
                    history.setComponentDetail("Source Name", status.getSourceName());
                    history.setComponentDetail("Destination Name", status.getDestinationName());

                    final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
                    snapshot.setTimestamp(capture.getCaptureDate());
                    for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
                        final MetricDescriptor<ConnectionStatus> metric = descriptor.getDescriptor();
                        // The status is passed with its static type; the value mapper is typed on ConnectionStatus.
                        snapshot.addStatusMetric(metric, metric.getValueFunction().getValue(status));
                    }
                    history.addStatusSnapshot(snapshot);
                }
                // Always true, matched or not (presumably tells the buffer to keep iterating).
                return true;
            }
        });
        return history;
    }

    /**
     * Builds the status history of one process group from the buffered captures.
     * <p>
     * Every capture in the ring buffer is visited. Captures that hold no status
     * for the group are skipped; each remaining capture contributes one snapshot
     * containing a value for every {@link ProcessGroupStatusDescriptor}. The
     * "Name" detail is set again for every matching capture.
     * <p>
     * Note: {@code start}, {@code end} and {@code preferredDataPoints} are not
     * consulted by this implementation; all buffered captures are considered.
     *
     * @param processGroupId      id of the process group whose history is requested
     * @param start               not used by this implementation
     * @param end                 not used by this implementation
     * @param preferredDataPoints not used by this implementation
     * @return the assembled history; it carries only the "Id" detail when no capture matches
     */
    public StatusHistory getProcessGroupStatusHistory(final String processGroupId, Date start, Date end, int preferredDataPoints) {
        final StandardStatusHistory history = new StandardStatusHistory();
        history.setComponentDetail("Id", processGroupId);

        // Typed evaluator: no raw cast, so the compiler checks the element type.
        this.captures.forEach(new RingBuffer.ForEachEvaluator<Capture>() {
            @Override
            public boolean evaluate(final Capture capture) {
                final ProcessGroupStatus status = capture.getStatusReport().getProcessGroupStatus(processGroupId);
                if (status != null) {
                    history.setComponentDetail("Name", status.getName());

                    final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
                    snapshot.setTimestamp(capture.getCaptureDate());
                    for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) {
                        final MetricDescriptor<ProcessGroupStatus> metric = descriptor.getDescriptor();
                        // The status is passed with its static type; the value mapper is typed on ProcessGroupStatus.
                        snapshot.addStatusMetric(metric, metric.getValueFunction().getValue(status));
                    }
                    history.addStatusSnapshot(snapshot);
                }
                // Always true, matched or not (presumably tells the buffer to keep iterating).
                return true;
            }
        });
        return history;
    }

    /**
     * Builds the status history of one remote process group from the buffered
     * captures.
     * <p>
     * Every capture in the ring buffer is visited. Captures that hold no status
     * for the remote group are skipped; each remaining capture contributes one
     * snapshot containing a value for every
     * {@link RemoteProcessGroupStatusDescriptor}. The "Group Id", "Name" and
     * "Uri" details are set again for every matching capture.
     * <p>
     * Note: {@code start}, {@code end} and {@code preferredDataPoints} are not
     * consulted by this implementation; all buffered captures are considered.
     *
     * @param remoteGroupId       id of the remote process group whose history is requested
     * @param start               not used by this implementation
     * @param end                 not used by this implementation
     * @param preferredDataPoints not used by this implementation
     * @return the assembled history; it carries only the "Id" detail when no capture matches
     */
    public StatusHistory getRemoteProcessGroupStatusHistory(final String remoteGroupId, Date start, Date end, int preferredDataPoints) {
        final StandardStatusHistory history = new StandardStatusHistory();
        history.setComponentDetail("Id", remoteGroupId);

        // Typed evaluator: no raw cast, so the compiler checks the element type.
        this.captures.forEach(new RingBuffer.ForEachEvaluator<Capture>() {
            @Override
            public boolean evaluate(final Capture capture) {
                final RemoteProcessGroupStatus status = capture.getStatusReport().getRemoteProcessGroupStatus(remoteGroupId);
                if (status != null) {
                    history.setComponentDetail("Group Id", status.getGroupId());
                    history.setComponentDetail("Name", status.getName());
                    history.setComponentDetail("Uri", status.getTargetUri());

                    final StandardStatusSnapshot snapshot = new StandardStatusSnapshot();
                    snapshot.setTimestamp(capture.getCaptureDate());
                    for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) {
                        final MetricDescriptor<RemoteProcessGroupStatus> metric = descriptor.getDescriptor();
                        // The status is passed with its static type; the value mapper is typed on RemoteProcessGroupStatus.
                        snapshot.addStatusMetric(metric, metric.getValueFunction().getValue(status));
                    }
                    history.addStatusSnapshot(snapshot);
                }
                // Always true, matched or not (presumably tells the buffer to keep iterating).
                return true;
            }
        });
        return history;
    }

    /**
     * Computes the total processing time, in milliseconds, of every processor in
     * the given group and in all of its descendant groups.
     * <p>
     * The nanosecond totals are summed across the whole tree first and converted
     * to milliseconds exactly once. Adding a child's already-converted
     * millisecond total to the nanosecond accumulator would shrink the child's
     * contribution by a further factor of one million.
     *
     * @param status the process group status to total
     * @return total processing time in milliseconds, truncated by the conversion
     */
    private static long calculateTaskMillis(ProcessGroupStatus status) {
        final long totalNanos = VolatileComponentStatusRepository.calculateTaskNanos(status);
        return TimeUnit.MILLISECONDS.convert(totalNanos, TimeUnit.NANOSECONDS);
    }

    /** Sums processing nanoseconds over the group's own processors and, recursively, its child groups. */
    private static long calculateTaskNanos(ProcessGroupStatus status) {
        long nanos = 0L;
        for (ProcessorStatus procStatus : status.getProcessorStatus()) {
            nanos += procStatus.getProcessingNanos();
        }
        for (ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
            nanos += VolatileComponentStatusRepository.calculateTaskNanos(childStatus);
        }
        return nanos;
    }

    /**
     * @return the unmodifiable list of connection metric descriptors assembled
     *         in the static initializer
     */
    public List<MetricDescriptor<ConnectionStatus>> getConnectionMetricDescriptors() {
        return VolatileComponentStatusRepository.CONNECTION_METRIC_DESCRIPTORS;
    }

    /**
     * @return the unmodifiable list of process group metric descriptors
     *         assembled in the static initializer
     */
    public List<MetricDescriptor<ProcessGroupStatus>> getProcessGroupMetricDescriptors() {
        return VolatileComponentStatusRepository.PROCESS_GROUP_METRIC_DESCRIPTORS;
    }

    /**
     * @return the unmodifiable list of remote process group metric descriptors
     *         assembled in the static initializer
     */
    public List<MetricDescriptor<RemoteProcessGroupStatus>> getRemoteProcessGroupMetricDescriptors() {
        return VolatileComponentStatusRepository.REMOTE_PROCESS_GROUP_METRIC_DESCRIPTORS;
    }

    /**
     * @return the unmodifiable list of processor metric descriptors assembled
     *         in the static initializer
     */
    public List<MetricDescriptor<ProcessorStatus>> getProcessorMetricDescriptors() {
        return VolatileComponentStatusRepository.PROCESSOR_METRIC_DESCRIPTORS;
    }

    // Builds the four unmodifiable descriptor lists, one per component kind,
    // by walking each descriptor enum in declaration order.
    static {
        final ProcessorStatusDescriptor[] processorEntries = ProcessorStatusDescriptor.values();
        final List<MetricDescriptor<ProcessorStatus>> processorMetrics =
                new ArrayList<MetricDescriptor<ProcessorStatus>>(processorEntries.length);
        for (int i = 0; i < processorEntries.length; i++) {
            processorMetrics.add(processorEntries[i].getDescriptor());
        }
        PROCESSOR_METRIC_DESCRIPTORS = Collections.unmodifiableList(processorMetrics);

        final ConnectionStatusDescriptor[] connectionEntries = ConnectionStatusDescriptor.values();
        final List<MetricDescriptor<ConnectionStatus>> connectionMetrics =
                new ArrayList<MetricDescriptor<ConnectionStatus>>(connectionEntries.length);
        for (int i = 0; i < connectionEntries.length; i++) {
            connectionMetrics.add(connectionEntries[i].getDescriptor());
        }
        CONNECTION_METRIC_DESCRIPTORS = Collections.unmodifiableList(connectionMetrics);

        final ProcessGroupStatusDescriptor[] groupEntries = ProcessGroupStatusDescriptor.values();
        final List<MetricDescriptor<ProcessGroupStatus>> groupMetrics =
                new ArrayList<MetricDescriptor<ProcessGroupStatus>>(groupEntries.length);
        for (int i = 0; i < groupEntries.length; i++) {
            groupMetrics.add(groupEntries[i].getDescriptor());
        }
        PROCESS_GROUP_METRIC_DESCRIPTORS = Collections.unmodifiableList(groupMetrics);

        final RemoteProcessGroupStatusDescriptor[] remoteEntries = RemoteProcessGroupStatusDescriptor.values();
        final List<MetricDescriptor<RemoteProcessGroupStatus>> remoteMetrics =
                new ArrayList<MetricDescriptor<RemoteProcessGroupStatus>>(remoteEntries.length);
        for (int i = 0; i < remoteEntries.length; i++) {
            remoteMetrics.add(remoteEntries[i].getDescriptor());
        }
        REMOTE_PROCESS_GROUP_METRIC_DESCRIPTORS = Collections.unmodifiableList(remoteMetrics);
    }

    public static enum ProcessorStatusDescriptor {
        BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getBytesRead();
            }
        })),
        BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getBytesWritten();
            }
        })),
        BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getBytesRead() + status.getBytesWritten();
            }
        })),
        INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getInputBytes();
            }
        })),
        INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getInputCount();
            }
        })),
        OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getOutputBytes();
            }
        })),
        OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getOutputCount();
            }
        })),
        TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getInvocations();
            }
        })),
        TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", MetricDescriptor.Formatter.DURATION, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
            }
        })),
        FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getFlowFilesRemoved();
            }
        })),
        AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>("averageLineageDuration", "Average Lineage Duration (5 mins)", "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", MetricDescriptor.Formatter.DURATION, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
            }
        }, new ValueReducer<StatusSnapshot, Long>(){

            public Long reduce(List<StatusSnapshot> values) {
                long millis = 0L;
                int count = 0;
                for (StatusSnapshot snapshot : values) {
                    long removed = (Long)snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor());
                    count = (int)((long)count + removed);
                    count = (int)((long)count + (Long)snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()));
                    long avgMillis = (Long)snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor());
                    long totalMillis = avgMillis * removed;
                    millis += totalMillis;
                }
                return count == 0 ? 0L : millis / (long)count;
            }
        })),
        AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("averageTaskMillis", "Average Task Duration", "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes", MetricDescriptor.Formatter.DURATION, new ValueMapper<ProcessorStatus>(){

            public Long getValue(ProcessorStatus status) {
                return status.getInvocations() == 0 ? 0L : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / (long)status.getInvocations();
            }
        }, new ValueReducer<StatusSnapshot, Long>(){

            public Long reduce(List<StatusSnapshot> values) {
                long procMillis = 0L;
                int invocations = 0;
                for (StatusSnapshot snapshot : values) {
                    procMillis += ((Long)snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor())).longValue();
                    invocations += ((Long)snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor())).intValue();
                }
                if (invocations == 0) {
                    return 0L;
                }
                return procMillis / (long)invocations;
            }
        }));

        private MetricDescriptor<ProcessorStatus> descriptor;

        private ProcessorStatusDescriptor(MetricDescriptor<ProcessorStatus> descriptor) {
            this.descriptor = descriptor;
        }

        public String getField() {
            return this.descriptor.getField();
        }

        public MetricDescriptor<ProcessorStatus> getDescriptor() {
            return this.descriptor;
        }
    }

    public static enum ConnectionStatusDescriptor {
        INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>(){

            public Long getValue(ConnectionStatus status) {
                return status.getInputBytes();
            }
        })),
        INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ConnectionStatus>(){

            public Long getValue(ConnectionStatus status) {
                return status.getInputCount();
            }
        })),
        OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>(){

            public Long getValue(ConnectionStatus status) {
                return status.getOutputBytes();
            }
        })),
        OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ConnectionStatus>(){

            public Long getValue(ConnectionStatus status) {
                return status.getOutputCount();
            }
        })),
        QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", "The number of Bytes queued in this Connection", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>(){

            public Long getValue(ConnectionStatus status) {
                return status.getQueuedBytes();
            }
        })),
        QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in this Connection", MetricDescriptor.Formatter.COUNT, new ValueMapper<ConnectionStatus>(){

            public Long getValue(ConnectionStatus status) {
                return status.getQueuedCount();
            }
        }));

        private MetricDescriptor<ConnectionStatus> descriptor;

        private ConnectionStatusDescriptor(MetricDescriptor<ConnectionStatus> descriptor) {
            this.descriptor = descriptor;
        }

        public String getField() {
            return this.descriptor.getField();
        }

        public MetricDescriptor<ConnectionStatus> getDescriptor() {
            return this.descriptor;
        }
    }

    public static enum ProcessGroupStatusDescriptor {
        BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getBytesRead();
            }
        })),
        BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getBytesWritten();
            }
        })),
        BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getBytesRead() + status.getBytesWritten();
            }
        })),
        INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getInputContentSize();
            }
        })),
        INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getInputCount().longValue();
            }
        })),
        OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getOutputContentSize();
            }
        })),
        OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getOutputCount().longValue();
            }
        })),
        QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", "The cumulative size of all FlowFiles queued in all Connections of this Process Group", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getQueuedContentSize();
            }
        })),
        QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in all Connections of this Process Group", MetricDescriptor.Formatter.COUNT, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return status.getQueuedCount().longValue();
            }
        })),
        TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", MetricDescriptor.Formatter.DURATION, new ValueMapper<ProcessGroupStatus>(){

            public Long getValue(ProcessGroupStatus status) {
                return VolatileComponentStatusRepository.calculateTaskMillis(status);
            }
        }));

        private MetricDescriptor<ProcessGroupStatus> descriptor;

        private ProcessGroupStatusDescriptor(MetricDescriptor<ProcessGroupStatus> descriptor) {
            this.descriptor = descriptor;
        }

        public String getField() {
            return this.descriptor.getField();
        }

        public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
            return this.descriptor;
        }
    }

    public static enum RemoteProcessGroupStatusDescriptor {
        SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getSentContentSize();
            }
        })),
        SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getSentCount().longValue();
            }
        })),
        RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getReceivedContentSize();
            }
        })),
        RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", "The number of FlowFiles that have been received from the remote system in the past 5 minutes", MetricDescriptor.Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getReceivedCount().longValue();
            }
        })),
        RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getReceivedContentSize() / 300L;
            }
        })),
        SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getSentContentSize() / 300L;
            }
        })),
        TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", MetricDescriptor.Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return (status.getReceivedContentSize() + status.getSentContentSize()) / 300L;
            }
        })),
        AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>("averageLineageDuration", "Average Lineage Duration (5 mins)", "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", MetricDescriptor.Formatter.DURATION, new ValueMapper<RemoteProcessGroupStatus>(){

            public Long getValue(RemoteProcessGroupStatus status) {
                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
            }
        }, new ValueReducer<StatusSnapshot, Long>(){

            public Long reduce(List<StatusSnapshot> values) {
                long millis = 0L;
                int count = 0;
                for (StatusSnapshot snapshot : values) {
                    long sent = (Long)snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor());
                    count = (int)((long)count + sent);
                    long avgMillis = (Long)snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor());
                    long totalMillis = avgMillis * sent;
                    millis += totalMillis;
                }
                return count == 0 ? 0L : millis / (long)count;
            }
        }));

        private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;

        private RemoteProcessGroupStatusDescriptor(MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
            this.descriptor = descriptor;
        }

        public String getField() {
            return this.descriptor.getField();
        }

        public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
            return this.descriptor;
        }
    }

    /**
     * Pairs a status report with the date under which it was captured. Both
     * references are stored as given (no copy is made) and never reassigned.
     */
    private static class Capture {
        private final ComponentStatusReport statusReport;
        private final Date captureDate;

        public Capture(Date captureDate, ComponentStatusReport report) {
            this.statusReport = report;
            this.captureDate = captureDate;
        }

        /** @return the date supplied when this capture was created */
        public Date getCaptureDate() {
            return captureDate;
        }

        /** @return the status report supplied when this capture was created */
        public ComponentStatusReport getStatusReport() {
            return statusReport;
        }
    }
}

