package org.openmetadata.service.resources.elasticSearch;

import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
import org.openmetadata.schema.type.ColumnProfile;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.elasticsearch.ElasticSearchIndex;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.UserRepository;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.util.ElasticSearchClientUtils;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST resource that rebuilds Elasticsearch indexes from the entities stored in the metadata database.
 *
 * <p>Exposes {@code POST /v1/indexResource/reindex} to start a reindex job and
 * {@code GET /v1/indexResource/reindex/status/{runMode}} to read the last recorded job status. Jobs are handed to
 * an internal two-thread pool with a bounded queue of five; when the pool is saturated the
 * {@link ThreadPoolExecutor.CallerRunsPolicy} makes the submitting thread run the job itself. Job state is read
 * and written through {@code CollectionDAO.entityExtensionTimeSeriesDao()} under the
 * {@link #ELASTIC_SEARCH_EXTENSION} extension.
 *
 * <p>NOTE(review): the job methods and the methods that enqueue them are all {@code synchronized} on this
 * instance, so a second reindex request appears to block on the running job before it can even be queued -
 * confirm this is intended.
 */
@Api(value = "Elastic Search Collection", tags = {"Elastic Search Collection"})
@Path("/v1/indexResource")
@Consumes({"application/json"})
@Produces({"application/json"})
@Collection(name = "indexResource")
public class BuildSearchIndexResource {
    private static final Logger LOG = LoggerFactory.getLogger(BuildSearchIndexResource.class);
    public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
    public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM";
    public static final String ELASTIC_SEARCH_ENTITY_FQN_BATCH = "eventPublisher:ElasticSearch:BATCH";
    private RestHighLevelClient client;
    private ElasticSearchIndexDefinition elasticSearchIndexDefinition;
    private final CollectionDAO dao;
    private final Authorizer authorizer;
    private final ExecutorService threadScheduler =
            new ThreadPoolExecutor(
                    2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
    private final UserRepository userRepository;

    /**
     * Creates the resource.
     *
     * @param collectionDAO DAO used for job-state persistence and to build the {@link UserRepository}
     * @param authorizer authorizer consulted before every endpoint call
     */
    public BuildSearchIndexResource(CollectionDAO collectionDAO, Authorizer authorizer) {
        this.dao = collectionDAO;
        this.userRepository = new UserRepository(collectionDAO);
        this.authorizer = authorizer;
    }

    /**
     * Creates the Elasticsearch client and index definition when an Elasticsearch configuration is present.
     * When it is absent, {@code client} and {@code elasticSearchIndexDefinition} stay {@code null}.
     *
     * @param openMetadataApplicationConfig application configuration
     * @throws IOException declared for callers; propagated from client creation
     */
    public void initialize(OpenMetadataApplicationConfig openMetadataApplicationConfig) throws IOException {
        if (openMetadataApplicationConfig.getElasticSearchConfiguration() != null) {
            this.client = ElasticSearchClientUtils.createElasticSearchClient(openMetadataApplicationConfig.getElasticSearchConfiguration());
            this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(this.client, this.dao);
        }
    }

    /**
     * Builds a bulk processor named {@code es-reindex} that sends bulks asynchronously through {@code client}.
     *
     * @param listener listener notified by the bulk processor
     * @param bulkSize number of actions after which a bulk is sent
     * @param flushIntervalInSeconds interval, in seconds, after which pending actions are flushed
     * @return a processor with two concurrent requests and a constant 1s back-off retried up to three times
     */
    private BulkProcessor getBulkProcessor(BulkProcessorListener listener, int bulkSize, int flushIntervalInSeconds) {
        BulkProcessor.Builder builder = BulkProcessor.builder((bulkRequest, actionListener) -> {
            this.client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
        }, listener, "es-reindex");
        builder.setBulkActions(bulkSize);
        builder.setConcurrentRequests(2);
        builder.setFlushInterval(TimeValue.timeValueSeconds(flushIntervalInSeconds));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        return builder.build();
    }

    /**
     * Starts a reindex job in the run mode requested by {@code createEventPublisherJob}.
     *
     * <p>The call is authorized through {@code Authorizer.authorizeAdmin}; the calling user's id is resolved from
     * the security principal's name and passed on to the job.
     *
     * @return a 200 response with the body {@code "Reindexing Started"}; the job itself runs on the thread pool
     */
    @POST
    @Path("/reindex")
    @Operation(operationId = "reindexEntities", summary = "Reindex Entities", tags = {"indexResource"}, description = "Reindex Elastic Search Entities", responses = {@ApiResponse(responseCode = "200", description = "Success"), @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")})
    public Response reindexAllEntities(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createEventPublisherJob) throws IOException {
        this.authorizer.authorizeAdmin(securityContext, false);
        User user = this.userRepository.getByName(null, securityContext.getUserPrincipal().getName(), this.userRepository.getFields("id"));
        if (createEventPublisherJob.getRunMode() == CreateEventPublisherJob.RunMode.BATCH) {
            return startReindexingBatchMode(uriInfo, user.getId(), createEventPublisherJob);
        }
        return startReindexingStreamMode(uriInfo, user.getId(), createEventPublisherJob);
    }

    /**
     * Returns the most recently stored job record for the given run mode.
     *
     * @param runMode string form of a {@link CreateEventPublisherJob.RunMode} constant ({@code BATCH} or
     *     {@code STREAM})
     * @return 200 with the stored {@link EventPublisherJob}, 404 with {@code "No Last Run."} when nothing is stored,
     *     or 400 with {@code "Invalid Run Mode"} for any other {@code runMode}
     */
    @GET
    @Path("/reindex/status/{runMode}")
    @Operation(operationId = "getReindexAllLastJobStatus", summary = "Get Last Run Reindex All Job Status", tags = {"indexResource"}, description = "Reindex All job last status", responses = {@ApiResponse(responseCode = "200", description = "Success"), @ApiResponse(responseCode = "404", description = "Bot for instance {id} is not found")})
    public Response reindexAllJobLastStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam("runMode") String runMode) throws IOException {
        this.authorizer.authorizeAdmin(securityContext, false);
        String jobFqn;
        if (CreateEventPublisherJob.RunMode.BATCH.toString().equals(runMode)) {
            jobFqn = ELASTIC_SEARCH_ENTITY_FQN_BATCH;
        } else if (CreateEventPublisherJob.RunMode.STREAM.toString().equals(runMode)) {
            jobFqn = ELASTIC_SEARCH_ENTITY_FQN_STREAM;
        } else {
            return Response.status(Response.Status.BAD_REQUEST).entity("Invalid Run Mode").build();
        }
        String latestRecord = this.dao.entityExtensionTimeSeriesDao().getLatestExtension(jobFqn, ELASTIC_SEARCH_EXTENSION);
        if (latestRecord == null) {
            return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build();
        }
        return Response.status(Response.Status.OK).entity(JsonUtils.readValue(latestRecord, EventPublisherJob.class)).build();
    }

    /** Queues a stream-mode job and immediately answers 200 {@code "Reindexing Started"}. */
    private synchronized Response startReindexingStreamMode(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
        this.threadScheduler.submit(() -> {
            try {
                submitStreamJob(uriInfo, startedBy, createRequest);
            } catch (RuntimeException e) {
                // submit() would otherwise park the exception in a Future nobody reads.
                LOG.error("Reindexing Stream Job error", e);
            }
        });
        return Response.status(Response.Status.OK).entity("Reindexing Started").build();
    }

    /** Queues a batch-mode job and immediately answers 200 {@code "Reindexing Started"}. */
    private synchronized Response startReindexingBatchMode(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
        this.threadScheduler.submit(() -> {
            try {
                submitBatchJob(uriInfo, startedBy, createRequest);
            } catch (IOException | RuntimeException e) {
                // Runtime failures are logged too; submit() would otherwise hide them in an unread Future.
                LOG.error("Reindexing Batch Job error", e);
            }
        });
        return Response.status(Response.Status.OK).entity("Reindexing Started").build();
    }

    /**
     * Reindexes every requested entity type one document at a time, then stamps the stored STREAM job record with
     * the end time and a final status: {@code ACTIVEWITHERROR} when failure details were recorded, otherwise
     * {@code ACTIVE}.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while reading or writing the job record
     */
    private synchronized void submitStreamJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) {
        try {
            for (String entityType : createRequest.getEntities()) {
                updateEntityStream(uriInfo, startedBy, entityType, createRequest);
            }
            String latestRecord = this.dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
            EventPublisherJob job = (EventPublisherJob) JsonUtils.readValue(latestRecord, EventPublisherJob.class);
            // The stored record is addressed by its previous timestamp, so capture it before overwriting.
            Long previousTimestamp = job.getTimestamp();
            Long finishedAt = Long.valueOf(System.currentTimeMillis());
            job.setTimestamp(finishedAt);
            job.setEndTime(finishedAt);
            if (job.getFailureDetails() != null) {
                job.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
            } else {
                job.setStatus(EventPublisherJob.Status.ACTIVE);
            }
            this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(job), previousTimestamp);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Resets the stored BATCH job record to {@code STARTING} with zeroed stats, then feeds every requested entity
     * type through a bulk processor. A failure for one entity type is logged and the remaining types still run.
     *
     * <p>NOTE(review): the bulk processor created here is never closed in this method - confirm whether the
     * listener or another component is expected to close it.
     *
     * @throws IOException if the job record cannot be read or serialized
     */
    private synchronized void submitBatchJob(UriInfo uriInfo, UUID startedBy, CreateEventPublisherJob createRequest) throws IOException {
        long startedAt = System.currentTimeMillis();
        String storedRecord = this.dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
        EventPublisherJob job = (EventPublisherJob) JsonUtils.readValue(storedRecord, EventPublisherJob.class);
        // The stored record is addressed by its previous timestamp, so capture it before overwriting.
        Long previousTimestamp = job.getTimestamp();
        job.setStatus(EventPublisherJob.Status.STARTING);
        job.setStats(new Stats().withFailed(0).withTotal(0).withSuccess(0));
        job.setTimestamp(Long.valueOf(startedAt));
        job.setEntities(createRequest.getEntities());
        this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(job), previousTimestamp);

        BulkProcessorListener listener = new BulkProcessorListener(this.dao, startedBy);
        BulkProcessor bulkProcessor = getBulkProcessor(listener, createRequest.getBatchSize().intValue(), createRequest.getFlushIntervalInSec().intValue());
        for (String entityType : createRequest.getEntities()) {
            try {
                updateEntityBatch(bulkProcessor, listener, uriInfo, entityType, createRequest);
            } catch (Exception e) {
                LOG.error("Reindexing intermittent failure for entityType : {}", entityType, e);
            }
        }
    }

    /**
     * Pages through all entities of {@code entityType} and queues an upsert for each on {@code bulkProcessor},
     * flushing after every page. The index is deleted and recreated first when the request asks for it.
     *
     * <p>On any exception the listener is told to record an {@code IDLE} status with failure details and paging
     * stops for this entity type.
     */
    private synchronized void updateEntityBatch(BulkProcessor bulkProcessor, BulkProcessorListener listener, UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) {
        listener.allowTotalRequestUpdate();
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = this.elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) {
            this.elasticSearchIndexDefinition.deleteIndex(indexType);
            this.elasticSearchIndexDefinition.createIndex(indexType);
        }
        // Raw types are kept deliberately: the generic signatures of EntityRepository/ResultList are declared elsewhere.
        EntityRepository entityRepository = Entity.getEntityRepository(entityType);
        List<String> allowedFields = entityRepository.getAllowedFields();
        // Teams are listed with a reduced field set; every other type requests all allowed fields.
        String fields = entityType.equals(Entity.TEAM) ? "name,displayName" : String.join(",", allowedFields);
        String after = null;
        do {
            try {
                ResultList page = entityRepository.listAfter(uriInfo, new EntityUtil.Fields(allowedFields, fields), new ListFilter(Include.ALL), createRequest.getBatchSize().intValue(), after);
                listener.addRequests(page.getPaging().getTotal().intValue());
                updateElasticSearchForEntityBatch(indexType, bulkProcessor, entityType, page.getData());
                bulkProcessor.flush();
                after = page.getPaging().getAfter();
            } catch (Exception e) {
                LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, e);
                listener.updateElasticSearchStatus(EventPublisherJob.Status.IDLE, new FailureDetails().withContext(String.format("%s:Failure in fetching Data", entityType)).withLastFailedReason(String.format("Failed in listing all Entities \n Reason : %s", ExceptionUtils.getStackTrace(e))), null);
                return;
            }
        } while (after != null);
    }

    /**
     * Pages through all entities of {@code entityType} and indexes each page synchronously. The index is deleted
     * and recreated first when the request asks for it. Any exception is logged and stops paging for this type.
     *
     * @throws IOException declared for callers; exceptions raised while paging are caught and logged here
     */
    private synchronized void updateEntityStream(UriInfo uriInfo, UUID startedBy, String entityType, CreateEventPublisherJob createRequest) throws IOException {
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = this.elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) {
            this.elasticSearchIndexDefinition.deleteIndex(indexType);
            this.elasticSearchIndexDefinition.createIndex(indexType);
        }
        // Raw types are kept deliberately: the generic signatures of EntityRepository/ResultList are declared elsewhere.
        EntityRepository entityRepository = Entity.getEntityRepository(entityType);
        List<String> allowedFields = entityRepository.getAllowedFields();
        String fields = String.join(",", allowedFields);
        String after = null;
        do {
            try {
                ResultList page = entityRepository.listAfter(uriInfo, new EntityUtil.Fields(allowedFields, fields), new ListFilter(Include.ALL), createRequest.getBatchSize().intValue(), after);
                updateElasticSearchForEntityStream(entityType, page.getData());
                after = page.getPaging().getAfter();
            } catch (Exception e) {
                // The trailing Throwable is logged as the exception, so the message carries no placeholder for it.
                LOG.error("Failed in listing all Entities of type : {}, Reason : ", entityType, e);
                return;
            }
        } while (after != null);
    }

    /**
     * Queues one upsert per entity on {@code bulkProcessor}. Entities whose update request cannot be built are
     * skipped ({@link #getUpdateRequest} has already logged the reason).
     */
    private synchronized void updateElasticSearchForEntityBatch(ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, BulkProcessor bulkProcessor, String entityType, List<EntityInterface> entities) {
        for (EntityInterface entity : entities) {
            clearColumnProfiles(entityType, entity);
            UpdateRequest updateRequest = getUpdateRequest(indexType, entityType, entity);
            if (updateRequest != null) {
                bulkProcessor.add(updateRequest);
            }
        }
    }

    /**
     * Upserts each entity with a blocking client call and rewrites the stored STREAM job record after every
     * entity. A failed entity is recorded in the job's failure details, the status becomes
     * {@code ACTIVEWITHERROR}, and processing continues with the next entity.
     *
     * @throws IOException if the job record cannot be read or serialized
     */
    private synchronized void updateElasticSearchForEntityStream(String entityType, List<EntityInterface> entities) throws IOException {
        String latestRecord = this.dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
        EventPublisherJob job = (EventPublisherJob) JsonUtils.readValue(latestRecord, EventPublisherJob.class);
        Long previousTimestamp = job.getTimestamp();
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = this.elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        for (EntityInterface entity : entities) {
            clearColumnProfiles(entityType, entity);
            Long now = Long.valueOf(System.currentTimeMillis());
            UpdateRequest updateRequest = getUpdateRequest(indexType, entityType, entity);
            if (updateRequest == null) {
                // Building the request failed (already logged); record it instead of handing null to the client.
                recordStreamFailure(job, now, String.format("Failed in creating update Request for entityType: %s, id: %s", entityType, entity.getId()));
            } else {
                try {
                    this.client.update(updateRequest, RequestOptions.DEFAULT);
                } catch (IOException | RuntimeException e) {
                    recordStreamFailure(job, now, e.getMessage());
                }
            }
            job.setTimestamp(now);
            // The record is addressed by the timestamp it was last stored with.
            this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(job), previousTimestamp);
            previousTimestamp = now;
        }
    }

    /** Stores the failure time and reason on {@code job} and marks it {@code ACTIVEWITHERROR}. */
    private static void recordStreamFailure(EventPublisherJob job, Long failedAt, String reason) {
        job.setFailureDetails(new FailureDetails().withLastFailedAt(failedAt).withLastFailedReason(reason));
        job.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
    }

    /** Nulls out every column's profile on table entities before they are indexed; other entities are untouched. */
    private static void clearColumnProfiles(String entityType, EntityInterface entity) {
        if (entityType.equals("table") && entity instanceof Table) {
            ((Table) entity).getColumns().forEach(column -> {
                column.setProfile((ColumnProfile) null);
            });
        }
    }

    /**
     * Builds an upsert request for {@code entityInterface} against the index named by {@code elasticSearchIndexType}.
     *
     * @return the request, or {@code null} when building it failed for any reason (the failure is logged)
     */
    private UpdateRequest getUpdateRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType elasticSearchIndexType, String entityType, EntityInterface entityInterface) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(elasticSearchIndexType.indexName, entityInterface.getId().toString());
            ElasticSearchIndex index = (ElasticSearchIndex) Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entityInterface));
            updateRequest.doc(JsonUtils.pojoToJson(index.buildESDoc()), XContentType.JSON);
            updateRequest.docAsUpsert(true);
            return updateRequest;
        } catch (Exception e) {
            LOG.error("Failed in creating update Request for indexType : {}, entityType: {}", elasticSearchIndexType, entityType, e);
            return null;
        }
    }
}
