/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.searchIndex;

import es.org.elasticsearch.action.bulk.BulkItemResponse;
import es.org.elasticsearch.action.bulk.BulkRequest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppRunType;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication;
import org.openmetadata.service.exception.ProcessorException;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchDataInsightProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchEntitiesProcessor;
import org.openmetadata.service.search.elasticsearch.ElasticSearchIndexSink;
import org.openmetadata.service.search.models.IndexMapping;
import org.openmetadata.service.search.opensearch.OpenSearchDataInsightProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchEntitiesProcessor;
import org.openmetadata.service.search.opensearch.OpenSearchIndexSink;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Processor;
import org.openmetadata.service.workflows.interfaces.Sink;
import org.openmetadata.service.workflows.searchIndex.PaginatedDataInsightSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import os.org.opensearch.action.bulk.BulkItemResponse;
import os.org.opensearch.action.bulk.BulkResponse;

public class SearchIndexApp
extends AbstractNativeApplication {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SearchIndexApp.class);
    /** Format for per-entity-type error messages; arguments are entity type, cause and stack text, in that order. */
    private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
    /** Sources for requested entity types that are not data-insight indexes; populated by init(). */
    private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<PaginatedEntitiesSource>();
    /** Sources for requested data-insight indexes; populated by init(). */
    private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<PaginatedDataInsightSource>();
    /** Processor applied to pages read from the entity sources; OpenSearch or Elasticsearch variant chosen in init(). */
    private Processor entityProcessor;
    /** Processor applied to pages read from the data-insight sources; OpenSearch or Elasticsearch variant chosen in init(). */
    private Processor dataInsightProcessor;
    /** Sink the processed bulk requests are written to; OpenSearch or Elasticsearch variant chosen in init(). */
    private Sink searchIndexSink;
    /** Job request together with its status, stats and failure details for the current run (package-private). */
    EventPublisherJob jobData;
    /**
     * Set by stopJob() and checked by the reindex loops before each page.
     * NOTE(review): volatile presumably because stopJob() is called from another thread -- confirm.
     */
    private volatile boolean stopped = false;

    /**
     * Builds the job state for a reindexing run from the application's configuration.
     *
     * <p>The configuration is converted to an {@link EventPublisherJob}, its job-level stats are
     * seeded with the total number of records to process and zero success/failure counts, one
     * paginated source is registered per requested entity type, and the processors and sink that
     * match the configured search type are created.
     *
     * @param app application whose configuration describes the reindexing request
     * @param dao DAO handed to the parent class and to the data-insight sources
     * @param searchRepository repository used to select the search backend and to build the sink
     */
    @Override
    public void init(App app, CollectionDAO dao, SearchRepository searchRepository) {
        super.init(app, dao, searchRepository);

        EventPublisherJob request = JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)
                .withStats(new Stats())
                .withFailure(new Failure());
        int totalRecords = ReindexingUtil.getTotalRequestToProcess(request.getEntities(), this.collectionDAO);

        this.jobData = request;
        StepStats initialJobStats = new StepStats()
                .withTotalRecords(totalRecords)
                .withFailedRecords(0)
                .withSuccessRecords(0);
        this.jobData.setStats(new Stats().withJobStats(initialJobStats));

        for (String entityType : request.getEntities()) {
            if (ReindexingUtil.isDataInsightIndex(entityType)) {
                paginatedDataInsightSources.add(
                        new PaginatedDataInsightSource(dao, entityType, jobData.getBatchSize()));
                continue;
            }
            // Regular entities are read with every field ("*").
            PaginatedEntitiesSource entitySource =
                    new PaginatedEntitiesSource(entityType, jobData.getBatchSize(), List.of("*"));
            if (!CommonUtil.nullOrEmpty((String) request.getAfterCursor())) {
                // Start from the cursor supplied in the request, when there is one.
                entitySource.setCursor(request.getAfterCursor());
            }
            paginatedEntitiesSources.add(entitySource);
        }

        boolean useOpenSearch =
                searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH);
        if (useOpenSearch) {
            entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
            dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords);
            searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
        } else {
            entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
            dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords);
            searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
        }
    }

    /**
     * Runs the reindexing job: entity sources first, then data-insight sources, then the final
     * status is computed.
     *
     * <p>For a scheduled trigger, index recreation is switched off before reindexing starts. Any
     * exception is logged, recorded as a job-level failure and turns the status to FAILED. In all
     * cases the stats are stored in the job data map, the run record is persisted and an update
     * is broadcast.
     *
     * @param jobExecutionContext Quartz context of the current execution
     */
    @Override
    public void startApp(JobExecutionContext jobExecutionContext) {
        try {
            LOG.info("Executing Reindexing Job with JobData : {}", jobData);
            jobData.setStatus(EventPublisherJob.Status.RUNNING);

            Object triggerType = jobExecutionContext.getJobDetail().getJobDataMap().get("triggerType");
            AppRunType runType = AppRunType.fromValue((String) triggerType);
            if (runType.equals(AppRunType.Scheduled)) {
                jobData.setRecreateIndex(false);
            }

            entitiesReIndex();
            dataInsightReindex();
            updateJobStatus();
        } catch (Exception ex) {
            String jobDescription = jobData.toString();
            String stackTrace = ExceptionUtils.getStackTrace(ex);
            String error = String.format(
                    "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n  Stack : %s ",
                    jobDescription,
                    stackTrace);
            LOG.error(error);
            jobData.setStatus(EventPublisherJob.Status.FAILED);
            handleJobError(error, System.currentTimeMillis());
        } finally {
            // Runs on success and on failure, so the record always reflects the final state.
            jobExecutionContext.getJobDetail().getJobDataMap().put("AppRunStats", jobData.getStats());
            updateRecordToDb(jobExecutionContext);
            sendUpdates();
        }
    }

    /**
     * Copies the job's status, failure and stats onto the run record and pushes the record.
     *
     * <p>The failure is stored as a JSON string under the key {@code "failure"}; the stats object
     * is stored as-is under the key {@code "stats"}. Either is skipped when it is {@code null}.
     *
     * @param jobExecutionContext Quartz context used to look up and push the run record
     */
    public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
        AppRunRecord appRecord = getJobRecord(jobExecutionContext);

        String statusValue = jobData.getStatus().value();
        appRecord.setStatus(AppRunRecord.Status.fromValue(statusValue));

        Failure failure = jobData.getFailure();
        if (failure != null) {
            String failureJson = JsonUtils.pojoToJson(failure);
            appRecord.setFailureContext(new FailureContext().withAdditionalProperty("failure", failureJson));
        }

        Stats stats = jobData.getStats();
        if (stats != null) {
            appRecord.setSuccessContext(new SuccessContext().withAdditionalProperty("stats", stats));
        }

        pushAppStausUpdates(jobExecutionContext, appRecord, true);
    }

    /**
     * Reindexes every registered entity source, page by page.
     *
     * <p>For each source the index is (optionally) recreated, then pages are read until the source
     * reports it is done or the job is stopped. Each non-empty page is processed into a bulk
     * request, written to the sink, and any read or write errors are recorded on the job. Source,
     * processor and sink exceptions are recorded and the loop moves on to the next page. After the
     * loop the source's stats are merged into the job stats and an update is broadcast.
     *
     * <p>The OpenSearch and Elasticsearch branches use separate, fully-qualified local variables:
     * the request/response classes of the two clients share simple names but are unrelated types.
     */
    private void entitiesReIndex() {
        Map<String, Object> contextData = new HashMap<>();
        for (PaginatedEntitiesSource paginatedEntitiesSource : this.paginatedEntitiesSources) {
            String entityType = paginatedEntitiesSource.getEntityType();
            reCreateIndexes(entityType);
            contextData.put("entityType", entityType);

            while (!this.stopped && !paginatedEntitiesSource.isDone()) {
                long currentTime = System.currentTimeMillis();
                try {
                    ResultList<?> resultList = paginatedEntitiesSource.readNext((Map) null);
                    if (resultList.getData().isEmpty()) {
                        continue;
                    }
                    String lastFailedCursor = paginatedEntitiesSource.getLastFailedCursor();
                    if (this.searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
                        os.org.opensearch.action.bulk.BulkRequest osRequests =
                                (os.org.opensearch.action.bulk.BulkRequest) this.entityProcessor.process(resultList, contextData);
                        os.org.opensearch.action.bulk.BulkResponse osResponse =
                                (os.org.opensearch.action.bulk.BulkResponse) this.searchIndexSink.write(osRequests, contextData);
                        handleErrorsOs(resultList, lastFailedCursor, osResponse, currentTime);
                    } else {
                        es.org.elasticsearch.action.bulk.BulkRequest esRequests =
                                (es.org.elasticsearch.action.bulk.BulkRequest) this.entityProcessor.process(resultList, contextData);
                        es.org.elasticsearch.action.bulk.BulkResponse esResponse =
                                (es.org.elasticsearch.action.bulk.BulkResponse) this.searchIndexSink.write(esRequests, contextData);
                        handleErrorsEs(resultList, lastFailedCursor, esResponse, currentTime);
                    }
                }
                catch (SourceException rx) {
                    // The stack slot of the message is intentionally left empty, as in the original.
                    handleSourceError(String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), rx.getCause(), ""), currentTime);
                }
                catch (ProcessorException px) {
                    handleProcessorError(String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), px.getCause(), ""), currentTime);
                }
                catch (SinkException wx) {
                    handleEsSinkError(String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), wx.getCause(), ""), currentTime);
                }
            }

            updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
            sendUpdates();
        }
    }

    /**
     * Reindexes every registered data-insight source, page by page.
     *
     * <p>For each source the index is (optionally) recreated, then pages are read until the source
     * reports it is done or the job is stopped. Each non-empty page is processed into a bulk
     * request, written to the sink, and any read or write errors are recorded on the job (with an
     * empty cursor string). Source, processor and sink exceptions are recorded and the loop moves
     * on to the next page. After the loop the source's stats are merged into the job stats and an
     * update is broadcast.
     *
     * <p>The OpenSearch and Elasticsearch branches use separate, fully-qualified local variables:
     * the request/response classes of the two clients share simple names but are unrelated types.
     */
    private void dataInsightReindex() {
        Map<String, Object> contextData = new HashMap<>();
        for (PaginatedDataInsightSource paginatedDataInsightSource : this.paginatedDataInsightSources) {
            String entityType = paginatedDataInsightSource.getEntityType();
            reCreateIndexes(entityType);
            contextData.put("entityType", entityType);

            while (!this.stopped && !paginatedDataInsightSource.isDone()) {
                long currentTime = System.currentTimeMillis();
                try {
                    ResultList<?> resultList = paginatedDataInsightSource.readNext((Map) null);
                    if (resultList.getData().isEmpty()) {
                        continue;
                    }
                    if (this.searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
                        os.org.opensearch.action.bulk.BulkRequest osRequests =
                                (os.org.opensearch.action.bulk.BulkRequest) this.dataInsightProcessor.process(resultList, contextData);
                        os.org.opensearch.action.bulk.BulkResponse osResponse =
                                (os.org.opensearch.action.bulk.BulkResponse) this.searchIndexSink.write(osRequests, contextData);
                        handleErrorsOs(resultList, "", osResponse, currentTime);
                    } else {
                        es.org.elasticsearch.action.bulk.BulkRequest esRequests =
                                (es.org.elasticsearch.action.bulk.BulkRequest) this.dataInsightProcessor.process(resultList, contextData);
                        es.org.elasticsearch.action.bulk.BulkResponse esResponse =
                                (es.org.elasticsearch.action.bulk.BulkResponse) this.searchIndexSink.write(esRequests, contextData);
                        handleErrorsEs(resultList, "", esResponse, currentTime);
                    }
                }
                catch (SourceException rx) {
                    // The stack slot of the message is intentionally left empty, as in the original.
                    handleSourceError(String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), rx.getCause(), ""), currentTime);
                }
                catch (ProcessorException px) {
                    handleProcessorError(String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), px.getCause(), ""), currentTime);
                }
                catch (SinkException wx) {
                    handleEsSinkError(String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), wx.getCause(), ""), currentTime);
                }
            }

            updateStats(paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
            sendUpdates();
        }
    }

    /**
     * Broadcasts the current job data as JSON on the {@code "jobStatus"} WebSocket channel.
     * Best effort: any failure is logged and otherwise ignored.
     */
    private void sendUpdates() {
        try {
            String jobJson = JsonUtils.pojoToJson(jobData);
            WebSocketManager.getInstance().broadCastMessageToAll("jobStatus", jobJson);
        } catch (Exception ex) {
            LOG.error("Failed to send updated stats with WebSocket", ex);
        }
    }

    /**
     * Merges one entity type's stats into the job's stats.
     *
     * <p>The entity's stats are stored under its type name in the entity-level stats, and its
     * success/failure counts are applied to the job-level stats through
     * {@code ReindexingUtil.getUpdatedStats}. Missing entity-level or job-level stats objects are
     * created on demand.
     *
     * @param entityType entity type the stats belong to
     * @param currentEntityStats stats reported by that entity type's source
     */
    public void updateStats(String entityType, StepStats currentEntityStats) {
        Stats jobDataStats = jobData.getStats();

        // Per-entity breakdown, keyed by entity type.
        StepStats perEntityStats = jobDataStats.getEntityStats();
        if (perEntityStats == null) {
            perEntityStats = new StepStats();
        }
        perEntityStats.withAdditionalProperty(entityType, currentEntityStats);

        // Job-wide totals.
        StepStats overallStats = jobData.getStats().getJobStats();
        if (overallStats == null) {
            int totalRecords = ReindexingUtil.getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
            overallStats = new StepStats().withTotalRecords(totalRecords);
        }
        ReindexingUtil.getUpdatedStats(
                overallStats,
                currentEntityStats.getSuccessRecords(),
                currentEntityStats.getFailedRecords());

        jobDataStats.setJobStats(overallStats);
        jobDataStats.setEntityStats(perEntityStats);
        jobData.setStats(jobDataStats);
    }

    /**
     * Deletes and recreates the index for the given entity type, unless the job's
     * {@code recreateIndex} flag is explicitly {@code false}. A {@code null} flag therefore
     * still recreates the index.
     *
     * @param entityType entity type whose index mapping is looked up
     */
    private void reCreateIndexes(String entityType) {
        boolean recreationDisabled = Boolean.FALSE.equals(jobData.getRecreateIndex());
        if (!recreationDisabled) {
            IndexMapping mapping = searchRepository.getIndexMapping(entityType);
            searchRepository.deleteIndex(mapping);
            searchRepository.createIndex(mapping);
        }
    }

    /**
     * Records errors for one OpenSearch page: read errors carried by the page first, then
     * per-item failures from the bulk response. Both go through the same failure record, so a
     * sink failure overwrites a source failure recorded for the same page.
     *
     * @param data page that was read from the source
     * @param lastCursor cursor value included in the source-error message
     * @param response bulk response returned by the sink
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleErrorsOs(ResultList<?> data, String lastCursor, BulkResponse response, long time) {
        handleSourceError(data, lastCursor, time);
        handleOsSinkErrors(response, time);
    }

    /**
     * Records errors for one Elasticsearch page: read errors carried by the page first, then
     * per-item failures from the bulk response. Both go through the same failure record, so a
     * sink failure overwrites a source failure recorded for the same page.
     *
     * @param data page that was read from the source
     * @param lastCursor cursor value included in the source-error message
     * @param response bulk response returned by the sink
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleErrorsEs(ResultList<?> data, String lastCursor, es.org.elasticsearch.action.bulk.BulkResponse response, long time) {
        handleSourceError(data, lastCursor, time);
        handleEsSinkErrors(response, time);
    }

    /**
     * Records a failure that originated while reading from a source.
     *
     * @param reason human-readable failure description
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleSourceError(String reason, long time) {
        handleError("source", reason, time);
    }

    /**
     * Records a failure that originated in a processor.
     *
     * @param reason human-readable failure description
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleProcessorError(String reason, long time) {
        handleError("processor", reason, time);
    }

    /**
     * Stores the latest failure on the job data, reusing the existing failure object when there
     * is one. Only the most recent origin, reason and timestamp are kept; earlier values under
     * the same keys are overwritten.
     *
     * @param errType origin of the failure (stored under {@code "errorFrom"})
     * @param reason failure description (stored under {@code "lastFailedReason"})
     * @param time timestamp in epoch millis (stored under {@code "lastFailedAt"})
     */
    private void handleError(String errType, String reason, long time) {
        Failure failure = jobData.getFailure();
        if (failure == null) {
            failure = new Failure();
        }
        failure.withAdditionalProperty("errorFrom", errType);
        failure.withAdditionalProperty("lastFailedReason", reason);
        failure.withAdditionalProperty("lastFailedAt", time);
        jobData.setFailure(failure);
    }

    /**
     * Records a failure that originated in the sink. Despite the name, callers in this class use
     * it for both the Elasticsearch and the OpenSearch sink.
     *
     * @param reason human-readable failure description
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleEsSinkError(String reason, long time) {
        handleError("sink", reason, time);
    }

    /**
     * Records a failure of the job as a whole.
     *
     * @param reason human-readable failure description
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleJobError(String reason, long time) {
        handleError("job", reason, time);
    }

    /**
     * Records a source failure when the page that was read carries error entries.
     *
     * <p>Each error is placed on its own line. The line break is appended directly with
     * {@link System#lineSeparator()}: a {@code "%n"} inside an argument of
     * {@code String.format} is not expanded and would appear literally in the message.
     *
     * @param data page that was read from the source
     * @param lastCursor cursor value included in the message
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleSourceError(ResultList<?> data, String lastCursor, long time) {
        List<String> errors = data.getErrors();
        // A missing error list is treated like an empty one rather than failing the whole job.
        if (errors == null || errors.isEmpty()) {
            return;
        }
        StringBuilder builder = new StringBuilder();
        for (String error : errors) {
            builder.append(error).append(System.lineSeparator());
        }
        handleSourceError(String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data. Following Entities were not fetched Successfully : %s", lastCursor, builder), time);
    }

    /**
     * Collects the failed items of an OpenSearch bulk response and, when there are any, records
     * them as a single sink failure whose reason contains the details as JSON.
     *
     * <p>OpenSearch types are written fully qualified because the Elasticsearch client declares
     * classes with the same simple names.
     *
     * @param response bulk response returned by the OpenSearch sink
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse response, long time) {
        List<Map<String, Object>> details = new ArrayList<>();
        for (os.org.opensearch.action.bulk.BulkItemResponse bulkItemResponse : response) {
            if (!bulkItemResponse.isFailed()) {
                continue;
            }
            os.org.opensearch.action.bulk.BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            Map<String, Object> detailsMap = new HashMap<>();
            detailsMap.put("context", String.format("EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId()));
            detailsMap.put("lastFailedReason", String.format("Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace((Throwable)failure.getCause())));
            detailsMap.put("lastFailedAt", System.currentTimeMillis());
            details.add(detailsMap);
        }
        if (!details.isEmpty()) {
            handleEsSinkError(String.format("[EsWriter][BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details, true)), time);
        }
    }

    /**
     * Collects the failed items of an Elasticsearch bulk response and, when there are any,
     * records them as a single sink failure whose reason contains the details as JSON.
     *
     * <p>Elasticsearch types are written fully qualified because the OpenSearch client declares
     * classes with the same simple names.
     *
     * @param response bulk response returned by the Elasticsearch sink
     * @param time timestamp (epoch millis) recorded with the failure
     */
    private void handleEsSinkErrors(es.org.elasticsearch.action.bulk.BulkResponse response, long time) {
        List<Map<String, Object>> details = new ArrayList<>();
        for (es.org.elasticsearch.action.bulk.BulkItemResponse bulkItemResponse : response) {
            if (!bulkItemResponse.isFailed()) {
                continue;
            }
            es.org.elasticsearch.action.bulk.BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            Map<String, Object> detailsMap = new HashMap<>();
            detailsMap.put("context", String.format("EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId()));
            detailsMap.put("lastFailedReason", String.format("Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace((Throwable)failure.getCause())));
            detailsMap.put("lastFailedAt", System.currentTimeMillis());
            details.add(detailsMap);
        }
        if (!details.isEmpty()) {
            handleEsSinkError(String.format("[EsWriter][BulkItemResponse] Got Following Error Responses: %s ", JsonUtils.pojoToJson(details, true)), time);
        }
    }

    /**
     * Sets the job's final status: STOPPED when a stop was requested, FAILED when the failure
     * object carries at least one recorded property, COMPLETED otherwise.
     */
    private void updateJobStatus() {
        EventPublisherJob.Status finalStatus;
        if (stopped) {
            finalStatus = EventPublisherJob.Status.STOPPED;
        } else {
            Failure failure = jobData.getFailure();
            boolean hasRecordedFailure = failure != null && !failure.getAdditionalProperties().isEmpty();
            finalStatus = hasRecordedFailure
                    ? EventPublisherJob.Status.FAILED
                    : EventPublisherJob.Status.COMPLETED;
        }
        jobData.setStatus(finalStatus);
    }

    /**
     * Requests that the job stop. The reindex loops check this flag before reading each page, so
     * a page already in progress is finished first.
     */
    public void stopJob() {
        stopped = true;
    }

    /**
     * Returns the job data for the current run.
     *
     * @return the live job object (not a copy), including its status, stats and failure details
     */
    public EventPublisherJob getJobData() {
        return jobData;
    }
}

