package org.graylog2.shared.buffers;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.graylog2.shared.messageq.MessageQueueWriter;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/shared/buffers/JournallingMessageHandler.class */
/**
 * Disruptor {@link EventHandler} that collects {@link RawMessageEvent}s and, whenever the end of
 * a batch is signalled, hands the collected events to the {@link MessageQueueWriter} in one call.
 *
 * <p>For every flushed batch the handler also
 * <ul>
 *   <li>adds the encoded payload size of each accepted event to the {@code written_bytes}
 *       counter, and</li>
 *   <li>reports the newest message timestamp seen in the batch to the
 *       {@link ProcessingStatusRecorder}.</li>
 * </ul>
 *
 * <p>The internal batch list is not synchronized. NOTE(review): this presumably relies on the
 * Disruptor invoking a handler instance from a single thread only; confirm against the wiring.
 */
public class JournallingMessageHandler implements EventHandler<RawMessageEvent> {
    private static final Logger log = LoggerFactory.getLogger(JournallingMessageHandler.class);

    /** Events received since the last end-of-batch signal; emptied on every flush. */
    private final List<RawMessageEvent> batch = Lists.newArrayList();

    /** Running total of encoded payload bytes of all accepted events. */
    private final Counter byteCounter;

    private final MessageQueueWriter messageQueueWriter;
    private final ProcessingStatusRecorder processingStatusRecorder;

    /**
     * Per-batch mapping function: passes events with an encoded payload through unchanged and
     * maps events without one to {@code null}, while accumulating batch statistics.
     *
     * <p>This is an inner (non-static) class because it updates the enclosing handler's byte
     * counter. It is stateful, so a fresh instance is used for every batch. The class is declared
     * {@code public}, but its constructor is private, so only the enclosing class creates
     * instances.
     */
    public class Filter implements Function<RawMessageEvent, RawMessageEvent> {
        /** Sum of the encoded payload lengths of all events accepted by this instance. */
        private long bytesWritten = 0L;

        /** Newest message timestamp seen so far; starts at the epoch (UTC). */
        private DateTime latestReceiveTime = new DateTime(0L, DateTimeZone.UTC);

        private Filter() {
        }

        /**
         * @return total encoded payload bytes of the events accepted so far
         */
        public long getBytesWritten() {
            return bytesWritten;
        }

        /**
         * @return the latest message timestamp among the accepted events, or the epoch (UTC) if
         *         no accepted event carried a timestamp
         */
        public DateTime getLatestReceiveTime() {
            return latestReceiveTime;
        }

        /**
         * Accounts for a single event and decides whether it is kept.
         *
         * @param rawMessageEvent the event to inspect; must not be {@code null}
         * @return the same event instance, or {@code null} if it has no encoded raw message
         */
        @Nullable
        public RawMessageEvent apply(RawMessageEvent rawMessageEvent) {
            if (log.isTraceEnabled()) {
                log.trace("Journalling message {}", rawMessageEvent.getMessageId());
            }
            if (rawMessageEvent.getEncodedRawMessage() == null) {
                log.error("Skipping RawMessageEvent with null encodedRawMessage");
                return null;
            }

            final int length = rawMessageEvent.getEncodedRawMessage().length;
            bytesWritten += length;
            // The metric is bumped as soon as the event is accepted, i.e. before the batch is
            // actually handed to the writer.
            byteCounter.inc(length);

            // Track the maximum timestamp; events without a timestamp leave it untouched.
            final DateTime messageTimestamp = rawMessageEvent.getMessageTimestamp();
            if (messageTimestamp != null && latestReceiveTime.isBefore(messageTimestamp)) {
                latestReceiveTime = messageTimestamp;
            }
            return rawMessageEvent;
        }
    }

    /**
     * Creates the handler and registers its {@code written_bytes} counter.
     *
     * @param metricRegistry           registry in which the byte counter is created or looked up
     * @param messageQueueWriter       destination for the accepted events of each batch
     * @param processingStatusRecorder receives the latest message timestamp of each batch
     */
    @Inject
    public JournallingMessageHandler(MetricRegistry metricRegistry, MessageQueueWriter messageQueueWriter, ProcessingStatusRecorder processingStatusRecorder) {
        this.messageQueueWriter = messageQueueWriter;
        this.processingStatusRecorder = processingStatusRecorder;
        this.byteCounter = metricRegistry.counter(MetricRegistry.name(JournallingMessageHandler.class, "written_bytes"));
    }

    /**
     * Buffers the event and flushes the whole buffer when the end of the batch is reached.
     *
     * <p>On flush, the order is: filter the batch, record the latest receive time, clear the
     * buffer, write. The buffer is cleared before the write, so a failing write does not cause
     * the same events to be submitted again with the next batch.
     *
     * @param rawMessageEvent the event published to the ring buffer
     * @param j               the event's sequence number as passed by the Disruptor; unused here
     * @param z               {@code true} if this is the last event of the current batch
     * @throws Exception if writing the batch fails
     */
    public void onEvent(RawMessageEvent rawMessageEvent, long j, boolean z) throws Exception {
        batch.add(rawMessageEvent);
        if (!z) {
            return;
        }

        log.debug("End of batch, journaling {} messages", batch.size());

        // The filter is stateful (byte and timestamp accounting), hence one instance per batch.
        final Filter filter = new Filter();
        final List<RawMessageEvent> accepted = batch.stream()
                .map(filter)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        processingStatusRecorder.updateIngestReceiveTime(filter.getLatestReceiveTime());
        batch.clear();
        messageQueueWriter.write(accepted);
    }
}
