package org.graylog2.system.processing;

import com.github.joschi.jadconfig.util.Duration;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.system.processing.ProcessingStatusDto;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/graylog2/system/processing/MongoDBProcessingStatusRecorderService.class */
public class MongoDBProcessingStatusRecorderService extends AbstractIdleService implements ProcessingStatusRecorder {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBProcessingStatusRecorderService.class);
    private static final DateTime DEFAULT_RECEIVE_TIME = new DateTime(0, DateTimeZone.UTC);
    private final DBProcessingStatusService dbService;
    private final EventBus eventBus;
    private final Duration persistInterval;
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> future;
    private final AtomicReference<DateTime> ingestReceiveTime = new AtomicReference<>(DEFAULT_RECEIVE_TIME);
    private final AtomicReference<DateTime> postProcessingReceiveTime = new AtomicReference<>(DEFAULT_RECEIVE_TIME);
    private final AtomicReference<DateTime> postIndexReceiveTime = new AtomicReference<>(DEFAULT_RECEIVE_TIME);
    private final AtomicBoolean inShutdown = new AtomicBoolean(false);

    @Inject
    public MongoDBProcessingStatusRecorderService(DBProcessingStatusService dBProcessingStatusService, EventBus eventBus, @Named("processing_status_persist_interval") Duration duration, @Named("daemonScheduler") ScheduledExecutorService scheduledExecutorService) {
        this.dbService = dBProcessingStatusService;
        this.eventBus = eventBus;
        this.persistInterval = duration;
        this.scheduler = scheduledExecutorService;
    }

    @Subscribe
    public void handleServerShutdown(Lifecycle lifecycle) {
        if (lifecycle == Lifecycle.HALTING) {
            this.inShutdown.set(true);
        }
    }

    protected void startUp() {
        this.eventBus.register(this);
        LOG.debug("Starting processing status recorder service");
        try {
            this.dbService.get().ifPresent(processingStatusDto -> {
                LOG.debug("Loaded persisted processing status: {}", processingStatusDto);
                ProcessingStatusDto.ReceiveTimes receiveTimes = processingStatusDto.receiveTimes();
                updateIngestReceiveTime(receiveTimes.ingest());
                updatePostProcessingReceiveTime(receiveTimes.postProcessing());
                updatePostIndexingReceiveTime(receiveTimes.postIndexing());
            });
        } catch (Exception e) {
            LOG.error("Couldn't load persisted processing status", e);
        }
        long milliseconds = this.persistInterval.toMilliseconds();
        this.future = this.scheduler.scheduleWithFixedDelay(this::doPersist, milliseconds, milliseconds, TimeUnit.MILLISECONDS);
    }

    protected void shutDown() {
        LOG.debug("Shutting down processing status recorder service");
        this.inShutdown.set(true);
        this.eventBus.unregister(this);
        if (this.future != null) {
            this.future.cancel(true);
        }
    }

    private void doPersist() {
        if (this.inShutdown.get()) {
            LOG.debug("Not persisting data because server is shutting down");
            return;
        }
        try {
            LOG.debug("Persisted processing status: {}", this.dbService.save(this));
        } catch (Exception e) {
            LOG.error("Couldn't persist processing status", e);
        }
    }

    @Override // org.graylog2.system.processing.ProcessingStatusRecorder
    public DateTime getIngestReceiveTime() {
        return this.ingestReceiveTime.get();
    }

    @Override // org.graylog2.system.processing.ProcessingStatusRecorder
    public DateTime getPostProcessingReceiveTime() {
        return this.postProcessingReceiveTime.get();
    }

    @Override // org.graylog2.system.processing.ProcessingStatusRecorder
    public DateTime getPostIndexingReceiveTime() {
        return this.postIndexReceiveTime.get();
    }

    @Override // org.graylog2.system.processing.ProcessingStatusRecorder
    public void updateIngestReceiveTime(DateTime dateTime) {
        if (dateTime != null) {
            this.ingestReceiveTime.updateAndGet(dateTime2 -> {
                return latestTimestamp(dateTime2, dateTime);
            });
        }
    }

    @Override // org.graylog2.system.processing.ProcessingStatusRecorder
    public void updatePostProcessingReceiveTime(DateTime dateTime) {
        if (dateTime != null) {
            this.postProcessingReceiveTime.updateAndGet(dateTime2 -> {
                return latestTimestamp(dateTime2, dateTime);
            });
        }
    }

    @Override // org.graylog2.system.processing.ProcessingStatusRecorder
    public void updatePostIndexingReceiveTime(DateTime dateTime) {
        if (dateTime != null) {
            this.postIndexReceiveTime.updateAndGet(dateTime2 -> {
                return latestTimestamp(dateTime2, dateTime);
            });
        }
    }

    private DateTime latestTimestamp(DateTime dateTime, DateTime dateTime2) {
        return dateTime2.isAfter(dateTime) ? dateTime2 : dateTime;
    }
}
