/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.fragment;

import io.airlift.stats.CounterStat;
import io.airlift.units.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.IDataRegionForQuery;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.driver.DataDriver;
import org.apache.iotdb.db.mpp.execution.driver.SchemaDriver;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.schedule.DriverScheduler;
import org.apache.iotdb.db.mpp.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.mpp.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FragmentInstanceManager {
    private static final Logger logger = LoggerFactory.getLogger(FragmentInstanceManager.class);
    private final Map<FragmentInstanceId, FragmentInstanceContext> instanceContext;
    private final Map<FragmentInstanceId, FragmentInstanceExecution> instanceExecution;
    private final LocalExecutionPlanner planner = LocalExecutionPlanner.getInstance();
    private final IDriverScheduler scheduler = DriverScheduler.getInstance();
    private final ScheduledExecutorService instanceManagementExecutor;
    private final ExecutorService instanceNotificationExecutor;
    private final Duration infoCacheTime;
    private final CounterStat failedInstances = new CounterStat();
    private static final long QUERY_TIMEOUT_MS = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
    private final ExecutorService intoOperationExecutor;

    public static FragmentInstanceManager getInstance() {
        return InstanceHolder.INSTANCE;
    }

    private FragmentInstanceManager() {
        this.instanceContext = new ConcurrentHashMap<FragmentInstanceId, FragmentInstanceContext>();
        this.instanceExecution = new ConcurrentHashMap<FragmentInstanceId, FragmentInstanceExecution>();
        this.instanceManagementExecutor = IoTDBThreadPoolFactory.newScheduledThreadPool((int)1, (String)"instance-management");
        this.instanceNotificationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool((int)4, (String)"instance-notification");
        this.infoCacheTime = new Duration(5.0, TimeUnit.MINUTES);
        ScheduledExecutorUtil.safelyScheduleWithFixedDelay((ScheduledExecutorService)this.instanceManagementExecutor, this::removeOldInstances, (long)200L, (long)200L, (TimeUnit)TimeUnit.MILLISECONDS);
        ScheduledExecutorUtil.safelyScheduleWithFixedDelay((ScheduledExecutorService)this.instanceManagementExecutor, this::cancelTimeoutFlushingInstances, (long)200L, (long)200L, (TimeUnit)TimeUnit.MILLISECONDS);
        this.intoOperationExecutor = IoTDBThreadPoolFactory.newFixedThreadPool((int)IoTDBDescriptor.getInstance().getConfig().getIntoOperationExecutionThreadCount(), (String)"into-operation-executor");
    }

    public FragmentInstanceInfo execDataQueryFragmentInstance(FragmentInstance instance, IDataRegionForQuery dataRegion) {
        FragmentInstanceId instanceId = instance.getId();
        try (SetThreadName fragmentInstanceName = new SetThreadName(instanceId.getFullId());){
            FragmentInstanceExecution execution = this.instanceExecution.computeIfAbsent(instanceId, id -> {
                FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, this.instanceNotificationExecutor);
                FragmentInstanceContext context = this.instanceContext.computeIfAbsent(instanceId, fragmentInstanceId -> FragmentInstanceContext.createFragmentInstanceContext(fragmentInstanceId, stateMachine, instance.getSessionInfo(), this.intoOperationExecutor));
                try {
                    DataDriver driver = this.planner.plan(instance.getFragment().getPlanNodeTree(), instance.getFragment().getTypeProvider(), context, instance.getTimeFilter(), dataRegion);
                    return FragmentInstanceExecution.createFragmentInstanceExecution(this.scheduler, instanceId, context, driver, stateMachine, this.failedInstances, instance.getTimeOut());
                }
                catch (Throwable t) {
                    logger.warn("error when create FragmentInstanceExecution.", t);
                    stateMachine.failed(t);
                    return null;
                }
            });
            if (execution != null) {
                execution.getStateMachine().addStateChangeListener(newState -> {
                    if (newState.isDone()) {
                        this.instanceExecution.remove(instanceId);
                    }
                });
                FragmentInstanceInfo fragmentInstanceInfo = execution.getInstanceInfo();
                return fragmentInstanceInfo;
            }
            FragmentInstanceInfo fragmentInstanceInfo = this.createFailedInstanceInfo(instanceId);
            return fragmentInstanceInfo;
        }
    }

    public FragmentInstanceInfo execSchemaQueryFragmentInstance(FragmentInstance instance, ISchemaRegion schemaRegion) {
        FragmentInstanceId instanceId = instance.getId();
        FragmentInstanceExecution execution = this.instanceExecution.computeIfAbsent(instanceId, id -> {
            FragmentInstanceStateMachine stateMachine = new FragmentInstanceStateMachine(instanceId, this.instanceNotificationExecutor);
            FragmentInstanceContext context = this.instanceContext.computeIfAbsent(instanceId, fragmentInstanceId -> FragmentInstanceContext.createFragmentInstanceContext(fragmentInstanceId, stateMachine, instance.getSessionInfo(), this.intoOperationExecutor));
            try {
                SchemaDriver driver = this.planner.plan(instance.getFragment().getPlanNodeTree(), context, schemaRegion);
                return FragmentInstanceExecution.createFragmentInstanceExecution(this.scheduler, instanceId, context, driver, stateMachine, this.failedInstances, instance.getTimeOut());
            }
            catch (Throwable t) {
                logger.warn("Execute error caused by ", t);
                stateMachine.failed(t);
                return null;
            }
        });
        if (execution != null) {
            execution.getStateMachine().addStateChangeListener(newState -> {
                if (newState.isDone()) {
                    this.instanceExecution.remove(instanceId);
                }
            });
            return execution.getInstanceInfo();
        }
        return this.createFailedInstanceInfo(instanceId);
    }

    public FragmentInstanceInfo abortFragmentInstance(FragmentInstanceId fragmentInstanceId) {
        this.instanceExecution.remove(fragmentInstanceId);
        FragmentInstanceContext context = this.instanceContext.get(fragmentInstanceId);
        if (context != null) {
            context.abort();
            return context.getInstanceInfo();
        }
        return null;
    }

    public FragmentInstanceInfo cancelTask(FragmentInstanceId instanceId) {
        logger.debug("[CancelFI]");
        Objects.requireNonNull(instanceId, "taskId is null");
        FragmentInstanceContext context = this.instanceContext.remove(instanceId);
        if (context != null) {
            this.instanceExecution.remove(instanceId);
            context.cancel();
            return context.getInstanceInfo();
        }
        return null;
    }

    public FragmentInstanceInfo getInstanceInfo(FragmentInstanceId instanceId) {
        Objects.requireNonNull(instanceId, "instanceId is null");
        FragmentInstanceContext context = this.instanceContext.get(instanceId);
        if (context == null) {
            return null;
        }
        return context.getInstanceInfo();
    }

    public CounterStat getFailedInstances() {
        return this.failedInstances;
    }

    private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
        FragmentInstanceContext context = this.instanceContext.get(instanceId);
        return new FragmentInstanceInfo(FragmentInstanceState.FAILED, context.getEndTime(), context.getFailedCause(), context.getFailureInfoList());
    }

    private void removeOldInstances() {
        long oldestAllowedInstance = System.currentTimeMillis() - this.infoCacheTime.toMillis();
        this.instanceContext.entrySet().removeIf(entry -> {
            long endTime = ((FragmentInstanceContext)entry.getValue()).getEndTime();
            return endTime != -1L && endTime <= oldestAllowedInstance;
        });
    }

    private void cancelTimeoutFlushingInstances() {
        long now = System.currentTimeMillis();
        this.instanceContext.entrySet().stream().filter(entry -> {
            FragmentInstanceContext context = (FragmentInstanceContext)entry.getValue();
            return context.getStateMachine().getState() == FragmentInstanceState.FLUSHING && now - context.getStartTime() > QUERY_TIMEOUT_MS;
        }).forEach(entry -> ((FragmentInstanceContext)entry.getValue()).failed(new TimeoutException()));
    }

    private static class InstanceHolder {
        private static final FragmentInstanceManager INSTANCE = new FragmentInstanceManager();

        private InstanceHolder() {
        }
    }
}

