/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.workflows.searchIndex;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.FailureDetails;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.ProcessorException;
import org.openmetadata.service.exception.SinkException;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ReIndexingHandler;
import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.searchIndex.EsDataInsightProcessor;
import org.openmetadata.service.workflows.searchIndex.EsEntitiesProcessor;
import org.openmetadata.service.workflows.searchIndex.EsSearchIndexSink;
import org.openmetadata.service.workflows.searchIndex.PaginatedDataInsightSource;
import org.openmetadata.service.workflows.searchIndex.PaginatedEntitiesSource;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SearchIndexWorkflow
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SearchIndexWorkflow.class);
    private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
    private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<PaginatedEntitiesSource>();
    private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<PaginatedDataInsightSource>();
    private final EsEntitiesProcessor entitiesProcessor;
    private final EsDataInsightProcessor dataInsightProcessor;
    private final EsSearchIndexSink searchIndexSink;
    private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
    private final EventPublisherJob jobData;
    private final CollectionDAO dao;
    private volatile boolean stopped = false;

    public SearchIndexWorkflow(CollectionDAO dao, ElasticSearchIndexDefinition elasticSearchIndexDefinition, RestHighLevelClient client, EventPublisherJob request) {
        this.dao = dao;
        this.jobData = request;
        request.getEntities().forEach(entityType -> {
            if (!ReindexingUtil.isDataInsightIndex(entityType)) {
                ArrayList<String> fields = new ArrayList<String>((Collection)Objects.requireNonNull(ElasticSearchIndexDefinition.getIndexFields(entityType, this.jobData.getSearchIndexMappingLanguage())));
                PaginatedEntitiesSource source = new PaginatedEntitiesSource((String)entityType, this.jobData.getBatchSize(), (List<String>)fields);
                if (!CommonUtil.nullOrEmpty((String)request.getAfterCursor())) {
                    source.setCursor(request.getAfterCursor());
                }
                this.paginatedEntitiesSources.add(source);
            } else {
                this.paginatedDataInsightSources.add(new PaginatedDataInsightSource(dao, (String)entityType, this.jobData.getBatchSize()));
            }
        });
        this.entitiesProcessor = new EsEntitiesProcessor();
        this.dataInsightProcessor = new EsDataInsightProcessor();
        this.searchIndexSink = new EsSearchIndexSink(client);
        this.elasticSearchIndexDefinition = elasticSearchIndexDefinition;
    }

    @Override
    public void run() {
        try {
            LOG.info("Executing Reindexing Job with JobData : {}", (Object)this.jobData);
            this.jobData.setStatus(EventPublisherJob.Status.RUNNING);
            this.entitiesReIndex();
            this.dataInsightReindex();
            this.updateJobStatus();
            this.jobData.setEndTime(Long.valueOf(System.currentTimeMillis()));
        }
        catch (Exception ex) {
            String error = String.format("Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n  Stack : %s ", this.jobData.toString(), ExceptionUtils.getStackTrace((Throwable)ex));
            LOG.error(error);
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
            this.handleJobError("Failure in Job: Check Stack", error, System.currentTimeMillis());
        }
        finally {
            this.updateRecordToDb();
            this.sendUpdates();
            ReIndexingHandler.getInstance().removeCompletedJob(this.jobData.getId());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void entitiesReIndex() {
        HashMap<String, Object> contextData = new HashMap<String, Object>();
        for (PaginatedEntitiesSource paginatedEntitiesSource : this.paginatedEntitiesSources) {
            this.reCreateIndexes(paginatedEntitiesSource.getEntityType());
            contextData.put("entityType", paginatedEntitiesSource.getEntityType());
            while (!this.stopped && !paginatedEntitiesSource.isDone()) {
                int requestToProcess;
                long currentTime = System.currentTimeMillis();
                int failed = requestToProcess = this.jobData.getBatchSize().intValue();
                int success = 0;
                try {
                    Object resultList = paginatedEntitiesSource.readNext((Map)null);
                    requestToProcess = ((ResultList)resultList).getData().size() + ((ResultList)resultList).getErrors().size();
                    if (!((ResultList)resultList).getData().isEmpty()) {
                        BulkRequest requests = this.entitiesProcessor.process((ResultList<? extends EntityInterface>)resultList, (Map<String, Object>)contextData);
                        BulkResponse response = this.searchIndexSink.write(requests, (Map<String, Object>)contextData);
                        this.handleErrors((ResultList<?>)resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
                        success = ReindexingUtil.getSuccessFromBulkResponse(response);
                        failed = requestToProcess - success;
                        continue;
                    }
                    failed = 0;
                }
                catch (SourceException rx) {
                    this.handleSourceError(rx.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace((Throwable)rx)), currentTime);
                }
                catch (ProcessorException px) {
                    this.handleProcessorError(px.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace((Throwable)px)), currentTime);
                }
                catch (SinkException wx) {
                    this.handleEsSinkError(wx.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedEntitiesSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace((Throwable)wx)), currentTime);
                }
                finally {
                    this.updateStats(success, failed, paginatedEntitiesSource.getStats(), this.entitiesProcessor.getStats(), this.searchIndexSink.getStats());
                    this.sendUpdates();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dataInsightReindex() {
        HashMap<String, Object> contextData = new HashMap<String, Object>();
        for (PaginatedDataInsightSource paginatedDataInsightSource : this.paginatedDataInsightSources) {
            this.reCreateIndexes(paginatedDataInsightSource.getEntityType());
            contextData.put("entityType", paginatedDataInsightSource.getEntityType());
            while (!this.stopped && !paginatedDataInsightSource.isDone()) {
                int requestToProcess;
                long currentTime = System.currentTimeMillis();
                int failed = requestToProcess = this.jobData.getBatchSize().intValue();
                int success = 0;
                try {
                    Object resultList = paginatedDataInsightSource.readNext((Map)null);
                    requestToProcess = ((ResultList)resultList).getData().size() + ((ResultList)resultList).getErrors().size();
                    if (!((ResultList)resultList).getData().isEmpty()) {
                        BulkRequest requests = this.dataInsightProcessor.process((ResultList<ReportData>)resultList, (Map<String, Object>)contextData);
                        BulkResponse response = this.searchIndexSink.write(requests, (Map<String, Object>)contextData);
                        this.handleErrors((ResultList<?>)resultList, "", response, currentTime);
                        success = ReindexingUtil.getSuccessFromBulkResponse(response);
                        failed = requestToProcess - success;
                        continue;
                    }
                    failed = 0;
                }
                catch (SourceException rx) {
                    this.handleSourceError(rx.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), rx.getCause(), ExceptionUtils.getStackTrace((Throwable)rx)), currentTime);
                }
                catch (ProcessorException px) {
                    this.handleProcessorError(px.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), px.getCause(), ExceptionUtils.getStackTrace((Throwable)px)), currentTime);
                }
                catch (SinkException wx) {
                    this.handleEsSinkError(wx.getMessage(), String.format(ENTITY_TYPE_ERROR_MSG, paginatedDataInsightSource.getEntityType(), wx.getCause(), ExceptionUtils.getStackTrace((Throwable)wx)), currentTime);
                }
                finally {
                    this.updateStats(success, failed, paginatedDataInsightSource.getStats(), this.dataInsightProcessor.getStats(), this.searchIndexSink.getStats());
                    this.sendUpdates();
                }
            }
        }
    }

    private void sendUpdates() {
        try {
            WebSocketManager.getInstance().sendToOne(this.jobData.getStartedBy(), "jobStatus", JsonUtils.pojoToJson(this.jobData));
        }
        catch (JsonProcessingException ex) {
            LOG.error("Failed to send updated stats with WebSocket", (Throwable)ex);
        }
    }

    public void updateStats(int currentSuccess, int currentFailed, StepStats reader, StepStats processor, StepStats writer) {
        Stats jobDataStats = this.jobData.getStats() != null ? this.jobData.getStats() : new Stats();
        StepStats stats = this.jobData.getStats().getJobStats();
        if (stats == null) {
            stats = new StepStats().withTotalRecords(Integer.valueOf(ReindexingUtil.getTotalRequestToProcess(this.jobData.getEntities(), this.dao)));
        }
        ReindexingUtil.getUpdatedStats(stats, currentSuccess, currentFailed);
        jobDataStats.setJobStats(stats);
        jobDataStats.setSourceStats(reader);
        jobDataStats.setProcessorStats(processor);
        jobDataStats.setSinkStats(writer);
        this.jobData.setStats(jobDataStats);
    }

    public void updateRecordToDb() throws IOException {
        String recordString = this.dao.entityExtensionTimeSeriesDao().getExtension(this.jobData.getId().toString(), "reindexing.eventPublisher");
        EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
        long originalLastUpdate = lastRecord.getTimestamp();
        this.dao.entityExtensionTimeSeriesDao().update(this.jobData.getId().toString(), "reindexing.eventPublisher", JsonUtils.pojoToJson(this.jobData), originalLastUpdate);
    }

    private void reCreateIndexes(String entityType) {
        if (Boolean.FALSE.equals(this.jobData.getRecreateIndex())) {
            return;
        }
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        this.elasticSearchIndexDefinition.deleteIndex(indexType);
        this.elasticSearchIndexDefinition.createIndex(indexType, this.jobData.getSearchIndexMappingLanguage().value());
    }

    private void handleErrors(ResultList<?> data, String lastCursor, BulkResponse response, long time) {
        this.handleSourceError(data, lastCursor, time);
        this.handleEsSinkErrors(response, time);
    }

    private void handleSourceError(String context, String reason, long time) {
        Failure failures = this.getFailure();
        FailureDetails readerFailures = this.getFailureDetails(context, reason, time);
        failures.setSourceError(readerFailures);
        this.jobData.setFailure(failures);
    }

    private void handleProcessorError(String context, String reason, long time) {
        Failure failures = this.getFailure();
        FailureDetails processorError = this.getFailureDetails(context, reason, time);
        failures.setProcessorError(processorError);
        this.jobData.setFailure(failures);
    }

    private void handleEsSinkError(String context, String reason, long time) {
        Failure failures = this.getFailure();
        FailureDetails writerFailure = this.getFailureDetails(context, reason, time);
        failures.setSinkError(writerFailure);
        this.jobData.setFailure(failures);
    }

    private void handleJobError(String context, String reason, long time) {
        Failure failures = this.getFailure();
        FailureDetails jobFailure = this.getFailureDetails(context, reason, time);
        failures.setJobError(jobFailure);
        this.jobData.setFailure(failures);
    }

    private void handleSourceError(ResultList<?> data, String lastCursor, long time) {
        if (!data.getErrors().isEmpty()) {
            this.handleSourceError(String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor), String.format("Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())), time);
        }
    }

    private void handleEsSinkErrors(BulkResponse response, long time) {
        ArrayList<FailureDetails> details = new ArrayList<FailureDetails>();
        for (BulkItemResponse bulkItemResponse : response) {
            if (!bulkItemResponse.isFailed()) continue;
            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
            FailureDetails esFailure = new FailureDetails().withContext(String.format("EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId())).withLastFailedReason(String.format("Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace((Throwable)failure.getCause()))).withLastFailedAt(Long.valueOf(System.currentTimeMillis()));
            details.add(esFailure);
        }
        if (!details.isEmpty()) {
            this.handleEsSinkError("[EsWriter] BulkResponseItems", String.format("[BulkItemResponse] Got Following Error Responses: %n %s ", JsonUtils.pojoToJson(details)), time);
        }
    }

    private void updateJobStatus() {
        if (this.stopped) {
            this.jobData.setStatus(EventPublisherJob.Status.STOPPED);
        } else if (this.jobData.getFailure().getSinkError() != null || this.jobData.getFailure().getSourceError() != null || this.jobData.getFailure().getProcessorError() != null) {
            this.jobData.setStatus(EventPublisherJob.Status.FAILED);
        } else {
            this.jobData.setStatus(EventPublisherJob.Status.COMPLETED);
        }
    }

    private Failure getFailure() {
        return this.jobData.getFailure() != null ? this.jobData.getFailure() : new Failure();
    }

    private FailureDetails getFailureDetails(String context, String reason, long time) {
        return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(Long.valueOf(time));
    }

    public void stopJob() {
        this.stopped = true;
    }

    public EventPublisherJob getJobData() {
        return this.jobData;
    }
}

