/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.util;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReIndexingHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ReIndexingHandler.class);
    public static final String REINDEXING_JOB_EXTENSION = "reindexing.eventPublisher";
    private static ReIndexingHandler INSTANCE;
    private static volatile boolean INITIALIZED;
    private static CollectionDAO dao;
    private static RestHighLevelClient client;
    private static ElasticSearchIndexDefinition esIndexDefinition;
    private static ExecutorService threadScheduler;
    private final Map<UUID, SearchIndexWorkflow> REINDEXING_JOB_MAP = new LinkedHashMap<UUID, SearchIndexWorkflow>();
    private static BlockingQueue<Runnable> taskQueue;

    private ReIndexingHandler() {
    }

    public static ReIndexingHandler getInstance() {
        return INSTANCE;
    }

    public static void initialize(RestHighLevelClient restHighLevelClient, ElasticSearchIndexDefinition elasticSearchIndexDefinition, CollectionDAO daoObject) {
        if (!INITIALIZED) {
            client = restHighLevelClient;
            dao = daoObject;
            esIndexDefinition = elasticSearchIndexDefinition;
            taskQueue = new ArrayBlockingQueue<Runnable>(5);
            threadScheduler = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, taskQueue);
            INSTANCE = new ReIndexingHandler();
            INITIALIZED = true;
        } else {
            LOG.info("Reindexing Handler is already initialized");
        }
    }

    public EventPublisherJob createReindexingJob(String startedBy, CreateEventPublisherJob createReindexingJob) {
        this.clearCompletedJobs();
        this.validateJob(createReindexingJob);
        if (taskQueue.size() >= 5) {
            throw new UnhandledServerException("Cannot create new Reindexing Jobs. There are pending jobs.");
        }
        if (((ThreadPoolExecutor)threadScheduler).getActiveCount() > 5) {
            throw new UnhandledServerException("Thread unavailable to run the jobs. There are pending jobs.");
        }
        EventPublisherJob jobData = this.getReindexJob(startedBy, createReindexingJob);
        ArrayList<SearchIndexWorkflow> activeJobs = new ArrayList<SearchIndexWorkflow>(this.REINDEXING_JOB_MAP.values());
        Set entityList = jobData.getEntities();
        for (SearchIndexWorkflow job : activeJobs) {
            EventPublisherJob runningJob = job.getJobData();
            runningJob.getEntities().forEach(entityList::remove);
        }
        LOG.info("Reindexing triggered for the following Entities: {}", (Object)entityList);
        if (!entityList.isEmpty()) {
            if (!CommonUtil.nullOrEmpty((String)jobData.getAfterCursor()) && entityList.size() > 1) {
                throw new IllegalArgumentException("After Cursor can only be associated with one entity");
            }
            dao.entityExtensionTimeSeriesDao().deleteLastRecords(REINDEXING_JOB_EXTENSION, 5);
            dao.entityExtensionTimeSeriesDao().insert(jobData.getId().toString(), REINDEXING_JOB_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(jobData));
            SearchIndexWorkflow job = new SearchIndexWorkflow(dao, esIndexDefinition, client, jobData);
            threadScheduler.submit(job);
            this.REINDEXING_JOB_MAP.put(jobData.getId(), job);
            return jobData;
        }
        throw new UnhandledServerException("There are already executing Jobs working on the same Entities. Please try later.");
    }

    private void clearCompletedJobs() {
        this.REINDEXING_JOB_MAP.entrySet().removeIf(entry -> ((SearchIndexWorkflow)entry.getValue()).getJobData().getStatus() != EventPublisherJob.Status.STARTED && ((SearchIndexWorkflow)entry.getValue()).getJobData().getStatus() != EventPublisherJob.Status.RUNNING);
    }

    public EventPublisherJob stopRunningJob(UUID jobId) {
        SearchIndexWorkflow job = this.REINDEXING_JOB_MAP.get(jobId);
        if (job != null) {
            job.stopJob();
            return job.getJobData();
        }
        throw new CustomExceptionMessage(Response.Status.BAD_REQUEST, "Job is not in Running state.");
    }

    private void validateJob(CreateEventPublisherJob job) {
        Objects.requireNonNull(job);
        HashSet<String> storedEntityList = new HashSet<String>(Entity.getEntityList());
        if (job.getEntities().isEmpty()) {
            throw new IllegalArgumentException("Entities cannot be Empty");
        }
        job.getEntities().forEach(entityType -> {
            if (!storedEntityList.contains(entityType) && !ReindexingUtil.isDataInsightIndex(entityType)) {
                throw new IllegalArgumentException(String.format("Entity Type : %s is not a valid Entity", entityType));
            }
        });
    }

    public void removeCompletedJob(UUID jobId) {
        this.REINDEXING_JOB_MAP.remove(jobId);
    }

    public EventPublisherJob getJob(UUID jobId) throws IOException {
        SearchIndexWorkflow job = this.REINDEXING_JOB_MAP.get(jobId);
        if (job == null) {
            String recordString = dao.entityExtensionTimeSeriesDao().getLatestExtension(jobId.toString(), REINDEXING_JOB_EXTENSION);
            return JsonUtils.readValue(recordString, EventPublisherJob.class);
        }
        return this.REINDEXING_JOB_MAP.get(jobId).getJobData();
    }

    public EventPublisherJob getLatestJob() throws IOException {
        ArrayList<SearchIndexWorkflow> activeJobs = new ArrayList<SearchIndexWorkflow>(this.REINDEXING_JOB_MAP.values());
        if (!activeJobs.isEmpty()) {
            return ((SearchIndexWorkflow)activeJobs.get(activeJobs.size() - 1)).getJobData();
        }
        String recordString = dao.entityExtensionTimeSeriesDao().getLatestByExtension(REINDEXING_JOB_EXTENSION);
        return JsonUtils.readValue(recordString, EventPublisherJob.class);
    }

    public List<EventPublisherJob> getAllJobs() throws IOException {
        ArrayList<EventPublisherJob> result = new ArrayList<EventPublisherJob>();
        ArrayList<SearchIndexWorkflow> activeReindexingJob = new ArrayList<SearchIndexWorkflow>(this.REINDEXING_JOB_MAP.values());
        List activeEventPubJob = activeReindexingJob.stream().map(SearchIndexWorkflow::getJobData).collect(Collectors.toList());
        List<EventPublisherJob> jobsFromDatabase = JsonUtils.readObjects(dao.entityExtensionTimeSeriesDao().getAllByExtension(REINDEXING_JOB_EXTENSION), EventPublisherJob.class);
        jobsFromDatabase.removeIf(job -> {
            for (EventPublisherJob active : activeEventPubJob) {
                if (!active.getId().equals(job.getId())) continue;
                return true;
            }
            return false;
        });
        result.addAll(activeEventPubJob);
        result.addAll(jobsFromDatabase);
        return result;
    }

    private EventPublisherJob getReindexJob(String startedBy, CreateEventPublisherJob job) {
        long updateTime = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
        return new EventPublisherJob().withId(UUID.randomUUID()).withName(job.getName()).withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH).withRunMode(CreateEventPublisherJob.RunMode.BATCH).withStartedBy(startedBy).withStatus(EventPublisherJob.Status.STARTED).withStats(new Stats()).withStartTime(Long.valueOf(updateTime)).withTimestamp(Long.valueOf(updateTime)).withEntities(job.getEntities()).withBatchSize(job.getBatchSize()).withFailure(new Failure()).withRecreateIndex(job.getRecreateIndex()).withSearchIndexMappingLanguage(job.getSearchIndexMappingLanguage()).withAfterCursor(job.getAfterCursor());
    }

    static {
        INITIALIZED = false;
    }
}

