package org.apache.iotdb.db.pipe.extractor.dataregion;

import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.pipe.extractor.IoTDBExtractor;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.historical.PipeHistoricalDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionFakeExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionLogExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.class */
/**
 * Pipe extractor for a single data region.
 *
 * <p>It delegates to two inner extractors: a historical extractor, which is drained first, and a
 * realtime extractor, which is consulted once the historical extractor reports that it has
 * consumed everything. An optional watermark injector can emit events ahead of the realtime
 * extractor.
 *
 * <p>When neither insertion nor deletion is listened to (as decided in {@link #validate}), the
 * extractor stays in a "no extraction need" state in which {@link #customize}, {@link #start},
 * {@link #supply} and {@link #close} are no-ops.
 */
public class IoTDBDataRegionExtractor extends IoTDBExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);

    // Parameter keys; each option can be given with an "extractor." or a "source." prefix.
    private static final String EXTRACTOR_PATTERN_FORMAT_KEY = "extractor.pattern.format";
    private static final String SOURCE_PATTERN_FORMAT_KEY = "source.pattern.format";
    private static final String EXTRACTOR_HISTORY_ENABLE_KEY = "extractor.history.enable";
    private static final String SOURCE_HISTORY_ENABLE_KEY = "source.history.enable";
    private static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable";
    private static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable";
    private static final String EXTRACTOR_REALTIME_MODE_KEY = "extractor.realtime.mode";
    private static final String SOURCE_REALTIME_MODE_KEY = "source.realtime.mode";
    private static final String SOURCE_START_TIME_KEY = "source.start-time";
    private static final String SOURCE_END_TIME_KEY = "source.end-time";
    private static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms";
    private static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms";

    /** Consensus protocol class under which insertion transfer is rejected. */
    private static final String RATIS_CONSENSUS_CLASS = "org.apache.iotdb.consensus.ratis.RatisConsensus";

    private PipeHistoricalDataRegionExtractor historicalExtractor;
    private PipeRealtimeDataRegionExtractor realtimeExtractor;
    private DataRegionWatermarkInjector watermarkInjector;

    /** Stays {@code true} until {@link #validate} finds that insertion or deletion is listened to. */
    private boolean hasNoExtractionNeed = true;

    /**
     * Validates the extractor parameters and constructs the inner historical and realtime
     * extractors.
     *
     * <p>Returns early, leaving the extractor in the "no extraction need" state, when neither
     * insertion nor deletion is listened to.
     *
     * @param pipeParameterValidator validator wrapping the pipe parameters
     * @throws PipeException if insertion is listened to while the data region uses Ratis consensus,
     *     or if the selected realtime mode requires WAL and WAL is disabled
     * @throws IllegalArgumentException if the configured pattern is illegal
     * @throws Exception if the super class, the validator or an inner extractor rejects the
     *     parameters
     */
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        super.validate(pipeParameterValidator);

        final PipeParameters parameters = pipeParameterValidator.getParameters();
        final Pair<Boolean, Boolean> insertionDeletionListeningOptionPair =
                DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(parameters);
        if (insertionDeletionListeningOptionPair.getLeft().equals(false)
                && insertionDeletionListeningOptionPair.getRight().equals(false)) {
            return;
        }
        this.hasNoExtractionNeed = false;

        if (insertionDeletionListeningOptionPair.getLeft().equals(true)
                && IoTDBDescriptor.getInstance()
                        .getConfig()
                        .getDataRegionConsensusProtocolClass()
                        .equals(RATIS_CONSENSUS_CLASS)) {
            throw new PipeException("The pipe cannot transfer data when data region is using ratis consensus.");
        }

        // Pattern format and pattern legality.
        pipeParameterValidator
                .validateAttributeValueRange(EXTRACTOR_PATTERN_FORMAT_KEY, true, new String[] {"prefix", "iotdb"})
                .validateAttributeValueRange(SOURCE_PATTERN_FORMAT_KEY, true, new String[] {"prefix", "iotdb"});
        validatePattern(PipePattern.parsePipePatternFromSourceParameters(parameters));

        // history.enable / realtime.enable must be booleans and must not both be false.
        final String[] booleanValues = new String[] {Boolean.TRUE.toString(), Boolean.FALSE.toString()};
        pipeParameterValidator
                .validateAttributeValueRange(EXTRACTOR_HISTORY_ENABLE_KEY, true, booleanValues)
                .validateAttributeValueRange(EXTRACTOR_REALTIME_ENABLE_KEY, true, booleanValues)
                .validateAttributeValueRange(SOURCE_HISTORY_ENABLE_KEY, true, booleanValues)
                .validateAttributeValueRange(SOURCE_REALTIME_ENABLE_KEY, true, booleanValues);
        // Read the flags only after their value range has been validated.
        final boolean historyEnabled = parameters.getBooleanOrDefault(
                Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), true);
        final boolean realtimeEnabled = parameters.getBooleanOrDefault(
                Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), true);
        pipeParameterValidator.validate(
                args -> ((Boolean) args[0]) || ((Boolean) args[1]),
                "Should not set both history.enable and realtime.enable to false.",
                new Object[] {historyEnabled, realtimeEnabled});

        final boolean hasTimeRange =
                parameters.hasAnyAttributes(new String[] {SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY});

        // The realtime mode is only checked when realtime extraction is enabled or a time range
        // is given.
        if (realtimeEnabled || hasTimeRange) {
            final String realtimeModeKey = parameters.hasAttribute(EXTRACTOR_REALTIME_MODE_KEY)
                    ? EXTRACTOR_REALTIME_MODE_KEY
                    : SOURCE_REALTIME_MODE_KEY;
            pipeParameterValidator.validateAttributeValueRange(
                    realtimeModeKey, true, new String[] {"file", "hybrid", "log", "forced-log", "stream", "batch"});
        }

        // NOTE(review): the condition tests the *.enable keys while the message names the
        // history.start-time / history.end-time keys — confirm which one is intended.
        if (hasTimeRange
                && parameters.hasAnyAttributes(new String[] {
                    EXTRACTOR_HISTORY_ENABLE_KEY,
                    EXTRACTOR_REALTIME_ENABLE_KEY,
                    SOURCE_HISTORY_ENABLE_KEY,
                    SOURCE_REALTIME_ENABLE_KEY
                })) {
            LOGGER.warn(
                    "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.",
                    "source.start-time",
                    "extractor.start-time",
                    "source.end-time",
                    "extractor.end-time",
                    "source.history.start-time",
                    "extractor.history.start-time",
                    "source.history.end-time",
                    "extractor.history.end-time");
        }

        constructHistoricalExtractor();
        constructRealtimeExtractor(parameters);
        this.historicalExtractor.validate(pipeParameterValidator);
        this.realtimeExtractor.validate(pipeParameterValidator);
    }

    /**
     * Rejects an illegal pattern.
     *
     * @param pipePattern the pattern parsed from the source parameters
     * @throws IllegalArgumentException if {@code pipePattern.isLegal()} is {@code false}
     */
    private void validatePattern(PipePattern pipePattern) {
        if (pipePattern.isLegal()) {
            return;
        }
        throw new IllegalArgumentException(String.format("Pattern \"%s\" is illegal.", pipePattern));
    }

    /** Creates the historical extractor; the TsFile-based implementation is always used. */
    private void constructHistoricalExtractor() {
        this.historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
    }

    /**
     * Chooses the realtime extractor implementation from the parameters.
     *
     * <ul>
     *   <li>realtime disabled: fake extractor;
     *   <li>no mode given: hybrid extractor (after the WAL check);
     *   <li>{@code file} / {@code batch}: TsFile extractor;
     *   <li>{@code hybrid} / {@code log} / {@code stream}: hybrid extractor (after the WAL check);
     *   <li>{@code forced-log}: log extractor (after the WAL check);
     *   <li>anything else: hybrid extractor (after the WAL check), with a warning.
     * </ul>
     *
     * @param pipeParameters the pipe parameters
     * @throws IllegalPathException propagated from {@link #checkWalEnable}
     * @throws PipeException if the chosen mode needs WAL and WAL is disabled
     */
    private void constructRealtimeExtractor(PipeParameters pipeParameters) throws IllegalPathException {
        final boolean realtimeEnabled = pipeParameters.getBooleanOrDefault(
                Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), true);
        if (!realtimeEnabled) {
            this.realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
            LOGGER.info("Pipe: '{}' is set to false, use fake realtime extractor.", EXTRACTOR_REALTIME_ENABLE_KEY);
            return;
        }

        final String[] realtimeModeKeys = new String[] {EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY};
        if (!pipeParameters.hasAnyAttributes(realtimeModeKeys)) {
            checkWalEnable(pipeParameters);
            this.realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
            LOGGER.info("Pipe: '{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY);
            return;
        }

        final String realtimeMode = pipeParameters.getStringByKeys(realtimeModeKeys);
        switch (realtimeMode) {
            case "file":
            case "batch":
                // TsFile-based extraction is the only mode that skips the WAL check.
                this.realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
                return;
            case "hybrid":
            case "log":
            case "stream":
                checkWalEnable(pipeParameters);
                this.realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
                return;
            case "forced-log":
                checkWalEnable(pipeParameters);
                this.realtimeExtractor = new PipeRealtimeDataRegionLogExtractor();
                return;
            default:
                checkWalEnable(pipeParameters);
                this.realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor();
                LOGGER.warn("Pipe: Unsupported extractor realtime mode: {}, create a hybrid extractor.", realtimeMode);
        }
    }

    /**
     * Fails when insertion is listened to while the WAL mode is {@link WALMode#DISABLE}.
     *
     * @param pipeParameters the pipe parameters
     * @throws IllegalPathException declared for parsing the listening options
     * @throws PipeException if insertion is listened to and WAL is disabled
     */
    private void checkWalEnable(PipeParameters pipeParameters) throws IllegalPathException {
        final boolean listensToInsertion = Boolean.TRUE.equals(
                DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(pipeParameters).getLeft());
        if (listensToInsertion && IoTDBDescriptor.getInstance().getConfig().getWalMode().equals(WALMode.DISABLE)) {
            throw new PipeException("The pipe cannot transfer realtime insertion if data region disables wal. Please set 'realtime.mode'='batch' in source parameters when enabling realtime transmission.");
        }
    }

    /**
     * Customizes the super class and both inner extractors, optionally sets up the watermark
     * injector, and registers this extractor with {@link PipeExtractorMetrics}.
     *
     * <p>No-op when there is no extraction need.
     *
     * @param pipeParameters the pipe parameters
     * @param pipeExtractorRuntimeConfiguration the runtime configuration handed to the extractors
     * @throws Exception if the super class or an inner extractor fails to customize
     */
    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) throws Exception {
        if (this.hasNoExtractionNeed) {
            return;
        }

        super.customize(pipeParameters, pipeExtractorRuntimeConfiguration);
        this.historicalExtractor.customize(pipeParameters, pipeExtractorRuntimeConfiguration);
        this.realtimeExtractor.customize(pipeParameters, pipeExtractorRuntimeConfiguration);

        // The watermark injector is only created for an explicitly configured, positive interval.
        if (pipeParameters.hasAnyAttributes(
                new String[] {EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY})) {
            final long watermarkIntervalInMs = pipeParameters.getLongOrDefault(
                    Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY), -1L);
            if (watermarkIntervalInMs > 0) {
                this.watermarkInjector = new DataRegionWatermarkInjector(this.regionId, watermarkIntervalInMs);
                LOGGER.info(
                        "Pipe {}@{}: Set watermark injector with interval {} ms.",
                        this.pipeName,
                        this.regionId,
                        this.watermarkInjector.getInjectionIntervalInMs());
            }
        }

        PipeExtractorMetrics.getInstance().register(this);
    }

    /**
     * Starts both inner extractors.
     *
     * <p>No-op when there is no extraction need or the extractor has already been started. The
     * extractors are started under the data region's write lock when the region is present, or
     * without a lock when it is absent; if neither attempt runs, the whole attempt is retried.
     *
     * @throws PipeException if an inner extractor fails to start
     * @throws Exception if the super class fails to start
     */
    public void start() throws Exception {
        if (this.hasNoExtractionNeed || this.hasBeenStarted.get()) {
            return;
        }

        final long startTimeInMs = System.currentTimeMillis();
        LOGGER.info(
                "Pipe {}@{}: Starting historical extractor {} and realtime extractor {}.",
                this.pipeName,
                this.regionId,
                this.historicalExtractor.getClass().getSimpleName(),
                this.realtimeExtractor.getClass().getSimpleName());

        super.start();

        // The start callbacks cannot throw checked exceptions, so failures are carried out of
        // them through this holder and rethrown afterwards.
        final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(null);
        final DataRegionId dataRegionId = new DataRegionId(this.regionId);
        while (true) {
            // First try the "region present" path, then the "region absent" path. Both can report
            // false (presumably when the region appears in between — TODO confirm); then retry.
            final boolean hasRun = StorageEngine.getInstance().runIfPresent(dataRegionId, dataRegion -> {
                        dataRegion.writeLock(
                                String.format("Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
                        try {
                            startHistoricalExtractorAndRealtimeExtractor(exceptionHolder);
                        } finally {
                            dataRegion.writeUnlock();
                        }
                    })
                    || StorageEngine.getInstance()
                            .runIfAbsent(dataRegionId, () -> startHistoricalExtractorAndRealtimeExtractor(exceptionHolder));
            rethrowExceptionIfAny(exceptionHolder);
            if (hasRun) {
                break;
            }
        }

        LOGGER.info(
                "Pipe {}@{}: Started historical extractor {} and realtime extractor {} successfully within {} ms.",
                this.pipeName,
                this.regionId,
                this.historicalExtractor.getClass().getSimpleName(),
                this.realtimeExtractor.getClass().getSimpleName(),
                System.currentTimeMillis() - startTimeInMs);
    }

    /**
     * Starts the realtime extractor and then the historical extractor, recording any failure in
     * the given holder instead of throwing.
     *
     * @param atomicReference holder that receives the exception, if one is thrown
     */
    private void startHistoricalExtractorAndRealtimeExtractor(AtomicReference<Exception> atomicReference) {
        try {
            // Realtime goes first — presumably so nothing written while the historical extractor
            // starts is missed; TODO confirm.
            this.realtimeExtractor.start();
            this.historicalExtractor.start();
        } catch (Exception e) {
            atomicReference.set(e);
            // The trailing exception has no placeholder, so SLF4J logs it as the throwable.
            LOGGER.warn(
                    "Pipe {}@{}: Start historical extractor {} and realtime extractor {} error.",
                    this.pipeName,
                    this.regionId,
                    this.historicalExtractor.getClass().getSimpleName(),
                    this.realtimeExtractor.getClass().getSimpleName(),
                    e);
        }
    }

    /**
     * Throws a {@link PipeException} wrapping the held exception, if any.
     *
     * @param atomicReference holder filled by {@link #startHistoricalExtractorAndRealtimeExtractor}
     * @throws PipeException if the holder contains an exception
     */
    private void rethrowExceptionIfAny(AtomicReference<Exception> atomicReference) {
        final Exception startFailure = atomicReference.get();
        if (startFailure != null) {
            throw new PipeException("failed to start extractors.", startFailure);
        }
    }

    /**
     * Supplies the next event.
     *
     * <p>Events come from the historical extractor until it has consumed everything; afterwards a
     * watermark event (if the injector exists and yields one) takes precedence over the realtime
     * extractor. Tablet, TsFile and heartbeat events are counted in {@link PipeExtractorMetrics}.
     *
     * @return the next event, or {@code null} when there is no extraction need or nothing is
     *     available
     * @throws Exception if an inner extractor fails to supply
     */
    public Event supply() throws Exception {
        if (this.hasNoExtractionNeed) {
            return null;
        }

        Event event = null;
        if (!this.historicalExtractor.hasConsumedAll()) {
            event = this.historicalExtractor.supply();
        } else {
            if (Objects.nonNull(this.watermarkInjector)) {
                event = this.watermarkInjector.inject();
            }
            if (Objects.isNull(event)) {
                event = this.realtimeExtractor.supply();
            }
        }

        if (Objects.nonNull(event)) {
            if (event instanceof TabletInsertionEvent) {
                PipeExtractorMetrics.getInstance().markTabletEvent(this.taskID);
            } else if (event instanceof TsFileInsertionEvent) {
                PipeExtractorMetrics.getInstance().markTsFileEvent(this.taskID);
            } else if (event instanceof PipeHeartbeatEvent) {
                PipeExtractorMetrics.getInstance().markPipeHeartbeatEvent(this.taskID);
            }
        }
        return event;
    }

    /**
     * Closes both inner extractors and deregisters the metrics of this task.
     *
     * <p>No-op when there is no extraction need or the extractor was never started.
     *
     * @throws Exception if an inner extractor fails to close
     */
    public void close() throws Exception {
        if (this.hasNoExtractionNeed || !this.hasBeenStarted.get()) {
            return;
        }

        this.historicalExtractor.close();
        this.realtimeExtractor.close();
        if (Objects.nonNull(this.taskID)) {
            PipeExtractorMetrics.getInstance().deregister(this.taskID);
        }
    }

    /**
     * @return {@code true} if the realtime extractor is the hybrid or the log implementation
     */
    public boolean isStreamMode() {
        return (this.realtimeExtractor instanceof PipeRealtimeDataRegionHybridExtractor)
                || (this.realtimeExtractor instanceof PipeRealtimeDataRegionLogExtractor);
    }

    /**
     * @return whether the historical extractor reports that it has consumed everything
     */
    public boolean hasConsumedAllHistoricalTsFiles() {
        return this.historicalExtractor.hasConsumedAll();
    }

    /** @return the task id of this extractor */
    public String getTaskID() {
        return this.taskID;
    }

    /** @return the name of the pipe this extractor belongs to */
    public String getPipeName() {
        return this.pipeName;
    }

    /** @return the id of the data region this extractor works on */
    public int getDataRegionId() {
        return this.regionId;
    }

    /** @return the creation time held by this extractor */
    public long getCreationTime() {
        return this.creationTime;
    }

    /**
     * @return the pending queue size of the historical extractor, or {@code 0} if not started
     */
    public int getHistoricalTsFileInsertionEventCount() {
        return this.hasBeenStarted.get() ? this.historicalExtractor.getPendingQueueSize() : 0;
    }

    /**
     * @return the tablet insertion event count of the realtime extractor, or {@code 0} if not
     *     started
     */
    public int getTabletInsertionEventCount() {
        return this.hasBeenStarted.get() ? this.realtimeExtractor.getTabletInsertionEventCount() : 0;
    }

    /**
     * @return the TsFile insertion event count of the realtime extractor, or {@code 0} if not
     *     started
     */
    public int getRealtimeTsFileInsertionEventCount() {
        return this.hasBeenStarted.get() ? this.realtimeExtractor.getTsFileInsertionEventCount() : 0;
    }

    /**
     * @return the heartbeat event count of the realtime extractor, or {@code 0} if not started
     */
    public int getPipeHeartbeatEventCount() {
        return this.hasBeenStarted.get() ? this.realtimeExtractor.getPipeHeartbeatEventCount() : 0;
    }
}
