package org.graylog2.caches;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.InstrumentedThreadFactory;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.apache.harmony.beans.BeansUtils;
import org.graylog2.Configuration;
import org.graylog2.inputs.InputCache;
import org.graylog2.inputs.OutputCache;
import org.graylog2.plugin.Message;
import org.graylog2.utilities.MessageToJsonSerializer;
import org.mapdb.Atomic;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Store;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/caches/DiskJournalCache.class */
public abstract class DiskJournalCache implements InputCache, OutputCache {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(DiskJournalCache.class);
    /** MapDB database handle; opened in the constructor on the file returned by {@code getDbFile}. */
    private final DB db;
    /** Queue of serialized messages, obtained from {@link #db} under the name {@code "messages"}. */
    private final BlockingQueue<byte[]> queue;
    /**
     * Persistent entry counter, obtained from {@link #db} under the name {@code "counter"}.
     * Incremented on successful enqueue and decremented on dequeue; {@code size()} reports it.
     */
    private final Atomic.Long counter;
    /** Converts messages to and from the byte arrays stored in {@link #queue}. */
    private final MessageToJsonSerializer serializer;
    /** Store view of {@link #db}; only queried for its current size around compaction. */
    private final Store store;
    /** Registry used to create the timers below and to instrument the commit executor and its thread factory. */
    private final MetricRegistry metricRegistry;
    /** Times single-message {@code add} calls. */
    private final Timer addTimer;
    /** Times {@code pop} and {@code drainTo} calls. */
    private final Timer popTimer;
    /** Times {@code commit} calls. */
    private final Timer commitTimer;

    /* loaded from: input_file:org/graylog2/caches/DiskJournalCache$Input.class */
    /**
     * Disk journal variant whose database file is named {@code "input-cache"}.
     */
    public static class Input extends DiskJournalCache {
        /** Compile-time constant, so it is safe to return while the parent constructor is still running. */
        private static final String DB_FILE_NAME = "input-cache";

        /**
         * Creates the cache; all arguments are handed unchanged to the parent constructor.
         */
        @Inject
        public Input(Configuration configuration, MessageToJsonSerializer messageToJsonSerializer, MetricRegistry metricRegistry) throws IOException, DiskJournalCacheCorruptSpoolException {
            super(configuration, messageToJsonSerializer, metricRegistry);
        }

        /**
         * @return the fixed file name {@code "input-cache"}
         */
        @Override
        protected String getDbFileName() {
            return DB_FILE_NAME;
        }
    }

    /* loaded from: input_file:org/graylog2/caches/DiskJournalCache$Output.class */
    /**
     * Disk journal variant whose database file is named {@code "output-cache"}.
     */
    public static class Output extends DiskJournalCache {
        /** Compile-time constant, so it is safe to return while the parent constructor is still running. */
        private static final String DB_FILE_NAME = "output-cache";

        /**
         * Creates the cache; all arguments are handed unchanged to the parent constructor.
         */
        @Inject
        public Output(Configuration configuration, MessageToJsonSerializer messageToJsonSerializer, MetricRegistry metricRegistry) throws IOException, DiskJournalCacheCorruptSpoolException {
            super(configuration, messageToJsonSerializer, metricRegistry);
        }

        /**
         * @return the fixed file name {@code "output-cache"}
         */
        @Override
        protected String getDbFileName() {
            return DB_FILE_NAME;
        }
    }

    /**
     * Opens (creating it if necessary) the on-disk journal inside the configured spool directory,
     * registers the add/pop/commit timers, compacts the database and starts a background task
     * that commits at the configured interval.
     *
     * <p>{@link #getDbFileName()} is invoked from within this constructor, i.e. before any subclass
     * field initializers have run; implementations must therefore return a constant.
     *
     * @param configuration           supplies the spool directory and the commit interval (milliseconds)
     * @param messageToJsonSerializer converts messages to and from the stored byte arrays
     * @param metricRegistry          registry for the timers and the instrumented commit executor
     * @throws IOException                           if the spool directory cannot be created
     * @throws DiskJournalCacheCorruptSpoolException if initialization fails with an
     *                                               {@link ArrayIndexOutOfBoundsException}
     */
    @Inject
    public DiskJournalCache(Configuration configuration, MessageToJsonSerializer messageToJsonSerializer, MetricRegistry metricRegistry) throws IOException, DiskJournalCacheCorruptSpoolException {
        Files.createDirectories(new File(configuration.getMessageCacheSpoolDir()).toPath());
        this.metricRegistry = metricRegistry;
        final String dbFileName = getDbFileName();
        try {
            this.db = DBMaker.newFileDB(getDbFile(configuration)).mmapFileEnable().checksumEnable().closeOnJvmShutdown().make();
            this.store = Store.forDB(this.db);
            this.queue = this.db.getQueue("messages");
            this.counter = this.db.getAtomicLong("counter");
            this.serializer = messageToJsonSerializer;
            // Plain literal on purpose: the metric name must not depend on a constant
            // borrowed from an unrelated library.
            this.addTimer = metricRegistry.timer(MetricRegistry.name(getClass(), dbFileName, "add", "executionTime"));
            this.popTimer = metricRegistry.timer(MetricRegistry.name(getClass(), dbFileName, "pop", "executionTime"));
            this.commitTimer = metricRegistry.timer(MetricRegistry.name(getClass(), dbFileName, "commit", "executionTime"));
            commit();
            LOG.info("Compacting off-heap message cache database files ({})", dbFileName);
            compact();
            // An empty queue with a non-zero counter means the two got out of sync; reset the counter.
            if (this.queue.isEmpty() && this.counter.get() != 0) {
                LOG.warn("Setting counter from {} to 0 because the queue is empty!", this.counter.get());
                this.counter.set(0L);
                commit();
            }
            commitExecutorService().scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        commit();
                    } catch (Exception e) {
                        // Caught so that one failing commit does not cancel the periodic task.
                        LOG.error("Commit thread error", e);
                    }
                }
            }, 0L, configuration.getMessageCacheCommitInterval(), TimeUnit.MILLISECONDS);
        } catch (ArrayIndexOutOfBoundsException e) {
            // NOTE(review): presumably this is how a damaged spool file surfaces from MapDB - confirm.
            LOG.error("Caught exception during disk journal initialization: ", e);
            throw new DiskJournalCacheCorruptSpoolException();
        }
    }

    /**
     * Builds a new single-threaded scheduled executor, using {@link #threadFactory()} for its
     * thread, wrapped so that it reports to the metric registry.
     *
     * @return a freshly created instrumented executor
     */
    private ScheduledExecutorService commitExecutorService() {
        final ScheduledExecutorService delegate = Executors.newSingleThreadScheduledExecutor(threadFactory());
        return new InstrumentedScheduledExecutorService(delegate, this.metricRegistry);
    }

    /**
     * Builds a thread factory that names threads {@code disk-journal-<db file name>-<n>} and
     * reports to the metric registry.
     *
     * @return a freshly created instrumented thread factory
     */
    private ThreadFactory threadFactory() {
        final String nameFormat = "disk-journal-" + getDbFileName() + "-%d";
        final ThreadFactory namedFactory = new ThreadFactoryBuilder().setNameFormat(nameFormat).build();
        return new InstrumentedThreadFactory(namedFactory, this.metricRegistry);
    }

    /**
     * Serializes a message and offers it to the queue, bumping the counter when the queue accepts it.
     * Does nothing if the database is closed. A serialization failure is logged and the message
     * is not enqueued. The call is timed by the add timer.
     *
     * @param message the message to cache
     */
    @Override
    public void add(Message message) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Adding message to cache: {}", message.toString());
        }
        if (this.db.isClosed()) {
            return;
        }
        final Timer.Context timing = this.addTimer.time();
        try {
            final byte[] payload = this.serializer.serializeToBytes(message);
            final boolean accepted = this.queue.offer(payload);
            if (accepted) {
                this.counter.incrementAndGet();
            }
        } catch (IOException e) {
            LOG.error("Unable to enqueue message", e);
        } finally {
            timing.stop();
        }
    }

    /**
     * Adds every message of the collection, in iteration order, via {@link #add(Message)}.
     *
     * @param collection the messages to cache
     */
    @Override
    public void add(Collection<Message> collection) {
        for (final Message each : collection) {
            add(each);
        }
    }

    /**
     * Takes the next serialized message from the queue, blocking until one is available,
     * decrements the counter and returns the deserialized message. The call is timed by the
     * pop timer.
     *
     * <p>If the waiting thread is interrupted, the error is logged, the thread's interrupt flag
     * is restored so that callers can observe the interruption, and {@code null} is returned.
     *
     * @return the next message, or {@code null} if the database is closed, the queue yielded
     *         {@code null}, deserialization failed, or the thread was interrupted
     */
    @Override
    public Message pop() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Consuming message from cache");
        }
        if (this.db.isClosed()) {
            return null;
        }
        // The timer context is closed (and thereby stopped) on every exit path.
        try (Timer.Context ignored = this.popTimer.time()) {
            final byte[] payload = this.queue.take();
            if (payload == null) {
                return null;
            }
            // The counter is decremented before deserialization: the entry has left the queue
            // even if it later turns out to be unreadable.
            this.counter.decrementAndGet();
            return this.serializer.deserialize(payload);
        } catch (IOException e) {
            LOG.error("Error deserializing message", e);
            return null;
        } catch (InterruptedException e) {
            LOG.error("Interrupted while dequeueing message: ", e);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Moves up to {@code i} entries from the queue into {@code collection}, deserializing each one.
     * The counter is decremented for every non-null entry removed from the queue, including
     * entries that fail to deserialize (those are logged and skipped). The call is timed by the
     * pop timer, which is stopped even if an exception escapes.
     *
     * @param collection receives the deserialized messages
     * @param i          maximum number of entries to remove from the queue
     * @return the number of messages actually added to {@code collection}; {@code 0} if the
     *         database is closed
     */
    @Override
    public int drainTo(Collection<? super Message> collection, int i) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Consuming message from cache");
        }
        if (this.db.isClosed()) {
            return 0;
        }
        final Timer.Context time = this.popTimer.time();
        try {
            final Collection<byte[]> drained = new ArrayList<>();
            this.queue.drainTo(drained, i);
            int added = 0;
            for (final byte[] payload : drained) {
                if (payload == null) {
                    continue;
                }
                this.counter.decrementAndGet();
                try {
                    collection.add(this.serializer.deserialize(payload));
                    added++;
                } catch (IOException e) {
                    LOG.error("Error deserializing message", e);
                }
            }
            return added;
        } finally {
            // Guarantees the timing is recorded even when draining or the target collection throws.
            time.stop();
        }
    }

    /**
     * Reports the entry count as tracked by the persistent counter (not by inspecting the queue).
     *
     * @return the counter narrowed to {@code int}, or {@code 0} if the database is closed
     */
    @Override
    public int size() {
        return this.db.isClosed() ? 0 : this.counter.intValue();
    }

    /**
     * Tells whether the cache has nothing to hand out.
     *
     * @return {@code true} if the database is closed or the queue is empty
     */
    @Override
    public boolean isEmpty() {
        if (this.db.isClosed()) {
            return true;
        }
        return this.queue.isEmpty();
    }

    /**
     * Commits the database, timing the call with the commit timer. Does nothing if the database
     * is closed. The timer is stopped even if the commit throws, so failed commits are measured too.
     *
     * <p>Called during construction and by the periodic task scheduled in the constructor.
     */
    public void commit() {
        if (this.db.isClosed()) {
            return;
        }
        final Timer.Context time = this.commitTimer.time();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Committing {} (entries {})", getDbFileName(), size());
            }
            this.db.commit();
        } finally {
            time.stop();
        }
    }

    /**
     * Compacts the database unless it is closed and, at debug level, logs how many bytes the
     * store shrank by.
     */
    private void compact() {
        if (!this.db.isClosed()) {
            final long sizeBefore = this.store.getCurrSize();
            this.db.compact();
            if (LOG.isDebugEnabled()) {
                // The size after compaction is only queried when the message will actually be logged.
                LOG.debug("Compacted db {} (freed up {} bytes)", getDbFileName(), sizeBefore - this.store.getCurrSize());
            }
        }
    }

    /**
     * Resolves the database file: {@link #getDbFileName()} inside the configured spool directory.
     *
     * @param configuration supplies the spool directory
     * @return the absolute form of the database file path
     */
    private File getDbFile(Configuration configuration) {
        final String spoolDir = configuration.getMessageCacheSpoolDir();
        final File dbFile = new File(spoolDir, getDbFileName());
        return dbFile.getAbsoluteFile();
    }

    /**
     * Returns the name of this cache's database file within the spool directory. The same value
     * is also used as a component of the timer metric names and of the commit thread name.
     *
     * <p>This method is called from the {@code DiskJournalCache} constructor, so implementations
     * must not depend on subclass instance state.
     *
     * @return the database file name
     */
    protected abstract String getDbFileName();
}
