package org.graylog2.buffers.processors;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.bson.types.ObjectId;
import org.graylog2.Core;
import org.graylog2.outputs.OutputRouter;
import org.graylog2.outputs.OutputStreamConfigurationImpl;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.outputs.OutputStreamConfiguration;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/buffers/processors/OutputBufferProcessor.class */
/**
 * Disruptor {@link EventHandler} that collects messages coming off the output buffer into
 * batches and hands each batch to every registered {@link MessageOutput}.
 *
 * <p>Each instance only handles the sequence numbers for which
 * {@code sequence % numberOfConsumers == ordinal}; all other events are ignored. A batch is
 * flushed when the disruptor signals the end of its own batch or when the configured output
 * batch size has been reached. Writes are executed on an internal thread pool, one task per
 * output, and a flush waits a bounded amount of time for them to finish.
 *
 * <p>The message buffer is a plain list that is mutated in {@link #onEvent} without any
 * synchronisation; writer tasks only ever see a private copy of it.
 */
public class OutputBufferProcessor implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class);

    /** Keep-alive, in seconds, for idle executor threads above the core pool size. */
    private static final long EXECUTOR_KEEP_ALIVE_SECONDS = 5L;

    /** Maximum time, in seconds, a flush waits for all output writers to complete. */
    private static final long WRITER_TIMEOUT_SECONDS = 10L;

    private final ExecutorService executor;
    private final Core server;
    private final List<Message> buffer = Lists.newArrayList();
    private final Meter incomingMessages;
    private final Histogram batchSize;
    private final long ordinal;
    private final long numberOfConsumers;

    /**
     * Creates a processor and its private writer thread pool.
     *
     * @param server            the server instance providing configuration, metrics, outputs and counters
     * @param ordinal           index of this consumer; selects which sequence numbers it handles
     * @param numberOfConsumers total number of consumers the sequence numbers are divided between
     */
    public OutputBufferProcessor(Core server, long ordinal, long numberOfConsumers) {
        this.ordinal = ordinal;
        this.numberOfConsumers = numberOfConsumers;
        this.server = server;
        this.executor = new ThreadPoolExecutor(
                server.getConfiguration().getOutputBufferProcessorThreadsCorePoolSize(),
                server.getConfiguration().getOutputBufferProcessorThreadsMaxPoolSize(),
                EXECUTOR_KEEP_ALIVE_SECONDS,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("outputbuffer-processor-" + ordinal + "-executor-%d")
                        .build());
        this.incomingMessages = server.metrics().meter(
                MetricRegistry.name(OutputBufferProcessor.class, "incomingMessages"));
        this.batchSize = server.metrics().histogram(
                MetricRegistry.name(OutputBufferProcessor.class, "batchSize"));
    }

    /**
     * Buffers the event's message and flushes the buffer to all outputs when a batch is complete.
     *
     * @param event      the disruptor event carrying the message
     * @param sequence   the event's sequence number; events not assigned to this consumer are skipped
     * @param endOfBatch {@code true} forces a flush regardless of the current buffer size
     * @throws Exception if the thread is interrupted while waiting for the writers, or on any
     *                   other failure raised while handling the event
     */
    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception {
        // Work is divided between consumers by sequence number.
        if (sequence % this.numberOfConsumers != this.ordinal) {
            return;
        }

        this.server.outputBufferWatermark().decrementAndGet();
        this.incomingMessages.mark();

        Message message = event.getMessage();
        LOG.debug("Processing message <{}> from OutputBuffer.", message.getId());
        this.buffer.add(message);

        if (endOfBatch || this.buffer.size() >= this.server.getConfiguration().getOutputBatchSize()) {
            flushBuffer();
        }

        // NOTE: this line is logged for every handled event, including those that were only
        // buffered and not yet written by a flush.
        LOG.debug("Wrote message <{}> to all outputs. Finished handling.", message.getId());
    }

    /**
     * Submits the current buffer content to every output, waits (bounded) for the writers,
     * updates the throughput counters and empties the buffer.
     */
    private void flushBuffer() throws InterruptedException {
        // Take one snapshot of the outputs so that the latch count always matches the number
        // of outputs we actually dispatch to. Sizing the latch from a separate count() call
        // could leave it above or below the number of submitted tasks if the two disagree.
        final List<MessageOutput> outputs = Lists.newArrayList(this.server.outputs().get());
        final CountDownLatch pendingWrites = new CountDownLatch(outputs.size());
        final int batch = this.buffer.size();

        for (final MessageOutput output : outputs) {
            final String outputTypeName = output.getClass().getCanonicalName();
            try {
                // Each writer gets its own copy because the buffer is cleared below while
                // a timed-out writer may still be running.
                final List<Message> messages = Lists.newArrayList(this.buffer);
                LOG.debug("Writing message batch to [{}]. Size <{}>", output.getName(), batch);
                // Recorded once per output, not once per flush.
                this.batchSize.update(batch);
                this.executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            output.write(
                                    OutputRouter.getMessagesForOutput(messages, outputTypeName),
                                    buildStreamConfigs(messages, outputTypeName),
                                    OutputBufferProcessor.this.server);
                        } catch (Exception e) {
                            LOG.error("Error in output [" + output.getName() + "].", e);
                        } finally {
                            // Always release the latch, whatever the outcome of the write.
                            pendingWrites.countDown();
                        }
                    }
                });
            } catch (Exception e) {
                // The task was never queued, so nothing else will count this output down.
                LOG.error("Could not write message batch to output [" + output.getName() + "].", e);
                pendingWrites.countDown();
            }
        }

        if (!pendingWrites.await(WRITER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            LOG.warn("Timeout reached. Not waiting any longer for writer threads to complete.");
        }

        if (this.server.isStatsMode()) {
            this.server.getBenchmarkCounter().add(batch);
        }
        this.server.getThroughputCounter().add(batch);
        this.buffer.clear();
    }

    /**
     * Builds the per-stream output configuration for the given output type.
     *
     * <p>Collects the distinct streams (keyed by stream id) referenced by the messages and
     * registers, for each of them, the output configurations stored for {@code outputTypeName}.
     * Every stream is cast to {@link StreamImpl}; a different implementation results in a
     * {@link ClassCastException}.
     *
     * <p>Kept public because the visibility was widened before this source was produced and
     * other code may rely on it.
     *
     * @param messages       the batch whose streams should be considered
     * @param outputTypeName canonical class name of the output the configuration is built for
     * @return the stream configuration covering all streams of the batch
     */
    public OutputStreamConfiguration buildStreamConfigs(List<Message> messages, String outputTypeName) {
        OutputStreamConfigurationImpl streamConfigs = new OutputStreamConfigurationImpl();

        // De-duplicate streams across the whole batch; a later occurrence replaces an earlier one.
        Map<ObjectId, Stream> streamsById = Maps.newHashMap();
        for (Message message : messages) {
            for (Stream stream : message.getStreams()) {
                streamsById.put(new ObjectId(stream.getId()), stream);
            }
        }

        for (Map.Entry<ObjectId, Stream> entry : streamsById.entrySet()) {
            StreamImpl stream = (StreamImpl) entry.getValue();
            streamConfigs.add(entry.getKey(), stream.getOutputConfigurations(outputTypeName));
        }
        return streamConfigs;
    }
}
