/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationManager;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.BacklogQuotaManager;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.DistributedIdGenerator;
import org.apache.pulsar.broker.service.PulsarChannelInitializer;
import org.apache.pulsar.broker.service.PulsarStats;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.util.FutureUtil;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerService
implements Closeable,
ZooKeeperCacheListener<Policies> {
    private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
    private final PulsarService pulsar;
    private final ManagedLedgerFactory managedLedgerFactory;
    // Ports taken from the broker service URL and its TLS counterpart.
    private final int port;
    private final int tlsPort;
    // Topic name -> future of the loaded topic; futures are inserted via computeIfAbsent in getTopic().
    private final ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics;
    // Cluster name -> client used for replication, created lazily in getReplicationClient().
    private final ConcurrentOpenHashMap<String, PulsarClient> replicationClients;
    // Namespace -> namespace-bundle string -> topic name -> Topic.
    private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> multiLayerTopicsMap;
    private int numberOfNamespaceBundles = 0;
    // Netty event-loop groups: a single acceptor thread and the IO worker pool.
    private final EventLoopGroup acceptorGroup;
    private final EventLoopGroup workerGroup;
    // Executor sized by numWorkerThreadsForNonPersistentTopic (thread name prefix "broker-np-topic-workers").
    private final OrderedSafeExecutor topicOrderedExecutor;
    private final ConcurrentOpenHashMap<DestinationName, PersistentOfflineTopicStats> offlineTopicStatCache;
    private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = BrokerService.prepareDynamicConfigurationMap();
    private final ConcurrentOpenHashMap<String, Consumer> configRegisteredListeners;
    // Persistent-topic loads that could not get a topic-load permit; each entry pairs the topic name with its future.
    private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Topic>>> pendingTopicLoadingQueue;
    // Stays null unless authorization is enabled in the configuration.
    private AuthorizationManager authorizationManager = null;
    private final ScheduledExecutorService statsUpdater;
    private final ScheduledExecutorService backlogQuotaChecker;
    // Semaphores sized from maxConcurrentLookupRequest / maxConcurrentTopicLoadRequest.
    protected final AtomicReference<Semaphore> lookupRequestSemaphore;
    protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
    private final ScheduledExecutorService inactivityMonitor;
    private final ScheduledExecutorService messageExpiryMonitor;
    // Created in start(), not in the constructor.
    private DistributedIdGenerator producerNameGenerator;
    private static final String producerNameGeneratorPath = "/counters/producer-name";
    private final BacklogQuotaManager backlogQuotaManager;
    private final int keepAliveIntervalSeconds;
    private final PulsarStats pulsarStats;
    // Forwards ZooKeeper client latencies into pulsarStats; registered in start(), removed in close().
    private final ClientCnxnAspect.EventListner zkStatsListener;
    private final AuthenticationService authenticationService;
    public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration";
    private final ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache;
    private static final LongAdder totalUnackedMessages = new LongAdder();
    // Broker-wide unacked-message limit; 0 when per-broker blocking is disabled.
    private final int maxUnackedMessages;
    // Per-dispatcher limit derived as a percentage of maxUnackedMessages; 0 when blocking is disabled.
    public final int maxUnackedMsgsPerDispatcher;
    private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
    private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Builds the broker service around the given {@link PulsarService}.
     *
     * <p>Reads the plain and TLS ports from the broker service URLs, creates the topic and
     * replication-client maps, the Netty acceptor and IO event-loop groups and the
     * single-thread schedulers used by the periodic monitors, registers this instance as a
     * listener on the policies cache (when a configuration cache is present), and decides
     * whether per-broker unacked-message blocking is enabled. Server sockets are bound later,
     * in {@link #start()}.
     *
     * @param pulsar the owning service; its configuration, caches and executors are read here
     * @throws Exception if construction fails, e.g. a broker service URL cannot be parsed as a URI
     */
    public BrokerService(PulsarService pulsar) throws Exception {
        this.pulsar = pulsar;
        this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
        this.port = new URI(pulsar.getBrokerServiceUrl()).getPort();
        this.tlsPort = new URI(pulsar.getBrokerServiceUrlTls()).getPort();
        this.topics = new ConcurrentOpenHashMap();
        this.replicationClients = new ConcurrentOpenHashMap();
        this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds();
        this.configRegisteredListeners = new ConcurrentOpenHashMap();
        this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue();
        this.multiLayerTopicsMap = new ConcurrentOpenHashMap();
        this.pulsarStats = new PulsarStats(pulsar);
        this.offlineTopicStatCache = new ConcurrentOpenHashMap();
        this.topicOrderedExecutor = new OrderedSafeExecutor(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic(), "broker-np-topic-workers");
        DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor");
        DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-io");
        // IO worker pool size: two threads per available processor; the acceptor group gets one thread.
        int numThreads = Runtime.getRuntime().availableProcessors() * 2;
        log.info("Using {} threads for broker service IO", (Object)numThreads);
        this.acceptorGroup = EventLoopUtil.newEventLoopGroup((int)1, (ThreadFactory)acceptorThreadFactory);
        this.workerGroup = EventLoopUtil.newEventLoopGroup((int)numThreads, (ThreadFactory)workersThreadFactory);
        this.statsUpdater = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("pulsar-stats-updater"));
        // The authorization manager is only created when authorization is enabled; otherwise the field stays null.
        if (pulsar.getConfiguration().isAuthorizationEnabled()) {
            this.authorizationManager = new AuthorizationManager(pulsar.getConfiguration(), pulsar.getConfigurationCache());
        }
        // Listen for policy changes; the matching unregisterListener call is in close().
        if (pulsar.getConfigurationCache() != null) {
            pulsar.getConfigurationCache().policiesCache().registerListener((ZooKeeperCacheListener)this);
        }
        this.inactivityMonitor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("pulsar-inactivity-monitor"));
        this.messageExpiryMonitor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
        this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
        this.backlogQuotaChecker = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("pulsar-backlog-quota-checker"));
        this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
        // Data cache on top of the local ZooKeeper cache; node content is deserialized as a HashMap.
        this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(this.pulsar().getLocalZkCache()){

            public Map<String, String> deserialize(String key, byte[] content) throws Exception {
                return (Map)ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class);
            }
        };
        this.blockedDispatchers = new ConcurrentOpenHashSet();
        // NOTE(review): defined outside this view; presumably reads dynamicConfigurationCache, so it must
        // run after that field is assigned — confirm before reordering.
        this.updateConfigurationAndRegisterListeners();
        this.lookupRequestSemaphore = new AtomicReference<Semaphore>(new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false));
        this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>(new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
        // Per-broker unacked-message blocking needs both a positive broker limit and a positive
        // per-subscription percentage; otherwise both limits are set to 0.
        if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0 && pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
            this.maxUnackedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker();
            // Dispatcher limit = configured percentage of the broker-wide limit, truncated to int.
            this.maxUnackedMsgsPerDispatcher = (int)((double)this.maxUnackedMessages * pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() / 100.0);
            log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker", (Object)this.maxUnackedMessages, (Object)this.maxUnackedMsgsPerDispatcher);
            // First check after 600 seconds, then every 30 seconds, on the shared pulsar executor.
            pulsar.getExecutor().scheduleAtFixedRate(() -> this.checkUnAckMessageDispatching(), 600L, 30L, TimeUnit.SECONDS);
        } else {
            this.maxUnackedMessages = 0;
            this.maxUnackedMsgsPerDispatcher = 0;
            log.info("Disabling per broker unack-msg blocking due invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ", (Object)pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
        }
        // Listener that forwards ZooKeeper client latencies into pulsarStats; it is registered in start().
        this.zkStatsListener = new ClientCnxnAspect.EventListner(){

            @Override
            public void recordLatency(ClientCnxnAspect.EventType eventType, long latencyMs) {
                BrokerService.this.pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
            }
        };
    }

    /**
     * Starts serving: creates the producer-name generator, binds the plain (and, when TLS is
     * enabled, the TLS) server socket, starts the periodic monitors and hooks the ZooKeeper
     * latency listener.
     *
     * @throws Exception if the id generator cannot be created or a bind does not succeed
     */
    public void start() throws Exception {
        producerNameGenerator = new DistributedIdGenerator(
                pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName());

        final ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        bootstrap.group(acceptorGroup, workerGroup);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        // Receive buffer adapts between 1 KiB and 1 MiB, starting at 16 KiB.
        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1024 * 1024));
        bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
        EventLoopUtil.enableTriggeredMode(bootstrap);

        final ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        // Plain-text listener.
        bootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, false));
        bootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), port)).sync();
        log.info("Started Pulsar Broker service on port {}", port);

        // TLS listener reuses the same bootstrap settings with a TLS-enabled initializer.
        if (serviceConfig.isTlsEnabled()) {
            final ServerBootstrap tlsBootstrap = bootstrap.clone();
            tlsBootstrap.childHandler(new PulsarChannelInitializer(this, serviceConfig, true));
            tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort)).sync();
            log.info("Started Pulsar Broker TLS service on port {}", tlsPort);
        }

        startStatsUpdater();
        startInactivityMonitor();
        startMessageExpiryMonitor();
        startBacklogQuotaChecker();

        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
    }

    /**
     * Schedules {@code updateRates} to run every 60 seconds (first run after 60 seconds) on the
     * stats-updater thread, then runs it once immediately on the calling thread.
     */
    void startStatsUpdater() {
        final Runnable periodicUpdate = SafeRun.safeRun(this::updateRates);
        statsUpdater.scheduleAtFixedRate(periodicUpdate, 60L, 60L, TimeUnit.SECONDS);
        updateRates();
    }

    /**
     * Schedules the inactivity-related periodic tasks on the inactivity-monitor thread.
     *
     * <p>The topic GC check is only scheduled when deletion of inactive topics is enabled. The
     * deduplication-info check is always scheduled, at one third of the configured producer
     * inactivity timeout.
     */
    void startInactivityMonitor() {
        if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
            final int gcIntervalSeconds =
                    pulsar().getConfiguration().getBrokerServicePurgeInactiveFrequencyInSeconds();
            final Runnable gcTask = SafeRun.safeRun(() -> checkGC(gcIntervalSeconds));
            inactivityMonitor.scheduleAtFixedRate(gcTask, gcIntervalSeconds, gcIntervalSeconds, TimeUnit.SECONDS);
        }

        final int dedupTimeoutMinutes =
                pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes();
        final long dedupCheckSeconds = TimeUnit.MINUTES.toSeconds(dedupTimeoutMinutes) / 3L;
        final Runnable dedupTask = SafeRun.safeRun(this::checkMessageDeduplicationInfo);
        inactivityMonitor.scheduleAtFixedRate(dedupTask, dedupCheckSeconds, dedupCheckSeconds, TimeUnit.SECONDS);
    }

    /**
     * Schedules {@code checkMessageExpiry} on the message-expiry thread; the configured check
     * interval (in minutes) is used as both the initial delay and the period.
     */
    void startMessageExpiryMonitor() {
        final int everyMinutes = pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
        final Runnable expiryTask = SafeRun.safeRun(this::checkMessageExpiry);
        messageExpiryMonitor.scheduleAtFixedRate(expiryTask, everyMinutes, everyMinutes, TimeUnit.MINUTES);
    }

    /**
     * Schedules {@code monitorBacklogQuota} on the backlog-quota thread when the backlog quota
     * check is enabled; otherwise only logs that monitoring is disabled.
     */
    void startBacklogQuotaChecker() {
        if (!pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
            log.info("Backlog quota check monitoring is disabled");
            return;
        }

        final int everySeconds = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
        log.info("Scheduling a thread to check backlog quota after [{}] seconds in background", everySeconds);
        final Runnable quotaTask = SafeRun.safeRun(this::monitorBacklogQuota);
        backlogQuotaChecker.scheduleAtFixedRate(quotaTask, everySeconds, everySeconds, TimeUnit.SECONDS);
    }

    /**
     * Shuts the broker service down: stops listening for policy changes, unloads the owned
     * namespace bundles, shuts down the replication clients, the Netty event loops and the
     * monitor schedulers, closes the authentication service and stats, and detaches the
     * ZooKeeper latency listener.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        log.info("Shutting down Pulsar Broker service");

        if (pulsar.getConfigurationCache() != null) {
            pulsar.getConfigurationCache().policiesCache().unregisterListener(this);
        }

        unloadNamespaceBundlesGracefully();

        // A failure to shut down one replication client is logged and does not stop the others.
        replicationClients.forEach((cluster, client) -> {
            try {
                client.shutdown();
            } catch (PulsarClientException e) {
                log.warn("Error shutting down repl client for cluster {}", cluster, e);
            }
        });

        acceptorGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

        statsUpdater.shutdown();
        inactivityMonitor.shutdown();
        messageExpiryMonitor.shutdown();
        backlogQuotaChecker.shutdown();

        authenticationService.close();
        pulsarStats.close();

        ClientCnxnAspect.removeListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(null);

        topicOrderedExecutor.shutdown();
        log.info("Broker service completely shut down");
    }

    /**
     * Disables this broker in the load manager (when one is present) and then unloads every
     * namespace bundle it owns, logging how long the unload took.
     *
     * <p>A failure to unload one bundle is logged and the remaining bundles are still
     * processed. Any other exception aborts the procedure and is logged; nothing is rethrown.
     */
    public void unloadNamespaceBundlesGracefully() {
        try {
            if (pulsar.getLoadManager() != null) {
                pulsar.getLoadManager().get().disableBroker();
            }

            final long startedAtNanos = System.nanoTime();
            final Set<NamespaceBundle> ownedUnits = pulsar.getNamespaceService().getOwnedServiceUnits();
            ownedUnits.forEach(unit -> {
                if (!(unit instanceof NamespaceBundle)) {
                    return;
                }
                try {
                    pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) unit);
                } catch (Exception e) {
                    log.warn("Failed to unload namespace bundle {}", unit, e);
                }
            });

            final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAtNanos);
            final double elapsedSeconds = (double) elapsedMillis / 1000.0;
            log.info("Unloading {} namespace-bundles completed in {} seconds", ownedUnits.size(), elapsedSeconds);
        } catch (Exception e) {
            log.error("Failed to disable broker from loadbalancer list {}", e.getMessage(), e);
        }
    }

    /**
     * Returns the future for the given topic, starting a load if none is in progress.
     *
     * <p>A cached future that already completed exceptionally is evicted and the load is
     * retried. Persistent and non-persistent topics are dispatched to their respective
     * creation methods based on the domain of the topic name.
     *
     * @param topic fully qualified topic name
     * @return the cached or newly created topic future; never thrown at the caller — failures
     *         raised while starting the load are returned as an exceptionally completed future
     */
    public CompletableFuture<Topic> getTopic(String topic) {
        try {
            CompletableFuture<Topic> topicFuture = topics.get(topic);
            if (topicFuture != null) {
                if (topicFuture.isCompletedExceptionally()) {
                    // Drop the failed attempt so that the load below is retried.
                    topics.remove(topic, topicFuture);
                } else {
                    return topicFuture;
                }
            }
            final boolean isPersistentTopic = DestinationName.get(topic).getDomain().equals(DestinationDomain.persistent);
            return topics.computeIfAbsent(topic, topicName -> isPersistentTopic
                    ? createPersistentTopic(topicName)
                    : createNonPersistentTopic(topicName));
        } catch (IllegalArgumentException e) {
            log.warn("[{}] Illegalargument exception when loading topic", topic, e);
            return BrokerService.failedFuture(e);
        } catch (RuntimeException e) {
            // A RuntimeException is not guaranteed to wrap another exception. Passing a null
            // cause on would make CompletableFuture.completeExceptionally throw a
            // NullPointerException out of this method, so fall back to the exception itself.
            final Throwable cause = e.getCause() != null ? e.getCause() : e;
            if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                log.warn("[{}] Service unit is not ready when loading the topic", topic);
            } else {
                log.warn("[{}] Unexpected exception when loading topic: {}", topic, cause);
            }
            return BrokerService.failedFuture(cause);
        }
    }

    /**
     * Creates a non-persistent topic and returns a future that completes once its replication
     * check has finished.
     *
     * <p>When non-persistent topics are disabled in the configuration, the returned future is
     * already failed with a {@code NotAllowedException}. If the replication check fails, the
     * replication producers are stopped, the future is removed from the topics map (on the
     * pulsar executor) and the future is failed with the replication error.
     *
     * @param topic fully qualified topic name
     * @return future of the created topic
     */
    private CompletableFuture<Topic> createNonPersistentTopic(String topic) {
        final CompletableFuture<Topic> topicFuture = new CompletableFuture<>();
        if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
            if (log.isDebugEnabled()) {
                log.debug("Broker is unable to load non-persistent topic {}", topic);
            }
            // The message previously read "is not unable to load", which states the opposite.
            topicFuture.completeExceptionally(
                    new BrokerServiceException.NotAllowedException("Broker is unable to load non-persistent topic"));
            return topicFuture;
        }

        final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        final NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
        final CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication();

        // Success and failure handlers are attached independently to the replication future.
        replicationFuture.thenRun(() -> {
            log.info("Created topic {}", nonPersistentTopic);
            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
            addTopicToStatsMaps(DestinationName.get(topic), nonPersistentTopic);
            topicFuture.complete(nonPersistentTopic);
        });
        replicationFuture.exceptionally(ex -> {
            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
                pulsar.getExecutor().submit(() -> topics.remove(topic, topicFuture));
                topicFuture.completeExceptionally(ex);
            });
            return null;
        });
        return topicFuture;
    }

    /**
     * Returns a future that is already completed exceptionally with the given throwable.
     *
     * @param t the failure to complete the future with
     * @param <T> nominal result type of the returned future
     * @return a new, already failed future
     */
    private static <T> CompletableFuture<T> failedFuture(Throwable t) {
        final CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(t);
        return failed;
    }

    /**
     * Returns the client used to replicate to the given cluster, creating and caching it on
     * first use.
     *
     * <p>The cluster data is read from the clusters cache. With replication TLS enabled the
     * TLS broker service URL is used (falling back to the TLS service URL when blank);
     * otherwise the plain broker service URL is used (falling back to the service URL).
     *
     * @param cluster name of the remote cluster
     * @return the cached or newly created client
     * @throws RuntimeException wrapping any exception raised while building the client, including
     *         {@code KeeperException.NoNodeException} when the cluster has no cached data
     */
    public PulsarClient getReplicationClient(String cluster) {
        final PulsarClient cached = replicationClients.get(cluster);
        if (cached != null) {
            return cached;
        }

        return replicationClients.computeIfAbsent(cluster, key -> {
            try {
                final String path = PulsarWebResource.path("clusters", cluster);
                final ClusterData data = pulsar.getConfigurationCache().clustersCache().get(path)
                        .orElseThrow(() -> new KeeperException.NoNodeException(path));

                final ClientConfiguration configuration = new ClientConfiguration();
                configuration.setUseTcpNoDelay(false);
                configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
                configuration.setStatsInterval(0L, TimeUnit.SECONDS);
                if (pulsar.getConfiguration().isAuthenticationEnabled()) {
                    configuration.setAuthentication(
                            pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
                            pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
                }

                final String clusterUrl;
                if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
                    if (StringUtils.isNotBlank(data.getBrokerServiceUrlTls())) {
                        clusterUrl = data.getBrokerServiceUrlTls();
                    } else {
                        clusterUrl = data.getServiceUrlTls();
                    }
                    configuration.setUseTls(true);
                    configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getTlsTrustCertsFilePath());
                    configuration.setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
                } else if (StringUtils.isNotBlank(data.getBrokerServiceUrl())) {
                    clusterUrl = data.getBrokerServiceUrl();
                } else {
                    clusterUrl = data.getServiceUrl();
                }

                // The replication client shares the broker's IO event loop group.
                return new PulsarClientImpl(clusterUrl, configuration, workerGroup);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Starts loading a persistent topic, or queues the load when no topic-load permit is
     * available.
     *
     * <p>Namespace ownership is checked first. When persistent topics are disabled in the
     * configuration, the returned future is already failed with a {@code NotAllowedException}.
     * When a permit is acquired, the load starts immediately and the permit is released once
     * the future completes (successfully or not), after which pending loads are resumed.
     * Without a permit, the topic and its future are appended to the pending queue.
     *
     * @param topic fully qualified topic name
     * @return future of the loaded topic
     * @throws RuntimeException if the ownership check or the start of the load fails synchronously
     */
    protected CompletableFuture<Topic> createPersistentTopic(String topic) throws RuntimeException {
        checkTopicNsOwnership(topic);

        final CompletableFuture<Topic> topicFuture = new CompletableFuture<>();
        if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
            if (log.isDebugEnabled()) {
                log.debug("Broker is unable to load persistent topic {}", topic);
            }
            // The message previously read "is not unable to load", which states the opposite.
            topicFuture.completeExceptionally(
                    new BrokerServiceException.NotAllowedException("Broker is unable to load persistent topic"));
            return topicFuture;
        }

        final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
        if (topicLoadSemaphore.tryAcquire()) {
            try {
                createPersistentTopic(topic, topicFuture);
            } catch (RuntimeException e) {
                // The releasing callback below is never registered when the load fails to start,
                // so give the permit back here; otherwise it would be lost for good.
                topicLoadSemaphore.release();
                throw e;
            }
            topicFuture.handle((persistentTopic, ex) -> {
                // Release the permit and resume pending topic loads, whatever the outcome.
                topicLoadSemaphore.release();
                createPendingLoadTopic();
                return null;
            });
        } else {
            pendingTopicLoadingQueue.add(new ImmutablePair<>(topic, topicFuture));
            if (log.isDebugEnabled()) {
                log.debug("topic-loading for {} added into pending queue", topic);
            }
        }
        return topicFuture;
    }

    /**
     * Performs the actual asynchronous load of a persistent topic and completes
     * {@code topicFuture} with the outcome.
     *
     * <p>Steps: verify that the topic's service unit is active; fetch the managed-ledger
     * configuration; open the managed ledger; build the {@code PersistentTopic}; run the
     * replication check followed by the deduplication-status check; then record the load
     * latency, add the topic to the stats maps and complete the future. Every failure path
     * removes {@code topicFuture} from the topics map and completes it exceptionally.
     *
     * @param topic fully qualified topic name
     * @param topicFuture future to complete with the loaded topic or the failure
     */
    private void createPersistentTopic(final String topic, final CompletableFuture<Topic> topicFuture) {
        final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        final DestinationName destinationName = DestinationName.get((String)topic);
        if (!this.pulsar.getNamespaceService().isServiceUnitActive(destinationName)) {
            String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic);
            log.warn(msg);
            // NOTE(review): removal is handed to the pulsar executor instead of done inline, presumably
            // because this code can run inside topics.computeIfAbsent (see getTopic) — confirm.
            this.pulsar.getExecutor().submit(() -> this.topics.remove((Object)topic, (Object)topicFuture));
            topicFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException(msg));
            return;
        }
        // Fetch the ledger configuration, then open the managed ledger under the topic's persistence naming encoding.
        ((CompletableFuture)this.getManagedLedgerConfig(destinationName).thenAccept(managedLedgerConfig -> this.managedLedgerFactory.asyncOpen(destinationName.getPersistenceNamingEncoding(), managedLedgerConfig, new AsyncCallbacks.OpenLedgerCallback(){

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);
                CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
                // Replication check first, then the deduplication-status check; the future is completed only after both.
                ((CompletableFuture)((CompletableFuture)replicationFuture.thenCompose(v -> persistentTopic.checkDeduplicationStatus())).thenRun(() -> {
                    log.info("Created topic {} - dedup is {}", (Object)topic, (Object)(persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled"));
                    long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
                    BrokerService.this.pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                    BrokerService.this.addTopicToStatsMaps(destinationName, persistentTopic);
                    topicFuture.complete(persistentTopic);
                })).exceptionally(ex -> {
                    log.warn("Replication or dedup check failed. Removing topic from topics list {}, {}", (Object)topic, ex);
                    // Stop the replication producers before failing the future; unlike the other failure
                    // paths, the map entry is removed inline here rather than on the pulsar executor.
                    persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
                        BrokerService.this.topics.remove((Object)topic, (Object)topicFuture);
                        topicFuture.completeExceptionally((Throwable)ex);
                    });
                    return null;
                });
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("Failed to create topic {}", (Object)topic, (Object)exception);
                BrokerService.this.pulsar.getExecutor().submit(() -> BrokerService.this.topics.remove((Object)topic, (Object)topicFuture));
                // The ledger failure is surfaced to callers wrapped in a PersistenceException.
                topicFuture.completeExceptionally(new BrokerServiceException.PersistenceException((Throwable)exception));
            }
        }, null))).exceptionally(exception -> {
            log.warn("[{}] Failed to get topic configuration: {}", new Object[]{topic, exception.getMessage(), exception});
            this.pulsar.getExecutor().submit(() -> this.topics.remove((Object)topic, (Object)topicFuture));
            topicFuture.completeExceptionally((Throwable)exception);
            return null;
        });
    }

    /**
     * Asynchronously builds the managed-ledger configuration for a topic.
     *
     * <p>The work is submitted to the pulsar ordered executor, keyed by the topic name. The
     * namespace policies are read from the policies cache; persistence and retention settings
     * come from those policies when present and from the broker configuration otherwise. All
     * remaining settings are taken from the broker configuration.
     *
     * @param topicName topic whose namespace policies are consulted
     * @return future completed with the configuration, or exceptionally when the policies
     *         cannot be read or the task fails
     */
    public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(DestinationName topicName) {
        final CompletableFuture<ManagedLedgerConfig> result = new CompletableFuture<>();

        pulsar.getOrderedExecutor().submitOrdered(topicName, SafeRun.safeRun(() -> {
            final NamespaceName namespace = topicName.getNamespaceObject();
            final ServiceConfiguration serviceConfig = pulsar.getConfiguration();

            final Optional<Policies> policies;
            try {
                final String policiesPath = AdminResource.path("policies", namespace.getProperty(),
                        namespace.getCluster(), namespace.getLocalName());
                policies = pulsar.getConfigurationCache().policiesCache().get(policiesPath);
            } catch (Throwable t) {
                log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
                result.completeExceptionally(t);
                return;
            }

            // Namespace-level policies win; broker defaults are used when a policy is absent.
            final PersistencePolicies persistence = policies
                    .map(p -> p.persistence)
                    .orElseGet(() -> new PersistencePolicies(
                            serviceConfig.getManagedLedgerDefaultEnsembleSize(),
                            serviceConfig.getManagedLedgerDefaultWriteQuorum(),
                            serviceConfig.getManagedLedgerDefaultAckQuorum(),
                            serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
            final RetentionPolicies retention = policies
                    .map(p -> p.retention_policies)
                    .orElseGet(() -> new RetentionPolicies(
                            serviceConfig.getDefaultRetentionTimeInMinutes(),
                            serviceConfig.getDefaultRetentionSizeInMB()));

            final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();

            // Data-ledger settings from the persistence policies.
            ledgerConfig.setEnsembleSize(persistence.getBookkeeperEnsemble());
            ledgerConfig.setWriteQuorumSize(persistence.getBookkeeperWriteQuorum());
            ledgerConfig.setAckQuorumSize(persistence.getBookkeeperAckQuorum());
            ledgerConfig.setThrottleMarkDelete(persistence.getManagedLedgerMaxMarkDeleteRate());
            ledgerConfig.setDigestType(BookKeeper.DigestType.CRC32);

            // Broker-wide ledger limits and rollover settings.
            ledgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
            ledgerConfig.setMaxUnackedRangesToPersistInZk(
                    serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
            ledgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
            ledgerConfig.setMinimumRolloverTime(
                    serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            ledgerConfig.setMaximumRolloverTime(
                    serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), TimeUnit.MINUTES);
            ledgerConfig.setMaxSizePerLedgerMb(2048);

            // Metadata ledgers always use the broker defaults.
            ledgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
            ledgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
            ledgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
            ledgerConfig.setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
            ledgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());

            // Retention from the retention policies.
            ledgerConfig.setRetentionTime(retention.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
            ledgerConfig.setRetentionSizeInMB(retention.getRetentionSizeInMB());

            result.complete(ledgerConfig);
        }, failure -> {
            result.completeExceptionally(failure);
        }));

        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers {@code topic} in the namespace / bundle / topic stats map and drops any cached
     * offline stats for it.
     *
     * <p>The map is only touched when a bundle can be resolved for {@code topicName}. Any
     * exception raised along the way is logged at WARN level and not rethrown.
     *
     * @param topicName destination used to resolve the bundle and to derive the map keys
     * @param topic topic instance stored under {@code topicName.toString()}
     */
    private void addTopicToStatsMaps(DestinationName topicName, Topic topic) {
        try {
            NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(topicName);
            if (bundle != null) {
                synchronized (multiLayerTopicsMap) {
                    String bundleKey = bundle.toString();
                    ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundlesOfNamespace =
                            multiLayerTopicsMap.computeIfAbsent(topicName.getNamespace(), ns -> new ConcurrentOpenHashMap<>());
                    ConcurrentOpenHashMap<String, Topic> topicsOfBundle =
                            bundlesOfNamespace.computeIfAbsent(bundleKey, b -> new ConcurrentOpenHashMap<>());
                    topicsOfBundle.put(topicName.toString(), topic);
                }
            }
            invalidateOfflineTopicStatCache(topicName);
        } catch (Exception e) {
            log.warn("Got exception when retrieving bundle name during create persistent topic", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-registers every topic currently filed under {@code oldBundle} and then removes the old
     * bundle entry from the stats map and from the bundle stats.
     *
     * <p>Does nothing when the old bundle has no topics. Exceptions raised during the refresh are
     * logged at WARN level and not rethrown.
     *
     * @param oldBundle bundle whose topics should be re-registered; must not be {@code null}
     */
    public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) {
        Preconditions.checkNotNull(oldBundle);
        try {
            List<Topic> bundleTopics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(), oldBundle.toString());
            if (CollectionUtils.isEmpty(bundleTopics)) {
                return;
            }
            for (Topic bundleTopic : bundleTopics) {
                addTopicToStatsMaps(DestinationName.get(bundleTopic.getName()), bundleTopic);
            }
            synchronized (multiLayerTopicsMap) {
                multiLayerTopicsMap.get(oldBundle.getNamespaceObject().toString()).remove(oldBundle.toString());
                pulsarStats.invalidBundleStats(oldBundle.toString());
            }
        } catch (Exception e) {
            log.warn("Got exception while refreshing topicStats map", e);
        }
    }

    /**
     * Looks up the offline stats cached for a topic.
     *
     * @param topicName key to look up in the offline stats cache
     * @return whatever the cache holds for {@code topicName}
     */
    public PersistentOfflineTopicStats getOfflineTopicStat(DestinationName topicName) {
        return (PersistentOfflineTopicStats) offlineTopicStatCache.get(topicName);
    }

    /**
     * Stores offline stats for a topic in the offline stats cache.
     *
     * @param topicName cache key
     * @param offlineTopicStats value to cache under {@code topicName}
     */
    public void cacheOfflineTopicStats(DestinationName topicName, PersistentOfflineTopicStats offlineTopicStats) {
        offlineTopicStatCache.put(topicName, offlineTopicStats);
    }

    /**
     * Evicts the cached offline stats of a topic, logging at INFO level when an entry was
     * actually removed.
     *
     * @param topicName key to evict from the offline stats cache
     */
    public void invalidateOfflineTopicStatCache(DestinationName topicName) {
        PersistentOfflineTopicStats evicted = (PersistentOfflineTopicStats) offlineTopicStatCache.remove(topicName);
        if (evicted == null) {
            return;
        }
        log.info("Removed cached offline topic stat for {} ", topicName.getPersistenceNamingEncoding());
    }

    /**
     * Returns the topic registered under {@code topic} if its future has already completed
     * successfully.
     *
     * @param topic key into the topics map
     * @return the loaded topic, or {@code null} when there is no entry, the future is still
     *     pending, or it completed exceptionally
     * @throws Exception if reading the completed future fails
     */
    public Topic getTopicReference(String topic) throws Exception {
        CompletableFuture<Topic> topicFuture = topics.get(topic);
        boolean loaded = topicFuture != null && topicFuture.isDone() && !topicFuture.isCompletedExceptionally();
        return loaded ? topicFuture.get() : null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Hands the topic stats map to {@code pulsarStats.updateStats} while holding the
     * {@code pulsarStats} monitor.
     */
    public void updateRates() {
        synchronized (pulsarStats) {
            pulsarStats.updateStats(multiLayerTopicsMap);
        }
    }

    /**
     * Delegates to {@code pulsarStats.getDimensionMetrics}.
     *
     * @param consumer callback passed through to the stats object
     */
    public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
        pulsarStats.getDimensionMetrics(consumer);
    }

    /**
     * Delegates to {@code pulsarStats.getDestinationMetrics}.
     *
     * @return the destination metrics reported by the stats object
     */
    public List<Metrics> getDestinationMetrics() {
        return pulsarStats.getDestinationMetrics();
    }

    /**
     * Delegates to {@code pulsarStats.getBundleStats}.
     *
     * @return the bundle stats map reported by the stats object
     */
    public Map<String, NamespaceBundleStats> getBundleStats() {
        return pulsarStats.getBundleStats();
    }

    /**
     * Returns the semaphore currently held by {@code lookupRequestSemaphore}.
     *
     * @return the current lookup-request semaphore
     */
    public Semaphore getLookupRequestSemaphore() {
        return lookupRequestSemaphore.get();
    }

    /**
     * Invokes {@code checkGC} on every topic whose future has completed successfully.
     *
     * <p>Futures that completed exceptionally or are still pending are skipped.
     *
     * @param gcIntervalInSeconds interval forwarded to each topic's {@code checkGC}
     */
    public void checkGC(int gcIntervalInSeconds) {
        topics.forEach((name, topicFuture) -> {
            if (topicFuture.isCompletedExceptionally()) {
                return;
            }
            Topic loaded = topicFuture.getNow(null);
            if (loaded != null) {
                loaded.checkGC(gcIntervalInSeconds);
            }
        });
    }

    /**
     * Invokes {@code checkMessageExpiry} on every topic whose future has completed successfully.
     *
     * <p>Futures that completed exceptionally are skipped explicitly: {@code getNow} rethrows the
     * failure of such a future, which would otherwise abort the sweep over the remaining topics.
     * Pending futures are skipped as well.
     */
    public void checkMessageExpiry() {
        topics.forEach((name, topicFuture) -> {
            if (topicFuture.isCompletedExceptionally()) {
                return;
            }
            Topic topic = topicFuture.getNow(null);
            if (topic != null) {
                topic.checkMessageExpiry();
            }
        });
    }

    /**
     * Invokes {@code checkMessageDeduplicationInfo} on every topic whose future has completed
     * successfully.
     *
     * <p>Futures that completed exceptionally are skipped explicitly: {@code getNow} rethrows the
     * failure of such a future, which would otherwise abort the sweep over the remaining topics.
     * Pending futures are skipped as well.
     */
    public void checkMessageDeduplicationInfo() {
        topics.forEach((name, topicFuture) -> {
            if (topicFuture.isCompletedExceptionally()) {
                return;
            }
            Topic topic = topicFuture.getNow(null);
            if (topic != null) {
                topic.checkMessageDeduplicationInfo();
            }
        });
    }

    /**
     * Returns the backlog quota manager held by this service.
     *
     * @return the {@code backlogQuotaManager} field
     */
    public BacklogQuotaManager getBacklogQuotaManager() {
        return backlogQuotaManager;
    }

    /**
     * Compares a topic's backlog size with the backlog quota limit of its namespace.
     *
     * @param topic topic whose backlog size is checked
     * @return {@code true} when the backlog size is greater than or equal to the limit
     */
    public boolean isBacklogExceeded(PersistentTopic topic) {
        DestinationName topicName = DestinationName.get(topic.getName());
        long quotaLimitBytes = getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace());
        if (log.isDebugEnabled()) {
            log.debug("[{}] - backlog quota limit = [{}]", topic.getName(), quotaLimitBytes);
        }

        long backlogBytes = topic.getBacklogSize();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Storage size = [{}], limit [{}]", topic.getName(), backlogBytes, quotaLimitBytes);
        }

        return backlogBytes >= quotaLimitBytes;
    }

    /**
     * Checks every loaded persistent topic against its backlog quota and asks the backlog quota
     * manager to handle the ones that exceed it.
     *
     * <p>Each future is read once. Entries that are not yet loaded or are not a
     * {@link PersistentTopic} are skipped. A failure while processing one topic is logged at WARN
     * level, with the exception attached, and does not stop the sweep.
     */
    public void monitorBacklogQuota() {
        topics.forEach((name, topicFuture) -> {
            try {
                Topic loaded = topicFuture.getNow(null);
                // instanceof is false for null, so this also covers futures that are still pending.
                if (!(loaded instanceof PersistentTopic)) {
                    return;
                }
                PersistentTopic topic = (PersistentTopic) loaded;
                if (isBacklogExceeded(topic)) {
                    getBacklogQuotaManager().handleExceededBacklogQuota(topic);
                } else if (log.isDebugEnabled()) {
                    log.debug("quota not exceeded for [{}]", topic.getName());
                }
            } catch (Exception xle) {
                log.warn("Backlog quota monitoring encountered :" + xle.getLocalizedMessage(), xle);
            }
        });
    }

    /**
     * Verifies that the service unit of {@code topic} is owned by this instance.
     *
     * @param topic topic name, parsed with {@code DestinationName.get}
     * @throws RuntimeException wrapping a {@code ServerMetadataException} when the ownership check
     *     itself fails, or a {@code ServiceUnitNotReadyException} when the unit is not owned here
     */
    void checkTopicNsOwnership(String topic) throws RuntimeException {
        DestinationName destination = DestinationName.get(topic);
        boolean owned;
        try {
            owned = pulsar.getNamespaceService().isServiceUnitOwned((ServiceUnitId) destination);
        } catch (Exception e) {
            log.debug(String.format("Failed to check the ownership of the destination: %s", destination), e);
            throw new RuntimeException(new BrokerServiceException.ServerMetadataException(e));
        }
        if (owned) {
            return;
        }
        String msg = String.format("Namespace not served by this instance. Please redo the lookup. Request is denied: namespace=%s", destination.getNamespace());
        log.warn(msg);
        throw new RuntimeException(new BrokerServiceException.ServiceUnitNotReadyException(msg));
    }

    /**
     * Closes every topic that belongs to {@code serviceUnit}.
     *
     * @param serviceUnit bundle whose topics are closed
     * @return a future completed with the number of close operations started once all of them
     *     finish, or completed exceptionally if the aggregated close fails
     */
    public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) {
        CompletableFuture<Integer> unloadResult = new CompletableFuture<>();
        List<CompletableFuture<Void>> pendingCloses = Lists.newArrayList();

        topics.forEach((name, topicFuture) -> {
            DestinationName topicName = DestinationName.get(name);
            if (!serviceUnit.includes(topicName)) {
                return;
            }
            log.info("[{}] Unloading topic", topicName);
            pendingCloses.add(topicFuture.thenCompose(Topic::close));
        });

        FutureUtil.waitForAll(pendingCloses)
                .thenAccept(ignored -> unloadResult.complete(pendingCloses.size()))
                .exceptionally(ex -> {
                    unloadResult.completeExceptionally(ex);
                    return null;
                });
        return unloadResult;
    }

    /**
     * Returns the authorization manager held by this service.
     *
     * @return the {@code authorizationManager} field; {@link #isAuthorizationEnabled()} treats a
     *     {@code null} value as "authorization disabled"
     */
    public AuthorizationManager getAuthorizationManager() {
        return authorizationManager;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes a topic from the stats map and from the topics map.
     *
     * <p>Empty bundle and namespace levels are pruned. When a namespace level is removed, its
     * per-cluster replication metrics are removed too. A failure while updating the stats map is
     * logged at WARN level; the entry in the topics map is removed regardless.
     *
     * @param topic topic name used as the key in both maps
     */
    public void removeTopicFromCache(String topic) {
        try {
            DestinationName destination = DestinationName.get(topic);
            NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(destination);
            Preconditions.checkArgument(bundle instanceof NamespaceBundle);

            String bundleKey = bundle.toString();
            String namespaceKey = destination.getNamespaceObject().toString();

            synchronized (multiLayerTopicsMap) {
                ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundlesOfNamespace =
                        multiLayerTopicsMap.get(namespaceKey);
                ConcurrentOpenHashMap<String, Topic> topicsOfBundle = bundlesOfNamespace.get(bundleKey);

                topicsOfBundle.remove(topic);
                if (topicsOfBundle.isEmpty()) {
                    bundlesOfNamespace.remove(bundleKey);
                }
                if (bundlesOfNamespace.isEmpty()) {
                    multiLayerTopicsMap.remove(namespaceKey);
                    ClusterReplicationMetrics replicationMetrics = pulsarStats.getClusterReplicationMetrics();
                    replicationClients.forEach((cluster, client) ->
                            replicationMetrics.remove(replicationMetrics.getKeyName(namespaceKey, cluster)));
                }
            }
        } catch (Exception e) {
            log.warn("Got exception when retrieving bundle name during removeTopicFromCache", e);
        }
        topics.remove(topic);
    }

    /**
     * Counts the bundle entries across all namespaces of the stats map.
     *
     * <p>The sum is accumulated in a local variable and written to the
     * {@code numberOfNamespaceBundles} field once, at the end. Using the field itself as the
     * running accumulator let concurrent callers reset or add to each other's partial sums.
     *
     * @return the total number of bundle entries, narrowed to {@code int}
     */
    public int getNumberOfNamespaceBundles() {
        long total = 0L;
        for (ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundles : multiLayerTopicsMap.values()) {
            total += bundles.size();
        }
        int count = (int) total;
        // Keep the field in sync for any code that still reads it directly.
        this.numberOfNamespaceBundles = count;
        return count;
    }

    /**
     * Returns the map of topic name to topic future held by this service.
     *
     * @return the {@code topics} field itself, not a copy
     */
    public ConcurrentOpenHashMap<String, CompletableFuture<Topic>> getTopics() {
        return topics;
    }

    /**
     * Forwards updated policies to every topic of the namespace derived from {@code path}.
     *
     * <p>The notification is attached to each topic future with {@code thenAccept}, so it runs
     * once the topic is available.
     *
     * @param path policies path from which the namespace is derived
     * @param data updated policies passed to {@code Topic.onPoliciesUpdate}
     * @param stat not used by this method
     */
    public void onUpdate(String path, Policies data, Stat stat) {
        NamespaceName namespace = new NamespaceName(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path));
        log.info("{} updating with {}", path, data);

        topics.forEach((name, topicFuture) -> {
            if (!namespace.includes(DestinationName.get(name))) {
                return;
            }
            topicFuture.thenAccept(topic -> {
                if (log.isDebugEnabled()) {
                    log.debug("Notifying topic that policies have changed: {}", name);
                }
                topic.onPoliciesUpdate(data);
            });
        });
    }

    /**
     * Returns the {@code PulsarService} this broker service belongs to.
     *
     * @return the {@code pulsar} field
     */
    public PulsarService pulsar() {
        return pulsar;
    }

    /**
     * Returns the worker group, exposed as a scheduled executor.
     *
     * @return the {@code workerGroup} field
     */
    public ScheduledExecutorService executor() {
        return workerGroup;
    }

    /**
     * Returns the replication clients map held by this service.
     *
     * @return the {@code replicationClients} field itself, not a copy
     */
    public ConcurrentOpenHashMap<String, PulsarClient> getReplicationClients() {
        return replicationClients;
    }

    /**
     * Reports the authentication flag of the service configuration.
     *
     * @return the value of {@code isAuthenticationEnabled()} on the current configuration
     */
    public boolean isAuthenticationEnabled() {
        return pulsar.getConfiguration().isAuthenticationEnabled();
    }

    /**
     * Reports whether an authorization manager is present.
     *
     * @return {@code true} when the {@code authorizationManager} field is not {@code null}
     */
    public boolean isAuthorizationEnabled() {
        return authorizationManager != null;
    }

    /**
     * Returns the keep-alive interval held by this service.
     *
     * @return the {@code keepAliveIntervalSeconds} field
     */
    public int getKeepAliveIntervalSeconds() {
        return keepAliveIntervalSeconds;
    }

    /**
     * Asks the producer name generator for its next id.
     *
     * @return the value of {@code producerNameGenerator.getNextId()}
     */
    public String generateUniqueProducerName() {
        return producerNameGenerator.getNextId();
    }

    /**
     * Collects the stats of every topic whose future has completed successfully.
     *
     * <p>Futures that completed exceptionally are skipped explicitly: {@code getNow} rethrows the
     * failure of such a future, which would otherwise make the whole call fail. Pending futures
     * are skipped as well.
     *
     * @return a new map from topic name to that topic's stats
     */
    public Map<String, PersistentTopicStats> getTopicStats() {
        HashMap<String, PersistentTopicStats> stats = new HashMap<>();
        topics.forEach((name, topicFuture) -> {
            if (topicFuture.isCompletedExceptionally()) {
                return;
            }
            Topic currentTopic = topicFuture.getNow(null);
            if (currentTopic != null) {
                stats.put(name, currentTopic.getStats());
            }
        });
        return stats;
    }

    /**
     * Returns the authentication service held by this service.
     *
     * @return the {@code authenticationService} field
     */
    public AuthenticationService getAuthenticationService() {
        return authenticationService;
    }

    /**
     * Lists the topics filed under a namespace and bundle in the stats map.
     *
     * <p>An unknown namespace or bundle yields an empty list instead of a
     * {@link NullPointerException}.
     *
     * @param namespace first-level key of the stats map
     * @param bundle second-level key of the stats map
     * @return the topics stored for that bundle, or an empty list when either level is missing
     */
    public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundlesOfNamespace =
                multiLayerTopicsMap.get(namespace);
        if (bundlesOfNamespace == null) {
            return new ArrayList<>();
        }
        ConcurrentOpenHashMap<String, Topic> topicsOfBundle = bundlesOfNamespace.get(bundle);
        if (topicsOfBundle == null) {
            return new ArrayList<>();
        }
        return topicsOfBundle.values();
    }

    /**
     * Returns the cache through which dynamic configuration is read.
     *
     * @return the {@code dynamicConfigurationCache} field
     */
    public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() {
        return dynamicConfigurationCache;
    }

    /**
     * Installs the validator for {@code loadManagerClassName}, applies the dynamic configuration
     * currently stored, and registers the listeners that react to later changes.
     *
     * <p>Listeners registered here:
     * <ul>
     *   <li>{@code maxConcurrentLookupRequest} and {@code maxConcurrentTopicLoadRequest} replace
     *       the corresponding semaphore with a new, non-fair one of the configured size;</li>
     *   <li>{@code loadManagerClassName} creates a new load manager, stops the current one, then
     *       starts and installs the new one;</li>
     *   <li>the two per-topic dispatch throttling keys trigger a dispatch-rate refresh.</li>
     * </ul>
     */
    private void updateConfigurationAndRegisterListeners() {
        // The validator only accepts class names that can be loaded.
        addDynamicConfigValidator("loadManagerClassName", className -> {
            try {
                Class.forName(className);
            } catch (ClassNotFoundException e) {
                log.warn("Configured load-manager class {} not found {}", className, e.getMessage());
                return false;
            }
            return true;
        });

        updateDynamicServiceConfiguration();

        registerConfigurationListener("maxConcurrentLookupRequest",
                maxConcurrentLookupRequest -> lookupRequestSemaphore.set(new Semaphore((Integer) maxConcurrentLookupRequest, false)));
        registerConfigurationListener("maxConcurrentTopicLoadRequest",
                maxConcurrentTopicLoadRequest -> topicLoadRequestSemaphore.set(new Semaphore((Integer) maxConcurrentTopicLoadRequest, false)));
        registerConfigurationListener("loadManagerClassName", className -> {
            try {
                LoadManager newLoadManager = LoadManager.create(pulsar);
                log.info("Created load manager: {}", className);
                pulsar.getLoadManager().get().stop();
                newLoadManager.start();
                pulsar.getLoadManager().set(newLoadManager);
            } catch (Exception ex) {
                // Pass the message for the placeholder and the exception itself for the stack
                // trace; passing only the exception left "{}" unfilled in the output.
                log.warn("Failed to change load manager due to {}", ex.getMessage(), ex);
            }
        });
        registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg",
                dispatchRatePerTopicInMsg -> updateTopicMessageDispatchRate());
        registerConfigurationListener("dispatchThrottlingRatePerTopicInByte",
                dispatchRatePerTopicInByte -> updateTopicMessageDispatchRate());
    }

    /**
     * Submits a task to the Pulsar executor that refreshes the dispatch rate of every loaded
     * {@link PersistentTopic}.
     *
     * <p>Futures that are not done are skipped. A failure for one topic is logged at WARN level
     * and does not stop the sweep.
     */
    private void updateTopicMessageDispatchRate() {
        pulsar().getExecutor().submit(() -> topics.forEach((name, topicFuture) -> {
            if (!topicFuture.isDone()) {
                return;
            }
            String topicName = null;
            try {
                Topic loaded = topicFuture.get();
                if (loaded instanceof PersistentTopic) {
                    topicName = loaded.getName();
                    ((PersistentTopic) loaded).getDispatchRateLimiter().updateDispatchRate();
                }
            } catch (Exception e) {
                log.warn("[{}] failed to update message-dispatch rate {}", topicName, e.getMessage());
            }
        }));
    }

    /**
     * Registers a listener for a service configuration key.
     *
     * @param configKey name of a field declared on {@code ServiceConfiguration}
     * @param listener callback stored under {@code configKey}
     * @param <T> type of the value the listener accepts
     * @throws IllegalArgumentException if {@code configKey} is not a declared configuration field
     */
    public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) {
        validateConfigKey(configKey);
        configRegisteredListeners.put(configKey, listener);
    }

    /**
     * Attaches a validator to a dynamic configuration key.
     *
     * <p>Keys that are valid configuration fields but are not present in the dynamic
     * configuration map are ignored.
     *
     * @param key name of a field declared on {@code ServiceConfiguration}
     * @param validator predicate stored on the key's {@link ConfigField}
     * @throws IllegalArgumentException if {@code key} is not a declared configuration field
     */
    private void addDynamicConfigValidator(String key, Predicate<String> validator) {
        validateConfigKey(key);
        ConfigField dynamicField = (ConfigField) dynamicConfigurationMap.get(key);
        if (dynamicField != null) {
            dynamicField.validator = validator;
        }
    }

    /**
     * Checks that {@code key} names a field declared on {@code ServiceConfiguration}.
     *
     * @param key candidate configuration field name
     * @throws IllegalArgumentException if the field lookup fails; the original exception is kept
     *     as the cause
     */
    private void validateConfigKey(String key) {
        try {
            ServiceConfiguration.class.getDeclaredField(key);
        } catch (Exception lookupFailure) {
            log.error("ServiceConfiguration key {} not found {}", key, lookupFailure.getMessage());
            throw new IllegalArgumentException("Invalid service config " + key, lookupFailure);
        }
    }

    /**
     * Applies the dynamic configuration stored in ZooKeeper to the service configuration and
     * registers a listener that applies later updates.
     *
     * <p>Steps:
     * <ol>
     *   <li>Create the configuration node with an empty map if it does not exist, then read it
     *       through the dynamic configuration cache. A failure here is logged and treated as
     *       "no configuration".</li>
     *   <li>For each stored entry, run the registered validator (a rejected value raises
     *       {@link IllegalArgumentException}) and set the annotated configuration field
     *       reflectively.</li>
     *   <li>Register a cache listener that, on updates to the configuration path, sets each
     *       dynamic field and notifies the registered listener when the value changed.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if a stored value is rejected by its validator
     */
    private void updateDynamicServiceConfiguration() {
        Optional<Map<String, String>> configCache = Optional.empty();
        try {
            if (pulsar.getZkClient().exists(BROKER_SERVICE_CONFIGURATION_PATH, false) == null) {
                try {
                    byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(Maps.newHashMap());
                    ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), BROKER_SERVICE_CONFIGURATION_PATH, data,
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException ignored) {
                    // The node appeared between the exists() check and the create; nothing to do.
                }
            }
            configCache = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH);
        } catch (Exception e) {
            log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e);
        }

        if (configCache.isPresent()) {
            configCache.get().forEach((key, value) -> {
                ConfigField dynamicField = dynamicConfigurationMap.get(key);
                if (dynamicField != null && dynamicField.validator != null && !dynamicField.validator.test(value)) {
                    log.error("Failed to validate dynamic config {} with value {}", key, value);
                    throw new IllegalArgumentException(String.format("Failed to validate dynamic-config %s/%s", key, value));
                }
                try {
                    Field field = ServiceConfiguration.class.getDeclaredField(key);
                    if (field != null && field.isAnnotationPresent(FieldContext.class)) {
                        field.setAccessible(true);
                        field.set(pulsar().getConfiguration(), FieldParser.value(value, field));
                        log.info("Successfully updated {}/{}", key, value);
                    }
                } catch (Exception e) {
                    log.warn("Failed to update service configuration {}/{}, {}", new Object[]{key, value, e.getMessage()});
                }
            });
        }

        dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() {

            @Override
            public void onUpdate(String path, Map<String, String> data, Stat stat) {
                if (!BrokerService.BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) || data == null) {
                    return;
                }
                data.forEach((configKey, value) -> {
                    // Resolve the field before using it: keys that are not dynamic have no entry
                    // in the map, and dereferencing the missing entry used to throw and abort the
                    // processing of the remaining keys.
                    ConfigField dynamicField = dynamicConfigurationMap.get(configKey);
                    if (dynamicField == null || dynamicField.field == null) {
                        log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, value);
                        return;
                    }
                    Field configField = dynamicField.field;
                    Consumer listener = (Consumer) BrokerService.this.configRegisteredListeners.get(configKey);
                    try {
                        // Parsing happens inside the try so one malformed value cannot escape the
                        // listener and stop the other keys from being applied.
                        Object newValue = FieldParser.value(value, configField);
                        Object existingValue = configField.get(BrokerService.this.pulsar.getConfiguration());
                        configField.set(BrokerService.this.pulsar.getConfiguration(), newValue);
                        log.info("Successfully updated configuration {}/{}", configKey, value);
                        // Null-safe comparison: a previously unset value must not raise an NPE
                        // after the field has already been updated.
                        if (listener != null && !java.util.Objects.equals(existingValue, newValue)) {
                            listener.accept(newValue);
                        }
                    } catch (Exception e) {
                        log.error("Failed to update config {}/{}", configKey, value, e);
                    }
                });
            }
        });
    }

    /**
     * Lists the keys of the dynamic configuration map.
     *
     * @return the result of {@code keys()} on the dynamic configuration map
     */
    public static List<String> getDynamicConfiguration() {
        return BrokerService.dynamicConfigurationMap.keys();
    }

    /**
     * Reports whether a key is present in the dynamic configuration map.
     *
     * @param key configuration field name
     * @return {@code true} when the dynamic configuration map contains {@code key}
     */
    public static boolean isDynamicConfiguration(String key) {
        return BrokerService.dynamicConfigurationMap.containsKey(key);
    }

    /**
     * Runs the validator registered for a dynamic configuration key, if there is one.
     *
     * @param key configuration field name
     * @param value candidate value passed to the validator
     * @return the validator's verdict, or {@code true} when the key is not dynamic or has no
     *     validator
     */
    public static boolean validateDynamicConfiguration(String key, String value) {
        ConfigField dynamicField = (ConfigField) dynamicConfigurationMap.get(key);
        if (dynamicField == null || dynamicField.validator == null) {
            return true;
        }
        return dynamicField.validator.test(value);
    }

    /**
     * Builds the map of dynamic configuration fields by reflecting over
     * {@code ServiceConfiguration}.
     *
     * <p>Every declared field annotated with {@code FieldContext} is made accessible; those whose
     * annotation reports {@code dynamic()} are added to the map under their field name.
     *
     * @return a new map from field name to {@link ConfigField}
     */
    private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() {
        ConcurrentOpenHashMap<String, ConfigField> dynamicFields = new ConcurrentOpenHashMap<>();
        for (Field field : ServiceConfiguration.class.getDeclaredFields()) {
            if (field == null || !field.isAnnotationPresent(FieldContext.class)) {
                continue;
            }
            field.setAccessible(true);
            if (field.getAnnotation(FieldContext.class).dynamic()) {
                dynamicFields.put(field.getName(), new ConfigField(field));
            }
        }
        return dynamicFields;
    }

    /**
     * Takes the next entry off the pending topic-load queue and starts loading it.
     *
     * <p>A permit is requested from the topic-load semaphore with {@code tryAcquire}; loading
     * proceeds whether or not one was obtained. When the topic future completes, the permit is
     * released if it was acquired and the next pending entry is processed.
     *
     * <p>If starting the load throws a {@link RuntimeException}, the pending future is failed and
     * the next attempt is scheduled 100 ms later. The future is failed with the exception's cause
     * when there is one and with the exception itself otherwise:
     * {@code completeExceptionally(null)} throws a {@link NullPointerException}, which would leave
     * the future incomplete and skip the rescheduling.
     */
    private void createPendingLoadTopic() {
        Pair<String, CompletableFuture<Topic>> pendingTopic = pendingTopicLoadingQueue.poll();
        if (pendingTopic == null) {
            return;
        }
        String topic = pendingTopic.getLeft();
        try {
            checkTopicNsOwnership(topic);
            CompletableFuture<Topic> pendingFuture = pendingTopic.getRight();
            final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get();
            final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
            createPersistentTopic(topic, pendingFuture);
            pendingFuture.handle((persistentTopic, ex) -> {
                // Release the permit (if any) and move on to the next pending topic.
                if (acquiredPermit) {
                    topicLoadSemaphore.release();
                }
                createPendingLoadTopic();
                return null;
            });
        } catch (RuntimeException re) {
            log.error("Failed to create pending topic {} {}", topic, re);
            Throwable failure = re.getCause() != null ? re.getCause() : re;
            pendingTopic.getRight().completeExceptionally(failure);
            inactivityMonitor.schedule(() -> createPendingLoadTopic(), 100L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Returns the ordered executor held by this service for topics.
     *
     * @return the {@code topicOrderedExecutor} field
     */
    public OrderedSafeExecutor getTopicOrderedExecutor() {
        return topicOrderedExecutor;
    }

    /**
     * Returns the namespace / bundle / topic stats map.
     *
     * @return the {@code multiLayerTopicsMap} field itself, not a copy
     */
    public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> getMultiLayerTopicMap() {
        return multiLayerTopicsMap;
    }

    /**
     * Adds to the broker-wide unacked message counter and, while broker-level blocking is active,
     * blocks the given dispatcher once it exceeds the per-dispatcher limit.
     *
     * <p>Does nothing when {@code maxUnackedMessages} is not positive. Blocking the dispatcher and
     * recording it in {@code blockedDispatchers} happen under the read lock.
     *
     * @param dispatcher dispatcher whose unacked count changed
     * @param numberOfMessages amount added to the broker-wide counter
     */
    public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
        if (maxUnackedMessages <= 0) {
            return;
        }
        totalUnackedMessages.add(numberOfMessages);

        boolean overLimitWhileBlocking = blockedDispatcherOnHighUnackedMsgs.get()
                && !dispatcher.isBlockedDispatcherOnUnackedMsgs()
                && dispatcher.getTotalUnackedMessages() > maxUnackedMsgsPerDispatcher;
        if (!overLimitWhileBlocking) {
            return;
        }

        lock.readLock().lock();
        try {
            log.info("[{}] dispatcher reached to max unack msg limit on blocked-broker {}", dispatcher.getName(), dispatcher.getTotalUnackedMessages());
            dispatcher.blockDispatcherOnUnackedMsgs();
            blockedDispatchers.add(dispatcher);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Switches broker-level dispatcher blocking on or off based on the broker-wide unacked
     * message count.
     *
     * <ul>
     *   <li>At or above {@code maxUnackedMessages}: flips the blocking flag on and submits a task
     *       that blocks dispatchers over the per-dispatcher limit.</li>
     *   <li>Below half of {@code maxUnackedMessages} while blocking is active: flips the flag off
     *       and unblocks the dispatchers recorded as blocked.</li>
     * </ul>
     *
     * <p>Does nothing when {@code maxUnackedMessages} is not positive.
     */
    public void checkUnAckMessageDispatching() {
        if (maxUnackedMessages <= 0) {
            return;
        }
        long unAckedMessages = totalUnackedMessages.sum();
        if (unAckedMessages >= (long) maxUnackedMessages && blockedDispatcherOnHighUnackedMsgs.compareAndSet(false, true)) {
            // The format previously had three placeholders for two arguments, and the arguments
            // did not match the wording. Log the observed count and the broker limit.
            log.info("Starting blocking dispatchers with unacked msgs {} due to reached max broker limit {}", unAckedMessages, maxUnackedMessages);
            executor().submit(() -> blockDispatchersWithLargeUnAckMessages());
        } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < (long) (maxUnackedMessages / 2)
                && blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) {
            unblockDispatchersOnUnAckMessages(blockedDispatchers.values());
        }
    }

    /**
     * Reports the current value of the broker-level blocking flag.
     *
     * @return the value of {@code blockedDispatcherOnHighUnackedMsgs}
     */
    public boolean isBrokerDispatchingBlocked() {
        return blockedDispatcherOnHighUnackedMsgs.get();
    }

    /**
     * Blocks every multi-consumer dispatcher whose unacked message count exceeds the
     * per-dispatcher limit, and records it in {@code blockedDispatchers}.
     *
     * <p>Runs under the read lock. Only topics whose future is done are inspected; a failure
     * while reading a topic is logged at WARN level and does not stop the sweep.
     */
    private void blockDispatchersWithLargeUnAckMessages() {
        lock.readLock().lock();
        try {
            topics.forEach((name, topicFuture) -> {
                if (!topicFuture.isDone()) {
                    return;
                }
                try {
                    topicFuture.get().getSubscriptions().forEach((subName, persistentSubscription) -> {
                        if (!(persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers)) {
                            return;
                        }
                        PersistentDispatcherMultipleConsumers dispatcher =
                                (PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher();
                        int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
                        if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
                            log.info("[{}] Blocking dispatcher due to reached max broker limit {}", dispatcher.getName(), dispatcher.getTotalUnackedMessages());
                            dispatcher.blockDispatcherOnUnackedMsgs();
                            blockedDispatchers.add(dispatcher);
                        }
                    });
                } catch (Exception e) {
                    log.warn("Failed to get topic from future ", e);
                }
            });
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Unblocks the given dispatchers under the write lock.
     *
     * <p>For each dispatcher: clears its blocked state, submits a {@code readMoreEntries} call to
     * the executor, logs the unblock, and removes it from {@code blockedDispatchers}.
     *
     * @param dispatcherList dispatchers to unblock
     */
    public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
        lock.writeLock().lock();
        try {
            dispatcherList.forEach(blocked -> {
                blocked.unBlockDispatcherOnUnackedMsgs();
                executor().submit(() -> blocked.readMoreEntries());
                log.info("[{}] Dispatcher is unblocked", blocked.getName());
                blockedDispatchers.remove(blocked);
            });
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Pairs a reflected configuration field with an optional validator for its string value.
     */
    private static class ConfigField {
        /** The reflected field; fixed at construction. */
        final Field field;

        /** Validator for candidate values; stays {@code null} until one is assigned. */
        Predicate<String> validator;

        /**
         * Creates an entry for {@code field} with no validator.
         *
         * @param field reflected field this entry describes
         */
        public ConfigField(Field field) {
            super();
            this.field = field;
        }
    }
}

