/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.plan.scheduler;

import io.airlift.units.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.analyze.SchemaValidator;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.scheduler.FixedRateFragInsStateTracker;
import org.apache.iotdb.db.mpp.plan.scheduler.IFragInstanceDispatcher;
import org.apache.iotdb.db.mpp.plan.scheduler.IFragInstanceStateTracker;
import org.apache.iotdb.db.mpp.plan.scheduler.IQueryTerminator;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.SimpleQueryTerminator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandaloneScheduler
implements IScheduler {
    private static final StorageEngineV2 STORAGE_ENGINE = StorageEngineV2.getInstance();
    private static final SchemaEngine SCHEMA_ENGINE = SchemaEngine.getInstance();
    private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneScheduler.class);
    private MPPQueryContext queryContext;
    private QueryStateMachine stateMachine;
    private QueryType queryType;
    private List<FragmentInstance> instances;
    private ExecutorService executor;
    private ScheduledExecutorService scheduledExecutor;
    private IFragInstanceDispatcher dispatcher;
    private IFragInstanceStateTracker stateTracker;
    private IQueryTerminator queryTerminator;

    public StandaloneScheduler(MPPQueryContext queryContext, QueryStateMachine stateMachine, List<FragmentInstance> instances, QueryType queryType, ExecutorService executor, ScheduledExecutorService scheduledExecutor, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
        this.queryContext = queryContext;
        this.instances = instances;
        this.queryType = queryType;
        this.executor = executor;
        this.scheduledExecutor = scheduledExecutor;
        this.stateMachine = stateMachine;
        this.stateTracker = new FixedRateFragInsStateTracker(stateMachine, executor, scheduledExecutor, instances, internalServiceClientManager);
        this.queryTerminator = new SimpleQueryTerminator(executor, scheduledExecutor, queryContext.getQueryId(), instances, internalServiceClientManager);
    }

    @Override
    public void start() {
        this.stateMachine.transitionToDispatching();
        switch (this.queryType) {
            case READ: {
                try {
                    for (FragmentInstance fragmentInstance : this.instances) {
                        Object region;
                        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)fragmentInstance.getRegionReplicaSet().getRegionId());
                        if (groupId instanceof DataRegionId) {
                            region = StorageEngineV2.getInstance().getDataRegion((DataRegionId)groupId);
                            FragmentInstanceManager.getInstance().execDataQueryFragmentInstance(fragmentInstance, (DataRegion)region);
                            continue;
                        }
                        region = SchemaEngine.getInstance().getSchemaRegion((SchemaRegionId)groupId);
                        FragmentInstanceManager.getInstance().execSchemaQueryFragmentInstance(fragmentInstance, (ISchemaRegion)region);
                    }
                }
                catch (Exception e) {
                    this.stateMachine.transitionToFailed(e);
                }
                this.stateMachine.transitionToRunning();
                LOGGER.info("{} transit to RUNNING", (Object)this.getLogHeader());
                this.instances.forEach(instance -> this.stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING));
                this.stateTracker.start();
                LOGGER.info("{} state tracker starts", (Object)this.getLogHeader());
                break;
            }
            case WRITE: {
                try {
                    for (FragmentInstance fragmentInstance : this.instances) {
                        PlanNode planNode = fragmentInstance.getFragment().getRoot();
                        ConsensusGroupId groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId((TConsensusGroupId)fragmentInstance.getRegionReplicaSet().getRegionId());
                        boolean hasFailedMeasurement = false;
                        if (planNode instanceof InsertNode) {
                            InsertNode insertNode = (InsertNode)planNode;
                            SchemaValidator.validate(insertNode);
                            hasFailedMeasurement = insertNode.hasFailedMeasurements();
                            if (hasFailedMeasurement) {
                                LOGGER.warn("Fail to insert measurements {} caused by {}", insertNode.getFailedMeasurements(), insertNode.getFailedMessages());
                            }
                        }
                        if (groupId instanceof DataRegionId) {
                            STORAGE_ENGINE.write((DataRegionId)groupId, planNode);
                        } else {
                            SCHEMA_ENGINE.write((SchemaRegionId)groupId, planNode);
                        }
                        if (!hasFailedMeasurement) continue;
                        InsertNode node = (InsertNode)planNode;
                        List<Exception> exceptions = node.getFailedExceptions();
                        throw new WriteProcessException("failed to insert measurements " + node.getFailedMeasurements() + (!exceptions.isEmpty() ? " caused by " + exceptions.get(0).getMessage() : ""));
                    }
                    this.stateMachine.transitionToFinished();
                    break;
                }
                catch (Exception e) {
                    LOGGER.error("Execute write operation error ", (Throwable)e);
                    this.stateMachine.transitionToFailed(e);
                }
            }
        }
    }

    @Override
    public void stop() {
        this.stateTracker.abort();
        for (FragmentInstance fragmentInstance : this.instances) {
            FragmentInstanceManager.getInstance().cancelTask(fragmentInstance.getId());
        }
    }

    @Override
    public Duration getTotalCpuTime() {
        return null;
    }

    @Override
    public FragmentInfo getFragmentInfo() {
        return null;
    }

    @Override
    public void abortFragmentInstance(FragmentInstanceId instanceId, Throwable failureCause) {
    }

    @Override
    public void cancelFragment(PlanFragmentId planFragmentId) {
    }

    private String getLogHeader() {
        return String.format("Query[%s]", this.queryContext.getQueryId());
    }
}

