/*
 * Decompiled with CFR 0.152.
 */
package org.openmetadata.service.events;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.openmetadata.schema.filter.EventFilter;
import org.openmetadata.schema.filter.Filters;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.events.EventPublisher;
import org.openmetadata.service.events.errors.RetriableException;
import org.openmetadata.service.resources.events.EventResource;
import org.openmetadata.service.util.FilterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractEventPublisher
implements EventPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractEventPublisher.class);
    protected static final int BACKOFF_NORMAL = 0;
    protected static final int BACKOFF_3_SECONDS = 3000;
    protected static final int BACKOFF_30_SECONDS = 30000;
    protected static final int BACKOFF_5_MINUTES = 300000;
    protected static final int BACKOFF_1_HOUR = 3600000;
    protected static final int BACKOFF_24_HOUR = 86400000;
    protected int currentBackoffTime = 0;
    protected final List<ChangeEvent> batch = new ArrayList<ChangeEvent>();
    protected final ConcurrentHashMap<String, Map<EventType, Filters>> filter = new ConcurrentHashMap();
    private final int batchSize;

    protected AbstractEventPublisher(int batchSize, List<EventFilter> filters) {
        if (filters != null) {
            this.updateFilter(filters);
        }
        this.batchSize = batchSize;
    }

    protected void updateFilter(List<EventFilter> filterList) {
        filterList.forEach(entityFilter -> {
            String entityType = entityFilter.getEntityType();
            HashMap entityBasicFilterMap = new HashMap();
            if (entityFilter.getFilters() != null) {
                entityFilter.getFilters().forEach(f -> entityBasicFilterMap.put(f.getEventType(), f));
            }
            this.filter.put(entityType, entityBasicFilterMap);
        });
    }

    public void onEvent(EventPubSub.ChangeEventHolder changeEventHolder, long sequence, boolean endOfBatch) throws Exception {
        ChangeEvent changeEvent = changeEventHolder.get();
        if (!this.filter.isEmpty() && !FilterUtil.shouldProcessRequest(changeEvent, this.filter)) {
            return;
        }
        this.batch.add(changeEventHolder.get());
        if (!endOfBatch && this.batch.size() < this.batchSize) {
            return;
        }
        EventResource.ChangeEventList list = new EventResource.ChangeEventList(this.batch, null, null, this.batch.size());
        try {
            this.publish(list);
            this.batch.clear();
        }
        catch (RetriableException ex) {
            this.setNextBackOff();
            LOG.error("Failed to publish event {} due to {}, will try again in {} ms", new Object[]{changeEvent, ex, this.currentBackoffTime});
            Thread.sleep(this.currentBackoffTime);
        }
        catch (Exception e) {
            LOG.error("Failed to publish event type {} for entity {}", (Object)changeEvent.getEventType(), (Object)changeEvent.getEntityType());
            LOG.error(e.getMessage(), (Throwable)e);
        }
    }

    protected void setNextBackOff() {
        if (this.currentBackoffTime == 0) {
            this.currentBackoffTime = 3000;
        } else if (this.currentBackoffTime == 3000) {
            this.currentBackoffTime = 30000;
        } else if (this.currentBackoffTime == 30000) {
            this.currentBackoffTime = 300000;
        } else if (this.currentBackoffTime == 300000) {
            this.currentBackoffTime = 3600000;
        } else if (this.currentBackoffTime == 3600000) {
            this.currentBackoffTime = 86400000;
        }
    }
}

