package org.graylog2.shared.messageq.localkafka;

import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.graylog2.plugin.Message;
import org.graylog2.shared.journal.LocalKafkaJournal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/graylog2/shared/messageq/localkafka/LocalKafkaMessageQueueAcknowledger.class */
public class LocalKafkaMessageQueueAcknowledger implements MessageQueueAcknowledger {
    private static final Logger LOG = LoggerFactory.getLogger(LocalKafkaMessageQueueAcknowledger.class);
    private LocalKafkaJournal kafkaJournal;
    private final MessageQueueAcknowledger.Metrics metrics;

    @Inject
    public LocalKafkaMessageQueueAcknowledger(LocalKafkaJournal localKafkaJournal, MessageQueueAcknowledger.Metrics metrics) {
        this.kafkaJournal = localKafkaJournal;
        this.metrics = metrics;
    }

    @Override // org.graylog2.shared.messageq.MessageQueueAcknowledger
    public void acknowledge(Object obj) {
        doAcknowledge(obj);
        this.metrics.acknowledgedMessages().mark();
    }

    @Override // org.graylog2.shared.messageq.MessageQueueAcknowledger
    public void acknowledge(Message message) {
        doAcknowledge(message.getMessageQueueId());
        this.metrics.acknowledgedMessages().mark();
    }

    @Override // org.graylog2.shared.messageq.MessageQueueAcknowledger
    public void acknowledge(List<Message> list) {
        Stream<R> map = list.stream().map((v0) -> {
            return v0.getMessageQueueId();
        });
        Class<Long> cls = Long.class;
        Objects.requireNonNull(Long.class);
        Stream filter = map.filter(cls::isInstance);
        Class<Long> cls2 = Long.class;
        Objects.requireNonNull(Long.class);
        filter.map(cls2::cast).max((v0, v1) -> {
            return Long.compare(v0, v1);
        }).ifPresent((v1) -> {
            doAcknowledge(v1);
        });
        this.metrics.acknowledgedMessages().mark(list.size());
    }

    private void doAcknowledge(Object obj) {
        if (!(obj instanceof Long)) {
            LOG.error("Couldn't acknowledge message. Expected <" + obj + "> to be of type Long");
        } else {
            this.kafkaJournal.markJournalOffsetCommitted(((Long) obj).longValue());
        }
    }
}
