/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.apps.bundles.changeEvent;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.openmetadata.schema.entity.events.AlertMetrics;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.entity.events.FailedEvent;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.Alert;
import org.openmetadata.service.apps.bundles.changeEvent.AlertFactory;
import org.openmetadata.service.apps.bundles.changeEvent.Consumer;
import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public abstract class AbstractEventConsumer
implements Alert<ChangeEvent>,
Consumer<ChangeEvent>,
Job {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractEventConsumer.class);
    public static final String DESTINATION_MAP_KEY = "SubscriptionMapKey";
    public static final String ALERT_OFFSET_KEY = "alertOffsetKey";
    public static final String ALERT_INFO_KEY = "alertInfoKey";
    public static final String OFFSET_EXTENSION = "eventSubscription.Offset";
    public static final String METRICS_EXTENSION = "eventSubscription.metrics";
    public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";
    private long offset = -1L;
    private AlertMetrics alertMetrics;
    private JobDetail jobDetail;
    protected EventSubscription eventSubscription;
    protected Map<UUID, Destination<ChangeEvent>> destinationMap;

    protected AbstractEventConsumer() {
    }

    private void init(JobExecutionContext context) {
        EventSubscription sub = (EventSubscription)context.getJobDetail().getJobDataMap().get((Object)ALERT_INFO_KEY);
        this.jobDetail = context.getJobDetail();
        this.eventSubscription = sub;
        this.offset = this.loadInitialOffset(context);
        this.alertMetrics = this.loadInitialMetrics();
        this.destinationMap = this.loadDestinationsMap(context);
        this.doInit(context);
    }

    protected void doInit(JobExecutionContext context) {
    }

    @Override
    public void handleFailedEvent(EventPublisherException ex) {
        UUID failingSubscriptionId = (UUID)ex.getChangeEventWithSubscription().getLeft();
        ChangeEvent changeEvent = (ChangeEvent)ex.getChangeEventWithSubscription().getRight();
        LOG.debug("Change Event Failed for Event Subscription: {} ,  for Subscription : {} , Change Event : {} ", new Object[]{this.eventSubscription.getName(), failingSubscriptionId, changeEvent});
        Entity.getCollectionDAO().eventSubscriptionDAO().upsertFailedEvent(this.eventSubscription.getId().toString(), String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()), JsonUtils.pojoToJson(new FailedEvent().withFailingSubscriptionId(failingSubscriptionId).withChangeEvent(changeEvent).withRetriesLeft(this.eventSubscription.getRetries()).withTimestamp(Long.valueOf(System.currentTimeMillis()))));
    }

    private long loadInitialOffset(JobExecutionContext context) {
        EventSubscriptionOffset jobStoredOffset = (EventSubscriptionOffset)this.jobDetail.getJobDataMap().get((Object)ALERT_OFFSET_KEY);
        if (jobStoredOffset != null) {
            return jobStoredOffset.getOffset();
        }
        EventSubscriptionOffset eventSubscriptionOffset = AlertUtil.getStartingOffset(this.eventSubscription.getId());
        context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, (Object)eventSubscriptionOffset);
        return eventSubscriptionOffset.getOffset();
    }

    private Map<UUID, Destination<ChangeEvent>> loadDestinationsMap(JobExecutionContext context) {
        HashMap<UUID, Destination<ChangeEvent>> dMap = (HashMap<UUID, Destination<ChangeEvent>>)context.getJobDetail().getJobDataMap().get((Object)DESTINATION_MAP_KEY);
        if (dMap == null) {
            dMap = new HashMap<UUID, Destination<ChangeEvent>>();
            for (SubscriptionDestination subscriptionDest : this.eventSubscription.getDestinations()) {
                dMap.put(subscriptionDest.getId(), AlertFactory.getAlert(this.eventSubscription, subscriptionDest));
            }
            context.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, dMap);
        }
        return dMap;
    }

    private AlertMetrics loadInitialMetrics() {
        AlertMetrics metrics = (AlertMetrics)this.jobDetail.getJobDataMap().get((Object)METRICS_EXTENSION);
        if (metrics != null) {
            return metrics;
        }
        String json = Entity.getCollectionDAO().eventSubscriptionDAO().getSubscriberExtension(this.eventSubscription.getId().toString(), METRICS_EXTENSION);
        if (json != null) {
            return JsonUtils.readValue(json, AlertMetrics.class);
        }
        return new AlertMetrics().withTotalEvents(Integer.valueOf(0)).withFailedEvents(Integer.valueOf(0)).withSuccessEvents(Integer.valueOf(0));
    }

    @Override
    public void publishEvents(Map<ChangeEvent, Set<UUID>> events) {
        if (events.isEmpty()) {
            return;
        }
        Map<ChangeEvent, Set<UUID>> filteredEvents = AlertUtil.getFilteredEvents(this.eventSubscription, events);
        for (Map.Entry<ChangeEvent, Set<UUID>> eventWithReceivers : filteredEvents.entrySet()) {
            for (UUID receiverId : eventWithReceivers.getValue()) {
                try {
                    this.sendAlert(receiverId, eventWithReceivers.getKey());
                    this.alertMetrics.withSuccessEvents(Integer.valueOf(this.alertMetrics.getSuccessEvents() + 1));
                }
                catch (EventPublisherException e) {
                    this.alertMetrics.withFailedEvents(Integer.valueOf(this.alertMetrics.getFailedEvents() + 1));
                    this.handleFailedEvent(e);
                }
            }
        }
    }

    @Override
    public void commit(JobExecutionContext jobExecutionContext) {
        long currentTime = System.currentTimeMillis();
        EventSubscriptionOffset eventSubscriptionOffset = new EventSubscriptionOffset().withOffset(Long.valueOf(this.offset)).withTimestamp(Long.valueOf(currentTime));
        Entity.getCollectionDAO().eventSubscriptionDAO().upsertSubscriberExtension(this.eventSubscription.getId().toString(), OFFSET_EXTENSION, "eventSubscriptionOffset", JsonUtils.pojoToJson(eventSubscriptionOffset));
        jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, (Object)eventSubscriptionOffset);
        AlertMetrics metrics = new AlertMetrics().withTotalEvents(this.alertMetrics.getTotalEvents()).withFailedEvents(this.alertMetrics.getFailedEvents()).withSuccessEvents(this.alertMetrics.getSuccessEvents()).withTimestamp(Long.valueOf(currentTime));
        Entity.getCollectionDAO().eventSubscriptionDAO().upsertSubscriberExtension(this.eventSubscription.getId().toString(), METRICS_EXTENSION, "alertMetrics", JsonUtils.pojoToJson(metrics));
        jobExecutionContext.getJobDetail().getJobDataMap().put(METRICS_EXTENSION, (Object)this.alertMetrics);
        jobExecutionContext.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, this.destinationMap);
    }

    @Override
    public List<ChangeEvent> pollEvents(long offset, long batchSize) {
        List<String> eventJson = Entity.getCollectionDAO().changeEventDAO().list(batchSize, offset);
        ArrayList<ChangeEvent> changeEvents = new ArrayList<ChangeEvent>();
        for (String json : eventJson) {
            ChangeEvent event = JsonUtils.readValue(json, ChangeEvent.class);
            changeEvents.add(event);
        }
        return changeEvents;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        this.init(jobExecutionContext);
        List<ChangeEvent> batch = this.pollEvents(this.offset, this.eventSubscription.getBatchSize().intValue());
        int batchSize = batch.size();
        Map<ChangeEvent, Set<UUID>> eventsWithReceivers = this.createEventsWithReceivers(batch);
        try {
            if (!eventsWithReceivers.isEmpty()) {
                this.alertMetrics.withTotalEvents(Integer.valueOf(this.alertMetrics.getTotalEvents() + eventsWithReceivers.size()));
                this.publishEvents(eventsWithReceivers);
            }
        }
        catch (Exception e) {
            LOG.error("Error in executing the Job : {} ", (Object)e.getMessage());
        }
        finally {
            if (!eventsWithReceivers.isEmpty()) {
                this.offset += (long)batchSize;
                this.commit(jobExecutionContext);
            }
        }
    }

    public EventSubscription getEventSubscription() {
        return (EventSubscription)this.jobDetail.getJobDataMap().get((Object)ALERT_INFO_KEY);
    }

    private Map<ChangeEvent, Set<UUID>> createEventsWithReceivers(List<ChangeEvent> events) {
        TreeMap<ChangeEvent, Set<UUID>> eventsWithReceivers = new TreeMap<ChangeEvent, Set<UUID>>(Comparator.comparing(ChangeEvent::getId));
        for (ChangeEvent changeEvent : events) {
            Set<UUID> receivers = Set.of((UUID[])this.destinationMap.keySet().toArray(UUID[]::new));
            eventsWithReceivers.put(changeEvent, receivers);
        }
        return eventsWithReceivers;
    }

    public JobDetail getJobDetail() {
        return this.jobDetail;
    }

    public void setJobDetail(JobDetail jobDetail) {
        this.jobDetail = jobDetail;
    }
}

