/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.subscription.broker;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.subscription.broker.SerializedEnrichedEvent;
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingQueue;
import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionBroker {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionBroker.class);
    private final String brokerId;
    private final Map<String, SubscriptionPrefetchingQueue> topicNameToPrefetchingQueue;

    public SubscriptionBroker(String brokerId) {
        this.brokerId = brokerId;
        this.topicNameToPrefetchingQueue = new ConcurrentHashMap<String, SubscriptionPrefetchingQueue>();
    }

    public boolean isEmpty() {
        return this.topicNameToPrefetchingQueue.isEmpty();
    }

    public List<SerializedEnrichedEvent> poll(Set<String> topicNames, SubscriptionPollTimer timer) {
        ArrayList<SerializedEnrichedEvent> events = new ArrayList<SerializedEnrichedEvent>();
        for (Map.Entry<String, SubscriptionPrefetchingQueue> entry : this.topicNameToPrefetchingQueue.entrySet()) {
            String topicName = entry.getKey();
            SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue();
            if (!topicNames.contains(topicName)) continue;
            SerializedEnrichedEvent event = prefetchingQueue.poll(timer);
            if (Objects.nonNull(event)) {
                events.add(event);
            }
            timer.update();
            if (!timer.isExpired()) continue;
            break;
        }
        return events;
    }

    public void commit(Map<String, List<String>> topicNameToSubscriptionCommitIds) {
        for (Map.Entry<String, List<String>> entry : topicNameToSubscriptionCommitIds.entrySet()) {
            String topicName = entry.getKey();
            SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
            if (Objects.isNull(prefetchingQueue)) {
                LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", (Object)topicName);
                continue;
            }
            prefetchingQueue.commit(entry.getValue());
        }
    }

    public void bindPrefetchingQueue(String topicName, BoundedBlockingPendingQueue<Event> inputPendingQueue) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.nonNull(prefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] has already existed", (Object)topicName);
            return;
        }
        this.topicNameToPrefetchingQueue.put(topicName, new SubscriptionPrefetchingQueue(this.brokerId, topicName, inputPendingQueue));
    }

    public void unbindPrefetchingQueue(String topicName) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", (Object)topicName);
            return;
        }
        this.topicNameToPrefetchingQueue.remove(topicName);
    }

    public void executePrefetch(String topicName) {
        SubscriptionPrefetchingQueue prefetchingQueue = this.topicNameToPrefetchingQueue.get(topicName);
        if (Objects.isNull(prefetchingQueue)) {
            LOGGER.warn("Subscription: prefetching queue bound to topic [{}] does not exist", (Object)topicName);
            return;
        }
        prefetchingQueue.executePrefetch();
    }
}

