package org.graylog2.buffers.processors;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.EventHandler;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.graylog2.Configuration;
import org.graylog2.buffers.OutputBufferWatermark;
import org.graylog2.outputs.CachedOutputRouter;
import org.graylog2.outputs.OutputRegistry;
import org.graylog2.outputs.OutputRouter;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.shared.ServerStatus;
import org.graylog2.shared.stats.ThroughputStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/buffers/processors/OutputBufferProcessor.class */
public class OutputBufferProcessor implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class);
    private final ExecutorService executor;
    private final Configuration configuration;
    private final OutputRegistry outputRegistry;
    private final ThroughputStats throughputStats;
    private final ServerStatus serverStatus;
    private final Meter incomingMessages;
    private final Histogram batchSize;
    private final Timer processTime;
    private final OutputBufferWatermark outputBufferWatermark;
    private final OutputRouter outputRouter;
    private final long ordinal;
    private final long numberOfConsumers;

    /* loaded from: input_file:org/graylog2/buffers/processors/OutputBufferProcessor$Factory.class */
    public interface Factory {
        OutputBufferProcessor create(@Assisted("ordinal") long j, @Assisted("numberOfConsumers") long j2);
    }

    @AssistedInject
    public OutputBufferProcessor(Configuration configuration, MetricRegistry metricRegistry, OutputRegistry outputRegistry, ThroughputStats throughputStats, ServerStatus serverStatus, OutputBufferWatermark outputBufferWatermark, CachedOutputRouter cachedOutputRouter, @Assisted("ordinal") long j, @Assisted("numberOfConsumers") long j2) {
        this.configuration = configuration;
        this.outputRegistry = outputRegistry;
        this.throughputStats = throughputStats;
        this.serverStatus = serverStatus;
        this.outputBufferWatermark = outputBufferWatermark;
        this.outputRouter = cachedOutputRouter;
        this.ordinal = j;
        this.numberOfConsumers = j2;
        this.executor = new ThreadPoolExecutor(configuration.getOutputBufferProcessorThreadsCorePoolSize(), configuration.getOutputBufferProcessorThreadsMaxPoolSize(), configuration.getOutputBufferProcessorKeepAliveTime(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("outputbuffer-processor-" + j + "-executor-%d").build());
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name((Class<?>) OutputBufferProcessor.class, "incomingMessages"));
        this.batchSize = metricRegistry.histogram(MetricRegistry.name((Class<?>) OutputBufferProcessor.class, "batchSize"));
        this.processTime = metricRegistry.timer(MetricRegistry.name((Class<?>) OutputBufferProcessor.class, "processTime"));
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(MessageEvent messageEvent, long j, boolean z) throws Exception {
        if (j % this.numberOfConsumers != this.ordinal) {
            return;
        }
        this.outputBufferWatermark.decrementAndGet();
        this.incomingMessages.mark();
        final Message message = messageEvent.getMessage();
        LOG.debug("Processing message <{}> from OutputBuffer.", message.getId());
        Set<MessageOutput> outputsForMessage = this.outputRouter.getOutputsForMessage(message);
        final CountDownLatch countDownLatch = new CountDownLatch(outputsForMessage.size());
        for (final MessageOutput messageOutput : outputsForMessage) {
            if (messageOutput == null) {
                LOG.error("Got null output!");
            } else if (messageOutput.isRunning()) {
                try {
                    LOG.debug("Writing message to [{}].", messageOutput.getName());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Message id for [{}]: <{}>", messageOutput.getName(), message.getId());
                    }
                    this.executor.submit(new Runnable() { // from class: org.graylog2.buffers.processors.OutputBufferProcessor.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                try {
                                    Timer.Context time = OutputBufferProcessor.this.processTime.time();
                                    Throwable th = null;
                                    try {
                                        messageOutput.write(message);
                                        if (time != null) {
                                            if (0 != 0) {
                                                try {
                                                    time.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            } else {
                                                time.close();
                                            }
                                        }
                                        countDownLatch.countDown();
                                    } catch (Throwable th3) {
                                        if (time != null) {
                                            if (0 != 0) {
                                                try {
                                                    time.close();
                                                } catch (Throwable th4) {
                                                    th.addSuppressed(th4);
                                                }
                                            } else {
                                                time.close();
                                            }
                                        }
                                        throw th3;
                                    }
                                } catch (Exception e) {
                                    OutputBufferProcessor.LOG.error("Error in output [" + messageOutput.getName() + "].", (Throwable) e);
                                    countDownLatch.countDown();
                                }
                            } catch (Throwable th5) {
                                countDownLatch.countDown();
                                throw th5;
                            }
                        }
                    });
                } catch (Exception e) {
                    LOG.error("Could not write message batch to output [" + messageOutput.getName() + "].", (Throwable) e);
                    countDownLatch.countDown();
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping stopped output {}", messageOutput.getClass().getName());
            }
        }
        if (!countDownLatch.await(this.configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
            LOG.warn("Timeout reached. Not waiting any longer for writer threads to complete.");
        }
        if (this.serverStatus.hasCapability(ServerStatus.Capability.STATSMODE)) {
            this.throughputStats.getBenchmarkCounter().increment();
        }
        this.throughputStats.getThroughputCounter().increment();
        LOG.debug("Wrote message <{}> to all outputs. Finished handling.", message.getId());
    }
}
