/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.apps.bundles.insights.workflows.WorkflowStats;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsElasticSearchProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsEntityEnricherProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsOpenSearchProcessor;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink;
import org.openmetadata.service.search.opensearch.OpenSearchIndexSink;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.interfaces.Source;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataAssetsWorkflow {
    private static final Logger LOG = LoggerFactory.getLogger(DataAssetsWorkflow.class);
    public static final String DATA_STREAM_KEY = "DataStreamKey";
    private final int retentionDays = 30;
    private final Long startTimestamp;
    private final Long endTimestamp;
    private final int batchSize;
    private final SearchRepository searchRepository;
    private final CollectionDAO collectionDAO;
    private final List<PaginatedEntitiesSource> sources = new ArrayList<PaginatedEntitiesSource>();
    private final Set<String> entityTypes;
    private DataInsightsEntityEnricherProcessor entityEnricher;
    private Processor entityProcessor;
    private Sink searchIndexSink;
    private final WorkflowStats workflowStats = new WorkflowStats("DataAssetsWorkflow");

    public DataAssetsWorkflow(Long timestamp, int batchSize, Optional<DataInsightsApp.Backfill> backfill, Set<String> entityTypes, CollectionDAO collectionDAO, SearchRepository searchRepository) {
        if (backfill.isPresent()) {
            Long oldestPossibleTimestamp = TimestampUtils.getStartOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 30));
            this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(Collections.max(List.of(TimestampUtils.getTimestampFromDateString(backfill.get().endDate()))));
            this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(Collections.max(List.of(TimestampUtils.getTimestampFromDateString(backfill.get().startDate()), oldestPossibleTimestamp)));
            if (oldestPossibleTimestamp.equals(TimestampUtils.getStartOfDayTimestamp(this.endTimestamp))) {
                LOG.warn("Backfill won't happen because the set date is before the limit of {}", (Object)oldestPossibleTimestamp);
            }
        } else {
            this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(timestamp);
            this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 1));
        }
        this.batchSize = batchSize;
        this.searchRepository = searchRepository;
        this.collectionDAO = collectionDAO;
        this.entityTypes = entityTypes;
    }

    private void initialize() {
        int totalRecords = ReindexingUtil.getTotalRequestToProcess(this.entityTypes, this.collectionDAO);
        this.entityTypes.forEach(entityType -> {
            List<String> fields = List.of("*");
            PaginatedEntitiesSource source = new PaginatedEntitiesSource((String)entityType, this.batchSize, fields).withName(String.format("[DataAssetsWorkflow] %s", entityType));
            this.sources.add(source);
        });
        this.entityEnricher = new DataInsightsEntityEnricherProcessor(totalRecords);
        if (this.searchRepository.getSearchType().equals((Object)ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
            this.entityProcessor = new DataInsightsOpenSearchProcessor(totalRecords);
            this.searchIndexSink = new OpenSearchIndexSink(this.searchRepository, totalRecords, this.searchRepository.getElasticSearchConfiguration().getPayLoadSize());
        } else {
            this.entityProcessor = new DataInsightsElasticSearchProcessor(totalRecords);
            this.searchIndexSink = new ElasticSearchIndexSink(this.searchRepository, totalRecords, this.searchRepository.getElasticSearchConfiguration().getPayLoadSize());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process() throws SearchIndexException {
        this.initialize();
        HashMap<String, Object> contextData = new HashMap<String, Object>();
        contextData.put("startTimestamp", this.startTimestamp);
        contextData.put("endTimestamp", this.endTimestamp);
        for (PaginatedEntitiesSource source : this.sources) {
            this.deleteDataBeforeInserting(DataInsightsApp.getDataStreamName(source.getEntityType()));
            contextData.put(DATA_STREAM_KEY, DataInsightsApp.getDataStreamName(source.getEntityType()));
            contextData.put("entityType", source.getEntityType());
            while (!source.isDone()) {
                try {
                    this.processEntity((ResultList<? extends EntityInterface>)source.readNext((Map)null), contextData, source);
                }
                catch (SearchIndexException ex) {
                    source.updateStats(ex.getIndexingError().getSuccessCount(), ex.getIndexingError().getFailedCount());
                    String errorMessage = String.format("Failed processing Data from %s: %s", source.getName(), ex);
                    this.workflowStats.addFailure(errorMessage);
                }
                finally {
                    this.updateWorkflowStats(source.getName(), source.getStats());
                }
            }
        }
    }

    private void processEntity(ResultList<? extends EntityInterface> resultList, Map<String, Object> contextData, Source<?> paginatedSource) throws SearchIndexException {
        if (!resultList.getData().isEmpty()) {
            this.searchIndexSink.write(this.entityProcessor.process(this.entityEnricher.process(resultList, contextData), contextData), contextData);
            if (!resultList.getErrors().isEmpty()) {
                throw new SearchIndexException(new IndexingError().withErrorSource(IndexingError.ErrorSource.READER).withLastFailedCursor(paginatedSource.getLastFailedCursor()).withSubmittedCount(Integer.valueOf(paginatedSource.getBatchSize())).withSuccessCount(Integer.valueOf(resultList.getData().size())).withFailedCount(Integer.valueOf(resultList.getErrors().size())).withMessage(String.format("Issues in Reading A Batch For Entities: %s", resultList.getErrors())).withFailedEntities(resultList.getErrors()));
            }
            paginatedSource.updateStats(resultList.getData().size(), 0);
        }
    }

    private void deleteDataBeforeInserting(String dataStreamName) throws SearchIndexException {
        try {
            this.searchRepository.getSearchClient().deleteByQuery(dataStreamName, String.format("{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", this.startTimestamp, this.endTimestamp));
        }
        catch (Exception rx) {
            SearchIndexException exception = new SearchIndexException(new IndexingError().withMessage(rx.getMessage()));
            throw exception;
        }
    }

    private void updateWorkflowStats(String stepName, StepStats newStepStats) {
        this.workflowStats.updateWorkflowStepStats(stepName, newStepStats);
        int currentSuccess = this.workflowStats.getWorkflowStepStats().values().stream().mapToInt(StepStats::getSuccessRecords).sum();
        int currentFailed = this.workflowStats.getWorkflowStepStats().values().stream().mapToInt(StepStats::getFailedRecords).sum();
        this.workflowStats.updateWorkflowStats(currentSuccess, currentFailed);
    }

    public WorkflowStats getWorkflowStats() {
        return this.workflowStats;
    }
}

