package org.apache.iotdb.db.pipe.extractor.realtime;

import java.util.ArrayList;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.pipe.api.PipeExtractor;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeExtractorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;

/* loaded from: input_file:org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.class */
public abstract class PipeRealtimeDataRegionExtractor implements PipeExtractor {
    protected String pattern;
    protected String pipeName;
    protected String dataRegionId;
    protected PipeTaskMeta pipeTaskMeta;
    protected boolean isForwardingPipeRequests = true;
    protected final UnboundedBlockingPendingQueue<Event> pendingQueue = new UnboundedBlockingPendingQueue<>();
    protected final AtomicBoolean isClosed = new AtomicBoolean(false);

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
    }

    public void customize(PipeParameters pipeParameters, PipeExtractorRuntimeConfiguration pipeExtractorRuntimeConfiguration) throws Exception {
        this.pattern = pipeParameters.getStringOrDefault(PipeExtractorConstant.EXTRACTOR_PATTERN_KEY, "root");
        PipeTaskExtractorRuntimeEnvironment pipeTaskExtractorRuntimeEnvironment = (PipeTaskExtractorRuntimeEnvironment) pipeExtractorRuntimeConfiguration.getRuntimeEnvironment();
        this.pipeName = pipeTaskExtractorRuntimeEnvironment.getPipeName();
        this.dataRegionId = String.valueOf(pipeTaskExtractorRuntimeEnvironment.getRegionId());
        this.pipeTaskMeta = pipeTaskExtractorRuntimeEnvironment.getPipeTaskMeta();
    }

    public void start() throws Exception {
        PipeInsertionDataNodeListener.getInstance().startListenAndAssign(this.dataRegionId, this);
    }

    public void close() throws Exception {
        PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(this.dataRegionId, this);
        synchronized (this.isClosed) {
            clearPendingQueue();
            this.isClosed.set(true);
        }
    }

    private void clearPendingQueue() {
        ArrayList arrayList = new ArrayList(this.pendingQueue.size());
        UnboundedBlockingPendingQueue<Event> unboundedBlockingPendingQueue = this.pendingQueue;
        Objects.requireNonNull(arrayList);
        unboundedBlockingPendingQueue.forEach((v1) -> {
            r1.add(v1);
        });
        this.pendingQueue.clear();
        arrayList.forEach(event -> {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).clearReferenceCount(PipeRealtimeDataRegionExtractor.class.getName());
            }
        });
    }

    public final void extract(PipeRealtimeEvent pipeRealtimeEvent) {
        doExtract(pipeRealtimeEvent);
        synchronized (this.isClosed) {
            if (this.isClosed.get()) {
                clearPendingQueue();
            }
        }
    }

    protected abstract void doExtract(PipeRealtimeEvent pipeRealtimeEvent);

    public abstract boolean isNeedListenToTsFile();

    public abstract boolean isNeedListenToInsertNode();

    public final String getPattern() {
        return this.pattern;
    }

    public final boolean isForwardingPipeRequests() {
        return this.isForwardingPipeRequests;
    }

    public final String getPipeName() {
        return this.pipeName;
    }

    public final PipeTaskMeta getPipeTaskMeta() {
        return this.pipeTaskMeta;
    }

    public String toString() {
        return "PipeRealtimeDataRegionExtractor{pattern='" + this.pattern + "', dataRegionId='" + this.dataRegionId + "'}";
    }
}
