/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.search;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.rest.RestStatus;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.FailureDetails;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.events.AbstractEventPublisher;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.resources.events.EventResource;
import org.openmetadata.service.search.IndexUtil;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.SearchIndexDefinition;
import org.openmetadata.service.search.SearchRetriableException;
import org.openmetadata.service.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event publisher that forwards {@link ChangeEvent}s to the search backend through a
 * {@link SearchClient}, and records indexing failures on a streaming
 * {@link EventPublisherJob} record stored via the entity-extension time-series DAO.
 *
 * <p>On construction the publisher (re)creates its job record, obtains a search client
 * from {@link IndexUtil} and asks {@link SearchIndexDefinition} to create the indexes.
 */
public class SearchEventPublisher extends AbstractEventPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(SearchEventPublisher.class);

    /** First key under which the streaming job record is stored in the time-series DAO. */
    private static final String STREAM_JOB_KEY = "eventPublisher:ElasticSearch:STREAM";
    /** Extension name under which the streaming job record is stored. */
    private static final String JOB_EXTENSION = "service.eventPublisher";

    private final SearchClient searchClient;
    private final CollectionDAO dao;

    /**
     * Creates the publisher, registers its job record and prepares the search indexes.
     *
     * @param esConfig search configuration; supplies the batch size, client settings and index setup
     * @param dao data-access object used for job bookkeeping and handed to the search client factory
     */
    public SearchEventPublisher(ElasticSearchConfiguration esConfig, CollectionDAO dao) {
        super(esConfig.getBatchSize());
        this.dao = dao;
        // Job registration only needs the DAO, so it runs before the search client exists.
        this.registerElasticSearchJobs();
        this.searchClient = IndexUtil.getSearchClient(esConfig, dao);
        SearchIndexDefinition esIndexDefinition = new SearchIndexDefinition(this.searchClient);
        esIndexDefinition.createIndexes(esConfig);
    }

    /** Logs that the publisher has started. */
    public void onStart() {
        LOG.info("ElasticSearch Publisher Started");
    }

    /**
     * Pushes every change event in {@code events} to the search client, one at a time.
     *
     * <p>Failures are recorded on the job record with status {@code ACTIVE_WITH_ERROR}.
     * Missing documents and non-timeout search errors are logged and the loop continues with
     * the next event; timeouts and I/O errors abort the batch by throwing.
     *
     * @param events batch of change events to index
     * @throws EventPublisherException if the search client raises an {@link IOException}
     */
    @Override
    public void publish(EventResource.EventList events) throws EventPublisherException {
        for (ChangeEvent event : events.getData()) {
            String entityType = event.getEntityType();
            if (entityType == null) {
                // Switching on a null String would throw and abort the whole batch;
                // skip the event the same way unknown entity types are skipped.
                LOG.warn("Ignoring Entity Type {}", entityType);
                continue;
            }
            // Built up front so every failure handler below can attach it to the job record.
            String contextInfo = event.getEntity() != null
                    ? String.format("Entity Info : %s", JsonUtils.pojoToJson(event.getEntity()))
                    : null;
            try {
                this.dispatch(entityType, event);
            }
            catch (DocumentMissingException ex) {
                LOG.error("Missing Document", ex);
                this.updateElasticSearchFailureStatus(contextInfo, EventPublisherJob.Status.ACTIVE_WITH_ERROR,
                        describeFailure("Missing Document while Updating ES.", ex));
            }
            catch (ElasticsearchException e) {
                LOG.error("failed to update ES doc");
                LOG.debug(e.getMessage());
                if (e.status() == RestStatus.GATEWAY_TIMEOUT || e.status() == RestStatus.REQUEST_TIMEOUT) {
                    LOG.error("Error in publishing to ElasticSearch");
                    this.updateElasticSearchFailureStatus(contextInfo, EventPublisherJob.Status.ACTIVE_WITH_ERROR,
                            describeFailure("Timeout when updating ES request.", e));
                    // NOTE(review): only the message is propagated; attach `e` as the cause if
                    // SearchRetriableException offers a (String, Throwable) constructor.
                    throw new SearchRetriableException(e.getMessage());
                }
                this.updateElasticSearchFailureStatus(contextInfo, EventPublisherJob.Status.ACTIVE_WITH_ERROR,
                        describeFailure("Failed while updating ES.", e));
                LOG.error(e.getMessage(), e);
            }
            catch (IOException ie) {
                this.updateElasticSearchFailureStatus(contextInfo, EventPublisherJob.Status.ACTIVE_WITH_ERROR,
                        describeFailure("Issue in updating ES request.", ie));
                // NOTE(review): only the message is propagated; attach `ie` as the cause if
                // EventPublisherException offers a (String, Throwable) constructor.
                throw new EventPublisherException(ie.getMessage());
            }
        }
    }

    /** Closes the search client and logs the shutdown. */
    public void onShutdown() {
        this.searchClient.close();
        LOG.info("Shutting down ElasticSearchEventPublisher");
    }

    /**
     * Marks the streaming job record with the given status and failure details.
     *
     * <p>This is best-effort bookkeeping: any exception raised while reading or writing the
     * record is logged and not rethrown, so it never masks the indexing error being reported.
     *
     * @param context description of the entity being processed; may be {@code null}
     * @param status status to store on the job record
     * @param failureMessage human-readable failure reason to store on the job record
     */
    public void updateElasticSearchFailureStatus(String context, EventPublisherJob.Status status, String failureMessage) {
        try {
            // Epoch millis straight from the system clock; converting through LocalDateTime can
            // resolve to the wrong instant during a daylight-saving overlap.
            long updateTime = System.currentTimeMillis();
            String recordString = this.dao.entityExtensionTimeSeriesDao().getExtension(STREAM_JOB_KEY, JOB_EXTENSION);
            EventPublisherJob lastRecord = JsonUtils.readValue(recordString, EventPublisherJob.class);
            if (lastRecord == null) {
                LOG.error("Failed to Update Elastic Search Job Info: no job record found for {}", STREAM_JOB_KEY);
                return;
            }
            // The previous timestamp is passed to update() so the existing row is the one replaced.
            long originalLastUpdate = lastRecord.getTimestamp();
            lastRecord.setStatus(status);
            lastRecord.setTimestamp(updateTime);
            lastRecord.setFailure(new Failure().withSinkError(new FailureDetails()
                    .withContext(context)
                    .withLastFailedAt(updateTime)
                    .withLastFailedReason(failureMessage)));
            this.dao.entityExtensionTimeSeriesDao().update(STREAM_JOB_KEY, JOB_EXTENSION, JsonUtils.pojoToJson(lastRecord), originalLastUpdate);
        }
        catch (Exception e) {
            // Deliberately not rethrown (best effort), but keep the cause in the log.
            LOG.error("Failed to Update Elastic Search Job Info", e);
        }
    }

    /** Routes one change event to the search-client update method for its entity type. */
    private void dispatch(String entityType, ChangeEvent event) throws IOException {
        switch (entityType) {
            case "table":
            case "dashboard":
            case "topic":
            case "pipeline":
            case "mlmodel":
            case "container":
            case "query": {
                this.searchClient.updateEntity(event);
                break;
            }
            case "user": {
                this.searchClient.updateUser(event);
                break;
            }
            case "team": {
                this.searchClient.updateTeam(event);
                break;
            }
            case "glossaryTerm": {
                this.searchClient.updateGlossaryTerm(event);
                break;
            }
            case "glossary": {
                this.searchClient.updateGlossary(event);
                break;
            }
            case "database": {
                this.searchClient.updateDatabase(event);
                break;
            }
            case "databaseSchema": {
                this.searchClient.updateDatabaseSchema(event);
                break;
            }
            case "dashboardService": {
                this.searchClient.updateDashboardService(event);
                break;
            }
            case "databaseService": {
                this.searchClient.updateDatabaseService(event);
                break;
            }
            case "messagingService": {
                this.searchClient.updateMessagingService(event);
                break;
            }
            case "pipelineService": {
                this.searchClient.updatePipelineService(event);
                break;
            }
            case "mlmodelService": {
                this.searchClient.updateMlModelService(event);
                break;
            }
            case "storageService": {
                this.searchClient.updateStorageService(event);
                break;
            }
            case "tag": {
                this.searchClient.updateTag(event);
                break;
            }
            case "classification": {
                this.searchClient.updateClassification(event);
                break;
            }
            case "testCase": {
                this.searchClient.updateTestCase(event);
                break;
            }
            case "testSuite": {
                this.searchClient.updateTestSuite(event);
                break;
            }
            default: {
                LOG.warn("Ignoring Entity Type {}", entityType);
                break;
            }
        }
    }

    /** Formats a failure reason as "{summary} Reason[..], Cause[..], Stack [..]" for the job record. */
    private static String describeFailure(String summary, Throwable error) {
        return String.format("%s Reason[%s], Cause[%s], Stack [%s]",
                summary, error.getMessage(), error.getCause(), ExceptionUtils.getStackTrace(error));
    }

    /**
     * Deletes any existing streaming job record and inserts a fresh one with status ACTIVE.
     * Failures are logged and not rethrown, so they do not prevent the publisher from being built.
     */
    private void registerElasticSearchJobs() {
        try {
            this.dao.entityExtensionTimeSeriesDao().delete(STREAM_JOB_KEY, JOB_EXTENSION);
            long startTime = System.currentTimeMillis();
            FailureDetails failureDetails = new FailureDetails().withLastFailedAt(0L);
            EventPublisherJob streamJob = new EventPublisherJob()
                    .withId(UUID.randomUUID())
                    .withName("Elastic Search Stream")
                    .withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH)
                    .withRunMode(CreateEventPublisherJob.RunMode.STREAM)
                    .withStatus(EventPublisherJob.Status.ACTIVE)
                    .withTimestamp(startTime)
                    .withStartedBy("admin")
                    .withStartTime(startTime)
                    .withFailure(new Failure().withSinkError(failureDetails));
            this.dao.entityExtensionTimeSeriesDao().insert(STREAM_JOB_KEY, JOB_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(streamJob));
        }
        catch (Exception e) {
            // Deliberately not rethrown (best effort), but keep the cause in the log.
            LOG.error("Failed to register Elastic Search Job", e);
        }
    }
}

