/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.status.analytics;

import com.google.common.primitives.Doubles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.analytics.QueryWindow;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
import org.apache.nifi.controller.status.analytics.StatusMetricExtractFunction;
import org.apache.nifi.controller.status.history.StatusHistory;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Trains the configured {@link StatusAnalyticsModel}s on a connection's status history and
 * derives back-pressure predictions for that connection.
 *
 * <p>{@link #refresh()} (re)trains every model in the model map against the history that falls
 * inside the current {@link QueryWindow}; {@link #loadPredictions(RepositoryStatusReport)} then
 * evaluates the models and stores the results in the map returned by {@link #getPredictions()}.
 * A prediction value of {@code -1} means "no prediction available".
 *
 * <p>NOTE(review): apart from the {@link ConcurrentHashMap} holding the predictions, the mutable
 * state of this class (query window, extend-window flag, tunables) is not synchronized — confirm
 * how callers schedule {@code refresh()} and {@code loadPredictions()}.
 */
public class ConnectionStatusAnalytics implements StatusAnalytics {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class);

    /** Keys under which the models are looked up in the model map. */
    private static final String QUEUED_BYTES_METRIC = "queuedBytes";
    private static final String QUEUED_COUNT_METRIC = "queuedCount";

    /** Keys of the entries published through {@link #getPredictions()}. */
    private static final String TIME_TO_BYTE_BACKPRESSURE_MILLIS = "timeToBytesBackpressureMillis";
    private static final String TIME_TO_COUNT_BACKPRESSURE_MILLIS = "timeToCountBackpressureMillis";
    private static final String NEXT_INTERVAL_BYTES = "nextIntervalBytes";
    private static final String NEXT_INTERVAL_COUNT = "nextIntervalCount";
    private static final String NEXT_INTERVAL_PERCENTAGE_USE_COUNT = "nextIntervalPercentageUseCount";
    private static final String NEXT_INTERVAL_PERCENTAGE_USE_BYTES = "nextIntervalPercentageUseBytes";
    private static final String INTERVAL_TIME_MILLIS = "intervalTimeMillis";

    /** Sentinel stored/returned whenever a prediction cannot be made. */
    private static final long NO_PREDICTION = -1L;

    private final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap;
    private final StatusHistoryRepository statusHistoryRepository;
    private final String connectionIdentifier;
    private final FlowManager flowManager;
    private final boolean supportOnlineLearning;
    private final Map<String, Long> predictions;

    private QueryWindow queryWindow;
    /** Set when the last training attempt had no data or failed, so the next window keeps its start. */
    private boolean extendWindow = false;
    /** How far ahead (in milliseconds) the "next interval" predictions look; defaults to 3 minutes. */
    private long intervalMillis = 180000L;
    /** Length (in milliseconds) of the history window used when no previous window applies; defaults to 5 minutes. */
    private long queryIntervalMillis = 300000L;
    private String scoreName = "rSquared";
    private double scoreThreshold = 0.9;

    /**
     * @param statusHistoryRepository repository the connection status history is read from
     * @param flowManager             used to resolve the connection by its identifier
     * @param modelMap                models and their metric extraction functions, keyed by metric name
     * @param connectionIdentifier    identifier of the connection being analyzed
     * @param supportOnlineLearning   whether models may be trained incrementally; {@code null} is treated as {@code false}
     */
    public ConnectionStatusAnalytics(StatusHistoryRepository statusHistoryRepository, FlowManager flowManager, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, String connectionIdentifier, Boolean supportOnlineLearning) {
        this.statusHistoryRepository = statusHistoryRepository;
        this.flowManager = flowManager;
        this.modelMap = modelMap;
        this.connectionIdentifier = connectionIdentifier;
        // Unbox once and defensively: a null flag previously surfaced as an NPE on first use.
        this.supportOnlineLearning = Boolean.TRUE.equals(supportOnlineLearning);
        this.predictions = this.initPredictions();
    }

    /**
     * Computes the next query window, fetches the connection's status history for it and trains
     * every configured model on the extracted observations.
     *
     * <p>With online learning enabled and a previous window present, the new window starts where
     * the previous one ended, or at the previous start when the window is being extended.
     * Otherwise the window covers the last {@link #getQueryIntervalMillis()} milliseconds.
     */
    public void refresh() {
        final long now = System.currentTimeMillis();
        if (this.supportOnlineLearning && this.queryWindow != null) {
            final long start = this.extendWindow ? this.queryWindow.getStartTimeMillis() : this.queryWindow.getEndTimeMillis();
            this.queryWindow = new QueryWindow(start, now);
        } else {
            this.queryWindow = new QueryWindow(now - this.getQueryIntervalMillis(), now);
        }

        this.modelMap.forEach((metric, modelFunction) -> {
            final StatusAnalyticsModel model = modelFunction.getKey();
            final StatusMetricExtractFunction extract = modelFunction.getValue();
            final StatusHistory statusHistory = this.statusHistoryRepository.getConnectionStatusHistory(
                    this.connectionIdentifier, this.queryWindow.getStartDateTime(), this.queryWindow.getEndDateTime(), Integer.MAX_VALUE);
            final Tuple<Stream<Double[]>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
            final Double[][] features = modelData.getKey().toArray(Double[][]::new);
            final Double[] values = modelData.getValue().toArray(Double[]::new);

            if (!ArrayUtils.isNotEmpty(features)) {
                // Nothing to learn from yet: keep the window start so more history accumulates.
                this.extendWindow = true;
                return;
            }

            try {
                LOG.debug("Refreshing model with new data for connection id: {} ", this.connectionIdentifier);
                model.learn(Stream.of(features), Stream.of(values));
                if (LOG.isDebugEnabled() && MapUtils.isNotEmpty(model.getScores())) {
                    model.getScores().forEach((key, value) -> LOG.debug("Model Scores for prediction metric {} for connection id {}: {}={} ", metric, this.connectionIdentifier, key, value));
                }
                this.extendWindow = false;
            } catch (Exception ex) {
                // Training failures are best-effort; the trailing throwable lets SLF4J emit the stack trace.
                LOG.debug("Exception encountered while training model for connection id {}: {}", this.connectionIdentifier, ex.getMessage(), ex);
                this.extendWindow = true;
            }
        });
    }

    /**
     * Looks up the model registered for the given metric name.
     *
     * @param modelType key of the model in the model map
     * @return the model registered under {@code modelType}
     * @throws IllegalArgumentException if no entry exists for {@code modelType}
     */
    protected StatusAnalyticsModel getModel(String modelType) {
        final Tuple<StatusAnalyticsModel, StatusMetricExtractFunction> entry = this.modelMap.get(modelType);
        if (entry == null && !this.modelMap.containsKey(modelType)) {
            throw new IllegalArgumentException("Model cannot be found for provided type: " + modelType);
        }
        return entry == null ? null : entry.getKey();
    }

    /**
     * Predicts how long until the queue reaches its back-pressure data size threshold.
     *
     * @param connection    connection whose queue threshold is used as the prediction target
     * @param flowFileEvent current in/out statistics for the connection; may be {@code null}
     * @return milliseconds from now until the threshold is predicted to be reached, or {@code -1}
     *         if the model is not valid, no event is available, or the prediction is unusable
     */
    Long getTimeToBytesBackpressureMillis(Connection connection, FlowFileEvent flowFileEvent) {
        final StatusAnalyticsModel bytesModel = this.getModel(QUEUED_BYTES_METRIC);
        final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
        final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
        if (this.validModel(bytesModel) && flowFileEvent != null) {
            final Map<Integer, Double> predictFeatures = new HashMap<>();
            predictFeatures.put(1, outInRatio(flowFileEvent.getContentSizeOut(), flowFileEvent.getContentSizeIn()));
            return this.convertTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis());
        }
        LOG.debug("Model is not valid for calculating time back pressure by content size in bytes. Returning -1");
        return NO_PREDICTION;
    }

    /**
     * Predicts how long until the queue reaches its back-pressure object count threshold.
     *
     * @param connection    connection whose queue threshold is used as the prediction target
     * @param flowFileEvent current in/out statistics for the connection; may be {@code null}
     * @return milliseconds from now until the threshold is predicted to be reached, or {@code -1}
     *         if the model is not valid, no event is available, or the prediction is unusable
     */
    Long getTimeToCountBackpressureMillis(Connection connection, FlowFileEvent flowFileEvent) {
        final StatusAnalyticsModel countModel = this.getModel(QUEUED_COUNT_METRIC);
        final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
        if (this.validModel(countModel) && flowFileEvent != null) {
            final Map<Integer, Double> predictFeatures = new HashMap<>();
            predictFeatures.put(1, outInRatio(flowFileEvent.getFlowFilesOut(), flowFileEvent.getFlowFilesIn()));
            return this.convertTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
        }
        LOG.debug("Model is not valid for calculating time to back pressure by object count. Returning -1");
        return NO_PREDICTION;
    }

    /**
     * Predicts the queued content size at {@code now + getIntervalTimeMillis()}.
     *
     * @param flowFileEvent current in/out statistics for the connection; may be {@code null}
     * @return predicted value rounded and clamped to be non-negative, or {@code -1} if unavailable
     */
    Long getNextIntervalBytes(FlowFileEvent flowFileEvent) {
        final StatusAnalyticsModel bytesModel = this.getModel(QUEUED_BYTES_METRIC);
        if (this.validModel(bytesModel) && flowFileEvent != null) {
            final long nextInterval = System.currentTimeMillis() + this.getIntervalTimeMillis();
            final double inOutRatio = outInRatio(flowFileEvent.getContentSizeOut(), flowFileEvent.getContentSizeIn());
            return this.convertCountPrediction(bytesModel.predict(new Double[]{(double) nextInterval, inOutRatio}));
        }
        LOG.debug("Model is not valid for predicting content size in bytes for next interval. Returning -1");
        return NO_PREDICTION;
    }

    /**
     * Predicts the queued object count at {@code now + getIntervalTimeMillis()}.
     *
     * @param flowFileEvent current in/out statistics for the connection; may be {@code null}
     * @return predicted value rounded and clamped to be non-negative, or {@code -1} if unavailable
     */
    Long getNextIntervalCount(FlowFileEvent flowFileEvent) {
        final StatusAnalyticsModel countModel = this.getModel(QUEUED_COUNT_METRIC);
        if (this.validModel(countModel) && flowFileEvent != null) {
            final long nextInterval = System.currentTimeMillis() + this.getIntervalTimeMillis();
            final double inOutRatio = outInRatio(flowFileEvent.getFlowFilesOut(), flowFileEvent.getFlowFilesIn());
            return this.convertCountPrediction(countModel.predict(new Double[]{(double) nextInterval, inOutRatio}));
        }
        LOG.debug("Model is not valid for predicting object count for next interval. Returning -1");
        return NO_PREDICTION;
    }

    /**
     * Expresses the predicted next-interval object count as a percentage of the queue's
     * back-pressure object threshold.
     *
     * @return percentage capped at 100, or {@code -1} if no count prediction is available
     */
    Long getNextIntervalPercentageUseCount(Connection connection, FlowFileEvent flowFileEvent) {
        final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
        final long nextIntervalCount = this.getNextIntervalCount(flowFileEvent);
        if (nextIntervalCount > NO_PREDICTION) {
            return Math.min(100L, Math.round((double) nextIntervalCount / backPressureCountThreshold * 100.0));
        }
        return NO_PREDICTION;
    }

    /**
     * Expresses the predicted next-interval content size as a percentage of the queue's
     * back-pressure data size threshold.
     *
     * @return percentage capped at 100, or {@code -1} if no bytes prediction is available
     */
    Long getNextIntervalPercentageUseBytes(Connection connection, FlowFileEvent flowFileEvent) {
        final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
        final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
        final long nextIntervalBytes = this.getNextIntervalBytes(flowFileEvent);
        if (nextIntervalBytes > NO_PREDICTION) {
            // Reuse the value that was just validated; predicting again could yield a different
            // result (new timestamp, model possibly cleared by the validity check), including -1.
            return Math.min(100L, Math.round((double) nextIntervalBytes / backPressureBytes * 100.0));
        }
        return NO_PREDICTION;
    }

    /** @return how far ahead, in milliseconds, the next-interval predictions look */
    public Long getIntervalTimeMillis() {
        return this.intervalMillis;
    }

    public void setIntervalTimeMillis(long intervalTimeMillis) {
        this.intervalMillis = intervalTimeMillis;
    }

    /** @return length, in milliseconds, of the fallback history window used by {@link #refresh()} */
    public long getQueryIntervalMillis() {
        return this.queryIntervalMillis;
    }

    public void setQueryIntervalMillis(long queryIntervalMillis) {
        this.queryIntervalMillis = queryIntervalMillis;
    }

    /** @return key of the model score that is compared against the score threshold */
    public String getScoreName() {
        return this.scoreName;
    }

    public void setScoreName(String scoreName) {
        this.scoreName = scoreName;
    }

    /** @return minimum score a model needs in order to be used for predictions */
    public double getScoreThreshold() {
        return this.scoreThreshold;
    }

    public void setScoreThreshold(double scoreThreshold) {
        this.scoreThreshold = scoreThreshold;
    }

    /** @return the window used by the most recent {@link #refresh()}, or {@code null} before the first refresh */
    public QueryWindow getQueryWindow() {
        return this.queryWindow;
    }

    /** @return the live map of prediction name to value; entries are {@code -1} until computed */
    public Map<String, Long> getPredictions() {
        return this.predictions;
    }

    /**
     * Evaluates all predictions for the connection and stores them in the predictions map.
     *
     * @param statusReport report from which the connection's current flow file event is read
     * @throws NoSuchElementException if the flow manager no longer knows the connection
     */
    public void loadPredictions(RepositoryStatusReport statusReport) {
        final long startTs = System.currentTimeMillis();
        final Connection connection = this.flowManager.getConnection(this.connectionIdentifier);
        if (connection == null) {
            throw new NoSuchElementException("Connection with the following id cannot be found:" + this.connectionIdentifier + ". Model should be invalidated!");
        }
        final FlowFileEvent flowFileEvent = statusReport.getReportEntry(this.connectionIdentifier);
        this.predictions.put(TIME_TO_BYTE_BACKPRESSURE_MILLIS, this.getTimeToBytesBackpressureMillis(connection, flowFileEvent));
        this.predictions.put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, this.getTimeToCountBackpressureMillis(connection, flowFileEvent));
        this.predictions.put(NEXT_INTERVAL_BYTES, this.getNextIntervalBytes(flowFileEvent));
        this.predictions.put(NEXT_INTERVAL_COUNT, this.getNextIntervalCount(flowFileEvent));
        this.predictions.put(NEXT_INTERVAL_PERCENTAGE_USE_COUNT, this.getNextIntervalPercentageUseCount(connection, flowFileEvent));
        this.predictions.put(NEXT_INTERVAL_PERCENTAGE_USE_BYTES, this.getNextIntervalPercentageUseBytes(connection, flowFileEvent));
        this.predictions.put(INTERVAL_TIME_MILLIS, this.getIntervalTimeMillis());
        final long endTs = System.currentTimeMillis();
        LOG.debug("Prediction Calculations for connectionID {}: {}", this.connectionIdentifier, endTs - startTs);
        this.predictions.forEach((key, value) -> LOG.trace("Prediction model for connection id {}: {}={} ", this.connectionIdentifier, key, value));
    }

    /** @return whether this instance was configured to train models incrementally */
    public boolean supportsOnlineLearning() {
        return this.supportOnlineLearning;
    }

    /** Builds the predictions map with every known key preset to the "no prediction" sentinel. */
    private Map<String, Long> initPredictions() {
        final Map<String, Long> initial = new ConcurrentHashMap<>();
        initial.put(TIME_TO_BYTE_BACKPRESSURE_MILLIS, NO_PREDICTION);
        initial.put(TIME_TO_COUNT_BACKPRESSURE_MILLIS, NO_PREDICTION);
        initial.put(NEXT_INTERVAL_BYTES, NO_PREDICTION);
        initial.put(NEXT_INTERVAL_COUNT, NO_PREDICTION);
        initial.put(NEXT_INTERVAL_PERCENTAGE_USE_COUNT, NO_PREDICTION);
        initial.put(NEXT_INTERVAL_PERCENTAGE_USE_BYTES, NO_PREDICTION);
        initial.put(INTERVAL_TIME_MILLIS, NO_PREDICTION);
        return initial;
    }

    /**
     * Ratio of outgoing to incoming volume used as a model feature. When {@code in} is zero the
     * floating-point division yields {@code Infinity} or {@code NaN} rather than throwing.
     * NOTE(review): how the models treat a non-finite feature is not visible here; non-finite
     * predictions are rejected by the convert methods.
     */
    private static double outInRatio(long out, long in) {
        return (double) out / (double) in;
    }

    /**
     * Converts a predicted absolute timestamp into a delay relative to {@code timeMillis}.
     *
     * @return the non-negative delay in milliseconds, or {@code -1} when the prediction is
     *         missing, not finite, or lies before {@code timeMillis}
     */
    private Long convertTimePrediction(Double prediction, long timeMillis) {
        if (prediction == null || Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < (double) timeMillis) {
            LOG.debug("Time prediction value is invalid: {}. Returning -1.", prediction);
            return NO_PREDICTION;
        }
        return Math.max(0L, Math.round(prediction) - timeMillis);
    }

    /**
     * Rounds a predicted quantity to a non-negative whole number.
     *
     * @return the rounded value clamped at zero, or {@code -1} when the prediction is missing or not finite
     */
    private Long convertCountPrediction(Double prediction) {
        if (prediction == null || Double.isNaN(prediction) || Double.isInfinite(prediction)) {
            LOG.debug("Count prediction value is invalid: {}. Returning -1.", prediction);
            return NO_PREDICTION;
        }
        return Math.max(0L, Math.round(prediction));
    }

    /**
     * Decides whether a model may be used for predictions. A model is rejected when it has no
     * score under {@link #getScoreName()} or when that score is a finite number below
     * {@link #getScoreThreshold()}; rejected models that support online learning are cleared
     * (when online learning is enabled here) so they can be retrained from scratch.
     *
     * <p>A {@code NaN} or infinite score does not reject the model; this mirrors the existing
     * behavior. NOTE(review): confirm that accepting non-finite scores is intended.
     */
    private boolean validModel(StatusAnalyticsModel model) {
        final Double score = this.getScore(model);
        final boolean rejected = score == null || (Doubles.isFinite(score) && score < this.scoreThreshold);
        if (!rejected) {
            return true;
        }
        if (model != null && this.supportOnlineLearning && model.supportsOnlineLearning()) {
            model.clear();
        }
        return false;
    }

    /** @return the model's score named by {@link #getScoreName()}, or {@code null} if the model or its scores are absent */
    private Double getScore(StatusAnalyticsModel model) {
        if (model == null) {
            return null;
        }
        final Map<String, Double> scores = model.getScores();
        return scores == null ? null : scores.get(this.scoreName);
    }
}

