package org.graylog2.buffers.processors;

import com.codahale.metrics.Counter;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.lmax.disruptor.WorkHandler;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.graylog2.Configuration;
import org.graylog2.outputs.DefaultMessageOutput;
import org.graylog2.outputs.OutputRouter;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.outputs.MessageOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/buffers/processors/OutputBufferProcessor.class */
public class OutputBufferProcessor implements WorkHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class);
    private static final String INCOMING_MESSAGES_METRICNAME = MetricRegistry.name(OutputBufferProcessor.class, new String[]{"incomingMessages"});
    private static final String PROCESS_TIME_METRICNAME = MetricRegistry.name(OutputBufferProcessor.class, new String[]{"processTime"});
    private final ExecutorService executor;
    private final Configuration configuration;
    private final ServerStatus serverStatus;
    private final Meter incomingMessages;
    private final Counter outputThroughput;
    private final Timer processTime;
    private final OutputRouter outputRouter;
    private final MessageOutput defaultMessageOutput;

    @Inject
    public OutputBufferProcessor(Configuration configuration, MetricRegistry metricRegistry, ServerStatus serverStatus, OutputRouter outputRouter, @DefaultMessageOutput MessageOutput messageOutput) {
        this.configuration = configuration;
        this.serverStatus = serverStatus;
        this.outputRouter = outputRouter;
        this.defaultMessageOutput = messageOutput;
        this.executor = executorService(metricRegistry, "outputbuffer-processor-executor-%d", configuration.getOutputBufferProcessorThreadsCorePoolSize(), configuration.getOutputBufferProcessorThreadsMaxPoolSize(), configuration.getOutputBufferProcessorKeepAliveTime());
        this.incomingMessages = metricRegistry.meter(INCOMING_MESSAGES_METRICNAME);
        this.outputThroughput = metricRegistry.counter(GlobalMetricNames.OUTPUT_THROUGHPUT);
        this.processTime = metricRegistry.timer(PROCESS_TIME_METRICNAME);
    }

    private ExecutorService executorService(MetricRegistry metricRegistry, String str, int i, int i2, int i3) {
        return new InstrumentedExecutorService(new ThreadPoolExecutor(i, i2, i3, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat(str).build()), metricRegistry, MetricRegistry.name(getClass(), new String[]{"executor-service"}));
    }

    public void onEvent(MessageEvent messageEvent) throws Exception {
        this.incomingMessages.mark();
        Message message = messageEvent.getMessage();
        if (message == null) {
            LOG.debug("Skipping null message.");
            return;
        }
        LOG.debug("Processing message <{}> from OutputBuffer.", message.getId());
        Set<MessageOutput> streamOutputsForMessage = this.outputRouter.getStreamOutputsForMessage(message);
        message.recordCounter(this.serverStatus, "matched-outputs", streamOutputsForMessage.size());
        Future<?> processMessage = processMessage(message, this.defaultMessageOutput);
        CountDownLatch countDownLatch = new CountDownLatch(streamOutputsForMessage.size());
        Iterator<MessageOutput> it = streamOutputsForMessage.iterator();
        while (it.hasNext()) {
            processMessage(message, it.next(), countDownLatch);
        }
        if (!countDownLatch.await(this.configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
            LOG.warn("Timeout reached. Not waiting any longer for stream output writer threads to complete.");
        }
        if (processMessage != null) {
            Uninterruptibles.getUninterruptibly(processMessage);
        } else {
            LOG.error("The default output future was null, this is a bug!");
        }
        if (message.hasRecordings()) {
            LOG.debug("Message event trace: {}", message.recordingsAsString());
        }
        this.outputThroughput.inc();
        LOG.debug("Wrote message <{}> to all outputs. Finished handling.", message.getId());
    }

    private Future<?> processMessage(Message message, MessageOutput messageOutput) {
        return processMessage(message, messageOutput, new CountDownLatch(0));
    }

    private Future<?> processMessage(final Message message, final MessageOutput messageOutput, final CountDownLatch countDownLatch) {
        if (messageOutput == null) {
            LOG.error("Output was null!");
            return Futures.immediateCancelledFuture();
        }
        if (!messageOutput.isRunning()) {
            LOG.debug("Skipping stopped output {}", messageOutput.getClass().getName());
            return Futures.immediateCancelledFuture();
        }
        Future<?> future = null;
        try {
            LOG.debug("Writing message to [{}].", messageOutput.getClass());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message id for [{}]: <{}>", messageOutput.getClass(), message.getId());
            }
            future = this.executor.submit(new Runnable() { // from class: org.graylog2.buffers.processors.OutputBufferProcessor.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            Timer.Context time = OutputBufferProcessor.this.processTime.time();
                            Throwable th = null;
                            try {
                                messageOutput.write(message);
                                if (time != null) {
                                    if (0 != 0) {
                                        try {
                                            time.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        time.close();
                                    }
                                }
                                countDownLatch.countDown();
                            } catch (Throwable th3) {
                                if (time != null) {
                                    if (0 != 0) {
                                        try {
                                            time.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        time.close();
                                    }
                                }
                                throw th3;
                            }
                        } catch (Exception e) {
                            OutputBufferProcessor.LOG.error("Error in output [" + messageOutput.getClass() + "].", e);
                            countDownLatch.countDown();
                        }
                    } catch (Throwable th5) {
                        countDownLatch.countDown();
                        throw th5;
                    }
                }
            });
        } catch (Exception e) {
            LOG.error("Could not write message batch to output [" + messageOutput.getClass() + "].", e);
            countDownLatch.countDown();
        }
        return future;
    }
}
