/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.shared.journal;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.SyncFailedException;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import kafka.common.KafkaException;
import kafka.common.OffsetOutOfRangeException;
import kafka.common.TopicAndPartition;
import kafka.log.CleanerConfig;
import kafka.log.Log;
import kafka.log.LogAppendInfo;
import kafka.log.LogConfig;
import kafka.log.LogManager;
import kafka.log.LogSegment;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import kafka.server.BrokerState;
import kafka.server.RunningAsBroker;
import kafka.utils.KafkaScheduler;
import kafka.utils.Scheduler;
import kafka.utils.Time;
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.ThrottleState;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.lifecycles.LoadBalancerStatus;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.metrics.HdrTimer;
import org.graylog2.shared.utilities.ByteBufferUtils;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Map$;
import scala.runtime.AbstractFunction1;

@Singleton
public class KafkaJournal
extends AbstractIdleService
implements Journal {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaJournal.class);
    private static final int NUM_IO_THREADS = 1;
    public static final long DEFAULT_COMMITTED_OFFSET = Long.MIN_VALUE;
    public static final int NOTIFY_ON_UTILIZATION_PERCENTAGE = 95;
    public static final int THRESHOLD_THROTTLING_DISABLED = -1;
    public static final String METER_WRITTEN_MESSAGES = "writtenMessages";
    public static final String METER_READ_MESSAGES = "readMessages";
    private static final String METER_WRITE_DISCARDED_MESSAGES = "writeDiscardedMessages";
    public static final String GAUGE_UNCOMMITTED_MESSAGES = "uncommittedMessages";
    private static final String TIMER_WRITE_TIME = "writeTime";
    private static final String TIMER_READ_TIME = "readTime";
    private static final String METRIC_NAME_SIZE = "size";
    private static final String METRIC_NAME_LOG_END_OFFSET = "logEndOffset";
    private static final String METRIC_NAME_NUMBER_OF_SEGMENTS = "numberOfSegments";
    private static final String METRIC_NAME_UNFLUSHED_MESSAGES = "unflushedMessages";
    private static final String METRIC_NAME_RECOVERY_POINT = "recoveryPoint";
    private static final String METRIC_NAME_LAST_FLUSH_TIME = "lastFlushTime";
    private static final Time JODA_TIME = new Time(){

        public long milliseconds() {
            return DateTimeUtils.currentTimeMillis();
        }

        public long nanoseconds() {
            return System.nanoTime();
        }

        public void sleep(long ms) {
            Uninterruptibles.sleepUninterruptibly((long)ms, (TimeUnit)TimeUnit.MILLISECONDS);
        }
    };
    private final LogManager logManager;
    private final Log kafkaLog;
    private final File committedReadOffsetFile;
    private final AtomicLong committedOffset = new AtomicLong(Long.MIN_VALUE);
    private final MetricRegistry metricRegistry;
    private final ScheduledExecutorService scheduler;
    private final ServerStatus serverStatus;
    private final Timer writeTime;
    private final Timer readTime;
    private final KafkaScheduler kafkaScheduler;
    private final Meter writtenMessages;
    private final Meter readMessages;
    private final Meter writeDiscardedMessages;
    private final OffsetFileFlusher offsetFlusher;
    private final DirtyLogFlusher dirtyLogFlusher;
    private final RecoveryCheckpointFlusher recoveryCheckpointFlusher;
    private final LogRetentionCleaner logRetentionCleaner;
    private final long maxSegmentSize;
    private final int maxMessageSize;
    private final String metricPrefix;
    private long nextReadOffset = 0L;
    private ScheduledFuture<?> checkpointFlusherFuture;
    private ScheduledFuture<?> dirtyLogFlushFuture;
    private ScheduledFuture<?> logRetentionFuture;
    private ScheduledFuture<?> offsetFlusherFuture;
    private volatile boolean shuttingDown;
    private final AtomicReference<ThrottleState> throttleState = new AtomicReference();
    private final AtomicInteger purgedSegmentsInLastRetention = new AtomicInteger();
    private final int throttleThresholdPercentage;

    @Inject
    public KafkaJournal(@Named(value="message_journal_dir") Path journalDirectory, @Named(value="scheduler") ScheduledExecutorService scheduler, @Named(value="message_journal_segment_size") Size segmentSize, @Named(value="message_journal_segment_age") Duration segmentAge, @Named(value="message_journal_max_size") Size retentionSize, @Named(value="message_journal_max_age") Duration retentionAge, @Named(value="message_journal_flush_interval") long flushInterval, @Named(value="message_journal_flush_age") Duration flushAge, @Named(value="lb_throttle_threshold_percentage") int throttleThresholdPercentage, MetricRegistry metricRegistry, ServerStatus serverStatus) {
        this(journalDirectory, scheduler, segmentSize, segmentAge, retentionSize, retentionAge, flushInterval, flushAge, throttleThresholdPercentage, metricRegistry, serverStatus, KafkaJournal.class.getName());
    }

    public KafkaJournal(Path journalDirectory, ScheduledExecutorService scheduler, Size segmentSize, Duration segmentAge, Size retentionSize, Duration retentionAge, long flushInterval, Duration flushAge, int throttleThresholdPercentage, MetricRegistry metricRegistry, ServerStatus serverStatus, String metricPrefix) {
        this.throttleThresholdPercentage = throttleThresholdPercentage == -1 ? throttleThresholdPercentage : KafkaJournal.intRange(throttleThresholdPercentage, 0, 100);
        this.scheduler = scheduler;
        this.serverStatus = serverStatus;
        this.maxSegmentSize = segmentSize.toBytes();
        this.maxMessageSize = Ints.saturatedCast((long)this.maxSegmentSize);
        this.metricPrefix = metricPrefix;
        this.metricRegistry = metricRegistry;
        this.writtenMessages = metricRegistry.meter(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METER_WRITTEN_MESSAGES}));
        this.readMessages = metricRegistry.meter(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METER_READ_MESSAGES}));
        this.writeDiscardedMessages = metricRegistry.meter(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METER_WRITE_DISCARDED_MESSAGES}));
        this.registerUncommittedGauge(metricRegistry, MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{GAUGE_UNCOMMITTED_MESSAGES}));
        this.writeTime = this.registerHdrTimer(metricRegistry, MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{TIMER_WRITE_TIME}));
        this.readTime = this.registerHdrTimer(metricRegistry, MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{TIMER_READ_TIME}));
        ImmutableMap config = ImmutableMap.builder().put((Object)LogConfig.SegmentBytesProp(), (Object)Ints.saturatedCast((long)segmentSize.toBytes())).put((Object)LogConfig.SegmentMsProp(), (Object)segmentAge.getMillis()).put((Object)LogConfig.SegmentJitterMsProp(), (Object)0).put((Object)LogConfig.FlushMessagesProp(), (Object)flushInterval).put((Object)LogConfig.FlushMsProp(), (Object)flushAge.getMillis()).put((Object)LogConfig.RetentionBytesProp(), (Object)retentionSize.toBytes()).put((Object)LogConfig.RetentionMsProp(), (Object)retentionAge.getMillis()).put((Object)LogConfig.MaxMessageBytesProp(), (Object)this.maxMessageSize).put((Object)LogConfig.SegmentIndexBytesProp(), (Object)Ints.saturatedCast((long)Size.megabytes((long)1L).toBytes())).put((Object)LogConfig.IndexIntervalBytesProp(), (Object)4096).put((Object)LogConfig.FileDeleteDelayMsProp(), (Object)TimeUnit.MINUTES.toMillis(1L)).put((Object)LogConfig.DeleteRetentionMsProp(), (Object)TimeUnit.DAYS.toMillis(1L)).put((Object)LogConfig.MinCleanableDirtyRatioProp(), (Object)0.5).put((Object)LogConfig.Compact(), (Object)false).put((Object)LogConfig.UncleanLeaderElectionEnableProp(), (Object)true).put((Object)LogConfig.MinInSyncReplicasProp(), (Object)1).build();
        LogConfig defaultConfig = new LogConfig((java.util.Map)config);
        CleanerConfig cleanerConfig = new CleanerConfig(1, Size.megabytes((long)4L).toBytes(), 0.9, Ints.saturatedCast((long)Size.megabytes((long)1L).toBytes()), Ints.saturatedCast((long)Size.megabytes((long)32L).toBytes()), (double)Ints.saturatedCast((long)Size.megabytes((long)5L).toBytes()), TimeUnit.SECONDS.toMillis(15L), false, "MD5");
        if (!Files.exists(journalDirectory, new LinkOption[0])) {
            try {
                Files.createDirectories(journalDirectory, new FileAttribute[0]);
            }
            catch (IOException e) {
                LOG.error("Cannot create journal directory at {}, please check the permissions", (Object)journalDirectory.toAbsolutePath());
                throw new UncheckedIOException(e);
            }
        }
        this.committedReadOffsetFile = new File(journalDirectory.toFile(), "graylog2-committed-read-offset");
        try {
            String line;
            if (!this.committedReadOffsetFile.createNewFile() && (line = com.google.common.io.Files.asCharSource((File)this.committedReadOffsetFile, (Charset)StandardCharsets.UTF_8).readFirstLine()) != null) {
                this.committedOffset.set(Long.parseLong(line.trim()));
                this.nextReadOffset = this.committedOffset.get() + 1L;
            }
        }
        catch (IOException e) {
            LOG.error("Cannot access offset file: {}", (Object)e.getMessage());
            AccessDeniedException accessDeniedException = new AccessDeniedException(this.committedReadOffsetFile.getAbsolutePath(), null, e.getMessage());
            throw new RuntimeException(accessDeniedException);
        }
        try {
            BrokerState brokerState = new BrokerState();
            brokerState.newState(RunningAsBroker.state());
            this.kafkaScheduler = new KafkaScheduler(2, "kafka-journal-scheduler-", false);
            this.kafkaScheduler.startup();
            this.logManager = new LogManager(new File[]{journalDirectory.toFile()}, (Map)Map$.MODULE$.empty(), defaultConfig, cleanerConfig, 1, TimeUnit.SECONDS.toMillis(60L), TimeUnit.SECONDS.toMillis(60L), TimeUnit.SECONDS.toMillis(60L), (Scheduler)this.kafkaScheduler, brokerState, JODA_TIME);
            TopicAndPartition topicAndPartition = new TopicAndPartition("messagejournal", 0);
            Option messageLog = this.logManager.getLog(topicAndPartition);
            this.kafkaLog = messageLog.isEmpty() ? this.logManager.createLog(topicAndPartition, this.logManager.defaultConfig()) : (Log)messageLog.get();
            this.metricRegistry.register(MetricRegistry.name((String)metricPrefix, (String[])new String[]{METRIC_NAME_SIZE}), (Metric)((Gauge)() -> ((Log)this.kafkaLog).size()));
            this.metricRegistry.register(MetricRegistry.name((String)metricPrefix, (String[])new String[]{METRIC_NAME_LOG_END_OFFSET}), (Metric)((Gauge)() -> ((Log)this.kafkaLog).logEndOffset()));
            this.metricRegistry.register(MetricRegistry.name((String)metricPrefix, (String[])new String[]{METRIC_NAME_NUMBER_OF_SEGMENTS}), (Metric)((Gauge)() -> ((Log)this.kafkaLog).numberOfSegments()));
            this.metricRegistry.register(MetricRegistry.name((String)metricPrefix, (String[])new String[]{METRIC_NAME_UNFLUSHED_MESSAGES}), (Metric)((Gauge)() -> ((Log)this.kafkaLog).unflushedMessages()));
            this.metricRegistry.register(MetricRegistry.name((String)metricPrefix, (String[])new String[]{METRIC_NAME_RECOVERY_POINT}), (Metric)((Gauge)() -> ((Log)this.kafkaLog).recoveryPoint()));
            this.metricRegistry.register(MetricRegistry.name((String)metricPrefix, (String[])new String[]{METRIC_NAME_LAST_FLUSH_TIME}), (Metric)((Gauge)() -> ((Log)this.kafkaLog).lastFlushTime()));
            this.metricRegistry.register(this.getOldestSegmentMetricName(), (Metric)new Gauge<Date>(){

                public Date getValue() {
                    long oldestSegment = Long.MAX_VALUE;
                    for (LogSegment segment : KafkaJournal.this.getSegments()) {
                        oldestSegment = Math.min(oldestSegment, segment.created());
                    }
                    return new Date(oldestSegment);
                }
            });
            LOG.info("Initialized Kafka based journal at {}", (Object)journalDirectory);
            this.offsetFlusher = new OffsetFileFlusher();
            this.dirtyLogFlusher = new DirtyLogFlusher();
            this.recoveryCheckpointFlusher = new RecoveryCheckpointFlusher();
            this.logRetentionCleaner = new LogRetentionCleaner();
        }
        catch (KafkaException e) {
            LOG.error("Unable to start logmanager.", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private static int intRange(int i, int min, int max) {
        return Integer.min(Integer.max(min, i), max);
    }

    private Timer registerHdrTimer(MetricRegistry metricRegistry, String metricName) {
        Timer timer;
        try {
            timer = (Timer)metricRegistry.register(metricName, (Metric)new HdrTimer(1L, TimeUnit.MINUTES, 1));
        }
        catch (IllegalArgumentException e) {
            SortedMap timers = metricRegistry.getTimers((name, metric) -> metricName.equals(name));
            timer = (Timer)Iterables.getOnlyElement(timers.values());
        }
        return timer;
    }

    private void registerUncommittedGauge(MetricRegistry metricRegistry, String name) {
        try {
            metricRegistry.register(name, (Metric)((Gauge)() -> Math.max(0L, this.getLogEndOffset() - 1L - this.committedOffset.get())));
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    public int getPurgedSegmentsInLastRetention() {
        return this.purgedSegmentsInLastRetention.get();
    }

    private void teardownLogMetrics() {
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METER_WRITTEN_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METER_READ_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METER_WRITE_DISCARDED_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{GAUGE_UNCOMMITTED_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{TIMER_WRITE_TIME}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{TIMER_READ_TIME}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METRIC_NAME_SIZE}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METRIC_NAME_LOG_END_OFFSET}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METRIC_NAME_NUMBER_OF_SEGMENTS}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METRIC_NAME_UNFLUSHED_MESSAGES}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METRIC_NAME_RECOVERY_POINT}));
        this.metricRegistry.remove(MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{METRIC_NAME_LAST_FLUSH_TIME}));
        this.metricRegistry.remove(this.getOldestSegmentMetricName());
    }

    private String getOldestSegmentMetricName() {
        String journalOldestSegmentMetricName = GlobalMetricNames.JOURNAL_OLDEST_SEGMENT;
        if (!KafkaJournal.class.getName().equals(this.metricPrefix)) {
            journalOldestSegmentMetricName = MetricRegistry.name((String)this.metricPrefix, (String[])new String[]{"oldest-segment"});
        }
        return journalOldestSegmentMetricName;
    }

    @Override
    public Journal.Entry createEntry(byte[] idBytes, byte[] messageBytes) {
        return new Journal.Entry(idBytes, messageBytes);
    }

    @Override
    public long write(List<Journal.Entry> entries) {
        try (Timer.Context ignored = this.writeTime.time();){
            long payloadSize = 0L;
            long messageSetSize = 0L;
            long lastWriteOffset = 0L;
            ArrayList<Message> messages = new ArrayList<Message>(entries.size());
            for (Journal.Entry entry : entries) {
                byte[] messageBytes = entry.getMessageBytes();
                byte[] idBytes = entry.getIdBytes();
                payloadSize += (long)messageBytes.length;
                Message newMessage = new Message(messageBytes, idBytes);
                int newMessageSize = MessageSet.entrySize((Message)newMessage);
                if (newMessageSize > this.maxMessageSize) {
                    this.writeDiscardedMessages.mark();
                    LOG.warn("Message with ID <{}> is too large to store in journal, skipping! (size: {} bytes / max: {} bytes)", new Object[]{new String(idBytes, StandardCharsets.UTF_8), newMessageSize, this.maxMessageSize});
                    payloadSize = 0L;
                    continue;
                }
                if (messageSetSize + (long)newMessageSize > this.maxSegmentSize) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Flushing {} bytes message set with {} messages to avoid overflowing segment with max size of {} bytes", new Object[]{messageSetSize, messages.size(), this.maxSegmentSize});
                    }
                    lastWriteOffset = this.flushMessages(messages, payloadSize);
                    messages.clear();
                    messageSetSize = 0L;
                    payloadSize = 0L;
                }
                messages.add(newMessage);
                messageSetSize += (long)newMessageSize;
                if (!LOG.isTraceEnabled()) continue;
                LOG.trace("Message {} contains bytes {}", (Object)Tools.bytesToHex(idBytes), (Object)Tools.bytesToHex(messageBytes));
            }
            if (messages.size() > 0) {
                lastWriteOffset = this.flushMessages(messages, payloadSize);
            }
            long l = lastWriteOffset;
            return l;
        }
    }

    private long flushMessages(List<Message> messages, long payloadSize) {
        if (messages.isEmpty()) {
            LOG.debug("No messages to flush, not trying to write an empty message set.");
            return -1L;
        }
        ByteBufferMessageSet messageSet = new ByteBufferMessageSet(JavaConversions.asScalaBuffer(messages).toSeq());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying to write ByteBufferMessageSet with size of {} bytes to journal", (Object)messageSet.sizeInBytes());
        }
        LogAppendInfo appendInfo = this.kafkaLog.append(messageSet, true);
        long lastWriteOffset = appendInfo.lastOffset();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote {} messages to journal: {} bytes (payload {} bytes), log position {} to {}", new Object[]{messages.size(), messageSet.sizeInBytes(), payloadSize, appendInfo.firstOffset(), lastWriteOffset});
        }
        this.writtenMessages.mark((long)messages.size());
        return lastWriteOffset;
    }

    @Override
    public long write(byte[] idBytes, byte[] messageBytes) {
        Journal.Entry journalEntry = this.createEntry(idBytes, messageBytes);
        return this.write(Collections.singletonList(journalEntry));
    }

    @Override
    public List<Journal.JournalReadEntry> read(long requestedMaximumCount) {
        return this.read(this.nextReadOffset, requestedMaximumCount);
    }

    public List<Journal.JournalReadEntry> read(long readOffset, long requestedMaximumCount) {
        long maximumCount = Math.max(1L, requestedMaximumCount);
        long maxOffset = readOffset + maximumCount;
        if (this.shuttingDown) {
            return Collections.emptyList();
        }
        ArrayList<Journal.JournalReadEntry> messages = new ArrayList<Journal.JournalReadEntry>(Ints.saturatedCast((long)maximumCount));
        try (Timer.Context ignored = this.readTime.time();){
            long logStartOffset = this.getLogStartOffset();
            if (readOffset < logStartOffset) {
                LOG.info("Read offset {} before start of log at {}, starting to read from the beginning of the journal.", (Object)readOffset, (Object)logStartOffset);
                readOffset = logStartOffset;
                maxOffset = readOffset + maximumCount;
            }
            LOG.debug("Requesting to read a maximum of {} messages (or 5MB) from the journal, offset interval [{}, {})", new Object[]{maximumCount, readOffset, maxOffset});
            MessageSet messageSet = this.kafkaLog.read(readOffset, 0x500000, Option.apply((Object)maxOffset)).messageSet();
            Iterator iterator = messageSet.iterator();
            long firstOffset = Long.MIN_VALUE;
            long lastOffset = Long.MIN_VALUE;
            long totalBytes = 0L;
            while (iterator.hasNext()) {
                MessageAndOffset messageAndOffset = (MessageAndOffset)iterator.next();
                if (firstOffset == Long.MIN_VALUE) {
                    firstOffset = messageAndOffset.offset();
                }
                lastOffset = messageAndOffset.offset();
                byte[] payloadBytes = ByteBufferUtils.readBytes(messageAndOffset.message().payload());
                if (LOG.isTraceEnabled()) {
                    byte[] keyBytes = ByteBufferUtils.readBytes(messageAndOffset.message().key());
                    LOG.trace("Read message {} contains {}", (Object)Tools.bytesToHex(keyBytes), (Object)Tools.bytesToHex(payloadBytes));
                }
                totalBytes += (long)payloadBytes.length;
                messages.add(new Journal.JournalReadEntry(payloadBytes, messageAndOffset.offset()));
                this.nextReadOffset = messageAndOffset.nextOffset();
            }
            if (messages.isEmpty()) {
                LOG.debug("No messages available to read for offset interval [{}, {}).", (Object)readOffset, (Object)maxOffset);
            } else {
                LOG.debug("Read {} messages, total payload size {}, from journal, offset interval [{}, {}], requested read at {}", new Object[]{messages.size(), totalBytes, firstOffset, lastOffset, readOffset});
            }
        }
        catch (OffsetOutOfRangeException e) {
            LOG.debug("Offset out of range, no messages available starting at offset {}", (Object)readOffset);
        }
        catch (Exception e) {
            if (this.shuttingDown) {
                LOG.debug("Caught exception during shutdown, ignoring it because we might have been blocked on a read.");
                return Collections.emptyList();
            }
            if (e instanceof ClosedByInterruptException) {
                LOG.debug("Interrupted while reading from journal, during shutdown this is harmless and ignored.", (Throwable)e);
            }
            throw e;
        }
        this.readMessages.mark((long)messages.size());
        return messages;
    }

    @Override
    public void markJournalOffsetCommitted(long offset) {
        long prev;
        int i = 0;
        do {
            prev = this.committedOffset.get();
            if (++i % 10 != 0) continue;
            LOG.warn("Committing journal offset spins {} times now, this might be a bug. Continuing to try update.", (Object)i);
        } while (!this.committedOffset.compareAndSet(prev, Math.max(offset, prev)));
    }

    protected void flushDirtyLogs() {
        LOG.debug("Checking for dirty logs to flush...");
        Set entries = JavaConversions.mapAsJavaMap((Map)this.logManager.logsByTopicPartition()).entrySet();
        for (Map.Entry topicAndPartitionLogEntry : entries) {
            TopicAndPartition topicAndPartition = (TopicAndPartition)topicAndPartitionLogEntry.getKey();
            Log kafkaLog = (Log)topicAndPartitionLogEntry.getValue();
            long timeSinceLastFlush = JODA_TIME.milliseconds() - kafkaLog.lastFlushTime();
            try {
                LOG.debug("Checking if flush is needed on {} flush interval {} last flushed {} time since last flush: {}", new Object[]{topicAndPartition.topic(), kafkaLog.config().flushInterval(), kafkaLog.lastFlushTime(), timeSinceLastFlush});
                if (timeSinceLastFlush < kafkaLog.config().flushMs()) continue;
                kafkaLog.flush();
            }
            catch (Exception e) {
                LOG.error("Error flushing topic " + topicAndPartition.topic(), (Throwable)e);
            }
        }
    }

    public long getCommittedOffset() {
        return this.committedOffset.get();
    }

    public long getNextReadOffset() {
        return this.nextReadOffset;
    }

    protected void startUp() throws Exception {
        this.dirtyLogFlushFuture = this.scheduler.scheduleAtFixedRate(this.dirtyLogFlusher, TimeUnit.SECONDS.toMillis(30L), this.logManager.flushCheckMs(), TimeUnit.MILLISECONDS);
        this.checkpointFlusherFuture = this.scheduler.scheduleAtFixedRate(this.recoveryCheckpointFlusher, TimeUnit.SECONDS.toMillis(30L), this.logManager.flushCheckpointMs(), TimeUnit.MILLISECONDS);
        this.logRetentionFuture = this.scheduler.scheduleAtFixedRate(this.logRetentionCleaner, TimeUnit.SECONDS.toMillis(30L), this.logManager.retentionCheckMs(), TimeUnit.MILLISECONDS);
        this.offsetFlusherFuture = this.scheduler.scheduleAtFixedRate(this.offsetFlusher, 1L, 1L, TimeUnit.SECONDS);
    }

    protected void shutDown() throws Exception {
        LOG.debug("Shutting down journal!");
        this.shuttingDown = true;
        this.offsetFlusherFuture.cancel(false);
        this.logRetentionFuture.cancel(false);
        this.checkpointFlusherFuture.cancel(false);
        this.dirtyLogFlushFuture.cancel(false);
        this.kafkaScheduler.shutdown();
        this.logManager.shutdown();
        this.offsetFlusher.run();
        this.teardownLogMetrics();
    }

    public int cleanupLogs() {
        try {
            return this.logRetentionCleaner.call();
        }
        catch (Exception e) {
            LOG.error("Unable to delete expired segments.", (Throwable)e);
            return 0;
        }
    }

    public java.lang.Iterable<LogSegment> getSegments() {
        return JavaConversions.asJavaIterable((Iterable)this.kafkaLog.logSegments());
    }

    public long size() {
        return this.kafkaLog.size();
    }

    public int numberOfSegments() {
        return this.kafkaLog.numberOfSegments();
    }

    public long getCommittedReadOffset() {
        return this.committedOffset.get();
    }

    public void truncateTo(long offset) {
        this.kafkaLog.truncateTo(offset);
    }

    public long getLogStartOffset() {
        java.lang.Iterable logSegments = JavaConversions.asJavaIterable((Iterable)this.kafkaLog.logSegments());
        LogSegment segment = (LogSegment)Iterables.getFirst((java.lang.Iterable)logSegments, null);
        if (segment == null) {
            return 0L;
        }
        return segment.baseOffset();
    }

    public long getLogEndOffset() {
        return this.kafkaLog.logEndOffset();
    }

    public ThrottleState getThrottleState() {
        return this.throttleState.get();
    }

    public void setThrottleState(ThrottleState state) {
        this.throttleState.set(state);
    }

    public class DirtyLogFlusher
    implements Runnable {
        @Override
        public void run() {
            try {
                KafkaJournal.this.flushDirtyLogs();
            }
            catch (Exception e) {
                LOG.error("Unable to flush dirty logs. Will try again.", (Throwable)e);
            }
        }
    }

    public class RecoveryCheckpointFlusher
    implements Runnable {
        @Override
        public void run() {
            try {
                KafkaJournal.this.logManager.checkpointRecoveryPointOffsets();
            }
            catch (Exception e) {
                LOG.error("Unable to flush checkpoint recovery point offsets. Will try again.", (Throwable)e);
            }
        }
    }

    public class LogRetentionCleaner
    implements Runnable,
    Callable<Integer> {
        private final Logger loggerForCleaner = LoggerFactory.getLogger(LogRetentionCleaner.class);

        @Override
        public void run() {
            try {
                this.call();
            }
            catch (Exception e) {
                this.loggerForCleaner.error("Unable to delete expired segments. Will try again.", (Throwable)e);
            }
        }

        @Override
        public Integer call() throws Exception {
            this.loggerForCleaner.debug("Beginning log cleanup");
            int total = 0;
            Timer.Context ctx = new Timer().time();
            for (Log kafkaLog : JavaConversions.asJavaIterable((Iterable)KafkaJournal.this.logManager.allLogs())) {
                if (kafkaLog.config().compact()) continue;
                this.loggerForCleaner.debug("Garbage collecting {}", (Object)kafkaLog.name());
                total += this.cleanupExpiredSegments(kafkaLog) + this.cleanupSegmentsToMaintainSize(kafkaLog) + this.cleanupSegmentsToRemoveCommitted(kafkaLog);
            }
            this.loggerForCleaner.debug("Log cleanup completed. {} files deleted in {} seconds", (Object)total, (Object)TimeUnit.NANOSECONDS.toSeconds(ctx.stop()));
            return total;
        }

        private int cleanupExpiredSegments(final Log kafkaLog) {
            if (kafkaLog.size() == 0L && kafkaLog.numberOfSegments() < 1) {
                KafkaJournal.this.purgedSegmentsInLastRetention.set(0);
                return 0;
            }
            int deletedSegments = kafkaLog.deleteOldSegments((Function1)new AbstractFunction1<LogSegment, Object>(){

                public Object apply(LogSegment segment) {
                    boolean shouldDelete;
                    long segmentAge = JODA_TIME.milliseconds() - segment.lastModified();
                    boolean bl = shouldDelete = segmentAge > kafkaLog.config().retentionMs();
                    if (shouldDelete) {
                        LogRetentionCleaner.this.loggerForCleaner.debug("[cleanup-time] Removing segment with age {}s, older than then maximum retention age {}s", (Object)TimeUnit.MILLISECONDS.toSeconds(segmentAge), (Object)TimeUnit.MILLISECONDS.toSeconds(kafkaLog.config().retentionMs()));
                    }
                    return shouldDelete;
                }
            });
            KafkaJournal.this.purgedSegmentsInLastRetention.set(deletedSegments);
            return deletedSegments;
        }

        private void updateLoadBalancerStatus(double utilizationPercentage) {
            LoadBalancerStatus currentStatus = KafkaJournal.this.serverStatus.getLifecycle().getLoadbalancerStatus();
            if (currentStatus == LoadBalancerStatus.THROTTLED && utilizationPercentage < (double)KafkaJournal.this.throttleThresholdPercentage) {
                KafkaJournal.this.serverStatus.running();
                LOG.info(String.format(Locale.ENGLISH, "Journal usage is %.2f%% (threshold %d%%), changing load balancer status from THROTTLED to ALIVE", utilizationPercentage, KafkaJournal.this.throttleThresholdPercentage));
            } else if (currentStatus == LoadBalancerStatus.ALIVE && utilizationPercentage >= (double)KafkaJournal.this.throttleThresholdPercentage) {
                KafkaJournal.this.serverStatus.throttle();
                LOG.info(String.format(Locale.ENGLISH, "Journal usage is %.2f%% (threshold %d%%), changing load balancer status from ALIVE to THROTTLED", utilizationPercentage, KafkaJournal.this.throttleThresholdPercentage));
            }
        }

        private int cleanupSegmentsToMaintainSize(Log kafkaLog) {
            double utilizationPercentage;
            final long retentionSize = kafkaLog.config().retentionSize();
            long currentSize = kafkaLog.size();
            double d = utilizationPercentage = retentionSize > 0L ? (double)(currentSize * 100L / retentionSize) : 0.0;
            if (utilizationPercentage > 95.0) {
                LOG.warn("Journal utilization ({}%) has gone over {}%.", (Object)utilizationPercentage, (Object)95);
            }
            if (KafkaJournal.this.throttleThresholdPercentage != -1) {
                this.updateLoadBalancerStatus(utilizationPercentage);
            }
            if (retentionSize < 0L || currentSize < retentionSize) {
                KafkaJournal.this.purgedSegmentsInLastRetention.set(0);
                return 0;
            }
            final long[] diff = new long[]{currentSize - retentionSize};
            int deletedSegments = kafkaLog.deleteOldSegments((Function1)new AbstractFunction1<LogSegment, Object>(){

                public Object apply(LogSegment segment) {
                    if (diff[0] - segment.size() >= 0L) {
                        diff[0] = diff[0] - segment.size();
                        LogRetentionCleaner.this.loggerForCleaner.debug("[cleanup-size] Removing segment starting at offset {}, size {} bytes, to shrink log to new size {}, target size {}", new Object[]{segment.baseOffset(), segment.size(), diff[0], retentionSize});
                        return true;
                    }
                    return false;
                }
            });
            KafkaJournal.this.purgedSegmentsInLastRetention.set(deletedSegments);
            return deletedSegments;
        }

        private int cleanupSegmentsToRemoveCommitted(Log kafkaLog) {
            if (kafkaLog.numberOfSegments() <= 1) {
                this.loggerForCleaner.debug("[cleanup-committed] The journal is already minimal at {} segment(s), not trying to remove more segments.", (Object)kafkaLog.numberOfSegments());
                return 0;
            }
            final long committedOffset = KafkaJournal.this.committedOffset.get();
            final HashSet logSegments = Sets.newHashSet((java.lang.Iterable)JavaConversions.asJavaIterable((Iterable)kafkaLog.logSegments(committedOffset, Long.MAX_VALUE)));
            this.loggerForCleaner.debug("[cleanup-committed] Keeping segments {}", (Object)logSegments);
            return kafkaLog.deleteOldSegments((Function1)new AbstractFunction1<LogSegment, Object>(){

                public Object apply(LogSegment segment) {
                    boolean shouldDelete;
                    boolean bl = shouldDelete = !logSegments.contains(segment);
                    if (shouldDelete) {
                        LogRetentionCleaner.this.loggerForCleaner.debug("[cleanup-committed] Should delete segment {} because it is prior to committed offset {}", (Object)segment, (Object)committedOffset);
                    }
                    return shouldDelete;
                }
            });
        }
    }

    public class OffsetFileFlusher
    implements Runnable {
        @Override
        public void run() {
            if (KafkaJournal.this.committedOffset.get() == Long.MIN_VALUE) {
                return;
            }
            try (FileOutputStream fos = new FileOutputStream(KafkaJournal.this.committedReadOffsetFile);){
                fos.write(String.valueOf(KafkaJournal.this.committedOffset.get()).getBytes(StandardCharsets.UTF_8));
                fos.flush();
                fos.getFD().sync();
            }
            catch (SyncFailedException e) {
                LOG.error("Cannot sync " + KafkaJournal.this.committedReadOffsetFile.getAbsolutePath() + " to disk. Continuing anyway, but there is no guarantee that the file has been written.", (Throwable)e);
            }
            catch (IOException e) {
                LOG.error("Cannot write " + KafkaJournal.this.committedReadOffsetFile.getAbsolutePath() + " to disk.", (Throwable)e);
            }
        }
    }
}

