package org.graylog2.shared.messageq.localkafka;

import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.util.concurrent.AbstractIdleService;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.graylog2.shared.buffers.RawMessageEvent;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.journal.LocalKafkaJournal;
import org.graylog2.shared.messageq.MessageQueueException;
import org.graylog2.shared.messageq.MessageQueueWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MessageQueueWriter} that appends batches of raw message events to the
 * {@link LocalKafkaJournal} and, after every successful append, releases the
 * injected {@code JournalSignal} semaphore.
 *
 * <p>A failed append is retried through a {@link Retryer} that retries on every
 * exception, waits exponentially between attempts and is configured to never stop.
 *
 * <p>The lifecycle hooks inherited from {@link AbstractIdleService} are no-ops.
 */
@Singleton
public class LocalKafkaMessageQueueWriter extends AbstractIdleService implements MessageQueueWriter {
    private static final Logger LOG = LoggerFactory.getLogger(LocalKafkaMessageQueueWriter.class);

    private final LocalKafkaJournal kafkaJournal;
    private final Semaphore journalFilled;
    private final MessageQueueWriter.Metrics metrics;
    private final Retryer<Void> writeRetryer;

    /**
     * Creates the writer and builds the retryer used when a journal write fails.
     *
     * @param localKafkaJournal journal that receives the converted entries
     * @param semaphore         semaphore bound to {@code JournalSignal}; released after each successful write
     * @param metrics           metrics marked for written messages, written bytes and failed retry attempts
     */
    @Inject
    public LocalKafkaMessageQueueWriter(LocalKafkaJournal localKafkaJournal,
                                        @Named("JournalSignal") Semaphore semaphore,
                                        MessageQueueWriter.Metrics metrics) {
        this.kafkaJournal = localKafkaJournal;
        this.journalFilled = semaphore;
        this.metrics = metrics;
        // The explicit <Void> witness is required: without it the builder chain is
        // inferred as RetryerBuilder<Object> and does not match the field type.
        this.writeRetryer = RetryerBuilder.<Void>newBuilder()
                .retryIfException(failure -> {
                    // Invoked for every failed attempt made through the retryer; the
                    // initial direct attempt in write() is not counted here.
                    LOG.error("Unable to write to journal - retrying with exponential back-off", failure);
                    metrics.failedWriteAttempts().mark();
                    return true;
                })
                // Exponential wait with multiplier 250, capped at one minute.
                .withWaitStrategy(WaitStrategies.exponentialWait(250L, 1L, TimeUnit.MINUTES))
                .withStopStrategy(StopStrategies.neverStop())
                .build();
    }

    /**
     * Converts the given events into journal entries and writes them to the journal.
     *
     * <p>{@code null} elements are skipped. The write is attempted once directly; if that
     * attempt throws, the write is repeated through the retryer. The written-messages and
     * written-bytes metrics are marked only after the write has succeeded.
     *
     * @param list raw message events to persist; {@code null} elements are ignored
     * @throws MessageQueueException if the retryer itself fails with an
     *                               {@link ExecutionException} or {@link RetryException}
     *                               (NOTE(review): with a never-stop strategy this is presumably
     *                               limited to interruption while waiting - confirm against the
     *                               guava-retrying documentation)
     */
    @Override
    public void write(List<RawMessageEvent> list) throws MessageQueueException {
        final List<Journal.Entry> journalEntries = list.stream()
                .filter(Objects::nonNull)
                .map(event -> new Journal.Entry(event.getMessageIdBytes(), event.getEncodedRawMessage()))
                .collect(Collectors.toList());

        // Sum the payload sizes up front (before any write is attempted) instead of
        // accumulating them as a side effect inside the conversion pipeline.
        final long totalBytes = journalEntries.stream()
                .mapToLong(entry -> entry.getMessageBytes().length)
                .sum();

        try {
            writeToJournal(journalEntries);
        } catch (Exception e) {
            LOG.error("Unable to write to journal - retrying", e);

            // Hand the same batch to the retryer, which backs off exponentially between attempts.
            try {
                writeRetryer.call(() -> {
                    writeToJournal(journalEntries);
                    return null;
                });
            } catch (ExecutionException | RetryException retryFailure) {
                throw new MessageQueueException("Retryer exception", retryFailure);
            }
        }

        metrics.writtenMessages().mark(journalEntries.size());
        metrics.writtenBytes().mark(totalBytes);
    }

    /**
     * Writes the entries to the journal, logs the offset returned by the journal and
     * releases one permit on the journal signal semaphore.
     */
    private void writeToJournal(List<Journal.Entry> entries) {
        final long lastOffset = kafkaJournal.write(entries);
        LOG.debug("Processed batch, last journal offset: {}, signalling reader.", lastOffset);
        journalFilled.release();
    }

    /** No start-up work is required. */
    @Override
    protected void startUp() throws Exception {
    }

    /** No shut-down work is required. */
    @Override
    protected void shutDown() throws Exception {
    }
}
