package org.graylog2.events;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.eventbus.DeadEvent;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import java.util.Collections;
import javax.inject.Inject;
import org.graylog2.bindings.providers.MongoJackObjectMapperProvider;
import org.graylog2.database.MongoConnection;
import org.graylog2.plugin.periodical.Periodical;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.security.RestrictedChainingClassLoader;
import org.graylog2.security.SafeClasses;
import org.graylog2.security.UnsafeClassLoadingAttemptException;
import org.graylog2.shared.plugins.ChainingClassLoader;
import org.graylog2.shared.utilities.AutoValueUtils;
import org.mongojack.DBCursor;
import org.mongojack.DBSort;
import org.mongojack.DBUpdate;
import org.mongojack.JacksonDBCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/events/ClusterEventPeriodical.class */
/**
 * Periodical that relays cluster events between nodes through the MongoDB collection
 * {@value #COLLECTION_NAME}.
 *
 * <p>On every run ({@link #doRun()}) it reads all {@link ClusterEvent} documents whose
 * {@code consumers} array does not contain this node's ID, converts each payload back into an
 * instance of the recorded event class, posts it on the server {@link EventBus} and finally adds
 * this node's ID to the document's {@code consumers} set.
 *
 * <p>The instance also registers itself on the {@link ClusterEventBus}; every object posted there
 * is persisted as a new {@link ClusterEvent} by {@link #publishClusterEvent(Object)}.
 */
public class ClusterEventPeriodical extends Periodical {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterEventPeriodical.class);

    @VisibleForTesting
    static final String COLLECTION_NAME = "cluster_events";
    private final JacksonDBCollection<ClusterEvent, String> dbCollection;
    private final NodeId nodeId;
    private final ObjectMapper objectMapper;
    private final EventBus serverEventBus;
    private final RestrictedChainingClassLoader chainingClassLoader;

    /**
     * Creates the periodical, wrapping the prepared {@value #COLLECTION_NAME} collection with the
     * object mapper obtained from {@code mongoJackObjectMapperProvider}.
     *
     * @param mongoJackObjectMapperProvider provider of the {@link ObjectMapper} used for the collection
     *                                      and for payload conversion
     * @param mongoConnection               connection used to look up the collection
     * @param nodeId                        ID of the local node; must not be {@code null}
     * @param restrictedChainingClassLoader class loader used to resolve event payload classes
     * @param eventBus                      server event bus that received events are posted to; must not be {@code null}
     * @param clusterEventBus               cluster event bus this instance subscribes to; must not be {@code null}
     */
    @Inject
    public ClusterEventPeriodical(MongoJackObjectMapperProvider mongoJackObjectMapperProvider, MongoConnection mongoConnection, NodeId nodeId, RestrictedChainingClassLoader restrictedChainingClassLoader, EventBus eventBus, ClusterEventBus clusterEventBus) {
        // NOTE(review): "m396get" looks like a decompiler-renamed Provider#get() - confirm against the provider class.
        this(JacksonDBCollection.wrap(prepareCollection(mongoConnection), ClusterEvent.class, String.class, mongoJackObjectMapperProvider.m396get()),
                nodeId,
                mongoJackObjectMapperProvider.m396get(),
                restrictedChainingClassLoader,
                eventBus,
                clusterEventBus);
    }

    /**
     * Creates the periodical from a plain {@link ChainingClassLoader}, which is wrapped in a
     * {@link RestrictedChainingClassLoader} configured with {@link SafeClasses#allGraylogInternal()}.
     *
     * @param mongoJackObjectMapperProvider provider of the {@link ObjectMapper} used for the collection
     *                                      and for payload conversion
     * @param mongoConnection               connection used to look up the collection
     * @param nodeId                        ID of the local node; must not be {@code null}
     * @param chainingClassLoader           class loader to wrap for resolving event payload classes
     * @param eventBus                      server event bus that received events are posted to; must not be {@code null}
     * @param clusterEventBus               cluster event bus this instance subscribes to; must not be {@code null}
     * @deprecated use the constructor taking a {@link RestrictedChainingClassLoader} instead
     */
    @Deprecated
    public ClusterEventPeriodical(MongoJackObjectMapperProvider mongoJackObjectMapperProvider, MongoConnection mongoConnection, NodeId nodeId, ChainingClassLoader chainingClassLoader, EventBus eventBus, ClusterEventBus clusterEventBus) {
        this(JacksonDBCollection.wrap(prepareCollection(mongoConnection), ClusterEvent.class, String.class, mongoJackObjectMapperProvider.m396get()),
                nodeId,
                mongoJackObjectMapperProvider.m396get(),
                new RestrictedChainingClassLoader(chainingClassLoader, SafeClasses.allGraylogInternal()),
                eventBus,
                clusterEventBus);
    }

    /**
     * Shared constructor: stores the collaborators and registers this instance as a subscriber on
     * the cluster event bus.
     */
    private ClusterEventPeriodical(JacksonDBCollection<ClusterEvent, String> jacksonDBCollection, NodeId nodeId, ObjectMapper objectMapper, RestrictedChainingClassLoader restrictedChainingClassLoader, EventBus eventBus, ClusterEventBus clusterEventBus) {
        this.nodeId = Preconditions.checkNotNull(nodeId);
        this.dbCollection = Preconditions.checkNotNull(jacksonDBCollection);
        this.objectMapper = Preconditions.checkNotNull(objectMapper);
        // Deliberately left without a null check to keep the existing constructor contract.
        this.chainingClassLoader = restrictedChainingClassLoader;
        this.serverEventBus = Preconditions.checkNotNull(eventBus);
        Preconditions.checkNotNull(clusterEventBus).registerClusterEventSubscriber(this);
    }

    /**
     * Looks up the {@value #COLLECTION_NAME} collection, creates an ascending compound index on
     * {@code timestamp}, {@code producer} and {@code consumers}, and sets the collection's write
     * concern to {@link WriteConcern#JOURNALED}.
     *
     * @param mongoConnection connection providing the database
     * @return the prepared collection
     */
    @VisibleForTesting
    static DBCollection prepareCollection(MongoConnection mongoConnection) {
        DBCollection collection = mongoConnection.getDatabase().getCollection(COLLECTION_NAME);
        collection.createIndex(DBSort.asc("timestamp").asc("producer").asc("consumers"));
        collection.setWriteConcern(WriteConcern.JOURNALED);
        return collection;
    }

    /** @return always {@code false} */
    @Override
    public boolean runsForever() {
        return false;
    }

    /** @return always {@code true} */
    @Override
    public boolean stopOnGracefulShutdown() {
        return true;
    }

    /** @return always {@code false} */
    @Override
    public boolean leaderOnly() {
        return false;
    }

    /** @return always {@code true} */
    @Override
    public boolean startOnThisNode() {
        return true;
    }

    /** @return always {@code true} */
    @Override
    public boolean isDaemon() {
        return true;
    }

    /** @return {@code 0} */
    @Override
    public int getInitialDelaySeconds() {
        return 0;
    }

    /** @return {@code 1}; presumably the interval between two {@link #doRun()} calls - confirm against {@link Periodical} */
    @Override
    public int getPeriodSeconds() {
        return 1;
    }

    /** @return the logger of this class */
    @Override
    protected Logger getLogger() {
        return LOG;
    }

    /**
     * Processes all cluster events this node has not consumed yet, in ascending timestamp order.
     *
     * <p>Each event's payload is converted and posted on the server event bus; events whose payload
     * cannot be converted are logged. In both cases this node is added to the event's consumers.
     * Any exception raised while reading or processing is logged as a warning and ends the current
     * run. The cursor is always closed, including when an exception is thrown.
     */
    @Override
    public void doRun() {
        LOG.debug("Opening MongoDB cursor on \"{}\"", COLLECTION_NAME);
        // try-with-resources guarantees the cursor is released even if processing fails midway.
        try (DBCursor<ClusterEvent> cursor = eventCursor(this.nodeId)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("MongoDB query plan: {}", cursor.explain());
            }
            while (cursor.hasNext()) {
                ClusterEvent clusterEvent = cursor.next();
                LOG.trace("Processing cluster event: {}", clusterEvent);
                Object payload = extractPayload(clusterEvent.payload(), clusterEvent.eventClass());
                if (payload != null) {
                    this.serverEventBus.post(payload);
                } else {
                    LOG.warn("Couldn't extract payload of cluster event with ID <{}>", clusterEvent.id());
                    LOG.debug("Invalid payload in cluster event: {}", clusterEvent);
                }
                // Marked as consumed even when the payload was unusable, so the query in
                // eventCursor() no longer returns this event for this node.
                updateConsumers(clusterEvent.id(), this.nodeId);
            }
        } catch (Exception e) {
            LOG.warn("Error while reading cluster events from MongoDB, retrying.", e);
        }
    }

    /**
     * Persists an object posted on the cluster event bus as a {@link ClusterEvent} and, once it has
     * been saved, posts the same object on the server event bus.
     *
     * <p>The stored event lists this node as producer and as its only initial consumer.
     * {@link DeadEvent}s are skipped. If saving throws a {@link MongoException}, the error is logged
     * and the object is not posted on the server event bus.
     *
     * @param obj the event object to publish
     */
    @Subscribe
    public void publishClusterEvent(Object obj) {
        if (obj instanceof DeadEvent) {
            LOG.debug("Skipping DeadEvent on cluster event bus");
            return;
        }
        String canonicalName = AutoValueUtils.getCanonicalName(obj.getClass());
        String localNodeId = this.nodeId.getNodeId();
        ClusterEvent clusterEvent = ClusterEvent.create(localNodeId, canonicalName, Collections.singleton(localNodeId), obj);
        try {
            String savedId = this.dbCollection.save(clusterEvent, WriteConcern.JOURNALED).getSavedId();
            this.serverEventBus.post(obj);
            LOG.debug("Published cluster event with ID <{}> and type <{}>", savedId, canonicalName);
        } catch (MongoException e) {
            LOG.error("Couldn't publish cluster event of type <{}>", canonicalName, e);
        }
    }

    /**
     * Opens a cursor over all events whose {@code consumers} array does not contain the given node,
     * sorted by ascending {@code timestamp}. The caller is responsible for closing the cursor.
     */
    private DBCursor<ClusterEvent> eventCursor(NodeId nodeId) {
        BasicDBList excludedConsumers = new BasicDBList();
        excludedConsumers.add(nodeId.getNodeId());
        BasicDBObject query = new BasicDBObject("consumers", new BasicDBObject("$nin", excludedConsumers));
        return this.dbCollection.find(query).sort(DBSort.asc("timestamp"));
    }

    /** Adds the given node's ID to the {@code consumers} set of the event with the given ID. */
    private void updateConsumers(String eventId, NodeId nodeId) {
        this.dbCollection.updateById(eventId, DBUpdate.addToSet("consumers", nodeId.getNodeId()));
    }

    /**
     * Converts a stored payload into an instance of the named event class, which is resolved through
     * the restricted class loader.
     *
     * @param payload   the raw payload as read from the event document
     * @param className name of the class the payload should be converted to
     * @return the converted payload, or {@code null} if the class could not be loaded, loading it was
     *         rejected, or the conversion failed
     */
    private Object extractPayload(Object payload, String className) {
        try {
            Class<?> payloadClass = this.chainingClassLoader.loadClassSafely(className);
            return this.objectMapper.convertValue(payload, payloadClass);
        } catch (ClassNotFoundException e) {
            LOG.debug("Couldn't load class <{}> for event", className, e);
            return null;
        } catch (IllegalArgumentException e) {
            LOG.debug("Error while deserializing payload", e);
            return null;
        } catch (UnsafeClassLoadingAttemptException e) {
            LOG.warn("Couldn't load class <{}>.", className, e);
            return null;
        }
    }
}
