package org.apache.iotdb.db.pipe.extractor.realtime;

import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.class */
/**
 * Realtime data-region extractor that listens to both insert nodes and TsFiles.
 *
 * <p>Incoming events are routed by the state this extractor holds in the event's
 * {@link TsFileEpoch}: tablet events are queued while the state is {@code EMPTY} or
 * {@code USING_TABLET}, TsFile events are queued while it is {@code EMPTY} or
 * {@code USING_TSFILE}, and events of the other kind are released instead of queued. The same
 * state is consulted again in {@link #supply()} before an event is handed out.
 */
public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);

    /** Events accepted by {@link #extract(PipeRealtimeEvent)} and not yet polled by {@link #supply()}. */
    private final UnboundedBlockingPendingQueue<Event> pendingQueue = new UnboundedBlockingPendingQueue<>();

    /**
     * Dispatches a realtime event to the tablet or TsFile extraction path according to the type of
     * the event it wraps.
     *
     * @param pipeRealtimeEvent the realtime event to extract
     * @throws UnsupportedOperationException if the wrapped event is neither a
     *     {@link TabletInsertionEvent} nor a {@link TsFileInsertionEvent}
     */
    @Override
    public void extract(PipeRealtimeEvent pipeRealtimeEvent) {
        final Event payload = pipeRealtimeEvent.getEvent();
        if (payload instanceof TabletInsertionEvent) {
            extractTabletInsertion(pipeRealtimeEvent);
            return;
        }
        if (payload instanceof TsFileInsertionEvent) {
            extractTsFileInsertion(pipeRealtimeEvent);
            return;
        }
        throw new UnsupportedOperationException(String.format("Unsupported event type %s for hybrid realtime extractor %s", payload.getClass(), this));
    }

    /** @return always {@code true}. */
    @Override
    public boolean isNeedListenToTsFile() {
        return true;
    }

    /** @return always {@code true}. */
    @Override
    public boolean isNeedListenToInsertNode() {
        return true;
    }

    /**
     * Handles a tablet insertion event. When the pending queue has reached the configured tablet
     * limit, the epoch state is first forced to {@code USING_TSFILE}; the event is then queued or
     * released depending on the resulting state.
     */
    private void extractTabletInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        if (isApproachingCapacity()) {
            pipeRealtimeEvent.getTsFileEpoch().migrateState(this, ignored -> TsFileEpoch.State.USING_TSFILE);
        }

        final TsFileEpoch.State epochState = pipeRealtimeEvent.getTsFileEpoch().getState(this);
        switch (epochState) {
            case EMPTY:
            case USING_TABLET:
                if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
                    // The queue refused the event: report it, then give up our reference.
                    logAndReportNonCritical(String.format("extractTabletInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s has reached capacity, discard tablet event %s, current state %s", this, pipeRealtimeEvent, pipeRealtimeEvent.getTsFileEpoch().getState(this)));
                    releaseHybridReference(pipeRealtimeEvent);
                }
                break;
            case USING_TSFILE:
                // The TsFile path is in charge of this epoch, so the tablet event is not queued.
                releaseHybridReference(pipeRealtimeEvent);
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unsupported state %s for hybrid realtime extractor %s", epochState, PipeRealtimeDataRegionHybridExtractor.class.getName()));
        }
    }

    /**
     * Handles a TsFile insertion event. An {@code EMPTY} epoch is first migrated to
     * {@code USING_TSFILE}; the event is then queued or released depending on the resulting state.
     */
    private void extractTsFileInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, current -> current.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TSFILE : current);

        final TsFileEpoch.State epochState = pipeRealtimeEvent.getTsFileEpoch().getState(this);
        switch (epochState) {
            case EMPTY:
            case USING_TSFILE:
                if (!this.pendingQueue.waitedOffer(pipeRealtimeEvent)) {
                    // The queue refused the event: report it, then give up our reference.
                    logAndReportNonCritical(String.format("extractTsFileInsertion: pending queue of PipeRealtimeDataRegionHybridExtractor %s has reached capacity, discard TsFile event %s, current state %s", this, pipeRealtimeEvent, pipeRealtimeEvent.getTsFileEpoch().getState(this)));
                    releaseHybridReference(pipeRealtimeEvent);
                }
                break;
            case USING_TABLET:
                // The tablet path is in charge of this epoch, so the TsFile event is not queued.
                releaseHybridReference(pipeRealtimeEvent);
                break;
            default:
                throw new UnsupportedOperationException(String.format("Unsupported state %s for hybrid realtime extractor %s", epochState, PipeRealtimeDataRegionHybridExtractor.class.getName()));
        }
    }

    /** @return whether the pending queue size has reached the configured tablet limit. */
    private boolean isApproachingCapacity() {
        return this.pendingQueue.size() >= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit();
    }

    /**
     * Polls the pending queue until an event can be handed out or the queue yields nothing.
     *
     * <p>After each polled event has been processed, this class's reference on the realtime event
     * is decreased, whether or not the event was supplied.
     *
     * @return the next event to supply, or {@code null} if the queue produced no suppliable event
     * @throws UnsupportedOperationException if a polled event wraps an unsupported event type
     */
    public Event supply() {
        for (PipeRealtimeEvent polled = (PipeRealtimeEvent) this.pendingQueue.directPoll();
                polled != null;
                polled = (PipeRealtimeEvent) this.pendingQueue.directPoll()) {
            final Event payload = polled.getEvent();
            final Event supplied;
            if (payload instanceof TabletInsertionEvent) {
                supplied = supplyTabletInsertion(polled);
            } else if (payload instanceof TsFileInsertionEvent) {
                supplied = supplyTsFileInsertion(polled);
            } else {
                throw new UnsupportedOperationException(String.format("Unsupported event type %s for hybrid realtime extractor %s to supply.", payload.getClass(), this));
            }

            releaseHybridReference(polled);
            if (supplied != null) {
                return supplied;
            }
        }
        return null;
    }

    /**
     * Decides whether a polled tablet event may be supplied. An {@code EMPTY} epoch is migrated to
     * {@code USING_TABLET} first; the event is supplied only if the state is then
     * {@code USING_TABLET} and its reference count can be increased. If the increase fails, the
     * epoch is switched to {@code USING_TSFILE} and the event is dropped.
     *
     * @return the wrapped event, or {@code null} if it must not be supplied
     */
    private Event supplyTabletInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, current -> current.equals(TsFileEpoch.State.EMPTY) ? TsFileEpoch.State.USING_TABLET : current);

        if (pipeRealtimeEvent.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TABLET)) {
            if (pipeRealtimeEvent.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
                return pipeRealtimeEvent.getEvent();
            }
            pipeRealtimeEvent.getTsFileEpoch().migrateState(this, ignored -> TsFileEpoch.State.USING_TSFILE);
            LOGGER.warn("Discard tablet event {} because it is not reliable anymore. Change the state of TsFileEpoch to USING_TSFILE.", pipeRealtimeEvent);
        }
        return null;
    }

    /**
     * Decides whether a polled TsFile event may be supplied. An {@code EMPTY} epoch is logged as an
     * error and migrated to {@code USING_TSFILE}; the event is supplied only if the state is then
     * {@code USING_TSFILE} and its reference count can be increased. A failed increase is logged
     * and reported as a non-critical exception.
     *
     * @return the wrapped event, or {@code null} if it must not be supplied
     */
    private Event supplyTsFileInsertion(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.getTsFileEpoch().migrateState(this, current -> {
            if (current.equals(TsFileEpoch.State.EMPTY)) {
                LOGGER.error(String.format("EMPTY TsFileEpoch when supplying TsFile Event %s", pipeRealtimeEvent));
                return TsFileEpoch.State.USING_TSFILE;
            }
            return current;
        });

        final boolean usingTsFile = pipeRealtimeEvent.getTsFileEpoch().getState(this).equals(TsFileEpoch.State.USING_TSFILE);
        if (!usingTsFile) {
            return null;
        }
        if (!pipeRealtimeEvent.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName())) {
            logAndReportNonCritical(String.format("TsFile Event %s can not be supplied because the reference count can not be increased, the data represented by this event is lost", pipeRealtimeEvent.getEvent()));
            return null;
        }
        return pipeRealtimeEvent.getEvent();
    }

    /** Calls the parent's {@code close()} and then clears the pending queue. */
    @Override
    public void close() throws Exception {
        super.close();
        this.pendingQueue.clear();
    }

    /** Logs {@code message} at error level and reports it as a non-critical pipe runtime exception. */
    private void logAndReportNonCritical(String message) {
        LOGGER.error(message);
        PipeAgent.runtime().report(this.pipeTaskMeta, new PipeRuntimeNonCriticalException(message));
    }

    /** Decreases the reference count held on the event under this class's name. */
    private static void releaseHybridReference(PipeRealtimeEvent pipeRealtimeEvent) {
        pipeRealtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
    }
}
