/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Strings;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.WorkHandler;
import de.huxhorn.sulky.ulid.ULID;
import java.util.Collection;
import java.util.Locale;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.graylog.failure.FailureSubmissionService;
import org.graylog2.buffers.OutputBuffer;
import org.graylog2.messageprocessors.OrderedMessageProcessors;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Messages;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.messageprocessors.MessageProcessor;
import org.graylog2.plugin.streams.DefaultStream;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.buffers.processors.DecodingProcessor;
import org.graylog2.shared.buffers.processors.MessageULIDGenerator;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor work handler that takes {@link MessageEvent}s, has them decoded by the
 * assisted {@link DecodingProcessor}, runs every decoded {@link Message} through the
 * configured {@link OrderedMessageProcessors} and hands the results to the
 * {@link OutputBuffer}.
 *
 * <p>Three metrics are registered under this class' name: {@code incomingMessages},
 * {@code outgoingMessages} and {@code processTime}.
 *
 * <p>NOTE(review): this source was reconstructed by a decompiler (CFR), which reported
 * "Removed try catching itself - possible behaviour change" for {@code onEvent} and
 * {@code dispatchMessage}; compare against the original sources if exact exception
 * behaviour matters.
 */
public class ProcessBufferProcessor implements WorkHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBufferProcessor.class);

    /** Name of the message field that carries the generated message ULID. */
    private static final String FIELD_GL2_MESSAGE_ID = "gl2_message_id";

    private final Meter incomingMessages;
    private final Timer processTime;
    private final Meter outgoingMessages;
    private final OrderedMessageProcessors orderedMessageProcessors;
    private final OutputBuffer outputBuffer;
    private final ProcessingStatusRecorder processingStatusRecorder;
    // Injected and stored, but not read anywhere inside this class.
    private final ULID ulid;
    private final MessageULIDGenerator messageULIDGenerator;
    private final DecodingProcessor decodingProcessor;
    private final Provider<Stream> defaultStreamProvider;
    private final FailureSubmissionService failureSubmissionService;
    // Message currently inside dispatchMessage(); null while nothing is being dispatched.
    // Declared volatile, presumably so other threads can observe it - TODO confirm.
    private volatile Message currentMessage;

    /**
     * Creates a processor and registers its meters and timer with the given registry.
     *
     * @param metricRegistry           registry the three metrics of this class are created in
     * @param orderedMessageProcessors processors every message is passed through, in iteration order
     * @param outputBuffer             buffer that receives the processed messages
     * @param processingStatusRecorder recorder updated with each processed message's receive time
     * @param ulid                     ULID instance (kept in a field, unused by this class)
     * @param messageULIDGenerator     generator for the {@code gl2_message_id} field
     * @param decodingProcessor        assisted parameter; decodes the incoming events
     * @param defaultStreamProvider    provider of the stream added to every incoming message
     * @param failureSubmissionService service that receives processing failures
     */
    @AssistedInject
    public ProcessBufferProcessor(MetricRegistry metricRegistry, OrderedMessageProcessors orderedMessageProcessors, OutputBuffer outputBuffer, ProcessingStatusRecorder processingStatusRecorder, ULID ulid, MessageULIDGenerator messageULIDGenerator, @Assisted DecodingProcessor decodingProcessor, @DefaultStream Provider<Stream> defaultStreamProvider, FailureSubmissionService failureSubmissionService) {
        this.orderedMessageProcessors = orderedMessageProcessors;
        this.outputBuffer = outputBuffer;
        this.processingStatusRecorder = processingStatusRecorder;
        this.ulid = ulid;
        this.messageULIDGenerator = messageULIDGenerator;
        this.decodingProcessor = decodingProcessor;
        this.defaultStreamProvider = defaultStreamProvider;
        this.failureSubmissionService = failureSubmissionService;

        // Metrics are requested in the same order as before: incoming, outgoing, process time.
        this.incomingMessages = metricRegistry.meter(metricName("incomingMessages"));
        this.outgoingMessages = metricRegistry.meter(metricName("outgoingMessages"));
        this.processTime = metricRegistry.timer(metricName("processTime"));

        this.currentMessage = null;
    }

    /** Builds the metric name for {@code suffix}, scoped to this class. */
    private static String metricName(String suffix) {
        return MetricRegistry.name(ProcessBufferProcessor.class, suffix);
    }

    /**
     * Decodes the event and dispatches the message(s) it carries.
     *
     * <p>A single-message event dispatches {@code event.getMessage()}; otherwise every
     * element of {@code event.getMessages()} is dispatched, and a {@code null} collection
     * is skipped silently. The event's messages are cleared in all cases, including when
     * decoding or dispatching throws.
     *
     * @param event the event to decode and process
     * @throws Exception declared by the handler contract; anything thrown by decoding propagates
     */
    public void onEvent(MessageEvent event) throws Exception {
        try {
            decodingProcessor.onEvent(event, 0L, false);

            if (event.isSingleMessage()) {
                dispatchMessage(event.getMessage());
                return;
            }

            final Collection<Message> decoded = event.getMessages();
            if (decoded != null) {
                for (Message each : decoded) {
                    dispatchMessage(each);
                }
            }
        } finally {
            // Always release the references held by the event.
            event.clearMessages();
        }
    }

    /**
     * Returns the message that is being dispatched right now.
     *
     * @return the current message, or an empty optional when no dispatch is in progress
     */
    public Optional<Message> getCurrentMessage() {
        return Optional.ofNullable(currentMessage);
    }

    /**
     * Processes one message while tracking it as the current message and updating metrics.
     *
     * <p>The incoming meter is marked up front and the processing is timed. Any
     * {@link Exception} from processing is logged and reported through
     * {@code submitUnknownProcessingError} instead of being rethrown. Regardless of the
     * outcome, the current message is reset and the outgoing meter is marked.
     *
     * @param msg the message to process
     */
    private void dispatchMessage(Message msg) {
        currentMessage = msg;
        incomingMessages.mark();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Starting to process message <{}>.", msg.getId());
        }

        try (Timer.Context ignored = processTime.time()) {
            handleMessage(msg);
            if (LOG.isTraceEnabled()) {
                LOG.trace("Finished processing message <{}>. Writing to output buffer.", msg.getId());
            }
        } catch (Exception e) {
            logProcessingFailure(msg, e);
            final String details = String.format(Locale.ENGLISH, "Unable to process message <%s>: %s", msg.getId(), e);
            failureSubmissionService.submitUnknownProcessingError(msg, details);
        } finally {
            currentMessage = null;
            outgoingMessages.mark();
        }
    }

    /**
     * Logs a processing failure: with debug logging enabled the warning is followed by a
     * debug line containing the message dump; otherwise only the warning is written.
     */
    private void logProcessingFailure(Message msg, Exception e) {
        if (!LOG.isDebugEnabled()) {
            LOG.warn("Unable to process message <{}>: {}", msg.getId(), e);
            return;
        }
        LOG.warn("Unable to process message <{}>:", msg.getId(), e);
        LOG.debug("Failed message <{}>: {}", msg.getId(), msg.toDumpString());
    }

    /**
     * Adds the default stream to {@code msg}, feeds it through every message processor in
     * order (each processor receives the previous processor's output) and finalises each
     * resulting message.
     *
     * @param msg the decoded message to process
     */
    private void handleMessage(@Nonnull Message msg) {
        msg.addStream(defaultStreamProvider.get());

        Messages pipeline = msg;
        for (MessageProcessor processor : orderedMessageProcessors) {
            pipeline = processor.process(pipeline);
        }

        for (Message processed : pipeline) {
            finishMessage(processed);
        }
    }

    /**
     * Finalises one processed message: validates its timestamp, assigns a
     * {@code gl2_message_id} when the field is absent or empty, stamps the processing
     * time, records its receive time and finally inserts it into the output buffer, but
     * only if {@code submitProcessingErrors} returns {@code true} for it.
     */
    private void finishMessage(Message processed) {
        processed.ensureValidTimestamp();

        final boolean needsMessageId = !processed.hasField(FIELD_GL2_MESSAGE_ID)
                || Strings.isNullOrEmpty(processed.getFieldAs(String.class, FIELD_GL2_MESSAGE_ID));
        if (needsMessageId) {
            processed.addField(FIELD_GL2_MESSAGE_ID, messageULIDGenerator.createULID(processed));
        }

        processed.setProcessingTime(Tools.nowUTC());
        processingStatusRecorder.updatePostProcessingReceiveTime(processed.getReceiveTime());

        if (failureSubmissionService.submitProcessingErrors(processed)) {
            outputBuffer.insertBlocking(processed);
        }
    }

    /** Assisted-injection factory that supplies the {@link DecodingProcessor} at creation time. */
    public interface Factory {
        /**
         * Creates a processor bound to the given decoding processor.
         *
         * @param decodingProcessor the decoder the new processor delegates event decoding to
         * @return a new {@link ProcessBufferProcessor}
         */
        ProcessBufferProcessor create(DecodingProcessor decodingProcessor);
    }
}

