package org.apache.iotdb.db.pipe.task.stage;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.builtin.processor.DoNothingProcessor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.db.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.db.pipe.execution.executor.PipeProcessorSubtaskExecutor;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutorManager;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;

/* loaded from: input_file:org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.class */
public class PipeTaskProcessorStage extends PipeTaskStage {
    private final PipeProcessorSubtaskExecutor executor = PipeSubtaskExecutorManager.getInstance().getProcessorSubtaskExecutor();
    private final PipeProcessorSubtask pipeProcessorSubtask;

    public PipeTaskProcessorStage(String str, long j, PipeParameters pipeParameters, TConsensusGroupId tConsensusGroupId, EventSupplier eventSupplier, BoundedBlockingPendingQueue<Event> boundedBlockingPendingQueue) {
        DoNothingProcessor doNothingProcessor = pipeParameters.getStringOrDefault(PipeProcessorConstant.PROCESSOR_KEY, BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()).equals(BuiltinPipePlugin.DO_NOTHING_PROCESSOR.getPipePluginName()) ? new DoNothingProcessor() : PipeAgent.plugin().reflectProcessor(pipeParameters);
        try {
            doNothingProcessor.validate(new PipeParameterValidator(pipeParameters));
            doNothingProcessor.customize(pipeParameters, new PipeTaskRuntimeConfiguration(new PipeTaskRuntimeEnvironment(str, j)));
            this.pipeProcessorSubtask = new PipeProcessorSubtask(str + "_" + tConsensusGroupId + "_" + j, eventSupplier, doNothingProcessor, new PipeEventCollector(boundedBlockingPendingQueue));
        } catch (Exception e) {
            throw new PipeException(e.getMessage(), e);
        }
    }

    @Override // org.apache.iotdb.db.pipe.task.stage.PipeTaskStage
    public void createSubtask() throws PipeException {
        this.executor.register(this.pipeProcessorSubtask);
    }

    @Override // org.apache.iotdb.db.pipe.task.stage.PipeTaskStage
    public void startSubtask() throws PipeException {
        this.executor.start(this.pipeProcessorSubtask.getTaskID());
    }

    @Override // org.apache.iotdb.db.pipe.task.stage.PipeTaskStage
    public void stopSubtask() throws PipeException {
        this.executor.stop(this.pipeProcessorSubtask.getTaskID());
    }

    @Override // org.apache.iotdb.db.pipe.task.stage.PipeTaskStage
    public void dropSubtask() throws PipeException {
        this.executor.deregister(this.pipeProcessorSubtask.getTaskID());
    }
}
