/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.metadata;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.metadata.ApiMessageAndVersion;
import org.apache.kafka.metalog.MetaLogLeader;
import org.apache.kafka.metalog.MetaLogListener;
import org.apache.kafka.metalog.MetaLogManager;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;

public class MetaLogRaftShim
implements MetaLogManager {
    private final RaftClient<ApiMessageAndVersion> client;
    private final int nodeId;

    public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
        this.client = client;
        this.nodeId = nodeId;
    }

    public void initialize() {
    }

    public void register(MetaLogListener listener) {
        this.client.register(new ListenerShim(listener));
    }

    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
        return this.write(epoch, batch, true);
    }

    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
        return this.write(epoch, batch, false);
    }

    private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
        Long result = isAtomic ? this.client.scheduleAtomicAppend((int)epoch, batch) : this.client.scheduleAppend((int)epoch, batch);
        if (result == null) {
            throw new IllegalArgumentException(String.format("Unable to alloate a buffer for the schedule write operation: epoch %s, batch %s)", epoch, batch));
        }
        return result;
    }

    public void renounce(long epoch) {
        throw new UnsupportedOperationException();
    }

    public MetaLogLeader leader() {
        LeaderAndEpoch leaderAndEpoch = this.client.leaderAndEpoch();
        return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), (long)leaderAndEpoch.epoch);
    }

    public int nodeId() {
        return this.nodeId;
    }

    private class ListenerShim
    implements RaftClient.Listener<ApiMessageAndVersion> {
        private final MetaLogListener listener;

        private ListenerShim(MetaLogListener listener) {
            this.listener = listener;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            try {
                while (reader.hasNext()) {
                    BatchReader.Batch batch = (BatchReader.Batch)reader.next();
                    List records = batch.records().stream().map(ApiMessageAndVersion::message).collect(Collectors.toList());
                    this.listener.handleCommits(batch.lastOffset(), records);
                }
            }
            finally {
                reader.close();
            }
        }

        @Override
        public void handleClaim(int epoch) {
            this.listener.handleNewLeader(new MetaLogLeader(MetaLogRaftShim.this.nodeId, (long)epoch));
        }

        @Override
        public void handleResign(int epoch) {
            this.listener.handleRenounce((long)epoch);
        }

        public String toString() {
            return "ListenerShim(listener=" + this.listener + ')';
        }
    }
}

