package org.apache.iotdb.db.subscription.broker;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionBroker.class */
public class SubscriptionBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionBroker.class);
    private final String brokerId;
    private final Map<String, SubscriptionPrefetchingQueue> topicNameToPrefetchingQueue = new ConcurrentHashMap();

    public SubscriptionBroker(String str) {
        this.brokerId = str;
    }

    public boolean isEmpty() {
        return this.topicNameToPrefetchingQueue.isEmpty();
    }

    public List<SerializedEnrichedEvent> poll(Set<String> set, SubscriptionPollTimer subscriptionPollTimer) {
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, SubscriptionPrefetchingQueue> entry : this.topicNameToPrefetchingQueue.entrySet()) {
            String key = entry.getKey();
            SubscriptionPrefetchingQueue value = entry.getValue();
            if (set.contains(key)) {
                SerializedEnrichedEvent poll = value.poll(subscriptionPollTimer);
                if (Objects.nonNull(poll)) {
                    arrayList.add(poll);
                }
                subscriptionPollTimer.update();
                if (subscriptionPollTimer.isExpired()) {
                    break;
                }
            }
        }
        return arrayList;
    }

    public void commit(Map<String, List<String>> map) {
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            String key = entry.getKey();
            SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(key);
            if (Objects.isNull(subscriptionPrefetchingQueue)) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", key);
            } else {
                subscriptionPrefetchingQueue.commit(entry.getValue());
            }
        }
    }

    public void bindPrefetchingQueue(String str, BoundedBlockingPendingQueue<Event> boundedBlockingPendingQueue) {
        if (Objects.nonNull(this.topicNameToPrefetchingQueue.get(str))) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] has already existed", str);
        } else {
            this.topicNameToPrefetchingQueue.put(str, new SubscriptionPrefetchingQueue(this.brokerId, str, boundedBlockingPendingQueue));
        }
    }

    public void unbindPrefetchingQueue(String str) {
        if (Objects.isNull(this.topicNameToPrefetchingQueue.get(str))) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", str);
        } else {
            this.topicNameToPrefetchingQueue.remove(str);
        }
    }

    public void executePrefetch(String str) {
        SubscriptionPrefetchingQueue subscriptionPrefetchingQueue = this.topicNameToPrefetchingQueue.get(str);
        if (Objects.isNull(subscriptionPrefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", str);
        } else {
            subscriptionPrefetchingQueue.executePrefetch();
        }
    }
}
