package org.apache.iotdb.db.queryengine.execution.fragment;

import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.class */
/**
 * Process-wide registry of the fragment instances handled by this node.
 *
 * <p>For every {@link FragmentInstanceId} the manager keeps a {@link FragmentInstanceContext} and,
 * while the instance is running, a {@link FragmentInstanceExecution}. Both registries are
 * {@link ConcurrentHashMap}s. A single-threaded scheduled executor periodically evicts contexts
 * whose end time is older than the retention window and fails executions that stay in the
 * {@code FLUSHING} state longer than their timeout.
 *
 * <p>Obtain the singleton through {@link #getInstance()}.
 */
public class FragmentInstanceManager {
    private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceManager.class);
    private static final QueryExecutionMetricSet QUERY_EXECUTION_METRIC_SET = QueryExecutionMetricSet.getInstance();

    /** Number of threads in the pool that delivers state-machine notifications. */
    private static final int NOTIFICATION_THREAD_COUNT = 4;

    /** Initial delay and period, in milliseconds, of both background maintenance tasks. */
    private static final long MANAGEMENT_PERIOD_IN_MS = 2000L;

    /** How long, in minutes, the context of an ended instance is retained. */
    private static final double INFO_CACHE_TIME_IN_MINUTES = 5.0d;

    private final Map<FragmentInstanceId, FragmentInstanceContext> instanceContext;
    private final Map<FragmentInstanceId, FragmentInstanceExecution> instanceExecution;
    private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap;
    private final LocalExecutionPlanner planner;
    private final IDriverScheduler scheduler;
    private final ScheduledExecutorService instanceManagementExecutor;
    public final ExecutorService instanceNotificationExecutor;
    private final Duration infoCacheTime;
    private final CounterStat failedInstances;
    private final ExecutorService intoOperationExecutor;
    private final MPPDataExchangeManager exchangeManager;

    /** Holder idiom: the singleton is created when this nested class is first initialized. */
    private static class InstanceHolder {
        private static final FragmentInstanceManager INSTANCE = new FragmentInstanceManager();

        private InstanceHolder() {
        }
    }

    /**
     * Returns the shared manager.
     *
     * @return the singleton instance
     */
    public static FragmentInstanceManager getInstance() {
        return InstanceHolder.INSTANCE;
    }

    /**
     * Wires the collaborating singletons, creates the registries and thread pools, and schedules
     * the two periodic maintenance tasks.
     */
    private FragmentInstanceManager() {
        this.planner = LocalExecutionPlanner.getInstance();
        this.scheduler = DriverScheduler.getInstance();
        this.failedInstances = new CounterStat();
        this.exchangeManager = MPPDataExchangeService.getInstance().getMPPDataExchangeManager();
        this.instanceContext = new ConcurrentHashMap<>();
        this.instanceExecution = new ConcurrentHashMap<>();
        this.dataNodeQueryContextMap = new ConcurrentHashMap<>();
        this.instanceManagementExecutor = IoTDBThreadPoolFactory.newScheduledThreadPool(1, ThreadName.FRAGMENT_INSTANCE_MANAGEMENT.getName());
        this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(NOTIFICATION_THREAD_COUNT, ThreadName.FRAGMENT_INSTANCE_NOTIFICATION.getName());
        this.infoCacheTime = new Duration(INFO_CACHE_TIME_IN_MINUTES, TimeUnit.MINUTES);
        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.instanceManagementExecutor, this::removeOldInstances, MANAGEMENT_PERIOD_IN_MS, MANAGEMENT_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
        ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.instanceManagementExecutor, this::cancelTimeoutFlushingInstances, MANAGEMENT_PERIOD_IN_MS, MANAGEMENT_PERIOD_IN_MS, TimeUnit.MILLISECONDS);
        this.intoOperationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool(IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(), "into-operation-executor");
    }

    /**
     * Plans a data-query fragment instance, creates its drivers and registers the resulting
     * execution, unless an execution is already registered under the same id.
     *
     * <p>Planning failures do not propagate: they are logged, reported to the instance's state
     * machine, and a {@code FAILED} info is returned. Instance counts and the elapsed planning
     * time are reported to the metric sets on every exit path.
     *
     * @param fragmentInstance the instance to run
     * @param iDataRegionForQuery the data region handed to the instance context
     * @return the info of the registered execution, or a failed info when creation failed
     */
    public FragmentInstanceInfo execDataQueryFragmentInstance(FragmentInstance fragmentInstance, IDataRegionForQuery iDataRegionForQuery) {
        long startTime = System.nanoTime();
        FragmentInstanceId instanceId = fragmentInstance.getId();
        // Written inside the mapping lambda, read in the finally block below.
        AtomicLong driverCount = new AtomicLong();
        // try-with-resources closes the SetThreadName on every path, including exceptional ones.
        try (SetThreadName ignored = new SetThreadName(instanceId.getFullId())) {
            FragmentInstanceExecution execution = this.instanceExecution.computeIfAbsent(instanceId, ignoredKey -> {
                FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, this.instanceNotificationExecutor);
                DataNodeQueryContext queryContext = getOrCreateDataNodeQueryContext(instanceId.getQueryId(), fragmentInstance.getDataNodeFINum());
                FragmentInstanceContext context = this.instanceContext.computeIfAbsent(instanceId, contextId ->
                        FragmentInstanceContext.createFragmentInstanceContext(contextId, stateMachine, fragmentInstance.getSessionInfo(), iDataRegionForQuery, fragmentInstance.getGlobalTimePredicate(), this.dataNodeQueryContextMap));
                try {
                    List<PipelineDriverFactory> driverFactories = this.planner.plan(fragmentInstance.getFragment().getPlanNodeTree(), fragmentInstance.getFragment().getTypeProvider(), context, queryContext);
                    List<IDriver> drivers = new ArrayList<>();
                    driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
                    if (fragmentInstance.isHighestPriority()) {
                        drivers.forEach(driver -> driver.setHighestPriority(true));
                    }
                    context.initializeNumOfDrivers(drivers.size());
                    // The sink is taken from the last driver in the list.
                    ISink sink = drivers.get(drivers.size() - 1).getSink();
                    driverCount.addAndGet(drivers.size());
                    return FragmentInstanceExecution.createFragmentInstanceExecution(this.scheduler, instanceId, context, drivers, sink, stateMachine, this.failedInstances, fragmentInstance.getTimeOut(), this.exchangeManager);
                } catch (Throwable t) {
                    logger.warn("error when create FragmentInstanceExecution.", t);
                    stateMachine.failed(t);
                    // Returning null makes computeIfAbsent record no mapping for this id.
                    return null;
                }
            });
            if (execution == null) {
                return createFailedInstanceInfo(instanceId);
            }
            removeExecutionWhenDone(execution, instanceId);
            return execution.getInstanceInfo();
        } finally {
            QueryRelatedResourceMetricSet.getInstance().updateFragmentInstanceCount(this.instanceContext.size(), this.instanceExecution.size(), driverCount.get());
            QUERY_EXECUTION_METRIC_SET.recordExecutionCost(QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER, System.nanoTime() - startTime);
        }
    }

    /**
     * Returns the per-query context registered for {@code queryId}, creating it from {@code i}
     * when none is registered yet.
     *
     * @param queryId the query whose context is requested
     * @param i value passed to the {@link DataNodeQueryContext} constructor; callers in this class
     *     pass {@code FragmentInstance#getDataNodeFINum()}
     * @return the existing or newly created context
     */
    private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int i) {
        return this.dataNodeQueryContextMap.computeIfAbsent(queryId, ignoredKey -> new DataNodeQueryContext(i));
    }

    /**
     * Plans a schema-query fragment instance, creates its drivers and registers the resulting
     * execution, unless an execution is already registered under the same id.
     *
     * <p>Planning failures are logged and reported to the instance's state machine; a
     * {@code FAILED} info is returned in that case.
     *
     * @param fragmentInstance the instance to run
     * @param iSchemaRegion the schema region passed to the planner
     * @return the info of the registered execution, or a failed info when creation failed
     */
    public FragmentInstanceInfo execSchemaQueryFragmentInstance(FragmentInstance fragmentInstance, ISchemaRegion iSchemaRegion) {
        FragmentInstanceId instanceId = fragmentInstance.getId();
        FragmentInstanceExecution execution = this.instanceExecution.computeIfAbsent(instanceId, ignoredKey -> {
            FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, this.instanceNotificationExecutor);
            FragmentInstanceContext context = this.instanceContext.computeIfAbsent(instanceId, contextId ->
                    FragmentInstanceContext.createFragmentInstanceContext(contextId, stateMachine, fragmentInstance.getSessionInfo()));
            try {
                List<PipelineDriverFactory> driverFactories = this.planner.plan(fragmentInstance.getFragment().getPlanNodeTree(), context, iSchemaRegion);
                List<IDriver> drivers = new ArrayList<>();
                driverFactories.forEach(factory -> drivers.add(factory.createDriver()));
                context.initializeNumOfDrivers(drivers.size());
                // The sink is taken from the last driver in the list.
                ISink sink = drivers.get(drivers.size() - 1).getSink();
                return FragmentInstanceExecution.createFragmentInstanceExecution(this.scheduler, instanceId, context, drivers, sink, stateMachine, this.failedInstances, fragmentInstance.getTimeOut(), this.exchangeManager);
            } catch (Throwable t) {
                logger.warn("Execute error caused by ", t);
                stateMachine.failed(t);
                // Returning null makes computeIfAbsent record no mapping for this id.
                return null;
            }
        });
        if (execution == null) {
            return createFailedInstanceInfo(instanceId);
        }
        removeExecutionWhenDone(execution, instanceId);
        return execution.getInstanceInfo();
    }

    /** Registers a listener that drops the execution from the registry once its state is done. */
    private void removeExecutionWhenDone(FragmentInstanceExecution execution, FragmentInstanceId instanceId) {
        execution.getStateMachine().addStateChangeListener(state -> {
            if (state.isDone()) {
                this.instanceExecution.remove(instanceId);
            }
        });
    }

    /**
     * Drops the execution registered under the given id and aborts its context. The context
     * itself stays in the registry.
     *
     * @param fragmentInstanceId the instance to abort
     * @return the instance info after the abort, or {@code null} when no context is registered
     */
    public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
        this.instanceExecution.remove(fragmentInstanceId);
        FragmentInstanceContext context = this.instanceContext.get(fragmentInstanceId);
        if (context == null) {
            return null;
        }
        context.abort();
        return context.getInstanceInfo();
    }

    /**
     * Removes both the context and the execution of the given instance, then either cancels the
     * context or marks it finished.
     *
     * @param fragmentInstanceId the instance to remove; must not be {@code null}
     * @param z {@code true} to call {@code cancel()} on the context, {@code false} to call
     *     {@code finished()}
     * @return the instance info after the transition, or {@code null} when no context was
     *     registered
     * @throws NullPointerException if {@code fragmentInstanceId} is {@code null}
     */
    public FragmentInstanceInfo cancelTask(FragmentInstanceId fragmentInstanceId, boolean z) {
        logger.debug("[CancelFI]");
        Objects.requireNonNull(fragmentInstanceId, "taskId is null");
        FragmentInstanceContext removedContext = this.instanceContext.remove(fragmentInstanceId);
        if (removedContext == null) {
            return null;
        }
        this.instanceExecution.remove(fragmentInstanceId);
        if (z) {
            removedContext.cancel();
        } else {
            removedContext.finished();
        }
        return removedContext.getInstanceInfo();
    }

    /**
     * Looks up the current info of an instance.
     *
     * @param fragmentInstanceId the instance to look up; must not be {@code null}
     * @return the instance info, or {@code null} when no context is registered
     * @throws NullPointerException if {@code fragmentInstanceId} is {@code null}
     */
    public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId fragmentInstanceId) {
        Objects.requireNonNull(fragmentInstanceId, "instanceId is null");
        FragmentInstanceContext context = this.instanceContext.get(fragmentInstanceId);
        return context == null ? null : context.getInstanceInfo();
    }

    /**
     * Returns the counter that is handed to every created execution.
     *
     * @return the failed-instance counter
     */
    public CounterStat getFailedInstances() {
        return this.failedInstances;
    }

    /**
     * Builds a {@code FAILED} info from the end time and failure details stored in the registered
     * context of the given instance.
     *
     * <p>NOTE(review): this dereferences the registry lookup without a null check, so it throws a
     * {@link NullPointerException} if the context was removed in the meantime (for example by
     * {@link #cancelTask}); confirm whether callers can hit that window.
     */
    private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId fragmentInstanceId) {
        FragmentInstanceContext context = this.instanceContext.get(fragmentInstanceId);
        return new FragmentInstanceInfo(FragmentInstanceState.FAILED, context.getEndTime(), context.getFailedCause(), context.getFailureInfoList());
    }

    /** Periodic task: evicts contexts whose end time is set and older than the retention window. */
    private void removeOldInstances() {
        long expirationCutoff = System.currentTimeMillis() - this.infoCacheTime.toMillis();
        this.instanceContext.entrySet().removeIf(entry -> {
            long endTime = entry.getValue().getEndTime();
            // An end time of -1 is never evicted.
            return endTime != -1 && endTime <= expirationCutoff;
        });
    }

    /**
     * Periodic task: fails every execution that is in the {@code FLUSHING} state and has been
     * running longer than its own timeout.
     */
    private void cancelTimeoutFlushingInstances() {
        long now = System.currentTimeMillis();
        this.instanceExecution.forEach((instanceId, execution) -> {
            boolean flushing = execution.getStateMachine().getState() == FragmentInstanceState.FLUSHING;
            if (flushing && now - execution.getStartTime() > execution.getTimeoutInMs()) {
                execution.getStateMachine().failed(new TimeoutException("Query has executed more than " + execution.getTimeoutInMs() + "ms"));
            }
        });
    }

    /**
     * Returns the fixed thread pool named {@code into-operation-executor}.
     *
     * @return the executor created for INTO operations
     */
    public ExecutorService getIntoOperationExecutor() {
        return this.intoOperationExecutor;
    }
}
