/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.WorkHandler;
import java.util.Collection;
import javax.annotation.Nonnull;
import org.graylog2.buffers.OutputBuffer;
import org.graylog2.messageprocessors.OrderedMessageProcessors;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Messages;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.messageprocessors.MessageProcessor;
import org.graylog2.plugin.streams.DefaultStream;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.buffers.processors.DecodingProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessBufferProcessor
implements WorkHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBufferProcessor.class);
    private final Meter incomingMessages;
    private final Timer processTime;
    private final Meter outgoingMessages;
    protected final MetricRegistry metricRegistry;
    private final OrderedMessageProcessors orderedMessageProcessors;
    private final OutputBuffer outputBuffer;
    private final DecodingProcessor decodingProcessor;
    private final Provider<Stream> defaultStreamProvider;

    @AssistedInject
    public ProcessBufferProcessor(MetricRegistry metricRegistry, OrderedMessageProcessors orderedMessageProcessors, OutputBuffer outputBuffer, @Assisted DecodingProcessor decodingProcessor, @DefaultStream Provider<Stream> defaultStreamProvider) {
        this.metricRegistry = metricRegistry;
        this.orderedMessageProcessors = orderedMessageProcessors;
        this.outputBuffer = outputBuffer;
        this.decodingProcessor = decodingProcessor;
        this.defaultStreamProvider = defaultStreamProvider;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"incomingMessages"}));
        this.outgoingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"outgoingMessages"}));
        this.processTime = metricRegistry.timer(MetricRegistry.name(ProcessBufferProcessor.class, (String[])new String[]{"processTime"}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onEvent(MessageEvent event) throws Exception {
        try {
            this.decodingProcessor.onEvent(event, 0L, false);
            if (event.isSingleMessage()) {
                this.dispatchMessage(event.getMessage());
            } else {
                Collection<Message> messageList = event.getMessages();
                if (messageList == null) {
                    return;
                }
                for (Message message : messageList) {
                    this.dispatchMessage(message);
                }
            }
        }
        finally {
            event.clearMessages();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchMessage(Message msg) {
        this.incomingMessages.mark();
        LOG.debug("Starting to process message <{}>.", (Object)msg.getId());
        try (Timer.Context ignored = this.processTime.time();){
            this.handleMessage(msg);
            LOG.debug("Finished processing message <{}>. Writing to output buffer.", (Object)msg.getId());
        }
        catch (Exception e) {
            LOG.warn("Unable to process message <{}>: {}", (Object)msg.getId(), (Object)e);
        }
        finally {
            this.outgoingMessages.mark();
        }
    }

    private void handleMessage(@Nonnull Message msg) {
        msg.addStream((Stream)this.defaultStreamProvider.get());
        Messages messages = msg;
        for (MessageProcessor messageProcessor : this.orderedMessageProcessors) {
            messages = messageProcessor.process(messages);
        }
        for (Message message : messages) {
            this.outputBuffer.insertBlocking(message);
        }
    }

    public static interface Factory {
        public ProcessBufferProcessor create(DecodingProcessor var1);
    }
}

