/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.buffers.processors;

import com.codahale.metrics.Counter;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.lmax.disruptor.WorkHandler;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.graylog2.Configuration;
import org.graylog2.outputs.DefaultMessageOutput;
import org.graylog2.outputs.OutputRouter;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.outputs.MessageOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutputBufferProcessor
implements WorkHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class);
    private static final String INCOMING_MESSAGES_METRICNAME = MetricRegistry.name(OutputBufferProcessor.class, (String[])new String[]{"incomingMessages"});
    private static final String PROCESS_TIME_METRICNAME = MetricRegistry.name(OutputBufferProcessor.class, (String[])new String[]{"processTime"});
    private final ExecutorService executor;
    private final Configuration configuration;
    private final ServerStatus serverStatus;
    private final Meter incomingMessages;
    private final Counter outputThroughput;
    private final Timer processTime;
    private final OutputRouter outputRouter;
    private final MessageOutput defaultMessageOutput;

    @Inject
    public OutputBufferProcessor(Configuration configuration, MetricRegistry metricRegistry, ServerStatus serverStatus, OutputRouter outputRouter, @DefaultMessageOutput MessageOutput defaultMessageOutput) {
        this.configuration = configuration;
        this.serverStatus = serverStatus;
        this.outputRouter = outputRouter;
        this.defaultMessageOutput = defaultMessageOutput;
        String nameFormat = "outputbuffer-processor-executor-%d";
        int corePoolSize = configuration.getOutputBufferProcessorThreadsCorePoolSize();
        this.executor = this.executorService(metricRegistry, "outputbuffer-processor-executor-%d", corePoolSize);
        this.incomingMessages = metricRegistry.meter(INCOMING_MESSAGES_METRICNAME);
        this.outputThroughput = metricRegistry.counter("org.graylog2.throughput.output");
        this.processTime = metricRegistry.timer(PROCESS_TIME_METRICNAME);
    }

    private ExecutorService executorService(MetricRegistry metricRegistry, String nameFormat, int corePoolSize) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
        return new InstrumentedExecutorService(Executors.newFixedThreadPool(corePoolSize, threadFactory), metricRegistry, MetricRegistry.name(this.getClass(), (String[])new String[]{"executor-service"}));
    }

    public void onEvent(MessageEvent event) throws Exception {
        this.incomingMessages.mark();
        Message msg = event.getMessage();
        if (msg == null) {
            LOG.debug("Skipping null message.");
            return;
        }
        LOG.trace("Processing message <{}> from OutputBuffer.", (Object)msg.getId());
        Set<MessageOutput> messageOutputs = this.outputRouter.getStreamOutputsForMessage(msg);
        msg.recordCounter(this.serverStatus, "matched-outputs", messageOutputs.size());
        Future<?> defaultOutputCompletion = this.processMessage(msg, this.defaultMessageOutput);
        CountDownLatch streamOutputsDoneSignal = new CountDownLatch(messageOutputs.size());
        for (MessageOutput output : messageOutputs) {
            this.processMessage(msg, output, streamOutputsDoneSignal);
        }
        if (!streamOutputsDoneSignal.await(this.configuration.getOutputModuleTimeout(), TimeUnit.MILLISECONDS)) {
            LOG.warn("Timeout reached. Not waiting any longer for stream output writer threads to complete.");
        }
        if (defaultOutputCompletion != null) {
            Uninterruptibles.getUninterruptibly(defaultOutputCompletion);
        } else {
            LOG.error("The default output future was null, this is a bug!");
        }
        if (msg.hasRecordings()) {
            LOG.debug("Message event trace: {}", (Object)msg.recordingsAsString());
        }
        this.outputThroughput.inc();
        LOG.debug("Wrote message <{}> to all outputs. Finished handling.", (Object)msg.getId());
        event.clearMessages();
    }

    private Future<?> processMessage(Message msg, MessageOutput defaultMessageOutput) {
        return this.processMessage(msg, defaultMessageOutput, new CountDownLatch(0));
    }

    private Future<?> processMessage(final Message msg, final MessageOutput output, final CountDownLatch doneSignal) {
        if (output == null) {
            LOG.error("Output was null!");
            doneSignal.countDown();
            return Futures.immediateCancelledFuture();
        }
        if (!output.isRunning()) {
            LOG.debug("Skipping stopped output {}", (Object)output.getClass().getName());
            doneSignal.countDown();
            return Futures.immediateCancelledFuture();
        }
        Future<?> future = null;
        try {
            LOG.debug("Writing message to [{}].", output.getClass());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Message id for [{}]: <{}>", output.getClass(), (Object)msg.getId());
            }
            future = this.executor.submit(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try (Timer.Context ignored = OutputBufferProcessor.this.processTime.time();){
                        output.write(msg);
                    }
                    catch (Exception e) {
                        LOG.error("Error in output [" + output.getClass() + "].", (Throwable)e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            });
        }
        catch (Exception e) {
            LOG.error("Could not write message batch to output [" + output.getClass() + "].", (Throwable)e);
            doneSignal.countDown();
        }
        return future;
    }
}

