package org.graylog2.shared.buffers;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.graylog2.inputs.Cache;
import org.graylog2.inputs.InputCache;
import org.graylog2.plugin.BaseConfiguration;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.Buffer;
import org.graylog2.plugin.buffers.BufferOutOfCapacityException;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.buffers.ProcessingDisabledException;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.shared.buffers.processors.ProcessBufferProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/shared/buffers/ProcessBuffer.class */
public class ProcessBuffer extends Buffer {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBuffer.class);
    public static String SOURCE_INPUT_ATTR_NAME;
    public static String SOURCE_NODE_ATTR_NAME;
    protected ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("processbufferprocessor-%d").build());
    private final BaseConfiguration configuration;
    private final InputCache inputCache;
    private final AtomicInteger processBufferWatermark;
    private final Meter incomingMessages;
    private final Meter rejectedMessages;
    private final Meter cachedMessages;
    private final ServerStatus serverStatus;

    /* loaded from: input_file:org/graylog2/shared/buffers/ProcessBuffer$Factory.class */
    public interface Factory {
        ProcessBuffer create(InputCache inputCache, AtomicInteger atomicInteger);
    }

    @AssistedInject
    public ProcessBuffer(MetricRegistry metricRegistry, ServerStatus serverStatus, BaseConfiguration baseConfiguration, @Assisted InputCache inputCache, @Assisted AtomicInteger atomicInteger) {
        this.serverStatus = serverStatus;
        this.configuration = baseConfiguration;
        this.inputCache = inputCache;
        this.processBufferWatermark = atomicInteger;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) ProcessBuffer.class, "incomingMessages"));
        this.rejectedMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) ProcessBuffer.class, "rejectedMessages"));
        this.cachedMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) ProcessBuffer.class, "cachedMessages"));
        if (serverStatus.hasCapability(ServerStatus.Capability.RADIO)) {
            SOURCE_INPUT_ATTR_NAME = "gl2_source_radio_input";
            SOURCE_NODE_ATTR_NAME = "gl2_source_radio";
        } else {
            SOURCE_INPUT_ATTR_NAME = "gl2_source_input";
            SOURCE_NODE_ATTR_NAME = "gl2_source_node";
        }
    }

    public Cache getInputCache() {
        return this.inputCache;
    }

    public void initialize(ProcessBufferProcessor[] processBufferProcessorArr, int i, WaitStrategy waitStrategy, int i2) {
        Disruptor disruptor = new Disruptor(MessageEvent.EVENT_FACTORY, i, this.executor, ProducerType.MULTI, waitStrategy);
        LOG.info("Initialized ProcessBuffer with ring size <{}> and wait strategy <{}>.", Integer.valueOf(i), waitStrategy.getClass().getSimpleName());
        disruptor.handleEventsWith(processBufferProcessorArr);
        this.ringBuffer = disruptor.start();
    }

    @Override // org.graylog2.plugin.buffers.Buffer
    public void insertCached(Message message, MessageInput messageInput) {
        message.setSourceInput(messageInput);
        message.addField(SOURCE_INPUT_ATTR_NAME, messageInput != null ? messageInput.getId() : "<nonexistent input>");
        message.addField(SOURCE_NODE_ATTR_NAME, this.serverStatus.getNodeId());
        if (!this.serverStatus.isProcessing()) {
            LOG.debug("Message processing is paused. Writing to cache.");
            this.cachedMessages.mark();
            this.inputCache.add(message);
        } else {
            if (hasCapacity()) {
                insert(message);
                return;
            }
            if (this.configuration.getInputCacheMaxSize() != 0 && this.inputCache.size() >= this.configuration.getInputCacheMaxSize()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Out of capacity. Input cache limit reached. Dropping message.");
                }
                this.rejectedMessages.mark();
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Out of capacity. Writing to cache.");
                }
                this.cachedMessages.mark();
                this.inputCache.add(message);
            }
        }
    }

    @Override // org.graylog2.plugin.buffers.Buffer
    public void insertFailFast(Message message, MessageInput messageInput) throws BufferOutOfCapacityException, ProcessingDisabledException {
        message.setSourceInput(messageInput);
        message.addField(SOURCE_INPUT_ATTR_NAME, messageInput != null ? messageInput.getId() : "<nonexistent input>");
        message.addField(SOURCE_NODE_ATTR_NAME, this.serverStatus.getNodeId());
        if (!this.serverStatus.isProcessing()) {
            LOG.debug("Rejecting message, because message processing is paused.");
            throw new ProcessingDisabledException();
        }
        if (hasCapacity()) {
            insert(message);
        } else {
            LOG.debug("Rejecting message, because I am full and caching was disabled by input. Raise my size or add more processors.");
            this.rejectedMessages.mark();
            throw new BufferOutOfCapacityException();
        }
    }

    private void insert(Message message) {
        long next = this.ringBuffer.next();
        this.ringBuffer.get(next).setMessage(message);
        this.ringBuffer.publish(next);
        this.processBufferWatermark.incrementAndGet();
        this.incomingMessages.mark();
    }
}
