package org.apache.pulsar.broker.service;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.regex.Pattern;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.NotificationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/TopicListService.class */
public class TopicListService {
    private static final Logger log = LoggerFactory.getLogger(TopicListService.class);
    private final NamespaceService namespaceService;
    private final TopicResources topicResources;
    private final ServerCnx connection;
    private final boolean enableSubscriptionPatternEvaluation;
    private final int maxSubscriptionPatternLength;
    private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> watchers = ConcurrentLongHashMap.newBuilder().expectedItems(8).concurrencyLevel(1).build();

    /* loaded from: input_file:org/apache/pulsar/broker/service/TopicListService$TopicListWatcher.class */
    public static class TopicListWatcher implements BiConsumer<String, NotificationType> {
        private final List<String> matchingTopics;
        private final TopicListService topicListService;
        private final long id;
        private final Pattern topicsPattern;

        public TopicListWatcher(TopicListService topicListService, long j, Pattern pattern, List<String> list) {
            this.topicListService = topicListService;
            this.id = j;
            this.topicsPattern = pattern;
            this.matchingTopics = TopicList.filterTopics(list, pattern);
        }

        public List<String> getMatchingTopics() {
            return this.matchingTopics;
        }

        @Override // java.util.function.BiConsumer
        public void accept(String str, NotificationType notificationType) {
            List<String> emptyList;
            List<String> singletonList;
            if (this.topicsPattern.matcher(str).matches()) {
                if (notificationType == NotificationType.Deleted) {
                    singletonList = Collections.emptyList();
                    emptyList = Collections.singletonList(str);
                    this.matchingTopics.remove(str);
                } else {
                    emptyList = Collections.emptyList();
                    singletonList = Collections.singletonList(str);
                    this.matchingTopics.add(str);
                }
                this.topicListService.sendTopicListUpdate(this.id, TopicList.calculateHash(this.matchingTopics), emptyList, singletonList);
            }
        }
    }

    public TopicListService(PulsarService pulsarService, ServerCnx serverCnx, boolean z, int i) {
        this.namespaceService = pulsarService.getNamespaceService();
        this.connection = serverCnx;
        this.enableSubscriptionPatternEvaluation = z;
        this.maxSubscriptionPatternLength = i;
        this.topicResources = pulsarService.getPulsarResources().getTopicResources();
    }

    public void inactivate() {
        Iterator it = new HashSet(this.watchers.keys()).iterator();
        while (it.hasNext()) {
            deleteTopicListWatcher((Long) it.next());
        }
    }

    public void handleWatchTopicList(NamespaceName namespaceName, long j, long j2, Pattern pattern, String str, Semaphore semaphore) {
        ServerError serverError;
        if (!this.enableSubscriptionPatternEvaluation || pattern.pattern().length() > this.maxSubscriptionPatternLength) {
            String str2 = !this.enableSubscriptionPatternEvaluation ? "Unable to create topic list watcher: " + "Evaluating subscription patterns is disabled." : "Unable to create topic list watcher: " + "Pattern longer than maximum: " + this.maxSubscriptionPatternLength;
            log.warn("[{}] {} on namespace {}", new Object[]{this.connection.getRemoteAddress(), str2, namespaceName});
            this.connection.getCommandSender().sendErrorResponse(j2, ServerError.NotAllowedError, str2);
            semaphore.release();
            return;
        }
        CompletableFuture<TopicListWatcher> completableFuture = new CompletableFuture<>();
        CompletableFuture<TopicListWatcher> completableFuture2 = (CompletableFuture) this.watchers.putIfAbsent(j, completableFuture);
        if (completableFuture2 == null) {
            initializeTopicsListWatcher(completableFuture, namespaceName, j, pattern);
        } else {
            if (!completableFuture2.isDone() || completableFuture2.isCompletedExceptionally()) {
                log.warn("[{}] Watcher with id is already present on the connection, consumerId={}", this.connection.getRemoteAddress(), Long.valueOf(j));
                if (completableFuture2.isDone()) {
                    serverError = ServerError.UnknownError;
                    this.watchers.remove(j, completableFuture2);
                } else {
                    serverError = ServerError.ServiceNotReady;
                }
                this.connection.getCommandSender().sendErrorResponse(j2, serverError, "Topic list watcher is already present on the connection");
                semaphore.release();
                return;
            }
            log.info("[{}] Watcher with the same id is already created: watcherId={}, watcher={}", new Object[]{this.connection.getRemoteAddress(), Long.valueOf(j), completableFuture2.getNow(null)});
            completableFuture = completableFuture2;
        }
        CompletableFuture<TopicListWatcher> completableFuture3 = completableFuture;
        completableFuture3.thenAccept(topicListWatcher -> {
            List<String> matchingTopics = topicListWatcher.getMatchingTopics();
            String calculateHash = TopicList.calculateHash(matchingTopics);
            if (calculateHash.equals(str)) {
                matchingTopics = Collections.emptyList();
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received WatchTopicList for namespace [//{}] by {}", new Object[]{this.connection.getRemoteAddress(), namespaceName, Long.valueOf(j2)});
            }
            this.connection.getCommandSender().sendWatchTopicListSuccess(j2, j, calculateHash, matchingTopics);
            semaphore.release();
        }).exceptionally(th -> {
            log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}", new Object[]{this.connection.getRemoteAddress(), namespaceName, Long.valueOf(j2)});
            this.connection.getCommandSender().sendErrorResponse(j2, BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException(th)), th.getMessage());
            this.watchers.remove(j, completableFuture3);
            semaphore.release();
            return null;
        });
    }

    public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> completableFuture, NamespaceName namespaceName, long j, Pattern pattern) {
        this.namespaceService.getListOfPersistentTopics(namespaceName).thenApply(list -> {
            TopicListWatcher topicListWatcher = new TopicListWatcher(this, j, pattern, list);
            this.topicResources.registerPersistentTopicListener(namespaceName, topicListWatcher);
            return topicListWatcher;
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (topicListWatcher, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            } else {
                completableFuture.complete(topicListWatcher);
            }
        });
    }

    public void handleWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) {
        long requestId = commandWatchTopicListClose.getRequestId();
        deleteTopicListWatcher(Long.valueOf(commandWatchTopicListClose.getWatcherId()));
        this.connection.getCommandSender().sendSuccess(requestId);
    }

    public void deleteTopicListWatcher(Long l) {
        CompletableFuture completableFuture = (CompletableFuture) this.watchers.get(l.longValue());
        if (completableFuture == null) {
            log.info("[{}] TopicListWatcher was not registered on the connection: {}", l, this.connection.getRemoteAddress());
            return;
        }
        if (!completableFuture.isDone() && completableFuture.completeExceptionally(new IllegalStateException("Closed watcher before creation was complete"))) {
            log.info("[{}] Closed watcher before its creation was completed. watcherId={}", this.connection.getRemoteAddress(), l);
            this.watchers.remove(l.longValue());
        } else if (completableFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed watcher that already failed to be created. watcherId={}", this.connection.getRemoteAddress(), l);
            this.watchers.remove(l.longValue());
        } else {
            this.topicResources.deregisterPersistentTopicListener((BiConsumer) completableFuture.getNow(null));
            this.watchers.remove(l.longValue());
            log.info("[{}] Closed watcher, watcherId={}", this.connection.getRemoteAddress(), l);
        }
    }

    public void sendTopicListUpdate(long j, String str, List<String> list, List<String> list2) {
        this.connection.getCommandSender().sendWatchTopicListUpdate(j, list2, list, str);
    }
}
