package org.graylog2.shared.buffers;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import org.graylog2.shared.journal.Journal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Disruptor {@link EventHandler} that collects {@link RawMessageEvent}s into a batch and, once the
 * end of a batch is signalled, converts them to {@link Journal.Entry} objects, writes them to the
 * {@link Journal} and releases the injected semaphore.
 *
 * <p>The handler keeps its pending batch in a plain, unsynchronized list.
 */
public class JournallingMessageHandler implements EventHandler<RawMessageEvent> {
    private static final Logger log = LoggerFactory.getLogger(JournallingMessageHandler.class);

    /** Argument for {@code removeAll} used to strip the {@code null} results of failed conversions. */
    private static final Set<Journal.Entry> NULL_SINGLETON = Collections.singleton(null);

    /**
     * Retries a failed journal write on every exception, never giving up on its own, waiting with
     * exponential back-off (multiplier 250, maximum 1 minute) between attempts. Each failure is
     * logged by the predicate itself.
     *
     * <p>The explicit {@code <Void>} type witness is required: without it the chained builder call
     * is inferred as {@code RetryerBuilder<Object>} and does not match the field type.
     */
    private static final Retryer<Void> JOURNAL_WRITE_RETRYER = RetryerBuilder.<Void>newBuilder()
            .retryIfException(new Predicate<Throwable>() {
                @Override
                public boolean apply(@Nullable Throwable th) {
                    log.error("Unable to write to journal - retrying with exponential back-off", th);
                    return true;
                }
            })
            .withWaitStrategy(WaitStrategies.exponentialWait(250, 1, TimeUnit.MINUTES))
            .withStopStrategy(StopStrategies.neverStop())
            .build();

    /** Events received since the last end-of-batch signal. */
    private final List<RawMessageEvent> batch = Lists.newArrayList();
    /** Counts the raw message bytes handed to the converter (metric {@code written_bytes}). */
    private final Counter byteCounter;
    private final Journal journal;
    /** Released once after every successful journal write. */
    private final Semaphore journalFilled;

    /**
     * Turns a {@link RawMessageEvent} into a {@link Journal.Entry} and keeps a running total of
     * the encoded message bytes it has processed.
     *
     * <p>It is an inner (non-static) class because it uses the enclosing handler's journal and
     * byte counter. One instance is created per batch.
     */
    public class Converter implements Function<RawMessageEvent, Journal.Entry> {
        private long bytesWritten;

        private Converter() {
            this.bytesWritten = 0L;
        }

        /**
         * @return the summed length of all encoded raw messages this converter has processed
         */
        public long getBytesWritten() {
            return bytesWritten;
        }

        /**
         * Creates a journal entry from the event's message id bytes and encoded raw message, then
         * clears both on the event.
         *
         * @param rawMessageEvent the event to convert
         * @return the new journal entry, or {@code null} if any exception occurred during the
         *         conversion (the exception is logged and the event is skipped)
         */
        @Override
        @Nullable
        public Journal.Entry apply(RawMessageEvent rawMessageEvent) {
            try {
                if (log.isTraceEnabled()) {
                    log.trace("Journalling message {}", rawMessageEvent.getMessageId());
                }
                final byte[] messageIdBytes = rawMessageEvent.getMessageIdBytes();
                final byte[] encodedRawMessage = rawMessageEvent.getEncodedRawMessage();
                final int length = encodedRawMessage.length;
                bytesWritten += length;
                byteCounter.inc(length);

                // Drop the event's references to the byte arrays; the entry created below keeps
                // the local references.
                rawMessageEvent.setMessageIdBytes(null);
                rawMessageEvent.setEncodedRawMessage(null);

                return journal.createEntry(messageIdBytes, encodedRawMessage);
            } catch (Exception e) {
                log.error("Unable to convert RawMessageEvent to Journal.Entry - skipping event", e);
                return null;
            }
        }
    }

    /**
     * @param metricRegistry registry in which the {@code written_bytes} counter is registered
     * @param journal        journal that entries are created for and written to
     * @param semaphore      semaphore named {@code JournalSignal}; released after each successful
     *                       journal write
     */
    @Inject
    public JournallingMessageHandler(MetricRegistry metricRegistry, Journal journal, @Named("JournalSignal") Semaphore semaphore) {
        this.journal = journal;
        this.journalFilled = semaphore;
        this.byteCounter = metricRegistry.counter(MetricRegistry.name(JournallingMessageHandler.class, "written_bytes"));
    }

    /**
     * Adds the event to the current batch. When {@code endOfBatch} is set, the whole batch is
     * converted and written to the journal; a failed write is retried through
     * {@link #JOURNAL_WRITE_RETRYER}.
     *
     * @param rawMessageEvent the event to journal
     * @param sequence        sequence number of the event (unused here)
     * @param endOfBatch      whether this is the last event of the current batch
     * @throws Exception any exception thrown by the retryer is propagated to the caller
     */
    @Override
    public void onEvent(RawMessageEvent rawMessageEvent, long sequence, boolean endOfBatch) throws Exception {
        batch.add(rawMessageEvent);
        if (!endOfBatch) {
            return;
        }

        log.debug("End of batch, journalling {} messages", batch.size());

        final Converter converter = new Converter();
        // Lists.transform returns a lazy view; copying it runs the converter exactly once per event.
        final List<Journal.Entry> entries = Lists.newArrayList(Lists.transform(batch, converter));
        // Events that could not be converted yielded null and are dropped here.
        entries.removeAll(NULL_SINGLETON);
        // Safe to clear only now that the lazy view has been fully copied.
        batch.clear();

        try {
            writeToJournal(converter, entries);
        } catch (Exception e) {
            log.error("Unable to write to journal - retrying", e);
            JOURNAL_WRITE_RETRYER.call(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    writeToJournal(converter, entries);
                    return null;
                }
            });
        }
    }

    /**
     * Writes the entries to the journal, logs the result and releases the semaphore. The
     * semaphore is not released if the write throws.
     *
     * @param converter the converter used for this batch; only its byte total is read, for logging
     * @param list      the journal entries to write
     */
    public void writeToJournal(Converter converter, List<Journal.Entry> list) {
        final long lastOffset = journal.write(list);
        log.debug("Processed batch, wrote {} bytes, last journal offset: {}, signalling reader.", converter.getBytesWritten(), lastOffset);
        journalFilled.release();
    }
}
