/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.insights.workflows.webAnalytics;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.analytics.WebAnalyticEntityViewReportData;
import org.openmetadata.schema.analytics.WebAnalyticEventData;
import org.openmetadata.schema.analytics.WebAnalyticUserActivityReportData;
import org.openmetadata.schema.analytics.type.WebAnalyticEventType;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
import org.openmetadata.service.apps.bundles.insights.processors.CreateReportDataProcessor;
import org.openmetadata.service.apps.bundles.insights.sinks.ReportDataSink;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.apps.bundles.insights.workflows.WorkflowStats;
import org.openmetadata.service.apps.bundles.insights.workflows.webAnalytics.processors.WebAnalyticsEntityViewProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.webAnalytics.processors.WebAnalyticsUserActivityAggregator;
import org.openmetadata.service.apps.bundles.insights.workflows.webAnalytics.processors.WebAnalyticsUserActivityProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.webAnalytics.sources.PaginatedWebAnalyticEventDataSource;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.ReportDataRepository;
import org.openmetadata.service.jdbi3.WebAnalyticEventRepository;
import org.openmetadata.service.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Workflow that reads web analytic events day by day and turns them into
 * entity-view and user-activity report data.
 *
 * <p>For every day in the configured window the workflow deletes the report data already
 * stored for that day, re-processes the day's events, writes the resulting report data
 * through a {@link ReportDataSink}, and finally prunes web analytic events older than the
 * retention period. Progress and failures are accumulated in {@link WorkflowStats}.
 */
public class WebAnalyticsWorkflow {
    public static final String USER_ACTIVITY_DATA_KEY = "userActivityData";
    public static final String USER_ACTIVITY_REPORT_DATA_KEY = "userActivityReportData";
    public static final String ENTITY_VIEW_REPORT_DATA_KEY = "entityViewReportData";

    private static final Logger LOG = LoggerFactory.getLogger(WebAnalyticsWorkflow.class);

    private final Long startTimestamp;
    private final Long endTimestamp;
    private final int batchSize;
    /** Number of days of web analytic events that are kept; also bounds how far back a backfill may go. */
    private final int webAnalyticEventsRetentionDays = 7;
    private final List<PaginatedWebAnalyticEventDataSource> sources = new ArrayList<>();
    private WebAnalyticsEntityViewProcessor webAnalyticsEntityViewProcessor;
    private WebAnalyticsUserActivityProcessor webAnalyticsUserActivityProcessor;
    private final WorkflowStats workflowStats = new WorkflowStats("WebAnalyticsWorkflow");

    /**
     * Creates the workflow and computes the time window it will process.
     *
     * <p>Without a backfill the window is the day before {@code timestamp}. With a backfill
     * the window runs from the backfill start date (clamped so it never reaches further back
     * than the retention period) to the end of the backfill end date.
     *
     * @param timestamp reference timestamp the window is computed from
     * @param batchSize batch size handed to each paginated event source
     * @param backfill optional backfill window; empty for a regular run
     */
    public WebAnalyticsWorkflow(Long timestamp, int batchSize, Optional<DataInsightsApp.Backfill> backfill) {
        if (backfill.isPresent()) {
            DataInsightsApp.Backfill backfillWindow = backfill.get();
            Long oldestPossibleTimestamp = TimestampUtils.getStartOfDayTimestamp(
                    TimestampUtils.subtractDays(timestamp, this.webAnalyticEventsRetentionDays));
            Long requestedEnd = TimestampUtils.getTimestampFromDateString(backfillWindow.endDate());
            Long requestedStart = TimestampUtils.getTimestampFromDateString(backfillWindow.startDate());
            this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(requestedEnd);
            // Never start earlier than the oldest day for which events are retained.
            this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(
                    Collections.max(List.of(requestedStart, oldestPossibleTimestamp)));
            // The window is empty when the requested end lies before the retention limit.
            // (Comparing the start-of-day limit to the end-of-day timestamp with equals()
            // could never match, so the warning was effectively dead.)
            if (this.endTimestamp < oldestPossibleTimestamp) {
                LOG.warn("Backfill won't happen because the set date is before the limit of {}", oldestPossibleTimestamp);
            }
        } else {
            this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 1));
            this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(this.endTimestamp);
        }
        this.batchSize = batchSize;
    }

    /**
     * Builds one event source per day of the window (walking backwards from the end
     * timestamp) and sizes the workflow stats and the event processors with the total
     * number of records reported by those sources.
     */
    private void initialize() {
        Long pointerTimestamp = this.endTimestamp;
        while (pointerTimestamp > this.startTimestamp) {
            this.sources.add(new PaginatedWebAnalyticEventDataSource(
                    this.batchSize, TimestampUtils.getStartOfDayTimestamp(pointerTimestamp), pointerTimestamp));
            pointerTimestamp = TimestampUtils.subtractDays(pointerTimestamp, 1);
        }
        int total = 0;
        for (PaginatedWebAnalyticEventDataSource source : this.sources) {
            total += source.getTotalRecords();
        }
        this.workflowStats.setWorkflowStatsTotalRecords(total);
        this.webAnalyticsEntityViewProcessor = new WebAnalyticsEntityViewProcessor(total);
        this.webAnalyticsUserActivityProcessor = new WebAnalyticsUserActivityProcessor(total);
    }

    /**
     * Runs the workflow: for each daily source, replaces that day's report data with freshly
     * computed entity-view and user-activity data, then prunes old web analytic events.
     *
     * <p>If processing the events of a day fails, the report data steps for that day are
     * skipped and the workflow moves on to the next day.
     *
     * @throws SearchIndexException if reading from a source fails
     */
    public void process() throws SearchIndexException {
        this.initialize();
        Map<String, Object> contextData = new HashMap<>();
        for (PaginatedWebAnalyticEventDataSource source : this.sources) {
            // Fresh accumulators per day; the processors fill them through contextData.
            Map<UUID, UserActivityData> userActivityData = new HashMap<>();
            Map<UUID, WebAnalyticUserActivityReportData> userActivityReportData = new HashMap<>();
            Map<String, WebAnalyticEntityViewReportData> entityViewReportData = new HashMap<>();
            Long referenceTimestamp = source.getStartTs();
            // Remove the day's existing report data before it is regenerated.
            this.deleteReportDataRecordsAtDate(referenceTimestamp, ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA);
            this.deleteReportDataRecordsAtDate(referenceTimestamp, ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA);
            contextData.put("@timestamp", referenceTimestamp);
            contextData.put(USER_ACTIVITY_DATA_KEY, userActivityData);
            contextData.put(USER_ACTIVITY_REPORT_DATA_KEY, userActivityReportData);
            contextData.put(ENTITY_VIEW_REPORT_DATA_KEY, entityViewReportData);
            Optional<String> processEventsError = this.processEvents(source, contextData);
            if (processEventsError.isPresent()) {
                LOG.debug(processEventsError.get());
                continue;
            }
            this.processEntityViewData(entityViewReportData, contextData).ifPresent(LOG::debug);
            this.processUserActivityData(userActivityData, userActivityReportData, contextData).ifPresent(LOG::debug);
        }
        this.pruneWebAnalyticEvents();
    }

    /**
     * Drains {@code source} page by page and feeds every non-empty page to the entity-view
     * and user-activity processors.
     *
     * @param source paginated event source for a single day
     * @param contextData shared context handed to the processors
     * @return the message of the last processing failure, or empty if every page succeeded
     * @throws SearchIndexException if reading the next page from the source fails
     */
    private Optional<String> processEvents(PaginatedWebAnalyticEventDataSource source, Map<String, Object> contextData) throws SearchIndexException {
        Optional<String> error = Optional.empty();
        while (!source.isDone()) {
            // Reading stays outside the try block: a read failure propagates to the caller.
            ResultList<WebAnalyticEventData> resultList = source.readNext((Map<String, Object>) null);
            try {
                if (!resultList.getData().isEmpty()) {
                    this.webAnalyticsEntityViewProcessor.process(resultList, contextData);
                    this.webAnalyticsUserActivityProcessor.process(resultList, contextData);
                }
                source.updateStats(resultList.getData().size(), 0);
            } catch (SearchIndexException ex) {
                source.updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
                String failure = String.format("Failed processing events from %s: %s", source.getName(), ex);
                error = Optional.of(failure);
                this.workflowStats.addFailure(failure);
            } finally {
                this.updateWorkflowStats(source.getName(), source.getStats());
            }
        }
        return error;
    }

    /**
     * Converts the accumulated entity-view data into report data and writes it to the sink.
     *
     * @param entityViewReportData entity-view data collected while processing events
     * @param contextData shared context; its {@code ReportDataType} entry is set here
     * @return the message of the last failure, or empty if both steps succeeded
     * @throws SearchIndexException declared for callers; failures of the steps are caught and reported
     */
    private Optional<String> processEntityViewData(Map<String, WebAnalyticEntityViewReportData> entityViewReportData, Map<String, Object> contextData) throws SearchIndexException {
        Optional<String> error = Optional.empty();
        contextData.put("ReportDataType", ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA);
        CreateReportDataProcessor createReportDataProcessor = new CreateReportDataProcessor(entityViewReportData.values().size(), "[WebAnalyticsWorkflow] Entity View Report Data Processor");
        Optional<List<ReportData>> entityViewReportDataList = Optional.empty();
        try {
            entityViewReportDataList = Optional.of(createReportDataProcessor.process(entityViewReportData.values().stream().toList(), contextData));
        } catch (SearchIndexException ex) {
            String failure = String.format("Failed Processing Entity View Data: %s", ex.getMessage());
            error = Optional.of(failure);
            this.workflowStats.addFailure(failure);
        } finally {
            this.updateWorkflowStats(createReportDataProcessor.getName(), createReportDataProcessor.getStats());
        }
        // The sink only runs when report data creation succeeded.
        if (entityViewReportDataList.isPresent()) {
            List<ReportData> reportDataList = entityViewReportDataList.get();
            ReportDataSink reportDataSink = new ReportDataSink(reportDataList.size(), "[WebAnalyticsWorkflow] Entity View Report Data Sink");
            try {
                reportDataSink.write(reportDataList, contextData);
            } catch (SearchIndexException ex) {
                String failure = String.format("Failed Sinking Entity View Data: %s", ex.getMessage());
                error = Optional.of(failure);
                this.workflowStats.addFailure(failure);
            } finally {
                this.updateWorkflowStats(reportDataSink.getName(), reportDataSink.getStats());
            }
        }
        return error;
    }

    /**
     * Aggregates the accumulated user activity, converts the result into report data and
     * writes it to the sink.
     *
     * <p>An aggregation failure is recorded but does not stop the following steps.
     *
     * @param userActivityData per-user activity collected while processing events
     * @param userActivityReportData per-user report data that is turned into report records
     * @param contextData shared context; its {@code ReportDataType} entry is set here
     * @return the message of the last failure, or empty if every step succeeded
     * @throws SearchIndexException declared for callers; failures of the steps are caught and reported
     */
    private Optional<String> processUserActivityData(Map<UUID, UserActivityData> userActivityData, Map<UUID, WebAnalyticUserActivityReportData> userActivityReportData, Map<String, Object> contextData) throws SearchIndexException {
        Optional<String> error = Optional.empty();
        contextData.put("ReportDataType", ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA);
        WebAnalyticsUserActivityAggregator webAnalyticsUserActivityAggregator = new WebAnalyticsUserActivityAggregator(userActivityData.size());
        try {
            webAnalyticsUserActivityAggregator.process(userActivityData, contextData);
        } catch (SearchIndexException ex) {
            String failure = String.format("Failed Aggregating User Activity Data: %s", ex.getMessage());
            error = Optional.of(failure);
            this.workflowStats.addFailure(failure);
        } finally {
            this.updateWorkflowStats(webAnalyticsUserActivityAggregator.getName(), webAnalyticsUserActivityAggregator.getStats());
        }
        CreateReportDataProcessor createReportDataProcessor = new CreateReportDataProcessor(userActivityReportData.values().size(), "[WebAnalyticsWorkflow] User Activity Report Data Processor");
        Optional<List<ReportData>> userActivityReportDataList = Optional.empty();
        try {
            userActivityReportDataList = Optional.of(createReportDataProcessor.process(userActivityReportData.values().stream().toList(), contextData));
        } catch (SearchIndexException ex) {
            String failure = String.format("Failed Processing User Activity Report Data: %s", ex.getMessage());
            error = Optional.of(failure);
            this.workflowStats.addFailure(failure);
        } finally {
            this.updateWorkflowStats(createReportDataProcessor.getName(), createReportDataProcessor.getStats());
        }
        // The sink only runs when report data creation succeeded.
        if (userActivityReportDataList.isPresent()) {
            List<ReportData> reportDataList = userActivityReportDataList.get();
            ReportDataSink reportDataSink = new ReportDataSink(reportDataList.size(), "[WebAnalyticsWorkflow] User Activity Report Data Sink");
            try {
                reportDataSink.write(reportDataList, contextData);
            } catch (SearchIndexException ex) {
                String failure = String.format("Failed Sinking User Activity Report Data: %s", ex.getMessage());
                error = Optional.of(failure);
                this.workflowStats.addFailure(failure);
            } finally {
                this.updateWorkflowStats(reportDataSink.getName(), reportDataSink.getStats());
            }
        }
        return error;
    }

    /**
     * Deletes the stored report data of the given type for the day of {@code timestamp}.
     *
     * @param timestamp timestamp that is formatted as {@code yyyy-MM-dd} to select the day
     * @param reportDataType type of report data to delete
     */
    private void deleteReportDataRecordsAtDate(Long timestamp, ReportData.ReportDataType reportDataType) {
        String timestampString = TimestampUtils.timestampToString(timestamp, "yyyy-MM-dd");
        ReportDataRepository reportDataRepository = (ReportDataRepository) Entity.getEntityTimeSeriesRepository("entityReportData");
        reportDataRepository.deleteReportDataAtDate(reportDataType, timestampString);
    }

    /**
     * Asks the web analytic event repository to delete, for every event type, the event
     * data relative to a cutoff of the retention period before the end of the window.
     */
    private void pruneWebAnalyticEvents() {
        // Repository and cutoff do not depend on the event type, so resolve them once.
        WebAnalyticEventRepository eventRepository = (WebAnalyticEventRepository) Entity.getEntityRepository("webAnalyticEvent");
        Long cutoffTimestamp = TimestampUtils.subtractDays(this.endTimestamp, this.webAnalyticEventsRetentionDays);
        for (WebAnalyticEventType eventType : WebAnalyticEventType.values()) {
            eventRepository.deleteWebAnalyticEventData(eventType, cutoffTimestamp);
        }
    }

    /**
     * Records the latest stats of one step and recomputes the workflow totals as the sum of
     * the success and failure counts over all recorded steps.
     *
     * @param stepName name of the step the stats belong to
     * @param newStepStats current stats of that step
     */
    private void updateWorkflowStats(String stepName, StepStats newStepStats) {
        this.workflowStats.updateWorkflowStepStats(stepName, newStepStats);
        int currentSuccess = this.workflowStats.getWorkflowStepStats().values().stream().mapToInt(StepStats::getSuccessRecords).sum();
        int currentFailed = this.workflowStats.getWorkflowStepStats().values().stream().mapToInt(StepStats::getFailedRecords).sum();
        this.workflowStats.updateWorkflowStats(currentSuccess, currentFailed);
    }

    /** Returns the stats accumulated by this workflow. */
    public WorkflowStats getWorkflowStats() {
        return this.workflowStats;
    }

    /**
     * Per-user activity accumulated while processing web analytic events.
     *
     * @param userName name of the user
     * @param userId id of the user
     * @param team team associated with the user
     * @param sessions timestamps grouped by session id
     * @param totalPageView total page-view count
     * @param totalSessions total session count
     * @param lastSession timestamp of the last session
     */
    public record UserActivityData(String userName, UUID userId, String team, Map<UUID, List<Long>> sessions, int totalPageView, int totalSessions, Long lastSession) {
    }
}

