package org.openmetadata.service.events.scheduled;

import java.util.List;
import java.util.UUID;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.events.CreateEventSubscription;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/openmetadata/service/events/scheduled/EventSubscriptionScheduler.class */
public class EventSubscriptionScheduler {
    public static final String ALERT_JOB_GROUP = "OMAlertJobGroup";
    public static final String ALERT_TRIGGER_GROUP = "OMAlertJobGroup";
    private static final String INVALID_ALERT = "Invalid Alert Type";
    private static EventSubscriptionScheduler instance;
    private final Scheduler alertsScheduler = new StdSchedulerFactory().getScheduler();
    private static final Logger LOG = LoggerFactory.getLogger(EventSubscriptionScheduler.class);
    private static volatile boolean initialized = false;

    private EventSubscriptionScheduler() throws SchedulerException {
        this.alertsScheduler.start();
    }

    public static EventSubscriptionScheduler getInstance() {
        if (!initialized) {
            initialize();
        }
        return instance;
    }

    private static void initialize() throws SchedulerException {
        if (initialized) {
            LOG.info("Event Subscription Scheduler is already initialized");
        } else {
            instance = new EventSubscriptionScheduler();
            initialized = true;
        }
    }

    @Transaction
    public void addSubscriptionPublisher(EventSubscription eventSubscription) throws SchedulerException {
        if (eventSubscription.getAlertType().equals(CreateEventSubscription.AlertType.ACTIVITY_FEED)) {
            throw new IllegalArgumentException("Activity Feed is not a valid Alert Type");
        }
        AlertPublisher alertPublisher = new AlertPublisher();
        if (Boolean.FALSE.equals(eventSubscription.getEnabled())) {
            eventSubscription.getDestinations().forEach(subscriptionDestination -> {
                subscriptionDestination.setStatusDetails(getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status.DISABLED));
            });
            LOG.info("Event Subscription started as {} : status {} for all Destinations", eventSubscription.getName(), SubscriptionStatus.Status.ACTIVE);
        } else {
            eventSubscription.getDestinations().forEach(subscriptionDestination2 -> {
                subscriptionDestination2.setStatusDetails(getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status.ACTIVE));
            });
            this.alertsScheduler.scheduleJob(jobBuilder(alertPublisher, eventSubscription, String.format("%s", eventSubscription.getId().toString())), trigger(eventSubscription));
            LOG.info("Event Subscription started as {} : status {} for all Destinations", eventSubscription.getName(), SubscriptionStatus.Status.ACTIVE);
        }
    }

    private JobDetail jobBuilder(AlertPublisher alertPublisher, EventSubscription eventSubscription, String str) {
        JobDataMap jobDataMap = new JobDataMap();
        jobDataMap.put(AbstractEventConsumer.ALERT_INFO_KEY, eventSubscription);
        jobDataMap.put(AbstractEventConsumer.ALERT_OFFSET_KEY, AlertUtil.getStartingOffset(eventSubscription.getId()));
        return JobBuilder.newJob(alertPublisher.getClass()).withIdentity(str, "OMAlertJobGroup").usingJobData(jobDataMap).build();
    }

    private Trigger trigger(EventSubscription eventSubscription) {
        return TriggerBuilder.newTrigger().withIdentity(eventSubscription.getId().toString(), "OMAlertJobGroup").withSchedule(SimpleScheduleBuilder.repeatSecondlyForever(3)).startNow().build();
    }

    private SubscriptionStatus getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status status) {
        return new SubscriptionStatus().withStatus(status).withTimestamp(Long.valueOf(System.currentTimeMillis()));
    }

    @Transaction
    public void updateEventSubscription(EventSubscription eventSubscription) {
        deleteEventSubscriptionPublisher(eventSubscription);
        if (Boolean.TRUE.equals(eventSubscription.getEnabled()) && !eventSubscription.getAlertType().equals(CreateEventSubscription.AlertType.ACTIVITY_FEED)) {
            addSubscriptionPublisher(eventSubscription);
        }
    }

    @Transaction
    public void deleteEventSubscriptionPublisher(EventSubscription eventSubscription) throws SchedulerException {
        this.alertsScheduler.deleteJob(new JobKey(eventSubscription.getId().toString(), "OMAlertJobGroup"));
        this.alertsScheduler.unscheduleJob(new TriggerKey(eventSubscription.getId().toString(), "OMAlertJobGroup"));
        LOG.info("Alert publisher deleted for {}", eventSubscription.getName());
    }

    public SubscriptionStatus getStatusForEventSubscription(UUID uuid, UUID uuid2) {
        EventSubscription eventSubscriptionFromScheduledJob = getEventSubscriptionFromScheduledJob(uuid);
        if (eventSubscriptionFromScheduledJob != null) {
            List list = eventSubscriptionFromScheduledJob.getDestinations().stream().filter(subscriptionDestination -> {
                return subscriptionDestination.getId().equals(uuid2);
            }).toList();
            if (list.size() == 1) {
                return ((SubscriptionDestination) list.get(0)).getStatusDetails();
            }
            return null;
        }
        EntityRepository<? extends EntityInterface> entityRepository = Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION);
        EventSubscription eventSubscription = entityRepository.get(null, uuid, entityRepository.getFields("id"));
        if (eventSubscription == null || !Boolean.FALSE.equals(eventSubscription.getEnabled())) {
            return null;
        }
        return new SubscriptionStatus().withStatus(SubscriptionStatus.Status.DISABLED);
    }

    public EventSubscription getEventSubscriptionFromScheduledJob(UUID uuid) {
        try {
            JobDetail jobDetail = this.alertsScheduler.getJobDetail(new JobKey(uuid.toString(), "OMAlertJobGroup"));
            if (jobDetail != null) {
                return (EventSubscription) jobDetail.getJobDataMap().get(AbstractEventConsumer.ALERT_INFO_KEY);
            }
            return null;
        } catch (SchedulerException e) {
            LOG.error("Failed to get Event Subscription from Job, Subscription Id : {}", uuid);
            return null;
        }
    }

    public boolean checkIfPublisherPublishedAllEvents(UUID uuid) {
        EventSubscriptionOffset eventSubscriptionOffset;
        long latestOffset = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
        try {
            JobDetail jobDetail = this.alertsScheduler.getJobDetail(new JobKey(uuid.toString(), "OMAlertJobGroup"));
            if (jobDetail == null || (eventSubscriptionOffset = (EventSubscriptionOffset) jobDetail.getJobDataMap().get(AbstractEventConsumer.ALERT_OFFSET_KEY)) == null) {
                return false;
            }
            return eventSubscriptionOffset.getOffset().longValue() == latestOffset;
        } catch (Exception e) {
            LOG.error("Failed to get Event Subscription from Job, Subscription Id : {}, Exception: ", uuid.toString(), e);
            return false;
        }
    }

    public static void shutDown() throws SchedulerException {
        LOG.info("Shutting Down Event Subscription Scheduler");
        if (instance != null) {
            instance.alertsScheduler.shutdown(true);
        }
    }
}
