package org.springframework.modulith.events.core;

import java.time.Clock;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.modulith.events.CompletedEventPublications;
import org.springframework.modulith.events.EventPublication;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/modulith/events/core/DefaultEventPublicationRegistry.class */
public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry, CompletedEventPublications {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class);
    private static final String REGISTER = "Registering publication of {} for {}.";
    private final EventPublicationRepository events;
    private final Clock clock;

    public DefaultEventPublicationRegistry(EventPublicationRepository eventPublicationRepository, Clock clock) {
        Assert.notNull(eventPublicationRepository, "EventPublicationRepository must not be null!");
        Assert.notNull(clock, "Clock must not be null!");
        this.events = eventPublicationRepository;
        this.clock = clock;
    }

    @Override // org.springframework.modulith.events.core.EventPublicationRegistry
    public Collection<TargetEventPublication> store(Object obj, Stream<PublicationTargetIdentifier> stream) {
        Stream peek = stream.map(publicationTargetIdentifier -> {
            return TargetEventPublication.of(obj, publicationTargetIdentifier, this.clock.instant());
        }).peek(targetEventPublication -> {
            LOGGER.debug(REGISTER, targetEventPublication.getEvent().getClass().getName(), targetEventPublication.getTargetIdentifier().getValue());
        });
        EventPublicationRepository eventPublicationRepository = this.events;
        Objects.requireNonNull(eventPublicationRepository);
        return peek.map(eventPublicationRepository::create).toList();
    }

    @Override // org.springframework.modulith.events.core.EventPublicationRegistry
    public Collection<TargetEventPublication> findIncompletePublications() {
        return this.events.findIncompletePublications();
    }

    @Override // org.springframework.modulith.events.core.EventPublicationRegistry
    public Collection<TargetEventPublication> findIncompletePublicationsOlderThan(Duration duration) {
        return this.events.findIncompletePublicationsPublishedBefore(this.clock.instant().minus((TemporalAmount) duration));
    }

    @Override // org.springframework.modulith.events.core.EventPublicationRegistry
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void markCompleted(Object obj, PublicationTargetIdentifier publicationTargetIdentifier) {
        Assert.notNull(obj, "Domain event must not be null!");
        Assert.notNull(publicationTargetIdentifier, "Listener identifier must not be null!");
        LOGGER.debug("Marking publication of event {} to listener {} completed.", obj.getClass().getName(), publicationTargetIdentifier.getValue());
        this.events.markCompleted(obj, publicationTargetIdentifier, this.clock.instant());
    }

    @Override // org.springframework.modulith.events.core.EventPublicationRegistry
    public void deleteCompletedPublicationsOlderThan(Duration duration) {
        Assert.notNull(duration, "Duration must not be null!");
        this.events.deleteCompletedPublicationsBefore(this.clock.instant().minus((TemporalAmount) duration));
    }

    public Collection<? extends TargetEventPublication> findAll() {
        return this.events.findCompletedPublications();
    }

    public void deletePublications(Predicate<EventPublication> predicate) {
        this.events.deletePublications(findAll().stream().filter(predicate).map((v0) -> {
            return v0.getIdentifier();
        }).toList());
    }

    public void deletePublicationsOlderThan(Duration duration) {
        this.events.deleteCompletedPublicationsBefore(this.clock.instant().minus((TemporalAmount) duration));
    }

    public void destroy() {
        List<TargetEventPublication> findIncompletePublications = this.events.findIncompletePublications();
        if (findIncompletePublications.isEmpty()) {
            LOGGER.info("No publications outstanding!");
            return;
        }
        LOGGER.info("Shutting down with the following publications left unfinished:");
        for (int i = 0; i < findIncompletePublications.size(); i++) {
            String str = i + 1 == findIncompletePublications.size() ? "└─" : "├─";
            TargetEventPublication targetEventPublication = findIncompletePublications.get(i);
            LOGGER.info("{} {} - {}", new Object[]{str, targetEventPublication.getEvent().getClass().getName(), targetEventPublication.getTargetIdentifier().getValue()});
        }
    }
}
