package org.apache.iotdb.db.pipe.extractor;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.historical.PipeHistoricalDataRegionTsFileExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionFakeExtractor;
import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionTsFileExtractor;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.class */
/**
 * Pipe extractor for a single data region that combines a historical extractor with a realtime
 * extractor.
 *
 * <p>Events are supplied from the historical extractor until it reports that everything has been
 * consumed; after that they are supplied from the realtime extractor (see {@link #supply()}).
 *
 * <p>The two delegate extractors are created inside {@link #validate(PipeParameterValidator)};
 * {@link #customize}, {@link #start()} and {@link #supply()} dereference them and therefore
 * require that {@code validate} has completed successfully on this instance.
 */
public class IoTDBDataRegionExtractor implements PipeExtractor {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionExtractor.class);

    /** Set on the first {@link #start()} call so that later calls become no-ops. */
    private final AtomicBoolean hasBeenStarted = new AtomicBoolean(false);

    private PipeHistoricalDataRegionExtractor historicalExtractor;
    private PipeRealtimeDataRegionExtractor realtimeExtractor;

    /** Region id read from the runtime environment in {@link #customize}. */
    private int dataRegionId;

    /**
     * Validates the extractor attributes, then builds both delegate extractors and lets each of
     * them validate the same parameters.
     *
     * <p>Checks performed here: the history-enable and realtime-enable attributes are restricted
     * to {@code "true"}/{@code "false"}; at least one of them must evaluate to {@code true} (both
     * default to {@code true}); and, when realtime extraction is enabled, the realtime mode
     * attribute is restricted to the file mode.
     *
     * @param pipeParameterValidator validator wrapping the pipe parameters
     * @throws Exception if any validation step fails
     */
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        // The argument expressions of the trailing validate(...) are evaluated only after both
        // validateAttributeValueRange calls have returned; keep the chain to preserve that order.
        pipeParameterValidator
            .validateAttributeValueRange(
                PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                true,
                new String[] {Boolean.TRUE.toString(), Boolean.FALSE.toString()})
            .validateAttributeValueRange(
                PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE,
                true,
                new String[] {Boolean.TRUE.toString(), Boolean.FALSE.toString()})
            .validate(
                args -> ((Boolean) args[0]).booleanValue() || ((Boolean) args[1]).booleanValue(),
                String.format(
                    "Should not set both %s and %s to false.",
                    PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY,
                    PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE),
                new Object[] {
                    Boolean.valueOf(
                        pipeParameterValidator
                            .getParameters()
                            .getBooleanOrDefault(
                                PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY, true)),
                    Boolean.valueOf(
                        pipeParameterValidator
                            .getParameters()
                            .getBooleanOrDefault(
                                PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE, true))
                });

        // The realtime mode only needs checking when realtime extraction is actually enabled.
        if (pipeParameterValidator
            .getParameters()
            .getBooleanOrDefault(PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE, true)) {
            pipeParameterValidator.validateAttributeValueRange(
                PipeExtractorConstant.EXTRACTOR_REALTIME_MODE,
                true,
                new String[] {PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE});
        }

        constructHistoricalExtractor();
        constructRealtimeExtractor(pipeParameterValidator.getParameters());

        this.historicalExtractor.validate(pipeParameterValidator);
        this.realtimeExtractor.validate(pipeParameterValidator);
    }

    /** Creates the historical delegate; a TsFile-based extractor is always used. */
    private void constructHistoricalExtractor() {
        this.historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor();
    }

    /**
     * Creates the realtime delegate: a TsFile-based extractor when realtime extraction is enabled
     * (the default), otherwise a fake extractor.
     *
     * @param pipeParameters parameters from which the realtime-enable flag is read
     */
    private void constructRealtimeExtractor(PipeParameters pipeParameters) {
        if (pipeParameters.getBooleanOrDefault(PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE, true)) {
            this.realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor();
        } else {
            this.realtimeExtractor = new PipeRealtimeDataRegionFakeExtractor();
            LOGGER.info("'{}' is set to false, use fake realtime extractor.", PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE);
        }
    }

    /**
     * Records the data region id from the runtime environment and forwards the customization to
     * both delegate extractors.
     *
     * @param pipeParameters the pipe parameters
     * @param pipeExtractorRuntimeConfiguration runtime configuration; its runtime environment is
     *     cast to {@link PipeTaskExtractorRuntimeEnvironment}
     * @throws Exception if either delegate fails to customize
     */
    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) throws Exception {
        this.dataRegionId = ((PipeTaskExtractorRuntimeEnvironment) pipeExtractorRuntimeConfiguration.getRuntimeEnvironment()).getRegionId();
        this.historicalExtractor.customize(pipeParameters, pipeExtractorRuntimeConfiguration);
        this.realtimeExtractor.customize(pipeParameters, pipeExtractorRuntimeConfiguration);
    }

    /**
     * Starts both delegate extractors. Only the first invocation does any work; later invocations
     * return immediately.
     *
     * <p>NOTE: the started flag is not reset when starting fails, so a failed start is not retried
     * by calling this method again.
     *
     * @throws Exception a {@link PipeException} wrapping the failure if either delegate could not
     *     be started
     */
    public void start() throws Exception {
        // Atomic test-and-set: with a separate get()/set(true) two concurrent callers could both
        // pass the guard and start the delegates twice.
        if (!this.hasBeenStarted.compareAndSet(false, true)) {
            return;
        }

        final AtomicReference<Exception> failureHolder = new AtomicReference<>(null);
        final DataRegionId regionId = new DataRegionId(this.dataRegionId);

        // Try the "region present" path (under the region's write lock) first, then the "region
        // absent" path. Presumably both report false only when the region appears or disappears
        // between the two calls (TODO confirm against StorageEngine); in that case retry.
        boolean executed;
        do {
            executed =
                StorageEngine.getInstance().runIfPresent(regionId, dataRegion -> {
                    dataRegion.writeLock(String.format("Pipe: starting %s", IoTDBDataRegionExtractor.class.getName()));
                    try {
                        startHistoricalExtractorAndRealtimeExtractor(failureHolder);
                    } finally {
                        dataRegion.writeUnlock();
                    }
                })
                || StorageEngine.getInstance().runIfAbsent(regionId, () -> {
                    startHistoricalExtractorAndRealtimeExtractor(failureHolder);
                });
            // Surface any failure captured inside the callbacks after every attempt.
            rethrowExceptionIfAny(failureHolder);
        } while (!executed);
    }

    /**
     * Starts the historical extractor and then the realtime extractor. A failure is logged and
     * stored in the holder instead of being thrown, because this runs inside callbacks handed to
     * {@link StorageEngine}.
     *
     * @param atomicReference holder that receives the exception, if one occurs
     */
    private void startHistoricalExtractorAndRealtimeExtractor(AtomicReference<Exception> atomicReference) {
        try {
            this.historicalExtractor.start();
            this.realtimeExtractor.start();
        } catch (Exception e) {
            atomicReference.set(e);
            // Trailing Throwable argument is logged by SLF4J as the exception (with stack trace).
            LOGGER.warn("Start historical extractor {} and realtime extractor {} error.", this.historicalExtractor, this.realtimeExtractor, e);
        }
    }

    /**
     * Throws a {@link PipeException} wrapping the held exception, if there is one.
     *
     * @param atomicReference holder possibly containing a start failure
     */
    private void rethrowExceptionIfAny(AtomicReference<Exception> atomicReference) {
        final Exception failure = atomicReference.get();
        if (failure != null) {
            throw new PipeException("failed to start extractors.", failure);
        }
    }

    /**
     * Supplies the next event: from the historical extractor until it has consumed everything,
     * from the realtime extractor afterwards.
     *
     * @return whatever the selected delegate's {@code supply()} returns
     * @throws Exception if the selected delegate fails
     */
    public Event supply() throws Exception {
        if (this.historicalExtractor.hasConsumedAll()) {
            return this.realtimeExtractor.supply();
        }
        return this.historicalExtractor.supply();
    }

    /**
     * Closes both delegate extractors, historical first.
     *
     * <p>The realtime extractor is closed even when closing the historical one throws, and
     * delegates that were never constructed (validation did not get that far) are skipped. If
     * both closes fail, the first exception is thrown with the second attached as suppressed.
     *
     * @throws Exception the first failure raised while closing the delegates
     */
    public void close() throws Exception {
        Exception failure = null;

        if (this.historicalExtractor != null) {
            try {
                this.historicalExtractor.close();
            } catch (Exception e) {
                failure = e;
            }
        }

        if (this.realtimeExtractor != null) {
            try {
                this.realtimeExtractor.close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }
}
