/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metalog;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LocalLogManager
implements MetaLogManager,
AutoCloseable {
    /** Logger obtained from the {@code LogContext} passed to the constructor. */
    private final Logger log;
    /** Id of the node this manager acts for; used as the key when registering with {@link #shared}. */
    private final int nodeId;
    /** Log state shared between managers; holds the batches and the current leader. */
    private final SharedLogData shared;
    /** Queue on which log checks, initialization, registration and shutdown events are run. */
    private final EventQueue eventQueue;
    /** Set to true by the event queued in {@link #initialize()}; only touched inside queued events. */
    private boolean initialized = false;
    /** Set to true by the shutdown event queued in {@link #beginShutdown()}; only touched inside queued events. */
    private boolean shutdown = false;
    /** Log checks do not deliver entries whose offset is greater than this value; changed via {@link #setMaxReadOffset(long)}. */
    private long maxReadOffset = Long.MAX_VALUE;
    /** Registered listeners, each paired with the offset of the last entry delivered to it. */
    private final List<MetaLogListenerData> listeners = new ArrayList<MetaLogListenerData>();
    /**
     * Leader most recently observed by this manager's log check. Starts as node -1 / epoch -1 and
     * is replaced only by a leader with a higher epoch. Volatile because it is written inside
     * queued events and read directly by {@link #leader()}, {@link #renounce(long)} and the write methods.
     */
    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1L);

    /**
     * Creates a log manager for one node and registers it with the shared log state.
     *
     * @param logContext       supplies this manager's logger and is handed to the event queue
     * @param nodeId           id of the node this manager represents
     * @param shared           log state shared between managers
     * @param threadNamePrefix passed through to the {@link KafkaEventQueue} constructor
     */
    public LocalLogManager(LogContext logContext, int nodeId, SharedLogData shared, String threadNamePrefix) {
        this.nodeId = nodeId;
        this.shared = shared;
        this.log = logContext.logger(LocalLogManager.class);
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
        // Registration must come last: it may elect a leader, which schedules a log check
        // on this manager's event queue.
        this.shared.registerLogManager(this);
    }

    /**
     * Queues an event that delivers pending log entries to every registered listener.
     *
     * <p>For each listener the event repeatedly asks the shared log for the batch that follows the
     * listener's current offset. It stops for that listener when the log has no further batch or
     * when the next batch lies beyond {@code maxReadOffset}. Leader changes are passed to
     * {@code handleNewLeader}, record batches to {@code handleCommits}. An exception thrown
     * anywhere in the check is logged and ends the whole check.
     */
    private void scheduleLogCheck() {
        eventQueue.append(() -> {
            try {
                log.debug("Node {}: running log check.", nodeId);
                // Running total across all listeners; only used in trace messages.
                int delivered = 0;
                for (MetaLogListenerData target : listeners) {
                    Map.Entry<Long, LocalBatch> next = shared.nextBatch(target.offset);
                    while (next != null) {
                        long nextOffset = next.getKey();
                        if (nextOffset > maxReadOffset) {
                            log.trace("Node {}: after {} entries, not reading the next entry because its offset is {}, and maxReadOffset is {}.", nodeId, delivered, nextOffset, maxReadOffset);
                            break;
                        }
                        LocalBatch payload = next.getValue();
                        if (payload instanceof LeaderChangeBatch) {
                            MetaLogLeader announced = ((LeaderChangeBatch) payload).newLeader;
                            log.trace("Node {}: handling LeaderChange to {}.", nodeId, announced);
                            target.listener.handleNewLeader(announced);
                            // Only move the locally observed leader forward, never back.
                            if (announced.epoch() > leader.epoch()) {
                                leader = announced;
                            }
                        } else if (payload instanceof LocalRecordBatch) {
                            log.trace("Node {}: handling LocalRecordBatch with offset {}.", nodeId, nextOffset);
                            target.listener.handleCommits(nextOffset, ((LocalRecordBatch) payload).records);
                        }
                        delivered++;
                        // Advance only after the callback returned without throwing.
                        target.offset = nextOffset;
                        next = shared.nextBatch(target.offset);
                    }
                    if (next == null) {
                        log.trace("Node {}: reached the end of the log after finding {} entries.", nodeId, delivered);
                    }
                }
                log.trace("Completed log check for node " + nodeId);
            }
            catch (Exception e) {
                log.error("Exception while handling log check", e);
            }
        });
    }

    /**
     * Asks the event queue to begin shutting down, running a final cleanup event.
     *
     * <p>If the manager was initialized and has not already been shut down, the cleanup event
     * renounces leadership, calls {@code beginShutdown} on every registered listener and
     * unregisters this manager from the shared log state. Exceptions from those steps are logged.
     * The {@code shutdown} flag is set afterwards whether or not an exception was logged.
     */
    public void beginShutdown() {
        eventQueue.beginShutdown("beginShutdown", () -> {
            boolean notifyListeners = initialized && !shutdown;
            if (notifyListeners) {
                try {
                    log.debug("Node {}: beginning shutdown.", nodeId);
                    renounce(leader.epoch());
                    for (MetaLogListenerData registered : listeners) {
                        registered.listener.beginShutdown();
                    }
                    shared.unregisterLogManager(this);
                }
                catch (Exception e) {
                    log.error("Unexpected exception while sending beginShutdown callbacks", e);
                }
            }
            shutdown = true;
        });
    }

    /**
     * Begins shutdown of this manager and then closes its event queue.
     *
     * @throws InterruptedException declared by the signature; presumably raised by the
     *                              event queue's {@code close()} (confirm against EventQueue)
     */
    @Override
    public void close() throws InterruptedException {
        log.debug("Node {}: closing.", nodeId);
        beginShutdown();
        eventQueue.close();
    }

    /**
     * Queues an event that marks this manager as initialized.
     *
     * <p>The flag is set by the queued event, not directly by this method.
     */
    @Override
    public void initialize() throws Exception {
        eventQueue.append(() -> {
            log.debug("initialized local log manager for node " + nodeId);
            initialized = true;
        });
    }

    /**
     * Registers a listener and blocks until the registration event has been processed.
     *
     * <p>If the manager has already been shut down, the listener is not registered and the
     * method returns normally. If the manager has not been initialized, the queued event fails
     * the future, so {@code get()} throws.
     *
     * @param listener the listener to receive leader changes and committed batches
     * @throws Exception whatever {@code CompletableFuture.get()} throws; in particular an
     *                   {@code ExecutionException} wrapping a {@code RuntimeException} when the
     *                   manager was not initialized
     */
    @Override
    public void register(MetaLogListener listener) throws Exception {
        CompletableFuture<Void> done = new CompletableFuture<>();
        eventQueue.append(() -> {
            if (shutdown) {
                log.info("Node {}: can't register because local log manager has already been shut down.", nodeId);
                done.complete(null);
                return;
            }
            if (!initialized) {
                log.info("Node {}: can't register because local log manager has not been initialized.", nodeId);
                done.completeExceptionally(new RuntimeException("LocalLogManager was not initialized."));
                return;
            }
            log.info("Node {}: registered MetaLogListener.", nodeId);
            listeners.add(new MetaLogListenerData(listener));
            shared.electLeaderIfNeeded();
            // Replays the existing log to the new listener.
            scheduleLogCheck();
            done.complete(null);
        });
        done.get();
    }

    /**
     * Schedules a write of the given batch; identical to {@link #scheduleAtomicWrite(long, List)}.
     *
     * @param epoch forwarded unchanged to {@code scheduleAtomicWrite}
     * @param batch the records to write
     * @return the value returned by {@code scheduleAtomicWrite}
     */
    @Override
    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
        return scheduleAtomicWrite(epoch, batch);
    }

    /**
     * Tries to append the messages of {@code batch} to the shared log as one record batch.
     *
     * <p>NOTE(review): the {@code epoch} argument is not consulted; the append is attempted with
     * the epoch of the leader this manager last observed. Confirm that this is intended.
     *
     * @param epoch currently unused (see note above)
     * @param batch the records to write; only each element's {@code message()} is stored
     * @return the value returned by {@code SharedLogData.tryAppend}: the offset of the appended
     *         batch, or {@code Long.MAX_VALUE} when the append was rejected
     */
    @Override
    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
        long observedEpoch = leader.epoch();
        List<ApiMessage> messages = batch.stream()
            .map(ApiMessageAndVersion::message)
            .collect(Collectors.toList());
        return shared.tryAppend(nodeId, observedEpoch, new LocalRecordBatch(messages));
    }

    /**
     * Tries to give up leadership by appending a leader change to "no leader" (node id -1)
     * with the next epoch.
     *
     * <p>The append goes through {@code SharedLogData.tryAppend}, which rejects it when this
     * node is not the current leader at the observed epoch. The result is ignored.
     *
     * <p>NOTE(review): the {@code epoch} argument is not consulted; the locally observed leader
     * epoch is used instead. Confirm that this is intended.
     *
     * @param epoch currently unused (see note above)
     */
    @Override
    public void renounce(long epoch) {
        // Read the volatile field once so both epoch uses refer to the same leader object.
        MetaLogLeader observed = leader;
        MetaLogLeader noLeader = new MetaLogLeader(-1, observed.epoch() + 1L);
        shared.tryAppend(nodeId, observed.epoch(), new LeaderChangeBatch(noLeader));
    }

    /**
     * Returns the leader this manager has most recently observed in the log.
     *
     * @return the current value of the volatile {@code leader} field; node -1 / epoch -1 until a
     *         leader change has been processed
     */
    @Override
    public MetaLogLeader leader() {
        return leader;
    }

    /**
     * Returns the id of the node this manager represents.
     *
     * @return the node id given to the constructor
     */
    @Override
    public int nodeId() {
        return nodeId;
    }

    /**
     * Returns a snapshot of the currently registered listeners.
     *
     * <p>The snapshot is taken inside an event on the event queue, and this method blocks until
     * that event has completed.
     *
     * @return a new list containing the registered listeners in registration order
     * @throws RuntimeException wrapping the {@code InterruptedException} or
     *                          {@code ExecutionException} raised while waiting; on interruption
     *                          the calling thread's interrupt flag is restored first
     */
    public List<MetaLogListener> listeners() {
        CompletableFuture<List<MetaLogListener>> snapshot = new CompletableFuture<>();
        this.eventQueue.append(() -> snapshot.complete(
            this.listeners.stream().map(data -> data.listener).collect(Collectors.toList())));
        try {
            return snapshot.get();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Changes the highest offset that log checks may deliver and schedules a new log check.
     *
     * <p>The change is applied inside an event on the event queue, and this method blocks until
     * that event has completed.
     *
     * @param maxReadOffset entries with an offset greater than this value are not delivered
     * @throws RuntimeException wrapping the {@code InterruptedException} or
     *                          {@code ExecutionException} raised while waiting; on interruption
     *                          the calling thread's interrupt flag is restored first
     */
    public void setMaxReadOffset(long maxReadOffset) {
        CompletableFuture<Void> applied = new CompletableFuture<>();
        this.eventQueue.append(() -> {
            this.log.trace("Node {}: set maxReadOffset to {}.", this.nodeId, maxReadOffset);
            this.maxReadOffset = maxReadOffset;
            // Entries that were previously held back may now be deliverable.
            this.scheduleLogCheck();
            applied.complete(null);
        });
        try {
            applied.get();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Pairs a registered listener with the offset of the last log entry delivered to it.
     */
    private static class MetaLogListenerData {
        private final MetaLogListener listener;
        private long offset;

        MetaLogListenerData(MetaLogListener listener) {
            this.listener = listener;
            // Starts at -1; the log check asks the shared log for the batch after this offset.
            this.offset = -1L;
        }
    }

    /**
     * In-memory log state shared between {@link LocalLogManager} instances: the registered
     * managers, the stored batches keyed by offset, and the current leader.
     *
     * <p>Every method is {@code synchronized} on this instance.
     */
    public static class SharedLogData {
        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
        /** Registered managers keyed by node id. */
        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
        /** Stored batches keyed by the offset assigned in {@link #append(LocalBatch)}. */
        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
        /** Leader taken from the most recently appended {@code LeaderChangeBatch}. */
        private MetaLogLeader leader = new MetaLogLeader(-1, -1L);
        /** Offset of the most recently appended batch; -1 while the log is empty. */
        private long prevOffset = -1L;

        /**
         * Registers a manager under its node id and elects a leader if there is none.
         *
         * @param logManager the manager to register
         * @throws RuntimeException if a manager with the same node id is already registered;
         *                          the existing registration is left untouched
         */
        synchronized void registerLogManager(LocalLogManager logManager) {
            // putIfAbsent rather than put: a rejected duplicate must not replace the manager
            // that is already registered under this id.
            if (this.logManagers.putIfAbsent(logManager.nodeId(), logManager) != null) {
                throw new RuntimeException("Can't have multiple LocalLogManagers with id " + logManager.nodeId());
            }
            this.electLeaderIfNeeded();
        }

        /**
         * Removes a manager's registration.
         *
         * @param logManager the manager to remove
         * @throws RuntimeException if this exact manager is not registered under its node id
         */
        synchronized void unregisterLogManager(LocalLogManager logManager) {
            if (!this.logManagers.remove(logManager.nodeId(), logManager)) {
                throw new RuntimeException("Log manager " + logManager.nodeId() + " was not found.");
            }
        }

        /**
         * Appends a batch on behalf of a node if that node is the current leader at the given epoch.
         *
         * @param nodeId the node attempting the append
         * @param epoch  the leader epoch the node believes is current
         * @param batch  the batch to append
         * @return the offset returned by {@link #append(LocalBatch)}, or {@code Long.MAX_VALUE}
         *         when the epoch or node id does not match the current leader
         */
        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
            if (epoch != this.leader.epoch()) {
                this.log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not match the current leader epoch of {}.", new Object[]{nodeId, epoch, this.leader.epoch()});
                return Long.MAX_VALUE;
            }
            if (nodeId != this.leader.nodeId()) {
                this.log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not match the current leader id of {}.", new Object[]{nodeId, epoch, this.leader.nodeId()});
                return Long.MAX_VALUE;
            }
            this.log.trace("tryAppend(nodeId={}): appending {}.", (Object)nodeId, (Object)batch);
            long offset = this.append(batch);
            // The batch may have been a leader change to "no leader".
            this.electLeaderIfNeeded();
            return offset;
        }

        /**
         * Stores a batch at the next offset, applies it if it is a leader change, and schedules
         * a log check on every registered manager.
         *
         * <p>A batch whose size is not positive is ignored: it would not advance the offset, so
         * storing it would replace the batch already recorded at the current offset.
         *
         * @param batch the batch to store
         * @return the offset assigned to the batch, or the unchanged current offset when the
         *         batch was ignored
         */
        synchronized long append(LocalBatch batch) {
            if (batch.size() <= 0) {
                this.log.debug("append(batch={}, prevOffset={}): ignoring empty batch", (Object)batch, (Object)this.prevOffset);
                return this.prevOffset;
            }
            this.prevOffset += (long)batch.size();
            this.log.debug("append(batch={}, prevOffset={})", (Object)batch, (Object)this.prevOffset);
            this.batches.put(this.prevOffset, batch);
            if (batch instanceof LeaderChangeBatch) {
                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch)batch;
                this.leader = leaderChangeBatch.newLeader;
            }
            for (LocalLogManager logManager : this.logManagers.values()) {
                logManager.scheduleLogCheck();
            }
            return this.prevOffset;
        }

        /**
         * Elects a randomly chosen registered manager as leader when there is currently no
         * leader (leader node id -1) and at least one manager is registered.
         */
        synchronized void electLeaderIfNeeded() {
            if (this.leader.nodeId() != -1 || this.logManagers.isEmpty()) {
                return;
            }
            // Pick a random position and walk the key set up to it.
            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(this.logManagers.size());
            Iterator<Integer> iter = this.logManagers.keySet().iterator();
            Integer nextLeaderNode = null;
            for (int i = 0; i <= nextLeaderIndex; ++i) {
                nextLeaderNode = iter.next();
            }
            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, this.leader.epoch() + 1L);
            this.log.info("Elected new leader: {}.", (Object)newLeader);
            this.append(new LeaderChangeBatch(newLeader));
        }

        /**
         * Looks up the first batch stored at an offset strictly greater than {@code offset}.
         *
         * @param offset the offset to search after
         * @return an immutable copy of the entry (offset and batch), or {@code null} if there
         *         is no later batch
         */
        synchronized Map.Entry<Long, LocalBatch> nextBatch(long offset) {
            Map.Entry<Long, LocalBatch> entry = this.batches.higherEntry(offset);
            if (entry == null) {
                return null;
            }
            return new AbstractMap.SimpleImmutableEntry<Long, LocalBatch>(entry.getKey(), entry.getValue());
        }
    }

    /**
     * A log batch that carries a list of metadata records.
     */
    static class LocalRecordBatch implements LocalBatch {
        private final List<ApiMessage> records;

        /**
         * @param records the records in this batch; stored by reference, not copied
         */
        LocalRecordBatch(List<ApiMessage> records) {
            this.records = records;
        }

        /**
         * @return the number of records in this batch
         */
        @Override
        public int size() {
            return this.records.size();
        }

        /**
         * Two record batches are equal when their record lists are equal.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof LocalRecordBatch)) {
                return false;
            }
            LocalRecordBatch other = (LocalRecordBatch)o;
            // Null-safe, matching the null tolerance of hashCode() and toString().
            return Objects.equals(other.records, this.records);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.records);
        }

        @Override
        public String toString() {
            return "LocalRecordBatch(records=" + this.records + ")";
        }
    }

    /**
     * A log batch that announces a new leader.
     */
    static class LeaderChangeBatch implements LocalBatch {
        private final MetaLogLeader newLeader;

        /**
         * @param newLeader the leader announced by this batch
         */
        LeaderChangeBatch(MetaLogLeader newLeader) {
            this.newLeader = newLeader;
        }

        /**
         * @return always 1; a leader change occupies a single offset
         */
        @Override
        public int size() {
            return 1;
        }

        /**
         * Two leader-change batches are equal when they announce equal leaders.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof LeaderChangeBatch)) {
                return false;
            }
            LeaderChangeBatch other = (LeaderChangeBatch)o;
            // Null-safe, matching the null tolerance of hashCode() and toString().
            return Objects.equals(other.newLeader, this.newLeader);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.newLeader);
        }

        @Override
        public String toString() {
            return "LeaderChangeBatch(newLeader=" + this.newLeader + ")";
        }
    }

    /**
     * An entry that can be stored in the shared log.
     */
    interface LocalBatch {
        /**
         * @return the number of offsets this batch advances the log by when it is appended
         */
        int size();
    }
}

