package org.graylog2.shared.buffers.processors;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ResolvableInetSocketAddress;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.inputs.codecs.Codec;
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
import org.graylog2.plugin.journal.RawMessage;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.utilities.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/shared/buffers/processors/DecodingProcessor.class */
/**
 * Disruptor event handler that turns the {@link RawMessage} carried by a {@link MessageEvent}
 * into one or more decoded {@link Message}s.
 *
 * <p>For every event the handler looks up the codec named by the raw message, lets it decode the
 * payload, enriches each resulting message (journal offset, source node fields, remote address,
 * source, receive time) and stores the result back on the event. Decode and parse durations are
 * recorded on the supplied timers and on per-codec metrics.
 */
public class DecodingProcessor implements EventHandler<MessageEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(DecodingProcessor.class);
    private final Timer decodeTime;
    private final Counter decodedTrafficCounter;
    private final Map<String, Codec.Factory<? extends Codec>> codecFactory;
    private final ServerStatus serverStatus;
    private final MetricRegistry metricRegistry;
    private final Journal journal;
    private final Timer parseTime;

    /** Assisted-inject factory; the two timers are supplied by the caller. */
    public interface Factory {
        DecodingProcessor create(@Assisted("decodeTime") Timer decodeTime, @Assisted("parseTime") Timer parseTime);
    }

    /**
     * @param codecFactory   codec factories keyed by codec name
     * @param serverStatus   passed through to {@code Message.recordTiming}
     * @param metricRegistry registry used for per-codec meters/timers and the decoded-traffic counter
     * @param journal        journal whose offset is committed when processing a raw message fails
     * @param decodeTime     timer wrapped around the whole handling of one event
     * @param parseTime      timer wrapped around the codec's decode call only
     */
    @AssistedInject
    public DecodingProcessor(Map<String, Codec.Factory<? extends Codec>> codecFactory, ServerStatus serverStatus, MetricRegistry metricRegistry, Journal journal, @Assisted("decodeTime") Timer decodeTime, @Assisted("parseTime") Timer parseTime) {
        this.codecFactory = codecFactory;
        this.serverStatus = serverStatus;
        this.metricRegistry = metricRegistry;
        this.journal = journal;
        this.parseTime = parseTime;
        this.decodeTime = decodeTime;
        this.decodedTrafficCounter = metricRegistry.counter(GlobalMetricNames.DECODED_TRAFFIC);
    }

    /**
     * Decodes the raw message of {@code messageEvent} and records the "decode" timing on the result.
     *
     * <p>If processing throws an {@link Exception}, the error is logged, the raw message's journal
     * offset is marked as committed and the event's messages are cleared; the exception is not
     * propagated. In every case (including errors) the raw message is cleared from the event.
     *
     * @param messageEvent the event to process
     * @param sequence     unused
     * @param endOfBatch   unused
     */
    @Override
    public void onEvent(MessageEvent messageEvent, long sequence, boolean endOfBatch) throws Exception {
        final Timer.Context decodeTimer = this.decodeTime.time();
        try {
            processMessage(messageEvent);
        } catch (Exception e) {
            final RawMessage raw = messageEvent.getRaw();
            LOG.error("Error processing message " + raw, ExceptionUtils.getRootCause(e));
            // Commit the offset so the failed raw message is not kept in the journal.
            this.journal.markJournalOffsetCommitted(raw.getJournalOffset());
            // Make sure later handlers never see stale or partially decoded messages.
            messageEvent.clearMessages();
        } finally {
            // NOTE(review): stop() is invoked once per message, as in the original code; each call
            // yields the elapsed time at that moment. If nothing was decoded the context is never stopped.
            if (messageEvent.getMessage() != null) {
                messageEvent.getMessage().recordTiming(this.serverStatus, "decode", decodeTimer.stop());
            } else if (messageEvent.getMessages() != null) {
                for (Message decoded : messageEvent.getMessages()) {
                    decoded.recordTiming(this.serverStatus, "decode", decodeTimer.stop());
                }
            }
            messageEvent.clearRaw();
        }
    }

    /**
     * Runs the codec for the event's raw message and stores the post-processed result on the event.
     *
     * <p>Nothing is set on the event when no codec factory is registered for the raw message's
     * codec name, or when the codec produces neither a message nor a non-empty collection.
     *
     * @throws RuntimeException whatever the codec throws while decoding; it is logged and counted
     *                          on the codec's "failures" meter before being rethrown
     */
    private void processMessage(MessageEvent messageEvent) throws ExecutionException {
        final RawMessage raw = messageEvent.getRaw();

        // The input id is taken from the last source node; it stays null if there are no nodes.
        String inputId;
        try {
            inputId = Iterables.getLast(raw.getSourceNodes()).inputId;
        } catch (NoSuchElementException e) {
            inputId = null;
        }

        final Codec.Factory<? extends Codec> factory = this.codecFactory.get(raw.getCodecName());
        if (factory == null) {
            LOG.warn("Couldn't find factory for codec <{}>, skipping message {} on input <{}>.", raw.getCodecName(), raw, inputId);
            return;
        }

        final Codec codec = factory.create(raw.getCodecConfig());
        final String baseMetricName = MetricRegistry.name(codec.getClass(), inputId);

        Message message = null;
        Collection<Message> messages = null;
        final Timer.Context parseTimer = this.parseTime.time();
        long parseNanos;
        try {
            if (codec instanceof MultiMessageCodec) {
                messages = ((MultiMessageCodec) codec).decodeMessages(raw);
            } else {
                message = codec.decode(raw);
            }
        } catch (RuntimeException e) {
            LOG.error("Unable to decode raw message {} on input <{}>.", raw, inputId);
            this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "failures")).mark();
            throw e;
        } finally {
            // Capture the elapsed time of the codec call; it is handed to postProcessMessage below.
            parseNanos = parseTimer.stop();
        }

        if (message != null) {
            messageEvent.setMessage(postProcessMessage(raw, codec, inputId, baseMetricName, message, parseNanos));
            return;
        }
        if (messages == null || messages.isEmpty()) {
            return;
        }

        final ArrayList<Message> processed = Lists.newArrayListWithCapacity(messages.size());
        for (Message candidate : messages) {
            final Message result = postProcessMessage(raw, codec, inputId, baseMetricName, candidate, parseNanos);
            if (result != null) {
                processed.add(result);
            }
        }
        messageEvent.setMessages(processed);
    }

    /**
     * Validates and enriches a freshly decoded message.
     *
     * @param rawMessage     the raw message the codec decoded
     * @param codec          the codec that produced {@code message}
     * @param inputId        input id to set on the message, may be {@code null}
     * @param baseMetricName prefix for the per-codec metric names
     * @param message        the decoded message, may be {@code null}
     * @param decodeTime     elapsed parse time, interpreted as nanoseconds
     * @return the enriched message, or {@code null} if {@code message} was {@code null}
     *         (counted as "failures") or not complete (counted as "incomplete")
     */
    @Nullable
    private Message postProcessMessage(RawMessage rawMessage, Codec codec, String inputId, String baseMetricName, Message message, long decodeTime) {
        if (message == null) {
            this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "failures")).mark();
            return null;
        }
        if (!message.isComplete()) {
            this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "incomplete")).mark();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping incomplete message {} on input <{}>. Parsed fields: [{}]", rawMessage, inputId, message.getFields());
            }
            return null;
        }

        message.setJournalOffset(rawMessage.getJournalOffset());
        message.recordTiming(this.serverStatus, "parse", decodeTime);
        this.metricRegistry.timer(MetricRegistry.name(baseMetricName, "parseTime")).update(decodeTime, TimeUnit.NANOSECONDS);

        // Copy the originating node/input ids onto the message; a later node of the same type
        // replaces the values written by an earlier one.
        for (RawMessage.SourceNode sourceNode : rawMessage.getSourceNodes()) {
            switch (sourceNode.type) {
                case SERVER:
                    if (message.getField(Message.FIELD_GL2_SOURCE_INPUT) != null) {
                        LOG.debug("Multiple server nodes ({} {}) set for message id {}", message.getField(Message.FIELD_GL2_SOURCE_INPUT), sourceNode.nodeId, message.getId());
                    }
                    message.addField(Message.FIELD_GL2_SOURCE_INPUT, sourceNode.inputId);
                    message.addField(Message.FIELD_GL2_SOURCE_NODE, sourceNode.nodeId);
                    break;
                case RADIO:
                    if (message.getField(Message.FIELD_GL2_SOURCE_RADIO_INPUT) != null) {
                        LOG.debug("Multiple radio nodes ({} {}) set for message id {}", message.getField(Message.FIELD_GL2_SOURCE_RADIO_INPUT), sourceNode.nodeId, message.getId());
                    }
                    message.addField(Message.FIELD_GL2_SOURCE_RADIO_INPUT, sourceNode.inputId);
                    message.addField(Message.FIELD_GL2_SOURCE_RADIO, sourceNode.nodeId);
                    break;
                default:
                    break;
            }
        }

        if (inputId != null) {
            try {
                message.setSourceInputId(inputId);
            } catch (RuntimeException e) {
                LOG.warn("Unable to find input with id " + inputId + ", not setting input id in this message.", e);
            }
        }

        final ResolvableInetSocketAddress remoteAddress = rawMessage.getRemoteAddress();
        if (remoteAddress != null) {
            final String remoteIp = InetAddresses.toAddrString(remoteAddress.getAddress());
            message.addField(Message.FIELD_GL2_REMOTE_IP, remoteIp);
            if (remoteAddress.getPort() > 0) {
                message.addField(Message.FIELD_GL2_REMOTE_PORT, Integer.valueOf(remoteAddress.getPort()));
            }
            if (remoteAddress.isReverseLookedUp()) {
                message.addField(Message.FIELD_GL2_REMOTE_HOSTNAME, remoteAddress.getHostName());
            }
            // Fall back to the sender's IP when the codec did not provide a source.
            if (Strings.isNullOrEmpty(message.getSource())) {
                message.setSource(remoteIp);
            }
        }

        // A configured override always wins over whatever source was determined so far.
        if (codec.getConfiguration() != null && codec.getConfiguration().stringIsSet(Codec.Config.CK_OVERRIDE_SOURCE)) {
            message.setSource(codec.getConfiguration().getString(Codec.Config.CK_OVERRIDE_SOURCE));
        }
        if (Strings.isNullOrEmpty(message.getSource())) {
            message.setSource("unknown");
        }

        message.setReceiveTime(rawMessage.getTimestamp());
        this.metricRegistry.meter(MetricRegistry.name(baseMetricName, "processedMessages")).mark();
        this.decodedTrafficCounter.inc(message.getSize());
        return message;
    }
}
