package org.apache.iotdb.db.pipe.extractor.realtime;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.class */
/**
 * Realtime data region extractor that listens to both insert nodes and TsFiles and hands every
 * accepted event to the inherited {@code pendingQueue}, from which {@link #supply()} later drains
 * them.
 *
 * <p>Tablet insertion events and loaded TsFile insertion events are queued; TsFile events that are
 * not marked as loaded are released without being queued. Heartbeat events are queued unless the
 * tail of the queue already holds a heartbeat that makes the new one redundant.
 */
public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionLogExtractor.class);

    /** Holder name passed to every reference-count call made by this extractor. */
    private static final String HOLDER_NAME = PipeRealtimeDataRegionLogExtractor.class.getName();

    /**
     * Dispatches a realtime event to the handler matching the type of the event it wraps.
     *
     * @param pipeRealtimeEvent the realtime event to extract
     * @throws UnsupportedOperationException if the wrapped event is neither a tablet insertion, a
     *     TsFile insertion nor a heartbeat event
     */
    @Override
    protected void doExtract(PipeRealtimeEvent pipeRealtimeEvent) {
        final EnrichedEvent event = pipeRealtimeEvent.getEvent();
        if (event instanceof TabletInsertionEvent) {
            extractTabletInsertion(pipeRealtimeEvent);
        } else if (event instanceof TsFileInsertionEvent) {
            extractTsFileInsertion(pipeRealtimeEvent);
        } else if (event instanceof PipeHeartbeatEvent) {
            extractHeartbeat(pipeRealtimeEvent);
        } else {
            throw new UnsupportedOperationException(
                    String.format(
                            "Unsupported event type %s for log realtime extractor %s",
                            event.getClass(), this));
        }
    }

    /**
     * Marks the event's TsFile epoch as USING_TABLET for this extractor and queues the event. If
     * the queue rejects it, the event is reported and released.
     */
    private void extractTabletInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TABLET);
        if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
            reportQueueFullAndDiscard(pipeRealtimeEvent, "tablet");
        }
    }

    /**
     * Queues a TsFile insertion event only when it is flagged as loaded; otherwise the event is
     * released immediately. A queued event first has its TsFile epoch marked as USING_TSFILE for
     * this extractor.
     */
    private void extractTsFileInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        final PipeTsFileInsertionEvent tsFileEvent = (PipeTsFileInsertionEvent) pipeRealtimeEvent.getEvent();
        if (!tsFileEvent.getIsLoaded()) {
            // NOTE(review): presumably the data of a non-loaded TsFile has already been captured
            // as tablet events, since this extractor also listens to insert nodes — confirm.
            pipeRealtimeEvent.decreaseReferenceCount(HOLDER_NAME, false);
            return;
        }

        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
        if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
            reportQueueFullAndDiscard(pipeRealtimeEvent, "loaded tsFile");
        }
    }

    /**
     * Logs and reports (as a non-critical exception) that the pending queue rejected an insertion
     * event, then releases this extractor's reference on the event.
     *
     * @param pipeRealtimeEvent the event that could not be queued
     * @param eventKind wording inserted into the message to describe the event ("tablet" or
     *     "loaded tsFile")
     */
    private void reportQueueFullAndDiscard(PipeRealtimeEvent pipeRealtimeEvent, String eventKind) {
        final String message =
                String.format(
                        "extract: pending queue of PipeRealtimeDataRegionLogExtractor %s has reached capacity, "
                                + "discard %s event %s, current state %s",
                        this,
                        eventKind,
                        pipeRealtimeEvent,
                        pipeRealtimeEvent.getTsFileEpoch().getState(this));
        LOGGER.error(message);
        PipeAgent.runtime().report(this.pipeTaskMeta, new PipeRuntimeNonCriticalException(message));
        pipeRealtimeEvent.decreaseReferenceCount(HOLDER_NAME, false);
    }

    /**
     * Records the current queue size on the heartbeat and then queues it, unless the event at the
     * tail of the queue already makes it redundant. A heartbeat that is skipped or rejected by the
     * queue is released; unlike insertion events, a rejected heartbeat is only logged, not
     * reported.
     */
    private void extractHeartbeat(PipeRealtimeEvent pipeRealtimeEvent) {
        final PipeHeartbeatEvent heartbeat = (PipeHeartbeatEvent) pipeRealtimeEvent.getEvent();
        // Record the size before this heartbeat itself may be added to the queue.
        heartbeat.recordExtractorQueueSize(this.pendingQueue);

        if (isRedundantHeartbeat(this.pendingQueue.peekLast(), heartbeat)) {
            pipeRealtimeEvent.decreaseReferenceCount(HOLDER_NAME, false);
            return;
        }

        if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
            LOGGER.error(
                    "extract: pending queue of PipeRealtimeDataRegionLogExtractor {} has reached capacity, discard heartbeat event {}",
                    this,
                    pipeRealtimeEvent);
            pipeRealtimeEvent.decreaseReferenceCount(HOLDER_NAME, false);
        }
    }

    /**
     * Tells whether an incoming heartbeat can be dropped because of the event at the queue tail.
     *
     * <p>That is the case when the tail is itself a heartbeat and either the tail is flagged to
     * print its message or the incoming heartbeat is not. In other words, a printing heartbeat is
     * only queued behind a heartbeat that does not print.
     *
     * @param lastQueued the event currently at the tail of the pending queue, possibly {@code null}
     * @param incoming the heartbeat about to be queued
     * @return {@code true} if the incoming heartbeat should be skipped
     */
    private static boolean isRedundantHeartbeat(Event lastQueued, PipeHeartbeatEvent incoming) {
        if (!(lastQueued instanceof PipeRealtimeEvent)) {
            return false;
        }
        final EnrichedEvent lastInner = ((PipeRealtimeEvent) lastQueued).getEvent();
        if (!(lastInner instanceof PipeHeartbeatEvent)) {
            return false;
        }
        return ((PipeHeartbeatEvent) lastInner).isShouldPrintMessage() || !incoming.isShouldPrintMessage();
    }

    /** @return always {@code true}: this extractor consumes TsFile events */
    @Override
    public boolean isNeedListenToTsFile() {
        return true;
    }

    /** @return always {@code true}: this extractor consumes insert node (tablet) events */
    @Override
    public boolean isNeedListenToInsertNode() {
        return true;
    }

    /**
     * Polls the pending queue until an event can be handed out.
     *
     * <p>For each polled realtime event a new reference is taken before its wrapped event is
     * returned, and one reference is then released on the realtime event regardless of whether
     * taking the new one succeeded. An event whose reference count cannot be increased is logged,
     * reported as a non-critical exception and skipped.
     *
     * @return the wrapped event of the first suppliable queue entry, or {@code null} when the
     *     queue has no more entries
     */
    @Override
    public Event supply() {
        PipeRealtimeEvent polled = (PipeRealtimeEvent) this.pendingQueue.directPoll();
        while (polled != null) {
            EnrichedEvent supplied = null;
            if (polled.increaseReferenceCount(HOLDER_NAME)) {
                supplied = polled.getEvent();
            } else {
                // The queue holds tablet, TsFile and heartbeat events alike, so the message must
                // not claim the lost event is a tablet event.
                final String message =
                        String.format(
                                "Event %s can not be supplied because the reference count can not be increased, "
                                        + "the data represented by this event is lost",
                                polled.getEvent());
                LOGGER.error(message);
                PipeAgent.runtime().report(this.pipeTaskMeta, new PipeRuntimeNonCriticalException(message));
            }
            polled.decreaseReferenceCount(HOLDER_NAME, false);

            if (supplied != null) {
                return supplied;
            }
            polled = (PipeRealtimeEvent) this.pendingQueue.directPoll();
        }
        return null;
    }
}
