/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.insights.workflows.costAnalysis;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.DataAssetMetrics;
import org.openmetadata.schema.analytics.RawCostAnalysisReportData;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.services.CreateDatabaseService;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.LifeCycle;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
import org.openmetadata.service.apps.bundles.insights.processors.CreateReportDataProcessor;
import org.openmetadata.service.apps.bundles.insights.sinks.ReportDataSink;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.apps.bundles.insights.workflows.WorkflowStats;
import org.openmetadata.service.apps.bundles.insights.workflows.costAnalysis.processors.AggregatedCostAnalysisReportDataAggregator;
import org.openmetadata.service.apps.bundles.insights.workflows.costAnalysis.processors.AggregatedCostAnalysisReportDataProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.costAnalysis.processors.DatabaseServiceTablesProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.costAnalysis.processors.RawCostAnalysisReportDataProcessor;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.ReportDataRepository;
import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CostAnalysisWorkflow {
    private static final Logger LOG = LoggerFactory.getLogger(CostAnalysisWorkflow.class);
    public static final String AGGREGATED_COST_ANALYSIS_DATA_MAP_KEY = "aggregatedCostAnalysisDataMap";
    private final int batchSize;
    private final Long startTimestamp;
    private final Long endTimestamp;
    private final int retentionDays = 30;
    private final List<PaginatedEntitiesSource> sources = new ArrayList<PaginatedEntitiesSource>();
    private DatabaseServiceTablesProcessor databaseServiceTablesProcessor;
    private RawCostAnalysisReportDataProcessor rawCostAnalysisReportDataProcessor;
    private AggregatedCostAnalysisReportDataProcessor aggregatedCostAnalysisReportDataProcessor;
    private final WorkflowStats workflowStats = new WorkflowStats("CostAnalysisWorkflow");

    public CostAnalysisWorkflow(Long timestamp, int batchSize, Optional<DataInsightsApp.Backfill> backfill) {
        this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 1));
        this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(this.endTimestamp);
        this.batchSize = batchSize;
    }

    private void initialize() throws SearchIndexException {
        PaginatedEntitiesSource databaseServices = new PaginatedEntitiesSource("databaseService", this.batchSize, List.of("*"));
        int total = 0;
        while (!databaseServices.isDone()) {
            ResultList<DatabaseService> resultList = this.filterDatabaseServices((ResultList<? extends EntityInterface>)databaseServices.readNext((Map)null));
            if (resultList.getData().isEmpty()) continue;
            for (DatabaseService databaseService : resultList.getData()) {
                ListFilter filter = new ListFilter(null);
                filter.addQueryParam("database", databaseService.getFullyQualifiedName());
                this.sources.add(new PaginatedEntitiesSource("table", this.batchSize, List.of("*"), filter).withName(String.format("[CostAnalysisWorkflow] %s", databaseService.getFullyQualifiedName())));
                total += ((TableRepository)Entity.getEntityRepository("table")).getDao().listCount(filter);
            }
        }
        this.databaseServiceTablesProcessor = new DatabaseServiceTablesProcessor(total);
        this.rawCostAnalysisReportDataProcessor = new RawCostAnalysisReportDataProcessor(total);
        this.aggregatedCostAnalysisReportDataProcessor = new AggregatedCostAnalysisReportDataProcessor(total);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process() throws SearchIndexException {
        this.initialize();
        HashMap<String, Object> contextData = new HashMap<String, Object>();
        Long pointerTimestamp = TimestampUtils.getStartOfDayTimestamp(this.endTimestamp);
        while (pointerTimestamp >= this.startTimestamp) {
            this.deleteReportDataRecordsAtDate(pointerTimestamp, ReportData.ReportDataType.AGGREGATED_COST_ANALYSIS_REPORT_DATA);
            pointerTimestamp = TimestampUtils.subtractDays(pointerTimestamp, 1);
        }
        this.deleteReportDataRecords(ReportData.ReportDataType.RAW_COST_ANALYSIS_REPORT_DATA);
        for (PaginatedEntitiesSource source : this.sources) {
            ArrayList<RawCostAnalysisReportData> rawCostAnalysisReportDataList = new ArrayList<RawCostAnalysisReportData>();
            HashMap<String, Map<String, Map<String, AggregatedCostAnalysisData>>> aggregatedCostAnalysisDataMap = new HashMap<String, Map<String, Map<String, AggregatedCostAnalysisData>>>();
            contextData.put("@timestamp", this.startTimestamp);
            contextData.put(AGGREGATED_COST_ANALYSIS_DATA_MAP_KEY, aggregatedCostAnalysisDataMap);
            Optional<Object> initialProcessorError = Optional.empty();
            while (!source.isDone()) {
                try {
                    Object resultList = source.readNext((Map)null);
                    List<CostAnalysisTableData> costAnalysisTableData = this.databaseServiceTablesProcessor.process((ResultList<? extends EntityInterface>)resultList, (Map<String, Object>)contextData);
                    rawCostAnalysisReportDataList.addAll(this.rawCostAnalysisReportDataProcessor.process(costAnalysisTableData, (Map<String, Object>)contextData));
                    this.aggregatedCostAnalysisReportDataProcessor.process(costAnalysisTableData, (Map<String, Object>)contextData);
                    source.updateStats(((ResultList)resultList).getData().size(), 0);
                }
                catch (SearchIndexException ex) {
                    source.updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
                    String errorMessage = String.format("Failed processing Data from %s: ", source.getName(), ex);
                    initialProcessorError = Optional.of(errorMessage);
                    this.workflowStats.addFailure(errorMessage);
                }
                finally {
                    this.updateWorkflowStats(source.getName(), source.getStats());
                }
            }
            if (initialProcessorError.isPresent()) continue;
            Optional<String> processRawCostAnalysisError = this.processRawCostAnalysisReportData(rawCostAnalysisReportDataList, contextData);
            processRawCostAnalysisError.ifPresent(arg_0 -> ((Logger)LOG).debug(arg_0));
            Optional<String> processAggregatedCostAnalysisError = this.processAggregatedCostAnalysisReportData(aggregatedCostAnalysisDataMap, contextData);
            processAggregatedCostAnalysisError.ifPresent(arg_0 -> ((Logger)LOG).debug(arg_0));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<String> processRawCostAnalysisReportData(List<RawCostAnalysisReportData> rawCostAnalysisReportDataList, Map<String, Object> contextData) throws SearchIndexException {
        Optional<String> error = Optional.empty();
        contextData.put("ReportDataType", ReportData.ReportDataType.RAW_COST_ANALYSIS_REPORT_DATA);
        CreateReportDataProcessor createReportdataProcessor = new CreateReportDataProcessor(rawCostAnalysisReportDataList.size(), "[CostAnalysisWorkflow] Raw Cost Analysis Report Data Processor");
        Optional<Object> rawCostAnalysisReportData = Optional.empty();
        try {
            rawCostAnalysisReportData = Optional.of(createReportdataProcessor.process(rawCostAnalysisReportDataList, contextData));
        }
        catch (SearchIndexException ex) {
            error = Optional.of(String.format("Failed Processing Raw Cost Analysis Report Data: %s", ex.getMessage()));
            this.workflowStats.addFailure(error.get());
        }
        finally {
            this.updateWorkflowStats(createReportdataProcessor.getName(), createReportdataProcessor.getStats());
        }
        if (rawCostAnalysisReportData.isPresent()) {
            ReportDataSink reportDataSink = new ReportDataSink(((List)rawCostAnalysisReportData.get()).size(), "[CostAnalysisWorkflow] Raw Cost Analysis Report Data Sink");
            try {
                reportDataSink.write((List)rawCostAnalysisReportData.get(), contextData);
            }
            catch (SearchIndexException ex) {
                error = Optional.of(String.format("Failed Sinking Raw Cost Analysis Report Data: %s", ex.getMessage()));
                this.workflowStats.addFailure(error.get());
            }
            finally {
                this.updateWorkflowStats(reportDataSink.getName(), reportDataSink.getStats());
            }
        }
        return error;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Optional<String> processAggregatedCostAnalysisReportData(Map<String, Map<String, Map<String, AggregatedCostAnalysisData>>> aggregatedCostAnalysisDataMap, Map<String, Object> contextData) throws SearchIndexException {
        Optional<String> error = Optional.empty();
        contextData.put("ReportDataType", ReportData.ReportDataType.AGGREGATED_COST_ANALYSIS_REPORT_DATA);
        AggregatedCostAnalysisReportDataAggregator aggregatedCostAnalysisReportDataAggregator = new AggregatedCostAnalysisReportDataAggregator(aggregatedCostAnalysisDataMap.size());
        Optional<Object> aggregatedCostAnalysisReportDataList = Optional.empty();
        try {
            aggregatedCostAnalysisReportDataList = Optional.of(aggregatedCostAnalysisReportDataAggregator.process(aggregatedCostAnalysisDataMap, contextData));
        }
        catch (SearchIndexException ex) {
            error = Optional.of(String.format("Failed Aggregating Cost Analysis Report Data: %s", ex.getMessage()));
            this.workflowStats.addFailure(error.get());
        }
        finally {
            this.updateWorkflowStats(aggregatedCostAnalysisReportDataAggregator.getName(), aggregatedCostAnalysisReportDataAggregator.getStats());
        }
        if (aggregatedCostAnalysisReportDataList.isPresent()) {
            CreateReportDataProcessor createReportdataProcessor = new CreateReportDataProcessor(((List)aggregatedCostAnalysisReportDataList.get()).size(), "[CostAnalysisWorkflow] Aggregated Cost Analysis Report Data Processor");
            Optional<Object> aggregatedCostAnalysisReportData = Optional.empty();
            try {
                aggregatedCostAnalysisReportData = Optional.of(createReportdataProcessor.process((List)aggregatedCostAnalysisReportDataList.get(), contextData));
            }
            catch (SearchIndexException ex) {
                error = Optional.of(String.format("Failed Processing Aggregated Cost Analysis Report Data: %s", ex.getMessage()));
                this.workflowStats.addFailure(error.get());
            }
            finally {
                this.updateWorkflowStats(createReportdataProcessor.getName(), createReportdataProcessor.getStats());
            }
            if (aggregatedCostAnalysisReportData.isPresent()) {
                ReportDataSink reportDataSink = new ReportDataSink(((List)aggregatedCostAnalysisReportData.get()).size(), "[CostAnalysisWorkflow] Aggregated Cost Analysis Report Data Sink");
                try {
                    reportDataSink.write((List)aggregatedCostAnalysisReportData.get(), contextData);
                }
                catch (SearchIndexException ex) {
                    error = Optional.of(String.format("Failed Sinking Aggregated Cost Analysis Report Data: %s", ex.getMessage()));
                    this.workflowStats.addFailure(error.get());
                }
                finally {
                    this.updateWorkflowStats(reportDataSink.getName(), reportDataSink.getStats());
                }
            }
        }
        return error;
    }

    private ResultList<DatabaseService> filterDatabaseServices(ResultList<? extends EntityInterface> resultList) {
        return new ResultList<DatabaseService>(resultList.getData().stream().map(object -> (DatabaseService)object).filter(this::databaseServiceSupportsProfilerAndUsage).toList());
    }

    private boolean databaseServiceSupportsProfilerAndUsage(DatabaseService databaseService) {
        return List.of(CreateDatabaseService.DatabaseServiceType.BigQuery, CreateDatabaseService.DatabaseServiceType.Redshift, CreateDatabaseService.DatabaseServiceType.Snowflake).contains(databaseService.getServiceType());
    }

    private void deleteReportDataRecordsAtDate(Long timestamp, ReportData.ReportDataType reportDataType) {
        String timestampString = TimestampUtils.timestampToString(timestamp, "yyyy-MM-dd");
        ((ReportDataRepository)Entity.getEntityTimeSeriesRepository("entityReportData")).deleteReportDataAtDate(reportDataType, timestampString);
    }

    private void deleteReportDataRecords(ReportData.ReportDataType reportDataType) {
        ((ReportDataRepository)Entity.getEntityTimeSeriesRepository("entityReportData")).deleteReportData(reportDataType);
    }

    private void updateWorkflowStats(String stepName, StepStats newStepStats) {
        this.workflowStats.updateWorkflowStepStats(stepName, newStepStats);
        int currentSuccess = this.workflowStats.getWorkflowStepStats().values().stream().mapToInt(StepStats::getSuccessRecords).sum();
        int currentFailed = this.workflowStats.getWorkflowStepStats().values().stream().mapToInt(StepStats::getFailedRecords).sum();
        this.workflowStats.updateWorkflowStats(currentSuccess, currentFailed);
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public Long getStartTimestamp() {
        return this.startTimestamp;
    }

    public Long getEndTimestamp() {
        return this.endTimestamp;
    }

    public List<PaginatedEntitiesSource> getSources() {
        return this.sources;
    }

    public DatabaseServiceTablesProcessor getDatabaseServiceTablesProcessor() {
        return this.databaseServiceTablesProcessor;
    }

    public RawCostAnalysisReportDataProcessor getRawCostAnalysisReportDataProcessor() {
        return this.rawCostAnalysisReportDataProcessor;
    }

    public AggregatedCostAnalysisReportDataProcessor getAggregatedCostAnalysisReportDataProcessor() {
        return this.aggregatedCostAnalysisReportDataProcessor;
    }

    public WorkflowStats getWorkflowStats() {
        return this.workflowStats;
    }

    public record AggregatedCostAnalysisData(Double totalSize, Double totalCount, DataAssetMetrics unusedDataAssets, DataAssetMetrics frequentlyUsedDataAssets) {
    }

    public record CostAnalysisTableData(Table table, Optional<LifeCycle> oLifeCycle, Optional<Double> oSize) {
    }
}

