package org.openmetadata.service.util;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.core.Response;
import org.elasticsearch.client.RestHighLevelClient;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.CreateEventPublisherJob;
import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition;
import org.openmetadata.service.exception.CustomExceptionMessage;
import org.openmetadata.service.exception.UnhandledServerException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.workflows.searchIndex.ReindexingUtil;
import org.openmetadata.service.workflows.searchIndex.SearchIndexWorkflow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/util/ReIndexingHandler.class */
public class ReIndexingHandler {
    public static final String REINDEXING_JOB_EXTENSION = "reindexing.eventPublisher";
    private static ReIndexingHandler INSTANCE;
    private static CollectionDAO dao;
    private static RestHighLevelClient client;
    private static ElasticSearchIndexDefinition esIndexDefinition;
    private static ExecutorService threadScheduler;
    private final Map<UUID, SearchIndexWorkflow> REINDEXING_JOB_MAP = new LinkedHashMap();
    private static BlockingQueue<Runnable> taskQueue;
    private static final Logger LOG = LoggerFactory.getLogger(ReIndexingHandler.class);
    private static volatile boolean INITIALIZED = false;

    private ReIndexingHandler() {
    }

    public static ReIndexingHandler getInstance() {
        return INSTANCE;
    }

    public static void initialize(RestHighLevelClient restHighLevelClient, ElasticSearchIndexDefinition elasticSearchIndexDefinition, CollectionDAO collectionDAO) {
        if (INITIALIZED) {
            LOG.info("Reindexing Handler is already initialized");
            return;
        }
        client = restHighLevelClient;
        dao = collectionDAO;
        esIndexDefinition = elasticSearchIndexDefinition;
        taskQueue = new ArrayBlockingQueue(5);
        threadScheduler = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, taskQueue);
        INSTANCE = new ReIndexingHandler();
        INITIALIZED = true;
    }

    public EventPublisherJob createReindexingJob(String str, CreateEventPublisherJob createEventPublisherJob) {
        clearCompletedJobs();
        validateJob(createEventPublisherJob);
        if (taskQueue.size() >= 5) {
            throw new UnhandledServerException("Cannot create new Reindexing Jobs. There are pending jobs.");
        }
        if (((ThreadPoolExecutor) threadScheduler).getActiveCount() > 5) {
            throw new UnhandledServerException("Thread unavailable to run the jobs. There are pending jobs.");
        }
        EventPublisherJob reindexJob = getReindexJob(str, createEventPublisherJob);
        ArrayList arrayList = new ArrayList(this.REINDEXING_JOB_MAP.values());
        Set entities = reindexJob.getEntities();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Set entities2 = ((SearchIndexWorkflow) it.next()).getJobData().getEntities();
            Objects.requireNonNull(entities);
            entities2.forEach((v1) -> {
                r1.remove(v1);
            });
        }
        LOG.info("Reindexing triggered for the following Entities: {}", entities);
        if (entities.isEmpty()) {
            throw new UnhandledServerException("There are already executing Jobs working on the same Entities. Please try later.");
        }
        if (!CommonUtil.nullOrEmpty(reindexJob.getAfterCursor()) && entities.size() > 1) {
            throw new IllegalArgumentException("After Cursor can only be associated with one entity");
        }
        dao.entityExtensionTimeSeriesDao().deleteLastRecords(REINDEXING_JOB_EXTENSION, 5);
        dao.entityExtensionTimeSeriesDao().insert(reindexJob.getId().toString(), REINDEXING_JOB_EXTENSION, "eventPublisherJob", JsonUtils.pojoToJson(reindexJob));
        SearchIndexWorkflow searchIndexWorkflow = new SearchIndexWorkflow(dao, esIndexDefinition, client, reindexJob);
        threadScheduler.submit(searchIndexWorkflow);
        this.REINDEXING_JOB_MAP.put(reindexJob.getId(), searchIndexWorkflow);
        return reindexJob;
    }

    private void clearCompletedJobs() {
        this.REINDEXING_JOB_MAP.entrySet().removeIf(entry -> {
            return (((SearchIndexWorkflow) entry.getValue()).getJobData().getStatus() == EventPublisherJob.Status.STARTED || ((SearchIndexWorkflow) entry.getValue()).getJobData().getStatus() == EventPublisherJob.Status.RUNNING) ? false : true;
        });
    }

    public EventPublisherJob stopRunningJob(UUID uuid) {
        SearchIndexWorkflow searchIndexWorkflow = this.REINDEXING_JOB_MAP.get(uuid);
        if (searchIndexWorkflow == null) {
            throw new CustomExceptionMessage(Response.Status.BAD_REQUEST, "Job is not in Running state.");
        }
        searchIndexWorkflow.stopJob();
        return searchIndexWorkflow.getJobData();
    }

    private void validateJob(CreateEventPublisherJob createEventPublisherJob) {
        Objects.requireNonNull(createEventPublisherJob);
        HashSet hashSet = new HashSet(Entity.getEntityList());
        if (createEventPublisherJob.getEntities().isEmpty()) {
            throw new IllegalArgumentException("Entities cannot be Empty");
        }
        createEventPublisherJob.getEntities().forEach(str -> {
            if (!hashSet.contains(str) && !ReindexingUtil.isDataInsightIndex(str)) {
                throw new IllegalArgumentException(String.format("Entity Type : %s is not a valid Entity", str));
            }
        });
    }

    public void removeCompletedJob(UUID uuid) {
        this.REINDEXING_JOB_MAP.remove(uuid);
    }

    public EventPublisherJob getJob(UUID uuid) throws IOException {
        return this.REINDEXING_JOB_MAP.get(uuid) == null ? (EventPublisherJob) JsonUtils.readValue(dao.entityExtensionTimeSeriesDao().getLatestExtension(uuid.toString(), REINDEXING_JOB_EXTENSION), EventPublisherJob.class) : this.REINDEXING_JOB_MAP.get(uuid).getJobData();
    }

    public EventPublisherJob getLatestJob() throws IOException {
        ArrayList arrayList = new ArrayList(this.REINDEXING_JOB_MAP.values());
        return !arrayList.isEmpty() ? ((SearchIndexWorkflow) arrayList.get(arrayList.size() - 1)).getJobData() : (EventPublisherJob) JsonUtils.readValue(dao.entityExtensionTimeSeriesDao().getLatestByExtension(REINDEXING_JOB_EXTENSION), EventPublisherJob.class);
    }

    public List<EventPublisherJob> getAllJobs() throws IOException {
        ArrayList arrayList = new ArrayList();
        List list = (List) new ArrayList(this.REINDEXING_JOB_MAP.values()).stream().map((v0) -> {
            return v0.getJobData();
        }).collect(Collectors.toList());
        List readObjects = JsonUtils.readObjects(dao.entityExtensionTimeSeriesDao().getAllByExtension(REINDEXING_JOB_EXTENSION), EventPublisherJob.class);
        readObjects.removeIf(eventPublisherJob -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                if (((EventPublisherJob) it.next()).getId().equals(eventPublisherJob.getId())) {
                    return true;
                }
            }
            return false;
        });
        arrayList.addAll(list);
        arrayList.addAll(readObjects);
        return arrayList;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.time.ZonedDateTime] */
    private EventPublisherJob getReindexJob(String str, CreateEventPublisherJob createEventPublisherJob) {
        long time = Date.from(LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant()).getTime();
        return new EventPublisherJob().withId(UUID.randomUUID()).withName(createEventPublisherJob.getName()).withPublisherType(CreateEventPublisherJob.PublisherType.ELASTIC_SEARCH).withRunMode(CreateEventPublisherJob.RunMode.BATCH).withStartedBy(str).withStatus(EventPublisherJob.Status.STARTED).withStats(new Stats()).withStartTime(Long.valueOf(time)).withTimestamp(Long.valueOf(time)).withEntities(createEventPublisherJob.getEntities()).withBatchSize(createEventPublisherJob.getBatchSize()).withFailure(new Failure()).withRecreateIndex(createEventPublisherJob.getRecreateIndex()).withSearchIndexMappingLanguage(createEventPublisherJob.getSearchIndexMappingLanguage()).withAfterCursor(createEventPublisherJob.getAfterCursor());
    }
}
