package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.lmax.disruptor.EventHandler;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.buffers.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/shared/buffers/processors/ProcessBufferProcessor.class */
public abstract class ProcessBufferProcessor implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBufferProcessor.class);
    private final Meter incomingMessages;
    private final Timer processTime;
    private final Meter outgoingMessages;
    protected final MetricRegistry metricRegistry;
    private final long ordinal;
    private final long numberOfConsumers;

    public ProcessBufferProcessor(MetricRegistry metricRegistry, long j, long j2) {
        this.metricRegistry = metricRegistry;
        this.ordinal = j;
        this.numberOfConsumers = j2;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "incomingMessages"));
        this.outgoingMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "outgoingMessages"));
        this.processTime = metricRegistry.timer(MetricRegistry.name((Class<?>) ProcessBufferProcessor.class, "processTime"));
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(MessageEvent messageEvent, long j, boolean z) throws Exception {
        if (j % this.numberOfConsumers != this.ordinal) {
            return;
        }
        this.incomingMessages.mark();
        Timer.Context time = this.processTime.time();
        Message message = messageEvent.getMessage();
        LOG.debug("Starting to process message <{}>.", message.getId());
        try {
            try {
                LOG.debug("Finished processing message <{}>. Writing to output buffer.", message.getId());
                handleMessage(message);
                this.outgoingMessages.mark();
                time.stop();
            } catch (Exception e) {
                LOG.warn("Unable to process message <{}>: {}", message.getId(), e);
                this.outgoingMessages.mark();
                time.stop();
            }
        } catch (Throwable th) {
            this.outgoingMessages.mark();
            time.stop();
            throw th;
        }
    }

    protected abstract void handleMessage(Message message);
}
