/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.searchIndex;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.EntityTimeSeriesInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchEntitiesProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchEntityTimeSeriesProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.opensearch.OpenSearchEntitiesProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchEntityTimeSeriesProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchIndexSink;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.interfaces.Source;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntityTimeSeriesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Native application that (re)builds the search indexes for the configured entity types.
 *
 * <p>One paginated source is created per requested entity type. Every batch read from a source
 * is run through the processor that matches the configured search backend and is then written
 * to the index sink. Progress and failures are accumulated on {@link #jobData} and pushed to the
 * app run record (and to WebSocket listeners, when available) after every batch.
 */
public class SearchIndexApp extends AbstractNativeApplication {
    private static final Logger LOG = LoggerFactory.getLogger(SearchIndexApp.class);

    /** Entity name that expands to {@link #ALL_ENTITIES} in {@link #init(App)}. */
    private static final String ALL = "all";

    /** Key of the trigger type inside the Quartz job data map. */
    private static final String TRIGGER_TYPE_KEY = "triggerType";

    /** Trigger type value identifying a manually requested run. */
    private static final String ON_DEMAND_JOB = "OnDemandJob";

    /** Entity types indexed when the configuration asks for {@code "all"}. */
    private static final Set<String> ALL_ENTITIES = Set.of(
            "table",
            "dashboard",
            "topic",
            "pipeline",
            "ingestionPipeline",
            "searchIndex",
            "user",
            "team",
            "glossary",
            "glossaryTerm",
            "mlmodel",
            "tag",
            "classification",
            "query",
            "container",
            "database",
            "databaseSchema",
            "testCase",
            "testSuite",
            "chart",
            "dashboardDataModel",
            "databaseService",
            "messagingService",
            "dashboardService",
            "pipelineService",
            "mlmodelService",
            "searchService",
            "entityReportData",
            "webAnalyticEntityViewReportData",
            "webAnalyticUserActivityReportData",
            "domain",
            "storedProcedure",
            "storageService",
            "testCaseResolutionStatus",
            "apiService",
            "apiEndpoint",
            "apiCollection");

    /** Entity types that are read through {@link PaginatedEntityTimeSeriesSource}. */
    public static final Set<String> TIME_SERIES_ENTITIES = Set.of(
            ReportData.ReportDataType.ENTITY_REPORT_DATA.value(),
            ReportData.ReportDataType.RAW_COST_ANALYSIS_REPORT_DATA.value(),
            ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA.value(),
            ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA.value(),
            ReportData.ReportDataType.AGGREGATED_COST_ANALYSIS_REPORT_DATA.value(),
            "testCaseResolutionStatus");

    private final List<Source> paginatedSources = new ArrayList<Source>();
    private Processor entityProcessor;
    private Processor entityTimeSeriesProcessor;
    private Sink searchIndexSink;
    EventPublisherJob jobData;

    /** Set by {@link #stopJob()}; checked before every source and every batch. */
    private volatile boolean stopped = false;

    public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
        super(collectionDAO, searchRepository);
    }

    /**
     * Reads the job configuration from the app and stores it as {@link #jobData}, starting with
     * empty stats. A requested entity list containing {@code "all"} is replaced by
     * {@link #ALL_ENTITIES}.
     *
     * @param app the application whose configuration describes the reindexing request
     */
    @Override
    public void init(App app) {
        super.init(app);
        EventPublisherJob request =
                JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)
                        .withStats(new Stats());
        if (request.getEntities().contains(ALL)) {
            request.setEntities(ALL_ENTITIES);
        }
        this.jobData = request;
    }

    /**
     * Runs the reindexing job. Any exception is recorded on {@link #jobData} as a job-level
     * failure with status {@code FAILED}; the final state is always published via
     * {@link #sendUpdates(JobExecutionContext)}.
     *
     * @param jobExecutionContext the Quartz context of the current run
     */
    @Override
    public void startApp(JobExecutionContext jobExecutionContext) {
        try {
            initializeJob();
            LOG.info("Executing Reindexing Job with JobData : {}", jobData);
            jobData.setStatus(EventPublisherJob.Status.RUNNING);

            // Only on-demand runs may recreate indexes. The constant-first comparison also treats
            // a missing trigger type as "not on demand" instead of failing with an NPE.
            Object triggerType =
                    jobExecutionContext.getJobDetail().getJobDataMap().get(TRIGGER_TYPE_KEY);
            if (!ON_DEMAND_JOB.equals(triggerType)) {
                jobData.setRecreateIndex(false);
            }
            performReindex(jobExecutionContext);
        } catch (Exception ex) {
            IndexingError indexingError = new IndexingError()
                    .withErrorSource(IndexingError.ErrorSource.JOB)
                    .withMessage(String.format(
                            "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n  Stack : %s ",
                            jobData.toString(),
                            ExceptionUtils.getStackTrace(ex)));
            LOG.error(indexingError.getMessage());
            // The job aborted, so it must not be reported as still running.
            jobData.setStatus(EventPublisherJob.Status.FAILED);
            jobData.setFailure(indexingError);
        } finally {
            sendUpdates(jobExecutionContext);
        }
    }

    /**
     * Resets the job stats, creates one paginated source per requested entity type and selects
     * the processors and sink for the configured search backend.
     */
    private void initializeJob() {
        // Start from a clean slate so sources do not accumulate if this instance runs again.
        paginatedSources.clear();

        int totalRecords = ReindexingUtil.getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
        jobData.setStats(new Stats().withJobStats(
                new StepStats()
                        .withTotalRecords(totalRecords)
                        .withFailedRecords(0)
                        .withSuccessRecords(0)));

        String afterCursor = jobData.getAfterCursor();
        boolean resumeFromCursor = !CommonUtil.nullOrEmpty(afterCursor);
        for (String entityType : jobData.getEntities()) {
            if (TIME_SERIES_ENTITIES.contains(entityType)) {
                PaginatedEntityTimeSeriesSource source =
                        new PaginatedEntityTimeSeriesSource(entityType, jobData.getBatchSize(), List.of("*"));
                if (resumeFromCursor) {
                    source.setCursor(afterCursor);
                }
                paginatedSources.add(source);
            } else {
                PaginatedEntitiesSource source =
                        new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), List.of("*"));
                if (resumeFromCursor) {
                    source.setCursor(afterCursor);
                }
                paginatedSources.add(source);
            }
        }

        if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
            entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
            entityTimeSeriesProcessor = new OpenSearchEntityTimeSeriesProcessor(totalRecords);
            searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize());
        } else {
            entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
            entityTimeSeriesProcessor = new ElasticSearchEntityTimeSeriesProcessor(totalRecords);
            searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords, jobData.getPayLoadSize());
        }
    }

    /**
     * Copies the current status, failure (if any) and stats (if any) from {@link #jobData} onto
     * the run record of this execution and pushes the record.
     *
     * @param jobExecutionContext the Quartz context of the current run
     */
    public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
        AppRunRecord appRecord = getJobRecord(jobExecutionContext);
        appRecord.setStatus(AppRunRecord.Status.fromValue(jobData.getStatus().value()));
        if (jobData.getFailure() != null) {
            appRecord.setFailureContext(
                    new FailureContext().withAdditionalProperty("failure", jobData.getFailure()));
        }
        if (jobData.getStats() != null) {
            appRecord.setSuccessContext(
                    new SuccessContext().withAdditionalProperty("stats", jobData.getStats()));
        }
        pushAppStatusUpdates(jobExecutionContext, appRecord, true);
    }

    /**
     * Drains every paginated source batch by batch. A {@link SearchIndexException} raised for a
     * batch is recorded on {@link #jobData} (status {@code FAILED}) and counted in the source
     * stats, after which the loop moves on to the next batch. Stats are recomputed and published
     * after every batch, whether it succeeded or not.
     *
     * @param jobExecutionContext the Quartz context of the current run
     * @throws SearchIndexException kept for signature compatibility; batch-level exceptions are
     *     handled inside the loop
     */
    private void performReindex(JobExecutionContext jobExecutionContext) throws SearchIndexException {
        Map<String, Object> contextData = new HashMap<>();
        for (Source paginatedSource : paginatedSources) {
            // Once a stop was requested, do not touch the remaining sources: recreating their
            // indexes without repopulating them would leave those indexes empty.
            if (stopped) {
                break;
            }
            String entityType = paginatedSource.getEntityType();
            reCreateIndexes(entityType);
            contextData.put("entityType", entityType);
            boolean timeSeries = TIME_SERIES_ENTITIES.contains(entityType);

            while (!stopped && !paginatedSource.isDone()) {
                try {
                    Object batch = paginatedSource.readNext(null);
                    if (timeSeries) {
                        @SuppressWarnings("unchecked") // source type is selected by entity type in initializeJob
                        ResultList<? extends EntityTimeSeriesInterface> page =
                                (ResultList<? extends EntityTimeSeriesInterface>) batch;
                        contextData.put("entityNameList", getEntityNameFromEntityTimeSeries(page, entityType));
                        processEntityTimeSeries(page, contextData, paginatedSource);
                    } else {
                        @SuppressWarnings("unchecked") // source type is selected by entity type in initializeJob
                        ResultList<? extends EntityInterface> page =
                                (ResultList<? extends EntityInterface>) batch;
                        contextData.put("entityNameList", getEntityNameFromEntity(page, entityType));
                        processEntity(page, contextData, paginatedSource);
                    }
                } catch (SearchIndexException rx) {
                    // A failure was recorded, so the job must not keep reporting RUNNING.
                    jobData.setStatus(EventPublisherJob.Status.FAILED);
                    jobData.setFailure(rx.getIndexingError());
                    paginatedSource.updateStats(
                            rx.getIndexingError().getSuccessCount(),
                            rx.getIndexingError().getFailedCount());
                } finally {
                    updateStats(entityType, paginatedSource.getStats());
                    sendUpdates(jobExecutionContext);
                }
            }
        }
    }

    /** Returns one {@code "<entityType> <id>"} label per entity in the batch. */
    private List<String> getEntityNameFromEntity(ResultList<? extends EntityInterface> resultList, String entityType) {
        return resultList.getData().stream()
                .map(entity -> String.format("%s %s", entityType, entity.getId()))
                .toList();
    }

    /** Returns one {@code "<entityType> <id>"} label per time series record in the batch. */
    private List<String> getEntityNameFromEntityTimeSeries(ResultList<? extends EntityTimeSeriesInterface> resultList, String entityType) {
        return resultList.getData().stream()
                .map(entity -> String.format("%s %s", entityType, entity.getId()))
                .toList();
    }

    /**
     * Processes and writes a non-empty batch of entities. Empty batches are ignored.
     *
     * @throws SearchIndexException if processing or writing fails, or if the batch carried read
     *     errors (raised after the readable part of the batch has been written)
     */
    private void processEntity(ResultList<? extends EntityInterface> resultList, Map<String, Object> contextData, Source paginatedSource) throws SearchIndexException {
        if (resultList.getData().isEmpty()) {
            return;
        }
        searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
        if (!resultList.getErrors().isEmpty()) {
            throw new SearchIndexException(buildReaderError(resultList, paginatedSource));
        }
        paginatedSource.updateStats(resultList.getData().size(), 0);
    }

    /**
     * Processes and writes a non-empty batch of time series records. Empty batches are ignored.
     *
     * @throws SearchIndexException if processing or writing fails, or if the batch carried read
     *     errors (raised after the readable part of the batch has been written)
     */
    private void processEntityTimeSeries(ResultList<? extends EntityTimeSeriesInterface> resultList, Map<String, Object> contextData, Source paginatedSource) throws SearchIndexException {
        if (resultList.getData().isEmpty()) {
            return;
        }
        searchIndexSink.write(entityTimeSeriesProcessor.process(resultList, contextData), contextData);
        if (!resultList.getErrors().isEmpty()) {
            throw new SearchIndexException(buildReaderError(resultList, paginatedSource));
        }
        paginatedSource.updateStats(resultList.getData().size(), 0);
    }

    /** Builds the reader-side error describing a batch that was read with per-entity errors. */
    private static IndexingError buildReaderError(ResultList<?> resultList, Source paginatedSource) {
        return new IndexingError()
                .withErrorSource(IndexingError.ErrorSource.READER)
                .withLastFailedCursor(paginatedSource.getLastFailedCursor())
                .withSubmittedCount(paginatedSource.getBatchSize())
                .withSuccessCount(resultList.getData().size())
                .withFailedCount(resultList.getErrors().size())
                .withMessage("Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
                .withFailedEntities(resultList.getErrors());
    }

    /**
     * Publishes the current job state: stores the stats in the Quartz job data map, updates the
     * run record and, when a WebSocket manager exists, broadcasts the job data. Failures here are
     * logged and never propagated, so publishing problems do not abort the reindexing itself.
     */
    private void sendUpdates(JobExecutionContext jobExecutionContext) {
        try {
            jobExecutionContext.getJobDetail().getJobDataMap().put("AppRunStats", jobData.getStats());
            updateRecordToDb(jobExecutionContext);
            WebSocketManager webSocketManager = WebSocketManager.getInstance();
            if (webSocketManager != null) {
                webSocketManager.broadCastMessageToAll("jobStatus", JsonUtils.pojoToJson(jobData));
            }
        } catch (Exception ex) {
            LOG.error("Failed to send updated stats with WebSocket", ex);
        }
    }

    /**
     * Stores the latest stats of one entity type and recomputes the job-level success and failure
     * totals as the sum over all entity types recorded so far.
     *
     * @param entityType the entity type the stats belong to
     * @param currentEntityStats the current stats of that entity type's source
     */
    public void updateStats(String entityType, StepStats currentEntityStats) {
        Stats jobDataStats = jobData.getStats();

        StepStats entityLevelStats = jobDataStats.getEntityStats();
        if (entityLevelStats == null) {
            entityLevelStats = new StepStats()
                    .withTotalRecords(null)
                    .withFailedRecords(null)
                    .withSuccessRecords(null);
        }
        entityLevelStats.withAdditionalProperty(entityType, currentEntityStats);

        StepStats stats = jobDataStats.getJobStats();
        if (stats == null) {
            stats = new StepStats().withTotalRecords(
                    ReindexingUtil.getTotalRequestToProcess(jobData.getEntities(), collectionDAO));
        }

        // Per-entity stats live in the additional properties, keyed by entity type.
        int successRecords = entityLevelStats.getAdditionalProperties().values().stream()
                .map(s -> (StepStats) s)
                .mapToInt(StepStats::getSuccessRecords)
                .sum();
        int failedRecords = entityLevelStats.getAdditionalProperties().values().stream()
                .map(s -> (StepStats) s)
                .mapToInt(StepStats::getFailedRecords)
                .sum();
        stats.setSuccessRecords(successRecords);
        stats.setFailedRecords(failedRecords);

        jobDataStats.setJobStats(stats);
        jobDataStats.setEntityStats(entityLevelStats);
        jobData.setStats(jobDataStats);
    }

    /**
     * Deletes and recreates the index of the given entity type, unless index recreation is
     * explicitly disabled on the job.
     */
    private void reCreateIndexes(String entityType) {
        if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
            return;
        }
        IndexMapping indexType = searchRepository.getIndexMapping(entityType);
        searchRepository.deleteIndex(indexType);
        searchRepository.createIndex(indexType);
    }

    /**
     * Requests that reindexing stop. The flag is checked before each source and before each
     * batch, so a batch that is already in flight still completes.
     */
    public void stopJob() {
        this.stopped = true;
    }

    /** Returns the job description together with its current status, stats and failure. */
    public EventPublisherJob getJobData() {
        return this.jobData;
    }
}

