/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.WildcardQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.entity.classification.Classification;
import org.openmetadata.schema.entity.classification.Tag;
import org.openmetadata.schema.entity.data.Dashboard;
import org.openmetadata.schema.entity.data.Database;
import org.openmetadata.schema.entity.data.DatabaseSchema;
import org.openmetadata.schema.entity.data.Glossary;
import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.data.MlModel;
import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.services.MessagingService;
import org.openmetadata.schema.entity.services.MlModelService;
import org.openmetadata.schema.entity.services.PipelineService;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.settings.EventPublisherJob;
import org.openmetadata.schema.settings.FailureDetails;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.FieldChange;
import org.openmetadata.schema.type.UsageDetails;
import org.openmetadata.service.elasticsearch.DashboardIndex;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.elasticsearch.ElasticSearchRetriableException;
import org.openmetadata.service.elasticsearch.GlossaryTermIndex;
import org.openmetadata.service.elasticsearch.MlModelIndex;
import org.openmetadata.service.elasticsearch.PipelineIndex;
import org.openmetadata.service.elasticsearch.TableIndex;
import org.openmetadata.service.elasticsearch.TagIndex;
import org.openmetadata.service.elasticsearch.TeamIndex;
import org.openmetadata.service.elasticsearch.TopicIndex;
import org.openmetadata.service.elasticsearch.UserIndex;
import org.openmetadata.service.events.AbstractEventPublisher;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.resources.events.EventResource;
import org.openmetadata.service.util.ElasticSearchClientUtils;
import org.openmetadata.service.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticSearchEventPublisher
extends AbstractEventPublisher {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ElasticSearchEventPublisher.class);
    /** Log message template with a single {@code {}} placeholder. */
    private static final String SENDING_REQUEST_TO_ELASTIC_SEARCH = "Sending request to ElasticSearch {}";
    /** ElasticSearch client, created from the ElasticSearch configuration in the constructor. */
    private final RestHighLevelClient client;
    /** DAO received by the constructor; also handed to the index definition helper. */
    private final CollectionDAO dao;
    /** Document field name used in term queries that select documents by service name. */
    private static final String SERVICE_NAME = "service.name";
    /** Document field name used in term queries that select documents by database name. */
    private static final String DATABASE_NAME = "database.name";

    /**
     * Creates the publisher: registers the ElasticSearch jobs, builds the client from
     * {@code esConfig} and asks {@link ElasticSearchIndexDefinition} to create the indexes.
     *
     * @param esConfig ElasticSearch configuration; its batch size is passed to the superclass
     * @param dao data access object stored on the publisher and shared with the index definition
     */
    public ElasticSearchEventPublisher(ElasticSearchConfiguration esConfig, CollectionDAO dao) {
        super(esConfig.getBatchSize());
        this.dao = dao;
        // Jobs are registered before the client is created; keep this order.
        registerElasticSearchJobs();
        RestHighLevelClient esClient = ElasticSearchClientUtils.createElasticSearchClient(esConfig);
        this.client = esClient;
        new ElasticSearchIndexDefinition(esClient, dao).createIndexes(esConfig);
    }

    /** Start hook: only logs that the publisher has started. */
    public void onStart() {
        LOG.info("ElasticSearch Publisher Started");
    }

    /**
     * Pushes each change event in {@code events} to ElasticSearch, routing by entity type.
     *
     * <p>Unknown entity types are logged and skipped. Every failure is recorded through
     * {@code updateElasticSearchFailureStatus} with status {@code ACTIVE_WITH_ERROR}. Missing
     * documents and non-timeout ElasticSearch errors do not stop the loop; gateway/request
     * timeouts and I/O errors are rethrown and abort it.
     *
     * @param events batch of change events to publish
     * @throws EventPublisherException when an {@link IOException} occurs while handling an event
     * @throws JsonProcessingException when an event's entity cannot be serialized for the
     *     failure context
     */
    @Override
    public void publish(EventResource.ChangeEventList events) throws EventPublisherException, JsonProcessingException {
        for (ChangeEvent event : events.getData()) {
            String entityType = event.getEntityType();
            // Serialized up front so each failure path below can attach the entity to its status update.
            String failureContext = null;
            if (event.getEntity() != null) {
                failureContext = String.format("Entity Info : %s", JsonUtils.pojoToJson(event.getEntity()));
            }
            try {
                switch (entityType) {
                    case "table":
                        updateTable(event);
                        break;
                    case "dashboard":
                        updateDashboard(event);
                        break;
                    case "topic":
                        updateTopic(event);
                        break;
                    case "pipeline":
                        updatePipeline(event);
                        break;
                    case "user":
                        updateUser(event);
                        break;
                    case "team":
                        updateTeam(event);
                        break;
                    case "glossaryTerm":
                        updateGlossaryTerm(event);
                        break;
                    case "glossary":
                        updateGlossary(event);
                        break;
                    case "database":
                        updateDatabase(event);
                        break;
                    case "databaseSchema":
                        updateDatabaseSchema(event);
                        break;
                    case "dashboardService":
                        updateDashboardService(event);
                        break;
                    case "databaseService":
                        updateDatabaseService(event);
                        break;
                    case "messagingService":
                        updateMessagingService(event);
                        break;
                    case "pipelineService":
                        updatePipelineService(event);
                        break;
                    case "mlmodelService":
                        updateMlModelService(event);
                        break;
                    case "mlmodel":
                        updateMlModel(event);
                        break;
                    case "tag":
                        updateTag(event);
                        break;
                    case "classification":
                        updateClassification(event);
                        break;
                    default:
                        LOG.warn("Ignoring Entity Type {}", entityType);
                }
            } catch (DocumentMissingException missing) {
                LOG.error("Missing Document", missing);
                String reason = String.format("Missing Document while Updating ES. Reason[%s], Cause[%s], Stack [%s]", missing.getMessage(), missing.getCause(), ExceptionUtils.getStackTrace(missing));
                updateElasticSearchFailureStatus(failureContext, EventPublisherJob.Status.ACTIVE_WITH_ERROR, reason);
            } catch (ElasticsearchException esFailure) {
                LOG.error("failed to update ES doc");
                LOG.debug(esFailure.getMessage());
                RestStatus status = esFailure.status();
                boolean timedOut = status == RestStatus.GATEWAY_TIMEOUT || status == RestStatus.REQUEST_TIMEOUT;
                if (timedOut) {
                    LOG.error("Error in publishing to ElasticSearch");
                    String reason = String.format("Timeout when updating ES request. Reason[%s], Cause[%s], Stack [%s]", esFailure.getMessage(), esFailure.getCause(), ExceptionUtils.getStackTrace(esFailure));
                    updateElasticSearchFailureStatus(failureContext, EventPublisherJob.Status.ACTIVE_WITH_ERROR, reason);
                    // Timeouts are surfaced as a retriable exception instead of being swallowed.
                    throw new ElasticSearchRetriableException(esFailure.getMessage());
                }
                String reason = String.format("Failed while updating ES. Reason[%s], Cause[%s], Stack [%s]", esFailure.getMessage(), esFailure.getCause(), ExceptionUtils.getStackTrace(esFailure));
                updateElasticSearchFailureStatus(failureContext, EventPublisherJob.Status.ACTIVE_WITH_ERROR, reason);
                LOG.error(esFailure.getMessage(), esFailure);
            } catch (IOException ioFailure) {
                String reason = String.format("Issue in updating ES request. Reason[%s], Cause[%s], Stack [%s]", ioFailure.getMessage(), ioFailure.getCause(), ExceptionUtils.getStackTrace(ioFailure));
                updateElasticSearchFailureStatus(failureContext, EventPublisherJob.Status.ACTIVE_WITH_ERROR, reason);
                throw new EventPublisherException(ioFailure.getMessage());
            }
        }
    }

    /** Shutdown hook: calls {@code close()} and then logs that the publisher is shutting down. */
    public void onShutdown() {
        close();
        LOG.info("Shutting down ElasticSearchEventPublisher");
    }

    /**
     * Builds a scripted partial update from an event's change description.
     *
     * <p>The script always sets {@code updatedAt} from the event timestamp. Depending on the
     * change description it also adds newly added followers, removes every deleted follower and
     * replaces {@code usageSummary}. Added and removed followers travel in separate script
     * parameters so that neither overwrites the other and all removed ids are applied.
     *
     * @param event change event carrying the entity type, id, timestamp and change description
     * @return update request against the index mapped to the event's entity type; never
     *     {@code null}, because the script always contains the {@code updatedAt} assignment
     */
    private UpdateRequest applyChangeEvent(ChangeEvent event) {
        ElasticSearchIndexDefinition.ElasticSearchIndexType esIndexType = ElasticSearchIndexDefinition.getIndexMappingByEntityType(event.getEntityType());
        UUID entityId = event.getEntityId();
        ChangeDescription changeDescription = event.getChangeDescription();

        StringBuilder scriptTxt = new StringBuilder();
        Map<String, Object> scriptParams = new HashMap<>();
        scriptParams.put("updatedAt", event.getTimestamp());
        scriptTxt.append("ctx._source.updatedAt=params.updatedAt;");

        // Followers added by this change.
        List<String> addedFollowers = new ArrayList<>();
        for (FieldChange fieldChange : changeDescription.getFieldsAdded()) {
            if (!fieldChange.getName().equalsIgnoreCase("followers")) {
                continue;
            }
            @SuppressWarnings("unchecked")
            List<EntityReference> added = (List<EntityReference>) fieldChange.getNewValue();
            for (EntityReference follower : added) {
                addedFollowers.add(follower.getId().toString());
            }
        }
        if (!addedFollowers.isEmpty()) {
            scriptParams.put("addedFollowers", addedFollowers);
            scriptTxt.append("ctx._source.followers.addAll(params.addedFollowers);");
        }

        // Followers removed by this change: collect all ids rather than keeping only the last one.
        List<String> removedFollowers = new ArrayList<>();
        for (FieldChange fieldChange : changeDescription.getFieldsDeleted()) {
            if (!fieldChange.getName().equalsIgnoreCase("followers")) {
                continue;
            }
            @SuppressWarnings("unchecked")
            List<EntityReference> removed = (List<EntityReference>) fieldChange.getOldValue();
            for (EntityReference follower : removed) {
                removedFollowers.add(follower.getId().toString());
            }
        }
        if (!removedFollowers.isEmpty()) {
            scriptParams.put("removedFollowers", removedFollowers);
            scriptTxt.append("ctx._source.followers.removeAll(params.removedFollowers);");
        }

        // Usage summary replacement; the parameter key is fixed to match the script text.
        for (FieldChange fieldChange : changeDescription.getFieldsUpdated()) {
            if (!fieldChange.getName().equalsIgnoreCase("usageSummary")) {
                continue;
            }
            UsageDetails usageSummary = (UsageDetails) fieldChange.getNewValue();
            scriptParams.put("usageSummary", JsonUtils.getMap(usageSummary));
            scriptTxt.append("ctx._source.usageSummary = params.usageSummary;");
        }

        Script script = new Script(ScriptType.INLINE, "painless", scriptTxt.toString(), scriptParams);
        UpdateRequest updateRequest = new UpdateRequest(esIndexType.indexName, entityId.toString());
        updateRequest.script(script);
        return updateRequest;
    }

    /**
     * Mirrors a table change event into the table search index.
     *
     * <p>Creation upserts the full document. An update applies the change description when the
     * version is unchanged, otherwise it re-upserts the full document through a script. Soft
     * deletion goes through {@code softDeleteEntity}; hard deletion removes the document by id.
     * Other event types are ignored.
     *
     * @param event change event for a table entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateTable(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new TableIndex((Table) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                if (!Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
                    scriptedUpsert(new TableIndex((Table) event.getEntity()).buildESDoc(), updateRequest);
                } else {
                    // Same version: only the fields named in the change description are touched.
                    updateRequest = applyChangeEvent(event);
                }
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * Mirrors a topic change event into the topic search index.
     *
     * <p>Creation upserts the full document. An update applies the change description when the
     * version is unchanged, otherwise it re-upserts the full document through a script. Soft
     * deletion goes through {@code softDeleteEntity}; hard deletion removes the document by id.
     * Other event types are ignored.
     *
     * @param event change event for a topic entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateTopic(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new TopicIndex((Topic) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                if (!Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
                    scriptedUpsert(new TopicIndex((Topic) event.getEntity()).buildESDoc(), updateRequest);
                } else {
                    // Same version: only the fields named in the change description are touched.
                    updateRequest = applyChangeEvent(event);
                }
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * Mirrors a dashboard change event into the dashboard search index.
     *
     * <p>Creation upserts the full document. An update applies the change description when the
     * version is unchanged, otherwise it re-upserts the full document through a script. Soft
     * deletion goes through {@code softDeleteEntity}; hard deletion removes the document by id.
     * Other event types are ignored.
     *
     * @param event change event for a dashboard entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateDashboard(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new DashboardIndex((Dashboard) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                if (!Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
                    scriptedUpsert(new DashboardIndex((Dashboard) event.getEntity()).buildESDoc(), updateRequest);
                } else {
                    // Same version: only the fields named in the change description are touched.
                    updateRequest = applyChangeEvent(event);
                }
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * Mirrors a pipeline change event into the pipeline search index.
     *
     * <p>Creation upserts the full document. An update applies the change description when the
     * version is unchanged, otherwise it re-upserts the full document through a script. Soft
     * deletion goes through {@code softDeleteEntity}; hard deletion removes the document by id.
     * Other event types are ignored.
     *
     * @param event change event for a pipeline entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updatePipeline(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED: {
                PipelineIndex pipelineIndex = new PipelineIndex((Pipeline) event.getEntity());
                updateRequest.doc(JsonUtils.pojoToJson(pipelineIndex.buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            }
            case ENTITY_UPDATED: {
                if (Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
                    // Same version: only the fields named in the change description are touched.
                    updateRequest = applyChangeEvent(event);
                } else {
                    // The index wrapper is only needed on this path, so it is built here,
                    // matching how the other entity handlers in this class do it.
                    PipelineIndex pipelineIndex = new PipelineIndex((Pipeline) event.getEntity());
                    scriptedUpsert(pipelineIndex.buildESDoc(), updateRequest);
                }
                updateElasticSearch(updateRequest);
                break;
            }
            case ENTITY_SOFT_DELETED: {
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            }
            case ENTITY_DELETED: {
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            }
            default:
                break;
        }
    }

    /**
     * Mirrors a user change event into the user search index.
     *
     * <p>Creation upserts the full document; every update goes through
     * {@code scriptedUserUpsert}; soft deletion goes through {@code softDeleteEntity}; hard
     * deletion removes the document by id. Other event types are ignored.
     *
     * @param event change event for a user entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateUser(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.USER_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new UserIndex((User) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                scriptedUserUpsert(new UserIndex((User) event.getEntity()).buildESDoc(), updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * Mirrors a team change event into the team search index.
     *
     * <p>Creation upserts the full document; every update goes through
     * {@code scriptedTeamUpsert}; soft deletion goes through {@code softDeleteEntity}; hard
     * deletion removes the document by id. Other event types are ignored.
     *
     * @param event change event for a team entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateTeam(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.TEAM_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new TeamIndex((Team) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                scriptedTeamUpsert(new TeamIndex((Team) event.getEntity()).buildESDoc(), updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * Mirrors a glossary term change event into the glossary search index.
     *
     * <p>Creation upserts the full document; every update re-upserts it through a script; soft
     * deletion goes through {@code softDeleteEntity}. Hard deletion runs a delete-by-query that
     * matches documents whose {@code id} or {@code parent.id} equals the term's id. Other event
     * types are ignored.
     *
     * @param event change event for a glossary term entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateGlossaryTerm(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName;
        UpdateRequest updateRequest = new UpdateRequest(indexName, event.getEntityId().toString());
        switch (event.getEventType()) {
            case ENTITY_CREATED: {
                GlossaryTermIndex glossaryTermIndex = new GlossaryTermIndex((GlossaryTerm) event.getEntity());
                updateRequest.doc(JsonUtils.pojoToJson(glossaryTermIndex.buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            }
            case ENTITY_UPDATED: {
                GlossaryTermIndex glossaryTermIndex = new GlossaryTermIndex((GlossaryTerm) event.getEntity());
                scriptedUpsert(glossaryTermIndex.buildESDoc(), updateRequest);
                updateElasticSearch(updateRequest);
                break;
            }
            case ENTITY_SOFT_DELETED: {
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            }
            case ENTITY_DELETED: {
                // Delete by query only; no single-document DeleteRequest is needed here.
                String termId = ((GlossaryTerm) event.getEntity()).getId().toString();
                DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
                BoolQueryBuilder termOrChildren = QueryBuilders.boolQuery()
                        .should(QueryBuilders.matchQuery("id", termId))
                        .should(QueryBuilders.matchQuery("parent.id", termId));
                request.setQuery(termOrChildren);
                deleteEntityFromElasticSearchByQuery(request);
                break;
            }
            default:
                break;
        }
    }

    /**
     * On hard deletion of a glossary, deletes the glossary-index documents whose
     * {@code glossary.id} matches the glossary's id. All other event types are ignored.
     *
     * @param event change event for a glossary entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updateGlossary(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        String glossaryId = ((Glossary) event.getEntity()).getId().toString();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.GLOSSARY_SEARCH_INDEX.indexName);
        purge.setQuery(QueryBuilders.boolQuery().should(QueryBuilders.matchQuery("glossary.id", glossaryId)));
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * Mirrors an ML model change event into the ML model search index.
     *
     * <p>Creation upserts the full document. An update applies the change description when the
     * version is unchanged, otherwise it re-upserts the full document through a script. Soft
     * deletion goes through {@code softDeleteEntity}; hard deletion removes the document by id.
     * Other event types are ignored.
     *
     * @param event change event for an ML model entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateMlModel(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new MlModelIndex((MlModel) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                if (!Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
                    scriptedUpsert(new MlModelIndex((MlModel) event.getEntity()).buildESDoc(), updateRequest);
                } else {
                    // Same version: only the fields named in the change description are touched.
                    updateRequest = applyChangeEvent(event);
                }
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * Mirrors a tag change event into the tag search index.
     *
     * <p>Creation upserts the full document. An update applies the change description when the
     * version is unchanged, otherwise it re-upserts the full document through a script. Soft
     * deletion goes through {@code softDeleteEntity}; hard deletion removes the document by id.
     * Other event types are ignored.
     *
     * @param event change event for a tag entity
     * @throws IOException propagated from the serialization and ElasticSearch helper calls
     */
    private void updateTag(ChangeEvent event) throws IOException {
        String indexName = ElasticSearchIndexDefinition.ElasticSearchIndexType.TAG_SEARCH_INDEX.indexName;
        String docId = event.getEntityId().toString();
        UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
        switch (event.getEventType()) {
            case ENTITY_CREATED:
                updateRequest.doc(JsonUtils.pojoToJson(new TagIndex((Tag) event.getEntity()).buildESDoc()), XContentType.JSON);
                updateRequest.docAsUpsert(true);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_UPDATED:
                if (!Objects.equals(event.getCurrentVersion(), event.getPreviousVersion())) {
                    scriptedUpsert(new TagIndex((Tag) event.getEntity()).buildESDoc(), updateRequest);
                } else {
                    // Same version: only the fields named in the change description are touched.
                    updateRequest = applyChangeEvent(event);
                }
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_SOFT_DELETED:
                softDeleteEntity(updateRequest);
                updateElasticSearch(updateRequest);
                break;
            case ENTITY_DELETED:
                deleteEntityFromElasticSearch(new DeleteRequest(indexName, docId));
                break;
            default:
                break;
        }
    }

    /**
     * On hard deletion of a database, deletes the table-index documents whose database name and
     * service name both match the deleted database. All other event types are ignored.
     *
     * @param event change event for a database entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updateDatabase(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        Database database = (Database) event.getEntity();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName);
        BoolQueryBuilder filter = QueryBuilders.boolQuery();
        filter.must(QueryBuilders.termQuery(DATABASE_NAME, database.getName()));
        filter.must(QueryBuilders.termQuery(SERVICE_NAME, database.getService().getName()));
        purge.setQuery(filter);
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * On hard deletion of a database schema, deletes the table-index documents whose schema name
     * and database name both match the deleted schema. All other event types are ignored.
     *
     * <p>NOTE(review): the filter does not include the service name, unlike the database-level
     * cleanup — confirm that identically named database/schema pairs in different services
     * cannot be matched by accident.
     *
     * @param event change event for a database schema entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updateDatabaseSchema(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        DatabaseSchema schema = (DatabaseSchema) event.getEntity();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName);
        BoolQueryBuilder filter = QueryBuilders.boolQuery();
        filter.must(QueryBuilders.termQuery("databaseSchema.name", schema.getName()));
        filter.must(QueryBuilders.termQuery(DATABASE_NAME, schema.getDatabase().getName()));
        purge.setQuery(filter);
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * On hard deletion of a database service, deletes the table-index documents whose service
     * name matches the deleted service. All other event types are ignored.
     *
     * @param event change event for a database service entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updateDatabaseService(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        String serviceName = ((DatabaseService) event.getEntity()).getName();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.TABLE_SEARCH_INDEX.indexName);
        purge.setQuery(QueryBuilders.termQuery(SERVICE_NAME, serviceName));
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * On hard deletion of a pipeline service, deletes the pipeline-index documents whose service
     * name matches the deleted service. All other event types are ignored.
     *
     * @param event change event for a pipeline service entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updatePipelineService(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        String serviceName = ((PipelineService) event.getEntity()).getName();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.PIPELINE_SEARCH_INDEX.indexName);
        purge.setQuery(QueryBuilders.termQuery(SERVICE_NAME, serviceName));
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * On hard deletion of an ML model service, deletes the ML-model-index documents whose
     * service name matches the deleted service. All other event types are ignored.
     *
     * @param event change event for an ML model service entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updateMlModelService(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        String serviceName = ((MlModelService) event.getEntity()).getName();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.MLMODEL_SEARCH_INDEX.indexName);
        purge.setQuery(QueryBuilders.termQuery(SERVICE_NAME, serviceName));
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * On hard deletion of a messaging service, deletes the topic-index documents whose service
     * name matches the deleted service. All other event types are ignored.
     *
     * @param event change event for a messaging service entity
     * @throws IOException propagated from the delete-by-query helper
     */
    private void updateMessagingService(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        String serviceName = ((MessagingService) event.getEntity()).getName();
        DeleteByQueryRequest purge = new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.TOPIC_SEARCH_INDEX.indexName);
        purge.setQuery(QueryBuilders.termQuery(SERVICE_NAME, serviceName));
        deleteEntityFromElasticSearchByQuery(purge);
    }

    /**
     * Reacts to a change event on a dashboard service.
     *
     * <p>Only {@code ENTITY_DELETED} events are acted upon: a delete-by-query is issued against the
     * dashboard search index for every document whose {@code SERVICE_NAME} field matches the name of
     * the deleted service. Any other event type is ignored.
     *
     * @param event change event whose entity is cast to {@code DashboardService}
     * @throws IOException propagated from the delete-by-query call
     */
    private void updateDashboardService(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        DashboardService deletedService = (DashboardService) event.getEntity();
        DeleteByQueryRequest purgeRequest =
            new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.DASHBOARD_SEARCH_INDEX.indexName);
        purgeRequest.setQuery(new TermQueryBuilder(SERVICE_NAME, deletedService.getName()));
        deleteEntityFromElasticSearchByQuery(purgeRequest);
    }

    /**
     * Reacts to a change event on a classification.
     *
     * <p>Only {@code ENTITY_DELETED} events are acted upon: a delete-by-query is issued against the
     * tag search index using a wildcard query on {@code fullyQualifiedName} with the pattern
     * {@code <classification name>.*}. Any other event type is ignored.
     *
     * @param event change event whose entity is cast to {@code Classification}
     * @throws IOException propagated from the delete-by-query call
     */
    private void updateClassification(ChangeEvent event) throws IOException {
        if (event.getEventType() != EventType.ENTITY_DELETED) {
            return;
        }
        Classification removed = (Classification) event.getEntity();
        DeleteByQueryRequest purgeTags =
            new DeleteByQueryRequest(ElasticSearchIndexDefinition.ElasticSearchIndexType.TAG_SEARCH_INDEX.indexName);
        // The pattern is the classification name followed by ".*".
        String childPattern = removed.getName() + ".*";
        purgeTags.setQuery(new WildcardQueryBuilder("fullyQualifiedName", childPattern));
        deleteEntityFromElasticSearchByQuery(purgeTags);
    }

    /**
     * Configures {@code updateRequest} as a scripted upsert that writes every field of {@code doc}
     * into the target document and asks for an immediate refresh.
     *
     * <p>{@code doc} is converted to a map and handed to an inline Painless script as its
     * {@code params}; the script copies each entry into {@code ctx._source}.
     *
     * @param doc object whose map form supplies the fields to write
     * @param updateRequest request to configure; mutated in place
     */
    private void scriptedUpsert(Object doc, UpdateRequest updateRequest) {
        Map<String, Object> fields = JsonUtils.getMap(doc);
        Script copyFields =
            new Script(ScriptType.INLINE, "painless", "for (k in params.keySet()) { ctx._source.put(k, params.get(k)) }", fields);
        updateRequest.script(copyFields).scriptedUpsert(true);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    }

    /**
     * Configures {@code updateRequest} as a scripted upsert that writes every field of
     * {@code index} into the target document.
     *
     * <p>{@code index} is converted to a map and handed to an inline Painless script as its
     * {@code params}; the script copies each entry into {@code ctx._source}. No refresh policy is
     * set here.
     *
     * @param index object whose map form supplies the fields to write
     * @param updateRequest request to configure; mutated in place
     */
    private void scriptedUserUpsert(Object index, UpdateRequest updateRequest) {
        Map<String, Object> fields = JsonUtils.getMap(index);
        Script copyFields =
            new Script(ScriptType.INLINE, "painless", "for (k in params.keySet()) {ctx._source.put(k, params.get(k)) }", fields);
        updateRequest.script(copyFields).scriptedUpsert(true);
    }

    /**
     * Configures {@code updateRequest} as a scripted upsert that writes every field of
     * {@code index} into the target document.
     *
     * <p>{@code index} is converted to a map and handed to an inline Painless script as its
     * {@code params}; the script copies each entry into {@code ctx._source}. No refresh policy is
     * set here.
     *
     * @param index object whose map form supplies the fields to write
     * @param updateRequest request to configure; mutated in place
     */
    private void scriptedTeamUpsert(Object index, UpdateRequest updateRequest) {
        Map<String, Object> fields = JsonUtils.getMap(index);
        Script copyFields =
            new Script(ScriptType.INLINE, "painless", "for (k in params.keySet()) { ctx._source.put(k, params.get(k)) }", fields);
        updateRequest.script(copyFields).scriptedUpsert(true);
    }

    /**
     * Attaches an inline Painless script to {@code updateRequest} that sets the document's
     * {@code deleted} field to {@code true}.
     *
     * @param updateRequest request to configure; mutated in place
     */
    private void softDeleteEntity(UpdateRequest updateRequest) {
        // The script takes no parameters; a typed empty map avoids the raw-type unchecked conversion.
        Map<String, Object> noParams = new HashMap<>();
        Script script = new Script(ScriptType.INLINE, "painless", "ctx._source.deleted=true", noParams);
        updateRequest.script(script);
    }

    /**
     * Sends an update request to Elasticsearch with the default request options.
     *
     * @param updateRequest request to send; a {@code null} request is silently ignored
     * @throws IOException propagated from the client's update call
     */
    private void updateElasticSearch(UpdateRequest updateRequest) throws IOException {
        if (updateRequest == null) {
            return;
        }
        LOG.debug(SENDING_REQUEST_TO_ELASTIC_SEARCH, updateRequest);
        client.update(updateRequest, RequestOptions.DEFAULT);
    }

    /**
     * Sends a single-document delete request to Elasticsearch with the default request options.
     *
     * <p>The request's refresh policy is set to {@code WAIT_UNTIL} before it is sent.
     *
     * @param deleteRequest request to send; a {@code null} request is silently ignored
     * @throws IOException propagated from the client's delete call
     */
    private void deleteEntityFromElasticSearch(DeleteRequest deleteRequest) throws IOException {
        if (deleteRequest == null) {
            return;
        }
        LOG.debug(SENDING_REQUEST_TO_ELASTIC_SEARCH, deleteRequest);
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        client.delete(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * Sends a delete-by-query request to Elasticsearch with the default request options.
     *
     * <p>Refresh is enabled on the request before it is sent.
     *
     * @param deleteRequest request to send; a {@code null} request is silently ignored
     * @throws IOException propagated from the client's delete-by-query call
     */
    private void deleteEntityFromElasticSearchByQuery(DeleteByQueryRequest deleteRequest) throws IOException {
        if (deleteRequest == null) {
            return;
        }
        LOG.debug(SENDING_REQUEST_TO_ELASTIC_SEARCH, deleteRequest);
        deleteRequest.setRefresh(true);
        client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
    }

    /**
     * Resets and re-registers the Elasticsearch event-publisher job records.
     *
     * <p>Any existing BATCH and STREAM records are deleted first, then fresh records are inserted:
     * the batch job with status {@code IDLE} and the stream job with status {@code ACTIVE}. Both
     * share the same start timestamp and an initial "No Failures" failure-details object.
     *
     * <p>Best effort: any exception is caught and logged (with its stack trace) rather than
     * propagated to the caller.
     */
    public void registerElasticSearchJobs() {
        final String batchKey = "eventPublisher:ElasticSearch:BATCH";
        final String streamKey = "eventPublisher:ElasticSearch:STREAM";
        final String extension = "service.eventPublisher";
        final String jsonSchema = "eventPublisherJob";
        try {
            this.dao.entityExtensionTimeSeriesDao().delete(batchKey, extension);
            this.dao.entityExtensionTimeSeriesDao().delete(streamKey, extension);

            // Epoch millis taken directly from the system clock; a LocalDateTime round trip through
            // the default zone is ambiguous during a DST overlap and can be off by the overlap length.
            long startTime = System.currentTimeMillis();
            FailureDetails failureDetails = new FailureDetails()
                .withLastFailedAt(0L)
                .withLastFailedReason("No Failures");

            EventPublisherJob batchJob = new EventPublisherJob()
                .withName("Elastic Search Batch")
                .withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH)
                .withRunMode(CreateEventPublisherJob.RunMode.BATCH)
                .withStatus(EventPublisherJob.Status.IDLE)
                .withTimestamp(startTime)
                .withStartedBy("admin")
                .withStartTime(startTime)
                .withFailureDetails(failureDetails);
            EventPublisherJob streamJob = new EventPublisherJob()
                .withName("Elastic Search Stream")
                .withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH)
                .withRunMode(CreateEventPublisherJob.RunMode.STREAM)
                .withStatus(EventPublisherJob.Status.ACTIVE)
                .withTimestamp(startTime)
                .withStartedBy("admin")
                .withStartTime(startTime)
                .withFailureDetails(failureDetails);

            this.dao.entityExtensionTimeSeriesDao().insert(batchKey, extension, jsonSchema, JsonUtils.pojoToJson(batchJob));
            this.dao.entityExtensionTimeSeriesDao().insert(streamKey, extension, jsonSchema, JsonUtils.pojoToJson(streamJob));
        }
        catch (Exception e) {
            // Keep the cause: without it a failed registration is impossible to diagnose from logs.
            LOG.error("Failed to register Elastic Search Job", e);
        }
    }

    /**
     * Records a status change and failure details on the Elasticsearch STREAM publisher job.
     *
     * <p>The stored STREAM job record is read, its status and timestamp are replaced, a new
     * failure-details object is attached, and the record is written back keyed by the timestamp it
     * had when it was read.
     *
     * <p>Best effort: any exception (including one caused by a missing or unreadable stored record)
     * is caught and logged with its stack trace rather than propagated to the caller.
     *
     * @param context context string stored in the failure details
     * @param status new job status
     * @param failureMessage reason stored as the last failure reason
     */
    public void updateElasticSearchFailureStatus(String context, EventPublisherJob.Status status, String failureMessage) {
        final String streamKey = "eventPublisher:ElasticSearch:STREAM";
        final String extension = "service.eventPublisher";
        try {
            // Epoch millis taken directly from the system clock; a LocalDateTime round trip through
            // the default zone is ambiguous during a DST overlap and can be off by the overlap length.
            long updateTime = System.currentTimeMillis();
            String recordString = this.dao.entityExtensionTimeSeriesDao().getExtension(streamKey, extension);
            EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
            // Capture the stored timestamp before overwriting it; it identifies the row to update.
            long originalLastUpdate = lastRecord.getTimestamp();
            lastRecord.setStatus(status);
            lastRecord.setTimestamp(updateTime);
            lastRecord.setFailureDetails(new FailureDetails()
                .withContext(context)
                .withLastFailedAt(updateTime)
                .withLastFailedReason(failureMessage));
            this.dao.entityExtensionTimeSeriesDao().update(streamKey, extension, JsonUtils.pojoToJson(lastRecord), originalLastUpdate);
        }
        catch (Exception e) {
            // Keep the cause: without it a failed status update is impossible to diagnose from logs.
            LOG.error("Failed to Update Elastic Search Job Info", e);
        }
    }

    /**
     * Closes the underlying Elasticsearch client.
     *
     * <p>Any exception raised while closing is logged and not propagated.
     */
    public void close() {
        try {
            client.close();
        } catch (Exception closeFailure) {
            LOG.error("Failed to close elastic search", closeFailure);
        }
    }
}

