package org.apache.iotdb.db.pipe.task.connection;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.class */
public class PipeEventCollector implements EventCollector, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCollector.class);
    private final BoundedBlockingPendingQueue<Event> pendingQueue;
    private final long creationTime;
    private final int regionId;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
    private final EnrichedDeque<Event> bufferQueue = new EnrichedDeque<>(new LinkedList());

    public PipeEventCollector(BoundedBlockingPendingQueue<Event> boundedBlockingPendingQueue, long j, int i) {
        this.pendingQueue = boundedBlockingPendingQueue;
        this.creationTime = j;
        this.regionId = i;
    }

    public void collect(Event event) {
        try {
            if (event instanceof PipeInsertNodeTabletInsertionEvent) {
                parseAndCollectEvent((PipeInsertNodeTabletInsertionEvent) event);
            } else if (event instanceof PipeRawTabletInsertionEvent) {
                parseAndCollectEvent((PipeRawTabletInsertionEvent) event);
            } else if (event instanceof PipeTsFileInsertionEvent) {
                parseAndCollectEvent((PipeTsFileInsertionEvent) event);
            } else {
                collectEvent(event);
            }
        } catch (PipeException e) {
            throw e;
        } catch (Exception e2) {
            throw new PipeException("Error occurred when collecting events from processor.", e2);
        }
    }

    private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) {
        if (!pipeInsertNodeTabletInsertionEvent.shouldParseTimeOrPattern()) {
            collectEvent(pipeInsertNodeTabletInsertionEvent);
            return;
        }
        Iterator<PipeRawTabletInsertionEvent> it = pipeInsertNodeTabletInsertionEvent.toRawTabletInsertionEvents().iterator();
        while (it.hasNext()) {
            collectEvent(it.next());
        }
    }

    private void parseAndCollectEvent(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) {
        if (!pipeRawTabletInsertionEvent.shouldParseTimeOrPattern()) {
            collectEvent(pipeRawTabletInsertionEvent);
            return;
        }
        PipeRawTabletInsertionEvent parseEventWithPatternOrTime = pipeRawTabletInsertionEvent.parseEventWithPatternOrTime();
        if (parseEventWithPatternOrTime.hasNoNeedParsingAndIsEmpty()) {
            return;
        }
        collectEvent(parseEventWithPatternOrTime);
    }

    private void parseAndCollectEvent(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws Exception {
        if (!pipeTsFileInsertionEvent.waitForTsFileClose()) {
            LOGGER.warn("Pipe skipping temporary TsFile which shouldn't be transferred: {}", pipeTsFileInsertionEvent.getTsFile());
            return;
        }
        if (!pipeTsFileInsertionEvent.shouldParseTimeOrPattern()) {
            collectEvent(pipeTsFileInsertionEvent);
            return;
        }
        try {
            Iterator<TabletInsertionEvent> it = pipeTsFileInsertionEvent.toTabletInsertionEvents().iterator();
            while (it.hasNext()) {
                collectEvent(it.next());
            }
        } finally {
            pipeTsFileInsertionEvent.close();
        }
    }

    private synchronized void collectEvent(Event event) {
        this.collectInvocationCount.incrementAndGet();
        if (event instanceof EnrichedEvent) {
            ((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
            PipeEventCommitManager.getInstance().enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, this.creationTime, this.regionId);
        }
        if (event instanceof PipeHeartbeatEvent) {
            ((PipeHeartbeatEvent) event).recordBufferQueueSize(this.bufferQueue);
            ((PipeHeartbeatEvent) event).recordConnectorQueueSize(this.pendingQueue);
        }
        while (!this.isClosed.get() && !this.bufferQueue.isEmpty()) {
            if (!this.pendingQueue.waitedOffer(this.bufferQueue.peek())) {
                if ((event instanceof PipeHeartbeatEvent) && (this.bufferQueue.peekLast() instanceof PipeHeartbeatEvent)) {
                    ((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
                    return;
                } else {
                    this.bufferQueue.offer(event);
                    return;
                }
            }
            this.bufferQueue.poll();
        }
        if (this.pendingQueue.waitedOffer(event)) {
            return;
        }
        this.bufferQueue.offer(event);
    }

    public void resetCollectInvocationCount() {
        this.collectInvocationCount.set(0);
    }

    public boolean hasNoCollectInvocationAfterReset() {
        return this.collectInvocationCount.get() == 0;
    }

    public boolean isBufferQueueEmpty() {
        return this.bufferQueue.isEmpty();
    }

    public synchronized boolean tryCollectBufferedEvents() {
        while (!this.isClosed.get() && !this.bufferQueue.isEmpty()) {
            if (!this.pendingQueue.waitedOffer(this.bufferQueue.peek())) {
                return true;
            }
            this.bufferQueue.poll();
        }
        return false;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.isClosed.set(true);
        doClose();
    }

    private synchronized void doClose() {
        this.bufferQueue.forEach(event -> {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).clearReferenceCount(PipeEventCollector.class.getName());
            }
        });
        this.bufferQueue.clear();
    }

    public int getTabletInsertionEventCount() {
        return this.bufferQueue.getTabletInsertionEventCount();
    }

    public int getTsFileInsertionEventCount() {
        return this.bufferQueue.getTsFileInsertionEventCount();
    }

    public int getPipeHeartbeatEventCount() {
        return this.bufferQueue.getPipeHeartbeatEventCount();
    }
}
