/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.BrokerHeartbeatManager;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.ReplicaPlacer;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;

/**
 * Tracks the brokers registered with the controller and whether each one is fenced.
 *
 * <p>Registrations are stored in a {@link TimelineHashMap} keyed by broker id. Within this
 * class that map is only modified by the {@code replay} methods; {@link #registerBroker}
 * merely produces the record to be replayed. A {@link BrokerHeartbeatManager} exists only
 * between {@link #activate()} and {@link #deactivate()}, and the methods that need it throw
 * a {@link RuntimeException} while it is absent.
 */
public class ClusterControlManager {
    private final LogContext logContext;
    private final Logger log;
    private final Time time;
    private final long sessionTimeoutNs;
    private final ReplicaPlacer replicaPlacer;

    /** Current registration for each broker id. */
    private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;

    /** Non-null only while this manager is active. */
    private BrokerHeartbeatManager heartbeatManager;

    /** Pending "enough unfenced brokers" notification, if one has been requested. */
    private Optional<ReadyBrokersFuture> readyBrokersFuture;

    ClusterControlManager(LogContext logContext, Time time, SnapshotRegistry snapshotRegistry, long sessionTimeoutNs, ReplicaPlacer replicaPlacer) {
        this.logContext = logContext;
        this.log = logContext.logger(ClusterControlManager.class);
        this.time = time;
        this.sessionTimeoutNs = sessionTimeoutNs;
        this.replicaPlacer = replicaPlacer;
        this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
        this.heartbeatManager = null;
        this.readyBrokersFuture = Optional.empty();
    }

    /**
     * Creates a fresh heartbeat manager and seeds it with every currently registered broker,
     * carrying over each broker's fenced flag.
     */
    public void activate() {
        heartbeatManager = new BrokerHeartbeatManager(logContext, time, sessionTimeoutNs);
        for (BrokerRegistration registration : brokerRegistrations.values()) {
            // NOTE(review): the meaning of the -1 third argument is defined by
            // BrokerHeartbeatManager.touch, which is not visible here - confirm.
            heartbeatManager.touch(registration.id(), registration.fenced(), -1L);
        }
    }

    /** Drops the heartbeat manager; heartbeat-dependent methods throw until reactivated. */
    public void deactivate() {
        heartbeatManager = null;
    }

    /**
     * @return the live registration map itself (not a copy), keyed by broker id
     */
    Map<Integer, BrokerRegistration> brokerRegistrations() {
        return brokerRegistrations;
    }

    /**
     * @return a newly collected set holding the ids of all registrations whose fenced flag is set
     */
    Set<Integer> fencedBrokerIds() {
        return brokerRegistrations.values().stream()
            .filter(BrokerRegistration::fenced)
            .map(BrokerRegistration::id)
            .collect(Collectors.toSet());
    }

    /**
     * Handles a broker registration request by building the {@link RegisterBrokerRecord} to
     * append. The registration map is not modified here; only the heartbeat manager is.
     *
     * <p>NOTE(review): when the incarnation id differs and the old session is not valid, the
     * old heartbeat session is removed before the feature check runs, so that removal has
     * already happened if {@link UnsupportedVersionException} is thrown afterwards.
     *
     * @param request           the registration request
     * @param brokerEpoch       the epoch to assign to this registration
     * @param finalizedFeatures the finalized feature ranges the broker's features are checked against
     * @return the record to append together with a reply carrying {@code brokerEpoch}
     * @throws RuntimeException                     if this manager is not active
     * @throws DuplicateBrokerRegistrationException if the id is registered under a different
     *                                              incarnation id whose session is still valid
     * @throws UnsupportedVersionException          if a finalized range does not contain the
     *                                              range the broker supports for a feature
     */
    public ControllerResult<BrokerRegistrationReply> registerBroker(BrokerRegistrationRequestData request, long brokerEpoch, FeatureMapAndEpoch finalizedFeatures) {
        if (heartbeatManager == null) {
            throw new RuntimeException("ClusterControlManager is not active.");
        }
        int brokerId = request.brokerId();
        BrokerRegistration existing = brokerRegistrations.get(brokerId);
        if (existing != null) {
            boolean sessionValid = heartbeatManager.hasValidSession(brokerId);
            boolean sameIncarnation = existing.incarnationId().equals(request.incarnationId());
            if (!sameIncarnation) {
                if (sessionValid) {
                    throw new DuplicateBrokerRegistrationException("Another broker is registered with that broker id.");
                }
                // Different incarnation without a valid session: discard the old session and
                // treat this request like a first-time registration.
                heartbeatManager.remove(brokerId);
                existing = null;
            }
        }

        RegisterBrokerRecord record = new RegisterBrokerRecord()
            .setBrokerId(brokerId)
            .setIncarnationId(request.incarnationId())
            .setBrokerEpoch(brokerEpoch)
            .setRack(request.rack());
        for (BrokerRegistrationRequestData.Listener listener : request.listeners()) {
            record.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint()
                .setHost(listener.host())
                .setName(listener.name())
                .setPort(listener.port())
                .setSecurityProtocol(listener.securityProtocol()));
        }
        for (BrokerRegistrationRequestData.Feature feature : request.features()) {
            Optional<VersionRange> finalized = finalizedFeatures.map().get(feature.name());
            if (finalized.isPresent() && !finalized.get().contains(new VersionRange(feature.minSupportedVersion(), feature.maxSupportedVersion()))) {
                throw new UnsupportedVersionException("Unable to register because the broker has an unsupported version of " + feature.name());
            }
            record.features().add(new RegisterBrokerRecord.BrokerFeature()
                .setName(feature.name())
                .setMinSupportedVersion(feature.minSupportedVersion())
                .setMaxSupportedVersion(feature.maxSupportedVersion()));
        }

        // A broker with no usable prior registration is touched as fenced; otherwise the
        // fenced flag of the existing registration is carried over.
        boolean fenced = existing == null || existing.fenced();
        heartbeatManager.touch(brokerId, fenced, -1L);

        List<ApiMessageAndVersion> records = new ArrayList<>();
        records.add(new ApiMessageAndVersion(record, MetadataRecordType.REGISTER_BROKER_RECORD.highestSupportedVersion()));
        return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch));
    }

    /**
     * Applies a broker registration record, replacing any registration stored for that id.
     *
     * @param record the record to apply
     */
    public void replay(RegisterBrokerRecord record) {
        int brokerId = record.brokerId();
        List<Endpoint> listeners = new ArrayList<>();
        for (RegisterBrokerRecord.BrokerEndpoint endpoint : record.endPoints()) {
            listeners.add(new Endpoint(endpoint.name(), SecurityProtocol.forId(endpoint.securityProtocol()), endpoint.host(), endpoint.port()));
        }
        Map<String, VersionRange> features = new HashMap<>();
        for (RegisterBrokerRecord.BrokerFeature feature : record.features()) {
            features.put(feature.name(), new VersionRange(feature.minSupportedVersion(), feature.maxSupportedVersion()));
        }
        // Use the value returned by put(): reading the map after the put would return the
        // entry just stored, so the "new broker" and "different incarnation" cases below
        // could never be detected.
        BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
            new BrokerRegistration(brokerId, record.brokerEpoch(), record.incarnationId(), listeners, features, Optional.ofNullable(record.rack()), record.fenced()));
        if (prevRegistration == null) {
            log.info("Registered new broker: {}", record);
        } else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
            log.info("Re-registered broker incarnation: {}", record);
        } else {
            log.info("Re-registered broker id {}: {}", brokerId, record);
        }
    }

    /**
     * Applies an unregistration record by removing the broker's registration.
     *
     * @param record the record to apply
     * @throws RuntimeException if no registration exists for the id or its epoch differs
     */
    public void replay(UnregisterBrokerRecord record) {
        int brokerId = record.brokerId();
        registrationForReplay(record, brokerId, record.brokerEpoch());
        brokerRegistrations.remove(brokerId);
        log.info("Unregistered broker: {}", record);
    }

    /**
     * Applies a fence record by storing a copy of the registration with fencing enabled.
     *
     * @param record the record to apply
     * @throws RuntimeException if no registration exists for the id or its epoch differs
     */
    public void replay(FenceBrokerRecord record) {
        int brokerId = record.id();
        BrokerRegistration registration = registrationForReplay(record, brokerId, record.epoch());
        brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
        log.info("Fenced broker: {}", record);
    }

    /**
     * Applies an unfence record by storing a copy of the registration with fencing disabled,
     * then completes the pending ready-brokers future if its threshold is now met.
     *
     * @param record the record to apply
     * @throws RuntimeException if no registration exists for the id or its epoch differs
     */
    public void replay(UnfenceBrokerRecord record) {
        int brokerId = record.id();
        BrokerRegistration registration = registrationForReplay(record, brokerId, record.epoch());
        brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
        log.info("Unfenced broker: {}", record);
        completeReadyBrokersFutureIfSatisfied();
    }

    /**
     * Delegates replica placement to the heartbeat manager, supplying each broker's rack from
     * its registration.
     *
     * @param startPartition first partition index, passed through to the heartbeat manager
     * @param numPartitions  number of partitions, passed through to the heartbeat manager
     * @param numReplicas    replicas per partition, passed through to the heartbeat manager
     * @return whatever the heartbeat manager's placement returns
     * @throws RuntimeException if this manager is not active
     */
    public List<List<Integer>> placeReplicas(int startPartition, int numPartitions, short numReplicas) {
        if (heartbeatManager == null) {
            throw new RuntimeException("ClusterControlManager is not active.");
        }
        // The rack lookup dereferences the registration directly, so it throws
        // NullPointerException if asked about an id that has no registration.
        return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas, id -> brokerRegistrations.get(id).rack(), replicaPlacer);
    }

    /**
     * @param brokerId the broker id to look up
     * @return {@code true} only if the broker is registered and its fenced flag is clear
     */
    public boolean unfenced(int brokerId) {
        BrokerRegistration registration = brokerRegistrations.get(brokerId);
        return registration != null && !registration.fenced();
    }

    /**
     * @return the active heartbeat manager
     * @throws RuntimeException if this manager is not active
     */
    BrokerHeartbeatManager heartbeatManager() {
        if (heartbeatManager == null) {
            throw new RuntimeException("ClusterControlManager is not active.");
        }
        return heartbeatManager;
    }

    /**
     * Verifies that the given epoch matches the broker's registered epoch.
     *
     * @param brokerId    the broker id to look up
     * @param brokerEpoch the epoch the caller believes the broker has
     * @throws StaleBrokerEpochException if the broker is not registered or the epochs differ
     */
    public void checkBrokerEpoch(int brokerId, long brokerEpoch) {
        BrokerRegistration registration = brokerRegistrations.get(brokerId);
        if (registration == null) {
            throw new StaleBrokerEpochException("No broker registration found for broker id " + brokerId);
        }
        if (registration.epoch() != brokerEpoch) {
            throw new StaleBrokerEpochException("Expected broker epoch " + registration.epoch() + ", but got broker epoch " + brokerEpoch);
        }
    }

    /**
     * Registers a future to complete once at least {@code minBrokers} brokers are unfenced.
     * It is completed immediately if that already holds. Any previously registered future is
     * replaced without being completed.
     *
     * @param future     the future to complete with {@code null}
     * @param minBrokers the number of unfenced brokers required
     */
    public void addReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
        readyBrokersFuture = Optional.of(new ReadyBrokersFuture(future, minBrokers));
        completeReadyBrokersFutureIfSatisfied();
    }

    /**
     * @param epoch the epoch passed to {@code TimelineHashMap.entrySet(long)} to select the view to iterate
     * @return an iterator yielding one single-record batch per registration in that view
     */
    ClusterControlIterator iterator(long epoch) {
        return new ClusterControlIterator(epoch);
    }

    /** Looks up the registration a replayed record refers to, validating id and epoch. */
    private BrokerRegistration registrationForReplay(Object record, int brokerId, long epoch) {
        BrokerRegistration registration = brokerRegistrations.get(brokerId);
        if (registration == null) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker registration found for that id", record));
        }
        if (registration.epoch() != epoch) {
            throw new RuntimeException(String.format("Unable to replay %s: no broker registration with that epoch found", record));
        }
        return registration;
    }

    /** Completes and clears the pending ready-brokers future when its threshold is met. */
    private void completeReadyBrokersFutureIfSatisfied() {
        if (readyBrokersFuture.isPresent() && readyBrokersFuture.get().check()) {
            readyBrokersFuture.get().future.complete(null);
            readyBrokersFuture = Optional.empty();
        }
    }

    /**
     * Iterates over the registrations visible at a given epoch, converting each one back
     * into a list containing a single {@link RegisterBrokerRecord}.
     */
    class ClusterControlIterator
    implements Iterator<List<ApiMessageAndVersion>> {
        private final Iterator<Map.Entry<Integer, BrokerRegistration>> iterator;

        ClusterControlIterator(long epoch) {
            this.iterator = brokerRegistrations.entrySet(epoch).iterator();
        }

        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        /**
         * @return a one-element list holding the record rebuilt from the next registration
         * @throws NoSuchElementException if no registrations remain
         */
        @Override
        public List<ApiMessageAndVersion> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Map.Entry<Integer, BrokerRegistration> entry = iterator.next();
            int brokerId = entry.getKey();
            BrokerRegistration registration = entry.getValue();

            RegisterBrokerRecord.BrokerEndpointCollection endpoints = new RegisterBrokerRecord.BrokerEndpointCollection();
            for (Map.Entry<String, Endpoint> listener : registration.listeners().entrySet()) {
                Endpoint endpoint = listener.getValue();
                endpoints.add(new RegisterBrokerRecord.BrokerEndpoint()
                    .setName(listener.getKey())
                    .setHost(endpoint.host())
                    .setPort(endpoint.port())
                    .setSecurityProtocol(endpoint.securityProtocol().id));
            }

            RegisterBrokerRecord.BrokerFeatureCollection features = new RegisterBrokerRecord.BrokerFeatureCollection();
            for (Map.Entry<String, VersionRange> feature : registration.supportedFeatures().entrySet()) {
                features.add(new RegisterBrokerRecord.BrokerFeature()
                    .setName(feature.getKey())
                    .setMaxSupportedVersion(feature.getValue().max())
                    .setMinSupportedVersion(feature.getValue().min()));
            }

            RegisterBrokerRecord record = new RegisterBrokerRecord()
                .setBrokerId(brokerId)
                .setIncarnationId(registration.incarnationId())
                .setBrokerEpoch(registration.epoch())
                .setEndPoints(endpoints)
                .setFeatures(features)
                .setRack(registration.rack().orElse(null))
                .setFenced(registration.fenced());
            List<ApiMessageAndVersion> batch = new ArrayList<>();
            batch.add(new ApiMessageAndVersion(record, MetadataRecordType.REGISTER_BROKER_RECORD.highestSupportedVersion()));
            return batch;
        }
    }

    /** Pairs a future with the number of unfenced brokers required to complete it. */
    class ReadyBrokersFuture {
        private final CompletableFuture<Void> future;
        private final int minBrokers;

        ReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers) {
            this.future = future;
            this.minBrokers = minBrokers;
        }

        /**
         * @return {@code true} if at least {@code minBrokers} registrations are unfenced;
         *         this includes the case where {@code minBrokers <= 0} and nothing is registered
         */
        boolean check() {
            int numUnfenced = 0;
            for (BrokerRegistration registration : brokerRegistrations.values()) {
                if (!registration.fenced()) {
                    ++numUnfenced;
                }
                // Stop scanning as soon as the threshold is reached.
                if (numUnfenced >= minBrokers) {
                    return true;
                }
            }
            // Only reachable with the threshold met when the map is empty and
            // minBrokers <= 0; otherwise the loop above would already have returned.
            return numUnfenced >= minBrokers;
        }
    }
}

