/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.jdbi3;

import com.lmax.disruptor.BatchEventProcessor;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.events.EventFilterRule;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.events.subscription.SubscriptionPublisher;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.util.EntityUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Repository for {@link EventSubscription} entities.
 *
 * <p>Besides the persistence hooks inherited from {@link EntityRepository}, this class keeps a
 * static, in-memory map from subscription id to the {@link SubscriptionPublisher} created for it,
 * and attaches or detaches that publisher's event processor through {@link EventPubSub}.
 */
public class EventSubscriptionRepository extends EntityRepository<EventSubscription> {
    private static final Logger LOG = LoggerFactory.getLogger(EventSubscriptionRepository.class);

    /** Publishers registered so far, keyed by the id of the subscription they serve. */
    private static final ConcurrentHashMap<UUID, SubscriptionPublisher> subscriptionPublisherMap = new ConcurrentHashMap<>();

    static final String ALERT_PATCH_FIELDS = "owner,enabled,batchSize,timeout";
    static final String ALERT_UPDATE_FIELDS = "owner,enabled,batchSize,timeout,filteringRules,subscriptionType,subscriptionConfig";

    /**
     * Creates the repository on top of the given DAO collection.
     *
     * @param dao DAO collection supplying the event-subscription DAO
     */
    public EventSubscriptionRepository(CollectionDAO dao) {
        super("/v1/events/subscription", "eventsubscription", EventSubscription.class, dao.eventSubscriptionDAO(), dao, ALERT_PATCH_FIELDS, ALERT_UPDATE_FIELDS);
    }

    /**
     * Populates {@code statusDetails} from the in-memory publisher when that field is requested,
     * and clears it otherwise.
     *
     * @param entity subscription to populate
     * @param fields requested fields
     * @return the same {@code entity} instance
     */
    @Override
    public EventSubscription setFields(EventSubscription entity, EntityUtil.Fields fields) throws IOException {
        entity.setStatusDetails(fields.contains("statusDetails") ? this.getStatusForEventSubscription(entity.getId()) : null);
        return entity;
    }

    /** Validates the subscription's filtering rules before it is stored. */
    @Override
    public void prepare(EventSubscription entity) throws IOException {
        this.validateFilterRules(entity);
    }

    /**
     * Validates every rule condition as a {@code Boolean} expression and then sorts the rules
     * in place by name. A subscription without filtering rules has nothing to validate.
     */
    private void validateFilterRules(EventSubscription entity) {
        if (entity.getFilteringRules() == null || entity.getFilteringRules().getRules() == null) {
            return;
        }
        List<EventFilterRule> rules = entity.getFilteringRules().getRules();
        for (EventFilterRule rule : rules) {
            AlertUtil.validateExpression(rule.getCondition(), Boolean.class);
        }
        rules.sort(Comparator.comparing(EventFilterRule::getName));
    }

    /** Persists the entity by delegating to the inherited {@code store}. */
    @Override
    public void storeEntity(EventSubscription entity, boolean update) throws IOException {
        this.store(entity, update);
    }

    /** Stores the owner relationship of the subscription. */
    @Override
    public void storeRelationships(EventSubscription entity) {
        this.storeOwner(entity, entity.getOwner());
    }

    /** Restores the attributes a patch must not change: the id and the name. */
    @Override
    public void restorePatchAttributes(EventSubscription original, EventSubscription updated) {
        updated.withId(original.getId()).withName(original.getName());
    }

    /** Returns the publisher registered for {@code id}, or {@code null} if there is none. */
    private SubscriptionPublisher getPublisher(UUID id) {
        return subscriptionPublisherMap.get(id);
    }

    /**
     * Creates a publisher for the subscription and registers it in the publisher map.
     *
     * <p>The publisher is registered in both cases, but only a subscription that is not explicitly
     * disabled gets an event processor attached. The subscription's status details are set to
     * {@code DISABLED} or {@code ACTIVE} accordingly.
     *
     * @param eventSubscription subscription to create a publisher for
     */
    public void addSubscriptionPublisher(EventSubscription eventSubscription) {
        SubscriptionPublisher publisher = AlertUtil.getAlertPublisher(eventSubscription, this.daoCollection);
        if (Boolean.FALSE.equals(eventSubscription.getEnabled())) {
            eventSubscription.setStatusDetails(this.getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status.DISABLED));
        } else {
            eventSubscription.setStatusDetails(this.getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status.ACTIVE));
            BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = EventPubSub.addEventHandler(publisher);
            publisher.setProcessor(processor);
        }
        subscriptionPublisherMap.put(eventSubscription.getId(), publisher);
        LOG.info("Webhook publisher subscription started as {} : status {}", eventSubscription.getName(), eventSubscription.getStatusDetails().getStatus());
    }

    /** Builds a status object carrying {@code status} and the current wall-clock time in milliseconds. */
    private SubscriptionStatus getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status status) {
        return new SubscriptionStatus().withStatus(status).withTimestamp(System.currentTimeMillis());
    }

    /**
     * Applies an updated subscription to its publisher.
     *
     * <p>When the subscription is enabled: a missing publisher is created; an existing one receives
     * the updated subscription and, unless its previous status was {@code ACTIVE} or
     * {@code AWAITING_RETRY}, gets a fresh event processor attached. When the subscription is not
     * enabled, its processor is removed and the status is set to {@code DISABLED}.
     *
     * @param eventSubscription the updated subscription
     * @throws IllegalStateException if the calling thread is interrupted while the processor is
     *     being shut down; the thread's interrupt flag is restored before throwing
     */
    public void updateWebhookPublisher(EventSubscription eventSubscription) {
        if (Boolean.TRUE.equals(eventSubscription.getEnabled())) {
            SubscriptionPublisher previousPublisher = this.getPublisher(eventSubscription.getId());
            if (previousPublisher == null) {
                this.addSubscriptionPublisher(eventSubscription);
                return;
            }
            // Read the status before the publisher is handed the updated subscription.
            SubscriptionStatus.Status status = previousPublisher.getEventSubscription().getStatusDetails().getStatus();
            previousPublisher.updateEventSubscription(eventSubscription);
            if (status != SubscriptionStatus.Status.ACTIVE && status != SubscriptionStatus.Status.AWAITING_RETRY) {
                BatchEventProcessor<EventPubSub.ChangeEventHolder> processor = EventPubSub.addEventHandler(previousPublisher);
                previousPublisher.setProcessor(processor);
                LOG.info("Webhook publisher restarted for {}", eventSubscription.getName());
            }
        } else {
            try {
                this.removeProcessorForEventSubscription(eventSubscription.getId(), this.getSubscriptionStatusAtCurrentTime(SubscriptionStatus.Status.DISABLED));
            } catch (InterruptedException e) {
                // This method does not declare the checked exception, so restore the flag and
                // surface the interruption as an unchecked failure instead of losing it.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while disabling event subscription " + eventSubscription.getName(), e);
            }
        }
    }

    /**
     * Stops the processor of the publisher registered for {@code id}, if any, and records why.
     * The publisher itself stays registered in the publisher map.
     *
     * @param id subscription id
     * @param reasonForRemoval status details to set on the publisher's subscription
     * @throws InterruptedException if interrupted while waiting for the publisher to shut down
     */
    public void removeProcessorForEventSubscription(UUID id, SubscriptionStatus reasonForRemoval) throws InterruptedException {
        SubscriptionPublisher publisher = subscriptionPublisherMap.get(id);
        if (publisher != null) {
            shutdownProcessor(publisher);
            publisher.getEventSubscription().setStatusDetails(reasonForRemoval);
            LOG.info("Webhook publisher deleted for {}", publisher.getEventSubscription().getName());
        }
    }

    /**
     * Unregisters the publisher for {@code id}, if any, and stops its processor.
     *
     * @param id subscription id
     * @throws InterruptedException if interrupted while waiting for the publisher to shut down
     */
    public void deleteEventSubscriptionPublisher(UUID id) throws InterruptedException {
        SubscriptionPublisher publisher = subscriptionPublisherMap.remove(id);
        if (publisher != null) {
            shutdownProcessor(publisher);
            LOG.info("Webhook publisher deleted for {}", publisher.getEventSubscription().getName());
        }
    }

    /**
     * Halts the publisher's processor, waits for the publisher to shut down and removes the
     * processor from {@link EventPubSub}.
     *
     * <p>Does nothing when the publisher has no processor. A publisher registered by
     * {@link #addSubscriptionPublisher} for a disabled subscription never has
     * {@code setProcessor} called on it, so there is nothing to halt in that case.
     */
    private static void shutdownProcessor(SubscriptionPublisher publisher) throws InterruptedException {
        if (publisher.getProcessor() == null) {
            return;
        }
        publisher.getProcessor().halt();
        publisher.awaitShutdown();
        EventPubSub.removeProcessor(publisher.getProcessor());
    }

    /**
     * Returns the status details held by the publisher registered for {@code id}.
     *
     * @param id subscription id
     * @return the publisher's current status details, or {@code null} if no publisher is registered
     */
    public SubscriptionStatus getStatusForEventSubscription(UUID id) {
        SubscriptionPublisher publisher = subscriptionPublisherMap.get(id);
        if (publisher != null) {
            return publisher.getEventSubscription().getStatusDetails();
        }
        return null;
    }

    /** Creates the updater that records subscription-specific field changes. */
    public EventSubscriptionUpdater getUpdater(EventSubscription original, EventSubscription updated, EntityRepository.Operation operation) {
        return new EventSubscriptionUpdater(original, updated, operation);
    }

    /** Updater that records changes to the fields specific to an event subscription. */
    public class EventSubscriptionUpdater extends EntityUpdater {
        public EventSubscriptionUpdater(EventSubscription original, EventSubscription updated, EntityRepository.Operation operation) {
            // The enclosing repository instance is supplied implicitly to the inner superclass.
            super(original, updated, operation);
        }

        /** Records a change for each subscription-specific field that differs between the two versions. */
        @Override
        public void entitySpecificUpdate() throws IOException {
            this.recordChange("enabled", this.original.getEnabled(), this.updated.getEnabled());
            this.recordChange("batchSize", this.original.getBatchSize(), this.updated.getBatchSize());
            this.recordChange("timeout", this.original.getTimeout(), this.updated.getTimeout());
            this.recordChange("filteringRules", this.original.getFilteringRules(), this.updated.getFilteringRules());
            this.recordChange("subscriptionType", this.original.getSubscriptionType(), this.updated.getSubscriptionType());
            this.recordChange("subscriptionConfig", this.original.getSubscriptionConfig(), this.updated.getSubscriptionConfig());
        }
    }
}

