package org.openmetadata.service.apps.bundles.insights.workflows.dataAssets;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.DataInsightsApp;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.apps.bundles.insights.workflows.WorkflowStats;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsElasticSearchProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsEntityEnricherProcessor;
import org.openmetadata.service.apps.bundles.insights.workflows.dataAssets.processors.DataInsightsOpenSearchProcessor;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink;
import org.openmetadata.service.search.opensearch.OpenSearchIndexSink;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.interfaces.Source;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.class */
/**
 * Data Insights workflow that reads the configured entity types page by page, runs each batch
 * through the entity enricher and the search-engine specific processor, and writes the result to
 * the search index sink.
 *
 * <p>Before any batch is written, documents in the {@link #DATA_ASSETS_DATA_STREAM} data stream
 * whose {@code @timestamp} falls inside the workflow window are deleted, so a re-run over the same
 * window replaces rather than adds to the existing data.
 */
public class DataAssetsWorkflow {
    private static final Logger LOG = LoggerFactory.getLogger(DataAssetsWorkflow.class);
    public static final String DATA_ASSETS_DATA_STREAM = "di-data-assets";

    /** Inclusive lower bound of the processing window (start of day). */
    private final Long startTimestamp;

    /** Inclusive upper bound of the processing window (end of day). */
    private final Long endTimestamp;

    private final int batchSize;
    private final SearchRepository searchRepository;
    private final CollectionDAO collectionDAO;
    private DataInsightsEntityEnricherProcessor entityEnricher;
    private Processor entityProcessor;
    private Sink searchIndexSink;

    /** A backfill may not reach further back than this many days before the run timestamp. */
    private final int retentionDays = 30;

    private final List<PaginatedEntitiesSource> sources = new ArrayList<>();
    private final Set<String> entityTypes = Set.of(
            "table",
            Entity.STORED_PROCEDURE,
            Entity.DATABASE_SCHEMA,
            Entity.DATABASE,
            Entity.CHART,
            Entity.DASHBOARD,
            Entity.DASHBOARD_DATA_MODEL,
            Entity.PIPELINE,
            Entity.TOPIC,
            Entity.CONTAINER,
            Entity.SEARCH_INDEX,
            Entity.MLMODEL,
            Entity.DATA_PRODUCT,
            Entity.GLOSSARY_TERM,
            Entity.TAG);
    private final WorkflowStats workflowStats = new WorkflowStats();

    /**
     * Creates the workflow and computes its processing window.
     *
     * <p>Without a backfill the window runs from the start of the day before {@code timestamp} to
     * the end of the day of {@code timestamp}. With a backfill the window is taken from the
     * backfill start/end dates, each clamped so that it is never older than {@code retentionDays}
     * days before {@code timestamp}.
     *
     * @param timestamp reference timestamp of the run
     * @param batchSize number of entities requested per page from each source
     * @param backfill optional backfill range; when present it defines the window
     * @param collectionDAO DAO used to count the entities to process
     * @param searchRepository repository giving access to the search client and search type
     */
    public DataAssetsWorkflow(
            Long timestamp,
            int batchSize,
            Optional<DataInsightsApp.Backfill> backfill,
            CollectionDAO collectionDAO,
            SearchRepository searchRepository) {
        if (backfill.isPresent()) {
            DataInsightsApp.Backfill range = backfill.get();
            Long oldestPossibleTimestamp =
                    TimestampUtils.getStartOfDayTimestamp(TimestampUtils.subtractDays(timestamp, this.retentionDays));
            // Both ends are clamped to the retention limit; clamping only the start would produce
            // an inverted window (start > end) for ranges that lie entirely before the limit.
            this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(
                    Collections.max(List.of(
                            TimestampUtils.getTimestampFromDateString(range.endDate()), oldestPossibleTimestamp)));
            this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(
                    Collections.max(List.of(
                            TimestampUtils.getTimestampFromDateString(range.startDate()), oldestPossibleTimestamp)));
            // The end collapsing onto the limit means the requested range was at or before it.
            if (oldestPossibleTimestamp.equals(TimestampUtils.getStartOfDayTimestamp(this.endTimestamp))) {
                LOG.warn("Backfill won't happen because the set date is before the limit of {}", oldestPossibleTimestamp);
            }
        } else {
            this.endTimestamp = TimestampUtils.getEndOfDayTimestamp(timestamp);
            this.startTimestamp = TimestampUtils.getStartOfDayTimestamp(TimestampUtils.subtractDays(timestamp, 1));
        }
        this.batchSize = batchSize;
        this.searchRepository = searchRepository;
        this.collectionDAO = collectionDAO;
    }

    /**
     * Builds one paginated source per entity type and selects the processor and sink matching the
     * configured search type (OpenSearch, otherwise Elasticsearch).
     */
    private void initialize() {
        int totalRecords = ReindexingUtil.getTotalRequestToProcess(this.entityTypes, this.collectionDAO);
        for (String entityType : this.entityTypes) {
            this.sources.add(
                    new PaginatedEntitiesSource(entityType, this.batchSize, List.of("*"))
                            .withName(String.format("PaginatedEntitiesSource-%s", entityType)));
        }
        this.entityEnricher = new DataInsightsEntityEnricherProcessor(totalRecords);
        if (this.searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
            this.entityProcessor = new DataInsightsOpenSearchProcessor(totalRecords);
            this.searchIndexSink = new OpenSearchIndexSink(this.searchRepository, totalRecords);
        } else {
            this.entityProcessor = new DataInsightsElasticSearchProcessor(totalRecords);
            this.searchIndexSink = new ElasticSearchIndexSink(this.searchRepository, totalRecords);
        }
    }

    /**
     * Runs the workflow: initializes the pipeline, deletes existing data for the window, then
     * drains every source batch by batch.
     *
     * <p>A {@link SearchIndexException} raised while handling a batch is recorded in the source
     * and workflow stats and processing continues with the next batch. Any other throwable
     * propagates after the stats have been refreshed.
     *
     * @throws SearchIndexException if deleting the existing data for the window fails
     */
    public void process() throws SearchIndexException {
        initialize();
        Map<String, Object> contextData = new HashMap<>();
        contextData.put(TimestampUtils.START_TIMESTAMP_KEY, this.startTimestamp);
        contextData.put(TimestampUtils.END_TIMESTAMP_KEY, this.endTimestamp);
        deleteDataBeforeInserting();
        for (PaginatedEntitiesSource source : this.sources) {
            contextData.put("entityType", source.getEntityType());
            while (!source.isDone()) {
                try {
                    processEntity(source.readNext((Map<String, Object>) null), contextData, source);
                } catch (SearchIndexException e) {
                    IndexingError error = e.getIndexingError();
                    source.updateStats(error.getSuccessCount().intValue(), error.getFailedCount().intValue());
                    this.workflowStats.addFailure(String.format("Failed processing Data from %s", source.getName()));
                } finally {
                    // Refresh the aggregated stats after every batch, whatever its outcome.
                    updateWorkflowStats(source.getName(), source.getStats());
                }
            }
        }
    }

    /**
     * Enriches, processes and writes one batch of entities. Empty batches are skipped.
     *
     * @param resultList batch read from {@code source}
     * @param map context data passed through to the enricher, processor and sink
     * @param source source the batch was read from; its stats are updated on success
     * @throws SearchIndexException if a pipeline step fails, or, after the write, if the batch
     *     carried reader errors
     */
    private void processEntity(ResultList<? extends EntityInterface> resultList, Map<String, Object> map, Source<?> source) throws SearchIndexException {
        if (resultList.getData().isEmpty()) {
            return;
        }
        this.searchIndexSink.write(this.entityProcessor.process(this.entityEnricher.process2(resultList, map), map), map);
        if (!resultList.getErrors().isEmpty()) {
            // The readable entities were written above; report the ones that failed to read.
            IndexingError readerError = new IndexingError()
                    .withErrorSource(IndexingError.ErrorSource.READER)
                    .withLastFailedCursor(source.getLastFailedCursor())
                    .withSubmittedCount(source.getBatchSize())
                    .withSuccessCount(resultList.getData().size())
                    .withFailedCount(resultList.getErrors().size())
                    .withMessage("Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
                    .withFailedEntities(resultList.getErrors());
            throw new SearchIndexException(readerError);
        }
        source.updateStats(resultList.getData().size(), 0);
    }

    /**
     * Deletes the documents of the data assets data stream whose {@code @timestamp} lies inside
     * the workflow window.
     *
     * @throws SearchIndexException if the delete-by-query call fails; the underlying exception is
     *     logged because only its message is carried in the thrown error
     */
    private void deleteDataBeforeInserting() throws SearchIndexException {
        String rangeQuery = String.format("{\"@timestamp\": {\"gte\": %s, \"lte\": %s}}", this.startTimestamp, this.endTimestamp);
        try {
            this.searchRepository.getSearchClient().deleteByQuery(DATA_ASSETS_DATA_STREAM, rangeQuery);
        } catch (Exception e) {
            LOG.error("Failed to delete existing data from {} before inserting", DATA_ASSETS_DATA_STREAM, e);
            throw new SearchIndexException(new IndexingError().withMessage(e.getMessage()));
        }
    }

    /**
     * Stores the stats of one step and recomputes the workflow totals as the sum over all steps.
     *
     * @param str name of the step (source) the stats belong to
     * @param stepStats latest stats of that step
     */
    private void updateWorkflowStats(String str, StepStats stepStats) {
        this.workflowStats.updateWorkflowStepStats(str, stepStats);
        int successRecords = this.workflowStats.getWorkflowStepStats().values().stream()
                .mapToInt(StepStats::getSuccessRecords)
                .sum();
        int failedRecords = this.workflowStats.getWorkflowStepStats().values().stream()
                .mapToInt(StepStats::getFailedRecords)
                .sum();
        this.workflowStats.updateWorkflowStats(successRecords, failedRecords);
    }

    /**
     * Returns the stats accumulated by this workflow.
     *
     * @return the workflow stats instance, updated in place while {@link #process()} runs
     */
    public WorkflowStats getWorkflowStats() {
        return this.workflowStats;
    }
}
