package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Strings;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.WorkHandler;
import de.huxhorn.sulky.ulid.ULID;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import javax.annotation.Nonnull;
import org.graylog2.buffers.OutputBuffer;
import org.graylog2.messageprocessors.OrderedMessageProcessors;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.Messages;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.messageprocessors.MessageProcessor;
import org.graylog2.plugin.streams.DefaultStream;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/shared/buffers/processors/ProcessBufferProcessor.class */
public class ProcessBufferProcessor implements WorkHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBufferProcessor.class);
    private final Meter incomingMessages;
    private final Timer processTime;
    private final Meter outgoingMessages;
    private final OrderedMessageProcessors orderedMessageProcessors;
    private final OutputBuffer outputBuffer;
    private final ProcessingStatusRecorder processingStatusRecorder;
    private final ULID ulid;
    private final DecodingProcessor decodingProcessor;
    private final Provider<Stream> defaultStreamProvider;
    private volatile Message currentMessage = null;

    /* loaded from: input_file:org/graylog2/shared/buffers/processors/ProcessBufferProcessor$Factory.class */
    public interface Factory {
        ProcessBufferProcessor create(DecodingProcessor decodingProcessor);
    }

    @AssistedInject
    public ProcessBufferProcessor(MetricRegistry metricRegistry, OrderedMessageProcessors orderedMessageProcessors, OutputBuffer outputBuffer, ProcessingStatusRecorder processingStatusRecorder, ULID ulid, @Assisted DecodingProcessor decodingProcessor, @DefaultStream Provider<Stream> provider) {
        this.orderedMessageProcessors = orderedMessageProcessors;
        this.outputBuffer = outputBuffer;
        this.processingStatusRecorder = processingStatusRecorder;
        this.ulid = ulid;
        this.decodingProcessor = decodingProcessor;
        this.defaultStreamProvider = provider;
        this.incomingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, new String[]{"incomingMessages"}));
        this.outgoingMessages = metricRegistry.meter(MetricRegistry.name(ProcessBufferProcessor.class, new String[]{"outgoingMessages"}));
        this.processTime = metricRegistry.timer(MetricRegistry.name(ProcessBufferProcessor.class, new String[]{"processTime"}));
    }

    public void onEvent(MessageEvent messageEvent) throws Exception {
        try {
            this.decodingProcessor.onEvent(messageEvent, 0L, false);
            if (messageEvent.isSingleMessage()) {
                dispatchMessage(messageEvent.getMessage());
            } else {
                Collection<Message> messages = messageEvent.getMessages();
                if (messages == null) {
                    return;
                }
                Iterator<Message> it = messages.iterator();
                while (it.hasNext()) {
                    dispatchMessage(it.next());
                }
            }
            messageEvent.clearMessages();
        } finally {
            messageEvent.clearMessages();
        }
    }

    public Optional<Message> getCurrentMessage() {
        return Optional.ofNullable(this.currentMessage);
    }

    private void dispatchMessage(Message message) {
        this.currentMessage = message;
        this.incomingMessages.mark();
        if (LOG.isTraceEnabled()) {
            LOG.trace("Starting to process message <{}>.", message.getId());
        }
        try {
            try {
                Timer.Context time = this.processTime.time();
                Throwable th = null;
                try {
                    try {
                        handleMessage(message);
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Finished processing message <{}>. Writing to output buffer.", message.getId());
                        }
                        if (time != null) {
                            if (0 != 0) {
                                try {
                                    time.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                time.close();
                            }
                        }
                        this.currentMessage = null;
                        this.outgoingMessages.mark();
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (time != null) {
                        if (th != null) {
                            try {
                                time.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            time.close();
                        }
                    }
                    throw th4;
                }
            } catch (Exception e) {
                if (LOG.isDebugEnabled()) {
                    LOG.warn("Unable to process message <{}>:", message.getId(), e);
                    LOG.debug("Failed message <{}>: {}", message.getId(), message.toDumpString());
                } else {
                    LOG.warn("Unable to process message <{}>: {}", message.getId(), e);
                }
                this.currentMessage = null;
                this.outgoingMessages.mark();
            }
        } catch (Throwable th6) {
            this.currentMessage = null;
            this.outgoingMessages.mark();
            throw th6;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v30, types: [org.graylog2.plugin.Messages] */
    private void handleMessage(@Nonnull Message message) {
        message.addStream((Stream) this.defaultStreamProvider.get());
        Message message2 = message;
        Iterator<MessageProcessor> it = this.orderedMessageProcessors.iterator();
        while (it.hasNext()) {
            message2 = it.next().process(message2);
        }
        for (Message message3 : message2) {
            if (!message3.hasField(Message.FIELD_GL2_MESSAGE_ID) || Strings.isNullOrEmpty((String) message3.getFieldAs(String.class, Message.FIELD_GL2_MESSAGE_ID))) {
                message3.addField(Message.FIELD_GL2_MESSAGE_ID, this.ulid.nextULID());
            }
            message3.setProcessingTime(Tools.nowUTC());
            this.processingStatusRecorder.updatePostProcessingReceiveTime(message3.getReceiveTime());
            this.outputBuffer.insertBlocking(message3);
        }
    }
}
