package org.openmetadata.service.apps.bundles.searchIndex;

import es.org.elasticsearch.action.bulk.BulkItemResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.csv.EntityCsv;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener;
import org.openmetadata.service.exception.ProcessorException;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchDataInsightProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchEntitiesProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.opensearch.OpenSearchDataInsightProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchEntitiesProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchIndexSink;
import org.openmetadata.service.security.auth.BotTokenCache;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.searchIndex.PaginatedDataInsightSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import os.org.opensearch.action.bulk.BulkItemResponse;
import os.org.opensearch.action.bulk.BulkRequest;
import os.org.opensearch.action.bulk.BulkResponse;

/* loaded from: input_file:org/openmetadata/service/apps/bundles/searchIndex/SearchIndexApp.class */
public class SearchIndexApp extends AbstractNativeApplication {
    private static final Logger LOG = LoggerFactory.getLogger(SearchIndexApp.class);
    private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
    private Processor entityProcessor;
    private Processor dataInsightProcessor;
    private Sink searchIndexSink;
    EventPublisherJob jobData;
    private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList();
    private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList();
    private volatile boolean stopped = false;

    @Override // org.openmetadata.service.apps.AbstractNativeApplication, org.openmetadata.service.apps.NativeApplication
    public void init(App app, CollectionDAO collectionDAO, SearchRepository searchRepository) {
        super.init(app, collectionDAO, searchRepository);
        EventPublisherJob withFailure = ((EventPublisherJob) JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)).withStats(new Stats()).withFailure(new Failure());
        int totalRequestToProcess = ReindexingUtil.getTotalRequestToProcess(withFailure.getEntities(), this.collectionDAO);
        this.jobData = withFailure;
        this.jobData.setStats(new Stats().withJobStats(new StepStats().withTotalRecords(Integer.valueOf(totalRequestToProcess)).withFailedRecords(0).withSuccessRecords(0)));
        withFailure.getEntities().forEach(str -> {
            if (ReindexingUtil.isDataInsightIndex(str)) {
                this.paginatedDataInsightSources.add(new PaginatedDataInsightSource(collectionDAO, str, this.jobData.getBatchSize().intValue()));
                return;
            }
            PaginatedEntitiesSource paginatedEntitiesSource = new PaginatedEntitiesSource(str, this.jobData.getBatchSize().intValue(), List.of("*"));
            if (!CommonUtil.nullOrEmpty(withFailure.getAfterCursor())) {
                paginatedEntitiesSource.setCursor(withFailure.getAfterCursor());
            }
            this.paginatedEntitiesSources.add(paginatedEntitiesSource);
        });
        if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
            this.entityProcessor = new OpenSearchEntitiesProcessor(totalRequestToProcess);
            this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRequestToProcess);
            this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRequestToProcess);
        } else {
            this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRequestToProcess);
            this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRequestToProcess);
            this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRequestToProcess);
        }
    }

    @Override // org.openmetadata.service.apps.NativeApplication
    public void startApp(JobExecutionContext jobExecutionContext) {
        try {
            try {
                LOG.info("Executing Reindexing Job with JobData : {}", this.jobData);
                this.jobData.setStatus(EventPublisherJob.Status.RUNNING);
                entitiesReIndex();
                dataInsightReindex();
                updateJobStatus();
                jobExecutionContext.getJobDetail().getJobDataMap().put(AbstractOmAppJobListener.APP_RUN_STATS, this.jobData.getStats());
                updateRecordToDb(jobExecutionContext);
                sendUpdates();
            } catch (Exception e) {
                String format = String.format("Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n  Stack : %s ", this.jobData.toString(), ExceptionUtils.getStackTrace(e));
                LOG.error(format);
                this.jobData.setStatus(EventPublisherJob.Status.FAILED);
                handleJobError("Failure in Job: Check Stack", format, System.currentTimeMillis());
                jobExecutionContext.getJobDetail().getJobDataMap().put(AbstractOmAppJobListener.APP_RUN_STATS, this.jobData.getStats());
                updateRecordToDb(jobExecutionContext);
                sendUpdates();
            }
        } catch (Throwable th) {
            jobExecutionContext.getJobDetail().getJobDataMap().put(AbstractOmAppJobListener.APP_RUN_STATS, this.jobData.getStats());
            updateRecordToDb(jobExecutionContext);
            sendUpdates();
            throw th;
        }
    }

    public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
        AppRunRecord jobRecord = getJobRecord(jobExecutionContext);
        jobRecord.setStatus(AppRunRecord.Status.fromValue(this.jobData.getStatus().value()));
        if (this.jobData.getFailure() != null) {
            jobRecord.setFailureContext(new FailureContext().withAdditionalProperty(EntityCsv.IMPORT_STATUS_FAILED, JsonUtils.pojoToJson(this.jobData.getFailure())));
        }
        if (this.jobData.getStats() != null) {
            jobRecord.setSuccessContext(new SuccessContext().withAdditionalProperty("stats", this.jobData.getStats()));
        }
        pushAppStausUpdates(jobExecutionContext, jobRecord, true);
    }

    private void entitiesReIndex() {
        HashMap hashMap = new HashMap();
        for (PaginatedEntitiesSource paginatedEntitiesSource : this.paginatedEntitiesSources) {
            reCreateIndexes(paginatedEntitiesSource.getEntityType());
            hashMap.put("entityType", paginatedEntitiesSource.getEntityType());
            while (!this.stopped && !paginatedEntitiesSource.isDone()) {
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    ResultList<? extends EntityInterface> readNext = paginatedEntitiesSource.readNext((Map<String, Object>) null);
                    if (!readNext.getData().isEmpty()) {
                        if (this.searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
                            handleErrorsOs(readNext, paginatedEntitiesSource.getLastFailedCursor(), (BulkResponse) this.searchIndexSink.write((BulkRequest) this.entityProcessor.process(readNext, hashMap), hashMap), currentTimeMillis);
                        } else {
                            handleErrorsEs(readNext, paginatedEntitiesSource.getLastFailedCursor(), (es.org.elasticsearch.action.bulk.BulkResponse) this.searchIndexSink.write((es.org.elasticsearch.action.bulk.BulkRequest) this.entityProcessor.process(readNext, hashMap), hashMap), currentTimeMillis);
                        }
                    }
                } catch (ProcessorException e) {
                    handleProcessorError(e.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), e.getCause(), ExceptionUtils.getStackTrace(e)), currentTimeMillis);
                } catch (SinkException e2) {
                    handleEsSinkError(e2.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), e2.getCause(), ExceptionUtils.getStackTrace(e2)), currentTimeMillis);
                } catch (SourceException e3) {
                    handleSourceError(e3.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), e3.getCause(), ExceptionUtils.getStackTrace(e3)), currentTimeMillis);
                }
            }
            updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
            sendUpdates();
        }
    }

    private void dataInsightReindex() {
        HashMap hashMap = new HashMap();
        for (PaginatedDataInsightSource paginatedDataInsightSource : this.paginatedDataInsightSources) {
            reCreateIndexes(paginatedDataInsightSource.getEntityType());
            hashMap.put("entityType", paginatedDataInsightSource.getEntityType());
            while (!this.stopped && !paginatedDataInsightSource.isDone()) {
                long currentTimeMillis = System.currentTimeMillis();
                try {
                    ResultList<ReportData> readNext = paginatedDataInsightSource.readNext((Map<String, Object>) null);
                    if (!readNext.getData().isEmpty()) {
                        if (this.searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
                            handleErrorsOs(readNext, BotTokenCache.EMPTY_STRING, (BulkResponse) this.searchIndexSink.write((BulkRequest) this.dataInsightProcessor.process(readNext, hashMap), hashMap), currentTimeMillis);
                        } else {
                            handleErrorsEs(readNext, BotTokenCache.EMPTY_STRING, (es.org.elasticsearch.action.bulk.BulkResponse) this.searchIndexSink.write((es.org.elasticsearch.action.bulk.BulkRequest) this.dataInsightProcessor.process(readNext, hashMap), hashMap), currentTimeMillis);
                        }
                    }
                } catch (ProcessorException e) {
                    handleProcessorError(e.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), e.getCause(), ExceptionUtils.getStackTrace(e)), currentTimeMillis);
                } catch (SinkException e2) {
                    handleEsSinkError(e2.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), e2.getCause(), ExceptionUtils.getStackTrace(e2)), currentTimeMillis);
                } catch (SourceException e3) {
                    handleSourceError(e3.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), e3.getCause(), ExceptionUtils.getStackTrace(e3)), currentTimeMillis);
                }
            }
            updateStats(paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
            sendUpdates();
        }
    }

    private void sendUpdates() {
        try {
            WebSocketManager.getInstance().broadCastMessageToAll(WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(this.jobData));
        } catch (Exception e) {
            LOG.error("Failed to send updated stats with WebSocket", e);
        }
    }

    public void updateStats(String str, StepStats stepStats) {
        Stats stats = this.jobData.getStats();
        StepStats entityStats = stats.getEntityStats();
        if (entityStats == null) {
            entityStats = new StepStats();
        }
        entityStats.withAdditionalProperty(str, stepStats);
        StepStats jobStats = this.jobData.getStats().getJobStats();
        if (jobStats == null) {
            jobStats = new StepStats().withTotalRecords(Integer.valueOf(ReindexingUtil.getTotalRequestToProcess(this.jobData.getEntities(), this.collectionDAO)));
        }
        ReindexingUtil.getUpdatedStats(jobStats, stepStats.getSuccessRecords().intValue(), stepStats.getFailedRecords().intValue());
        stats.setJobStats(jobStats);
        stats.setEntityStats(entityStats);
        this.jobData.setStats(stats);
    }

    private void reCreateIndexes(String str) {
        if (Boolean.FALSE.equals(this.jobData.getRecreateIndex())) {
            return;
        }
        IndexMapping indexMapping = this.searchRepository.getIndexMapping(str);
        this.searchRepository.deleteIndex(indexMapping);
        this.searchRepository.createIndex(indexMapping);
    }

    private void handleErrorsOs(ResultList<?> resultList, String str, BulkResponse bulkResponse, long j) {
        handleSourceError(resultList, str, j);
        handleOsSinkErrors(bulkResponse, j);
    }

    private void handleErrorsEs(ResultList<?> resultList, String str, es.org.elasticsearch.action.bulk.BulkResponse bulkResponse, long j) {
        handleSourceError(resultList, str, j);
        handleEsSinkErrors(bulkResponse, j);
    }

    private void handleSourceError(String str, String str2, long j) {
        handleError("source", str, str2, j);
    }

    private void handleProcessorError(String str, String str2, long j) {
        handleError("processor", str, str2, j);
    }

    private void handleError(String str, String str2, String str3, long j) {
        Failure failure = this.jobData.getFailure() != null ? this.jobData.getFailure() : new Failure();
        failure.withAdditionalProperty("errorFrom", str);
        failure.withAdditionalProperty("context", str2);
        failure.withAdditionalProperty("lastFailedReason", str3);
        failure.withAdditionalProperty("lastFailedAt", Long.valueOf(j));
        this.jobData.setFailure(failure);
    }

    private void handleEsSinkError(String str, String str2, long j) {
        handleError("sink", str, str2, j);
    }

    private void handleJobError(String str, String str2, long j) {
        handleError("job", str, str2, j);
    }

    private void handleSourceError(ResultList<?> resultList, String str, long j) {
        if (!resultList.getErrors().isEmpty()) {
            StringBuilder sb = new StringBuilder();
            Iterator<String> it = resultList.getErrors().iterator();
            while (it.hasNext()) {
                sb.append(it.next());
                sb.append("%n");
            }
            handleSourceError(String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", str), String.format("Following Entities were not fetched Successfully : %s", sb), j);
        }
    }

    private void handleOsSinkErrors(BulkResponse bulkResponse, long j) {
        ArrayList arrayList = new ArrayList();
        Iterator it = bulkResponse.iterator();
        while (it.hasNext()) {
            BulkItemResponse bulkItemResponse = (BulkItemResponse) it.next();
            if (bulkItemResponse.isFailed()) {
                HashMap hashMap = new HashMap();
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                hashMap.put("context", String.format("EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId()));
                hashMap.put("lastFailedReason", String.format("Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause())));
                hashMap.put("lastFailedAt", Long.valueOf(System.currentTimeMillis()));
                arrayList.add(hashMap);
            }
        }
        if (!arrayList.isEmpty()) {
            handleEsSinkError("[EsWriter] BulkResponseItems", String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(arrayList)), j);
        }
    }

    private void handleEsSinkErrors(es.org.elasticsearch.action.bulk.BulkResponse bulkResponse, long j) {
        ArrayList arrayList = new ArrayList();
        Iterator it = bulkResponse.iterator();
        while (it.hasNext()) {
            es.org.elasticsearch.action.bulk.BulkItemResponse bulkItemResponse = (es.org.elasticsearch.action.bulk.BulkItemResponse) it.next();
            if (bulkItemResponse.isFailed()) {
                HashMap hashMap = new HashMap();
                BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                hashMap.put("context", String.format("EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId()));
                hashMap.put("lastFailedReason", String.format("Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause())));
                hashMap.put("lastFailedAt", Long.valueOf(System.currentTimeMillis()));
                arrayList.add(hashMap);
            }
        }
        if (!arrayList.isEmpty()) {
            handleEsSinkError("[EsWriter] BulkResponseItems", String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(arrayList)), j);
        }
    }

    private void updateJobStatus() {
        if (this.stopped) {
            this.jobData.setStatus(EventPublisherJob.Status.STOPPED);
        } else if (this.jobData.getFailure() == null || this.jobData.getFailure().getAdditionalProperties().isEmpty()) {
            this.jobData.setStatus(EventPublisherJob.Status.COMPLETED);
        } else {
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
        }
    }

    public void stopJob() {
        this.stopped = true;
    }

    public EventPublisherJob getJobData() {
        return this.jobData;
    }
}
