/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedThreadFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
import org.graylog2.plugin.BaseConfiguration;
import org.graylog2.plugin.buffers.InputBuffer;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.buffers.DirectMessageHandler;
import org.graylog2.shared.buffers.JournallingMessageHandler;
import org.graylog2.shared.buffers.LoggingExceptionHandler;
import org.graylog2.shared.buffers.RawMessageEncoderHandler;
import org.graylog2.shared.buffers.RawMessageEvent;
import org.graylog2.shared.metrics.MetricUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class InputBufferImpl
implements InputBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(InputBufferImpl.class);
    private final RingBuffer<RawMessageEvent> ringBuffer;
    private final Meter incomingMessages;

    @Inject
    public InputBufferImpl(MetricRegistry metricRegistry, BaseConfiguration configuration, Provider<DirectMessageHandler> directMessageHandlerProvider, Provider<RawMessageEncoderHandler> rawMessageEncoderHandlerProvider, Provider<JournallingMessageHandler> spoolingMessageHandlerProvider) {
        Disruptor disruptor = new Disruptor(RawMessageEvent.FACTORY, configuration.getInputBufferRingSize(), this.threadFactory(metricRegistry), ProducerType.MULTI, configuration.getInputBufferWaitStrategy());
        disruptor.setDefaultExceptionHandler((ExceptionHandler)new LoggingExceptionHandler(LOG));
        int numberOfHandlers = configuration.getInputbufferProcessors();
        if (configuration.isMessageJournalEnabled()) {
            LOG.info("Message journal is enabled.");
            WorkHandler[] handlers = new RawMessageEncoderHandler[numberOfHandlers];
            for (int i = 0; i < numberOfHandlers; ++i) {
                handlers[i] = (RawMessageEncoderHandler)rawMessageEncoderHandlerProvider.get();
            }
            disruptor.handleEventsWithWorkerPool(handlers).then(new EventHandler[]{(EventHandler)spoolingMessageHandlerProvider.get()});
        } else {
            LOG.info("Message journal is disabled.");
            WorkHandler[] handlers = new DirectMessageHandler[numberOfHandlers];
            for (int i = 0; i < numberOfHandlers; ++i) {
                handlers[i] = (DirectMessageHandler)directMessageHandlerProvider.get();
            }
            disruptor.handleEventsWithWorkerPool(handlers);
        }
        this.ringBuffer = disruptor.start();
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(InputBufferImpl.class, (String[])new String[]{"incomingMessages"}));
        MetricUtils.safelyRegister(metricRegistry, "org.graylog2.buffers.input.usage", new Gauge<Long>(){

            public Long getValue() {
                return InputBufferImpl.this.getUsage();
            }
        });
        MetricUtils.safelyRegister(metricRegistry, "org.graylog2.buffers.input.size", MetricUtils.constantGauge(this.ringBuffer.getBufferSize()));
        LOG.info("Initialized {} with ring size <{}> and wait strategy <{}>, running {} parallel message handlers.", new Object[]{this.getClass().getSimpleName(), configuration.getInputBufferRingSize(), configuration.getInputBufferWaitStrategy().getClass().getSimpleName(), numberOfHandlers});
    }

    public void insert(RawMessage message) {
        this.ringBuffer.publishEvent(RawMessageEvent.TRANSLATOR, (Object)message);
        this.incomingMessages.mark();
    }

    public long getUsage() {
        return (long)this.ringBuffer.getBufferSize() - this.ringBuffer.remainingCapacity();
    }

    private ThreadFactory threadFactory(MetricRegistry metricRegistry) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("inputbufferprocessor-%d").build();
        return new InstrumentedThreadFactory(threadFactory, metricRegistry, MetricRegistry.name(this.getClass(), (String[])new String[]{"thread-factory"}));
    }
}

