package org.graylog2.shared.journal;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.SyncFailedException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.commons.lang3.StringUtils;
import org.graylog.shaded.kafka09.common.KafkaException;
import org.graylog.shaded.kafka09.common.OffsetOutOfRangeException;
import org.graylog.shaded.kafka09.common.TopicAndPartition;
import org.graylog.shaded.kafka09.log.CleanerConfig;
import org.graylog.shaded.kafka09.log.Log;
import org.graylog.shaded.kafka09.log.LogAppendInfo;
import org.graylog.shaded.kafka09.log.LogConfig;
import org.graylog.shaded.kafka09.log.LogManager;
import org.graylog.shaded.kafka09.log.LogSegment;
import org.graylog.shaded.kafka09.message.ByteBufferMessageSet;
import org.graylog.shaded.kafka09.message.Message;
import org.graylog.shaded.kafka09.message.MessageAndOffset;
import org.graylog.shaded.kafka09.message.MessageSet;
import org.graylog.shaded.kafka09.scala.Option;
import org.graylog.shaded.kafka09.scala.collection.JavaConversions;
import org.graylog.shaded.kafka09.scala.collection.Map$;
import org.graylog.shaded.kafka09.scala.runtime.AbstractFunction1;
import org.graylog.shaded.kafka09.server.BrokerState;
import org.graylog.shaded.kafka09.server.RunningAsBroker;
import org.graylog.shaded.kafka09.utils.KafkaScheduler;
import org.graylog.shaded.kafka09.utils.Time;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.ThrottleState;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.lifecycles.LoadBalancerStatus;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.metrics.HdrTimer;
import org.graylog2.shared.utilities.ByteBufferUtils;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/graylog2/shared/journal/LocalKafkaJournal.class */
public class LocalKafkaJournal extends AbstractIdleService implements Journal {
    private static final String LEGACY_CLASS_NAME = "org.graylog2.shared.journal.KafkaJournal";
    private static final int NUM_IO_THREADS = 1;
    public static final long DEFAULT_COMMITTED_OFFSET = Long.MIN_VALUE;
    public static final int NOTIFY_ON_UTILIZATION_PERCENTAGE = 95;
    public static final int THRESHOLD_THROTTLING_DISABLED = -1;
    public static final String METER_WRITTEN_MESSAGES = "writtenMessages";
    public static final String METER_READ_MESSAGES = "readMessages";
    private static final String METER_WRITE_DISCARDED_MESSAGES = "writeDiscardedMessages";
    public static final String GAUGE_UNCOMMITTED_MESSAGES = "uncommittedMessages";
    private static final String TIMER_WRITE_TIME = "writeTime";
    private static final String TIMER_READ_TIME = "readTime";
    private static final String METRIC_NAME_SIZE = "size";
    private static final String METRIC_NAME_LOG_END_OFFSET = "logEndOffset";
    private static final String METRIC_NAME_NUMBER_OF_SEGMENTS = "numberOfSegments";
    private static final String METRIC_NAME_UNFLUSHED_MESSAGES = "unflushedMessages";
    private static final String METRIC_NAME_RECOVERY_POINT = "recoveryPoint";
    private static final String METRIC_NAME_LAST_FLUSH_TIME = "lastFlushTime";
    private final LogManager logManager;
    private final Log kafkaLog;
    private final File committedReadOffsetFile;
    private final AtomicLong committedOffset;
    private final MetricRegistry metricRegistry;
    private final ScheduledExecutorService scheduler;
    private final ServerStatus serverStatus;
    private final Timer writeTime;
    private final Timer readTime;
    private final KafkaScheduler kafkaScheduler;
    private final Meter writtenMessages;
    private final Meter readMessages;
    private final Meter writeDiscardedMessages;
    private final OffsetFileFlusher offsetFlusher;
    private final DirtyLogFlusher dirtyLogFlusher;
    private final RecoveryCheckpointFlusher recoveryCheckpointFlusher;
    private final LogRetentionCleaner logRetentionCleaner;
    private final long maxSegmentSize;
    private final int maxMessageSize;
    private final String metricPrefix;
    private long nextReadOffset;
    private ScheduledFuture<?> checkpointFlusherFuture;
    private ScheduledFuture<?> dirtyLogFlushFuture;
    private ScheduledFuture<?> logRetentionFuture;
    private ScheduledFuture<?> offsetFlusherFuture;
    private volatile boolean shuttingDown;
    private final AtomicReference<ThrottleState> throttleState;
    private final AtomicInteger purgedSegmentsInLastRetention;
    private final int throttleThresholdPercentage;
    private static final Logger LOG = LoggerFactory.getLogger(LocalKafkaJournal.class);
    private static final Time JODA_TIME = new Time() { // from class: org.graylog2.shared.journal.LocalKafkaJournal.1
        public long milliseconds() {
            return DateTimeUtils.currentTimeMillis();
        }

        public long nanoseconds() {
            return System.nanoTime();
        }

        public void sleep(long j) {
            Uninterruptibles.sleepUninterruptibly(j, TimeUnit.MILLISECONDS);
        }
    };

    /* loaded from: input_file:org/graylog2/shared/journal/LocalKafkaJournal$DirtyLogFlusher.class */
    public class DirtyLogFlusher implements Runnable {
        public DirtyLogFlusher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                LocalKafkaJournal.this.flushDirtyLogs();
            } catch (Exception e) {
                LocalKafkaJournal.LOG.error("Unable to flush dirty logs. Will try again.", e);
            }
        }
    }

    /* loaded from: input_file:org/graylog2/shared/journal/LocalKafkaJournal$LogRetentionCleaner.class */
    public class LogRetentionCleaner implements Runnable, Callable<Integer> {
        private final Logger loggerForCleaner = LoggerFactory.getLogger(LogRetentionCleaner.class);

        public LogRetentionCleaner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                call();
            } catch (Exception e) {
                this.loggerForCleaner.error("Unable to delete expired segments. Will try again.", e);
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws Exception {
            this.loggerForCleaner.debug("Beginning log cleanup");
            int i = 0;
            Timer.Context time = new Timer().time();
            for (Log log : JavaConversions.asJavaIterable(LocalKafkaJournal.this.logManager.allLogs())) {
                if (!log.config().compact()) {
                    this.loggerForCleaner.debug("Garbage collecting {}", log.name());
                    i += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log) + cleanupSegmentsToRemoveCommitted(log);
                }
            }
            this.loggerForCleaner.debug("Log cleanup completed. {} files deleted in {} seconds", Integer.valueOf(i), Long.valueOf(TimeUnit.NANOSECONDS.toSeconds(time.stop())));
            return Integer.valueOf(i);
        }

        private int cleanupExpiredSegments(final Log log) {
            if (log.size() == 0 && log.numberOfSegments() < 1) {
                LocalKafkaJournal.this.purgedSegmentsInLastRetention.set(0);
                return 0;
            }
            int deleteOldSegments = log.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // from class: org.graylog2.shared.journal.LocalKafkaJournal.LogRetentionCleaner.1
                public Object apply(LogSegment logSegment) {
                    long milliseconds = LocalKafkaJournal.JODA_TIME.milliseconds() - logSegment.lastModified();
                    boolean z = milliseconds > log.config().retentionMs().longValue();
                    if (z) {
                        LogRetentionCleaner.this.loggerForCleaner.debug("[cleanup-time] Removing segment with age {}s, older than then maximum retention age {}s", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(milliseconds)), Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(log.config().retentionMs().longValue())));
                    }
                    return Boolean.valueOf(z);
                }
            });
            LocalKafkaJournal.this.purgedSegmentsInLastRetention.set(deleteOldSegments);
            return deleteOldSegments;
        }

        private void updateLoadBalancerStatus(double d) {
            LoadBalancerStatus loadbalancerStatus = LocalKafkaJournal.this.serverStatus.getLifecycle().getLoadbalancerStatus();
            if (loadbalancerStatus == LoadBalancerStatus.THROTTLED && d < LocalKafkaJournal.this.throttleThresholdPercentage) {
                LocalKafkaJournal.this.serverStatus.running();
                LocalKafkaJournal.LOG.info(String.format(Locale.ENGLISH, "Journal usage is %.2f%% (threshold %d%%), changing load balancer status from THROTTLED to ALIVE", Double.valueOf(d), Integer.valueOf(LocalKafkaJournal.this.throttleThresholdPercentage)));
            } else {
                if (loadbalancerStatus != LoadBalancerStatus.ALIVE || d < LocalKafkaJournal.this.throttleThresholdPercentage) {
                    return;
                }
                LocalKafkaJournal.this.serverStatus.throttle();
                LocalKafkaJournal.LOG.info(String.format(Locale.ENGLISH, "Journal usage is %.2f%% (threshold %d%%), changing load balancer status from ALIVE to THROTTLED", Double.valueOf(d), Integer.valueOf(LocalKafkaJournal.this.throttleThresholdPercentage)));
            }
        }

        private int cleanupSegmentsToMaintainSize(Log log) {
            final long longValue = log.config().retentionSize().longValue();
            long size = log.size();
            double d = longValue > 0 ? (size * 100) / longValue : 0.0d;
            if (d > 95.0d) {
                LocalKafkaJournal.LOG.warn("Journal utilization ({}%) has gone over {}%.", Double.valueOf(d), 95);
            }
            if (LocalKafkaJournal.this.throttleThresholdPercentage != -1) {
                updateLoadBalancerStatus(d);
            }
            if (longValue < 0 || size < longValue) {
                LocalKafkaJournal.this.purgedSegmentsInLastRetention.set(0);
                return 0;
            }
            final long[] jArr = {size - longValue};
            int deleteOldSegments = log.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // from class: org.graylog2.shared.journal.LocalKafkaJournal.LogRetentionCleaner.2
                public Object apply(LogSegment logSegment) {
                    if (jArr[0] - logSegment.size() < 0) {
                        return false;
                    }
                    long[] jArr2 = jArr;
                    jArr2[0] = jArr2[0] - logSegment.size();
                    LogRetentionCleaner.this.loggerForCleaner.debug("[cleanup-size] Removing segment starting at offset {}, size {} bytes, to shrink log to new size {}, target size {}", new Object[]{Long.valueOf(logSegment.baseOffset()), Long.valueOf(logSegment.size()), Long.valueOf(jArr[0]), Long.valueOf(longValue)});
                    return true;
                }
            });
            LocalKafkaJournal.this.purgedSegmentsInLastRetention.set(deleteOldSegments);
            return deleteOldSegments;
        }

        private int cleanupSegmentsToRemoveCommitted(Log log) {
            if (log.numberOfSegments() <= 1) {
                this.loggerForCleaner.debug("[cleanup-committed] The journal is already minimal at {} segment(s), not trying to remove more segments.", Integer.valueOf(log.numberOfSegments()));
                return 0;
            }
            final long j = LocalKafkaJournal.this.committedOffset.get();
            final HashSet newHashSet = Sets.newHashSet(JavaConversions.asJavaIterable(log.logSegments(j, Long.MAX_VALUE)));
            this.loggerForCleaner.debug("[cleanup-committed] Keeping segments {}", newHashSet);
            return log.deleteOldSegments(new AbstractFunction1<LogSegment, Object>() { // from class: org.graylog2.shared.journal.LocalKafkaJournal.LogRetentionCleaner.3
                public Object apply(LogSegment logSegment) {
                    boolean z = !newHashSet.contains(logSegment);
                    if (z) {
                        LogRetentionCleaner.this.loggerForCleaner.debug("[cleanup-committed] Should delete segment {} because it is prior to committed offset {}", logSegment, Long.valueOf(j));
                    }
                    return Boolean.valueOf(z);
                }
            });
        }
    }

    /* loaded from: input_file:org/graylog2/shared/journal/LocalKafkaJournal$OffsetFileFlusher.class */
    public class OffsetFileFlusher implements Runnable {
        public OffsetFileFlusher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (LocalKafkaJournal.this.committedOffset.get() == Long.MIN_VALUE) {
                return;
            }
            try {
                FileOutputStream fileOutputStream = new FileOutputStream(LocalKafkaJournal.this.committedReadOffsetFile);
                try {
                    fileOutputStream.write(String.valueOf(LocalKafkaJournal.this.committedOffset.get()).getBytes(StandardCharsets.UTF_8));
                    fileOutputStream.flush();
                    fileOutputStream.getFD().sync();
                    fileOutputStream.close();
                } catch (Throwable th) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } catch (SyncFailedException e) {
                LocalKafkaJournal.LOG.error("Cannot sync " + LocalKafkaJournal.this.committedReadOffsetFile.getAbsolutePath() + " to disk. Continuing anyway, but there is no guarantee that the file has been written.", e);
            } catch (IOException e2) {
                LocalKafkaJournal.LOG.error("Cannot write " + LocalKafkaJournal.this.committedReadOffsetFile.getAbsolutePath() + " to disk.", e2);
            }
        }
    }

    /* loaded from: input_file:org/graylog2/shared/journal/LocalKafkaJournal$RecoveryCheckpointFlusher.class */
    public class RecoveryCheckpointFlusher implements Runnable {
        public RecoveryCheckpointFlusher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                LocalKafkaJournal.this.logManager.checkpointRecoveryPointOffsets();
            } catch (Exception e) {
                LocalKafkaJournal.LOG.error("Unable to flush checkpoint recovery point offsets. Will try again.", e);
            }
        }
    }

    @Inject
    public LocalKafkaJournal(@Named("message_journal_dir") Path path, @Named("scheduler") ScheduledExecutorService scheduledExecutorService, @Named("message_journal_segment_size") Size size, @Named("message_journal_segment_age") Duration duration, @Named("message_journal_max_size") Size size2, @Named("message_journal_max_age") Duration duration2, @Named("message_journal_flush_interval") long j, @Named("message_journal_flush_age") Duration duration3, @Named("lb_throttle_threshold_percentage") int i, MetricRegistry metricRegistry, ServerStatus serverStatus) {
        this(path, scheduledExecutorService, size, duration, size2, duration2, j, duration3, i, metricRegistry, serverStatus, LocalKafkaJournal.class.getName());
    }

    public LocalKafkaJournal(Path path, ScheduledExecutorService scheduledExecutorService, Size size, Duration duration, Size size2, Duration duration2, long j, Duration duration3, int i, MetricRegistry metricRegistry, ServerStatus serverStatus, String str) {
        String readFirstLine;
        this.committedOffset = new AtomicLong(Long.MIN_VALUE);
        this.nextReadOffset = 0L;
        this.throttleState = new AtomicReference<>();
        this.purgedSegmentsInLastRetention = new AtomicInteger();
        if (i == -1) {
            this.throttleThresholdPercentage = i;
        } else {
            this.throttleThresholdPercentage = intRange(i, 0, 100);
        }
        this.scheduler = scheduledExecutorService;
        this.serverStatus = serverStatus;
        this.maxSegmentSize = size.toBytes();
        this.maxMessageSize = Ints.saturatedCast(this.maxSegmentSize);
        this.metricPrefix = str;
        this.metricRegistry = metricRegistry;
        this.writtenMessages = metricRegistry.meter(MetricRegistry.name(this.metricPrefix, new String[]{METER_WRITTEN_MESSAGES}));
        this.readMessages = metricRegistry.meter(MetricRegistry.name(this.metricPrefix, new String[]{METER_READ_MESSAGES}));
        this.writeDiscardedMessages = metricRegistry.meter(MetricRegistry.name(this.metricPrefix, new String[]{METER_WRITE_DISCARDED_MESSAGES}));
        registerUncommittedGauge(metricRegistry, MetricRegistry.name(this.metricPrefix, new String[]{GAUGE_UNCOMMITTED_MESSAGES}));
        this.writeTime = registerHdrTimer(metricRegistry, MetricRegistry.name(this.metricPrefix, new String[]{TIMER_WRITE_TIME}));
        this.readTime = registerHdrTimer(metricRegistry, MetricRegistry.name(this.metricPrefix, new String[]{TIMER_READ_TIME}));
        LogConfig logConfig = new LogConfig(ImmutableMap.builder().put(LogConfig.SegmentBytesProp(), Integer.valueOf(Ints.saturatedCast(size.toBytes()))).put(LogConfig.SegmentMsProp(), Long.valueOf(duration.getMillis())).put(LogConfig.SegmentJitterMsProp(), 0).put(LogConfig.FlushMessagesProp(), Long.valueOf(j)).put(LogConfig.FlushMsProp(), Long.valueOf(duration3.getMillis())).put(LogConfig.RetentionBytesProp(), Long.valueOf(size2.toBytes())).put(LogConfig.RetentionMsProp(), Long.valueOf(duration2.getMillis())).put(LogConfig.MaxMessageBytesProp(), Integer.valueOf(this.maxMessageSize)).put(LogConfig.SegmentIndexBytesProp(), Integer.valueOf(Ints.saturatedCast(Size.megabytes(1L).toBytes()))).put(LogConfig.IndexIntervalBytesProp(), 4096).put(LogConfig.FileDeleteDelayMsProp(), Long.valueOf(TimeUnit.MINUTES.toMillis(1L))).put(LogConfig.DeleteRetentionMsProp(), Long.valueOf(TimeUnit.DAYS.toMillis(1L))).put(LogConfig.MinCleanableDirtyRatioProp(), Double.valueOf(0.5d)).put(LogConfig.Compact(), false).put(LogConfig.UncleanLeaderElectionEnableProp(), true).put(LogConfig.MinInSyncReplicasProp(), 1).build());
        CleanerConfig cleanerConfig = new CleanerConfig(1, Size.megabytes(4L).toBytes(), 0.9d, Ints.saturatedCast(Size.megabytes(1L).toBytes()), Ints.saturatedCast(Size.megabytes(32L).toBytes()), Ints.saturatedCast(Size.megabytes(5L).toBytes()), TimeUnit.SECONDS.toMillis(15L), false, "MD5");
        if (!Files.exists(path, new LinkOption[0])) {
            try {
                Files.createDirectories(path, new FileAttribute[0]);
            } catch (IOException e) {
                LOG.error("Cannot create journal directory at {}, please check the permissions", path.toAbsolutePath());
                throw new UncheckedIOException(e);
            }
        }
        this.committedReadOffsetFile = new File(path.toFile(), "graylog2-committed-read-offset");
        try {
            if (!this.committedReadOffsetFile.createNewFile() && (readFirstLine = com.google.common.io.Files.asCharSource(this.committedReadOffsetFile, StandardCharsets.UTF_8).readFirstLine()) != null) {
                this.committedOffset.set(Long.parseLong(readFirstLine.trim()));
                this.nextReadOffset = this.committedOffset.get() + 1;
            }
            try {
                BrokerState brokerState = new BrokerState();
                brokerState.newState(RunningAsBroker.state());
                this.kafkaScheduler = new KafkaScheduler(2, "kafka-journal-scheduler-", false);
                this.kafkaScheduler.startup();
                this.logManager = new LogManager(new File[]{path.toFile()}, Map$.MODULE$.empty(), logConfig, cleanerConfig, 1, TimeUnit.SECONDS.toMillis(60L), TimeUnit.SECONDS.toMillis(60L), TimeUnit.SECONDS.toMillis(60L), this.kafkaScheduler, brokerState, JODA_TIME);
                TopicAndPartition topicAndPartition = new TopicAndPartition("messagejournal", 0);
                Option log = this.logManager.getLog(topicAndPartition);
                if (log.isEmpty()) {
                    this.kafkaLog = this.logManager.createLog(topicAndPartition, this.logManager.defaultConfig());
                } else {
                    this.kafkaLog = (Log) log.get();
                }
                MetricRegistry metricRegistry2 = this.metricRegistry;
                String name = MetricRegistry.name(str, new String[]{"size"});
                Log log2 = this.kafkaLog;
                Objects.requireNonNull(log2);
                metricRegistry2.register(name, log2::size);
                MetricRegistry metricRegistry3 = this.metricRegistry;
                String name2 = MetricRegistry.name(str, new String[]{METRIC_NAME_LOG_END_OFFSET});
                Log log3 = this.kafkaLog;
                Objects.requireNonNull(log3);
                metricRegistry3.register(name2, log3::logEndOffset);
                MetricRegistry metricRegistry4 = this.metricRegistry;
                String name3 = MetricRegistry.name(str, new String[]{METRIC_NAME_NUMBER_OF_SEGMENTS});
                Log log4 = this.kafkaLog;
                Objects.requireNonNull(log4);
                metricRegistry4.register(name3, log4::numberOfSegments);
                MetricRegistry metricRegistry5 = this.metricRegistry;
                String name4 = MetricRegistry.name(str, new String[]{METRIC_NAME_UNFLUSHED_MESSAGES});
                Log log5 = this.kafkaLog;
                Objects.requireNonNull(log5);
                metricRegistry5.register(name4, log5::unflushedMessages);
                MetricRegistry metricRegistry6 = this.metricRegistry;
                String name5 = MetricRegistry.name(str, new String[]{METRIC_NAME_RECOVERY_POINT});
                Log log6 = this.kafkaLog;
                Objects.requireNonNull(log6);
                metricRegistry6.register(name5, log6::recoveryPoint);
                MetricRegistry metricRegistry7 = this.metricRegistry;
                String name6 = MetricRegistry.name(str, new String[]{METRIC_NAME_LAST_FLUSH_TIME});
                Log log7 = this.kafkaLog;
                Objects.requireNonNull(log7);
                metricRegistry7.register(name6, log7::lastFlushTime);
                this.metricRegistry.register(getOldestSegmentMetricName(), new Gauge<Date>() { // from class: org.graylog2.shared.journal.LocalKafkaJournal.2
                    /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
                    public Date m1242getValue() {
                        long j2 = Long.MAX_VALUE;
                        Iterator<LogSegment> it = LocalKafkaJournal.this.getSegments().iterator();
                        while (it.hasNext()) {
                            j2 = Math.min(j2, it.next().created());
                        }
                        return new Date(j2);
                    }
                });
                LOG.info("Initialized Kafka based journal at {}", path);
                this.offsetFlusher = new OffsetFileFlusher();
                this.dirtyLogFlusher = new DirtyLogFlusher();
                this.recoveryCheckpointFlusher = new RecoveryCheckpointFlusher();
                this.logRetentionCleaner = new LogRetentionCleaner();
                if (LocalKafkaJournal.class.getName().equals(str)) {
                    registerLegacyMetrics();
                }
            } catch (KafkaException e2) {
                LOG.error("Unable to start logmanager.", e2);
                throw new RuntimeException((Throwable) e2);
            }
        } catch (IOException e3) {
            LOG.error("Cannot access offset file: {}", e3.getMessage());
            throw new RuntimeException(new AccessDeniedException(this.committedReadOffsetFile.getAbsolutePath(), null, e3.getMessage()));
        }
    }

    @Override // org.graylog2.shared.journal.Journal
    public void flush() {
        this.offsetFlusher.run();
    }

    private void registerLegacyMetrics() {
        this.metricRegistry.getMetrics().entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith(LocalKafkaJournal.class.getName());
        }).forEach(entry2 -> {
            String str = "org.graylog2.shared.journal.KafkaJournal" + StringUtils.removeStart((String) entry2.getKey(), LocalKafkaJournal.class.getName());
            try {
                this.metricRegistry.register(str, (Metric) entry2.getValue());
            } catch (Exception e) {
                LOG.warn("Unable to register metric <{}> under legacy name <{}>.", entry2.getKey(), str);
            }
        });
    }

    private static int intRange(int i, int i2, int i3) {
        return Integer.min(Integer.max(i2, i), i3);
    }

    private Timer registerHdrTimer(MetricRegistry metricRegistry, String str) {
        Timer timer;
        try {
            timer = (Timer) metricRegistry.register(str, new HdrTimer(1L, TimeUnit.MINUTES, 1));
        } catch (IllegalArgumentException e) {
            timer = (Timer) Iterables.getOnlyElement(metricRegistry.getTimers((str2, metric) -> {
                return str.equals(str2);
            }).values());
        }
        return timer;
    }

    private void registerUncommittedGauge(MetricRegistry metricRegistry, String str) {
        try {
            metricRegistry.register(str, () -> {
                if (size() == 0) {
                    return 0L;
                }
                return this.committedOffset.get() == Long.MIN_VALUE ? Long.valueOf(getLogEndOffset() - getLogStartOffset()) : Long.valueOf(Math.max(0L, (getLogEndOffset() - 1) - this.committedOffset.get()));
            });
        } catch (IllegalArgumentException e) {
        }
    }

    public int getPurgedSegmentsInLastRetention() {
        return this.purgedSegmentsInLastRetention.get();
    }

    private void teardownLogMetrics() {
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METER_WRITTEN_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METER_READ_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METER_WRITE_DISCARDED_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{GAUGE_UNCOMMITTED_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{TIMER_WRITE_TIME}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{TIMER_READ_TIME}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{"size"}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METRIC_NAME_LOG_END_OFFSET}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METRIC_NAME_NUMBER_OF_SEGMENTS}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METRIC_NAME_UNFLUSHED_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METRIC_NAME_RECOVERY_POINT}));
        this.metricRegistry.remove(MetricRegistry.name(this.metricPrefix, new String[]{METRIC_NAME_LAST_FLUSH_TIME}));
        this.metricRegistry.remove(getOldestSegmentMetricName());
        if (LocalKafkaJournal.class.getName().equals(this.metricPrefix)) {
            this.metricRegistry.removeMatching(MetricFilter.startsWith(LEGACY_CLASS_NAME));
        }
    }

    private String getOldestSegmentMetricName() {
        String str = GlobalMetricNames.JOURNAL_OLDEST_SEGMENT;
        if (!LocalKafkaJournal.class.getName().equals(this.metricPrefix)) {
            str = MetricRegistry.name(this.metricPrefix, new String[]{GlobalMetricNames.OLDEST_SEGMENT_SUFFIX});
        }
        return str;
    }

    @Override // org.graylog2.shared.journal.Journal
    public Journal.Entry createEntry(byte[] bArr, byte[] bArr2) {
        return new Journal.Entry(bArr, bArr2);
    }

    @Override // org.graylog2.shared.journal.Journal
    public long write(List<Journal.Entry> list) {
        Timer.Context time = this.writeTime.time();
        try {
            long j = 0;
            long j2 = 0;
            long j3 = 0;
            ArrayList arrayList = new ArrayList(list.size());
            for (Journal.Entry entry : list) {
                byte[] messageBytes = entry.getMessageBytes();
                byte[] idBytes = entry.getIdBytes();
                j += messageBytes.length;
                Message message = new Message(messageBytes, idBytes);
                int entrySize = MessageSet.entrySize(message);
                if (entrySize > this.maxMessageSize) {
                    this.writeDiscardedMessages.mark();
                    LOG.warn("Message with ID <{}> is too large to store in journal, skipping! (size: {} bytes / max: {} bytes)", new Object[]{new String(idBytes, StandardCharsets.UTF_8), Integer.valueOf(entrySize), Integer.valueOf(this.maxMessageSize)});
                    j = 0;
                } else {
                    if (j2 + entrySize > this.maxSegmentSize) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Flushing {} bytes message set with {} messages to avoid overflowing segment with max size of {} bytes", new Object[]{Long.valueOf(j2), Integer.valueOf(arrayList.size()), Long.valueOf(this.maxSegmentSize)});
                        }
                        j3 = flushMessages(arrayList, j);
                        arrayList.clear();
                        j2 = 0;
                        j = 0;
                    }
                    arrayList.add(message);
                    j2 += entrySize;
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Message {} contains bytes {}", Tools.bytesToHex(idBytes), Tools.bytesToHex(messageBytes));
                    }
                }
            }
            if (arrayList.size() > 0) {
                j3 = flushMessages(arrayList, j);
            }
            long j4 = j3;
            if (time != null) {
                time.close();
            }
            return j4;
        } catch (Throwable th) {
            if (time != null) {
                try {
                    time.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private long flushMessages(List<Message> list, long j) {
        if (list.isEmpty()) {
            LOG.debug("No messages to flush, not trying to write an empty message set.");
            return -1L;
        }
        ByteBufferMessageSet byteBufferMessageSet = new ByteBufferMessageSet(JavaConversions.asScalaBuffer(list).toSeq());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying to write ByteBufferMessageSet with size of {} bytes to journal", Integer.valueOf(byteBufferMessageSet.sizeInBytes()));
        }
        LogAppendInfo append = this.kafkaLog.append(byteBufferMessageSet, true);
        long lastOffset = append.lastOffset();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote {} messages to journal: {} bytes (payload {} bytes), log position {} to {}", new Object[]{Integer.valueOf(list.size()), Integer.valueOf(byteBufferMessageSet.sizeInBytes()), Long.valueOf(j), Long.valueOf(append.firstOffset()), Long.valueOf(lastOffset)});
        }
        this.writtenMessages.mark(list.size());
        return lastOffset;
    }

    @Override // org.graylog2.shared.journal.Journal
    public long write(byte[] bArr, byte[] bArr2) {
        return write(Collections.singletonList(createEntry(bArr, bArr2)));
    }

    @Override // org.graylog2.shared.journal.Journal
    public List<Journal.JournalReadEntry> read(long j) {
        return readNext(this.nextReadOffset, j);
    }

    public List<Journal.JournalReadEntry> readNext(long j, long j2) {
        long logEndOffset = getLogEndOffset();
        List<Journal.JournalReadEntry> read = read(j, j2);
        if (read.isEmpty()) {
            long j3 = j;
            long j4 = j3;
            while (true) {
                long j5 = j4 + 1;
                if (!read.isEmpty() || j3 >= logEndOffset - 1) {
                    break;
                }
                LOG.warn("Couldn't read any messages from offset <{}> but journal has more messages. Skipping and trying to read from offset <{}>", Long.valueOf(j3), Long.valueOf(j5));
                read = read(j5, j2);
                j3++;
                j4 = j5;
            }
        }
        return read;
    }

    public List<Journal.JournalReadEntry> read(long j, long j2) {
        long max = Math.max(1L, j2);
        long j3 = j + max;
        if (this.shuttingDown) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList(Ints.saturatedCast(max));
        try {
            Timer.Context time = this.readTime.time();
            try {
                long logStartOffset = getLogStartOffset();
                if (j < logStartOffset) {
                    LOG.info("Read offset {} before start of log at {}, starting to read from the beginning of the journal.", Long.valueOf(j), Long.valueOf(logStartOffset));
                    j = logStartOffset;
                    j3 = j + max;
                }
                LOG.debug("Requesting to read a maximum of {} messages (or 5MB) from the journal, offset interval [{}, {})", new Object[]{Long.valueOf(max), Long.valueOf(j), Long.valueOf(j3)});
                org.graylog.shaded.kafka09.scala.collection.Iterator it = this.kafkaLog.read(j, 5242880, Option.apply(Long.valueOf(j3))).messageSet().iterator();
                long j4 = Long.MIN_VALUE;
                long j5 = Long.MIN_VALUE;
                long j6 = 0;
                while (it.hasNext()) {
                    MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                    if (j4 == Long.MIN_VALUE) {
                        j4 = messageAndOffset.offset();
                    }
                    j5 = messageAndOffset.offset();
                    byte[] readBytes = ByteBufferUtils.readBytes(messageAndOffset.message().payload());
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Read message {} contains {}", Tools.bytesToHex(ByteBufferUtils.readBytes(messageAndOffset.message().key())), Tools.bytesToHex(readBytes));
                    }
                    j6 += readBytes.length;
                    arrayList.add(new Journal.JournalReadEntry(readBytes, messageAndOffset.offset()));
                    this.nextReadOffset = messageAndOffset.nextOffset();
                }
                if (arrayList.isEmpty()) {
                    LOG.debug("No messages available to read for offset interval [{}, {}).", Long.valueOf(j), Long.valueOf(j3));
                } else {
                    LOG.debug("Read {} messages, total payload size {}, from journal, offset interval [{}, {}], requested read at {}", new Object[]{Integer.valueOf(arrayList.size()), Long.valueOf(j6), Long.valueOf(j4), Long.valueOf(j5), Long.valueOf(j)});
                }
                if (time != null) {
                    time.close();
                }
            } finally {
            }
        } catch (OffsetOutOfRangeException e) {
            LOG.debug("Offset out of range, no messages available starting at offset {}", Long.valueOf(j));
        } catch (Exception e2) {
            if (this.shuttingDown) {
                LOG.debug("Caught exception during shutdown, ignoring it because we might have been blocked on a read.");
                return Collections.emptyList();
            }
            if (!(e2 instanceof ClosedByInterruptException)) {
                throw e2;
            }
            LOG.debug("Interrupted while reading from journal, during shutdown this is harmless and ignored.", e2);
        }
        this.readMessages.mark(arrayList.size());
        return arrayList;
    }

    @Override // org.graylog2.shared.journal.Journal
    public void markJournalOffsetCommitted(long j) {
        long j2;
        int i = 0;
        do {
            j2 = this.committedOffset.get();
            i++;
            if (i % 10 == 0) {
                LOG.warn("Committing journal offset spins {} times now, this might be a bug. Continuing to try update.", Integer.valueOf(i));
            }
        } while (!this.committedOffset.compareAndSet(j2, Math.max(j, j2)));
    }

    protected void flushDirtyLogs() {
        LOG.debug("Checking for dirty logs to flush...");
        for (Map.Entry entry : JavaConversions.mapAsJavaMap(this.logManager.logsByTopicPartition()).entrySet()) {
            TopicAndPartition topicAndPartition = (TopicAndPartition) entry.getKey();
            Log log = (Log) entry.getValue();
            long milliseconds = JODA_TIME.milliseconds() - log.lastFlushTime();
            try {
                LOG.debug("Checking if flush is needed on {} flush interval {} last flushed {} time since last flush: {}", new Object[]{topicAndPartition.topic(), log.config().flushInterval(), Long.valueOf(log.lastFlushTime()), Long.valueOf(milliseconds)});
                if (milliseconds >= log.config().flushMs().longValue()) {
                    log.flush();
                }
            } catch (Exception e) {
                LOG.error("Error flushing topic " + topicAndPartition.topic(), e);
            }
        }
    }

    public long getCommittedOffset() {
        return this.committedOffset.get();
    }

    public long getNextReadOffset() {
        return this.nextReadOffset;
    }

    protected void startUp() throws Exception {
        this.dirtyLogFlushFuture = this.scheduler.scheduleAtFixedRate(this.dirtyLogFlusher, TimeUnit.SECONDS.toMillis(30L), this.logManager.flushCheckMs(), TimeUnit.MILLISECONDS);
        this.checkpointFlusherFuture = this.scheduler.scheduleAtFixedRate(this.recoveryCheckpointFlusher, TimeUnit.SECONDS.toMillis(30L), this.logManager.flushCheckpointMs(), TimeUnit.MILLISECONDS);
        this.logRetentionFuture = this.scheduler.scheduleAtFixedRate(this.logRetentionCleaner, TimeUnit.SECONDS.toMillis(30L), this.logManager.retentionCheckMs(), TimeUnit.MILLISECONDS);
        this.offsetFlusherFuture = this.scheduler.scheduleAtFixedRate(this.offsetFlusher, 1L, 1L, TimeUnit.SECONDS);
    }

    protected void shutDown() throws Exception {
        LOG.debug("Shutting down journal!");
        this.shuttingDown = true;
        this.offsetFlusherFuture.cancel(false);
        this.logRetentionFuture.cancel(false);
        this.checkpointFlusherFuture.cancel(false);
        this.dirtyLogFlushFuture.cancel(false);
        this.kafkaScheduler.shutdown();
        this.logManager.shutdown();
        this.offsetFlusher.run();
        teardownLogMetrics();
    }

    public int cleanupLogs() {
        try {
            return this.logRetentionCleaner.call().intValue();
        } catch (Exception e) {
            LOG.error("Unable to delete expired segments.", e);
            return 0;
        }
    }

    public Iterable<LogSegment> getSegments() {
        return JavaConversions.asJavaIterable(this.kafkaLog.logSegments());
    }

    public long size() {
        return this.kafkaLog.size();
    }

    public int numberOfSegments() {
        return this.kafkaLog.numberOfSegments();
    }

    public long getCommittedReadOffset() {
        return this.committedOffset.get();
    }

    public void truncateTo(long j) {
        this.kafkaLog.truncateTo(j);
    }

    public long getLogStartOffset() {
        LogSegment logSegment = (LogSegment) Iterables.getFirst(JavaConversions.asJavaIterable(this.kafkaLog.logSegments()), (Object) null);
        if (logSegment == null) {
            return 0L;
        }
        return logSegment.baseOffset();
    }

    public long getLogEndOffset() {
        return this.kafkaLog.logEndOffset();
    }

    public ThrottleState getThrottleState() {
        return this.throttleState.get();
    }

    public void setThrottleState(ThrottleState throttleState) {
        this.throttleState.set(throttleState);
    }
}
