package org.apache.iotdb.db.mpp.plan.execution;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.SetThreadName;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.QueryState;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.constant.DataNodeEndPoints;
import org.apache.iotdb.db.mpp.plan.execution.memory.MemorySourceHandle;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySource;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceContext;
import org.apache.iotdb.db.mpp.plan.execution.memory.StatementMemorySourceVisitor;
import org.apache.iotdb.db.mpp.plan.optimization.PlanOptimizer;
import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/plan/execution/QueryExecution.class */
/**
 * Drives one statement through analysis, planning, scheduling and result retrieval.
 *
 * <p>The statement is analyzed in the constructor. {@link #start()} then either builds the result
 * directly from memory (when execution can be skipped) or produces the logical and distributed
 * plans and hands the fragment instances to an {@link IScheduler}. Results of READ queries are
 * pulled through an {@link ISourceHandle} by {@link #getBatchResult()}.
 */
public class QueryExecution implements IQueryExecution {
    private static final Logger logger = LoggerFactory.getLogger(QueryExecution.class);
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

    private final MPPQueryContext context;
    // Created lazily by schedule(); stays null when execution is skipped.
    private IScheduler scheduler;
    private final QueryStateMachine stateMachine;
    private final List<PlanOptimizer> planOptimizers = new ArrayList<>();
    private final Analysis analysis;
    private LogicalQueryPlan logicalPlan;
    private DistributedQueryPlan distributedPlan;
    private final ExecutorService executor;
    private final ExecutorService writeOperationExecutor;
    private final ScheduledExecutorService scheduledExecutor;
    private final IPartitionFetcher partitionFetcher;
    private final ISchemaFetcher schemaFetcher;
    // Source of result blocks; set by initResultHandle() or constructResultForMemorySource().
    private ISourceHandle resultHandle;
    private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager;

    /**
     * Analyzes the statement and registers a state listener that stops the scheduler once the
     * query reaches a done state.
     *
     * @param statement the statement to execute
     * @param context the query context (query id, query type, result node information)
     * @param executor executor handed to the state machine and to the scheduler
     * @param writeOperationExecutor executor handed to the cluster scheduler only
     * @param scheduledExecutor scheduled executor handed to the scheduler
     * @param partitionFetcher partition fetcher used during analysis
     * @param schemaFetcher schema fetcher used during analysis
     * @param internalServiceClientManager client manager handed to the scheduler
     */
    public QueryExecution(Statement statement, MPPQueryContext context, ExecutorService executor, ExecutorService writeOperationExecutor, ScheduledExecutorService scheduledExecutor, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
        this.executor = executor;
        this.writeOperationExecutor = writeOperationExecutor;
        this.scheduledExecutor = scheduledExecutor;
        this.context = context;
        this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
        this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
        this.partitionFetcher = partitionFetcher;
        this.schemaFetcher = schemaFetcher;
        this.internalServiceClientManager = internalServiceClientManager;

        this.stateMachine.addStateChangeListener(state -> {
            // Name the thread after the query id for the duration of the callback;
            // try-with-resources closes the SetThreadName on every exit path.
            try (SetThreadName ignored = new SetThreadName(context.getQueryId().getId())) {
                if (!state.isDone()) {
                    return;
                }
                stop();
                // The result handle is aborted only on abnormal termination.
                if (state == QueryState.FAILED || state == QueryState.ABORTED || state == QueryState.CANCELED) {
                    logger.info("release resource because Query State is: {}", state);
                    releaseResource();
                }
            }
        });
    }

    /**
     * Starts the execution. When execution can be skipped the result is built from memory and the
     * state machine moves straight to RUNNING; otherwise the plans are built, the result handle is
     * initialized for READ queries, and the fragment instances are scheduled.
     */
    @Override
    public void start() {
        if (skipExecute()) {
            logger.info("execution of query will be skipped. Transit to RUNNING immediately.");
            constructResultForMemorySource();
            this.stateMachine.transitionToRunning();
            return;
        }
        doLogicalPlan();
        doDistributedPlan();
        if (this.context.getQueryType() == QueryType.READ) {
            initResultHandle();
        }
        schedule();
    }

    /**
     * @return true when the analysis marks the query as finished after analyze, or when a READ
     *     query has no data source
     */
    private boolean skipExecute() {
        if (this.analysis.isFinishQueryAfterAnalyze()) {
            return true;
        }
        return this.context.getQueryType() == QueryType.READ && !this.analysis.hasDataSource();
    }

    /** Builds the result block and the response header directly from the analyzed statement. */
    private void constructResultForMemorySource() {
        StatementMemorySourceContext memorySourceContext = new StatementMemorySourceContext(this.context, this.analysis);
        StatementMemorySource memorySource = new StatementMemorySourceVisitor().process(this.analysis.getStatement(), memorySourceContext);
        this.resultHandle = new MemorySourceHandle(memorySource.getTsBlock());
        this.analysis.setRespDatasetHeader(memorySource.getDatasetHeader());
    }

    /** Runs the analyzer over the statement and returns its analysis. */
    private Analysis analyze(Statement statement, MPPQueryContext context, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) {
        return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
    }

    /** Creates the scheduler matching the configured deployment mode and starts it. */
    private void schedule() {
        List<FragmentInstance> instances = this.distributedPlan.getInstances();
        if (config.isClusterMode()) {
            this.scheduler = new ClusterScheduler(this.context, this.stateMachine, instances, this.context.getQueryType(), this.executor, this.writeOperationExecutor, this.scheduledExecutor, this.internalServiceClientManager);
        } else {
            this.scheduler = new StandaloneScheduler(this.context, this.stateMachine, instances, this.context.getQueryType(), this.executor, this.scheduledExecutor, this.internalServiceClientManager);
        }
        this.scheduler.start();
    }

    /** Builds the logical plan from the analysis; the plan is logged for READ queries. */
    public void doLogicalPlan() {
        this.logicalPlan = new LogicalPlanner(this.context, this.planOptimizers).plan(this.analysis);
        if (isQuery()) {
            logger.info("logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
        }
    }

    /** Builds the distributed plan from the logical plan; its instances are logged for READ queries. */
    public void doDistributedPlan() {
        this.distributedPlan = new DistributionPlanner(this.analysis, this.logicalPlan).planFragments();
        if (isQuery()) {
            List<FragmentInstance> instances = this.distributedPlan.getInstances();
            logger.info("distribution plan done. Fragment instance count is {}, details is: \n {}", instances.size(), printFragmentInstances(instances));
        }
    }

    /** Renders each fragment instance on its own line, each preceded by a line separator. */
    private String printFragmentInstances(List<FragmentInstance> list) {
        StringBuilder rendered = new StringBuilder();
        for (FragmentInstance instance : list) {
            rendered.append(System.lineSeparator()).append(instance);
        }
        return rendered.toString();
    }

    /** Stops the scheduler if one has been created. */
    @Override
    public void stop() {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
    }

    /** Stops the scheduler and aborts the result handle. */
    @Override
    public void stopAndCleanup() {
        stop();
        releaseResource();
    }

    /** Aborts the result handle if one has been created. */
    private void releaseResource() {
        if (this.resultHandle != null) {
            this.resultHandle.abort();
        }
    }

    /**
     * Waits for and returns the next non-null result block.
     *
     * <p>Loops while the result handle exists and is neither aborted nor finished. When the loop
     * condition no longer holds, the state machine is transitioned to FINISHED and an empty
     * Optional is returned. If the handle finishes while this method is waiting on it, an empty
     * Optional is returned without that transition.
     *
     * @return the next block, or empty when there is nothing more to read
     * @throws RuntimeException if the waiting thread is interrupted, or if waiting on the handle
     *     fails or is cancelled; the state machine is transitioned to FAILED first
     */
    @Override
    public Optional<TsBlock> getBatchResult() {
        while (this.resultHandle != null && !this.resultHandle.isAborted() && !this.resultHandle.isFinished()) {
            try {
                this.resultHandle.isBlocked().get();
                if (this.resultHandle.isFinished()) {
                    return Optional.empty();
                }
                TsBlock block = this.resultHandle.receive();
                if (block != null) {
                    return Optional.of(block);
                }
                // A null block means nothing was available yet; go around again.
            } catch (InterruptedException e) {
                this.stateMachine.transitionToFailed(e);
                // Restore the interrupt flag for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(new SQLException("ResultSet thread was interrupted", e));
            } catch (CancellationException | ExecutionException e) {
                this.stateMachine.transitionToFailed(e);
                Throwable cause = e.getCause() == null ? e : e.getCause();
                Throwables.throwIfUnchecked(cause);
                throw new RuntimeException(cause);
            }
        }
        logger.info("resultHandle for client is finished");
        this.stateMachine.transitionToFinished();
        return Optional.empty();
    }

    /** @return true when a result handle exists and is not finished */
    @Override
    public boolean hasNextResult() {
        return this.resultHandle != null && !this.resultHandle.isFinished();
    }

    /** @return the output value column count of the response dataset header */
    @Override
    public int getOutputValueColumnCount() {
        return this.analysis.getRespDatasetHeader().getOutputValueColumnCount();
    }

    /** @return the response dataset header held by the analysis */
    @Override
    public DatasetHeader getDatasetHeader() {
        return this.analysis.getRespDatasetHeader();
    }

    /**
     * Returns the execution result for the current query state.
     *
     * <p>If the query is already FINISHED the result is returned immediately. Otherwise this
     * method blocks until a registered state listener observes RUNNING or a done state.
     *
     * @return the execution result; an INTERNAL_SERVER_ERROR result carrying the state machine's
     *     failure message if the wait is interrupted or fails
     */
    @Override
    public ExecutionResult getStatus() {
        if (this.stateMachine.getState() == QueryState.FINISHED) {
            return getExecutionResult(QueryState.FINISHED);
        }
        SettableFuture<QueryState> observedState = SettableFuture.create();
        this.stateMachine.addStateChangeListener(state -> {
            if (state == QueryState.RUNNING || state.isDone()) {
                observedState.set(state);
            }
        });
        try {
            return getExecutionResult(observedState.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            return internalErrorResult();
        } catch (ExecutionException e) {
            return internalErrorResult();
        }
    }

    /** Builds an INTERNAL_SERVER_ERROR result carrying the state machine's failure message. */
    private ExecutionResult internalErrorResult() {
        return new ExecutionResult(this.context.getQueryId(), RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, this.stateMachine.getFailureMessage()));
    }

    /**
     * Creates the result source handle if it does not exist yet. A local source handle is used
     * when the upstream endpoint is this node, a regular source handle otherwise. Failures
     * reported through the handle's callback transition the state machine to FAILED.
     */
    private void initResultHandle() {
        if (this.resultHandle != null) {
            return;
        }
        TEndPoint upstreamEndPoint = this.context.getResultNodeContext().getUpStreamEndpoint();
        boolean upstreamIsLocal = DataNodeEndPoints.isSameNode(upstreamEndPoint);
        MPPDataExchangeManager exchangeManager = MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
        TFragmentInstanceId virtualInstanceId = this.context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift();
        String virtualResultNodeId = this.context.getResultNodeContext().getVirtualResultNodeId().getId();
        TFragmentInstanceId upstreamInstanceId = this.context.getResultNodeContext().getUpStreamFragmentInstanceId().toThrift();
        if (upstreamIsLocal) {
            this.resultHandle = exchangeManager.createLocalSourceHandle(virtualInstanceId, virtualResultNodeId, upstreamInstanceId, this.stateMachine::transitionToFailed);
        } else {
            this.resultHandle = exchangeManager.createSourceHandle(virtualInstanceId, virtualResultNodeId, upstreamEndPoint, upstreamInstanceId, this.stateMachine::transitionToFailed);
        }
    }

    /**
     * Maps a query state to an execution result. FINISHED and RUNNING map to SUCCESS_STATUS, any
     * other state to QUERY_PROCESS_ERROR. For insert statements, redirect information is attached
     * to the status.
     *
     * @param queryState the state to report
     * @return the execution result for this query
     */
    private ExecutionResult getExecutionResult(QueryState queryState) {
        boolean succeeded = queryState == QueryState.FINISHED || queryState == QueryState.RUNNING;
        TSStatusCode statusCode = succeeded ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
        TSStatus status = RpcUtils.getStatus(statusCode, this.stateMachine.getFailureMessage());

        if (this.analysis.getStatement() instanceof InsertBaseStatement) {
            InsertBaseStatement insertStatement = (InsertBaseStatement) this.analysis.getStatement();
            List<TEndPoint> redirectNodes;
            if (config.isClusterMode()) {
                redirectNodes = insertStatement.collectRedirectInfo(this.analysis.getDataPartitionInfo());
            } else {
                redirectNodes = Collections.emptyList();
            }

            boolean multiTarget = insertStatement instanceof InsertRowsStatement || insertStatement instanceof InsertMultiTabletsStatement;
            if (multiTarget) {
                // One sub-status per redirect node, attached only on success.
                if (statusCode == TSStatusCode.SUCCESS_STATUS) {
                    List<TSStatus> subStatus = new ArrayList<>();
                    status.setCode(TSStatusCode.NEED_REDIRECTION.getStatusCode());
                    for (TEndPoint endPoint : redirectNodes) {
                        subStatus.add(StatusUtils.getStatus(TSStatusCode.NEED_REDIRECTION).setRedirectNode(endPoint));
                    }
                    status.setSubStatus(subStatus);
                }
            } else if (config.isClusterMode() && !redirectNodes.isEmpty()) {
                // Guard against an empty list so building the status cannot throw
                // IndexOutOfBoundsException.
                // NOTE(review): assumes the redirect node is an optional hint - confirm.
                status.setRedirectNode(redirectNodes.get(0));
            }
        }
        return new ExecutionResult(this.context.getQueryId(), status);
    }

    /** @return the distributed plan, or null if {@link #doDistributedPlan()} has not run */
    public DistributedQueryPlan getDistributedPlan() {
        return this.distributedPlan;
    }

    /** @return the logical plan, or null if {@link #doLogicalPlan()} has not run */
    public LogicalQueryPlan getLogicalPlan() {
        return this.logicalPlan;
    }

    /** @return true when the query type of the context is READ */
    @Override
    public boolean isQuery() {
        return this.context.getQueryType() == QueryType.READ;
    }

    /** @return the id string of this query */
    @Override
    public String getQueryId() {
        return this.context.getQueryId().getId();
    }

    @Override
    public String toString() {
        return String.format("QueryExecution[%s]", this.context.getQueryId());
    }
}
