package org.openmetadata.service.apps.bundles.changeEvent;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.openmetadata.schema.entity.events.AlertMetrics;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.entity.events.FailedEvent;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
/* loaded from: input_file:org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.class */
public abstract class AbstractEventConsumer implements Alert<ChangeEvent>, Consumer<ChangeEvent>, Job {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractEventConsumer.class);
    public static final String DESTINATION_MAP_KEY = "SubscriptionMapKey";
    public static final String ALERT_OFFSET_KEY = "alertOffsetKey";
    public static final String ALERT_INFO_KEY = "alertInfoKey";
    public static final String OFFSET_EXTENSION = "eventSubscription.Offset";
    public static final String METRICS_EXTENSION = "eventSubscription.metrics";
    public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";
    private long offset = -1;
    private AlertMetrics alertMetrics;
    private JobDetail jobDetail;
    protected EventSubscription eventSubscription;
    protected Map<UUID, Destination<ChangeEvent>> destinationMap;

    private void init(JobExecutionContext jobExecutionContext) {
        EventSubscription eventSubscription = (EventSubscription) jobExecutionContext.getJobDetail().getJobDataMap().get(ALERT_INFO_KEY);
        this.jobDetail = jobExecutionContext.getJobDetail();
        this.eventSubscription = eventSubscription;
        this.offset = loadInitialOffset(jobExecutionContext);
        this.alertMetrics = loadInitialMetrics();
        this.destinationMap = loadDestinationsMap(jobExecutionContext);
        doInit(jobExecutionContext);
    }

    protected void doInit(JobExecutionContext jobExecutionContext) {
    }

    @Override // org.openmetadata.service.apps.bundles.changeEvent.Consumer
    public void handleFailedEvent(EventPublisherException eventPublisherException) {
        UUID uuid = (UUID) eventPublisherException.getChangeEventWithSubscription().getLeft();
        ChangeEvent changeEvent = (ChangeEvent) eventPublisherException.getChangeEventWithSubscription().getRight();
        LOG.debug("Change Event Failed for Event Subscription: {} ,  for Subscription : {} , Change Event : {} ", new Object[]{this.eventSubscription.getName(), uuid, changeEvent});
        Entity.getCollectionDAO().eventSubscriptionDAO().upsertFailedEvent(this.eventSubscription.getId().toString(), String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()), JsonUtils.pojoToJson(new FailedEvent().withFailingSubscriptionId(uuid).withChangeEvent(changeEvent).withRetriesLeft(this.eventSubscription.getRetries()).withTimestamp(Long.valueOf(System.currentTimeMillis()))));
    }

    private long loadInitialOffset(JobExecutionContext jobExecutionContext) {
        EventSubscriptionOffset eventSubscriptionOffset = (EventSubscriptionOffset) this.jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY);
        if (eventSubscriptionOffset != null) {
            return eventSubscriptionOffset.getOffset().longValue();
        }
        EventSubscriptionOffset startingOffset = AlertUtil.getStartingOffset(this.eventSubscription.getId());
        jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, startingOffset);
        return startingOffset.getOffset().longValue();
    }

    private Map<UUID, Destination<ChangeEvent>> loadDestinationsMap(JobExecutionContext jobExecutionContext) {
        Map<UUID, Destination<ChangeEvent>> map = (Map) jobExecutionContext.getJobDetail().getJobDataMap().get(DESTINATION_MAP_KEY);
        if (map == null) {
            map = new HashMap();
            for (SubscriptionDestination subscriptionDestination : this.eventSubscription.getDestinations()) {
                map.put(subscriptionDestination.getId(), AlertFactory.getAlert(subscriptionDestination));
            }
            jobExecutionContext.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, map);
        }
        return map;
    }

    private AlertMetrics loadInitialMetrics() {
        AlertMetrics alertMetrics = (AlertMetrics) this.jobDetail.getJobDataMap().get(METRICS_EXTENSION);
        if (alertMetrics != null) {
            return alertMetrics;
        }
        String subscriberExtension = Entity.getCollectionDAO().eventSubscriptionDAO().getSubscriberExtension(this.eventSubscription.getId().toString(), METRICS_EXTENSION);
        return subscriberExtension != null ? (AlertMetrics) JsonUtils.readValue(subscriberExtension, AlertMetrics.class) : new AlertMetrics().withTotalEvents(0).withFailedEvents(0).withSuccessEvents(0);
    }

    @Override // org.openmetadata.service.apps.bundles.changeEvent.Consumer
    public void publishEvents(Map<ChangeEvent, Set<UUID>> map) {
        if (map.isEmpty()) {
            return;
        }
        for (Map.Entry<ChangeEvent, Set<UUID>> entry : AlertUtil.getFilteredEvents(this.eventSubscription, map).entrySet()) {
            Iterator<UUID> it = entry.getValue().iterator();
            while (it.hasNext()) {
                try {
                    sendAlert(it.next(), entry.getKey());
                    this.alertMetrics.withSuccessEvents(Integer.valueOf(this.alertMetrics.getSuccessEvents().intValue() + 1));
                } catch (EventPublisherException e) {
                    this.alertMetrics.withFailedEvents(Integer.valueOf(this.alertMetrics.getFailedEvents().intValue() + 1));
                    handleFailedEvent(e);
                }
            }
        }
    }

    @Override // org.openmetadata.service.apps.bundles.changeEvent.Consumer
    public void commit(JobExecutionContext jobExecutionContext) {
        long currentTimeMillis = System.currentTimeMillis();
        EventSubscriptionOffset withTimestamp = new EventSubscriptionOffset().withOffset(Long.valueOf(this.offset)).withTimestamp(Long.valueOf(currentTimeMillis));
        Entity.getCollectionDAO().eventSubscriptionDAO().upsertSubscriberExtension(this.eventSubscription.getId().toString(), OFFSET_EXTENSION, "eventSubscriptionOffset", JsonUtils.pojoToJson(withTimestamp));
        jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, withTimestamp);
        Entity.getCollectionDAO().eventSubscriptionDAO().upsertSubscriberExtension(this.eventSubscription.getId().toString(), METRICS_EXTENSION, "alertMetrics", JsonUtils.pojoToJson(new AlertMetrics().withTotalEvents(this.alertMetrics.getTotalEvents()).withFailedEvents(this.alertMetrics.getFailedEvents()).withSuccessEvents(this.alertMetrics.getSuccessEvents()).withTimestamp(Long.valueOf(currentTimeMillis))));
        jobExecutionContext.getJobDetail().getJobDataMap().put(METRICS_EXTENSION, this.alertMetrics);
        jobExecutionContext.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, this.destinationMap);
    }

    @Override // org.openmetadata.service.apps.bundles.changeEvent.Consumer
    public List<ChangeEvent> pollEvents(long j, long j2) {
        List<String> list = Entity.getCollectionDAO().changeEventDAO().list(j2, j);
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add((ChangeEvent) JsonUtils.readValue(it.next(), ChangeEvent.class));
        }
        return arrayList;
    }

    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        init(jobExecutionContext);
        List<ChangeEvent> pollEvents = pollEvents(this.offset, this.eventSubscription.getBatchSize().intValue());
        int size = pollEvents.size();
        Map<ChangeEvent, Set<UUID>> createEventsWithReceivers = createEventsWithReceivers(pollEvents);
        try {
            try {
                if (!createEventsWithReceivers.isEmpty()) {
                    this.alertMetrics.withTotalEvents(Integer.valueOf(this.alertMetrics.getTotalEvents().intValue() + createEventsWithReceivers.size()));
                    publishEvents(createEventsWithReceivers);
                }
                if (createEventsWithReceivers.isEmpty()) {
                    return;
                }
                this.offset += size;
                commit(jobExecutionContext);
            } catch (Exception e) {
                LOG.error("Error in executing the Job : {} ", e.getMessage());
                if (createEventsWithReceivers.isEmpty()) {
                    return;
                }
                this.offset += size;
                commit(jobExecutionContext);
            }
        } catch (Throwable th) {
            if (!createEventsWithReceivers.isEmpty()) {
                this.offset += size;
                commit(jobExecutionContext);
            }
            throw th;
        }
    }

    public EventSubscription getEventSubscription() {
        return (EventSubscription) this.jobDetail.getJobDataMap().get(ALERT_INFO_KEY);
    }

    private Map<ChangeEvent, Set<UUID>> createEventsWithReceivers(List<ChangeEvent> list) {
        TreeMap treeMap = new TreeMap(Comparator.comparing((v0) -> {
            return v0.getId();
        }));
        Iterator<ChangeEvent> it = list.iterator();
        while (it.hasNext()) {
            treeMap.put(it.next(), Set.of(this.destinationMap.keySet().toArray(i -> {
                return new UUID[i];
            })));
        }
        return treeMap;
    }

    public JobDetail getJobDetail() {
        return this.jobDetail;
    }

    public void setJobDetail(JobDetail jobDetail) {
        this.jobDetail = jobDetail;
    }
}
