/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.channel;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.channel.StateChangeListeners;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceUnitStateChannelImpl
implements ServiceUnitStateChannel {
    private static final Logger log = LoggerFactory.getLogger(ServiceUnitStateChannelImpl.class);
    public static final String TOPIC = TopicName.get((String)TopicDomain.persistent.value(), (NamespaceName)NamespaceName.SYSTEM_NAMESPACE, (String)"loadbalancer-service-unit-state").toString();
    public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;
    private static final long MAX_IN_FLIGHT_STATE_WAITING_TIME_IN_MILLIS = 30000L;
    private static final int OWNERSHIP_CLEAN_UP_MAX_WAIT_TIME_IN_MILLIS = 5000;
    private static final int OWNERSHIP_CLEAN_UP_WAIT_RETRY_DELAY_IN_MILLIS = 100;
    private static final int OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS = 3000;
    public static final long VERSION_ID_INIT = 1L;
    private static final long OWNERSHIP_MONITOR_DELAY_TIME_IN_SECS = 60L;
    public static final long MAX_CLEAN_UP_DELAY_TIME_IN_SECS = 180L;
    private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0L;
    private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10L;
    private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
    private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 600000L;
    private final PulsarService pulsar;
    private final ServiceConfiguration config;
    private final Schema<ServiceUnitStateData> schema;
    private final ConcurrentOpenHashMap<String, CompletableFuture<String>> getOwnerRequests;
    private final String lookupServiceAddress;
    private final ConcurrentOpenHashMap<String, CompletableFuture<Void>> cleanupJobs;
    private final StateChangeListeners stateChangeListeners;
    private ExtensibleLoadManagerImpl loadManager;
    private BrokerRegistry brokerRegistry;
    private LeaderElectionService leaderElectionService;
    private TableView<ServiceUnitStateData> tableview;
    private Producer<ServiceUnitStateData> producer;
    private ScheduledFuture<?> monitorTask;
    private SessionEvent lastMetadataSessionEvent = SessionEvent.SessionReestablished;
    private long lastMetadataSessionEventTimestamp = 0L;
    private long inFlightStateWaitingTimeInMillis;
    private long ownershipMonitorDelayTimeInSecs;
    private long semiTerminalStateWaitingTimeInMillis;
    private long maxCleanupDelayTimeInSecs;
    private long minCleanupDelayTimeInSecs;
    private long totalInactiveBrokerCleanupCnt = 0L;
    private long totalServiceUnitTombstoneCleanupCnt = 0L;
    private long totalOrphanServiceUnitCleanupCnt = 0L;
    private AtomicLong totalCleanupErrorCnt = new AtomicLong();
    private long totalInactiveBrokerCleanupScheduledCnt = 0L;
    private long totalInactiveBrokerCleanupIgnoredCnt = 0L;
    private long totalInactiveBrokerCleanupCancelledCnt = 0L;
    private volatile ChannelState channelState;
    private volatile long lastOwnEventHandledAt = 0L;
    private long lastOwnedServiceUnitCountAt = 0L;
    private int totalOwnedServiceUnitCnt = 0;
    final Map<ServiceUnitState, Counters> ownerLookUpCounters;
    final Map<EventType, Counters> eventCounters;
    final Map<ServiceUnitState, Counters> handlerCounters;

    public ServiceUnitStateChannelImpl(PulsarService pulsar) {
        this.pulsar = pulsar;
        this.config = pulsar.getConfig();
        this.lookupServiceAddress = pulsar.getLookupServiceAddress();
        this.schema = Schema.JSON(ServiceUnitStateData.class);
        this.getOwnerRequests = ConcurrentOpenHashMap.newBuilder().build();
        this.cleanupJobs = ConcurrentOpenHashMap.newBuilder().build();
        this.stateChangeListeners = new StateChangeListeners();
        this.semiTerminalStateWaitingTimeInMillis = this.config.getLoadBalancerServiceUnitStateTombstoneDelayTimeInSeconds() * 1000L;
        this.inFlightStateWaitingTimeInMillis = 30000L;
        this.ownershipMonitorDelayTimeInSecs = 60L;
        if (this.semiTerminalStateWaitingTimeInMillis < this.inFlightStateWaitingTimeInMillis) {
            throw new IllegalArgumentException("Invalid Config: loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds < 30 secs");
        }
        this.maxCleanupDelayTimeInSecs = 180L;
        this.minCleanupDelayTimeInSecs = 0L;
        HashMap<ServiceUnitState, Counters> tmpOwnerLookUpCounters = new HashMap<ServiceUnitState, Counters>();
        HashMap<ServiceUnitState, Counters> tmpHandlerCounters = new HashMap<ServiceUnitState, Counters>();
        HashMap<Enum, Counters> tmpEventCounters = new HashMap<Enum, Counters>();
        for (ServiceUnitState serviceUnitState : ServiceUnitState.values()) {
            tmpOwnerLookUpCounters.put(serviceUnitState, new Counters());
            tmpHandlerCounters.put(serviceUnitState, new Counters());
        }
        for (Enum enum_ : EventType.values()) {
            tmpEventCounters.put(enum_, new Counters());
        }
        this.ownerLookUpCounters = Map.copyOf(tmpOwnerLookUpCounters);
        this.handlerCounters = Map.copyOf(tmpHandlerCounters);
        this.eventCounters = Map.copyOf(tmpEventCounters);
        this.channelState = ChannelState.Constructed;
    }

    @Override
    public void scheduleOwnershipMonitor() {
        if (this.monitorTask == null) {
            this.monitorTask = this.pulsar.getLoadManagerExecutor().scheduleWithFixedDelay(() -> {
                try {
                    this.monitorOwnerships(this.brokerRegistry.getAvailableBrokersAsync().get(this.inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS));
                }
                catch (Exception e) {
                    log.info("Failed to monitor the ownerships. will retry..", (Throwable)e);
                }
            }, 0L, this.ownershipMonitorDelayTimeInSecs, TimeUnit.SECONDS);
            log.info("This leader broker:{} started the ownership monitor.", (Object)this.lookupServiceAddress);
        }
    }

    @Override
    public void cancelOwnershipMonitor() {
        if (this.monitorTask != null) {
            this.monitorTask.cancel(false);
            this.monitorTask = null;
            log.info("This previous leader broker:{} stopped the ownership monitor.", (Object)this.lookupServiceAddress);
        }
    }

    @Override
    public void cleanOwnerships() {
        this.doCleanup(this.lookupServiceAddress);
    }

    @Override
    public synchronized void start() throws PulsarServerException {
        if (!this.validateChannelState(ChannelState.LeaderElectionServiceStarted, false)) {
            throw new IllegalStateException("Invalid channel state:" + this.channelState.name());
        }
        boolean debug = this.debug();
        try {
            this.brokerRegistry = this.getBrokerRegistry();
            this.brokerRegistry.addListener(this::handleBrokerRegistrationEvent);
            this.leaderElectionService = this.getLeaderElectionService();
            Optional<LeaderBroker> leader = this.leaderElectionService.readCurrentLeader().get(10L, TimeUnit.SECONDS);
            if (leader.isPresent()) {
                log.info("Successfully found the channel leader:{}.", (Object)leader.get());
            } else {
                log.warn("Failed to find the channel leader.");
            }
            this.channelState = ChannelState.LeaderElectionServiceStarted;
            this.loadManager = this.getLoadManager();
            if (this.producer != null) {
                this.producer.close();
                if (debug) {
                    log.info("Closed the channel producer.");
                }
            }
            PulsarClusterMetadataSetup.createNamespaceIfAbsent(this.pulsar.getPulsarResources(), NamespaceName.SYSTEM_NAMESPACE, this.config.getClusterName());
            ExtensibleLoadManagerImpl.createSystemTopic(this.pulsar, TOPIC);
            this.producer = this.pulsar.getClient().newProducer(this.schema).enableBatching(true).compressionType(MSG_COMPRESSION_TYPE).maxPendingMessages(500).blockIfQueueFull(true).topic(TOPIC).create();
            if (debug) {
                log.info("Successfully started the channel producer.");
            }
            if (this.tableview != null) {
                this.tableview.close();
                if (debug) {
                    log.info("Closed the channel tableview.");
                }
            }
            this.tableview = this.pulsar.getClient().newTableViewBuilder(this.schema).topic(TOPIC).loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateCompactionStrategy.class.getName())).create();
            this.tableview.listen((key, value) -> this.handle((String)key, (ServiceUnitStateData)value));
            ServiceUnitStateCompactionStrategy strategy = (ServiceUnitStateCompactionStrategy)TopicCompactionStrategy.getInstance((String)"table-view");
            if (strategy == null) {
                String err = "table-viewtag TopicCompactionStrategy is null.";
                log.error(err);
                throw new IllegalStateException(err);
            }
            strategy.setSkippedMsgHandler((key, value) -> this.handleSkippedEvent((String)key));
            if (debug) {
                log.info("Successfully started the channel tableview.");
            }
            this.pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
            if (debug) {
                log.info("Successfully registered the handleMetadataSessionEvent");
            }
            this.channelState = ChannelState.Started;
            log.info("Successfully started the channel.");
        }
        catch (Exception e) {
            String msg = "Failed to start the channel.";
            log.error(msg, (Throwable)e);
            throw new PulsarServerException(msg, (Throwable)e);
        }
    }

    @VisibleForTesting
    protected BrokerRegistry getBrokerRegistry() {
        return ((ExtensibleLoadManagerWrapper)this.pulsar.getLoadManager().get()).get().getBrokerRegistry();
    }

    @VisibleForTesting
    protected LoadManagerContext getContext() {
        return ((ExtensibleLoadManagerWrapper)this.pulsar.getLoadManager().get()).get().getContext();
    }

    @VisibleForTesting
    protected ExtensibleLoadManagerImpl getLoadManager() {
        return ExtensibleLoadManagerImpl.get(this.pulsar.getLoadManager().get());
    }

    @VisibleForTesting
    protected LeaderElectionService getLeaderElectionService() {
        return ((ExtensibleLoadManagerWrapper)this.pulsar.getLoadManager().get()).get().getLeaderElectionService();
    }

    @Override
    public synchronized void close() throws PulsarServerException {
        this.channelState = ChannelState.Closed;
        boolean debug = this.debug();
        try {
            this.leaderElectionService = null;
            if (this.tableview != null) {
                this.tableview.close();
                this.tableview = null;
                if (debug) {
                    log.info("Successfully closed the channel tableview.");
                }
            }
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
                log.info("Successfully closed the channel producer.");
            }
            if (this.brokerRegistry != null) {
                this.brokerRegistry = null;
            }
            if (this.monitorTask != null) {
                this.monitorTask.cancel(true);
                this.monitorTask = null;
                log.info("Successfully cancelled the cleanup tasks");
            }
            if (this.stateChangeListeners != null) {
                this.stateChangeListeners.close();
            }
            log.info("Successfully closed the channel.");
        }
        catch (Exception e) {
            String msg = "Failed to close the channel.";
            log.error(msg, (Throwable)e);
            throw new PulsarServerException(msg, (Throwable)e);
        }
    }

    private boolean validateChannelState(ChannelState targetState, boolean checkLowerIds) {
        int order;
        int n = order = checkLowerIds ? -1 : 1;
        return Integer.compare(this.channelState.id, targetState.id) * order <= 0;
    }

    private boolean debug() {
        return ExtensibleLoadManagerImpl.debug(this.config, log);
    }

    @Override
    public CompletableFuture<Optional<String>> getChannelOwnerAsync() {
        if (!this.validateChannelState(ChannelState.LeaderElectionServiceStarted, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        return this.leaderElectionService.readCurrentLeader().thenApply(leader -> {
            if (leader.isPresent()) {
                String broker = ((LeaderBroker)leader.get()).getServiceUrl();
                broker = broker.substring(broker.lastIndexOf(47) + 1);
                return Optional.of(broker);
            }
            return Optional.empty();
        });
    }

    @Override
    public CompletableFuture<Boolean> isChannelOwnerAsync() {
        return this.getChannelOwnerAsync().thenApply(owner -> {
            if (owner.isPresent()) {
                return this.isTargetBroker((String)owner.get());
            }
            String msg = "There is no channel owner now.";
            log.error(msg);
            throw new IllegalStateException(msg);
        });
    }

    @Override
    public boolean isChannelOwner() {
        try {
            return this.isChannelOwnerAsync().get(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            String msg = "Failed to get the channel owner.";
            log.error(msg, (Throwable)e);
            throw new RuntimeException(msg, e);
        }
    }

    @Override
    public boolean isOwner(String serviceUnit, String targetBroker) {
        if (!this.validateChannelState(ChannelState.Started, true)) {
            throw new IllegalStateException("Invalid channel state:" + this.channelState.name());
        }
        CompletableFuture<Optional<String>> ownerFuture = this.getOwnerAsync(serviceUnit);
        if (!ownerFuture.isDone() || ownerFuture.isCompletedExceptionally() || ownerFuture.isCancelled()) {
            return false;
        }
        Optional<String> owner = ownerFuture.join();
        return owner.isPresent() && StringUtils.equals((CharSequence)targetBroker, (CharSequence)owner.get());
    }

    @Override
    public boolean isOwner(String serviceUnit) {
        return this.isOwner(serviceUnit, this.lookupServiceAddress);
    }

    @Override
    public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
        if (!this.validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        ServiceUnitStateData data = (ServiceUnitStateData)this.tableview.get(serviceUnit);
        ServiceUnitState state = ServiceUnitStateData.state(data);
        this.ownerLookUpCounters.get((Object)state).getTotal().incrementAndGet();
        switch (state) {
            case Owned: {
                return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
            }
            case Splitting: {
                return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
            }
            case Assigning: 
            case Releasing: {
                return ((CompletableFuture)this.deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
                    if (e != null) {
                        this.ownerLookUpCounters.get((Object)state).getFailure().incrementAndGet();
                    }
                })).thenApply(broker -> broker == null ? Optional.empty() : Optional.of(broker));
            }
            case Init: 
            case Free: {
                return CompletableFuture.completedFuture(Optional.empty());
            }
            case Deleted: {
                this.ownerLookUpCounters.get((Object)state).getFailure().incrementAndGet();
                return CompletableFuture.failedFuture(new IllegalArgumentException(serviceUnit + " is deleted."));
            }
        }
        this.ownerLookUpCounters.get((Object)state).getFailure().incrementAndGet();
        String errorMsg = String.format("Failed to process service unit state data: %s when get owner.", data);
        log.error(errorMsg);
        return CompletableFuture.failedFuture(new IllegalStateException(errorMsg));
    }

    private long getNextVersionId(String serviceUnit) {
        ServiceUnitStateData data = (ServiceUnitStateData)this.tableview.get(serviceUnit);
        return this.getNextVersionId(data);
    }

    private long getNextVersionId(ServiceUnitStateData data) {
        return data == null ? 1L : data.versionId() + 1L;
    }

    @Override
    public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker) {
        if (!this.validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        EventType eventType = EventType.Assign;
        this.eventCounters.get((Object)eventType).getTotal().incrementAndGet();
        CompletableFuture<String> getOwnerRequest = this.deferGetOwnerRequest(serviceUnit);
        this.pubAsync(serviceUnit, new ServiceUnitStateData(ServiceUnitState.Assigning, broker, this.getNextVersionId(serviceUnit))).whenComplete((__, ex) -> {
            if (ex != null) {
                this.getOwnerRequests.remove((Object)serviceUnit, (Object)getOwnerRequest);
                if (!getOwnerRequest.isCompletedExceptionally()) {
                    getOwnerRequest.completeExceptionally((Throwable)ex);
                }
                this.eventCounters.get((Object)eventType).getFailure().incrementAndGet();
            }
        });
        return getOwnerRequest;
    }

    private CompletableFuture<Void> publishOverrideEventAsync(String serviceUnit, ServiceUnitStateData orphanData, ServiceUnitStateData override) {
        if (!this.validateChannelState(ChannelState.Started, true)) {
            throw new IllegalStateException("Invalid channel state:" + this.channelState.name());
        }
        EventType eventType = EventType.Override;
        this.eventCounters.get((Object)eventType).getTotal().incrementAndGet();
        return ((CompletableFuture)this.pubAsync(serviceUnit, override).whenComplete((__, e) -> {
            if (e != null) {
                this.eventCounters.get((Object)eventType).getFailure().incrementAndGet();
                log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}", new Object[]{serviceUnit, orphanData, override, e});
            }
        })).thenApply(__ -> null);
    }

    @Override
    public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
        if (!this.validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        EventType eventType = EventType.Unload;
        this.eventCounters.get((Object)eventType).getTotal().incrementAndGet();
        String serviceUnit = unload.serviceUnit();
        ServiceUnitStateData next = ServiceUnitStateChannelImpl.isTransferCommand(unload) ? new ServiceUnitStateData(ServiceUnitState.Releasing, unload.destBroker().get(), unload.sourceBroker(), unload.force(), this.getNextVersionId(serviceUnit)) : new ServiceUnitStateData(ServiceUnitState.Releasing, null, unload.sourceBroker(), unload.force(), this.getNextVersionId(serviceUnit));
        return ((CompletableFuture)this.pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
            if (ex != null) {
                this.eventCounters.get((Object)eventType).getFailure().incrementAndGet();
            }
        })).thenApply(__ -> null);
    }

    @Override
    public CompletableFuture<Void> publishSplitEventAsync(Split split) {
        if (!this.validateChannelState(ChannelState.Started, true)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Invalid channel state:" + this.channelState.name()));
        }
        EventType eventType = EventType.Split;
        this.eventCounters.get((Object)eventType).getTotal().incrementAndGet();
        String serviceUnit = split.serviceUnit();
        ServiceUnitStateData next = new ServiceUnitStateData(ServiceUnitState.Splitting, null, split.sourceBroker(), split.splitServiceUnitToDestBroker(), this.getNextVersionId(serviceUnit));
        return ((CompletableFuture)this.pubAsync(serviceUnit, next).whenComplete((__, ex) -> {
            if (ex != null) {
                this.eventCounters.get((Object)eventType).getFailure().incrementAndGet();
            }
        })).thenApply(__ -> null);
    }

    private void handle(String serviceUnit, ServiceUnitStateData data) {
        long totalHandledRequests = this.getHandlerTotalCounter(data).incrementAndGet();
        if (this.debug()) {
            log.info("{} received a handle request for serviceUnit:{}, data:{}. totalHandledRequests:{}", new Object[]{this.lookupServiceAddress, serviceUnit, data, totalHandledRequests});
        }
        ServiceUnitState state = ServiceUnitStateData.state(data);
        try {
            switch (state) {
                case Owned: {
                    this.handleOwnEvent(serviceUnit, data);
                    break;
                }
                case Assigning: {
                    this.handleAssignEvent(serviceUnit, data);
                    break;
                }
                case Releasing: {
                    this.handleReleaseEvent(serviceUnit, data);
                    break;
                }
                case Splitting: {
                    this.handleSplitEvent(serviceUnit, data);
                    break;
                }
                case Deleted: {
                    this.handleDeleteEvent(serviceUnit, data);
                    break;
                }
                case Free: {
                    this.handleFreeEvent(serviceUnit, data);
                    break;
                }
                case Init: {
                    this.handleInitEvent(serviceUnit);
                    break;
                }
                default: {
                    throw new IllegalStateException("Failed to handle channel data:" + data);
                }
            }
        }
        catch (Throwable e) {
            log.error("Failed to handle the event. serviceUnit:{}, data:{}, handlerFailureCount:{}", new Object[]{serviceUnit, data, this.getHandlerFailureCounter(data).incrementAndGet(), e});
            throw e;
        }
    }

    private static boolean isTransferCommand(ServiceUnitStateData data) {
        if (data == null) {
            return false;
        }
        return StringUtils.isNotEmpty((CharSequence)data.dstBroker()) && StringUtils.isNotEmpty((CharSequence)data.sourceBroker());
    }

    private static boolean isTransferCommand(Unload data) {
        return data.destBroker().isPresent();
    }

    private static String getLogEventTag(ServiceUnitStateData data) {
        return data == null ? ServiceUnitState.Init.toString() : (ServiceUnitStateChannelImpl.isTransferCommand(data) ? "Transfer:" + data.state() : data.state().toString());
    }

    private AtomicLong getHandlerTotalCounter(ServiceUnitStateData data) {
        return this.getHandlerCounter(data, true);
    }

    private AtomicLong getHandlerFailureCounter(ServiceUnitStateData data) {
        return this.getHandlerCounter(data, false);
    }

    private AtomicLong getHandlerCounter(ServiceUnitStateData data, boolean total) {
        AtomicLong counter;
        ServiceUnitState state = ServiceUnitStateData.state(data);
        AtomicLong atomicLong = counter = total ? this.handlerCounters.get((Object)state).getTotal() : this.handlerCounters.get((Object)state).getFailure();
        if (counter == null) {
            throw new IllegalStateException("Unknown state:" + state);
        }
        return counter;
    }

    private void log(Throwable e, String serviceUnit, ServiceUnitStateData data, ServiceUnitStateData next) {
        if (e == null) {
            if (log.isDebugEnabled() || ServiceUnitStateChannelImpl.isTransferCommand(data)) {
                long handlerTotalCount = this.getHandlerTotalCounter(data).get();
                long handlerFailureCount = this.getHandlerFailureCounter(data).get();
                log.info("{} handled {} event for serviceUnit:{}, cur:{}, next:{}, totalHandledRequests:{}, totalFailedRequests:{}", new Object[]{this.lookupServiceAddress, ServiceUnitStateChannelImpl.getLogEventTag(data), serviceUnit, data == null ? "" : data, next == null ? "" : next, handlerTotalCount, handlerFailureCount});
            }
        } else {
            long handlerTotalCount = this.getHandlerTotalCounter(data).get();
            long handlerFailureCount = this.getHandlerFailureCounter(data).incrementAndGet();
            log.error("{} failed to handle {} event for serviceUnit:{}, cur:{}, next:{}, totalHandledRequests:{}, totalFailedRequests:{}", new Object[]{this.lookupServiceAddress, ServiceUnitStateChannelImpl.getLogEventTag(data), serviceUnit, data == null ? "" : data, next == null ? "" : next, handlerTotalCount, handlerFailureCount, e});
        }
    }

    private void handleSkippedEvent(String serviceUnit) {
        ServiceUnitStateData data;
        CompletableFuture getOwnerRequest = (CompletableFuture)this.getOwnerRequests.get((Object)serviceUnit);
        if (getOwnerRequest != null && (data = (ServiceUnitStateData)this.tableview.get(serviceUnit)) != null && data.state() == ServiceUnitState.Owned) {
            getOwnerRequest.complete(data.dstBroker());
            this.getOwnerRequests.remove((Object)serviceUnit);
            this.stateChangeListeners.notify(serviceUnit, data, null);
        }
    }

    private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
        CompletableFuture getOwnerRequest = (CompletableFuture)this.getOwnerRequests.remove((Object)serviceUnit);
        if (getOwnerRequest != null) {
            getOwnerRequest.complete(data.dstBroker());
        }
        this.stateChangeListeners.notify(serviceUnit, data, null);
        if (this.isTargetBroker(data.dstBroker())) {
            this.log(null, serviceUnit, data, null);
            this.pulsar.getNamespaceService().onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(this.pulsar, serviceUnit));
            this.lastOwnEventHandledAt = System.currentTimeMillis();
        } else if (data.force() && this.isTargetBroker(data.sourceBroker())) {
            this.closeServiceUnit(serviceUnit);
        }
    }

    private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
        if (this.isTargetBroker(data.dstBroker())) {
            ServiceUnitStateData next = new ServiceUnitStateData(ServiceUnitState.Owned, data.dstBroker(), data.sourceBroker(), this.getNextVersionId(data));
            this.stateChangeListeners.notifyOnCompletion(this.pubAsync(serviceUnit, next), serviceUnit, data).whenComplete((__, e) -> this.log((Throwable)e, serviceUnit, data, next));
        }
    }

    private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
        if (this.isTargetBroker(data.sourceBroker())) {
            ServiceUnitStateData next = ServiceUnitStateChannelImpl.isTransferCommand(data) ? new ServiceUnitStateData(ServiceUnitState.Assigning, data.dstBroker(), data.sourceBroker(), this.getNextVersionId(data)) : new ServiceUnitStateData(ServiceUnitState.Free, null, data.sourceBroker(), this.getNextVersionId(data));
            this.stateChangeListeners.notifyOnCompletion(this.closeServiceUnit(serviceUnit).thenCompose(__ -> this.pubAsync(serviceUnit, next)), serviceUnit, data).whenComplete((__, e) -> this.log((Throwable)e, serviceUnit, data, next));
        }
    }

    private void handleSplitEvent(String serviceUnit, ServiceUnitStateData data) {
        if (this.isTargetBroker(data.sourceBroker())) {
            this.stateChangeListeners.notifyOnCompletion(this.splitServiceUnit(serviceUnit, data), serviceUnit, data).whenComplete((__, e) -> this.log((Throwable)e, serviceUnit, data, null));
        }
    }

    private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
        CompletableFuture getOwnerRequest = (CompletableFuture)this.getOwnerRequests.remove((Object)serviceUnit);
        if (getOwnerRequest != null) {
            getOwnerRequest.complete(null);
        }
        this.stateChangeListeners.notify(serviceUnit, data, null);
        if (this.isTargetBroker(data.sourceBroker())) {
            this.log(null, serviceUnit, data, null);
        }
    }

    private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) {
        CompletableFuture getOwnerRequest = (CompletableFuture)this.getOwnerRequests.remove((Object)serviceUnit);
        if (getOwnerRequest != null) {
            getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted."));
        }
        this.stateChangeListeners.notify(serviceUnit, data, null);
        if (this.isTargetBroker(data.sourceBroker())) {
            this.log(null, serviceUnit, data, null);
        }
    }

    private void handleInitEvent(String serviceUnit) {
        CompletableFuture getOwnerRequest = (CompletableFuture)this.getOwnerRequests.remove((Object)serviceUnit);
        if (getOwnerRequest != null) {
            getOwnerRequest.complete(null);
        }
        this.stateChangeListeners.notify(serviceUnit, null, null);
        this.log(null, serviceUnit, null, null);
    }

    private CompletableFuture<MessageId> pubAsync(String serviceUnit, ServiceUnitStateData data) {
        CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        this.producer.newMessage().key(serviceUnit).value((Object)data).sendAsync().whenComplete((messageId, e) -> {
            if (e != null) {
                log.error("Failed to publish the message: serviceUnit:{}, data:{}", new Object[]{serviceUnit, data, e});
                future.completeExceptionally((Throwable)e);
            } else {
                future.complete((MessageId)messageId);
            }
        });
        return future;
    }

    private CompletableFuture<MessageId> tombstoneAsync(String serviceUnit) {
        return this.pubAsync(serviceUnit, null);
    }

    private boolean isTargetBroker(String broker) {
        if (broker == null) {
            return false;
        }
        return broker.equals(this.lookupServiceAddress);
    }

    private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
        return (CompletableFuture)this.getOwnerRequests.computeIfAbsent((Object)serviceUnit, k -> {
            CompletableFuture future = new CompletableFuture();
            future.orTimeout(this.inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS).whenComplete((v, e) -> {
                if (e != null) {
                    this.getOwnerRequests.remove((Object)serviceUnit, (Object)future);
                    log.warn("Failed to getOwner for serviceUnit:{}", (Object)serviceUnit, e);
                }
            });
            return future;
        });
    }

    private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
        long startTime = System.nanoTime();
        MutableInt unloadedTopics = new MutableInt();
        NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(this.pulsar, serviceUnit);
        return ((CompletableFuture)this.pulsar.getBrokerService().unloadServiceUnit(bundle, true, this.pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS).thenApply(numUnloadedTopics -> {
            unloadedTopics.setValue((Number)numUnloadedTopics);
            return numUnloadedTopics;
        })).whenComplete((__, ex) -> {
            this.pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
            this.pulsar.getNamespaceService().onNamespaceBundleUnload(bundle);
            double unloadBundleTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            if (ex != null) {
                log.error("Failed to close topics under bundle:{} in {} ms", new Object[]{bundle.toString(), unloadBundleTime, ex});
            } else {
                log.info("Unloading bundle:{} with {} topics completed in {} ms", new Object[]{bundle, unloadedTopics, unloadBundleTime});
            }
        });
    }

    private CompletableFuture<Void> splitServiceUnit(String serviceUnit, ServiceUnitStateData data) {
        long startTime = System.nanoTime();
        NamespaceService namespaceService = this.pulsar.getNamespaceService();
        NamespaceBundleFactory bundleFactory = namespaceService.getNamespaceBundleFactory();
        NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(this.pulsar, serviceUnit);
        CompletableFuture<Void> completionFuture = new CompletableFuture<Void>();
        Map<String, Optional<String>> bundleToDestBroker = data.splitServiceUnitToDestBroker();
        ArrayList<Long> boundaries = null;
        NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm = namespaceService.getNamespaceBundleSplitAlgorithmByName(this.config.getDefaultNamespaceBundleSplitAlgorithm());
        if (bundleToDestBroker != null && bundleToDestBroker.size() == 2) {
            HashSet boundariesSet = new HashSet();
            String namespace = bundle.getNamespaceObject().toString();
            bundleToDestBroker.forEach((bundleRange, destBroker) -> {
                NamespaceBundle subBundle = bundleFactory.getBundle(namespace, (String)bundleRange);
                boundariesSet.add((Long)subBundle.getKeyRange().lowerEndpoint());
                boundariesSet.add((Long)subBundle.getKeyRange().upperEndpoint());
            });
            boundaries = new ArrayList<Long>(boundariesSet);
            nsBundleSplitAlgorithm = NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE_FORCE_ALGO;
        }
        AtomicInteger counter = new AtomicInteger(0);
        List<NamespaceBundle> childBundles = data.splitServiceUnitToDestBroker().keySet().stream().map(child -> bundleFactory.getBundle(bundle.getNamespaceObject().toString(), (String)child)).collect(Collectors.toList());
        this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, nsBundleSplitAlgorithm, bundle, childBundles, boundaries, data, counter, startTime, completionFuture);
        return completionFuture;
    }

    @VisibleForTesting
    protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService, NamespaceBundleFactory bundleFactory, NamespaceBundleSplitAlgorithm algorithm, NamespaceBundle parentBundle, List<NamespaceBundle> childBundles, List<Long> boundaries, ServiceUnitStateData parentData, AtomicInteger counter, long startTime, CompletableFuture<Void> completionFuture) {
        ((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)((CompletableFuture)this.ownChildBundles(childBundles, parentData).thenCompose(__ -> this.getSplitNamespaceBundles(namespaceService, bundleFactory, algorithm, parentBundle, childBundles, boundaries))).thenCompose(namespaceBundles -> this.updateSplitNamespaceBundlesAsync(namespaceService, bundleFactory, parentBundle, (NamespaceBundles)namespaceBundles))).thenAccept(__ -> this.pulsar.getBrokerService().refreshTopicToStatsMaps(parentBundle))).thenAccept(__ -> this.pubAsync(parentBundle.toString(), new ServiceUnitStateData(ServiceUnitState.Deleted, null, parentData.sourceBroker(), this.getNextVersionId(parentData))))).thenAccept(__ -> {
            double splitBundleTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            log.info("Successfully split {} parent namespace-bundle to {} in {} ms", new Object[]{parentBundle, childBundles, splitBundleTime});
            namespaceService.onNamespaceBundleSplit(parentBundle);
            completionFuture.complete(null);
        })).exceptionally(ex -> {
            Throwable throwable = FutureUtil.unwrapCompletionException((Throwable)ex);
            if (throwable instanceof MetadataStoreException.BadVersionException && counter.incrementAndGet() < 7) {
                log.warn("Failed to update bundle range in metadata store. Retrying {} th / {} limit", new Object[]{counter.get(), 7, ex});
                this.pulsar.getExecutor().schedule(() -> this.splitServiceUnitOnceAndRetry(namespaceService, bundleFactory, algorithm, parentBundle, childBundles, boundaries, parentData, counter, startTime, completionFuture), 100L, TimeUnit.MILLISECONDS);
            } else {
                String msg = String.format("Failed to split bundle %s, Retried %d th / %d limit, reason %s", parentBundle.toString(), counter.get(), 7, throwable.getMessage());
                log.warn(msg, throwable);
                completionFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
            }
            return null;
        });
    }

    private CompletableFuture<Void> ownChildBundles(List<NamespaceBundle> childBundles, ServiceUnitStateData parentData) {
        ArrayList<CompletionStage> futures = new ArrayList<CompletionStage>(childBundles.size());
        boolean debug = this.debug();
        for (NamespaceBundle childBundle : childBundles) {
            String childBundleStr = childBundle.toString();
            ServiceUnitStateData childData = (ServiceUnitStateData)this.tableview.get(childBundleStr);
            if (childData != null) {
                if (!debug) continue;
                log.info("Already owned child bundle:{}", (Object)childBundleStr);
                continue;
            }
            childData = new ServiceUnitStateData(ServiceUnitState.Owned, parentData.sourceBroker(), 1L);
            futures.add(this.pubAsync(childBundleStr, childData).thenApply(__ -> null));
        }
        if (!futures.isEmpty()) {
            return FutureUtil.waitForAll(futures);
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<NamespaceBundles> getSplitNamespaceBundles(NamespaceService namespaceService, NamespaceBundleFactory bundleFactory, NamespaceBundleSplitAlgorithm algorithm, NamespaceBundle parentBundle, List<NamespaceBundle> childBundles, List<Long> boundaries) {
        CompletableFuture<NamespaceBundles> future = new CompletableFuture<NamespaceBundles>();
        boolean debug = this.debug();
        NamespaceBundles targetNsBundle = bundleFactory.getBundles(parentBundle.getNamespaceObject());
        boolean found = false;
        try {
            targetNsBundle.validateBundle(parentBundle);
        }
        catch (IllegalArgumentException e) {
            if (debug) {
                log.info("Namespace bundles do not contain the parent bundle:{}", (Object)parentBundle);
            }
            for (NamespaceBundle childBundle : childBundles) {
                try {
                    targetNsBundle.validateBundle(childBundle);
                    if (!debug) continue;
                    log.info("Namespace bundles contain the child bundle:{}", (Object)childBundle);
                }
                catch (Exception ex) {
                    future.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("Namespace bundles do not contain the child bundle:" + childBundle, e));
                    return future;
                }
            }
            found = true;
        }
        catch (Exception e) {
            future.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException("Failed to validate the parent bundle in the namespace bundles.", e));
            return future;
        }
        if (found) {
            future.complete(targetNsBundle);
            return future;
        }
        return namespaceService.getSplitBoundary(parentBundle, algorithm, boundaries).thenApply(splitBundlesPair -> (NamespaceBundles)splitBundlesPair.getLeft());
    }

    private CompletableFuture<Void> updateSplitNamespaceBundlesAsync(NamespaceService namespaceService, NamespaceBundleFactory bundleFactory, NamespaceBundle parentBundle, NamespaceBundles splitNamespaceBundles) {
        NamespaceName namespaceName = parentBundle.getNamespaceObject();
        return ((CompletableFuture)namespaceService.updateNamespaceBundles(namespaceName, splitNamespaceBundles).thenCompose(__ -> namespaceService.updateNamespaceBundlesForPolicies(namespaceName, splitNamespaceBundles))).thenAccept(__ -> {
            bundleFactory.invalidateBundleCache(parentBundle.getNamespaceObject());
            if (this.debug()) {
                log.info("Successfully updated split namespace bundles and namespace bundle cache.");
            }
        });
    }

    @Override
    public void handleMetadataSessionEvent(SessionEvent e) {
        if (e == SessionEvent.SessionReestablished || e == SessionEvent.SessionLost) {
            this.lastMetadataSessionEvent = e;
            this.lastMetadataSessionEventTimestamp = System.currentTimeMillis();
            log.info("Received metadata session event:{} at timestamp:{}", (Object)this.lastMetadataSessionEvent, (Object)this.lastMetadataSessionEventTimestamp);
        }
    }

    @Override
    public void handleBrokerRegistrationEvent(String broker, NotificationType type) {
        if (type == NotificationType.Created) {
            log.info("BrokerRegistry detected the broker:{} registry has been created.", (Object)broker);
            this.handleBrokerCreationEvent(broker);
        } else if (type == NotificationType.Deleted) {
            log.info("BrokerRegistry detected the broker:{} registry has been deleted.", (Object)broker);
            this.handleBrokerDeletionEvent(broker);
        }
    }

    private MetadataState getMetadataState() {
        long now = System.currentTimeMillis();
        if (this.lastMetadataSessionEvent == SessionEvent.SessionReestablished) {
            if (now - this.lastMetadataSessionEventTimestamp > 1000L * this.maxCleanupDelayTimeInSecs) {
                return MetadataState.Stable;
            }
            return MetadataState.Jittery;
        }
        return MetadataState.Unstable;
    }

    private void handleBrokerCreationEvent(String broker) {
        CompletableFuture future = (CompletableFuture)this.cleanupJobs.remove((Object)broker);
        if (future != null) {
            future.cancel(false);
            ++this.totalInactiveBrokerCleanupCancelledCnt;
            log.info("Successfully cancelled the ownership cleanup for broker:{}. Active cleanup job count:{}", (Object)broker, (Object)this.cleanupJobs.size());
        } else if (this.debug()) {
            log.info("No needs to cancel the ownership cleanup for broker:{}. There was no scheduled cleanup job. Active cleanup job count:{}", (Object)broker, (Object)this.cleanupJobs.size());
        }
    }

    private void handleBrokerDeletionEvent(String broker) {
        if (!this.isChannelOwner()) {
            log.warn("This broker is not the leader now. Ignoring BrokerDeletionEvent for broker {}.", (Object)broker);
            return;
        }
        MetadataState state = this.getMetadataState();
        log.info("Handling broker:{} ownership cleanup based on metadata connection state:{}, event:{}, event_ts:{}:", new Object[]{broker, state, this.lastMetadataSessionEvent, this.lastMetadataSessionEventTimestamp});
        switch (state) {
            case Stable: {
                this.scheduleCleanup(broker, this.minCleanupDelayTimeInSecs);
                break;
            }
            case Jittery: {
                this.scheduleCleanup(broker, this.maxCleanupDelayTimeInSecs);
                break;
            }
            case Unstable: {
                ++this.totalInactiveBrokerCleanupIgnoredCnt;
                log.error("MetadataState state is unstable. Ignoring the ownership cleanup request for the reported broker :{}", (Object)broker);
            }
        }
    }

    private void scheduleCleanup(String broker, long delayInSecs) {
        this.cleanupJobs.computeIfAbsent((Object)broker, k -> {
            Executor delayed = CompletableFuture.delayedExecutor(delayInSecs, TimeUnit.SECONDS, this.pulsar.getLoadManagerExecutor());
            ++this.totalInactiveBrokerCleanupScheduledCnt;
            return CompletableFuture.runAsync(() -> {
                try {
                    this.doCleanup(broker);
                }
                catch (Throwable e) {
                    log.error("Failed to run the cleanup job for the broker {}, totalCleanupErrorCnt:{}.", new Object[]{broker, this.totalCleanupErrorCnt.incrementAndGet(), e});
                }
                finally {
                    this.cleanupJobs.remove((Object)broker);
                }
            }, delayed);
        });
        log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.", new Object[]{broker, delayInSecs, this.cleanupJobs.size()});
    }

    private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData, String selectedBroker, String inactiveBroker) {
        if (orphanData.state() == ServiceUnitState.Splitting) {
            return new ServiceUnitStateData(ServiceUnitState.Splitting, orphanData.dstBroker(), selectedBroker, Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, this.getNextVersionId(orphanData));
        }
        return new ServiceUnitStateData(ServiceUnitState.Owned, selectedBroker, inactiveBroker, true, this.getNextVersionId(orphanData));
    }

    private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
        Optional<String> selectedBroker = this.selectBroker(serviceUnit, inactiveBroker);
        if (selectedBroker.isPresent()) {
            ServiceUnitStateData override = this.getOverrideInactiveBrokerStateData(orphanData, selectedBroker.get(), inactiveBroker);
            log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", new Object[]{serviceUnit, orphanData, override});
            this.publishOverrideEventAsync(serviceUnit, orphanData, override).exceptionally(e -> {
                log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Failed to publish override event. totalCleanupErrorCnt:{}", new Object[]{serviceUnit, orphanData, this.totalCleanupErrorCnt.incrementAndGet()});
                return null;
            });
        } else {
            log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. totalCleanupErrorCnt:{}", new Object[]{serviceUnit, orphanData, this.totalCleanupErrorCnt.incrementAndGet()});
        }
    }

    private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
        long started = System.currentTimeMillis();
        while (System.currentTimeMillis() - started < (long)maxWaitTimeInMillis) {
            boolean cleaned = true;
            for (Map.Entry etr : this.tableview.entrySet()) {
                String serviceUnit = (String)etr.getKey();
                ServiceUnitStateData data = (ServiceUnitStateData)etr.getValue();
                if (excludeSystemTopics && serviceUnit.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString()) || data.state() != ServiceUnitState.Owned || !broker.equals(data.dstBroker())) continue;
                cleaned = false;
                break;
            }
            if (cleaned) {
                try {
                    TimeUnit.MILLISECONDS.sleep(3000L);
                }
                catch (InterruptedException e) {
                    log.warn("Interrupted while gracefully waiting for the cleanup convergence.");
                }
                break;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted while delaying the next service unit clean-up. Cleaning broker:{}", (Object)this.lookupServiceAddress);
            }
        }
    }

    private synchronized void doCleanup(String broker) {
        long startTime = System.nanoTime();
        log.info("Started ownership cleanup for the inactive broker:{}", (Object)broker);
        int orphanServiceUnitCleanupCnt = 0;
        long totalCleanupErrorCntStart = this.totalCleanupErrorCnt.get();
        String heartbeatNamespace = NamespaceName.get((String)String.format("pulsar/%s/%s", this.config.getClusterName(), broker)).toString();
        String heartbeatNamespaceV2 = NamespaceName.get((String)String.format("pulsar/%s", broker)).toString();
        HashMap<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<String, ServiceUnitStateData>();
        for (Map.Entry etr : this.tableview.entrySet()) {
            ServiceUnitStateData stateData = (ServiceUnitStateData)etr.getValue();
            String serviceUnit = (String)etr.getKey();
            ServiceUnitState state = ServiceUnitStateData.state(stateData);
            if (StringUtils.equals((CharSequence)broker, (CharSequence)stateData.dstBroker())) {
                if (!ServiceUnitState.isActiveState(state)) continue;
                if (serviceUnit.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
                    orphanSystemServiceUnits.put(serviceUnit, stateData);
                } else if (serviceUnit.startsWith(heartbeatNamespace) || serviceUnit.startsWith(heartbeatNamespaceV2)) {
                    log.info("Skip override heartbeat namespace bundle serviceUnit:{}, stateData:{}", (Object)serviceUnit, (Object)stateData);
                    this.tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
                        if (e != null) {
                            log.error("Failed cleaning the heartbeat namespace ownership serviceUnit:{}, stateData:{}, cleanupErrorCnt:{}.", new Object[]{serviceUnit, stateData, this.totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e});
                        }
                    });
                } else {
                    this.overrideOwnership(serviceUnit, stateData, broker);
                }
                ++orphanServiceUnitCleanupCnt;
                continue;
            }
            if (!StringUtils.equals((CharSequence)broker, (CharSequence)stateData.sourceBroker()) || !ServiceUnitState.isInFlightState(state)) continue;
            if (serviceUnit.startsWith(NamespaceName.SYSTEM_NAMESPACE.toString())) {
                orphanSystemServiceUnits.put(serviceUnit, stateData);
            } else {
                this.overrideOwnership(serviceUnit, stateData, broker);
            }
            ++orphanServiceUnitCleanupCnt;
        }
        try {
            this.producer.flush();
        }
        catch (PulsarClientException e2) {
            log.error("Failed to flush the in-flight non-system bundle override messages.", (Throwable)e2);
        }
        if (orphanServiceUnitCleanupCnt > 0) {
            this.waitForCleanups(broker, true, 5000);
            this.totalOrphanServiceUnitCleanupCnt += (long)orphanServiceUnitCleanupCnt;
            ++this.totalInactiveBrokerCleanupCnt;
        }
        for (Map.Entry orphanSystemServiceUnit : orphanSystemServiceUnits.entrySet()) {
            log.info("Overriding orphan system service unit:{}", orphanSystemServiceUnit.getKey());
            this.overrideOwnership((String)orphanSystemServiceUnit.getKey(), (ServiceUnitStateData)orphanSystemServiceUnit.getValue(), broker);
        }
        try {
            this.producer.flush();
        }
        catch (PulsarClientException e3) {
            log.error("Failed to flush the in-flight system bundle override messages.", (Throwable)e3);
        }
        double cleanupTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
        this.getContext().topBundleLoadDataStore().removeAsync(broker);
        this.getContext().brokerLoadDataStore().removeAsync(broker);
        log.info("Completed a cleanup for the inactive broker:{} in {} ms. Cleaned up orphan service units: orphanServiceUnitCleanupCnt:{}, approximate cleanupErrorCnt:{}, metrics:{} ", new Object[]{broker, cleanupTime, orphanServiceUnitCleanupCnt, totalCleanupErrorCntStart - this.totalCleanupErrorCnt.get(), this.printCleanupMetrics()});
    }

    private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
        try {
            return this.loadManager.selectAsync(LoadManagerShared.getNamespaceBundle(this.pulsar, serviceUnit), Set.of(inactiveBroker)).get(this.inFlightStateWaitingTimeInMillis, TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            log.error("Failed to select a broker for serviceUnit:{}", (Object)serviceUnit);
            return Optional.empty();
        }
    }

    private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit, String inactiveBroker, long nextVersionId) {
        Optional<String> selectedBroker = this.selectBroker(serviceUnit, inactiveBroker);
        if (selectedBroker.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new ServiceUnitStateData(ServiceUnitState.Owned, selectedBroker.get(), true, nextVersionId));
    }

    private Optional<ServiceUnitStateData> getOverrideInFlightStateData(String serviceUnit, ServiceUnitStateData orphanData, Set<String> availableBrokers) {
        long nextVersionId = this.getNextVersionId(orphanData);
        ServiceUnitState state = orphanData.state();
        switch (state) {
            case Assigning: {
                return this.getRollForwardStateData(serviceUnit, orphanData.dstBroker(), nextVersionId);
            }
            case Splitting: {
                return Optional.of(new ServiceUnitStateData(ServiceUnitState.Splitting, orphanData.dstBroker(), orphanData.sourceBroker(), Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, nextVersionId));
            }
            case Releasing: {
                if (availableBrokers.contains(orphanData.sourceBroker())) {
                    return Optional.of(new ServiceUnitStateData(ServiceUnitState.Owned, orphanData.sourceBroker(), true, nextVersionId));
                }
                return this.getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), nextVersionId);
            }
        }
        String msg = String.format("Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s", serviceUnit, orphanData);
        log.error(msg);
        throw new IllegalStateException(msg);
    }

    @VisibleForTesting
    protected void monitorOwnerships(List<String> brokers) {
        if (!this.isChannelOwner()) {
            log.warn("This broker is not the leader now. Skipping ownership monitor.");
            return;
        }
        if (brokers == null || brokers.size() == 0) {
            log.error("no active brokers found. Skipping ownership monitor.");
            return;
        }
        MetadataState metadataState = this.getMetadataState();
        if (metadataState != MetadataState.Stable) {
            log.warn("metadata state:{} is not Stable. Skipping ownership monitor.", (Object)metadataState);
            return;
        }
        boolean debug = this.debug();
        if (debug) {
            log.info("Started the ownership monitor run for activeBrokerCount:{}", (Object)brokers.size());
        }
        long startTime = System.nanoTime();
        HashSet<String> inactiveBrokers = new HashSet<String>();
        HashSet<String> activeBrokers = new HashSet<String>(brokers);
        HashMap<String, ServiceUnitStateData> orphanServiceUnits = new HashMap<String, ServiceUnitStateData>();
        int serviceUnitTombstoneCleanupCnt = 0;
        int orphanServiceUnitCleanupCnt = 0;
        long totalCleanupErrorCntStart = this.totalCleanupErrorCnt.get();
        long now = System.currentTimeMillis();
        for (Map.Entry entry : this.tableview.entrySet()) {
            String serviceUnit = (String)entry.getKey();
            ServiceUnitStateData stateData = (ServiceUnitStateData)entry.getValue();
            String dstBroker = stateData.dstBroker();
            String srcBroker = stateData.sourceBroker();
            ServiceUnitState state = stateData.state();
            if (ServiceUnitState.isActiveState(state)) {
                if (StringUtils.isNotBlank((CharSequence)srcBroker) && !activeBrokers.contains(srcBroker)) {
                    inactiveBrokers.add(srcBroker);
                    continue;
                }
                if (StringUtils.isNotBlank((CharSequence)dstBroker) && !activeBrokers.contains(dstBroker)) {
                    inactiveBrokers.add(dstBroker);
                    continue;
                }
                if (!ServiceUnitState.isInFlightState(state) || now - stateData.timestamp() <= this.inFlightStateWaitingTimeInMillis) continue;
                orphanServiceUnits.put(serviceUnit, stateData);
                continue;
            }
            if (now - stateData.timestamp() <= this.semiTerminalStateWaitingTimeInMillis) continue;
            log.info("Found semi-terminal states to tombstone serviceUnit:{}, stateData:{}", (Object)serviceUnit, (Object)stateData);
            this.tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
                if (e != null) {
                    log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, cleanupErrorCnt:{}.", new Object[]{serviceUnit, stateData, this.totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e});
                }
            });
            ++serviceUnitTombstoneCleanupCnt;
        }
        if (!inactiveBrokers.isEmpty()) {
            for (String string : inactiveBrokers) {
                this.handleBrokerDeletionEvent(string);
            }
        } else if (!orphanServiceUnits.isEmpty()) {
            for (Map.Entry entry : orphanServiceUnits.entrySet()) {
                ServiceUnitStateData orphanData;
                String orphanServiceUnit = (String)entry.getKey();
                Optional<ServiceUnitStateData> overrideData = this.getOverrideInFlightStateData(orphanServiceUnit, orphanData = (ServiceUnitStateData)entry.getValue(), activeBrokers);
                if (overrideData.isPresent()) {
                    log.info("Overriding in-flight state ownership serviceUnit:{} from orphanData:{} to overrideData:{}", new Object[]{orphanServiceUnit, orphanData, overrideData});
                    this.publishOverrideEventAsync(orphanServiceUnit, orphanData, overrideData.get()).whenComplete((__, e) -> {
                        if (e != null) {
                            log.error("Failed cleaning the ownership orphanServiceUnit:{}, orphanData:{}, cleanupErrorCnt:{}.", new Object[]{orphanServiceUnit, orphanData, this.totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e});
                        }
                    });
                    ++orphanServiceUnitCleanupCnt;
                    continue;
                }
                log.warn("Failed get the overrideStateData from orphanServiceUnit:{}, orphanData:{}, cleanupErrorCnt:{}. will retry..", new Object[]{orphanServiceUnit, orphanData, this.totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart});
            }
        }
        try {
            this.producer.flush();
        }
        catch (PulsarClientException e2) {
            log.error("Failed to flush the in-flight messages.", (Throwable)e2);
        }
        boolean cleaned = false;
        if (serviceUnitTombstoneCleanupCnt > 0) {
            this.totalServiceUnitTombstoneCleanupCnt += (long)serviceUnitTombstoneCleanupCnt;
            cleaned = true;
        }
        if (orphanServiceUnitCleanupCnt > 0) {
            this.totalOrphanServiceUnitCleanupCnt += (long)orphanServiceUnitCleanupCnt;
            cleaned = true;
        }
        if (debug || cleaned) {
            double d = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            log.info("Completed the ownership monitor run in {} ms. Scheduled cleanups for inactive brokers:{}. inactiveBrokerCount:{}. Published cleanups for orphan service units, orphanServiceUnitCleanupCnt:{}. Tombstoned semi-terminal state service units, serviceUnitTombstoneCleanupCnt:{}. Approximate cleanupErrorCnt:{}, metrics:{}. ", new Object[]{d, inactiveBrokers, inactiveBrokers.size(), orphanServiceUnitCleanupCnt, serviceUnitTombstoneCleanupCnt, totalCleanupErrorCntStart - this.totalCleanupErrorCnt.get(), this.printCleanupMetrics()});
        }
    }

    private String printCleanupMetrics() {
        return String.format("{totalInactiveBrokerCleanupCnt:%d, totalServiceUnitTombstoneCleanupCnt:%d, totalOrphanServiceUnitCleanupCnt:%d, totalCleanupErrorCnt:%d, totalInactiveBrokerCleanupScheduledCnt%d, totalInactiveBrokerCleanupIgnoredCnt:%d, totalInactiveBrokerCleanupCancelledCnt:%d,   activeCleanupJobs:%d}", this.totalInactiveBrokerCleanupCnt, this.totalServiceUnitTombstoneCleanupCnt, this.totalOrphanServiceUnitCleanupCnt, this.totalCleanupErrorCnt.get(), this.totalInactiveBrokerCleanupScheduledCnt, this.totalInactiveBrokerCleanupIgnoredCnt, this.totalInactiveBrokerCleanupCancelledCnt, this.cleanupJobs.size());
    }

    private int getTotalOwnedServiceUnitCnt() {
        if (this.tableview == null) {
            return 0;
        }
        long now = System.currentTimeMillis();
        if (this.lastOwnEventHandledAt > this.lastOwnedServiceUnitCountAt || now - this.lastOwnedServiceUnitCountAt > 600000L) {
            int cnt = 0;
            for (ServiceUnitStateData data : this.tableview.values()) {
                if (data.state() != ServiceUnitState.Owned || !this.isTargetBroker(data.dstBroker())) continue;
                ++cnt;
            }
            this.lastOwnedServiceUnitCountAt = now;
            this.totalOwnedServiceUnitCnt = cnt;
        }
        return this.totalOwnedServiceUnitCnt;
    }

    @Override
    public List<Metrics> getMetrics() {
        Metrics metric;
        HashMap<String, String> dim;
        ArrayList<Metrics> metrics = new ArrayList<Metrics>();
        HashMap<String, String> dimensions = new HashMap<String, String>();
        dimensions.put("metric", "sunitStateChn");
        dimensions.put("broker", this.pulsar.getAdvertisedAddress());
        for (Map.Entry<ServiceUnitState, Counters> entry : this.ownerLookUpCounters.entrySet()) {
            dim = new HashMap<String, String>(dimensions);
            dim.put("state", entry.getKey().toString());
            dim.put("result", "Total");
            metric = Metrics.create(dim);
            metric.put("brk_sunit_state_chn_owner_lookup_total", (Object)entry.getValue().getTotal().get());
            metrics.add(metric);
            dim = new HashMap(dimensions);
            dim.put("state", entry.getKey().toString());
            dim.put("result", "Failure");
            metric = Metrics.create(dim);
            metric.put("brk_sunit_state_chn_owner_lookup_total", (Object)entry.getValue().getFailure().get());
            metrics.add(metric);
        }
        for (Map.Entry<Enum, Counters> entry : this.eventCounters.entrySet()) {
            dim = new HashMap(dimensions);
            dim.put("event", ((EventType)entry.getKey()).toString());
            dim.put("result", "Total");
            metric = Metrics.create(dim);
            metric.put("brk_sunit_state_chn_event_publish_ops_total", (Object)entry.getValue().getTotal().get());
            metrics.add(metric);
            dim = new HashMap(dimensions);
            dim.put("event", ((EventType)entry.getKey()).toString());
            dim.put("result", "Failure");
            metric = Metrics.create(dim);
            metric.put("brk_sunit_state_chn_event_publish_ops_total", (Object)entry.getValue().getFailure().get());
            metrics.add(metric);
        }
        for (Map.Entry<Enum, Counters> entry : this.handlerCounters.entrySet()) {
            dim = new HashMap(dimensions);
            dim.put("event", ((ServiceUnitState)entry.getKey()).toString());
            dim.put("result", "Total");
            metric = Metrics.create(dim);
            metric.put("brk_sunit_state_chn_subscribe_ops_total", (Object)entry.getValue().getTotal().get());
            metrics.add(metric);
            dim = new HashMap(dimensions);
            dim.put("event", ((ServiceUnitState)entry.getKey()).toString());
            dim.put("result", "Failure");
            metric = Metrics.create(dim);
            metric.put("brk_sunit_state_chn_subscribe_ops_total", (Object)entry.getValue().getFailure().get());
            metrics.add(metric);
        }
        HashMap<String, String> dim2 = new HashMap<String, String>(dimensions);
        dim2.put("result", "Failure");
        Metrics metrics2 = Metrics.create(dim2);
        metrics2.put("brk_sunit_state_chn_cleanup_ops_total", (Object)this.totalCleanupErrorCnt.get());
        metrics.add(metrics2);
        HashMap<String, String> dim3 = new HashMap<String, String>(dimensions);
        dim3.put("result", "Skip");
        Metrics metrics3 = Metrics.create(dim3);
        metrics3.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", (Object)this.totalInactiveBrokerCleanupIgnoredCnt);
        metrics.add(metrics3);
        dim3 = new HashMap(dimensions);
        dim3.put("result", "Cancel");
        Metrics metrics4 = Metrics.create(dim3);
        metrics4.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", (Object)this.totalInactiveBrokerCleanupCancelledCnt);
        metrics.add(metrics4);
        dim3 = new HashMap(dimensions);
        dim3.put("result", "Schedule");
        Metrics metrics5 = Metrics.create(dim3);
        metrics5.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", (Object)this.totalInactiveBrokerCleanupScheduledCnt);
        metrics.add(metrics5);
        dim3 = new HashMap(dimensions);
        dim3.put("result", "Success");
        Metrics metrics6 = Metrics.create(dim3);
        metrics6.put("brk_sunit_state_chn_inactive_broker_cleanup_ops_total", (Object)this.totalInactiveBrokerCleanupCnt);
        metrics.add(metrics6);
        Metrics metric3 = Metrics.create(dimensions);
        metric3.put("brk_sunit_state_chn_orphan_su_cleanup_ops_total", (Object)this.totalOrphanServiceUnitCleanupCnt);
        metric3.put("brk_sunit_state_chn_su_tombstone_cleanup_ops_total", (Object)this.totalServiceUnitTombstoneCleanupCnt);
        metric3.put("brk_sunit_state_chn_owned_su_total", (Object)this.getTotalOwnedServiceUnitCnt());
        metrics.add(metric3);
        return metrics;
    }

    @Override
    public void listen(StateChangeListener listener) {
        this.stateChangeListeners.addListener(listener);
    }

    @Override
    public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() {
        return this.tableview.entrySet();
    }

    public static ServiceUnitStateChannel get(PulsarService pulsar) {
        return ExtensibleLoadManagerImpl.get(pulsar.getLoadManager().get()).getServiceUnitStateChannel();
    }

    public static class Counters {
        private final AtomicLong total;
        private final AtomicLong failure;

        public Counters() {
            this.total = new AtomicLong();
            this.failure = new AtomicLong();
        }

        public AtomicLong getTotal() {
            return this.total;
        }

        public AtomicLong getFailure() {
            return this.failure;
        }

        public Counters(AtomicLong total, AtomicLong failure) {
            this.total = total;
            this.failure = failure;
        }
    }

    public static enum EventType {
        Assign,
        Split,
        Unload,
        Override;

    }

    static enum ChannelState {
        Closed(0),
        Constructed(1),
        LeaderElectionServiceStarted(2),
        Started(3);

        int id;

        private ChannelState(int id) {
            this.id = id;
        }
    }

    static enum MetadataState {
        Stable,
        Jittery,
        Unstable;

    }
}

