package org.openmetadata.service.util;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.csv.CsvUtil;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
import org.openmetadata.schema.type.ColumnProfile;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndex;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
import org.openmetadata.service.elasticsearch.ReportDataIndexes;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.resources.elasticsearch.BuildSearchIndexResource;
import org.openmetadata.service.resources.elasticsearch.BulkProcessorListener;
import org.openmetadata.service.util.EntityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/util/ElasticSearchIndexUtil.class */
public class ElasticSearchIndexUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchIndexUtil.class);
    private final CollectionDAO dao;
    private final ExecutorService threadScheduler = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5), new ThreadPoolExecutor.CallerRunsPolicy());
    private final RestHighLevelClient client;
    private final ElasticSearchIndexDefinition elasticSearchIndexDefinition;
    private final String lang;

    public ElasticSearchIndexUtil(CollectionDAO collectionDAO, RestHighLevelClient restHighLevelClient, ElasticSearchIndexDefinition elasticSearchIndexDefinition, String str) {
        this.dao = collectionDAO;
        this.client = restHighLevelClient;
        this.elasticSearchIndexDefinition = elasticSearchIndexDefinition;
        this.lang = str;
    }

    private BulkProcessor getBulkProcessor(BulkProcessorListener bulkProcessorListener, int i, int i2) {
        BulkProcessor.Builder builder = BulkProcessor.builder((bulkRequest, actionListener) -> {
            this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
        }, bulkProcessorListener, "es-reindex");
        builder.setBulkActions(i);
        builder.setConcurrentRequests(2);
        builder.setFlushInterval(TimeValue.timeValueSeconds(i2));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        return builder.build();
    }

    public synchronized Response startReindexingBatchMode(UriInfo uriInfo, UUID uuid, CreateEventPublisherJob createEventPublisherJob) {
        this.threadScheduler.submit(() -> {
            try {
                submitBatchJob(uriInfo, uuid, createEventPublisherJob);
            } catch (IOException e) {
                LOG.error("Reindexing Batch Job error", e);
            }
        });
        return Response.status(Response.Status.OK).entity("Reindexing Started").build();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.time.ZonedDateTime] */
    private synchronized void submitBatchJob(UriInfo uriInfo, UUID uuid, CreateEventPublisherJob createEventPublisherJob) throws IOException {
        long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
        EventPublisherJob eventPublisherJob = (EventPublisherJob) JsonUtils.readValue(this.dao.entityExtensionTimeSeriesDao().getExtension(BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH, BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION), EventPublisherJob.class);
        long longValue = eventPublisherJob.getTimestamp().longValue();
        eventPublisherJob.setStatus(EventPublisherJob.Status.STARTING);
        eventPublisherJob.setStats(new Stats().withFailed(0).withTotal(0).withSuccess(0));
        eventPublisherJob.setTimestamp(Long.valueOf(time));
        eventPublisherJob.setEntities(createEventPublisherJob.getEntities());
        this.dao.entityExtensionTimeSeriesDao().update(BuildSearchIndexResource.ELASTIC_SEARCH_ENTITY_FQN_BATCH, BuildSearchIndexResource.ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(eventPublisherJob), Long.valueOf(longValue));
        BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(this.dao, uuid);
        BulkProcessor bulkProcessor = getBulkProcessor(bulkProcessorListener, createEventPublisherJob.getBatchSize().intValue(), createEventPublisherJob.getFlushIntervalInSec().intValue());
        for (String str : createEventPublisherJob.getEntities()) {
            try {
                updateEntityBatch(bulkProcessor, bulkProcessorListener, uriInfo, str, createEventPublisherJob);
            } catch (Exception e) {
                LOG.error("Reindexing intermittent failure for entityType : {}", str, e);
            }
        }
    }

    public ResultList<ReportData> getReportDataPagination(String str, int i, String str2, String str3) {
        List<CollectionDAO.ReportDataRow> afterExtension;
        RestUtil.validateCursors(str2, str3);
        int listCount = this.dao.entityExtensionTimeSeriesDao().listCount(str);
        if (str2 != null) {
            afterExtension = this.dao.entityExtensionTimeSeriesDao().getBeforeExtension(str, i + 1, RestUtil.decodeCursor(str2));
        } else {
            afterExtension = this.dao.entityExtensionTimeSeriesDao().getAfterExtension(str, i + 1, str3 == null ? "0" : RestUtil.decodeCursor(str3));
        }
        return str2 != null ? getBeforeExtensionList(afterExtension, i, listCount) : getAfterExtensionList(afterExtension, str3, i, listCount);
    }

    private ResultList<ReportData> getBeforeExtensionList(List<CollectionDAO.ReportDataRow> list, int i, int i2) {
        String str = null;
        if (list.size() > i) {
            list.remove(0);
            str = list.get(0).getRowNum();
        }
        String rowNum = list.get(list.size() - 1).getRowNum();
        ArrayList arrayList = new ArrayList();
        Iterator<CollectionDAO.ReportDataRow> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getReportData());
        }
        return getReportDataResultList(arrayList, str, rowNum, i2);
    }

    private ResultList<ReportData> getAfterExtensionList(List<CollectionDAO.ReportDataRow> list, String str, int i, int i2) {
        String str2 = null;
        String rowNum = str == null ? null : list.get(0).getRowNum();
        if (list.size() > i) {
            list.remove(i);
            str2 = list.get(i - 1).getRowNum();
        }
        ArrayList arrayList = new ArrayList();
        Iterator<CollectionDAO.ReportDataRow> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getReportData());
        }
        return getReportDataResultList(arrayList, rowNum, str2, i2);
    }

    private ResultList<ReportData> getReportDataResultList(List<ReportData> list, String str, String str2, int i) {
        return new ResultList<>(list, str, str2, i);
    }

    private synchronized void fetchReportData(String str, CreateEventPublisherJob createEventPublisherJob, BulkProcessor bulkProcessor, BulkProcessorListener bulkProcessorListener, String str2, ElasticSearchIndexDefinition.ElasticSearchIndexType elasticSearchIndexType) {
        String str3 = null;
        do {
            try {
                ResultList<ReportData> reportDataPagination = getReportDataPagination(str, createEventPublisherJob.getBatchSize().intValue(), null, str3);
                bulkProcessorListener.addRequests(reportDataPagination.getPaging().getTotal().intValue());
                updateElasticSearchForDataInsightBatch(bulkProcessor, elasticSearchIndexType, str2, reportDataPagination.getData());
                bulkProcessor.flush();
                str3 = reportDataPagination.getPaging().getAfter();
            } catch (Exception e) {
                LOG.error("Failed in listing all Entities of type : {}, Reason : ", str2, e);
                bulkProcessorListener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, new FailureDetails().withContext(String.format("%s:Failure in fetching Data", str2)).withLastFailedReason(String.format("Failed in listing all ReportData \n Reason : %s", ExceptionUtils.getStackTrace(e))), null);
                return;
            }
        } while (str3 != null);
    }

    private synchronized void updateEntityBatch(BulkProcessor bulkProcessor, BulkProcessorListener bulkProcessorListener, UriInfo uriInfo, String str, CreateEventPublisherJob createEventPublisherJob) {
        bulkProcessorListener.allowTotalRequestUpdate();
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexMappingByEntityType = ElasticSearchIndexDefinition.getIndexMappingByEntityType(str);
        if (Boolean.TRUE.equals(createEventPublisherJob.getRecreateIndex())) {
            this.elasticSearchIndexDefinition.deleteIndex(indexMappingByEntityType);
            this.elasticSearchIndexDefinition.createIndex(indexMappingByEntityType, createEventPublisherJob.getSearchIndexMappingLanguage() == null ? this.lang : createEventPublisherJob.getSearchIndexMappingLanguage().value());
        }
        if (str.equalsIgnoreCase(ElasticSearchIndexDefinition.ENTITY_REPORT_DATA)) {
            fetchReportData(String.valueOf(ReportData.ReportDataType.ENTITY_REPORT_DATA), createEventPublisherJob, bulkProcessor, bulkProcessorListener, str, indexMappingByEntityType);
            return;
        }
        if (str.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA)) {
            fetchReportData(String.valueOf(ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA), createEventPublisherJob, bulkProcessor, bulkProcessorListener, str, indexMappingByEntityType);
            return;
        }
        if (str.equalsIgnoreCase(ElasticSearchIndexDefinition.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA)) {
            fetchReportData(String.valueOf(ReportData.ReportDataType.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA), createEventPublisherJob, bulkProcessor, bulkProcessorListener, str, indexMappingByEntityType);
            return;
        }
        EntityRepository<? extends EntityInterface> entityRepository = Entity.getEntityRepository(str);
        List<String> allowedFields = entityRepository.getAllowedFields();
        String join = String.join(CsvUtil.SEPARATOR, allowedFields);
        String str2 = null;
        do {
            try {
                if (str.equals(Entity.TEAM)) {
                    join = "name,displayName";
                }
                ResultList<? extends EntityInterface> listAfter = entityRepository.listAfter(uriInfo, new EntityUtil.Fields(allowedFields, join), new ListFilter(Include.ALL), createEventPublisherJob.getBatchSize().intValue(), str2);
                bulkProcessorListener.addRequests(listAfter.getPaging().getTotal().intValue());
                updateElasticSearchForEntityBatch(indexMappingByEntityType, bulkProcessor, str, listAfter.getData());
                bulkProcessor.flush();
                str2 = listAfter.getPaging().getAfter();
            } catch (Exception e) {
                LOG.error("Failed in listing all Entities of type : {}, Reason : ", str, e);
                bulkProcessorListener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, new FailureDetails().withContext(String.format("%s:Failure in fetching Data", str)).withLastFailedReason(String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(e))), null);
                return;
            }
        } while (str2 != null);
    }

    private synchronized void updateElasticSearchForDataInsightBatch(BulkProcessor bulkProcessor, ElasticSearchIndexDefinition.ElasticSearchIndexType elasticSearchIndexType, String str, List<ReportData> list) {
        Iterator<ReportData> it = list.iterator();
        while (it.hasNext()) {
            UpdateRequest updateRequest = getUpdateRequest(elasticSearchIndexType, str, it.next());
            if (updateRequest != null) {
                bulkProcessor.add(updateRequest);
            }
        }
    }

    private synchronized void updateElasticSearchForEntityBatch(ElasticSearchIndexDefinition.ElasticSearchIndexType elasticSearchIndexType, BulkProcessor bulkProcessor, String str, List<? extends EntityInterface> list) {
        Iterator<? extends EntityInterface> it = list.iterator();
        while (it.hasNext()) {
            Table table = (EntityInterface) it.next();
            if (str.equals("table")) {
                table.getColumns().forEach(column -> {
                    column.setProfile((ColumnProfile) null);
                });
            }
            UpdateRequest updateRequest = getUpdateRequest(elasticSearchIndexType, str, (EntityInterface) table);
            if (updateRequest != null) {
                bulkProcessor.add(updateRequest);
            }
        }
    }

    private UpdateRequest getUpdateRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType elasticSearchIndexType, String str, EntityInterface entityInterface) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(elasticSearchIndexType.indexName, entityInterface.getId().toString());
            updateRequest.doc(JsonUtils.pojoToJson(((ElasticSearchIndex) Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(str, entityInterface))).buildESDoc()), XContentType.JSON);
            updateRequest.docAsUpsert(true);
            return updateRequest;
        } catch (Exception e) {
            LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", new Object[]{elasticSearchIndexType, str, e});
            return null;
        }
    }

    private UpdateRequest getUpdateRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType elasticSearchIndexType, String str, ReportData reportData) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(elasticSearchIndexType.indexName, reportData.getId().toString());
            updateRequest.doc(JsonUtils.pojoToJson(new ReportDataIndexes(reportData).buildESDoc()), XContentType.JSON);
            updateRequest.docAsUpsert(true);
            return updateRequest;
        } catch (Exception e) {
            LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", new Object[]{elasticSearchIndexType, str, e});
            return null;
        }
    }
}
