/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.insights;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.entity.applications.configuration.internal.BackfillConfiguration;
import org.openmetadata.schema.entity.applications.configuration.internal.DataInsightsAppConfig;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.apps.bundles.insights.search.DataInsightsSearchInterface;
import org.openmetadata.service.apps.bundles.insights.search.elasticsearch.ElasticSearchDataInsightsClient;
import org.openmetadata.service.apps.bundles.insights.search.opensearch.OpenSearchDataInsightsClient;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.apps.bundles.insights.workflows.WorkflowStats;
import org.openmetadata.service.apps.bundles.insights.workflows.costAnalysis.CostAnalysisWorkflow;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.DataAssetsWorkflow;
import org.openmetadata.service.apps.bundles.insights.workflows.webAnalytics.WebAnalyticsWorkflow;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import os.org.opensearch.client.RestClient;

public class DataInsightsApp
extends AbstractNativeApplication {
    private static final Logger LOG = LoggerFactory.getLogger(DataInsightsApp.class);
    public static final String REPORT_DATA_TYPE_KEY = "ReportDataType";
    public static final String DATA_ASSET_INDEX_PREFIX = "di-data-assets";
    private Long timestamp;
    private int batchSize;
    private Optional<Boolean> recreateDataAssetsIndex;
    private Optional<Backfill> backfill;
    EventPublisherJob jobData;
    private volatile boolean stopped = false;
    public final Set<String> dataAssetTypes = Set.of("table", "storedProcedure", "databaseSchema", "database", "chart", "dashboard", "dashboardDataModel", "pipeline", "topic", "container", "searchIndex", "mlmodel", "dataProduct", "glossaryTerm", "tag");

    public DataInsightsApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
        super(collectionDAO, searchRepository);
    }

    private DataInsightsSearchInterface getSearchInterface() {
        DataInsightsSearchInterface searchInterface = this.searchRepository.getSearchType().equals((Object)ElasticSearchConfiguration.SearchType.ELASTICSEARCH) ? new ElasticSearchDataInsightsClient((es.org.elasticsearch.client.RestClient)this.searchRepository.getSearchClient().getLowLevelClient()) : new OpenSearchDataInsightsClient((RestClient)this.searchRepository.getSearchClient().getLowLevelClient());
        return searchInterface;
    }

    public static String getDataStreamName(String dataAssetType) {
        return String.format("%s-%s", DATA_ASSET_INDEX_PREFIX, dataAssetType).toLowerCase();
    }

    private void createDataAssetsDataStream() {
        DataInsightsSearchInterface searchInterface = this.getSearchInterface();
        try {
            for (String dataAssetType : this.dataAssetTypes) {
                String dataStreamName = DataInsightsApp.getDataStreamName(dataAssetType);
                if (searchInterface.dataAssetDataStreamExists(dataStreamName).booleanValue()) continue;
                searchInterface.createDataAssetsDataStream(dataStreamName);
            }
        }
        catch (IOException ex) {
            LOG.error("Couldn't install DataInsightsApp: Can't initialize ElasticSearch Index.", (Throwable)ex);
        }
    }

    private void deleteDataAssetsDataStream() {
        DataInsightsSearchInterface searchInterface = this.getSearchInterface();
        try {
            for (String dataAssetType : this.dataAssetTypes) {
                String dataStreamName = DataInsightsApp.getDataStreamName(dataAssetType);
                if (!searchInterface.dataAssetDataStreamExists(dataStreamName).booleanValue()) continue;
                searchInterface.deleteDataAssetDataStream(dataStreamName);
            }
        }
        catch (IOException ex) {
            LOG.error("Couldn't delete DataAssets DataStream", (Throwable)ex);
        }
    }

    @Override
    public void init(App app) {
        super.init(app);
        this.createDataAssetsDataStream();
        DataInsightsAppConfig config = JsonUtils.convertValue(app.getAppConfiguration(), DataInsightsAppConfig.class);
        this.batchSize = config.getBatchSize();
        this.recreateDataAssetsIndex = Optional.ofNullable(config.getRecreateDataAssetsIndex());
        Optional<BackfillConfiguration> backfillConfig = Optional.ofNullable(config.getBackfillConfiguration());
        this.backfill = Optional.empty();
        if (backfillConfig.isPresent() && backfillConfig.get().getEnabled().booleanValue()) {
            this.backfill = Optional.of(new Backfill(backfillConfig.get().getStartDate(), backfillConfig.get().getEndDate()));
        }
        this.jobData = new EventPublisherJob().withStats(new Stats());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void startApp(JobExecutionContext jobExecutionContext) {
        try {
            this.initializeJob();
            LOG.info("Executing DataInsights Job with JobData: {}", (Object)this.jobData);
            this.jobData.setStatus(EventPublisherJob.Status.RUNNING);
            String runType = (String)jobExecutionContext.getJobDetail().getJobDataMap().get((Object)"triggerType");
            if (!runType.equals("OnDemandJob")) {
                this.backfill = Optional.empty();
                this.recreateDataAssetsIndex = Optional.empty();
            }
            if (this.recreateDataAssetsIndex.isPresent() && this.recreateDataAssetsIndex.get().equals(true)) {
                this.deleteDataAssetsDataStream();
                this.createDataAssetsDataStream();
            }
            WorkflowStats webAnalyticsStats = this.processWebAnalytics();
            this.updateJobStatsWithWorkflowStats(webAnalyticsStats);
            WorkflowStats costAnalysisStats = this.processCostAnalysis();
            this.updateJobStatsWithWorkflowStats(costAnalysisStats);
            WorkflowStats dataAssetsStats = this.processDataAssets();
            this.updateJobStatsWithWorkflowStats(dataAssetsStats);
            if (webAnalyticsStats.hasFailed().booleanValue() || costAnalysisStats.hasFailed().booleanValue() || dataAssetsStats.hasFailed().booleanValue()) {
                String errorMessage = "Errors Found:\n";
                for (WorkflowStats stats : List.of(webAnalyticsStats, costAnalysisStats, dataAssetsStats)) {
                    if (!stats.hasFailed().booleanValue()) continue;
                    errorMessage = String.format("%s\n  %s\n", errorMessage, stats.getName());
                    for (String failure : stats.getFailures()) {
                        errorMessage = String.format("%s    - %s\n", errorMessage, failure);
                    }
                }
                IndexingError indexingError = new IndexingError().withErrorSource(IndexingError.ErrorSource.JOB).withMessage(errorMessage);
                LOG.error(indexingError.getMessage());
                this.jobData.setStatus(EventPublisherJob.Status.FAILED);
                this.jobData.setFailure(indexingError);
            }
            this.updateJobStatus();
        }
        catch (Exception ex) {
            IndexingError indexingError = new IndexingError().withErrorSource(IndexingError.ErrorSource.JOB).withMessage(String.format("Data Insights Job Has Encountered an Exception. %n Job Data: %s, %n  Stack : %s ", this.jobData.toString(), ExceptionUtils.getStackTrace((Throwable)ex)));
            LOG.error(indexingError.getMessage());
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
            this.jobData.setFailure(indexingError);
        }
        finally {
            this.sendUpdates(jobExecutionContext);
        }
    }

    private void initializeJob() {
        this.timestamp = TimestampUtils.getStartOfDayTimestamp(System.currentTimeMillis());
    }

    private WorkflowStats processWebAnalytics() {
        WebAnalyticsWorkflow workflow = new WebAnalyticsWorkflow(this.timestamp, this.batchSize, this.backfill);
        WorkflowStats workflowStats = workflow.getWorkflowStats();
        try {
            workflow.process();
        }
        catch (SearchIndexException ex) {
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
            this.jobData.setFailure(ex.getIndexingError());
        }
        return workflowStats;
    }

    private WorkflowStats processCostAnalysis() {
        CostAnalysisWorkflow workflow = new CostAnalysisWorkflow(this.timestamp, this.batchSize, this.backfill);
        WorkflowStats workflowStats = workflow.getWorkflowStats();
        try {
            workflow.process();
        }
        catch (SearchIndexException ex) {
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
            this.jobData.setFailure(ex.getIndexingError());
        }
        return workflowStats;
    }

    private WorkflowStats processDataAssets() {
        DataAssetsWorkflow workflow = new DataAssetsWorkflow(this.timestamp, this.batchSize, this.backfill, this.dataAssetTypes, this.collectionDAO, this.searchRepository);
        WorkflowStats workflowStats = workflow.getWorkflowStats();
        try {
            workflow.process();
        }
        catch (SearchIndexException ex) {
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
            this.jobData.setFailure(ex.getIndexingError());
        }
        return workflowStats;
    }

    private void updateJobStatsWithWorkflowStats(WorkflowStats workflowStats) {
        for (Map.Entry<String, StepStats> entry : workflowStats.getWorkflowStepStats().entrySet()) {
            String stepName = entry.getKey();
            StepStats stats = entry.getValue();
            this.updateStats(stepName, stats);
        }
    }

    private void updateJobStatus() {
        if (this.stopped) {
            this.jobData.setStatus(EventPublisherJob.Status.STOPPED);
        } else if (this.jobData.getFailure() != null) {
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
        } else {
            this.jobData.setStatus(EventPublisherJob.Status.COMPLETED);
        }
    }

    public void updateStats(String entityType, StepStats currentEntityStats) {
        Stats jobDataStats = this.jobData.getStats();
        StepStats entityLevelStats = jobDataStats.getEntityStats();
        if (entityLevelStats == null) {
            entityLevelStats = new StepStats().withTotalRecords(null).withFailedRecords(null).withSuccessRecords(null);
        }
        entityLevelStats.withAdditionalProperty(entityType, (Object)currentEntityStats);
        StepStats stats = this.jobData.getStats().getJobStats();
        if (stats == null) {
            stats = new StepStats().withTotalRecords(Integer.valueOf(ReindexingUtil.getTotalRequestToProcess(this.jobData.getEntities(), this.collectionDAO)));
        }
        stats.setTotalRecords(Integer.valueOf(entityLevelStats.getAdditionalProperties().values().stream().map(s -> (StepStats)s).mapToInt(StepStats::getTotalRecords).sum()));
        stats.setSuccessRecords(Integer.valueOf(entityLevelStats.getAdditionalProperties().values().stream().map(s -> (StepStats)s).mapToInt(StepStats::getSuccessRecords).sum()));
        stats.setFailedRecords(Integer.valueOf(entityLevelStats.getAdditionalProperties().values().stream().map(s -> (StepStats)s).mapToInt(StepStats::getFailedRecords).sum()));
        jobDataStats.setJobStats(stats);
        jobDataStats.setEntityStats(entityLevelStats);
        this.jobData.setStats(jobDataStats);
    }

    public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
        AppRunRecord appRecord = this.getJobRecord(jobExecutionContext);
        appRecord.setStatus(AppRunRecord.Status.fromValue((String)this.jobData.getStatus().value()));
        if (this.jobData.getFailure() != null) {
            appRecord.setFailureContext(new FailureContext().withAdditionalProperty("failure", (Object)this.jobData.getFailure()));
        }
        if (this.jobData.getStats() != null) {
            appRecord.setSuccessContext(new SuccessContext().withAdditionalProperty("stats", (Object)this.jobData.getStats()));
        }
        this.pushAppStatusUpdates(jobExecutionContext, appRecord, true);
    }

    private void sendUpdates(JobExecutionContext jobExecutionContext) {
        try {
            jobExecutionContext.getJobDetail().getJobDataMap().put("AppRunStats", (Object)this.jobData.getStats());
            this.updateRecordToDb(jobExecutionContext);
            if (WebSocketManager.getInstance() != null) {
                WebSocketManager.getInstance().broadCastMessageToAll("jobStatus", JsonUtils.pojoToJson(this.jobData));
            }
        }
        catch (Exception ex) {
            LOG.error("Failed to send updated stats with WebSocket", (Throwable)ex);
        }
    }

    public Long getTimestamp() {
        return this.timestamp;
    }

    public int getBatchSize() {
        return this.batchSize;
    }

    public Optional<Backfill> getBackfill() {
        return this.backfill;
    }

    public EventPublisherJob getJobData() {
        return this.jobData;
    }

    public record Backfill(String startDate, String endDate) {
    }
}

