/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.status.analytics;

import com.google.common.primitives.Doubles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.status.analytics.QueryWindow;
import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsModel;
import org.apache.nifi.controller.status.analytics.StatusMetricExtractFunction;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.StatusHistory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StatusAnalytics} implementation for a single connection.
 *
 * <p>{@link #refresh()} pulls the connection's status history for the current {@link QueryWindow}
 * and feeds it to every model in the supplied model map. The prediction getters then use the
 * {@code "queuedBytes"} and {@code "queuedCount"} models together with the latest
 * {@link FlowFileEvent} of the connection to estimate the queue size at the next interval and the
 * time remaining until the back pressure thresholds are reached.
 *
 * <p>All prediction getters return {@code -1} when no usable prediction is available.
 *
 * <p>Mutable state ({@code queryWindow}, {@code extendWindow} and the tunable settings) is read and
 * written without any synchronization in this class.
 */
public class ConnectionStatusAnalytics implements StatusAnalytics {
    private static final Logger LOG = LoggerFactory.getLogger(ConnectionStatusAnalytics.class);

    /** Key of the model used for byte based predictions. */
    private static final String QUEUED_BYTES_METRIC = "queuedBytes";
    /** Key of the model used for object count based predictions. */
    private static final String QUEUED_COUNT_METRIC = "queuedCount";
    /** Sentinel returned by every prediction getter when no valid prediction can be made. */
    private static final long INVALID_PREDICTION = -1L;

    private final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap;
    private QueryWindow queryWindow;
    private final ComponentStatusRepository componentStatusRepository;
    private final FlowFileEventRepository flowFileEventRepository;
    private final String connectionIdentifier;
    private final FlowManager flowManager;
    private final boolean supportOnlineLearning;
    /** Set when the last training attempt had no data or failed; consulted by the next refresh. */
    private boolean extendWindow = false;
    /** Look-ahead used by the "next interval" predictions; defaults to 180000 ms (3 minutes). */
    private long intervalMillis = 180000L;
    /** Length of the history window queried when not learning online; defaults to 300000 ms (5 minutes). */
    private long queryIntervalMillis = 300000L;
    private String scoreName = "rSquared";
    private double scoreThreshold = 0.9;

    /**
     * Creates the analytics for one connection.
     *
     * @param componentStatusRepository source of the connection status history used for training
     * @param flowManager used to look up the connection by identifier
     * @param flowFileEventRepository source of the latest transfer events of the connection
     * @param modelMap models and their metric extraction functions, keyed by metric name
     * @param connectionIdentifier identifier of the connection being analysed
     * @param supportOnlineLearning whether the query window should advance incrementally between
     *        refreshes; {@code null} is treated as {@code false}
     */
    public ConnectionStatusAnalytics(ComponentStatusRepository componentStatusRepository, FlowManager flowManager, FlowFileEventRepository flowFileEventRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, String connectionIdentifier, Boolean supportOnlineLearning) {
        this.componentStatusRepository = componentStatusRepository;
        this.flowManager = flowManager;
        this.flowFileEventRepository = flowFileEventRepository;
        this.modelMap = modelMap;
        this.connectionIdentifier = connectionIdentifier;
        // Stored as a primitive so that a null argument cannot cause an unboxing NPE later on.
        this.supportOnlineLearning = Boolean.TRUE.equals(supportOnlineLearning);
    }

    /**
     * Moves the query window forward and retrains every model with the status history of that window.
     *
     * <p>With online learning enabled and an existing window, the new window starts at the previous
     * end (or at the previous start when {@code extendWindow} is set) and ends now. Otherwise the
     * window covers the last {@link #getQueryIntervalMillis()} milliseconds.
     */
    public void refresh() {
        final long now = System.currentTimeMillis();
        if (this.supportOnlineLearning && this.queryWindow != null) {
            final long start = this.extendWindow ? this.queryWindow.getStartTimeMillis() : this.queryWindow.getEndTimeMillis();
            this.queryWindow = new QueryWindow(start, now);
        } else {
            this.queryWindow = new QueryWindow(now - this.getQueryIntervalMillis(), now);
        }
        if (this.modelMap.isEmpty()) {
            return;
        }
        // The history query does not depend on the metric, so it is issued once for all models.
        final StatusHistory statusHistory = this.componentStatusRepository.getConnectionStatusHistory(this.connectionIdentifier, this.queryWindow.getStartDateTime(), this.queryWindow.getEndDateTime(), Integer.MAX_VALUE);
        this.modelMap.forEach((metric, modelFunction) -> this.trainModel(metric, modelFunction.getKey(), modelFunction.getValue(), statusHistory));
    }

    /**
     * Extracts the observations for one metric from the status history and trains the model with them.
     *
     * <p>NOTE(review): {@code extendWindow} is overwritten on every call, so after a refresh it
     * reflects only the model that was trained last - confirm this is intended.
     */
    private void trainModel(String metric, StatusAnalyticsModel model, StatusMetricExtractFunction extract, StatusHistory statusHistory) {
        final Tuple<Stream<Double[]>, Stream<Double>> modelData = extract.extractMetric(metric, statusHistory);
        final Double[][] features = modelData.getKey().toArray(size -> new Double[size][]);
        final Double[] values = modelData.getValue().toArray(Double[]::new);
        if (ArrayUtils.isNotEmpty(features)) {
            try {
                LOG.debug("Refreshing model with new data for connection id: {} ", this.connectionIdentifier);
                model.learn(Stream.of(features), Stream.of(values));
                if (LOG.isDebugEnabled() && MapUtils.isNotEmpty(model.getScores())) {
                    model.getScores().forEach((key, value) -> LOG.debug("Model Scores for prediction metric {} for connection id {}: {}={} ", metric, this.connectionIdentifier, key, value));
                }
                this.extendWindow = false;
            } catch (Exception ex) {
                // The trailing throwable makes SLF4J include the stack trace with the message.
                LOG.debug("Exception encountered while training model for connection id {}: {}", this.connectionIdentifier, ex.getMessage(), ex);
                this.extendWindow = true;
            }
        } else {
            // No observations in this window: keep its start so the next refresh covers it again.
            this.extendWindow = true;
        }
    }

    /**
     * Returns the model registered for the given metric name.
     *
     * @param modelType key of the model in the model map
     * @return the model registered under {@code modelType}
     * @throws IllegalArgumentException if no model is registered under {@code modelType}
     */
    protected StatusAnalyticsModel getModel(String modelType) {
        final Tuple<StatusAnalyticsModel, StatusMetricExtractFunction> entry = this.modelMap.get(modelType);
        if (entry == null) {
            throw new IllegalArgumentException("Model cannot be found for provided type: " + modelType);
        }
        return entry.getKey();
    }

    /**
     * Predicts the time remaining until the queue reaches its back pressure data size threshold.
     *
     * @return milliseconds from now until the threshold is predicted to be reached, or {@code -1}
     *         when the model is not valid, no flow file event is available or the prediction is unusable
     * @throws NoSuchElementException if the connection can no longer be found
     */
    public Long getTimeToBytesBackpressureMillis() {
        final StatusAnalyticsModel bytesModel = this.getModel(QUEUED_BYTES_METRIC);
        final FlowFileEvent flowFileEvent = this.getStatusReport();
        final Connection connection = this.requireConnection();
        final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
        final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
        if (this.validModel(bytesModel) && flowFileEvent != null) {
            // Floating point division: the ratio is NaN or infinite when the "in" size is 0.
            final double inOutRatio = (double) flowFileEvent.getContentSizeOut() / (double) flowFileEvent.getContentSizeIn();
            return this.predictTimeToThreshold(bytesModel, inOutRatio, backPressureBytes);
        }
        LOG.debug("Model is not valid for calculating time back pressure by content size in bytes. Returning -1");
        return INVALID_PREDICTION;
    }

    /**
     * Predicts the time remaining until the queue reaches its back pressure object count threshold.
     *
     * @return milliseconds from now until the threshold is predicted to be reached, or {@code -1}
     *         when the model is not valid, no flow file event is available or the prediction is unusable
     * @throws NoSuchElementException if the connection can no longer be found
     */
    public Long getTimeToCountBackpressureMillis() {
        final StatusAnalyticsModel countModel = this.getModel(QUEUED_COUNT_METRIC);
        final FlowFileEvent flowFileEvent = this.getStatusReport();
        final Connection connection = this.requireConnection();
        final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
        if (this.validModel(countModel) && flowFileEvent != null) {
            // Floating point division: the ratio is NaN or infinite when no flow files came in.
            final double inOutRatio = (double) flowFileEvent.getFlowFilesOut() / (double) flowFileEvent.getFlowFilesIn();
            return this.predictTimeToThreshold(countModel, inOutRatio, backPressureCountThreshold);
        }
        LOG.debug("Model is not valid for calculating time to back pressure by object count. Returning -1");
        return INVALID_PREDICTION;
    }

    /**
     * Predicts the queued content size at the next interval.
     *
     * @return predicted value rounded and floored at 0, or {@code -1} when the model is not valid,
     *         no flow file event is available or the prediction is unusable
     */
    public Long getNextIntervalBytes() {
        final StatusAnalyticsModel bytesModel = this.getModel(QUEUED_BYTES_METRIC);
        final FlowFileEvent flowFileEvent = this.getStatusReport();
        if (this.validModel(bytesModel) && flowFileEvent != null) {
            final double inOutRatio = (double) flowFileEvent.getContentSizeOut() / (double) flowFileEvent.getContentSizeIn();
            return this.predictNextInterval(bytesModel, inOutRatio);
        }
        LOG.debug("Model is not valid for predicting content size in bytes for next interval. Returning -1");
        return INVALID_PREDICTION;
    }

    /**
     * Predicts the queued object count at the next interval.
     *
     * @return predicted value rounded and floored at 0, or {@code -1} when the model is not valid,
     *         no flow file event is available or the prediction is unusable
     */
    public Long getNextIntervalCount() {
        final StatusAnalyticsModel countModel = this.getModel(QUEUED_COUNT_METRIC);
        final FlowFileEvent flowFileEvent = this.getStatusReport();
        if (this.validModel(countModel) && flowFileEvent != null) {
            final double inOutRatio = (double) flowFileEvent.getFlowFilesOut() / (double) flowFileEvent.getFlowFilesIn();
            return this.predictNextInterval(countModel, inOutRatio);
        }
        LOG.debug("Model is not valid for predicting object count for next interval. Returning -1");
        return INVALID_PREDICTION;
    }

    /**
     * Expresses the predicted next-interval object count as a percentage of the back pressure
     * object threshold.
     *
     * @return percentage capped at 100, or {@code -1} when no count prediction is available
     * @throws NoSuchElementException if the connection can no longer be found
     */
    public Long getNextIntervalPercentageUseCount() {
        final Connection connection = this.requireConnection();
        final double backPressureCountThreshold = connection.getFlowFileQueue().getBackPressureObjectThreshold();
        final long nextIntervalCount = this.getNextIntervalCount();
        return this.toPercentageOfThreshold(nextIntervalCount, backPressureCountThreshold);
    }

    /**
     * Expresses the predicted next-interval content size as a percentage of the back pressure
     * data size threshold.
     *
     * @return percentage capped at 100, or {@code -1} when no bytes prediction is available
     * @throws NoSuchElementException if the connection can no longer be found
     */
    public Long getNextIntervalPercentageUseBytes() {
        final Connection connection = this.requireConnection();
        final String backPressureDataSize = connection.getFlowFileQueue().getBackPressureDataSizeThreshold();
        final double backPressureBytes = DataUnit.parseDataSize(backPressureDataSize, DataUnit.B);
        // Predict once: the value that is range-checked must be the value that is converted.
        final long nextIntervalBytes = this.getNextIntervalBytes();
        return this.toPercentageOfThreshold(nextIntervalBytes, backPressureBytes);
    }

    /** @return the look-ahead, in milliseconds, used by the next-interval predictions */
    public Long getIntervalTimeMillis() {
        return this.intervalMillis;
    }

    /** @param intervalTimeMillis the look-ahead, in milliseconds, for the next-interval predictions */
    public void setIntervalTimeMillis(long intervalTimeMillis) {
        this.intervalMillis = intervalTimeMillis;
    }

    /** @return length, in milliseconds, of the history window used when a fresh window is created */
    public long getQueryIntervalMillis() {
        return this.queryIntervalMillis;
    }

    /** @param queryIntervalMillis length, in milliseconds, of the history window for fresh windows */
    public void setQueryIntervalMillis(long queryIntervalMillis) {
        this.queryIntervalMillis = queryIntervalMillis;
    }

    /** @return name of the model score that is compared against the score threshold */
    public String getScoreName() {
        return this.scoreName;
    }

    /** @param scoreName name of the model score to compare against the score threshold */
    public void setScoreName(String scoreName) {
        this.scoreName = scoreName;
    }

    /** @return minimum score a model needs to be used for predictions */
    public double getScoreThreshold() {
        return this.scoreThreshold;
    }

    /** @param scoreThreshold minimum score a model needs to be used for predictions */
    public void setScoreThreshold(double scoreThreshold) {
        this.scoreThreshold = scoreThreshold;
    }

    /** @return the window used by the most recent {@link #refresh()}, or {@code null} before the first one */
    public QueryWindow getQueryWindow() {
        return this.queryWindow;
    }

    /**
     * Collects every prediction of this class into one map.
     *
     * <p>Each entry is produced by calling the corresponding getter, so the percentage entries
     * re-run the next-interval predictions rather than reusing the values stored in the map.
     *
     * @return a new mutable map from prediction name to value
     * @throws NoSuchElementException if the connection can no longer be found
     */
    public Map<String, Long> getPredictions() {
        final Map<String, Long> predictions = new HashMap<>();
        predictions.put("timeToBytesBackpressureMillis", this.getTimeToBytesBackpressureMillis());
        predictions.put("timeToCountBackpressureMillis", this.getTimeToCountBackpressureMillis());
        predictions.put("nextIntervalBytes", this.getNextIntervalBytes());
        predictions.put("nextIntervalCount", this.getNextIntervalCount());
        predictions.put("nextIntervalPercentageUseCount", this.getNextIntervalPercentageUseCount());
        predictions.put("nextIntervalPercentageUseBytes", this.getNextIntervalPercentageUseBytes());
        predictions.put("intervalTimeMillis", this.getIntervalTimeMillis());
        predictions.forEach((key, value) -> LOG.debug("Prediction model for connection id {}: {}={} ", this.connectionIdentifier, key, value));
        return predictions;
    }

    /** @return whether this instance was configured for online learning */
    public boolean supportsOnlineLearning() {
        return this.supportOnlineLearning;
    }

    /**
     * Looks up the analysed connection among all connections of the root process group.
     *
     * @throws NoSuchElementException if no connection with the configured identifier exists
     */
    private Connection requireConnection() {
        final ProcessGroup rootGroup = this.flowManager.getRootGroup();
        return rootGroup.findAllConnections().stream()
                .filter(c -> c.getIdentifier().equals(this.connectionIdentifier))
                .findFirst()
                .orElseThrow(() -> new NoSuchElementException("Connection with the following id cannot be found:" + this.connectionIdentifier + ". Model should be invalidated!"));
    }

    /** Returns the current transfer event entry of the connection; may be {@code null}, callers check for it. */
    private FlowFileEvent getStatusReport() {
        final RepositoryStatusReport statusReport = this.flowFileEventRepository.reportTransferEvents(System.currentTimeMillis());
        return statusReport.getReportEntry(this.connectionIdentifier);
    }

    /** Asks the model for variable 0 given the out/in ratio (variable 1) and the threshold as target value. */
    private Long predictTimeToThreshold(StatusAnalyticsModel model, double inOutRatio, double threshold) {
        final Map<Integer, Double> predictFeatures = new HashMap<>();
        predictFeatures.put(1, inOutRatio);
        return this.convertTimePrediction(model.predictVariable(0, predictFeatures, threshold), System.currentTimeMillis());
    }

    /** Asks the model for its value at "now + interval" given the out/in ratio. */
    private Long predictNextInterval(StatusAnalyticsModel model, double inOutRatio) {
        final long nextInterval = System.currentTimeMillis() + this.getIntervalTimeMillis();
        final Double[] predictFeatures = {(double) nextInterval, inOutRatio};
        return this.convertCountPrediction(model.predict(predictFeatures));
    }

    /** Converts a predicted value into a percentage of the threshold, capped at 100; passes -1 through. */
    private Long toPercentageOfThreshold(long predicted, double threshold) {
        if (predicted > INVALID_PREDICTION) {
            return Math.min(100L, Math.round((double) predicted / threshold * 100.0));
        }
        return INVALID_PREDICTION;
    }

    /**
     * Turns a predicted absolute timestamp into a duration relative to {@code timeMillis}.
     * Missing, NaN, infinite or already elapsed predictions yield {@code -1}.
     */
    private Long convertTimePrediction(Double prediction, long timeMillis) {
        if (prediction == null || Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < (double) timeMillis) {
            LOG.debug("Time prediction value is invalid: {}. Returning -1.", prediction);
            return INVALID_PREDICTION;
        }
        return Math.max(0L, Math.round(prediction) - timeMillis);
    }

    /** Rounds a predicted quantity and floors it at 0; missing, NaN or infinite predictions yield {@code -1}. */
    private Long convertCountPrediction(Double prediction) {
        if (prediction == null || Double.isNaN(prediction) || Double.isInfinite(prediction)) {
            LOG.debug("Count prediction value is invalid: {}. Returning -1.", prediction);
            return INVALID_PREDICTION;
        }
        return Math.max(0L, Math.round(prediction));
    }

    /**
     * Decides whether a model may be used for predictions.
     *
     * <p>A model is rejected when it has no score under {@link #getScoreName()} or when that score
     * is a finite number below the threshold. A NaN or infinite score is not rejected by this check.
     * A rejected model is cleared when both this instance and the model support online learning.
     */
    private boolean validModel(StatusAnalyticsModel model) {
        final Double score = this.getScore(model);
        // Doubles.isFinite is false for NaN as well as for infinities.
        final boolean rejected = score == null || (Doubles.isFinite(score) && score < this.scoreThreshold);
        if (!rejected) {
            return true;
        }
        if (model != null && this.supportOnlineLearning && Boolean.TRUE.equals(model.supportsOnlineLearning())) {
            model.clear();
        }
        return false;
    }

    /** Returns the model's score named by {@code scoreName}, or {@code null} when unavailable. */
    private Double getScore(StatusAnalyticsModel model) {
        if (model == null) {
            return null;
        }
        final Map<String, Double> scores = model.getScores();
        return scores == null ? null : scores.get(this.scoreName);
    }
}

