package org.apache.iotdb.db.subscription.broker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.class */
/**
 * Per-topic buffer that sits between a pipe's pending event queue and subscription consumers.
 *
 * <p>Events are drained from {@code inputPendingQueue}, converted into {@link Tablet}s, packed into
 * a {@link SerializedEnrichedEvent} tagged with a freshly generated subscription commit id, and
 * then both registered in {@code uncommittedEvents} and appended to {@code prefetchingQueue}.
 * Uncommitted events that are taken from {@code prefetchingQueue} are always put back, so an event
 * only leaves the queue after {@link SerializedEnrichedEvent#isCommitted()} reports {@code true}.
 */
public class SubscriptionPrefetchingQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPrefetchingQueue.class);

    private final String brokerId;
    private final String topicName;
    private final BoundedBlockingPendingQueue<Event> inputPendingQueue;

    /** Source of the trailing sequence number of every generated subscription commit id. */
    private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);

    /** Prefetched events that have not been committed yet, keyed by subscription commit id. */
    private final Map<String, SerializedEnrichedEvent> uncommittedEvents = new ConcurrentHashMap<>();

    private final LinkedBlockingQueue<SerializedEnrichedEvent> prefetchingQueue = new LinkedBlockingQueue<>();

    /**
     * Creates a prefetching queue.
     *
     * @param brokerId id of the owning broker; becomes part of every generated commit id
     * @param topicName topic served by this queue; becomes part of every generated commit id
     * @param inputPendingQueue queue the source events are drained from
     */
    public SubscriptionPrefetchingQueue(String brokerId, String topicName, BoundedBlockingPendingQueue<Event> inputPendingQueue) {
        this.brokerId = brokerId;
        this.topicName = topicName;
        this.inputPendingQueue = inputPendingQueue;
    }

    /**
     * Returns the next uncommitted event for which {@link SerializedEnrichedEvent#pollable()} is
     * {@code true}. When the prefetching queue is empty, one prefetch pass is attempted first.
     *
     * @param timer poll deadline; it is updated after every uncommitted event that is inspected
     * @return the selected event (its last-polled timestamp is recorded before returning), or
     *     {@code null} when the queue yields nothing within the configured blocking time, the
     *     timer expires, or the calling thread is interrupted
     */
    public SerializedEnrichedEvent poll(SubscriptionPollTimer timer) {
        if (this.prefetchingQueue.isEmpty()) {
            prefetchOnce(SubscriptionConfig.getInstance().getSubscriptionMaxTabletsPerPrefetching());
        }

        try {
            SerializedEnrichedEvent candidate;
            while (Objects.nonNull(
                    candidate = this.prefetchingQueue.poll(
                            SubscriptionConfig.getInstance().getSubscriptionPollMaxBlockingTimeMs(), TimeUnit.MILLISECONDS))) {
                if (candidate.isCommitted()) {
                    // Already committed: drop it from the queue by not re-enqueueing it.
                    continue;
                }
                // Uncommitted events stay in the queue (moved to the tail) whether or not they are
                // handed out now, so that they can be offered again later.
                this.prefetchingQueue.add(candidate);

                timer.update();
                if (timer.isExpired()) {
                    return null;
                }
                if (candidate.pollable()) {
                    candidate.recordLastPolledTimestamp();
                    return candidate;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Subscription: Interrupted while polling events.", e);
        }
        return null;
    }

    /**
     * Commits the events identified by the given subscription commit ids. Unknown ids are logged
     * and skipped.
     *
     * @param list subscription commit ids to commit
     */
    public void commit(List<String> list) {
        for (String commitId : list) {
            // remove() is a single atomic lookup-and-delete, so when the same id is committed
            // concurrently only one caller obtains the event and decreases its reference count.
            final SerializedEnrichedEvent event = this.uncommittedEvents.remove(commitId);
            if (Objects.isNull(event)) {
                LOGGER.warn("Subscription: subscription commit id [{}] does not exist, it may have been committed or something unexpected happened", commitId);
                continue;
            }
            event.decreaseReferenceCount();
            event.recordCommittedTimestamp();
        }
    }

    /** Runs one prefetch pass followed by one serialization pass over the prefetching queue. */
    public void executePrefetch() {
        prefetchOnce(SubscriptionConfig.getInstance().getSubscriptionMaxTabletsPerPrefetching());
        serializeOnce();
    }

    /**
     * Drains the input queue until it yields {@code null} or at least {@code limit} tablets have
     * been collected, then enqueues all collected tablets as a single uncommitted event.
     *
     * <p>The limit is only checked after a whole source event has been converted, so the number of
     * collected tablets may exceed it.
     *
     * @param limit tablet count at which draining stops
     */
    private void prefetchOnce(long limit) {
        final List<Tablet> tablets = new ArrayList<>();
        final List<EnrichedEvent> enrichedEvents = new ArrayList<>();

        Event event;
        while (Objects.nonNull(event = UserDefinedEnrichedEvent.maybeOf(this.inputPendingQueue.waitedPoll()))) {
            if (!(event instanceof EnrichedEvent)) {
                LOGGER.warn("Subscription: Only support prefetch EnrichedEvent. Ignore {}.", event);
                continue;
            }

            if (event instanceof TabletInsertionEvent) {
                final List<Tablet> converted = convertToTablets((TabletInsertionEvent) event);
                if (Objects.isNull(converted) || converted.isEmpty()) {
                    continue;
                }
                tablets.addAll(converted);
                enrichedEvents.add((EnrichedEvent) event);
                if (tablets.size() >= limit) {
                    break;
                }
            } else if (event instanceof PipeTsFileInsertionEvent) {
                for (TabletInsertionEvent tabletInsertionEvent : ((PipeTsFileInsertionEvent) event).toTabletInsertionEvents()) {
                    final List<Tablet> converted = convertToTablets(tabletInsertionEvent);
                    if (!Objects.isNull(converted) && !converted.isEmpty()) {
                        tablets.addAll(converted);
                    }
                }
                // A TsFile event is retained even when it contributed no tablets.
                enrichedEvents.add((EnrichedEvent) event);
                if (tablets.size() >= limit) {
                    break;
                }
            } else {
                LOGGER.warn("Subscription: Ignore EnrichedEvent {} when prefetching.", event);
            }
        }

        if (tablets.isEmpty()) {
            // NOTE(review): events gathered in enrichedEvents (and the ignored ones above) are
            // dropped here without any release call - confirm whether their reference counts
            // need to be decreased.
            return;
        }

        final String subscriptionCommitId = generateSubscriptionCommitId();
        final SerializedEnrichedEvent prefetched =
                new SerializedEnrichedEvent(new EnrichedTablets(this.topicName, tablets, subscriptionCommitId), enrichedEvents);
        this.uncommittedEvents.put(subscriptionCommitId, prefetched);
        this.prefetchingQueue.add(prefetched);
    }

    /**
     * Walks the prefetching queue, dropping committed events and calling
     * {@link SerializedEnrichedEvent#serialize()} on uncommitted events that are pollable.
     *
     * <p>The number of uncommitted events visited is bounded by the queue size observed on entry,
     * because every uncommitted event is re-enqueued and the walk would otherwise never end.
     */
    private void serializeOnce() {
        final long sizeSnapshot = this.prefetchingQueue.size();
        long visited = 0;

        try {
            SerializedEnrichedEvent event;
            while (Objects.nonNull(
                    event = this.prefetchingQueue.poll(
                            SubscriptionConfig.getInstance().getSubscriptionSerializeMaxBlockingTimeMs(), TimeUnit.MILLISECONDS))) {
                if (event.isCommitted()) {
                    // Committed events are dropped and do not count towards the bound.
                    continue;
                }
                this.prefetchingQueue.add(event);
                if (visited >= sizeSnapshot) {
                    break;
                }
                visited++;
                if (event.pollable()) {
                    event.serialize();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.warn("Subscription: Interrupted while serializing events.", e);
        }
    }

    /**
     * Converts a tablet insertion event into tablets.
     *
     * @param tabletInsertionEvent event to convert
     * @return the result of the event's own conversion for the two supported event types; an empty
     *     list (after logging a warning) for any other type
     */
    private List<Tablet> convertToTablets(TabletInsertionEvent tabletInsertionEvent) {
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            return ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).convertToTablets();
        }
        if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
            return Collections.singletonList(((PipeRawTabletInsertionEvent) tabletInsertionEvent).convertToTablet());
        }
        LOGGER.warn("Subscription: Only support convert PipeInsertNodeTabletInsertionEvent or PipeRawTabletInsertionEvent to tablet. Ignore {}.", tabletInsertionEvent);
        return Collections.emptyList();
    }

    /**
     * Builds the next subscription commit id in the form
     * {@code <dataNodeId>#<rebootTimes>#<topicName>_<brokerId>#<sequence>}.
     *
     * @return a commit id ending in the next value of the internal sequence
     */
    private String generateSubscriptionCommitId() {
        return IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
                + "#"
                + PipeAgent.runtime().getRebootTimes()
                + "#"
                + this.topicName
                + "_"
                + this.brokerId
                + "#"
                + this.subscriptionCommitIdGenerator.getAndIncrement();
    }
}
