package org.apache.iotdb.db.mpp.plan.scheduler;

import io.airlift.units.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.class */
public class StandaloneScheduler implements IScheduler {
    private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
    private static final SchemaEngine SCHEMA_ENGINE = SchemaEngine.getInstance();
    private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
    private MPPQueryContext queryContext;
    private QueryStateMachine stateMachine;
    private QueryType queryType;
    private List<FragmentInstance> instances;
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;
    private IFragInstanceDispatcher dispatcher;
    private IFragInstanceStateTracker stateTracker;
    private IQueryTerminator queryTerminator;

    public StandaloneScheduler(MPPQueryContext mPPQueryContext, QueryStateMachine queryStateMachine, List<FragmentInstance> list, QueryType queryType, ExecutorService executorService, ScheduledExecutorService scheduledExecutorService, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> iClientManager) {
        this.queryContext = mPPQueryContext;
        this.instances = list;
        this.queryType = queryType;
        this.executor = executorService;
        this.scheduledExecutor = scheduledExecutorService;
        this.stateMachine = queryStateMachine;
        this.stateTracker = new FixedRateFragInsStateTracker(queryStateMachine, executorService, scheduledExecutorService, list, iClientManager);
        this.queryTerminator = new SimpleQueryTerminator(executorService, scheduledExecutorService, mPPQueryContext.getQueryId(), list, iClientManager);
    }

    @Override // org.apache.iotdb.db.mpp.plan.scheduler.IScheduler
    public void start() {
        this.stateMachine.transitionToDispatching();
        switch (this.queryType) {
            case READ:
                try {
                    for (FragmentInstance fragmentInstance : this.instances) {
                        DataRegionId createFromTConsensusGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(fragmentInstance.getRegionReplicaSet().getRegionId());
                        if (createFromTConsensusGroupId instanceof DataRegionId) {
                            FragmentInstanceManager.getInstance().execDataQueryFragmentInstance(fragmentInstance, StorageEngineV2.getInstance().getDataRegion(createFromTConsensusGroupId));
                        } else {
                            FragmentInstanceManager.getInstance().execSchemaQueryFragmentInstance(fragmentInstance, SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId) createFromTConsensusGroupId));
                        }
                    }
                } catch (Exception e) {
                    this.stateMachine.transitionToFailed(e);
                }
                this.stateMachine.transitionToRunning();
                LOGGER.info("{} transit to RUNNING", getLogHeader());
                this.instances.forEach(fragmentInstance2 -> {
                    this.stateMachine.initialFragInstanceState(fragmentInstance2.getId(), FragmentInstanceState.RUNNING);
                });
                this.stateTracker.start();
                LOGGER.info("{} state tracker starts", getLogHeader());
                return;
            case WRITE:
                try {
                    for (FragmentInstance fragmentInstance3 : this.instances) {
                        PlanNode root = fragmentInstance3.getFragment().getRoot();
                        DataRegionId createFromTConsensusGroupId2 = ConsensusGroupId.Factory.createFromTConsensusGroupId(fragmentInstance3.getRegionReplicaSet().getRegionId());
                        boolean z = false;
                        if (root instanceof InsertNode) {
                            InsertNode insertNode = (InsertNode) root;
                            SchemaValidator.validate(insertNode);
                            z = insertNode.hasFailedMeasurements();
                            if (z) {
                                LOGGER.warn("Fail to insert measurements {} caused by {}", insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
                            }
                        }
                        if (createFromTConsensusGroupId2 instanceof DataRegionId) {
                            STORAGE_ENGINE.write(createFromTConsensusGroupId2, root);
                        } else {
                            SCHEMA_ENGINE.write((SchemaRegionId) createFromTConsensusGroupId2, root);
                        }
                        if (z) {
                            InsertNode insertNode2 = (InsertNode) root;
                            List<Exception> failedExceptions = insertNode2.getFailedExceptions();
                            throw new WriteProcessException("failed to insert measurements " + insertNode2.getFailedMeasurements() + (!failedExceptions.isEmpty() ? " caused by " + failedExceptions.get(0).getMessage() : AlignedPath.VECTOR_PLACEHOLDER));
                        }
                    }
                    this.stateMachine.transitionToFinished();
                    return;
                } catch (Exception e2) {
                    LOGGER.error("Execute write operation error ", e2);
                    this.stateMachine.transitionToFailed(e2);
                    return;
                }
            default:
                return;
        }
    }

    @Override // org.apache.iotdb.db.mpp.plan.scheduler.IScheduler
    public void stop() {
        this.stateTracker.abort();
        Iterator<FragmentInstance> it = this.instances.iterator();
        while (it.hasNext()) {
            FragmentInstanceManager.getInstance().cancelTask(it.next().getId());
        }
    }

    @Override // org.apache.iotdb.db.mpp.plan.scheduler.IScheduler
    public Duration getTotalCpuTime() {
        return null;
    }

    @Override // org.apache.iotdb.db.mpp.plan.scheduler.IScheduler
    public FragmentInfo getFragmentInfo() {
        return null;
    }

    @Override // org.apache.iotdb.db.mpp.plan.scheduler.IScheduler
    public void abortFragmentInstance(FragmentInstanceId fragmentInstanceId, Throwable th) {
    }

    @Override // org.apache.iotdb.db.mpp.plan.scheduler.IScheduler
    public void cancelFragment(PlanFragmentId planFragmentId) {
    }

    private String getLogHeader() {
        return String.format("Query[%s]", this.queryContext.getQueryId());
    }
}
