/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.system.processing;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Duration;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.lifecycles.Lifecycle;
import org.graylog2.shared.journal.KafkaJournal;
import org.graylog2.system.processing.DBProcessingStatusService;
import org.graylog2.system.processing.ProcessingStatusDto;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class MongoDBProcessingStatusRecorderService
extends AbstractIdleService
implements ProcessingStatusRecorder {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBProcessingStatusRecorderService.class);
    private static final DateTime DEFAULT_RECEIVE_TIME = new DateTime(0L, DateTimeZone.UTC);
    private static final String READ_MESSAGES_METRIC = MetricRegistry.name((String)KafkaJournal.class.getName(), (String[])new String[]{"readMessages"});
    private static final String WRITTEN_MESSAGES_METRIC = MetricRegistry.name((String)KafkaJournal.class.getName(), (String[])new String[]{"writtenMessages"});
    private static final String UNCOMMITTED_MESSAGES_METRIC = MetricRegistry.name((String)KafkaJournal.class.getName(), (String[])new String[]{"uncommittedMessages"});
    private final AtomicReference<DateTime> ingestReceiveTime = new AtomicReference<DateTime>(DEFAULT_RECEIVE_TIME);
    private final AtomicReference<DateTime> postProcessingReceiveTime = new AtomicReference<DateTime>(DEFAULT_RECEIVE_TIME);
    private final AtomicReference<DateTime> postIndexReceiveTime = new AtomicReference<DateTime>(DEFAULT_RECEIVE_TIME);
    private final DBProcessingStatusService dbService;
    private final EventBus eventBus;
    private final ServerStatus serverStatus;
    private final MetricRegistry metricRegistry;
    private final Duration persistInterval;
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean inShutdown = new AtomicBoolean(false);
    private ScheduledFuture<?> future;

    @Inject
    public MongoDBProcessingStatusRecorderService(DBProcessingStatusService dbService, EventBus eventBus, ServerStatus serverStatus, MetricRegistry metricRegistry, @Named(value="processing_status_persist_interval") Duration persistInterval, @Named(value="daemonScheduler") ScheduledExecutorService scheduler) {
        this.dbService = dbService;
        this.eventBus = eventBus;
        this.serverStatus = serverStatus;
        this.metricRegistry = metricRegistry;
        this.persistInterval = persistInterval;
        this.scheduler = scheduler;
    }

    @Subscribe
    public void handleServerShutdown(Lifecycle lifecycle) {
        if (lifecycle == Lifecycle.HALTING) {
            this.inShutdown.set(true);
        }
    }

    protected void startUp() {
        this.eventBus.register((Object)this);
        LOG.debug("Starting processing status recorder service");
        try {
            this.dbService.get().ifPresent(processingStatus -> {
                LOG.debug("Loaded persisted processing status: {}", processingStatus);
                ProcessingStatusDto.ReceiveTimes receiveTimes = processingStatus.receiveTimes();
                this.updateIngestReceiveTime(receiveTimes.ingest());
                this.updatePostProcessingReceiveTime(receiveTimes.postProcessing());
                this.updatePostIndexingReceiveTime(receiveTimes.postIndexing());
            });
        }
        catch (Exception e) {
            LOG.error("Couldn't load persisted processing status", (Throwable)e);
        }
        long interval = this.persistInterval.toMilliseconds();
        this.future = this.scheduler.scheduleWithFixedDelay(this::doPersist, interval, interval, TimeUnit.MILLISECONDS);
    }

    protected void shutDown() {
        LOG.debug("Shutting down processing status recorder service");
        this.inShutdown.set(true);
        this.eventBus.unregister((Object)this);
        if (this.future != null) {
            this.future.cancel(true);
        }
    }

    private void doPersist() {
        if (this.inShutdown.get()) {
            LOG.debug("Not persisting data because server is shutting down");
            return;
        }
        try {
            ProcessingStatusDto dto = this.dbService.save(this);
            LOG.debug("Persisted processing status: {}", (Object)dto);
        }
        catch (Exception e) {
            LOG.error("Couldn't persist processing status", (Throwable)e);
        }
    }

    @Override
    public Lifecycle getNodeLifecycleStatus() {
        return this.serverStatus.getLifecycle();
    }

    @Override
    public DateTime getIngestReceiveTime() {
        return this.ingestReceiveTime.get();
    }

    @Override
    public DateTime getPostProcessingReceiveTime() {
        return this.postProcessingReceiveTime.get();
    }

    @Override
    public DateTime getPostIndexingReceiveTime() {
        return this.postIndexReceiveTime.get();
    }

    @Override
    public long getJournalInfoUncommittedEntries() {
        Gauge gauge = (Gauge)this.metricRegistry.getGauges((name, metric) -> UNCOMMITTED_MESSAGES_METRIC.equals(name)).get(UNCOMMITTED_MESSAGES_METRIC);
        if (gauge != null) {
            return (Long)gauge.getValue();
        }
        return 0L;
    }

    @Override
    public double getJournalInfoReadMessages1mRate() {
        return this.getJournalInfoMeter1mRate(READ_MESSAGES_METRIC);
    }

    @Override
    public double getJournalInfoWrittenMessages1mRate() {
        return this.getJournalInfoMeter1mRate(WRITTEN_MESSAGES_METRIC);
    }

    private double getJournalInfoMeter1mRate(String metricName) {
        Meter meter = (Meter)this.metricRegistry.getMeters((name, metric) -> metricName.equals(name)).get(metricName);
        if (meter != null) {
            return meter.getOneMinuteRate();
        }
        return 0.0;
    }

    @Override
    public void updateIngestReceiveTime(DateTime newTimestamp) {
        if (newTimestamp != null) {
            this.ingestReceiveTime.updateAndGet(timestamp -> this.latestTimestamp((DateTime)timestamp, newTimestamp));
        }
    }

    @Override
    public void updatePostProcessingReceiveTime(DateTime newTimestamp) {
        if (newTimestamp != null) {
            this.postProcessingReceiveTime.updateAndGet(timestamp -> this.latestTimestamp((DateTime)timestamp, newTimestamp));
        }
    }

    @Override
    public void updatePostIndexingReceiveTime(DateTime newTimestamp) {
        if (newTimestamp != null) {
            this.postIndexReceiveTime.updateAndGet(timestamp -> this.latestTimestamp((DateTime)timestamp, newTimestamp));
        }
    }

    private DateTime latestTimestamp(DateTime timestamp, DateTime newTimestamp) {
        return newTimestamp.isAfter((ReadableInstant)timestamp) ? newTimestamp : timestamp;
    }
}

