package org.openmetadata.service.workflows.searchIndex;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.FailureDetails;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.ProcessorException;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.security.auth.BotTokenCache;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ReIndexingHandler;
import org.openmetadata.service.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/workflows/searchIndex/SearchIndexWorkflow.class */
/**
 * Runnable reindexing job.
 *
 * <p>For every entity type listed in the job data, the workflow reads batches from a paginated
 * source, converts them with the matching processor and writes them through the
 * {@link EsSearchIndexSink}. Progress and failures are recorded on the {@link EventPublisherJob},
 * pushed over the WebSocket after each batch, and persisted when the job ends.
 *
 * <p>The batch loops poll a {@code volatile} flag set by {@link #stopJob()}, so a stop request takes
 * effect between batches.
 */
public class SearchIndexWorkflow implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SearchIndexWorkflow.class);
    private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
    private final EsEntitiesProcessor entitiesProcessor;
    private final EsDataInsightProcessor dataInsightProcessor;
    private final EsSearchIndexSink searchIndexSink;
    private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
    private final EventPublisherJob jobData;
    private final CollectionDAO dao;
    private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
    private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
    private volatile boolean stopped = false;

    /**
     * Builds one paginated source per entity type requested by the job.
     *
     * @param collectionDAO DAO used by the data-insight sources, for total counts and for persisting the job
     * @param elasticSearchIndexDefinition index definition used to delete and re-create indexes
     * @param restHighLevelClient Elasticsearch client handed to the sink
     * @param eventPublisherJob job description; also the object that accumulates status, stats and failures
     */
    public SearchIndexWorkflow(CollectionDAO collectionDAO, ElasticSearchIndexDefinition elasticSearchIndexDefinition, RestHighLevelClient restHighLevelClient, EventPublisherJob eventPublisherJob) {
        this.dao = collectionDAO;
        this.jobData = eventPublisherJob;
        for (String entityType : eventPublisherJob.getEntities()) {
            int batchSize = this.jobData.getBatchSize();
            if (ReindexingUtil.isDataInsightIndex(entityType)) {
                this.paginatedDataInsightSources.add(new PaginatedDataInsightSource(collectionDAO, entityType, batchSize));
                continue;
            }
            List<String> fields = new ArrayList<>(Objects.requireNonNull(ElasticSearchIndexDefinition.getIndexFields(entityType, this.jobData.getSearchIndexMappingLanguage())));
            PaginatedEntitiesSource source = new PaginatedEntitiesSource(entityType, batchSize, fields);
            // Resume from a previously stored cursor when the job carries one.
            if (!CommonUtil.nullOrEmpty(eventPublisherJob.getAfterCursor())) {
                source.setCursor(eventPublisherJob.getAfterCursor());
            }
            this.paginatedEntitiesSources.add(source);
        }
        this.entitiesProcessor = new EsEntitiesProcessor();
        this.dataInsightProcessor = new EsDataInsightProcessor();
        this.searchIndexSink = new EsSearchIndexSink(restHighLevelClient);
        this.elasticSearchIndexDefinition = elasticSearchIndexDefinition;
    }

    /**
     * Runs the whole reindexing job: entities first, then data-insight indexes, then the final
     * status. Whatever happens, the job record is persisted, a last update is sent and the job is
     * removed from the handler's list of running jobs.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            LOG.info("Executing Reindexing Job with JobData : {}", this.jobData);
            this.jobData.setStatus(EventPublisherJob.Status.RUNNING);
            entitiesReIndex();
            dataInsightReindex();
            updateJobStatus();
            this.jobData.setEndTime(System.currentTimeMillis());
        } catch (Exception e) {
            String error = String.format("Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n  Stack : %s ", this.jobData.toString(), ExceptionUtils.getStackTrace(e));
            LOG.error(error);
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
            handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis());
        } finally {
            finishJob();
        }
    }

    /**
     * Persists the job record, sends the last update and deregisters the job. A failure to persist
     * is logged instead of propagated so the job never stays registered as running.
     */
    private void finishJob() {
        try {
            updateRecordToDb();
        } catch (IOException e) {
            LOG.error("Failed to persist reindexing job record for job {}", this.jobData.getId(), e);
        } finally {
            try {
                sendUpdates();
            } finally {
                ReIndexingHandler.getInstance().removeCompletedJob(this.jobData.getId());
            }
        }
    }

    /** Reindexes every regular entity type, batch by batch, until each source is done or the job is stopped. */
    private void entitiesReIndex() {
        Map<String, Object> contextData = new HashMap<>();
        for (PaginatedEntitiesSource source : this.paginatedEntitiesSources) {
            reCreateIndexes(source.getEntityType());
            contextData.put(ReindexingUtil.ENTITY_TYPE_KEY, source.getEntityType());
            while (!this.stopped && !source.isDone()) {
                long startedAt = System.currentTimeMillis();
                int success = 0;
                // Until the batch is fully processed, count the whole batch as failed.
                int failed = this.jobData.getBatchSize();
                try {
                    ResultList<? extends EntityInterface> batch = source.readNext((Map<String, Object>) null);
                    int requested = batch.getData().size() + batch.getErrors().size();
                    if (batch.getData().isEmpty()) {
                        failed = 0;
                    } else {
                        BulkResponse response = this.searchIndexSink.write2(this.entitiesProcessor.process2(batch, contextData), contextData);
                        handleErrors(batch, source.getLastFailedCursor(), response, startedAt);
                        success = ReindexingUtil.getSuccessFromBulkResponse(response);
                        failed = requested - success;
                    }
                } catch (SinkException e) {
                    handleEsSinkError(e.getMessage(), describeFailure(source.getEntityType(), e), startedAt);
                } catch (ProcessorException e) {
                    handleProcessorError(e.getMessage(), describeFailure(source.getEntityType(), e), startedAt);
                } catch (SourceException e) {
                    handleSourceError(e.getMessage(), describeFailure(source.getEntityType(), e), startedAt);
                } finally {
                    updateStats(success, failed, source.getStats(), this.entitiesProcessor.getStats(), this.searchIndexSink.getStats());
                    sendUpdates();
                }
            }
        }
    }

    /** Reindexes every data-insight index, batch by batch, until each source is done or the job is stopped. */
    private void dataInsightReindex() {
        Map<String, Object> contextData = new HashMap<>();
        for (PaginatedDataInsightSource source : this.paginatedDataInsightSources) {
            reCreateIndexes(source.getEntityType());
            contextData.put(ReindexingUtil.ENTITY_TYPE_KEY, source.getEntityType());
            while (!this.stopped && !source.isDone()) {
                long startedAt = System.currentTimeMillis();
                int success = 0;
                // Until the batch is fully processed, count the whole batch as failed.
                int failed = this.jobData.getBatchSize();
                try {
                    ResultList<ReportData> batch = source.readNext((Map<String, Object>) null);
                    int requested = batch.getData().size() + batch.getErrors().size();
                    if (batch.getData().isEmpty()) {
                        failed = 0;
                    } else {
                        BulkResponse response = this.searchIndexSink.write2(this.dataInsightProcessor.process2(batch, contextData), contextData);
                        handleErrors(batch, BotTokenCache.EMPTY_STRING, response, startedAt);
                        success = ReindexingUtil.getSuccessFromBulkResponse(response);
                        failed = requested - success;
                    }
                } catch (ProcessorException e) {
                    handleProcessorError(e.getMessage(), describeFailure(source.getEntityType(), e), startedAt);
                } catch (SinkException e) {
                    handleEsSinkError(e.getMessage(), describeFailure(source.getEntityType(), e), startedAt);
                } catch (SourceException e) {
                    handleSourceError(e.getMessage(), describeFailure(source.getEntityType(), e), startedAt);
                } finally {
                    updateStats(success, failed, source.getStats(), this.dataInsightProcessor.getStats(), this.searchIndexSink.getStats());
                    sendUpdates();
                }
            }
        }
    }

    /** Sends the current job data as JSON to the user who started the job; serialization failures are only logged. */
    private void sendUpdates() {
        try {
            WebSocketManager.getInstance().sendToOne(this.jobData.getStartedBy(), WebSocketManager.JOB_STATUS_BROADCAST_CHANNEL, JsonUtils.pojoToJson(this.jobData));
        } catch (JsonProcessingException e) {
            LOG.error("Failed to send updated stats with WebSocket", e);
        }
    }

    /**
     * Folds the outcome of one batch into the job-level stats and stores the latest step stats.
     *
     * @param success number of records written successfully in this batch
     * @param failed number of records that failed in this batch
     * @param sourceStats current stats of the source step
     * @param processorStats current stats of the processor step
     * @param sinkStats current stats of the sink step
     */
    public void updateStats(int success, int failed, StepStats sourceStats, StepStats processorStats, StepStats sinkStats) {
        Stats stats = this.jobData.getStats() != null ? this.jobData.getStats() : new Stats();
        // Read from the null-safe local: the job may not carry a Stats object yet.
        StepStats jobStats = stats.getJobStats();
        if (jobStats == null) {
            jobStats = new StepStats().withTotalRecords(ReindexingUtil.getTotalRequestToProcess(this.jobData.getEntities(), this.dao));
        }
        ReindexingUtil.getUpdatedStats(jobStats, success, failed);
        stats.setJobStats(jobStats);
        stats.setSourceStats(sourceStats);
        stats.setProcessorStats(processorStats);
        stats.setSinkStats(sinkStats);
        this.jobData.setStats(stats);
    }

    /**
     * Overwrites the stored job record with the current job data, keeping the timestamp of the
     * record that is already stored.
     *
     * @throws IOException if the stored record cannot be parsed or the job data cannot be serialized
     */
    public void updateRecordToDb() throws IOException {
        String jobId = this.jobData.getId().toString();
        String currentJson = JsonUtils.pojoToJson(this.jobData);
        String storedJson = this.dao.entityExtensionTimeSeriesDao().getExtension(jobId, ReIndexingHandler.REINDEXING_JOB_EXTENSION);
        EventPublisherJob storedJob = JsonUtils.readValue(storedJson, EventPublisherJob.class);
        long originalTimestamp = storedJob.getTimestamp();
        this.dao.entityExtensionTimeSeriesDao().update(jobId, ReIndexingHandler.REINDEXING_JOB_EXTENSION, currentJson, originalTimestamp);
    }

    /** Deletes and re-creates the index of the given entity type unless the job explicitly sets recreateIndex to false. */
    private void reCreateIndexes(String entityType) {
        if (Boolean.FALSE.equals(this.jobData.getRecreateIndex())) {
            return;
        }
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        this.elasticSearchIndexDefinition.deleteIndex(indexType);
        this.elasticSearchIndexDefinition.createIndex(indexType, this.jobData.getSearchIndexMappingLanguage().value());
    }

    /** Records read errors from the batch and per-item write errors from the bulk response. */
    private void handleErrors(ResultList<?> resultList, String lastCursor, BulkResponse bulkResponse, long time) {
        handleSourceError(resultList, lastCursor, time);
        handleEsSinkErrors(bulkResponse, time);
    }

    private void handleSourceError(String context, String reason, long time) {
        Failure failure = getFailure();
        failure.setSourceError(getFailureDetails(context, reason, time));
        this.jobData.setFailure(failure);
    }

    private void handleProcessorError(String context, String reason, long time) {
        Failure failure = getFailure();
        failure.setProcessorError(getFailureDetails(context, reason, time));
        this.jobData.setFailure(failure);
    }

    private void handleEsSinkError(String context, String reason, long time) {
        Failure failure = getFailure();
        failure.setSinkError(getFailureDetails(context, reason, time));
        this.jobData.setFailure(failure);
    }

    private void handleJobError(String context, String reason, long time) {
        Failure failure = getFailure();
        failure.setJobError(getFailureDetails(context, reason, time));
        this.jobData.setFailure(failure);
    }

    /** Records a source error when the batch reports entities that could not be read. */
    private void handleSourceError(ResultList<?> resultList, String lastCursor, long time) {
        if (resultList.getErrors().isEmpty()) {
            return;
        }
        handleSourceError(String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor), String.format("Following Entities were not fetched Successfully : %s", toJson(resultList.getErrors())), time);
    }

    /** Collects every failed item of the bulk response and records them as a single sink error. */
    private void handleEsSinkErrors(BulkResponse bulkResponse, long time) {
        List<FailureDetails> details = new ArrayList<>();
        for (BulkItemResponse item : bulkResponse) {
            if (!item.isFailed()) {
                continue;
            }
            BulkItemResponse.Failure failure = item.getFailure();
            details.add(new FailureDetails().withContext(String.format("EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId())).withLastFailedReason(String.format("Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))).withLastFailedAt(System.currentTimeMillis()));
        }
        if (!details.isEmpty()) {
            handleEsSinkError("[EsWriter] BulkResponseItems", String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", toJson(details)), time);
        }
    }

    /** Sets the final status: STOPPED when stop was requested, FAILED when any step recorded an error, COMPLETED otherwise. */
    private void updateJobStatus() {
        if (this.stopped) {
            this.jobData.setStatus(EventPublisherJob.Status.STOPPED);
            return;
        }
        // A job that never recorded an error has no Failure object at all.
        Failure failure = this.jobData.getFailure();
        boolean stepFailed = failure != null && (failure.getSinkError() != null || failure.getSourceError() != null || failure.getProcessorError() != null);
        this.jobData.setStatus(stepFailed ? EventPublisherJob.Status.FAILED : EventPublisherJob.Status.COMPLETED);
    }

    /** Returns the job's failure record, or a fresh one when none has been recorded yet. */
    private Failure getFailure() {
        return this.jobData.getFailure() != null ? this.jobData.getFailure() : new Failure();
    }

    private FailureDetails getFailureDetails(String context, String reason, long time) {
        return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(time);
    }

    /** Formats the entity type, cause and stack trace of a step exception into the failure reason text. */
    private static String describeFailure(String entityType, Exception e) {
        return String.format(ENTITY_TYPE_ERROR_MSG, entityType, e.getCause(), ExceptionUtils.getStackTrace(e));
    }

    /** Serializes failure details to JSON; a serialization failure is rethrown unchecked so it reaches the job-level handler in run(). */
    private static String toJson(Object value) {
        try {
            return JsonUtils.pojoToJson(value);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize reindexing failure details to JSON", e);
        }
    }

    /** Requests the running job to stop; the batch loops check the flag before reading the next batch. */
    public void stopJob() {
        this.stopped = true;
    }

    public EventPublisherJob getJobData() {
        return this.jobData;
    }
}
