package org.graylog2.streams;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamRouterEngine;
import org.graylog2.streams.events.StreamsChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/streams/StreamRouter.class */
public class StreamRouter {
    private static final Logger LOG = LoggerFactory.getLogger(StreamRouter.class);
    protected final StreamService streamService;
    private final ServerStatus serverStatus;
    private final ScheduledExecutorService scheduler;
    private final AtomicReference<StreamRouterEngine> routerEngine = new AtomicReference<>(null);
    private final StreamRouterEngineUpdater engineUpdater;

    /* loaded from: input_file:org/graylog2/streams/StreamRouter$StreamRouterEngineUpdater.class */
    private static class StreamRouterEngineUpdater implements Runnable {
        private final AtomicReference<StreamRouterEngine> routerEngine;
        private final StreamRouterEngine.Factory engineFactory;
        private final StreamService streamService;
        private final ExecutorService executorService;

        public StreamRouterEngineUpdater(AtomicReference<StreamRouterEngine> atomicReference, StreamRouterEngine.Factory factory, StreamService streamService, ExecutorService executorService) {
            this.routerEngine = atomicReference;
            this.engineFactory = factory;
            this.streamService = streamService;
            this.executorService = executorService;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                StreamRouterEngine newEngine = getNewEngine();
                if (newEngine.getFingerprint().equals(this.routerEngine.get().getFingerprint())) {
                    StreamRouter.LOG.debug("Not updating router engine, streams did not change (fingerprint={})", newEngine.getFingerprint());
                } else {
                    StreamRouter.LOG.debug("Updating to new stream router engine. (old-fingerprint={} new-fingerprint={}", this.routerEngine.get().getFingerprint(), newEngine.getFingerprint());
                    this.routerEngine.set(newEngine);
                }
            } catch (Exception e) {
                StreamRouter.LOG.error("Stream router engine update failed!", e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public StreamRouterEngine getNewEngine() {
            return this.engineFactory.create(this.streamService.loadAllEnabled(), this.executorService);
        }
    }

    @Inject
    public StreamRouter(StreamService streamService, ServerStatus serverStatus, StreamRouterEngine.Factory factory, EventBus eventBus, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService) {
        this.streamService = streamService;
        this.serverStatus = serverStatus;
        this.scheduler = scheduledExecutorService;
        this.engineUpdater = new StreamRouterEngineUpdater(this.routerEngine, factory, streamService, executorService());
        this.routerEngine.set(this.engineUpdater.getNewEngine());
        eventBus.register(this);
    }

    @Subscribe
    public void handleStreamsUpdate(StreamsChangedEvent streamsChangedEvent) {
        this.scheduler.submit(this.engineUpdater);
    }

    private ExecutorService executorService() {
        return Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("stream-router-%d").setDaemon(true).build());
    }

    public List<Stream> route(Message message) {
        StreamRouterEngine streamRouterEngine = this.routerEngine.get();
        message.recordCounter(this.serverStatus, "streams-evaluated", streamRouterEngine.getStreams().size());
        return streamRouterEngine.match(message);
    }
}
