package org.apache.pulsar.broker.loadbalance.extensions.channel;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.class */
public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
    private static final Logger log = LoggerFactory.getLogger(ServiceUnitStateChannelImpl.class);

    /** Persistent topic in the system namespace that this channel publishes to and reads from. */
    public static final String TOPIC = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "loadbalancer-service-unit-state").toString();

    /** Compression applied by the channel producer. */
    public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
    private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
    private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;

    /** Version id given to a service unit that has no state data yet. */
    public static final long VERSION_ID_INIT = 1;
    public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 180;
    private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0;

    /** Upper bound, in seconds, for blocking reads of the current channel leader. */
    private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;

    /** Maximum number of pending messages configured on the channel producer. */
    private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
    private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 600000;

    private final PulsarService pulsar;
    private final ServiceConfiguration config;
    private final String brokerId;

    // Collaborators resolved in start() and released in close().
    private ExtensibleLoadManagerImpl loadManager;
    private BrokerRegistry brokerRegistry;
    private LeaderElectionService leaderElectionService;
    private TableView<ServiceUnitStateData> tableview;
    private Producer<ServiceUnitStateData> producer;

    /** Handle of the periodic ownership monitor; {@code null} while no monitor is scheduled. */
    private ScheduledFuture<?> monitorTask;

    // Timing settings; the first three are read from the broker configuration in the constructor.
    private long inFlightStateWaitingTimeInMillis;
    private long ownershipMonitorDelayTimeInSecs;
    private long stateTombstoneDelayTimeInMillis;
    private long maxCleanupDelayTimeInSecs;
    private long minCleanupDelayTimeInSecs;

    private volatile ChannelState channelState;

    // Immutable per-state / per-event counter tables built in the constructor.
    final Map<ServiceUnitState, Counters> ownerLookUpCounters;
    final Map<EventType, Counters> eventCounters;
    final Map<ServiceUnitState, Counters> handlerCounters;

    private SessionEvent lastMetadataSessionEvent = SessionEvent.SessionReestablished;

    // Plain zero initial values: these are timestamps and counters, not cleanup delays.
    private long lastMetadataSessionEventTimestamp = 0L;
    private long totalInactiveBrokerCleanupCnt = 0L;
    private long totalServiceUnitTombstoneCleanupCnt = 0L;
    private long totalOrphanServiceUnitCleanupCnt = 0L;
    private AtomicLong totalCleanupErrorCnt = new AtomicLong();
    private long totalInactiveBrokerCleanupScheduledCnt = 0L;
    private long totalInactiveBrokerCleanupIgnoredCnt = 0L;
    private long totalInactiveBrokerCleanupCancelledCnt = 0L;
    private volatile long lastOwnEventHandledAt = 0L;
    private long lastOwnedServiceUnitCountAt = 0L;
    private int totalOwnedServiceUnitCnt = 0;

    private final Schema<ServiceUnitStateData> schema = Schema.JSON(ServiceUnitStateData.class);

    /** Pending owner-lookup futures, keyed by service unit. */
    private final Map<String, CompletableFuture<String>> getOwnerRequests = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<Void>> cleanupJobs = new ConcurrentHashMap<>();
    private final StateChangeListeners stateChangeListeners = new StateChangeListeners();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl$ChannelState.class */
    /**
     * Lifecycle stages of the channel. Each stage carries a numeric {@code id}
     * so that two stages can be ordered against each other.
     */
    public enum ChannelState {
        Closed(0),
        Constructed(1),
        LeaderElectionServiceStarted(2),
        Started(3);

        /** Numeric rank of this stage. */
        int id;

        ChannelState(int rank) {
            id = rank;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl$Counters.class */
    /** A pair of counters: total occurrences and failed occurrences. */
    public static class Counters {
        private final AtomicLong total;
        private final AtomicLong failure;

        /** Creates a pair with two fresh counters. */
        public Counters() {
            this(new AtomicLong(), new AtomicLong());
        }

        /**
         * Creates a pair backed by the supplied counters.
         *
         * @param totalCounter counter returned by {@link #getTotal()}
         * @param failureCounter counter returned by {@link #getFailure()}
         */
        public Counters(AtomicLong totalCounter, AtomicLong failureCounter) {
            total = totalCounter;
            failure = failureCounter;
        }

        /** @return the counter of total occurrences */
        public AtomicLong getTotal() {
            return total;
        }

        /** @return the counter of failed occurrences */
        public AtomicLong getFailure() {
            return failure;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl$EventType.class */
    /** Kinds of events counted by the channel's per-event counters. */
    public enum EventType {
        /** Assignment of a service unit to a broker. */
        Assign,
        /** Split of a service unit. */
        Split,
        /** Unload (release) of a service unit. */
        Unload,
        /** Override of a service unit's state data. */
        Override
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl$MetadataState.class */
    /**
     * Three-level classification of the metadata state.
     * NOTE(review): the code that assigns these values is outside this excerpt.
     */
    public enum MetadataState {
        Stable, Jittery, Unstable
    }

    /**
     * Builds a channel bound to the given broker service and leaves it in the
     * {@link ChannelState#Constructed} state.
     *
     * <p>Timing settings are read from the broker configuration, and the immutable
     * counter tables are populated with one {@link Counters} entry per
     * {@link ServiceUnitState} and per {@link EventType}.
     *
     * @param pulsarService broker service that supplies the configuration and broker id
     * @throws IllegalArgumentException if the configured tombstone delay is shorter than
     *         the configured in-flight state waiting time
     */
    @VisibleForTesting
    public ServiceUnitStateChannelImpl(PulsarService pulsarService) {
        this.pulsar = pulsarService;
        this.config = pulsarService.getConfig();
        this.brokerId = pulsarService.getBrokerId();
        this.stateTombstoneDelayTimeInMillis = this.config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds() * 1000;
        this.inFlightStateWaitingTimeInMillis = this.config.getLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis();
        this.ownershipMonitorDelayTimeInSecs = this.config.getLoadBalancerServiceUnitStateMonitorIntervalInSeconds();
        if (this.stateTombstoneDelayTimeInMillis < this.inFlightStateWaitingTimeInMillis) {
            // Report both offending values: the tombstone delay in seconds and the in-flight wait in millis.
            throw new IllegalArgumentException("Invalid Config: loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds"
                    + (this.stateTombstoneDelayTimeInMillis / 1000)
                    + " secs< loadBalancerInFlightServiceUnitStateWaitingTimeInMillis"
                    + this.inFlightStateWaitingTimeInMillis + " millis");
        }
        this.maxCleanupDelayTimeInSecs = MAX_CLEAN_UP_DELAY_TIME_IN_SECS;
        this.minCleanupDelayTimeInSecs = MIN_CLEAN_UP_DELAY_TIME_IN_SECS;

        final Map<ServiceUnitState, Counters> lookupCounters = new HashMap<>();
        final Map<ServiceUnitState, Counters> stateHandlerCounters = new HashMap<>();
        final Map<EventType, Counters> perEventCounters = new HashMap<>();
        for (ServiceUnitState serviceUnitState : ServiceUnitState.values()) {
            lookupCounters.put(serviceUnitState, new Counters());
            stateHandlerCounters.put(serviceUnitState, new Counters());
        }
        for (EventType eventType : EventType.values()) {
            perEventCounters.put(eventType, new Counters());
        }
        // Freeze the tables: only the AtomicLong values inside them change afterwards.
        this.ownerLookUpCounters = Map.copyOf(lookupCounters);
        this.handlerCounters = Map.copyOf(stateHandlerCounters);
        this.eventCounters = Map.copyOf(perEventCounters);
        this.channelState = ChannelState.Constructed;
    }

    /**
     * Schedules the periodic ownership monitor on the load-manager executor.
     * Does nothing when a monitor task is already registered.
     */
    @Override
    public void scheduleOwnershipMonitor() {
        if (this.monitorTask != null) {
            return;
        }
        final Runnable monitor = () -> {
            try {
                monitorOwnerships(this.brokerRegistry.getAvailableBrokersAsync().get(this.inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS));
            } catch (Exception e) {
                // A failed run is only logged; the next scheduled run retries.
                log.info("Failed to monitor the ownerships. will retry..", e);
            }
        };
        // No initial delay; subsequent runs are spaced by the configured monitor interval.
        this.monitorTask = this.pulsar.getLoadManagerExecutor()
                .scheduleWithFixedDelay(monitor, 0L, this.ownershipMonitorDelayTimeInSecs, TimeUnit.SECONDS);
        log.info("This leader broker:{} started the ownership monitor.", this.brokerId);
    }

    /**
     * Cancels the periodic ownership monitor without interrupting a run in progress.
     * Does nothing when no monitor task is registered.
     */
    @Override
    public void cancelOwnershipMonitor() {
        final ScheduledFuture<?> task = this.monitorTask;
        if (task == null) {
            return;
        }
        task.cancel(false);
        this.monitorTask = null;
        log.info("This previous leader broker:{} stopped the ownership monitor.", this.brokerId);
    }

    /** Runs the cleanup routine for this broker's own id. */
    @Override
    public void cleanOwnerships() {
        doCleanup(brokerId);
    }

    /**
     * Starts the channel: resolves collaborators, ensures the system tenant, namespace
     * and topic exist, (re)creates the producer and table view, wires the event handlers
     * and finally moves the channel to {@link ChannelState#Started}.
     *
     * @throws IllegalStateException if the channel is already past
     *         {@link ChannelState#LeaderElectionServiceStarted}
     * @throws PulsarServerException if any step of the start-up sequence fails
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel
    public synchronized void start() throws PulsarServerException {
        // Passes only while the current state id is <= LeaderElectionServiceStarted.
        if (!validateChannelState(ChannelState.LeaderElectionServiceStarted, false)) {
            throw new IllegalStateException("Invalid channel state:" + this.channelState.name());
        }
        boolean debug = debug();
        try {
            this.brokerRegistry = getBrokerRegistry();
            this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent);
            this.leaderElectionService = getLeaderElectionService();
            // Bounded blocking read of the current leader; absence is only logged.
            Optional<LeaderBroker> optional = this.leaderElectionService.readCurrentLeader().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS);
            if (optional.isPresent()) {
                log.info("Successfully found the channel leader:{}.", optional.get());
            } else {
                log.warn("Failed to find the channel leader.");
            }
            this.channelState = ChannelState.LeaderElectionServiceStarted;
            this.loadManager = getLoadManager();
            // Close a producer left over from an earlier start before creating a new one.
            if (this.producer != null) {
                this.producer.close();
                if (debug) {
                    log.info("Closed the channel producer.");
                }
            }
            // Make sure the system tenant, namespace and topic exist before producing.
            PulsarClusterMetadataSetup.createTenantIfAbsent(this.pulsar.getPulsarResources(), NamespaceName.SYSTEM_NAMESPACE.getTenant(), this.config.getClusterName());
            PulsarClusterMetadataSetup.createNamespaceIfAbsent(this.pulsar.getPulsarResources(), NamespaceName.SYSTEM_NAMESPACE, this.config.getClusterName(), this.config.getDefaultNumberOfNamespaceBundles());
            ExtensibleLoadManagerImpl.createSystemTopic(this.pulsar, TOPIC);
            this.producer = this.pulsar.getClient().newProducer(this.schema).enableBatching(true).compressionType(MSG_COMPRESSION_TYPE).maxPendingMessages(MAX_OUTSTANDING_PUB_MESSAGES).blockIfQueueFull(true).topic(TOPIC).create();
            if (debug) {
                log.info("Successfully started the channel producer.");
            }
            // Same treatment for a table view left over from an earlier start.
            if (this.tableview != null) {
                this.tableview.close();
                if (debug) {
                    log.info("Closed the channel tableview.");
                }
            }
            this.tableview = this.pulsar.getClient().newTableViewBuilder(this.schema).topic(TOPIC).loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
            // Every table-view update is dispatched to handle().
            this.tableview.listen((str, serviceUnitStateData) -> {
                handle(str, serviceUnitStateData);
            });
            // Look up the strategy instance registered under the "table-view" tag.
            ServiceUnitStateCompactionStrategy serviceUnitStateCompactionStrategy = (ServiceUnitStateCompactionStrategy) TopicCompactionStrategy.getInstance("table-view");
            if (serviceUnitStateCompactionStrategy == null) {
                log.error("table-viewtag TopicCompactionStrategy is null.");
                throw new IllegalStateException("table-viewtag TopicCompactionStrategy is null.");
            }
            // Messages skipped by the strategy are routed to handleSkippedEvent (key only).
            serviceUnitStateCompactionStrategy.setSkippedMsgHandler((str2, serviceUnitStateData2) -> {
                handleSkippedEvent(str2);
            });
            if (debug) {
                log.info("Successfully started the channel tableview.");
            }
            this.pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
            if (debug) {
                log.info("Successfully registered the handleMetadataSessionEvent");
            }
            this.channelState = ChannelState.Started;
            log.info("Successfully started the channel.");
        } catch (Exception e) {
            // Any failure (including the IllegalStateException above) is wrapped for the caller.
            log.error("Failed to start the channel.", e);
            throw new PulsarServerException("Failed to start the channel.", e);
        }
    }

    /** @return the broker registry of the extensible load manager held by the broker service */
    @VisibleForTesting
    protected BrokerRegistry getBrokerRegistry() {
        final ExtensibleLoadManagerWrapper wrapper = (ExtensibleLoadManagerWrapper) this.pulsar.getLoadManager().get();
        return wrapper.get().getBrokerRegistry();
    }

    /** @return the context of the extensible load manager held by the broker service */
    @VisibleForTesting
    protected LoadManagerContext getContext() {
        final ExtensibleLoadManagerWrapper wrapper = (ExtensibleLoadManagerWrapper) this.pulsar.getLoadManager().get();
        return wrapper.get().getContext();
    }

    /** @return the {@link ExtensibleLoadManagerImpl} obtained from the broker's load manager */
    @VisibleForTesting
    protected ExtensibleLoadManagerImpl getLoadManager() {
        return ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get());
    }

    /** @return the leader election service of the extensible load manager held by the broker service */
    @VisibleForTesting
    protected LeaderElectionService getLeaderElectionService() {
        final ExtensibleLoadManagerWrapper wrapper = (ExtensibleLoadManagerWrapper) this.pulsar.getLoadManager().get();
        return wrapper.get().getLeaderElectionService();
    }

    /**
     * Marks the channel {@link ChannelState#Closed} and releases, in order, the table view,
     * the producer, the broker registry reference, the monitor task and the state-change
     * listeners.
     *
     * @throws PulsarServerException if any release step throws
     */
    @Override
    public synchronized void close() throws PulsarServerException {
        this.channelState = ChannelState.Closed;
        final boolean debugEnabled = debug();
        try {
            this.leaderElectionService = null;

            if (this.tableview != null) {
                this.tableview.close();
                this.tableview = null;
                if (debugEnabled) {
                    log.info("Successfully closed the channel tableview.");
                }
            }

            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
                log.info("Successfully closed the channel producer.");
            }

            // Dropping the reference is all that is done for the registry here.
            this.brokerRegistry = null;

            if (this.monitorTask != null) {
                this.monitorTask.cancel(true);
                this.monitorTask = null;
                log.info("Successfully cancelled the cleanup tasks");
            }

            // stateChangeListeners is a final field initialised at declaration, so it is never null.
            this.stateChangeListeners.close();

            log.info("Successfully closed the channel.");
        } catch (Exception e) {
            log.error("Failed to close the channel.", e);
            throw new PulsarServerException("Failed to close the channel.", e);
        }
    }

    /**
     * Compares the current channel state with a target state by their numeric ids.
     *
     * @param channelState target state to compare against
     * @param z when {@code true}, the current state must be at or past the target;
     *          when {@code false}, the current state must be at or before the target
     * @return whether the current state satisfies the requested relation
     */
    private boolean validateChannelState(ChannelState channelState, boolean z) {
        final int current = this.channelState.id;
        final int target = channelState.id;
        return z ? current >= target : current <= target;
    }

    /** @return the debug flag computed by {@link ExtensibleLoadManagerImpl} for this configuration and logger */
    private boolean debug() {
        return ExtensibleLoadManagerImpl.debug(config, log);
    }

    /**
     * Reads the current leader from the leader election service and maps it to its broker id.
     *
     * @return a future with the leader's broker id, empty when no leader is present; a failed
     *         future with {@link IllegalStateException} when the channel has not reached
     *         {@link ChannelState#LeaderElectionServiceStarted}
     */
    @Override
    public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
        if (!validateChannelState(ChannelState.LeaderElectionServiceStarted, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        return this.leaderElectionService.readCurrentLeader()
                .thenApply(leader -> leader.map(LeaderBroker::getBrokerId));
    }

    /**
     * Resolves the channel owner and checks it with {@code isTargetBroker}.
     *
     * @return a future with the result of that check; it completes exceptionally with
     *         {@link IllegalStateException} when no channel owner is present
     */
    @Override
    public CompletableFuture<Boolean> isChannelOwnerAsync() {
        return getChannelOwnerAsync().thenApply(owner -> {
            if (!owner.isPresent()) {
                log.error("There is no channel owner now.");
                throw new IllegalStateException("There is no channel owner now.");
            }
            return isTargetBroker(owner.get());
        });
    }

    /**
     * Blocking variant of {@link #isChannelOwnerAsync()}, bounded by
     * {@code MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS} seconds.
     *
     * @return the result of the asynchronous channel-owner check
     * @throws RuntimeException if the lookup fails, times out, or the waiting thread is
     *         interrupted; the original exception is attached as the cause
     */
    @Override
    public boolean isChannelOwner() {
        try {
            return isChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS).booleanValue();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            log.error("Failed to get the channel owner.", e);
            throw new RuntimeException("Failed to get the channel owner.", e);
        } catch (ExecutionException | TimeoutException e) {
            log.error("Failed to get the channel owner.", e);
            throw new RuntimeException("Failed to get the channel owner.", e);
        }
    }

    /**
     * Non-blocking ownership check: answers {@code true} only when the owner lookup has
     * already completed normally and names the given broker.
     *
     * @param str service unit to look up
     * @param str2 broker id to compare the owner with
     * @return {@code true} if the already-resolved owner equals {@code str2}; {@code false}
     *         if the lookup is pending, failed, cancelled, or yields a different or no owner
     * @throws IllegalStateException if the channel has not reached {@link ChannelState#Started}
     */
    @Override
    public boolean isOwner(String str, String str2) {
        if (!validateChannelState(ChannelState.Started, true)) {
            throw new IllegalStateException("Invalid channel state:" + this.channelState.name());
        }
        final CompletableFuture<Optional<String>> lookup = getOwnerAsync(str);
        final boolean completedNormally =
                lookup.isDone() && !lookup.isCompletedExceptionally() && !lookup.isCancelled();
        if (!completedNormally) {
            return false;
        }
        // The future is done, so join() returns immediately.
        final Optional<String> owner = lookup.join();
        if (!owner.isPresent()) {
            return false;
        }
        return StringUtils.equals(str2, owner.get());
    }

    /**
     * Ownership check against this broker's own id.
     *
     * @param str service unit to look up
     * @return result of {@link #isOwner(String, String)} for this broker
     */
    @Override
    public boolean isOwner(String str) {
        return isOwner(str, brokerId);
    }

    /**
     * Waits for the deduplicated owner request of a service unit and verifies through the
     * broker registry that the resolved owner can still be looked up.
     *
     * @param str service unit being resolved
     * @param serviceUnitState state the lookup started from; selects the failure counter
     * @param optional owner known to the caller, used only in the failure log
     * @return a future with the resolved owner, empty when the request resolved to
     *         {@code null}; it completes exceptionally (e.g. with
     *         {@link IllegalStateException} when the registry does not know the owner)
     */
    private CompletableFuture<Optional<String>> getActiveOwnerAsync(String str, ServiceUnitState serviceUnitState, Optional<String> optional) {
        return dedupeGetOwnerRequest(str).thenCompose(newOwner -> {
            if (newOwner == null) {
                // Nothing to verify; the null is turned into Optional.empty() below.
                return CompletableFuture.<String>completedFuture(null);
            }
            return this.brokerRegistry.lookupAsync(newOwner).thenApply(lookupData -> {
                if (lookupData.isPresent()) {
                    return newOwner;
                }
                throw new IllegalStateException("The new owner " + newOwner + " is inactive.");
            });
        }).whenComplete((owner, error) -> {
            if (error != null) {
                // The trailing throwable is logged with its stack trace.
                log.error("{} failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
                        this.brokerId, str, serviceUnitState, optional, error);
                this.ownerLookUpCounters.get(serviceUnitState).getFailure().incrementAndGet();
            }
        }).thenApply(Optional::ofNullable);
    }

    /**
     * Resolves the owner of a service unit from the state currently held in the table view.
     *
     * <ul>
     *   <li>Owned / Splitting: waits for the active owner.</li>
     *   <li>Assigning / Releasing: waits for the active owner when the destination passes
     *       {@code isTargetBroker}, otherwise answers immediately with the destination.</li>
     *   <li>Init / Free: no owner.</li>
     *   <li>Deleted or any other state: failed future.</li>
     * </ul>
     *
     * @param str service unit to look up
     * @return a future with the owner broker id, or empty when there is none
     */
    @Override
    public CompletableFuture<Optional<String>> getOwnerAsync(String str) {
        if (!validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        final ServiceUnitStateData data = this.tableview.get(str);
        final ServiceUnitState state = ServiceUnitStateData.state(data);
        final Counters lookupCounters = this.ownerLookUpCounters.get(state);
        lookupCounters.getTotal().incrementAndGet();
        switch (state) {
            case Owned:
                return getActiveOwnerAsync(str, state, Optional.of(data.dstBroker()));
            case Splitting:
                return getActiveOwnerAsync(str, state, Optional.of(data.sourceBroker()));
            case Assigning:
            case Releasing:
                if (isTargetBroker(data.dstBroker())) {
                    return getActiveOwnerAsync(str, state, Optional.of(data.dstBroker()));
                }
                return CompletableFuture.completedFuture(Optional.ofNullable(data.dstBroker()));
            case Init:
            case Free:
                return CompletableFuture.completedFuture(Optional.empty());
            case Deleted:
                lookupCounters.getFailure().incrementAndGet();
                return CompletableFuture.failedFuture(new IllegalArgumentException(str + " is deleted."));
            default: {
                lookupCounters.getFailure().incrementAndGet();
                final String message = String.format("Failed to process service unit state data: %s when get owner.", data);
                log.error(message);
                return CompletableFuture.failedFuture(new IllegalStateException(message));
            }
        }
    }

    /**
     * Synchronous owner lookup straight from the table view.
     *
     * @param str service unit to look up
     * @return the destination broker for Owned, the source broker for Splitting, an empty
     *         Optional for Init / Free, and {@code null} (not an empty Optional) for every
     *         other state. NOTE(review): callers must handle the {@code null} case.
     */
    private Optional<String> getOwner(String str) {
        final ServiceUnitStateData data = this.tableview.get(str);
        final ServiceUnitState state = ServiceUnitStateData.state(data);
        switch (state) {
            case Owned:
                return Optional.of(data.dstBroker());
            case Splitting:
                return Optional.of(data.sourceBroker());
            case Init:
            case Free:
                return Optional.empty();
            default:
                // Assigning, Releasing and anything else: no answer available.
                return null;
        }
    }

    /**
     * Returns the broker a service unit is currently assigned to, based on the table view.
     *
     * @param str service unit to look up
     * @return the destination broker for Owned / Assigning, the source broker for Splitting,
     *         the (possibly absent) destination broker for Releasing, and empty otherwise,
     *         including when the channel is not started or no data exists
     */
    @Override
    public Optional<String> getAssigned(String str) {
        if (!validateChannelState(ChannelState.Started, true)) {
            return Optional.empty();
        }
        final ServiceUnitStateData data = this.tableview.get(str);
        if (data == null) {
            return Optional.empty();
        }
        final ServiceUnitState state = ServiceUnitStateData.state(data);
        switch (state) {
            case Owned:
            case Assigning:
                return Optional.of(data.dstBroker());
            case Splitting:
                return Optional.of(data.sourceBroker());
            case Releasing:
                return Optional.ofNullable(data.dstBroker());
            case Init:
            case Free:
                break;
            case Deleted:
                log.warn("Trying to get the assigned broker from the deleted serviceUnit:{}", str);
                break;
            default:
                log.warn("Trying to get the assigned broker from unknown state:{} serviceUnit:{}", state, str);
                break;
        }
        return Optional.empty();
    }

    /**
     * Computes the next version id for a service unit from its current table-view entry.
     *
     * @param str service unit to look up
     * @return the next version id for that service unit
     */
    private long getNextVersionId(String str) {
        final ServiceUnitStateData current = this.tableview.get(str);
        return getNextVersionId(current);
    }

    /**
     * Computes the version id that follows the given state data.
     *
     * @param serviceUnitStateData current data, may be {@code null}
     * @return {@code VERSION_ID_INIT} when there is no data, otherwise the current version plus one
     */
    private long getNextVersionId(ServiceUnitStateData serviceUnitStateData) {
        return serviceUnitStateData == null ? VERSION_ID_INIT : serviceUnitStateData.versionId() + 1;
    }

    /**
     * Publishes an Assigning state for a service unit and returns the deduplicated owner
     * request registered for it.
     *
     * @param str service unit to assign
     * @param str2 broker the service unit is being assigned to
     * @return the owner-request future; it is failed and unregistered if publishing fails
     */
    @Override
    public CompletableFuture<String> publishAssignEventAsync(String str, String str2) {
        if (!validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        final Counters assignCounters = this.eventCounters.get(EventType.Assign);
        assignCounters.getTotal().incrementAndGet();
        // Register the owner request before publishing.
        final CompletableFuture<String> ownerRequest = dedupeGetOwnerRequest(str);
        final ServiceUnitStateData assigning =
                new ServiceUnitStateData(ServiceUnitState.Assigning, str2, getNextVersionId(str));
        pubAsync(str, assigning).whenComplete((messageId, error) -> {
            if (error == null) {
                return;
            }
            this.getOwnerRequests.remove(str, ownerRequest);
            if (!ownerRequest.isCompletedExceptionally()) {
                ownerRequest.completeExceptionally(error);
            }
            assignCounters.getFailure().incrementAndGet();
        });
        return ownerRequest;
    }

    /**
     * Publishes the given state data for a service unit as an override event.
     *
     * @param str service unit to override
     * @param serviceUnitStateData state data to publish
     * @return a future completed once the message is published
     * @throws IllegalStateException synchronously, if the channel has not reached
     *         {@link ChannelState#Started}
     */
    private CompletableFuture<Void> publishOverrideEventAsync(String str, ServiceUnitStateData serviceUnitStateData) {
        if (!validateChannelState(ChannelState.Started, true)) {
            throw new IllegalStateException("Invalid channel state:" + this.channelState.name());
        }
        final Counters overrideCounters = this.eventCounters.get(EventType.Override);
        overrideCounters.getTotal().incrementAndGet();
        return pubAsync(str, serviceUnitStateData).thenApply(ignored -> null);
    }

    /**
     * Publishes a Releasing state for the service unit named by an unload command. A
     * transfer (destination broker present) carries that destination; a plain unload
     * carries a {@code null} destination.
     *
     * @param unload unload command to publish
     * @return a future completed once the message is published
     */
    @Override
    public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
        if (!validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        final Counters unloadCounters = this.eventCounters.get(EventType.Unload);
        unloadCounters.getTotal().incrementAndGet();
        final String serviceUnit = unload.serviceUnit();
        final ServiceUnitStateData releasing;
        if (isTransferCommand(unload)) {
            releasing = new ServiceUnitStateData(ServiceUnitState.Releasing, unload.destBroker().get(),
                    unload.sourceBroker(), unload.force(), getNextVersionId(serviceUnit));
        } else {
            releasing = new ServiceUnitStateData(ServiceUnitState.Releasing, (String) null,
                    unload.sourceBroker(), unload.force(), getNextVersionId(serviceUnit));
        }
        return pubAsync(serviceUnit, releasing)
                .whenComplete((messageId, error) -> {
                    if (error != null) {
                        unloadCounters.getFailure().incrementAndGet();
                    }
                })
                .thenApply(ignored -> null);
    }

    /**
     * Publishes a Splitting state for the service unit named by a split command.
     *
     * @param split split command to publish
     * @return a future completed once the message is published
     */
    @Override
    public CompletableFuture<Void> publishSplitEventAsync(Split split) {
        if (!validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        final Counters splitCounters = this.eventCounters.get(EventType.Split);
        splitCounters.getTotal().incrementAndGet();
        final String serviceUnit = split.serviceUnit();
        final ServiceUnitStateData splitting = new ServiceUnitStateData(ServiceUnitState.Splitting, (String) null,
                split.sourceBroker(), split.splitServiceUnitToDestBroker(), getNextVersionId(serviceUnit));
        return pubAsync(serviceUnit, splitting)
                .whenComplete((messageId, error) -> {
                    if (error != null) {
                        splitCounters.getFailure().incrementAndGet();
                    }
                })
                .thenApply(ignored -> null);
    }

    /**
     * Dispatches a table-view update to the handler for the state carried by the data.
     * Any throwable raised by a handler is logged, counted as a failure and rethrown.
     *
     * @param str service unit the update belongs to
     * @param serviceUnitStateData new state data
     */
    private void handle(String str, ServiceUnitStateData serviceUnitStateData) {
        final long handledSoFar = getHandlerTotalCounter(serviceUnitStateData).incrementAndGet();
        if (debug()) {
            log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}",
                    this.brokerId, str, serviceUnitStateData, handledSoFar);
        }
        try {
            final ServiceUnitState state = ServiceUnitStateData.state(serviceUnitStateData);
            switch (state) {
                case Owned:
                    handleOwnEvent(str, serviceUnitStateData);
                    break;
                case Splitting:
                    handleSplitEvent(str, serviceUnitStateData);
                    break;
                case Assigning:
                    handleAssignEvent(str, serviceUnitStateData);
                    break;
                case Releasing:
                    handleReleaseEvent(str, serviceUnitStateData);
                    break;
                case Init:
                    handleInitEvent(str);
                    break;
                case Free:
                    handleFreeEvent(str, serviceUnitStateData);
                    break;
                case Deleted:
                    handleDeleteEvent(str, serviceUnitStateData);
                    break;
                default:
                    throw new IllegalStateException("Failed to handle channel data:" + serviceUnitStateData);
            }
        } catch (Throwable th) {
            final long failedSoFar = getHandlerFailureCounter(serviceUnitStateData).incrementAndGet();
            log.error("Failed to handle the event. serviceUnit:{}, data:{}, handlerFailureCount:{}",
                    str, serviceUnitStateData, failedSoFar, th);
            throw th;
        }
    }

    /**
     * @param serviceUnitStateData state data, may be {@code null}
     * @return {@code true} when the data has both a non-empty destination broker and a
     *         non-empty source broker
     */
    private static boolean isTransferCommand(ServiceUnitStateData serviceUnitStateData) {
        if (serviceUnitStateData == null) {
            return false;
        }
        if (!StringUtils.isNotEmpty(serviceUnitStateData.dstBroker())) {
            return false;
        }
        return StringUtils.isNotEmpty(serviceUnitStateData.sourceBroker());
    }

    /**
     * @param unload unload command
     * @return {@code true} when the command names a destination broker
     */
    private static boolean isTransferCommand(Unload unload) {
        final boolean hasDestination = unload.destBroker().isPresent();
        return hasDestination;
    }

    /**
     * Builds the event tag used in log lines.
     *
     * @param serviceUnitStateData state data, may be {@code null}
     * @return the Init state name for {@code null} data, the state name prefixed with
     *         {@code "Transfer:"} for transfer commands, and the plain state name otherwise
     */
    private static String getLogEventTag(ServiceUnitStateData serviceUnitStateData) {
        if (serviceUnitStateData == null) {
            return ServiceUnitState.Init.toString();
        }
        if (isTransferCommand(serviceUnitStateData)) {
            return "Transfer:" + serviceUnitStateData.state();
        }
        return serviceUnitStateData.state().toString();
    }

    /**
     * @param serviceUnitStateData state data whose state selects the counter
     * @return the total-requests handler counter for that state
     */
    private AtomicLong getHandlerTotalCounter(ServiceUnitStateData serviceUnitStateData) {
        final boolean wantTotal = true;
        return getHandlerCounter(serviceUnitStateData, wantTotal);
    }

    /**
     * Returns the "failure" handler counter for the state of the given data.
     *
     * @param serviceUnitStateData channel data whose state selects the counter
     * @return the counter obtained from {@link #getHandlerCounter} with the total flag cleared
     */
    private AtomicLong getHandlerFailureCounter(ServiceUnitStateData serviceUnitStateData) {
        final boolean selectTotal = false;
        return getHandlerCounter(serviceUnitStateData, selectTotal);
    }

    /**
     * Looks up the total or failure counter registered for the state of the given data.
     *
     * <p>The map entry is null-checked before it is dereferenced, so a state with no registered
     * counters is reported as an {@link IllegalStateException} rather than surfacing as a
     * {@link NullPointerException} from the getter call.
     *
     * @param serviceUnitStateData channel data whose state (via {@code ServiceUnitStateData.state})
     *                             selects the counters
     * @param z {@code true} to return the total counter, {@code false} for the failure counter
     * @return the selected counter, never {@code null}
     * @throws IllegalStateException if no counters, or no such counter, exist for the state
     */
    private AtomicLong getHandlerCounter(ServiceUnitStateData serviceUnitStateData, boolean z) {
        ServiceUnitState state = ServiceUnitStateData.state(serviceUnitStateData);
        var counters = this.handlerCounters.get(state);
        if (counters == null) {
            throw new IllegalStateException("Unknown state:" + state);
        }
        AtomicLong counter = z ? counters.getTotal() : counters.getFailure();
        if (counter == null) {
            throw new IllegalStateException("Unknown state:" + state);
        }
        return counter;
    }

    /**
     * Writes the outcome of handling one channel event to the log.
     *
     * <p>A failure ({@code th != null}) is always logged at error level and increments the failure
     * counter of the event's state. A success is logged at info level only when debug logging is
     * on or the event is a transfer command.
     *
     * @param th the failure, or {@code null} when the event was handled successfully
     * @param str the service unit the event refers to
     * @param serviceUnitStateData the handled (current) data; may be {@code null}
     * @param serviceUnitStateData2 the data published as a follow-up, if any; may be {@code null}
     */
    private void log(Throwable th, String str, ServiceUnitStateData serviceUnitStateData, ServiceUnitStateData serviceUnitStateData2) {
        final boolean failed = th != null;
        if (!failed && !debug() && !isTransferCommand(serviceUnitStateData)) {
            return;
        }

        // The total counter is read before the failure counter in both outcomes.
        final long totalHandled = getHandlerTotalCounter(serviceUnitStateData).get();
        final Object cur = serviceUnitStateData == null ? "" : serviceUnitStateData;
        final Object next = serviceUnitStateData2 == null ? "" : serviceUnitStateData2;

        if (failed) {
            final long totalFailed = getHandlerFailureCounter(serviceUnitStateData).incrementAndGet();
            log.error("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}, totalHandledRequests:{}, totalFailedRequests:{}",
                    this.brokerId, getLogEventTag(serviceUnitStateData), str, cur, next, totalHandled, totalFailed, th);
        } else {
            final long totalFailed = getHandlerFailureCounter(serviceUnitStateData).get();
            log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, totalHandledRequests:{}, totalFailedRequests:{}",
                    this.brokerId, getLogEventTag(serviceUnitStateData), str, cur, next, totalHandled, totalFailed);
        }
    }

    /**
     * Resolves a pending owner request for a service unit whose event was skipped.
     *
     * <p>Nothing happens unless a request is pending and the table view currently holds the unit
     * in the {@code Owned} state. In that case the request is completed with the recorded
     * destination broker, removed, and the state-change listeners are notified.
     *
     * @param str the service unit whose event was skipped
     */
    private void handleSkippedEvent(String str) {
        final CompletableFuture<String> pending = this.getOwnerRequests.get(str);
        if (pending == null) {
            return;
        }
        final ServiceUnitStateData current = (ServiceUnitStateData) this.tableview.get(str);
        if (current == null) {
            return;
        }
        if (current.state() != ServiceUnitState.Owned) {
            return;
        }
        pending.complete(current.dstBroker());
        this.getOwnerRequests.remove(str);
        this.stateChangeListeners.notify(str, current, null);
    }

    /**
     * Handles an {@code Owned} event for a service unit.
     *
     * <p>Any pending owner request is completed with the destination broker first. Then:
     * <ul>
     *   <li>if this broker is the destination, the namespace service is told the bundle is owned,
     *       the last-own-event timestamp is refreshed, and listeners are notified;</li>
     *   <li>otherwise, if the event is forced or a transfer and this broker is the source, the
     *       service unit is closed and listeners are notified on completion;</li>
     *   <li>otherwise listeners are simply notified.</li>
     * </ul>
     *
     * @param str the service unit
     * @param serviceUnitStateData the {@code Owned} event data
     */
    private void handleOwnEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        final CompletableFuture<String> pending = this.getOwnerRequests.remove(str);
        if (pending != null) {
            if (debug()) {
                log.info("Returned owner request for serviceUnit:{}", str);
            }
            pending.complete(serviceUnitStateData.dstBroker());
        }

        if (isTargetBroker(serviceUnitStateData.dstBroker())) {
            this.pulsar.getNamespaceService().onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(this.pulsar, str));
            this.lastOwnEventHandledAt = System.currentTimeMillis();
            this.stateChangeListeners.notify(str, serviceUnitStateData, null);
            log(null, str, serviceUnitStateData, null);
            return;
        }

        final boolean mustReleaseHere = (serviceUnitStateData.force() || isTransferCommand(serviceUnitStateData))
                && isTargetBroker(serviceUnitStateData.sourceBroker());
        if (!mustReleaseHere) {
            this.stateChangeListeners.notify(str, serviceUnitStateData, null);
            return;
        }
        this.stateChangeListeners.notifyOnCompletion(closeServiceUnit(str, true), str, serviceUnitStateData)
                .whenComplete((unloadedTopics, failure) -> log(failure, str, serviceUnitStateData, null));
    }

    /**
     * Handles an {@code Assigning} event: when this broker is the destination, publishes the
     * follow-up {@code Owned} data and notifies listeners once the publish completes.
     *
     * @param str the service unit
     * @param serviceUnitStateData the {@code Assigning} event data
     */
    private void handleAssignEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        if (!isTargetBroker(serviceUnitStateData.dstBroker())) {
            return;
        }
        final ServiceUnitStateData owned = new ServiceUnitStateData(
                ServiceUnitState.Owned,
                serviceUnitStateData.dstBroker(),
                serviceUnitStateData.sourceBroker(),
                getNextVersionId(serviceUnitStateData));
        this.stateChangeListeners.notifyOnCompletion(pubAsync(str, owned), str, serviceUnitStateData)
                .whenComplete((messageId, failure) -> log(failure, str, serviceUnitStateData, owned));
    }

    /**
     * Handles a {@code Releasing} event when this broker is the source broker.
     *
     * <p>For a transfer the follow-up state is {@code Assigning} and the unit is closed with the
     * flag {@code false}; otherwise the follow-up state is {@code Free} and the unit is closed
     * with the flag {@code true}. The follow-up data is published after the close completes, and
     * listeners are notified when the whole chain finishes.
     *
     * <p>The completion future is chained directly instead of being held in a raw
     * {@code CompletableFuture} local, so the failure argument keeps its {@code Throwable} type.
     *
     * @param str the service unit
     * @param serviceUnitStateData the {@code Releasing} event data
     */
    private void handleReleaseEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        if (!isTargetBroker(serviceUnitStateData.sourceBroker())) {
            return;
        }

        final ServiceUnitStateData next;
        final CompletableFuture<Integer> closing;
        if (isTransferCommand(serviceUnitStateData)) {
            next = new ServiceUnitStateData(ServiceUnitState.Assigning, serviceUnitStateData.dstBroker(), serviceUnitStateData.sourceBroker(), getNextVersionId(serviceUnitStateData));
            closing = closeServiceUnit(str, false);
        } else {
            next = new ServiceUnitStateData(ServiceUnitState.Free, (String) null, serviceUnitStateData.sourceBroker(), getNextVersionId(serviceUnitStateData));
            closing = closeServiceUnit(str, true);
        }

        this.stateChangeListeners
                .notifyOnCompletion(closing.thenCompose(unloadedTopics -> pubAsync(str, next)), str, serviceUnitStateData)
                .whenComplete((messageId, failure) -> log(failure, str, serviceUnitStateData, next));
    }

    /**
     * Handles a {@code Splitting} event: when this broker is the source broker, splits the
     * service unit and notifies listeners once the split completes.
     *
     * @param str the service unit to split
     * @param serviceUnitStateData the {@code Splitting} event data
     */
    private void handleSplitEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        if (!isTargetBroker(serviceUnitStateData.sourceBroker())) {
            return;
        }
        this.stateChangeListeners.notifyOnCompletion(splitServiceUnit(str, serviceUnitStateData), str, serviceUnitStateData)
                .whenComplete((ignored, failure) -> log(failure, str, serviceUnitStateData, null));
    }

    /**
     * Handles a {@code Free} event.
     *
     * <p>Any pending owner request is completed with {@code null}. When this broker is the source
     * broker and the event is forced, the unit is closed and then tombstoned; when it is not
     * forced an already-completed future is used. Listeners are notified on completion in both
     * cases. When this broker is not the source, listeners are notified immediately.
     *
     * @param str the service unit
     * @param serviceUnitStateData the {@code Free} event data
     */
    private void handleFreeEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        final CompletableFuture<String> pending = this.getOwnerRequests.remove(str);
        if (pending != null) {
            pending.complete(null);
        }

        if (!isTargetBroker(serviceUnitStateData.sourceBroker())) {
            this.stateChangeListeners.notify(str, serviceUnitStateData, null);
            return;
        }

        final CompletableFuture<Void> cleanup;
        if (serviceUnitStateData.force()) {
            cleanup = closeServiceUnit(str, true)
                    .thenCompose(unloadedTopics -> tombstoneAsync(str))
                    .thenApply(messageId -> null);
        } else {
            cleanup = CompletableFuture.completedFuture(0).thenApply(zero -> null);
        }
        this.stateChangeListeners.notifyOnCompletion(cleanup, str, serviceUnitStateData)
                .whenComplete((ignored, failure) -> log(failure, str, serviceUnitStateData, null));
    }

    /**
     * Handles a {@code Deleted} event.
     *
     * <p>Any pending owner request fails with an {@link IllegalStateException}. When this broker
     * is the source broker, a tombstone is published for the unit and listeners are notified on
     * completion; otherwise listeners are notified immediately.
     *
     * @param str the deleted service unit
     * @param serviceUnitStateData the {@code Deleted} event data
     */
    private void handleDeleteEvent(String str, ServiceUnitStateData serviceUnitStateData) {
        CompletableFuture<String> pending = this.getOwnerRequests.remove(str);
        if (pending != null) {
            // A space separates the service unit name from the rest of the message.
            pending.completeExceptionally(new IllegalStateException(str + " has been deleted."));
        }
        if (isTargetBroker(serviceUnitStateData.sourceBroker())) {
            this.stateChangeListeners.notifyOnCompletion(tombstoneAsync(str), str, serviceUnitStateData)
                    .whenComplete((messageId, th) -> log(th, str, serviceUnitStateData, null));
        } else {
            this.stateChangeListeners.notify(str, serviceUnitStateData, null);
        }
    }

    /**
     * Handles an {@code Init} event: completes any pending owner request with {@code null},
     * notifies listeners with no data, and logs the event.
     *
     * @param str the service unit
     */
    private void handleInitEvent(String str) {
        final CompletableFuture<String> pending = this.getOwnerRequests.remove(str);
        if (pending != null) {
            pending.complete(null);
        }
        this.stateChangeListeners.notify(str, null, null);
        log(null, str, null, null);
    }

    /**
     * Publishes the given data to the channel, keyed by the service unit.
     *
     * @param str the service unit, used as the message key
     * @param serviceUnitStateData the value to publish; {@link #tombstoneAsync} passes {@code null}
     * @return a future completed with the message id on success, or completed exceptionally
     *         (after the failure is logged) when the send fails
     */
    private CompletableFuture<MessageId> pubAsync(String str, ServiceUnitStateData serviceUnitStateData) {
        final CompletableFuture<MessageId> published = new CompletableFuture<>();
        this.producer.newMessage()
                .key(str)
                .value(serviceUnitStateData)
                .sendAsync()
                .whenComplete((messageId, failure) -> {
                    if (failure != null) {
                        log.error("Failed to publish the message: serviceUnit:{}, data:{}", str, serviceUnitStateData, failure);
                        published.completeExceptionally(failure);
                        return;
                    }
                    published.complete(messageId);
                });
        return published;
    }

    /**
     * Publishes a {@code null} value for the service unit.
     *
     * @param str the service unit
     * @return the future returned by {@link #pubAsync}
     */
    private CompletableFuture<MessageId> tombstoneAsync(String str) {
        final ServiceUnitStateData tombstone = null;
        return pubAsync(str, tombstone);
    }

    /**
     * Reports whether the given broker id equals this broker's id.
     *
     * @param str the broker id to compare; may be {@code null}
     * @return {@code false} for {@code null}, otherwise whether it equals {@code brokerId}
     */
    private boolean isTargetBroker(String str) {
        return str != null && str.equals(this.brokerId);
    }

    /**
     * Creates an owner-lookup future that stays pending until something else completes it or
     * the in-flight waiting time elapses.
     *
     * <p>On timeout, or any other exceptional completion, the current owner is read via
     * {@code getOwner}. A {@code null} result is rethrown as an {@link IllegalStateException}
     * wrapping the original failure; otherwise the owner, or {@code null} when the optional is
     * empty, becomes the result.
     *
     * <p>The future is created with an explicit {@code <String>} type argument so that the
     * {@code exceptionally} callback receives a {@code Throwable} rather than an erased
     * {@code Object}.
     *
     * @param str the service unit whose owner is awaited
     * @return the deferred owner future
     */
    private CompletableFuture<String> deferGetOwner(String str) {
        CompletableFuture<String> deferred = new CompletableFuture<String>()
                .orTimeout(this.inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS)
                .exceptionally(th -> {
                    Optional<String> ownerAfter = getOwner(str);
                    log.warn("{} failed to wait for owner for serviceUnit:{}; Trying to return the current owner:{}",
                            this.brokerId, str, ownerAfter, th);
                    if (ownerAfter == null) {
                        throw new IllegalStateException(th);
                    }
                    return ownerAfter.orElse(null);
                });
        if (debug()) {
            log.info("{} is waiting for owner for serviceUnit:{}", this.brokerId, str);
        }
        return deferred;
    }

    /**
     * Returns the in-flight owner request for the service unit, creating one if none exists.
     *
     * <p>When a new request is created inside {@code computeIfAbsent}:
     * <ul>
     *   <li>if {@code getOwner} already reports an owner, that broker is looked up in the broker
     *       registry; a present lookup result resolves the request with the owner right away,
     *       an absent one defers the request;</li>
     *   <li>if no owner is reported, the request is deferred.</li>
     * </ul>
     * Whether or not {@code computeIfAbsent} throws, a newly created request gets a completion
     * callback that removes it from {@code getOwnerRequests} and logs a failure.
     *
     * @param str the service unit
     * @return the existing or newly created owner request
     */
    private CompletableFuture<String> dedupeGetOwnerRequest(String str) {
        final MutableObject<CompletableFuture<String>> created = new MutableObject<>();
        try {
            return this.getOwnerRequests.computeIfAbsent(str, key -> {
                final Optional<String> ownerBefore = getOwner(str);
                final CompletableFuture<String> request;
                if (ownerBefore != null && ownerBefore.isPresent()) {
                    request = this.brokerRegistry.lookupAsync(ownerBefore.get()).thenCompose(lookupData -> {
                        if (lookupData.isPresent()) {
                            return CompletableFuture.completedFuture(ownerBefore.get());
                        }
                        return deferGetOwner(str);
                    });
                } else {
                    request = deferGetOwner(str);
                }
                created.setValue(request);
                return created.getValue();
            });
        } finally {
            // Only a request created by this call is registered for self-removal.
            final CompletableFuture<String> request = created.getValue();
            if (request != null) {
                request.whenComplete((owner, failure) -> {
                    this.getOwnerRequests.remove(str);
                    if (failure != null) {
                        log.warn("{} failed to getOwner for serviceUnit:{}", this.brokerId, str, failure);
                    }
                });
            }
        }
    }

    /**
     * Unloads the given service unit from this broker.
     *
     * <p>After the unload finishes, successfully or not, the namespace service is told the
     * bundle was unloaded and the elapsed time is logged. Unloaded topics are cleaned from the
     * cache unconditionally when {@code z} is {@code true}, and only on failure when {@code z}
     * is {@code false}.
     *
     * <p>The completion callback is passed as a plain lambda; the earlier cast to a
     * {@code BiConsumer} over a type variable {@code U} is dropped because no such variable is
     * declared in this method.
     *
     * @param str the service unit (bundle) to unload
     * @param z flag forwarded as the second argument of {@code unloadServiceUnit}
     * @return a future holding the value produced by {@code unloadServiceUnit}
     */
    private CompletableFuture<Integer> closeServiceUnit(String str, boolean z) {
        final long startedAtNanos = System.nanoTime();
        final MutableInt unloadedTopics = new MutableInt();
        final NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(this.pulsar, str);
        return this.pulsar.getBrokerService()
                .unloadServiceUnit(bundle, z, true, this.pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS)
                .thenApply(count -> {
                    unloadedTopics.setValue(count);
                    return count;
                })
                .whenComplete((ignored, th) -> {
                    if (z) {
                        this.pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
                    }
                    this.pulsar.getNamespaceService().onNamespaceBundleUnload(bundle);
                    double elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAtNanos);
                    if (th != null) {
                        log.error("Failed to close topics under bundle:{} in {} ms", bundle.toString(), elapsedMillis, th);
                        if (!z) {
                            // With z == true the cache was already cleaned above.
                            this.pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
                        }
                    } else {
                        log.info("Unloading bundle:{} with {} topics completed in {} ms", bundle, unloadedTopics, elapsedMillis);
                    }
                });
    }

    /**
     * Starts splitting the given service unit and returns a future for the outcome.
     *
     * <p>The split algorithm defaults to the configured one. When the event data maps exactly
     * two child ranges, the lower and upper endpoints of both children are collected as explicit
     * boundaries and the specified-positions algorithm is used instead. The child bundles are
     * derived from the keys of that map.
     *
     * @param str the parent service unit
     * @param serviceUnitStateData the {@code Splitting} event data
     * @return a future completed by {@link #splitServiceUnitOnceAndRetry}
     */
    private CompletableFuture<Void> splitServiceUnit(String str, ServiceUnitStateData serviceUnitStateData) {
        final long startedAtNanos = System.nanoTime();
        final NamespaceService namespaceService = this.pulsar.getNamespaceService();
        final NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
        final NamespaceBundle parentBundle = LoadManagerShared.getNamespaceBundle(this.pulsar, str);
        final CompletableFuture<Void> completion = new CompletableFuture<>();
        final Map<String, Optional<String>> childToDestBroker = serviceUnitStateData.splitServiceUnitToDestBroker();

        List<Long> boundaries = null;
        NamespaceBundleSplitAlgorithm algorithm =
                namespaceService.getNamespaceBundleSplitAlgorithmByName(this.config.getDefaultNamespaceBundleSplitAlgorithm());
        if (childToDestBroker != null && childToDestBroker.size() == 2) {
            final Set<Long> endpoints = new HashSet<>();
            final String namespace = parentBundle.getNamespaceObject().toString();
            for (String childRange : childToDestBroker.keySet()) {
                NamespaceBundle child = bundleFactory.getBundle(namespace, childRange);
                endpoints.add((Long) child.getKeyRange().lowerEndpoint());
                endpoints.add((Long) child.getKeyRange().upperEndpoint());
            }
            boundaries = new ArrayList<>(endpoints);
            algorithm = NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
        }

        // NOTE(review): the map is read again here and dereferenced without the null check used
        // above; confirm that Splitting data always carries a non-null map.
        final List<NamespaceBundle> childBundles = serviceUnitStateData.splitServiceUnitToDestBroker().keySet().stream()
                .map(childRange -> bundleFactory.getBundle(parentBundle.getNamespaceObject().toString(), childRange))
                .collect(Collectors.toList());

        splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, algorithm, parentBundle, childBundles, boundaries,
                serviceUnitStateData, new AtomicInteger(0), startedAtNanos, completion);
        return completion;
    }

    /**
     * Runs one split attempt and retries it when the metadata store reports a version conflict.
     *
     * <p>One attempt publishes ownership of the child bundles, resolves and stores the split
     * bundle set, refreshes the topic-to-stats maps, publishes a {@code Deleted} state for the
     * parent (the publish result is not awaited), and finally completes {@code completableFuture}.
     * A {@code BadVersionException} schedules another attempt after 100 ms until the attempt
     * counter reaches 7; any other failure, or running out of retries, completes
     * {@code completableFuture} exceptionally with a {@code ServiceUnitNotReadyException}.
     *
     * <p>The stage callbacks are passed as plain lambdas; the earlier cast to a {@code Function}
     * over a type variable {@code U} is dropped because no such variable is declared here.
     *
     * @param namespaceService namespace service used to resolve and store the split
     * @param namespaceBundleFactory factory handed to the helper steps
     * @param namespaceBundleSplitAlgorithm algorithm used to compute the split boundary
     * @param namespaceBundle the parent bundle being split
     * @param list the child bundles
     * @param list2 explicit split boundaries, or {@code null}
     * @param serviceUnitStateData the {@code Splitting} event data of the parent
     * @param atomicInteger attempt counter shared across retries
     * @param j {@code System.nanoTime()} value taken when the split started
     * @param completableFuture future completed with the final outcome
     */
    @VisibleForTesting
    protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, NamespaceBundleFactory namespaceBundleFactory, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm, NamespaceBundle namespaceBundle, List<NamespaceBundle> list, List<Long> list2, ServiceUnitStateData serviceUnitStateData, AtomicInteger atomicInteger, long j, CompletableFuture<Void> completableFuture) {
        final int maxRetries = 7;
        final long retryDelayMillis = 100L;
        ownChildBundles(list, serviceUnitStateData)
                .thenCompose(ignored -> getSplitNamespaceBundles(namespaceService, namespaceBundleFactory, namespaceBundleSplitAlgorithm, namespaceBundle, list, list2))
                .thenCompose(namespaceBundles -> updateSplitNamespaceBundlesAsync(namespaceService, namespaceBundleFactory, namespaceBundle, namespaceBundles))
                .thenAccept(ignored -> {
                    this.pulsar.getBrokerService().refreshTopicToStatsMaps(namespaceBundle);
                })
                .thenAccept(ignored -> {
                    // The returned future is intentionally not chained; see the method comment.
                    pubAsync(namespaceBundle.toString(), new ServiceUnitStateData(ServiceUnitState.Deleted, (String) null, serviceUnitStateData.sourceBroker(), getNextVersionId(serviceUnitStateData)));
                })
                .thenAccept(ignored -> {
                    double elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j);
                    log.info("Successfully split {} parent namespace-bundle to {} in {} ms", namespaceBundle, list, elapsedMillis);
                    namespaceService.onNamespaceBundleSplit(namespaceBundle);
                    completableFuture.complete(null);
                })
                .exceptionally(th -> {
                    Throwable cause = FutureUtil.unwrapCompletionException(th);
                    // The counter is only advanced for version conflicts.
                    if ((cause instanceof MetadataStoreException.BadVersionException) && atomicInteger.incrementAndGet() < maxRetries) {
                        log.warn("Failed to update bundle range in metadata store. Retrying {} th / {} limit", atomicInteger.get(), maxRetries, th);
                        this.pulsar.getExecutor().schedule(() -> {
                            splitServiceUnitOnceAndRetry(namespaceService, namespaceBundleFactory, namespaceBundleSplitAlgorithm, namespaceBundle, list, list2, serviceUnitStateData, atomicInteger, j, completableFuture);
                        }, retryDelayMillis, TimeUnit.MILLISECONDS);
                        return null;
                    }
                    String message = String.format("Failed to split bundle %s, Retried %d th / %d limit, reason %s", namespaceBundle.toString(), atomicInteger.get(), maxRetries, cause.getMessage());
                    log.warn(message, cause);
                    completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(message));
                    return null;
                });
    }

    /**
     * Publishes an {@code Owned} state for every child bundle the table view does not know yet.
     *
     * @param list the child bundles
     * @param serviceUnitStateData parent event data; its source broker is used in the published
     *                             child data
     * @return a future that completes when all publishes finish, or an already-completed future
     *         when nothing had to be published
     */
    private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle> list, ServiceUnitStateData serviceUnitStateData) {
        final List<CompletableFuture<Void>> publishes = new ArrayList<>(list.size());
        final boolean debugEnabled = debug();
        for (NamespaceBundle child : list) {
            final String childName = child.toString();
            final ServiceUnitStateData existing = (ServiceUnitStateData) this.tableview.get(childName);
            if (existing != null) {
                if (debugEnabled) {
                    log.info("Already owned child bundle:{}", childName);
                }
                continue;
            }
            publishes.add(pubAsync(childName, new ServiceUnitStateData(ServiceUnitState.Owned, serviceUnitStateData.sourceBroker(), 1L))
                    .<Void>thenApply(messageId -> null));
        }
        if (publishes.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return FutureUtil.waitForAll(publishes);
    }

    /**
     * Resolves the bundle set that should be stored for the split.
     *
     * <p>If the namespace's current bundles still contain the parent, the split boundary is
     * computed through the namespace service. If validating the parent throws
     * {@link IllegalArgumentException}, every child is validated instead; when all children pass,
     * the current bundles are returned unchanged.
     *
     * <p>When a child fails validation, the child's own exception is reported as the cause and
     * the parent's validation failure is attached to it as a suppressed exception, so neither
     * diagnostic is lost.
     *
     * @param namespaceService service used to compute the split boundary
     * @param namespaceBundleFactory factory providing the namespace's current bundles
     * @param namespaceBundleSplitAlgorithm algorithm passed to {@code getSplitBoundary}
     * @param namespaceBundle the parent bundle
     * @param list the child bundles
     * @param list2 explicit boundaries passed to {@code getSplitBoundary}; may be {@code null}
     * @return a future with the bundle set, or one completed exceptionally with a
     *         {@code ServiceUnitNotReadyException} when validation fails
     */
    private CompletableFuture<NamespaceBundles> getSplitNamespaceBundles(NamespaceService namespaceService, NamespaceBundleFactory namespaceBundleFactory, NamespaceBundleSplitAlgorithm namespaceBundleSplitAlgorithm, NamespaceBundle namespaceBundle, List<NamespaceBundle> list, List<Long> list2) {
        CompletableFuture<NamespaceBundles> result = new CompletableFuture<>();
        boolean debug = debug();
        NamespaceBundles bundles = namespaceBundleFactory.getBundles(namespaceBundle.getNamespaceObject());
        boolean childrenAlreadyPresent = false;
        try {
            bundles.validateBundle(namespaceBundle);
        } catch (IllegalArgumentException parentMissing) {
            if (debug) {
                log.info("Namespace bundles do not contain the parent bundle:{}", namespaceBundle);
            }
            for (NamespaceBundle child : list) {
                try {
                    bundles.validateBundle(child);
                    if (debug) {
                        log.info("Namespace bundles contain the child bundle:{}", child);
                    }
                } catch (Exception childMissing) {
                    // Keep both failures: the child's as the cause, the parent's as suppressed.
                    childMissing.addSuppressed(parentMissing);
                    result.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("Namespace bundles do not contain the child bundle:" + child, childMissing));
                    return result;
                }
            }
            childrenAlreadyPresent = true;
        } catch (Exception e) {
            result.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("Failed to validate the parent bundle in the namespace bundles.", e));
            return result;
        }
        if (childrenAlreadyPresent) {
            result.complete(bundles);
            return result;
        }
        return namespaceService.getSplitBoundary(namespaceBundle, namespaceBundleSplitAlgorithm, list2)
                .thenApply(pair -> (NamespaceBundles) pair.getLeft());
    }

    /**
     * Stores the split bundle set and invalidates the bundle cache of the namespace.
     *
     * <p>The bundles are written with {@code updateNamespaceBundles}, then with
     * {@code updateNamespaceBundlesForPolicies}; the bundle cache is invalidated only after both
     * succeed.
     *
     * <p>The final callback is passed as a plain lambda; the earlier cast to a {@code Consumer}
     * over a type variable {@code U} is dropped because no such variable is declared here.
     *
     * @param namespaceService service performing the two updates
     * @param namespaceBundleFactory factory whose cache is invalidated
     * @param namespaceBundle the parent bundle; supplies the namespace
     * @param namespaceBundles the bundle set to store
     * @return a future that completes after the cache invalidation
     */
    private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(NamespaceService namespaceService, NamespaceBundleFactory namespaceBundleFactory, NamespaceBundle namespaceBundle, NamespaceBundles namespaceBundles) {
        final NamespaceName namespace = namespaceBundle.getNamespaceObject();
        return namespaceService.updateNamespaceBundles(namespace, namespaceBundles)
                .thenCompose(ignored -> namespaceService.updateNamespaceBundlesForPolicies(namespace, namespaceBundles))
                .thenAccept(ignored -> {
                    namespaceBundleFactory.invalidateBundleCache(namespaceBundle.getNamespaceObject());
                    if (debug()) {
                        log.info("Successfully updated split namespace bundles and namespace bundle cache.");
                    }
                });
    }

    /**
     * Records the latest metadata session event and when it was received.
     *
     * <p>Only {@code SessionReestablished} and {@code SessionLost} are recorded; every other
     * event is ignored.
     *
     * @param sessionEvent the metadata session event
     */
    @Override
    public void handleMetadataSessionEvent(SessionEvent sessionEvent) {
        final boolean tracked = sessionEvent == SessionEvent.SessionReestablished
                || sessionEvent == SessionEvent.SessionLost;
        if (!tracked) {
            return;
        }
        this.lastMetadataSessionEvent = sessionEvent;
        this.lastMetadataSessionEventTimestamp = System.currentTimeMillis();
        log.info("Received metadata session event:{} at timestamp:{}",
                this.lastMetadataSessionEvent, this.lastMetadataSessionEventTimestamp);
    }

    /**
     * Dispatches a broker registry notification to the creation or deletion handler.
     *
     * <p>Notification types other than {@code Created} and {@code Deleted} are ignored.
     *
     * @param str the broker the notification refers to
     * @param notificationType the kind of registry change
     */
    @Override
    public void handleBrokerRegistrationEvent(String str, NotificationType notificationType) {
        if (notificationType == NotificationType.Created) {
            log.info("BrokerRegistry detected the broker:{} registry has been created.", str);
            handleBrokerCreationEvent(str);
            return;
        }
        if (notificationType == NotificationType.Deleted) {
            log.info("BrokerRegistry detected the broker:{} registry has been deleted.", str);
            handleBrokerDeletionEvent(str);
        }
    }

    /**
     * Classifies the metadata connection from the last recorded session event.
     *
     * @return {@code Unstable} unless the last event is {@code SessionReestablished}; otherwise
     *         {@code Stable} when more than {@code maxCleanupDelayTimeInSecs} seconds have passed
     *         since that event, and {@code Jittery} when not
     */
    private MetadataState getMetadataState() {
        if (this.lastMetadataSessionEvent != SessionEvent.SessionReestablished) {
            return MetadataState.Unstable;
        }
        final long sinceLastEventMillis = System.currentTimeMillis() - this.lastMetadataSessionEventTimestamp;
        if (sinceLastEventMillis > 1000 * this.maxCleanupDelayTimeInSecs) {
            return MetadataState.Stable;
        }
        return MetadataState.Jittery;
    }

    /**
     * Cancels the scheduled ownership cleanup, if any, for a broker that registered again.
     *
     * @param str the broker whose registry entry was created
     */
    private void handleBrokerCreationEvent(String str) {
        final CompletableFuture<Void> pendingJob = this.cleanupJobs.remove(str);
        if (pendingJob == null) {
            if (debug()) {
                log.info("No needs to cancel the ownership cleanup for broker:{}. There was no scheduled cleanup job. Active cleanup job count:{}", str, this.cleanupJobs.size());
            }
            return;
        }
        pendingJob.cancel(false);
        this.totalInactiveBrokerCleanupCancelledCnt++;
        log.info("Successfully cancelled the ownership cleanup for broker:{}. Active cleanup job count:{}", str, this.cleanupJobs.size());
    }

    /**
     * Schedules, or deliberately skips, the ownership cleanup for a broker whose registry entry
     * was deleted.
     *
     * <p>Nothing is done unless this broker is the channel owner. The cleanup delay depends on
     * the metadata state: the minimum delay when {@code Stable}, the maximum delay when
     * {@code Jittery}, and no cleanup at all (counted as ignored) when {@code Unstable}.
     *
     * @param str the broker whose registry entry was deleted
     */
    private void handleBrokerDeletionEvent(String str) {
        if (!isChannelOwner()) {
            log.warn("This broker is not the leader now. Ignoring BrokerDeletionEvent for broker {}.", str);
            return;
        }
        final MetadataState state = getMetadataState();
        log.info("Handling broker:{} ownership cleanup based on metadata connection state:{}, event:{}, event_ts:{}:",
                str, state, this.lastMetadataSessionEvent, this.lastMetadataSessionEventTimestamp);
        switch (state) {
            case Stable:
                scheduleCleanup(str, this.minCleanupDelayTimeInSecs);
                break;
            case Jittery:
                scheduleCleanup(str, this.maxCleanupDelayTimeInSecs);
                break;
            case Unstable:
                this.totalInactiveBrokerCleanupIgnoredCnt++;
                log.error("MetadataState state is unstable. Ignoring the ownership cleanup request for the reported broker :{}", str);
                break;
            default:
                break;
        }
    }

    /**
     * Schedules a delayed ownership cleanup for the broker unless one is already scheduled.
     *
     * <p>A newly created job runs {@code doCleanup} on the load manager executor after the delay;
     * failures inside the job are logged and counted rather than propagated. Whether or not
     * {@code computeIfAbsent} throws, a newly created job gets a completion callback that removes
     * it from {@code cleanupJobs}.
     *
     * @param str the inactive broker to clean up
     * @param j the delay in seconds
     */
    private void scheduleCleanup(String str, long j) {
        final MutableObject<CompletableFuture<Void>> scheduled = new MutableObject<>();
        try {
            this.cleanupJobs.computeIfAbsent(str, key -> {
                final Executor delayed = CompletableFuture.delayedExecutor(j, TimeUnit.SECONDS, this.pulsar.getLoadManagerExecutor());
                this.totalInactiveBrokerCleanupScheduledCnt++;
                final CompletableFuture<Void> job = CompletableFuture.runAsync(() -> {
                    try {
                        doCleanup(str);
                    } catch (Throwable failure) {
                        log.error("Failed to run the cleanup job for the broker {}, totalCleanupErrorCnt:{}.",
                                str, this.totalCleanupErrorCnt.incrementAndGet(), failure);
                    }
                }, delayed);
                scheduled.setValue(job);
                return job;
            });
        } finally {
            // Only a job created by this call is registered for self-removal.
            final CompletableFuture<Void> job = scheduled.getValue();
            if (job != null) {
                job.whenComplete((ignored, failure) -> {
                    this.cleanupJobs.remove(str);
                });
            }
        }
        log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.", str, j, this.cleanupJobs.size());
    }

    /**
     * Publishes an override for a service unit orphaned by an inactive broker and waits for it.
     *
     * <p>A new broker is selected first. When one is found:
     * <ul>
     *   <li>{@code Splitting} data stays {@code Splitting}, with the selected broker as source;</li>
     *   <li>all other data becomes {@code Owned} by the selected broker; the previous broker
     *       ({@code dstBroker} for {@code Owned} data, {@code sourceBroker} otherwise) is kept as
     *       the source unless it equals the selected broker.</li>
     * </ul>
     * When no broker is found the unit becomes {@code Free}. All override data is created with
     * the force flag set. Any failure, including a timeout of the blocking wait, is logged and
     * counted but not rethrown.
     *
     * <p>The publish stage is passed as a plain lambda; the earlier cast to a {@code Function}
     * over a type variable {@code U} is dropped because no such variable is declared here.
     *
     * @param str the orphaned service unit
     * @param serviceUnitStateData the orphaned data currently recorded for the unit
     * @param str2 the inactive broker
     */
    private void overrideOwnership(String str, ServiceUnitStateData serviceUnitStateData, String str2) {
        final long nextVersionId = getNextVersionId(serviceUnitStateData);
        try {
            selectBroker(str, str2)
                    .thenApply(selected -> {
                        final ServiceUnitStateData overrideData = selected.map(newOwner -> {
                            if (serviceUnitStateData.state() == ServiceUnitState.Splitting) {
                                return new ServiceUnitStateData(ServiceUnitState.Splitting, serviceUnitStateData.dstBroker(), newOwner, Map.copyOf(serviceUnitStateData.splitServiceUnitToDestBroker()), true, nextVersionId);
                            }
                            if (serviceUnitStateData.state() == ServiceUnitState.Owned) {
                                return new ServiceUnitStateData(ServiceUnitState.Owned, newOwner, newOwner.equals(serviceUnitStateData.dstBroker()) ? null : serviceUnitStateData.dstBroker(), true, nextVersionId);
                            }
                            return new ServiceUnitStateData(ServiceUnitState.Owned, newOwner, newOwner.equals(serviceUnitStateData.sourceBroker()) ? null : serviceUnitStateData.sourceBroker(), true, nextVersionId);
                        }).orElseGet(() -> {
                            return new ServiceUnitStateData(ServiceUnitState.Free, (String) null, serviceUnitStateData.state() == ServiceUnitState.Owned ? serviceUnitStateData.dstBroker() : serviceUnitStateData.sourceBroker(), true, nextVersionId);
                        });
                        return overrideData;
                    })
                    .thenCompose(overrideData -> {
                        log.info("Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to overrideData:{}", str2, str, serviceUnitStateData, overrideData);
                        return publishOverrideEventAsync(str, overrideData);
                    })
                    .get(this.config.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        } catch (Throwable th) {
            log.error("Failed to override inactiveBroker:{} ownership serviceUnit:{} orphanData:{}. totalCleanupErrorCnt:{}", str2, str, serviceUnitStateData, this.totalCleanupErrorCnt.incrementAndGet(), th);
        }
    }

    /**
     * Polls the table view until no service unit is {@code Owned} by the given broker any more,
     * or until the maximum waiting time has passed.
     *
     * <p>The table view is re-checked every 100 ms. If the thread is interrupted while sleeping,
     * the interrupt flag is restored and the wait ends early instead of continuing to poll.
     *
     * @param str the orphan broker whose owned service units are awaited
     * @param z when {@code true}, service units whose name starts with the system namespace are
     *          not considered
     * @param i the maximum waiting time in milliseconds
     */
    private void waitForCleanups(String str, boolean z, int i) {
        final long started = System.currentTimeMillis();
        while (System.currentTimeMillis() - started < i) {
            boolean cleaned = true;
            for (Map.Entry<String, ServiceUnitStateData> entry : this.tableview.entrySet()) {
                final String serviceUnit = entry.getKey();
                final ServiceUnitStateData data = entry.getValue();
                if (z && serviceUnit.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
                    continue;
                }
                if (data.state() == ServiceUnitState.Owned && str.equals(data.dstBroker())) {
                    cleaned = false;
                    break;
                }
            }
            if (cleaned) {
                break;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) {
                log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}", this.brokerId);
                // Preserve the interruption for the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        // The orphan broker is the one passed in, not this broker.
        log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", str, System.currentTimeMillis() - started);
    }

    /**
     * Cleans up the ownerships that still reference the inactive broker {@code str}.
     *
     * <p>The method first waits (up to {@code MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS} seconds) for the
     * channel owner; if none is found, or the lookup fails, the cleanup is skipped. Otherwise every table-view
     * entry is selected when either
     * <ul>
     *   <li>its destination broker equals {@code str} and its state is active, or</li>
     *   <li>its source broker equals {@code str} and its state is in-flight.</li>
     * </ul>
     * Selected service units outside the system namespace are overridden immediately; system-namespace ones are
     * collected and overridden only after the non-system overrides were flushed and waited for.
     * Finally the broker's entries are removed from the load data stores.
     *
     * <p>Failures are logged (with their cause) and never propagated to the caller.
     *
     * @param str id of the inactive broker whose ownerships are cleaned up
     */
    private synchronized void doCleanup(String str) {
        try {
            if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS).isEmpty()) {
                log.error("Found the channel owner is empty. Skip the inactive broker:{}'s orphan bundle cleanup", str);
                return;
            }
        } catch (Exception lookupFailure) {
            if (lookupFailure instanceof InterruptedException) {
                // Keep the interruption visible to the code running this job.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to find the channel owner. Skip the inactive broker:{}'s orphan bundle cleanup", str, lookupFailure);
            return;
        }

        try {
            long startNanos = System.nanoTime();
            log.info("Started ownership cleanup for the inactive broker:{}", str);
            int orphanServiceUnitCleanupCnt = 0;
            long cleanupErrorCntStart = this.totalCleanupErrorCnt.get();
            // System-namespace service units are overridden last, after the other overrides have been waited for.
            Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
            for (Map.Entry<String, ServiceUnitStateData> entry : this.tableview.entrySet()) {
                ServiceUnitStateData stateData = entry.getValue();
                String serviceUnit = entry.getKey();
                ServiceUnitState state = ServiceUnitStateData.state(stateData);
                boolean ownedByBroker = StringUtils.equals(str, stateData.dstBroker())
                        && ServiceUnitState.isActiveState(state);
                boolean inFlightFromBroker = StringUtils.equals(str, stateData.sourceBroker())
                        && ServiceUnitState.isInFlightState(state);
                if (!ownedByBroker && !inFlightFromBroker) {
                    continue;
                }
                if (serviceUnit.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
                    orphanSystemServiceUnits.put(serviceUnit, stateData);
                } else {
                    overrideOwnership(serviceUnit, stateData, str);
                }
                orphanServiceUnitCleanupCnt++;
            }

            try {
                this.producer.flushAsync().get(5000L, TimeUnit.MILLISECONDS);
            } catch (Exception flushFailure) {
                if (flushFailure instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("Failed to flush the in-flight non-system bundle override messages.", flushFailure);
            }

            if (orphanServiceUnitCleanupCnt > 0) {
                // 'true' makes the wait ignore system-namespace service units, which are not overridden yet.
                waitForCleanups(str, true, OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS);
                this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
                this.totalInactiveBrokerCleanupCnt++;
            }

            for (Map.Entry<String, ServiceUnitStateData> orphan : orphanSystemServiceUnits.entrySet()) {
                log.info("Overriding orphan system service unit:{}", orphan.getKey());
                overrideOwnership(orphan.getKey(), orphan.getValue(), str);
            }

            try {
                this.producer.flushAsync().get(5000L, TimeUnit.MILLISECONDS);
            } catch (Exception flushFailure) {
                if (flushFailure instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("Failed to flush the in-flight system bundle override messages.", flushFailure);
            }

            double elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            // Drop the load data published by the inactive broker; the returned futures are not awaited.
            getContext().topBundleLoadDataStore().removeAsync(str);
            getContext().brokerLoadDataStore().removeAsync(str);
            log.info("Completed a cleanup for the inactive broker:{} in {} ms. Cleaned up orphan service units: orphanServiceUnitCleanupCnt:{}, approximate cleanupErrorCnt:{}, metrics:{} ",
                    str,
                    elapsedMillis,
                    orphanServiceUnitCleanupCnt,
                    this.totalCleanupErrorCnt.get() - cleanupErrorCntStart,
                    printCleanupMetrics());
        } catch (Exception cleanupFailure) {
            // Best-effort job: report the real failure instead of blaming the channel owner lookup.
            log.error("Failed to clean up the orphan bundles of the inactive broker:{}", str, cleanupFailure);
        }
    }

    /**
     * Asks the load manager to select a broker for the given service unit.
     *
     * @param str  service unit name; it is resolved to a namespace bundle before the selection
     * @param str2 broker to exclude from the selection, or {@code null} to exclude none
     * @return the future returned by the load manager's {@code selectAsync}
     */
    private CompletableFuture<Optional<String>> selectBroker(String str, String str2) {
        final Set<String> excludedBrokers;
        if (str2 == null) {
            excludedBrokers = Set.of();
        } else {
            excludedBrokers = Set.of(str2);
        }
        return getLoadManager().selectAsync(
                LoadManagerShared.getNamespaceBundle(this.pulsar, str),
                excludedBrokers,
                LookupOptions.builder().build());
    }

    /**
     * Scans the ownership table view once and schedules the clean-ups it finds necessary.
     *
     * <p>The run is skipped when this broker is not the channel owner, when {@code list} is null or empty,
     * or when the metadata state is not {@code Stable}. Otherwise each entry is classified, in this order:
     * <ol>
     *   <li>{@code Owned} with a blank or non-active destination broker: the destination is recorded as
     *       inactive;</li>
     *   <li>in-flight with a non-blank, non-active source broker: the source is recorded as inactive;</li>
     *   <li>in-flight with a non-blank, non-active destination broker: the destination is recorded as
     *       inactive;</li>
     *   <li>in-flight for longer than {@code inFlightStateWaitingTimeInMillis}: the ownership is overridden;</li>
     *   <li>not active for longer than {@code stateTombstoneDelayTimeInMillis}: the entry is tombstoned.</li>
     * </ol>
     * Recorded brokers are passed to {@code handleBrokerDeletionEvent}, pending messages are flushed
     * (5 s timeout), and the counters are updated.
     *
     * @param list ids of the brokers currently considered active
     */
    @VisibleForTesting
    protected void monitorOwnerships(List<String> list) {
        if (!isChannelOwner()) {
            log.warn("This broker is not the leader now. Skipping ownership monitor.");
            return;
        }
        if (list == null || list.isEmpty()) {
            log.error("no active brokers found. Skipping ownership monitor.");
            return;
        }
        MetadataState metadataState = getMetadataState();
        if (metadataState != MetadataState.Stable) {
            log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", metadataState);
            return;
        }
        boolean debug = debug();
        if (debug) {
            log.info("Started the ownership monitor run for activeBrokerCount:{}", list.size());
        }

        long startNanos = System.nanoTime();
        Set<String> inactiveBrokers = new HashSet<>();
        Set<String> activeBrokers = new HashSet<>(list);
        Map<String, ServiceUnitStateData> timedOutInFlightServiceUnits = new HashMap<>();
        int serviceUnitTombstoneCleanupCnt = 0;
        int orphanServiceUnitCleanupCnt = 0;
        long cleanupErrorCntStart = this.totalCleanupErrorCnt.get();
        long now = System.currentTimeMillis();

        for (Map.Entry<String, ServiceUnitStateData> entry : this.tableview.entrySet()) {
            String serviceUnit = entry.getKey();
            ServiceUnitStateData stateData = entry.getValue();
            String dstBroker = stateData.dstBroker();
            String sourceBroker = stateData.sourceBroker();
            ServiceUnitState state = stateData.state();
            boolean inFlight = ServiceUnitState.isInFlightState(state);

            if (state == ServiceUnitState.Owned
                    && (StringUtils.isBlank(dstBroker) || !activeBrokers.contains(dstBroker))) {
                // NOTE(review): a blank dstBroker is recorded as-is and later handed to
                // handleBrokerDeletionEvent - confirm this is intended.
                inactiveBrokers.add(dstBroker);
            } else if (inFlight && StringUtils.isNotBlank(sourceBroker) && !activeBrokers.contains(sourceBroker)) {
                inactiveBrokers.add(sourceBroker);
            } else if (inFlight && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
                inactiveBrokers.add(dstBroker);
            } else if (inFlight && now - stateData.timestamp() > this.inFlightStateWaitingTimeInMillis) {
                timedOutInFlightServiceUnits.put(serviceUnit, stateData);
            } else if (!ServiceUnitState.isActiveState(state)
                    && now - stateData.timestamp() > this.stateTombstoneDelayTimeInMillis) {
                log.info("Found semi-terminal states to tombstone serviceUnit:{}, stateData:{}", serviceUnit, stateData);
                tombstoneAsync(serviceUnit).whenComplete((messageId, th) -> {
                    if (th != null) {
                        // The throwable is the last argument so the logger reports it as the cause.
                        log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, cleanupErrorCnt:{}.",
                                serviceUnit,
                                stateData,
                                this.totalCleanupErrorCnt.incrementAndGet() - cleanupErrorCntStart,
                                th);
                    }
                });
                serviceUnitTombstoneCleanupCnt++;
            }
        }

        for (String inactiveBroker : inactiveBrokers) {
            handleBrokerDeletionEvent(inactiveBroker);
        }

        for (Map.Entry<String, ServiceUnitStateData> orphan : timedOutInFlightServiceUnits.entrySet()) {
            overrideOwnership(orphan.getKey(), orphan.getValue(), null);
            orphanServiceUnitCleanupCnt++;
        }

        try {
            this.producer.flushAsync().get(5000L, TimeUnit.MILLISECONDS);
        } catch (Exception flushFailure) {
            if (flushFailure instanceof InterruptedException) {
                // Keep the interruption visible to the caller instead of silently clearing it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to flush the in-flight messages.", flushFailure);
        }

        boolean cleaned = false;
        if (serviceUnitTombstoneCleanupCnt > 0) {
            this.totalServiceUnitTombstoneCleanupCnt += serviceUnitTombstoneCleanupCnt;
            cleaned = true;
        }
        if (orphanServiceUnitCleanupCnt > 0) {
            this.totalOrphanServiceUnitCleanupCnt += orphanServiceUnitCleanupCnt;
            cleaned = true;
        }
        if (debug || cleaned) {
            double elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            log.info("Completed the ownership monitor run in {} ms. Scheduled cleanups for inactive brokers:{}. inactiveBrokerCount:{}. Published cleanups for orphan service units, orphanServiceUnitCleanupCnt:{}. Tombstoned semi-terminal state service units, serviceUnitTombstoneCleanupCnt:{}. Approximate cleanupErrorCnt:{}, metrics:{}. ",
                    elapsedMillis,
                    inactiveBrokers,
                    inactiveBrokers.size(),
                    orphanServiceUnitCleanupCnt,
                    serviceUnitTombstoneCleanupCnt,
                    this.totalCleanupErrorCnt.get() - cleanupErrorCntStart,
                    printCleanupMetrics());
        }
    }

    /**
     * Renders the cumulative clean-up counters and the number of pending clean-up jobs as a single
     * {@code {name:value, ...}} string for log messages.
     *
     * @return the formatted counters
     */
    private String printCleanupMetrics() {
        return String.format(
                "{totalInactiveBrokerCleanupCnt:%d, "
                        + "totalServiceUnitTombstoneCleanupCnt:%d, "
                        + "totalOrphanServiceUnitCleanupCnt:%d, "
                        + "totalCleanupErrorCnt:%d, "
                        // The ':' separator was missing for this field only.
                        + "totalInactiveBrokerCleanupScheduledCnt:%d, "
                        + "totalInactiveBrokerCleanupIgnoredCnt:%d, "
                        + "totalInactiveBrokerCleanupCancelledCnt:%d, "
                        + "  activeCleanupJobs:%d}",
                this.totalInactiveBrokerCleanupCnt,
                this.totalServiceUnitTombstoneCleanupCnt,
                this.totalOrphanServiceUnitCleanupCnt,
                this.totalCleanupErrorCnt.get(),
                this.totalInactiveBrokerCleanupScheduledCnt,
                this.totalInactiveBrokerCleanupIgnoredCnt,
                this.totalInactiveBrokerCleanupCancelledCnt,
                this.cleanupJobs.size());
    }

    /**
     * Returns the number of {@code Owned} service units whose destination broker satisfies
     * {@code isTargetBroker}.
     *
     * <p>The count is cached: it is recomputed only when an own-event was handled after the last count or
     * when the cached value is older than {@code MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS}.
     *
     * @return the (possibly cached) count, or {@code 0} when the table view is not set
     */
    private int getTotalOwnedServiceUnitCnt() {
        if (this.tableview == null) {
            return 0;
        }
        final long now = System.currentTimeMillis();
        final boolean ownEventSinceLastCount = this.lastOwnEventHandledAt > this.lastOwnedServiceUnitCountAt;
        final boolean cachedCountExpired =
                now - this.lastOwnedServiceUnitCountAt > MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS;
        if (!ownEventSinceLastCount && !cachedCountExpired) {
            // The cached value is still fresh.
            return this.totalOwnedServiceUnitCnt;
        }

        int ownedCount = 0;
        for (ServiceUnitStateData stateData : this.tableview.values()) {
            if (stateData.state() != ServiceUnitState.Owned) {
                continue;
            }
            if (isTargetBroker(stateData.dstBroker())) {
                ownedCount++;
            }
        }
        this.lastOwnedServiceUnitCountAt = now;
        this.totalOwnedServiceUnitCnt = ownedCount;
        return this.totalOwnedServiceUnitCnt;
    }

    /**
     * Builds a snapshot of the channel's metrics.
     *
     * <p>Every metric carries the base dimensions {@code metric=sunitStateChn} and {@code broker=<advertised
     * address>}. The list contains, in order: a {@code Total}/{@code Failure} pair per owner-lookup counter,
     * per event-publish counter and per handler counter, the clean-up failure counter, the four inactive-broker
     * clean-up counters ({@code Skip}, {@code Cancel}, {@code Schedule}, {@code Success}) and one metric holding
     * the orphan clean-up, tombstone clean-up and owned service unit totals.
     *
     * @return a newly created list of metrics
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel
    public List<Metrics> getMetrics() {
        List<Metrics> metrics = new ArrayList<>();
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("metric", "sunitStateChn");
        dimensions.put("broker", this.pulsar.getAdvertisedAddress());

        appendTotalAndFailureMetrics(metrics, dimensions, "state", this.ownerLookUpCounters.entrySet(),
                "brk_sunit_state_chn_owner_lookup_total");
        appendTotalAndFailureMetrics(metrics, dimensions, "event", this.eventCounters.entrySet(),
                "brk_sunit_state_chn_event_publish_ops_total");
        // Handler counters are keyed by state but are published under the "event" label.
        appendTotalAndFailureMetrics(metrics, dimensions, "event", this.handlerCounters.entrySet(),
                "brk_sunit_state_chn_subscribe_ops_total");

        metrics.add(newResultMetric(dimensions, null, null, "Failure",
                "brk_sunit_state_chn_cleanup_ops_total", this.totalCleanupErrorCnt.get()));

        final String inactiveBrokerCleanupMetric = "brk_sunit_state_chn_inactive_broker_cleanup_ops_total";
        metrics.add(newResultMetric(dimensions, null, null, "Skip",
                inactiveBrokerCleanupMetric, this.totalInactiveBrokerCleanupIgnoredCnt));
        metrics.add(newResultMetric(dimensions, null, null, "Cancel",
                inactiveBrokerCleanupMetric, this.totalInactiveBrokerCleanupCancelledCnt));
        metrics.add(newResultMetric(dimensions, null, null, "Schedule",
                inactiveBrokerCleanupMetric, this.totalInactiveBrokerCleanupScheduledCnt));
        metrics.add(newResultMetric(dimensions, null, null, "Success",
                inactiveBrokerCleanupMetric, this.totalInactiveBrokerCleanupCnt));

        // The totals use the base dimensions only (no "result" label).
        Metrics totals = Metrics.create(dimensions);
        totals.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total",
                Long.valueOf(this.totalOrphanServiceUnitCleanupCnt));
        totals.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total",
                Long.valueOf(this.totalServiceUnitTombstoneCleanupCnt));
        totals.put("brk_sunit_state_chn_owned_su_total", Integer.valueOf(getTotalOwnedServiceUnitCnt()));
        metrics.add(totals);
        return metrics;
    }

    /** Appends, for every counter entry, a "Total" metric followed by a "Failure" metric labelled with the key. */
    private static void appendTotalAndFailureMetrics(List<Metrics> metrics,
                                                     Map<String, String> baseDimensions,
                                                     String labelName,
                                                     Iterable<? extends Map.Entry<?, Counters>> counterEntries,
                                                     String metricName) {
        for (Map.Entry<?, Counters> entry : counterEntries) {
            String labelValue = entry.getKey().toString();
            metrics.add(newResultMetric(baseDimensions, labelName, labelValue, "Total",
                    metricName, entry.getValue().getTotal().get()));
            metrics.add(newResultMetric(baseDimensions, labelName, labelValue, "Failure",
                    metricName, entry.getValue().getFailure().get()));
        }
    }

    /** Creates one metric from a copy of the base dimensions, an optional extra label and a "result" label. */
    private static Metrics newResultMetric(Map<String, String> baseDimensions,
                                           String labelName,
                                           String labelValue,
                                           String result,
                                           String metricName,
                                           long value) {
        Map<String, String> metricDimensions = new HashMap<>(baseDimensions);
        if (labelName != null) {
            metricDimensions.put(labelName, labelValue);
        }
        metricDimensions.put("result", result);
        Metrics metric = Metrics.create(metricDimensions);
        metric.put(metricName, Long.valueOf(value));
        return metric;
    }

    /**
     * Registers a listener with this channel's state change listeners.
     *
     * @param stateChangeListener the listener to add
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel
    public void listen(StateChangeListener stateChangeListener) {
        stateChangeListeners.addListener(stateChangeListener);
    }

    /**
     * Returns the entries of the ownership table view, keyed by service unit name.
     *
     * @return the entry set exactly as provided by the table view
     */
    @Override // org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel
    public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() {
        final Set<Map.Entry<String, ServiceUnitStateData>> ownerships = this.tableview.entrySet();
        return ownerships;
    }

    /**
     * Looks up the service unit state channel of the extensible load manager used by the given Pulsar service.
     *
     * @param pulsarService the Pulsar service whose load manager is queried
     * @return the channel exposed by that load manager
     */
    public static ServiceUnitStateChannel get(PulsarService pulsarService) {
        final ExtensibleLoadManagerImpl extensibleLoadManager =
                ExtensibleLoadManagerImpl.get(pulsarService.getLoadManager().get());
        return extensibleLoadManager.getServiceUnitStateChannel();
    }
}
