package org.graylog2.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.lmax.disruptor.EventHandler;
import org.graylog2.Core;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.filters.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/buffers/processors/ProcessBufferProcessor.class */
public class ProcessBufferProcessor implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBufferProcessor.class);
    private Core server;
    private final Meter incomingMessages;
    private final Timer processTime;
    private final Meter filteredOutMessages;
    private final Meter outgoingMessages;
    private final long ordinal;
    private final long numberOfConsumers;

    public ProcessBufferProcessor(Core core, long j, long j2) {
        this.ordinal = j;
        this.numberOfConsumers = j2;
        this.server = core;
        this.incomingMessages = core.metrics().meter(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "incomingMessages"));
        this.outgoingMessages = core.metrics().meter(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "outgoingMessages"));
        this.filteredOutMessages = core.metrics().meter(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "filteredOutMessages"));
        this.processTime = core.metrics().timer(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "processTime"));
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(MessageEvent messageEvent, long j, boolean z) throws Exception {
        if (j % this.numberOfConsumers != this.ordinal) {
            return;
        }
        this.server.processBufferWatermark().decrementAndGet();
        this.incomingMessages.mark();
        Timer.Context time = this.processTime.time();
        Message message = messageEvent.getMessage();
        LOG.debug("Starting to process message <{}>.", message.getId());
        for (MessageFilter messageFilter : this.server.getFilters()) {
            Timer.Context time2 = this.server.metrics().timer(MetricRegistry.name(messageFilter.getClass(), "executionTime")).time();
            try {
                try {
                    LOG.debug("Applying filter [{}] on message <{}>.", messageFilter.getName(), message.getId());
                } catch (Exception e) {
                    LOG.error("Could not apply filter [" + messageFilter.getName() + "] on message <" + message.getId() + ">: ", (Throwable) e);
                    time2.stop();
                }
                if (messageFilter.filter(message, this.server)) {
                    LOG.debug("Filter [{}] marked message <{}> to be discarded. Dropping message.", messageFilter.getName(), message.getId());
                    this.filteredOutMessages.mark();
                    time2.stop();
                    return;
                }
                time2.stop();
            } catch (Throwable th) {
                time2.stop();
                throw th;
            }
        }
        LOG.debug("Finished processing message. Writing to output buffer.");
        this.server.getOutputBuffer().insertCached(message, null);
        this.outgoingMessages.mark();
        time.stop();
    }
}
