/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.resources.elasticSearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.api.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.settings.Stats;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexFactory;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.resources.elasticSearch.BulkProcessorListener;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.util.ConfigurationHolder;
import org.openmetadata.service.util.ElasticSearchClientUtils;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/v1/indexResource")
@Api(value="Elastic Search Collection", tags={"Elastic Search Collection"})
@Produces(value={"application/json"})
@Consumes(value={"application/json"})
@Collection(name="indexResource")
public class BuildSearchIndexResource {
    private static final Logger LOG = LoggerFactory.getLogger(BuildSearchIndexResource.class);
    public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
    public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM";
    public static final String ELASTIC_SEARCH_ENTITY_FQN_BATCH = "eventPublisher:ElasticSearch:BATCH";
    private RestHighLevelClient client;
    private ElasticSearchIndexDefinition elasticSearchIndexDefinition;
    private final CollectionDAO dao;
    private final Authorizer authorizer;
    private final ExecutorService threadScheduler;

    /**
     * Creates the reindexing resource.
     *
     * <p>The Elasticsearch client and the index definition are only initialised when an
     * Elasticsearch configuration is registered with the {@link ConfigurationHolder}; otherwise
     * both fields stay {@code null}. Background jobs run on a fixed pool of two threads fed by a
     * five-slot bounded queue; when the queue is full the submitting thread runs the task itself
     * ({@link ThreadPoolExecutor.CallerRunsPolicy}).
     *
     * @param dao        data access object used to read entities and persist job status
     * @param authorizer authorizer consulted by the REST endpoints
     */
    public BuildSearchIndexResource(CollectionDAO dao, Authorizer authorizer) {
        ElasticSearchConfiguration esConfig = ConfigurationHolder.getInstance()
                .getConfig(ConfigurationHolder.ConfigurationType.ELASTICSEARCH_CONFIG, ElasticSearchConfiguration.class);
        if (esConfig != null) {
            this.client = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
            this.elasticSearchIndexDefinition = new ElasticSearchIndexDefinition(this.client, dao);
        }
        this.dao = dao;
        this.authorizer = authorizer;
        this.threadScheduler = new ThreadPoolExecutor(
                2,
                2,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * Builds a {@link BulkProcessor} named {@code "es-reindex"} that ships its bulk requests
     * asynchronously through this resource's Elasticsearch client.
     *
     * <p>The processor allows two concurrent bulk requests and retries with a constant one-second
     * backoff, at most three times. The caller owns the returned processor.
     *
     * @param listener               callback handed to the processor
     * @param bulkSize               number of queued actions that triggers a bulk execution
     * @param flushIntervalInSeconds interval, in seconds, after which pending actions are flushed
     * @return the configured bulk processor
     */
    private BulkProcessor getBulkProcessor(BulkProcessorListener listener, int bulkSize, int flushIntervalInSeconds) {
        // The consumer lambda is passed inline so it is target-typed against the builder's generic
        // signature; declaring it with a raw ActionListener would be an unchecked raw-type use.
        return BulkProcessor.builder(
                        (request, bulkListener) -> this.client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                        listener,
                        "es-reindex")
                .setBulkActions(bulkSize)
                .setConcurrentRequests(2)
                .setFlushInterval(TimeValue.timeValueSeconds(flushIntervalInSeconds))
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                .build();
    }

    /**
     * Starts a reindexing job for the entities named in the request.
     *
     * <p>The caller must pass the admin authorization check. A request whose run mode is
     * {@code BATCH} is dispatched to the batch path; every other run mode goes to the stream path.
     *
     * @param uriInfo         request URI information forwarded to the job
     * @param securityContext security context of the caller; its principal name is forwarded as the job starter
     * @param createRequest   validated job description
     * @return the response produced by the selected start method
     */
    @POST
    @Path(value="/reindex")
    @Operation(operationId="reindexEntities", summary="Reindex Entities", tags={"indexResource"}, description="Reindex Elastic Search Entities", responses={@ApiResponse(responseCode="200", description="Success"), @ApiResponse(responseCode="404", description="Bot for instance {id} is not found")})
    public Response reindexAllEntities(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateEventPublisherJob createRequest) {
        this.authorizer.authorizeAdmin(securityContext, false);
        final String startedBy = securityContext.getUserPrincipal().getName();
        final boolean batchRequested = createRequest.getRunMode() == CreateEventPublisherJob.RunMode.BATCH;
        return batchRequested
                ? this.startReindexingBatchMode(uriInfo, startedBy, createRequest)
                : this.startReindexingStreamMode(uriInfo, startedBy, createRequest);
    }

    /**
     * Returns the most recently stored reindex job record for the given run mode.
     *
     * <p>The caller must pass the admin authorization check. The path value is compared against
     * the string form of the {@code BATCH} and {@code STREAM} run modes.
     *
     * @param uriInfo         request URI information (unused here)
     * @param securityContext security context of the caller
     * @param runMode         run mode taken from the path
     * @return 200 with the job record, 400 for an unknown run mode, or 404 when no record is stored
     * @throws IOException if the stored record cannot be deserialized
     */
    @GET
    @Path(value="/reindex/status/{runMode}")
    @Operation(operationId="getReindexAllLastJobStatus", summary="Get Last Run Reindex All Job Status", tags={"indexResource"}, description="Reindex All job last status", responses={@ApiResponse(responseCode="200", description="Success"), @ApiResponse(responseCode="404", description="Bot for instance {id} is not found")})
    public Response reindexAllJobLastStatus(@Context UriInfo uriInfo, @Context SecurityContext securityContext, @PathParam(value="runMode") String runMode) throws IOException {
        this.authorizer.authorizeAdmin(securityContext, false);

        // Resolve which stored job key the requested run mode maps to.
        final String jobFqn;
        if (runMode.equals(CreateEventPublisherJob.RunMode.BATCH.toString())) {
            jobFqn = ELASTIC_SEARCH_ENTITY_FQN_BATCH;
        } else if (runMode.equals(CreateEventPublisherJob.RunMode.STREAM.toString())) {
            jobFqn = ELASTIC_SEARCH_ENTITY_FQN_STREAM;
        } else {
            return Response.status(Response.Status.BAD_REQUEST).entity("Invalid Run Mode").build();
        }

        final String latestJson = this.dao.entityExtensionTimeSeriesDao().getLatestExtension(jobFqn, ELASTIC_SEARCH_EXTENSION);
        if (latestJson == null) {
            return Response.status(Response.Status.NOT_FOUND).entity("No Last Run.").build();
        }
        EventPublisherJob lastJob = JsonUtils.readValue(latestJson, EventPublisherJob.class);
        return Response.status(Response.Status.OK).entity(lastJob).build();
    }

    /**
     * Hands a stream-mode reindexing job to the background executor and returns immediately.
     *
     * <p>The {@code Future} returned by {@code submit} is not retained, so an exception thrown by
     * the job would otherwise never surface anywhere. The task therefore logs any runtime failure
     * before rethrowing it.
     *
     * @param uriInfo       request URI information forwarded to the job
     * @param startedBy     name of the principal that requested the job
     * @param createRequest job description
     * @return a 200 response with the body {@code "Reindexing Started"}
     */
    private synchronized Response startReindexingStreamMode(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) {
        this.threadScheduler.submit(() -> {
            try {
                this.submitStreamJob(uriInfo, startedBy, createRequest);
            }
            catch (RuntimeException e) {
                LOG.error("Stream mode reindexing job failed", e);
                throw e;
            }
        });
        return Response.status(Response.Status.OK).entity("Reindexing Started").build();
    }

    /**
     * Hands a batch-mode reindexing job to the background executor and returns immediately.
     *
     * <p>The {@code Future} returned by {@code submit} is not retained, so an exception thrown by
     * the job would otherwise never surface anywhere. The task therefore logs every failure before
     * propagating it; an {@link IOException} is still wrapped in a {@link RuntimeException}.
     *
     * @param uriInfo       request URI information forwarded to the job
     * @param startedBy     name of the principal that requested the job
     * @param createRequest job description
     * @return a 200 response with the body {@code "Reindexing Started"}
     */
    private synchronized Response startReindexingBatchMode(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) {
        this.threadScheduler.submit(() -> {
            try {
                this.submitBatchJob(uriInfo, startedBy, createRequest);
            }
            catch (IOException e) {
                LOG.error("Batch mode reindexing job failed", e);
                throw new RuntimeException(e);
            }
            catch (RuntimeException e) {
                LOG.error("Batch mode reindexing job failed", e);
                throw e;
            }
        });
        return Response.status(Response.Status.OK).entity("Reindexing Started").build();
    }

    /**
     * Reindexes, one after another, every entity type listed in the request using stream mode.
     *
     * <p>The first {@link IOException} raised for an entity type stops the job and is rethrown
     * wrapped in a {@link RuntimeException}.
     *
     * @param uriInfo       request URI information forwarded to the entity listing
     * @param startedBy     name of the principal that requested the job (not used by this method)
     * @param createRequest job description providing the entity types
     */
    private synchronized void submitStreamJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) {
        for (String entityType : createRequest.getEntities()) {
            try {
                this.updateEntityStream(uriInfo, entityType, createRequest);
            }
            catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        }
    }

    /**
     * Runs a batch-mode reindexing job end to end.
     *
     * <p>Steps: mark the stored batch job record {@code ACTIVE}, push every requested entity type
     * through a bulk processor, close the processor, then mark the record {@code IDLE} together
     * with the totals collected by the listener.
     *
     * <p>Timestamps are taken from {@link System#currentTimeMillis()}. Deriving epoch milliseconds
     * from a zone-less {@code LocalDateTime} is ambiguous during a daylight-saving overlap and can
     * yield a value one hour off.
     *
     * @param uriInfo       request URI information forwarded to the entity listing
     * @param startedBy     name of the principal that requested the job (not used by this method)
     * @param createRequest job description (entities, batch size, flush interval)
     * @throws IOException if a job record cannot be read or serialized
     */
    private synchronized void submitBatchJob(UriInfo uriInfo, String startedBy, CreateEventPublisherJob createRequest) throws IOException {
        // Mark the stored job record as running.
        String recordString = this.dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION);
        EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
        long originalLastUpdate = lastRecord.getTimestamp();
        lastRecord.setStatus(EventPublisherJob.Status.ACTIVE);
        lastRecord.setTimestamp(System.currentTimeMillis());
        lastRecord.setEntities(createRequest.getEntities());
        this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(lastRecord), originalLastUpdate);

        BulkProcessorListener bulkProcessorListener = new BulkProcessorListener(this.dao);
        BulkProcessor processor = this.getBulkProcessor(bulkProcessorListener, createRequest.getBatchSize(), createRequest.getFlushIntervalInSec());
        try {
            for (String entityName : createRequest.getEntities()) {
                this.updateEntityBatch(processor, bulkProcessorListener, uriInfo, entityName, createRequest);
            }
        }
        finally {
            // Always release the processor: it is created per job and nothing else closes it.
            // Waiting for in-flight bulk requests also lets the listener's counters settle
            // before they are copied into the job record below.
            this.closeBulkProcessor(processor);
        }

        // Mark the job as finished and publish the collected statistics.
        EventPublisherJob lastReadRecord = JsonUtils.readValue(this.dao.entityExtensionTimeSeriesDao().getExtension(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION), EventPublisherJob.class);
        long lastRecordTimestamp = lastReadRecord.getTimestamp();
        lastReadRecord.setStatus(EventPublisherJob.Status.IDLE);
        lastReadRecord.setTimestamp(System.currentTimeMillis());
        lastReadRecord.setStats(new Stats()
                .withTotal(bulkProcessorListener.getTotalRequests())
                .withSuccess(bulkProcessorListener.getTotalSuccessCount())
                .withFailed(bulkProcessorListener.getTotalFailedCount()));
        this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_BATCH, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(lastReadRecord), lastRecordTimestamp);
    }

    /**
     * Flushes and closes a bulk processor, waiting a bounded time for outstanding bulk requests.
     *
     * @param processor the processor to close
     */
    private void closeBulkProcessor(BulkProcessor processor) {
        // Upper bound on the wait so a stuck cluster cannot block the worker thread forever.
        final long closeTimeoutSeconds = 30L;
        try {
            if (!processor.awaitClose(closeTimeoutSeconds, TimeUnit.SECONDS)) {
                LOG.warn("Bulk processor still had requests in flight after {} seconds", closeTimeoutSeconds);
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting for the bulk processor to close", e);
        }
    }

    /**
     * Pages through every entity of one type and queues each page on the bulk processor.
     *
     * <p>When the request asks for it, the index is deleted and recreated first. For the
     * {@code "team"} entity type only {@code name} and {@code displayName} are requested;
     * every other type is listed with all of its allowed fields. The processor is flushed after
     * each page. A failure while listing or queueing is logged with its cause and ends processing
     * of this entity type without propagating.
     *
     * @param processor     bulk processor receiving the update requests
     * @param listener      listener whose total-request counter is updated per page
     * @param uriInfo       request URI information forwarded to the entity listing
     * @param entityType    entity type to reindex
     * @param createRequest job description (batch size, recreate-index flag)
     */
    private synchronized void updateEntityBatch(BulkProcessor processor, BulkProcessorListener listener, UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) {
        listener.allowTotalRequestUpdate();
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = this.elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) {
            this.elasticSearchIndexDefinition.deleteIndex(indexType);
            this.elasticSearchIndexDefinition.createIndex(indexType);
        }
        EntityRepository entityRepository = Entity.getEntityRepository(entityType);
        List<String> allowedFields = entityRepository.getAllowedFields();
        String fields = String.join(",", allowedFields);
        // The field override does not depend on the page, so it is decided once up front.
        if (entityType.equals("team")) {
            fields = "name,displayName";
        }
        String after = null;
        try {
            ResultList result;
            do {
                result = entityRepository.listAfter(uriInfo, new EntityUtil.Fields(allowedFields, fields), new ListFilter(Include.ALL), createRequest.getBatchSize(), after);
                listener.addRequests(result.getPaging().getTotal());
                this.updateElasticSearchForEntityBatch(indexType, processor, entityType, result.getData());
                processor.flush();
            } while ((after = result.getPaging().getAfter()) != null);
        }
        catch (Exception ex) {
            // Pass the exception as the last argument so the cause and stack trace are logged.
            LOG.error("Failed in listing all Entities of type : {}", entityType, ex);
        }
    }

    /**
     * Pages through every entity of one type, indexes each page synchronously, and then stamps the
     * stored stream job record with the completion time.
     *
     * <p>When the request asks for it, the index is deleted and recreated first. A failure while
     * listing or indexing is logged with its cause and ends the paging loop; the job record is
     * updated regardless. The final status is {@code ACTIVEWITHERROR} when the record carries
     * failure details and {@code ACTIVE} otherwise.
     *
     * <p>The completion time comes from {@link System#currentTimeMillis()}. Deriving epoch
     * milliseconds from a zone-less {@code LocalDateTime} is ambiguous during a daylight-saving
     * overlap and can yield a value one hour off.
     *
     * @param uriInfo       request URI information forwarded to the entity listing
     * @param entityType    entity type to reindex
     * @param createRequest job description (batch size, recreate-index flag)
     * @throws IOException if the job record cannot be read or serialized
     */
    private synchronized void updateEntityStream(UriInfo uriInfo, String entityType, CreateEventPublisherJob createRequest) throws IOException {
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = this.elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        if (Boolean.TRUE.equals(createRequest.getRecreateIndex())) {
            this.elasticSearchIndexDefinition.deleteIndex(indexType);
            this.elasticSearchIndexDefinition.createIndex(indexType);
        }
        EntityRepository entityRepository = Entity.getEntityRepository(entityType);
        List<String> allowedFields = entityRepository.getAllowedFields();
        String fields = String.join(",", allowedFields);
        String after = null;
        try {
            ResultList result;
            do {
                result = entityRepository.listAfter(uriInfo, new EntityUtil.Fields(allowedFields, fields), new ListFilter(Include.ALL), createRequest.getBatchSize(), after);
                this.updateElasticSearchForEntityStream(entityType, result.getData());
            } while ((after = result.getPaging().getAfter()) != null);
        }
        catch (Exception ex) {
            // Pass the exception as the last argument so the cause and stack trace are logged.
            LOG.error("Failed in listing all Entities of type : {}", entityType, ex);
        }

        // Record completion on the latest stream job record.
        String reindexJobString = this.dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
        EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
        long lastUpdateTime = latestJob.getTimestamp();
        Long time = System.currentTimeMillis();
        latestJob.setTimestamp(time);
        latestJob.setEndTime(time);
        if (latestJob.getFailureDetails() != null) {
            latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
        } else {
            latestJob.setStatus(EventPublisherJob.Status.ACTIVE);
        }
        this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
    }

    /**
     * Queues one upsert request per entity on the given bulk processor.
     *
     * <p>For the {@code "table"} entity type, the profile of every column is set to {@code null}
     * on the entity before its request is built.
     *
     * @param indexType     index the documents are written to
     * @param bulkProcessor processor that receives the requests
     * @param entityType    entity type of all elements in {@code entities}
     * @param entities      entities to queue
     * @throws IOException if a document cannot be serialized
     */
    private synchronized void updateElasticSearchForEntityBatch(ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, BulkProcessor bulkProcessor, String entityType, List<EntityInterface> entities) throws IOException {
        for (EntityInterface current : entities) {
            if (entityType.equals("table")) {
                Table tableEntity = (Table) current;
                tableEntity.getColumns().forEach(column -> column.setProfile(null));
            }
            UpdateRequest upsert = this.getUpdateRequest(indexType, entityType, current);
            bulkProcessor.add(upsert);
        }
    }

    /**
     * Indexes the given entities one by one and updates the stored stream job record after each.
     *
     * <p>For the {@code "table"} entity type, the profile of every column is set to {@code null}
     * before indexing. An {@link IOException} from the Elasticsearch update is logged with its
     * cause, recorded on the job as failure details, and flips the status to
     * {@code ACTIVEWITHERROR}; processing then continues with the next entity.
     *
     * <p>Each timestamp comes from {@link System#currentTimeMillis()}. Deriving epoch
     * milliseconds from a zone-less {@code LocalDateTime} is ambiguous during a daylight-saving
     * overlap and can yield a value one hour off, which would break the previous-timestamp key
     * passed to the record update.
     *
     * @param entityType entity type of all elements in {@code entities}
     * @param entities   entities to index
     * @throws IOException if the job record or a document cannot be read or serialized
     */
    private synchronized void updateElasticSearchForEntityStream(String entityType, List<EntityInterface> entities) throws IOException {
        String reindexJobString = this.dao.entityExtensionTimeSeriesDao().getLatestExtension(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION);
        EventPublisherJob latestJob = JsonUtils.readValue(reindexJobString, EventPublisherJob.class);
        Long lastUpdateTime = latestJob.getTimestamp();
        ElasticSearchIndexDefinition.ElasticSearchIndexType indexType = this.elasticSearchIndexDefinition.getIndexMappingByEntityType(entityType);
        for (EntityInterface entity : entities) {
            if (entityType.equals("table")) {
                ((Table)entity).getColumns().forEach(column -> column.setProfile(null));
            }
            Long time = System.currentTimeMillis();
            try {
                this.client.update(this.getUpdateRequest(indexType, entityType, entity), RequestOptions.DEFAULT);
            }
            catch (IOException ex) {
                // The job record keeps only the message, so log the full exception here.
                LOG.error("Failed to index entity {} of type {}", entity.getId(), entityType, ex);
                FailureDetails failureDetails = new FailureDetails().withLastFailedAt(time).withLastFailedReason(ex.getMessage());
                latestJob.setFailureDetails(failureDetails);
                latestJob.setStatus(EventPublisherJob.Status.ACTIVEWITHERROR);
            }
            latestJob.setTimestamp(time);
            this.dao.entityExtensionTimeSeriesDao().update(ELASTIC_SEARCH_ENTITY_FQN_STREAM, ELASTIC_SEARCH_EXTENSION, JsonUtils.pojoToJson(latestJob), lastUpdateTime);
            // The record is now keyed by the timestamp just written.
            lastUpdateTime = time;
        }
    }

    /**
     * Builds an upsert request for one entity.
     *
     * <p>The document id is the entity id, and the body is the JSON form of the search document
     * produced by {@link ElasticSearchIndexFactory#buildIndex}. A {@code null} index builder
     * results in a {@link NullPointerException}.
     *
     * @param indexType  index the document is written to
     * @param entityType entity type used to select the index builder
     * @param entity     entity to convert
     * @return an update request with {@code docAsUpsert} enabled
     * @throws JsonProcessingException if the search document cannot be serialized
     */
    private UpdateRequest getUpdateRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType indexType, String entityType, EntityInterface entity) throws JsonProcessingException {
        UpdateRequest upsert = new UpdateRequest(indexType.indexName, entity.getId().toString());
        Object searchDoc = Objects.requireNonNull(ElasticSearchIndexFactory.buildIndex(entityType, entity)).buildESDoc();
        String searchDocJson = JsonUtils.pojoToJson(searchDoc);
        upsert.doc(searchDocJson, XContentType.JSON);
        upsert.docAsUpsert(true);
        return upsert;
    }
}

