/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.util;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
import org.openmetadata.service.elasticsearch.ReportDataIndexes;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.resources.elasticsearch.BulkProcessorListener;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticSearchIndexUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexUtil.class);
    private final CollectionDAO dao;
    private final ExecutorService threadScheduler;
    private final RestHighLevelClient client;
    private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
    private final String lang;

    public ElasticSearchIndexUtil(CollectionDAO dao, RestHighLevelClient client, ElasticSearchIndexDefinition elasticSearchIndexDefinition, String lang) {
        this.dao = dao;
        this.client = client;
        this.elasticSearchIndexDefinition = elasticSearchIndexDefinition;
        this.lang = lang;
        this.threadScheduler = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private BulkProcessor getBulkProcessor(BulkProcessorListener listener, int bulkSize, int flushIntervalInSeconds) {
        BiConsumer<BulkRequest, ActionListener> bulkConsumer = (request, bulkListener) -> this.client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, (BulkProcessor.Listener)listener, (String)"es-reindex");
        builder.setBulkActions(bulkSize);
        builder.setConcurrentRequests(2);
        builder.setFlushInterval(TimeValue.timeValueSeconds((long)flushIntervalInSeconds));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff((TimeValue)TimeValue.timeValueSeconds((long)1L), (int)3));
        return builder.build();
    }

    public synchronized Response startReindexingBatchMode(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
        this.threadScheduler.submit(() -> {
            try {
                this.submitBatchJob(uriInfo, startedBy, createRequest);
            }
            catch (IOException e) {
                LOG.error("Reindexing Batch Job error", (Throwable)e);
            }
        });
        return Response.status((Response.Status)Response.Status.OK).entity((Object)"Reindexing Started").build();
    }

    private synchronized void submitBatchJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) throws IOException {
        long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
        String recordString = this.dao.entityExtensionTimeSeriesDao().getExtension("eventPublisher:ElasticSearch:BATCH", "service.eventPublisher");
        EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
        long originalLastUpdate = lastRecord.getTimestamp();
        lastRecord.setStatus(EventPublisherJob.Status.STARTING);
        lastRecord.setStats(new Stats().withFailed(Integer.valueOf(0)).withTotal(Integer.valueOf(0)).withSuccess(Integer.valueOf(0)));
        lastRecord.setTimestamp(Long.valueOf(updateTime));
        lastRecord.setEntities(createRequest.getEntities());
        this.dao.entityExtensionTimeSeriesDao().update("eventPublisher:ElasticSearch:BATCH", "service.eventPublisher", JsonUtils.pojoToJson(lastRecord), originalLastUpdate);
        BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(this.dao, startedBy);
        BulkProcessor processor = this.getBulkProcessor(bulkProcessorListener, createRequest.getBatchSize(), createRequest.getFlushIntervalInSec());
        for (String entityName : createRequest.getEntities()) {
            try {
                this.updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest);
            }
            catch (Exception ex) {
                LOG.error("Reindexing intermittent failure for entityType : {}", (Object)entityName, (Object)ex);
            }
        }
    }

    public ResultList<ReportData> getReportDataPagination(String entityFQN, int limit, String before, String after) {
        RestUtil.validateCursors(before, after);
        int reportDataCount = this.dao.entityExtensionTimeSeriesDao().listCount(entityFQN);
        List<CollectionDAO.ReportDataRow> reportDataList = before != null ? this.dao.entityExtensionTimeSeriesDao().getBeforeExtension(entityFQN, limit + 1, RestUtil.decodeCursor(before)) : this.dao.entityExtensionTimeSeriesDao().getAfterExtension(entityFQN, limit + 1, after == null ? "0" : RestUtil.decodeCursor(after));
        ResultList<ReportData> reportDataResultList = before != null ? this.getBeforeExtensionList(reportDataList, limit, reportDataCount) : this.getAfterExtensionList(reportDataList, after, limit, reportDataCount);
        return reportDataResultList;
    }

    private ResultList<ReportData> getBeforeExtensionList(List<CollectionDAO.ReportDataRow> reportDataRowList, int limit, int total) {
        String beforeCursor = null;
        if (reportDataRowList.size() > limit) {
            reportDataRowList.remove(0);
            beforeCursor = reportDataRowList.get(0).getRowNum();
        }
        String afterCursor = reportDataRowList.get(reportDataRowList.size() - 1).getRowNum();
        ArrayList<ReportData> reportDataList = new ArrayList<ReportData>();
        for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) {
            reportDataList.add(reportDataRow.getReportData());
        }
        return this.getReportDataResultList(reportDataList, beforeCursor, afterCursor, total);
    }

    private ResultList<ReportData> getAfterExtensionList(List<CollectionDAO.ReportDataRow> reportDataRowList, String after, int limit, int total) {
        String beforeCursor;
        String afterCursor = null;
        String string = beforeCursor = after == null ? null : reportDataRowList.get(0).getRowNum();
        if (reportDataRowList.size() > limit) {
            reportDataRowList.remove(limit);
            afterCursor = reportDataRowList.get(limit - 1).getRowNum();
        }
        ArrayList<ReportData> reportDataList = new ArrayList<ReportData>();
        for (CollectionDAO.ReportDataRow reportDataRow : reportDataRowList) {
            reportDataList.add(reportDataRow.getReportData());
        }
        return this.getReportDataResultList(reportDataList, beforeCursor, afterCursor, total);
    }

    private ResultList<ReportData> getReportDataResultList(List<ReportData> queries, String before, String after, int total) {
        return new ResultList<ReportData>(queries, before, after, total);
    }

    private synchronized void fetchReportData(String entityFQN, CreateEventPublisherJob createRequest, BulkProcessor processor, BulkProcessorListener listener, String entityType, ElasticSearchIndexDefinition.ElasticSearchIndexType indexType) {
        String after = null;
        try {
            ResultList<ReportData> result;
            do {
                result = this.getReportDataPagination(entityFQN, createRequest.getBatchSize(), null, after);
                listener.addRequests(result.getPaging().getTotal());
                this.updateElasticSearchForDataInsightBatch(processor, indexType, entityType, result.getData());
                processor.flush();
            } while ((after = result.getPaging().getAfter()) != null);
        }
        catch (Exception ex) {
            LOG.error("Failed in listing all Entities of type : {}, Reason : ", (Object)entityType, (Object)ex);
            FailureDetails failureDetails = new FailureDetails().withContext(String.format("%s:Failure in fetching Data", entityType)).withLastFailedReason(String.format("Failed in listing all ReportData \n Reason : %s", ExceptionUtils.getStackTrace((Throwable)ex)));
            listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null);
        }
    }

    private synchronized void updateEntityBatch(BulkProcessor processor, BulkProcessorListener listener, UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) {
        listener.allowTotalRequestUpdate();
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = ElasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) {
            this.elasticSearchIndexDefinition.deleteIndex(indexType);
            String language = createRequest.getSearchIndexMappingLanguage() == null ? this.lang : createRequest.getSearchIndexMappingLanguage().value();
            this.elasticSearchIndexDefinition.createIndex(indexType, language);
        }
        if (entityType.equalsIgnoreCase("entityReportData")) {
            this.fetchReportData(String.valueOf(ReportData.ReportDataType.ENTITY_REPORT_DATA), createRequest, processor, listener, entityType, indexType);
        } else if (entityType.equalsIgnoreCase("webAnalyticEntityViewReportData")) {
            this.fetchReportData(String.valueOf(ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA), createRequest, processor, listener, entityType, indexType);
        } else if (entityType.equalsIgnoreCase("webAnalyticUserActivityReportData")) {
            this.fetchReportData(String.valueOf(ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA), createRequest, processor, listener, entityType, indexType);
        } else {
            EntityRepository<? extends EntityInterface> entityRepository = Entity.getEntityRepository(entityType);
            List<String> allowedFields = entityRepository.getAllowedFields();
            String fields = String.join((CharSequence)",", allowedFields);
            String after = null;
            try {
                ResultList<? extends EntityInterface> result;
                do {
                    if (entityType.equals("team")) {
                        fields = "name,displayName";
                    }
                    result = entityRepository.listAfter(uriInfo, new EntityUtil.Fields(allowedFields, fields), new ListFilter(Include.ALL), createRequest.getBatchSize(), after);
                    listener.addRequests(result.getPaging().getTotal());
                    this.updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData());
                    processor.flush();
                } while ((after = result.getPaging().getAfter()) != null);
            }
            catch (Exception ex) {
                LOG.error("Failed in listing all Entities of type : {}, Reason : ", (Object)entityType, (Object)ex);
                FailureDetails failureDetails = new FailureDetails().withContext(String.format("%s:Failure in fetching Data", entityType)).withLastFailedReason(String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace((Throwable)ex)));
                listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, failureDetails, null);
            }
        }
    }

    private synchronized void updateElasticSearchForDataInsightBatch(BulkProcessor bulkProcessor, ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, List<ReportData> entities) {
        for (ReportData reportData : entities) {
            UpdateRequest request = this.getUpdateRequest(indexType, entityType, reportData);
            if (request == null) continue;
            bulkProcessor.add((DocWriteRequest)request);
        }
    }

    private synchronized void updateElasticSearchForEntityBatch(ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, BulkProcessor bulkProcessor, String entityType, List<? extends EntityInterface> entities) {
        for (EntityInterface entityInterface : entities) {
            UpdateRequest request;
            if (entityType.equals("table")) {
                ((Table)entityInterface).getColumns().forEach(table -> table.setProfile(null));
            }
            if ((request = this.getUpdateRequest(indexType, entityType, entityInterface)) == null) continue;
            bulkProcessor.add((DocWriteRequest)request);
        }
    }

    private UpdateRequest getUpdateRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, EntityInterface entity) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, entity.getId().toString());
            updateRequest.doc(JsonUtils.pojoToJson(Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entity)).buildESDoc()), XContentType.JSON);
            updateRequest.docAsUpsert(true);
            return updateRequest;
        }
        catch (Exception ex) {
            LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", new Object[]{indexType, entityType, ex});
            return null;
        }
    }

    private UpdateRequest getUpdateRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, ReportData reportData) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(indexType.indexName, reportData.getId().toString());
            updateRequest.doc(JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildESDoc()), XContentType.JSON);
            updateRequest.docAsUpsert(true);
            return updateRequest;
        }
        catch (Exception ex) {
            LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", new Object[]{indexType, entityType, ex});
            return null;
        }
    }
}

