package org.apache.iotdb.db.engine.cq;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ContinuousQueryException;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.utils.DateTimeUtils;
import org.apache.iotdb.db.query.dataset.ShowContinuousQueriesResult;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/engine/cq/ContinuousQueryService.class */
/**
 * Singleton service that keeps the set of registered continuous queries (CQs) and periodically
 * submits their due executions to the {@link ContinuousQueryTaskPoolManager}.
 *
 * <p>Registrations and removals can be persisted through a {@link CQLogWriter} into
 * {@code cqlog.bin}; {@link #doRecovery()} replays that log when the service starts.
 */
public class ContinuousQueryService implements IService {
    /** Single scheduler thread that runs {@link #checkAndSubmitTasks()}; {@code null} until started. */
    private ScheduledExecutorService continuousQueryTaskSubmitThread;
    /** Writer of the CQ log; {@code null} before {@link #start()} and after {@link #stop()}. */
    private CQLogWriter logWriter;
    private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
    /** Timestamp captured when this class is initialized; used to schedule recovered CQs. */
    private static final long SYSTEM_STARTUP_TIME = DateTimeUtils.currentTime();
    private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER = ContinuousQueryTaskPoolManager.getInstance();
    /** Period of the submit check: half of the configured minimum "every" interval. */
    private static final long TASK_SUBMIT_CHECK_INTERVAL = IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
    private static final String LOG_FILE_DIR = IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "cq" + File.separator;
    private static final String LOG_FILE_NAME = LOG_FILE_DIR + "cqlog.bin";
    /** How long {@link #stop()} waits for the submit thread before forcing it down. */
    private static final long SHUTDOWN_TIMEOUT_MS = 600L;
    private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
    /** Registered CQ plans keyed by continuous query name. */
    private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans = new ConcurrentHashMap<>();
    /** Next due execution timestamp of every registered CQ, keyed by continuous query name. */
    private final ConcurrentHashMap<String, Long> nextExecutionTimestamps = new ConcurrentHashMap<>();
    private final ReentrantLock registrationLock = new ReentrantLock();

    /**
     * Replays the CQ log, re-registering created queries and removing dropped ones, without
     * writing anything back to the log.
     *
     * <p>If the log directory does not exist yet it is created and there is nothing to replay.
     *
     * @throws StartupException if the log cannot be read or a logged plan cannot be applied
     */
    public void doRecovery() throws StartupException {
        try {
            File logDir = SystemFileFactory.INSTANCE.getFile(LOG_FILE_DIR);
            if (!logDir.exists()) {
                // mkdirs() also creates missing parents; re-check exists() in case another
                // thread or process created the directory in the meantime.
                if (!logDir.mkdirs() && !logDir.exists()) {
                    LOGGER.warn("Failed to create continuous query log directory {}", LOG_FILE_DIR);
                }
                return;
            }
            File logFile = SystemFileFactory.INSTANCE.getFile(LOG_FILE_NAME);
            if (!logFile.exists()) {
                return;
            }
            try (CQLogReader logReader = new CQLogReader(logFile)) {
                while (logReader.hasNext()) {
                    PhysicalPlan loggedPlan = logReader.next();
                    switch (loggedPlan.getOperatorType()) {
                        case CREATE_CONTINUOUS_QUERY:
                            register((CreateContinuousQueryPlan) loggedPlan, false);
                            break;
                        case DROP_CONTINUOUS_QUERY:
                            deregister((DropContinuousQueryPlan) loggedPlan, false);
                            break;
                        default:
                            LOGGER.error("Unrecognizable command {}", loggedPlan.getOperatorType());
                            break;
                    }
                }
            }
        } catch (IOException | ContinuousQueryException e) {
            LOGGER.error("Error occurred during restart CQService", e);
            throw new StartupException(e);
        }
    }

    @Override // org.apache.iotdb.db.service.IService
    public ServiceType getID() {
        return ServiceType.CONTINUOUS_QUERY_SERVICE;
    }

    /**
     * Recovers the registered CQs from the log, opens the log writer, reschedules every recovered
     * CQ relative to {@link #SYSTEM_STARTUP_TIME} and starts the periodic submit check.
     *
     * @throws StartupException if recovery fails or the log writer cannot be opened
     */
    @Override // org.apache.iotdb.db.service.IService
    public void start() throws StartupException {
        try {
            doRecovery();
            this.logWriter = new CQLogWriter(LOG_FILE_NAME);
            for (CreateContinuousQueryPlan plan : this.continuousQueryPlans.values()) {
                this.nextExecutionTimestamps.put(plan.getContinuousQueryName(), calculateNextExecutionTimestamp(plan, SYSTEM_STARTUP_TIME));
            }
            this.continuousQueryTaskSubmitThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
            // The check interval is expressed in the configured timestamp precision.
            TimeUnit intervalUnit = DateTimeUtils.timestampPrecisionStringToTimeUnit(IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision());
            this.continuousQueryTaskSubmitThread.scheduleAtFixedRate(this::checkAndSubmitTasks, 0L, TASK_SUBMIT_CHECK_INTERVAL, intervalUnit);
            LOGGER.info("Continuous query service started.");
        } catch (IOException e) {
            throw new StartupException(e);
        }
    }

    /**
     * Computes the first execution timestamp of {@code plan} that is not earlier than
     * {@code now}.
     *
     * <p>Executions lie on the grid {@code base + k * everyInterval} (k &gt;= 0), where
     * {@code base = firstExecutionTimeBoundary + forInterval}.
     *
     * @param plan the continuous query plan
     * @param now the reference timestamp
     * @return {@code base} if {@code now <= base}, otherwise the smallest grid point
     *     {@code >= now}
     */
    private long calculateNextExecutionTimestamp(CreateContinuousQueryPlan plan, long now) {
        long base = plan.getFirstExecutionTimeBoundary() + plan.getForInterval();
        if (now <= base) {
            return base;
        }
        long elapsed = now - base;
        long everyInterval = plan.getEveryInterval();
        // Ceiling division: round up to the next whole interval unless already on the grid.
        long intervals = elapsed / everyInterval + (elapsed % everyInterval == 0 ? 0 : 1);
        return base + everyInterval * intervals;
    }

    /**
     * Submits one {@link ContinuousQueryTask} for every execution timestamp that has become due
     * for each registered CQ, then records the next pending timestamp.
     *
     * <p>Runs on the scheduler thread. A failure for one CQ is logged and does not propagate,
     * because an exception escaping a {@code scheduleAtFixedRate} task suppresses all of its
     * subsequent executions.
     */
    private void checkAndSubmitTasks() {
        long currentTime = DateTimeUtils.currentTime();
        for (CreateContinuousQueryPlan plan : this.continuousQueryPlans.values()) {
            String name = plan.getContinuousQueryName();
            Long scheduled = this.nextExecutionTimestamps.get(name);
            if (scheduled == null) {
                // The CQ was dropped (or is not fully registered) while we were iterating.
                continue;
            }
            long next = scheduled;
            try {
                // NOTE(review): terminates only if the every-interval is positive; presumably
                // validated when the plan is created - confirm.
                while (currentTime >= next) {
                    TASK_POOL_MANAGER.submit(new ContinuousQueryTask(plan, next));
                    next += plan.getEveryInterval();
                }
            } catch (RuntimeException e) {
                LOGGER.error("Failed to submit task of continuous query [{}]", name, e);
            } finally {
                // Record what was submitted so far; replace() is a no-op if the CQ is gone.
                this.nextExecutionTimestamps.replace(name, next);
            }
        }
    }

    /**
     * Stops the submit thread, forgets all registered CQs and closes the log writer.
     *
     * <p>The submit thread is given {@link #SHUTDOWN_TIMEOUT_MS} milliseconds to finish and is
     * shut down forcibly on timeout or if the calling thread is interrupted.
     */
    @Override // org.apache.iotdb.db.service.IService
    public void stop() {
        try {
            if (this.continuousQueryTaskSubmitThread != null) {
                this.continuousQueryTaskSubmitThread.shutdown();
                try {
                    if (!this.continuousQueryTaskSubmitThread.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                        LOGGER.warn("Check thread still doesn't exit after {}ms", SHUTDOWN_TIMEOUT_MS);
                        this.continuousQueryTaskSubmitThread.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    LOGGER.warn("Interrupted while waiting for check thread to exit");
                    this.continuousQueryTaskSubmitThread.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
            this.continuousQueryPlans.clear();
            this.nextExecutionTimestamps.clear();
            if (this.logWriter != null) {
                this.logWriter.close();
                this.logWriter = null;
            }
        } catch (IOException e) {
            LOGGER.warn("Something wrong occurred While stopping CQService: {}", e.getMessage(), e);
        }
    }

    /** Acquires the lock that serializes registration and deregistration. */
    public void acquireRegistrationLock() {
        this.registrationLock.lock();
    }

    /** Releases the lock taken by {@link #acquireRegistrationLock()}. */
    public void releaseRegistrationLock() {
        this.registrationLock.unlock();
    }

    /**
     * Registers a continuous query.
     *
     * @param createContinuousQueryPlan the plan describing the continuous query
     * @param z {@code true} to run the schema check and append the plan to the CQ log;
     *     {@code false} to register in memory only (used when replaying the log)
     * @return always {@code true}
     * @throws ContinuousQueryException if a CQ with the same name already exists, the schema
     *     check fails, or the plan cannot be written to the log
     */
    public boolean register(CreateContinuousQueryPlan createContinuousQueryPlan, boolean z) throws ContinuousQueryException {
        String name = createContinuousQueryPlan.getContinuousQueryName();
        // Fail fast before the schema check runs.
        ensureNotRegistered(name);
        if (z) {
            checkSchemaBeforeRegistration(createContinuousQueryPlan);
        }
        acquireRegistrationLock();
        try {
            // Re-check under the lock: another thread may have registered the same name
            // since the unlocked check above.
            ensureNotRegistered(name);
            if (z) {
                try {
                    this.logWriter.createContinuousQuery(createContinuousQueryPlan);
                } catch (Exception e) {
                    throw new ContinuousQueryException(e.getMessage(), e);
                }
            }
            doRegister(createContinuousQueryPlan);
            return true;
        } finally {
            releaseRegistrationLock();
        }
    }

    /** Throws if a continuous query named {@code name} is currently registered. */
    private void ensureNotRegistered(String name) throws ContinuousQueryException {
        if (this.continuousQueryPlans.containsKey(name)) {
            throw new ContinuousQueryException(String.format("Continuous Query [%s] already exists", name));
        }
    }

    /** Throws if no continuous query named {@code name} is currently registered. */
    private void ensureRegistered(String name) throws ContinuousQueryException {
        if (!this.continuousQueryPlans.containsKey(name)) {
            throw new ContinuousQueryException(String.format("Continuous Query [%s] does not exist", name));
        }
    }

    /**
     * Runs a {@link ContinuousQuerySchemaCheckTask} for the plan synchronously on the calling
     * thread and converts any failure into a {@link ContinuousQueryException}.
     */
    private void checkSchemaBeforeRegistration(CreateContinuousQueryPlan plan) throws ContinuousQueryException {
        try {
            new ContinuousQuerySchemaCheckTask(plan, plan.getFirstExecutionTimeBoundary()).run();
        } catch (Exception e) {
            throw new ContinuousQueryException("Failed to create continuous query task.", e);
        }
    }

    /** Adds the plan to the in-memory state and schedules its next execution from now. */
    private void doRegister(CreateContinuousQueryPlan plan) {
        String name = plan.getContinuousQueryName();
        // Publish the timestamp before the plan so the submit thread, which iterates over the
        // plans, never observes a plan without a timestamp.
        this.nextExecutionTimestamps.put(name, calculateNextExecutionTimestamp(plan, DateTimeUtils.currentTime()));
        this.continuousQueryPlans.put(name, plan);
    }

    /**
     * Removes every registered continuous query from memory without writing to the CQ log.
     *
     * @throws ContinuousQueryException if a query disappears before it can be removed
     */
    public void deregisterAll() throws ContinuousQueryException {
        for (String name : this.continuousQueryPlans.keySet()) {
            deregister(new DropContinuousQueryPlan(name), false);
        }
    }

    /**
     * Removes a registered continuous query.
     *
     * @param dropContinuousQueryPlan the plan naming the continuous query to drop
     * @param z {@code true} to append the drop to the CQ log; {@code false} to change the
     *     in-memory state only
     * @return always {@code true}
     * @throws ContinuousQueryException if no such CQ exists or the drop cannot be logged
     */
    public boolean deregister(DropContinuousQueryPlan dropContinuousQueryPlan, boolean z) throws ContinuousQueryException {
        String name = dropContinuousQueryPlan.getContinuousQueryName();
        ensureRegistered(name);
        acquireRegistrationLock();
        try {
            // Re-check under the lock so a concurrent drop of the same CQ is not logged twice.
            ensureRegistered(name);
            if (z) {
                try {
                    this.logWriter.dropContinuousQuery(dropContinuousQueryPlan);
                } catch (Exception e) {
                    throw new ContinuousQueryException(e.getMessage(), e);
                }
            }
            doDeregister(dropContinuousQueryPlan);
            return true;
        } finally {
            releaseRegistrationLock();
        }
    }

    /** Removes the plan first and its timestamp second, mirroring {@link #doRegister}. */
    private void doDeregister(DropContinuousQueryPlan plan) {
        String name = plan.getContinuousQueryName();
        this.continuousQueryPlans.remove(name);
        this.nextExecutionTimestamps.remove(name);
    }

    /**
     * Builds one {@link ShowContinuousQueriesResult} per registered continuous query.
     *
     * @return a new list holding the query SQL, name, target path, every-interval, for-interval
     *     and first execution time boundary of every registered CQ
     */
    public List<ShowContinuousQueriesResult> getShowContinuousQueriesResultList() {
        List<ShowContinuousQueriesResult> results = new ArrayList<>(this.continuousQueryPlans.size());
        for (CreateContinuousQueryPlan plan : this.continuousQueryPlans.values()) {
            results.add(new ShowContinuousQueriesResult(plan.getQuerySql(), plan.getContinuousQueryName(), plan.getTargetPath(), plan.getEveryInterval(), plan.getForInterval(), plan.getFirstExecutionTimeBoundary()));
        }
        return results;
    }

    private ContinuousQueryService() {
    }

    /** Returns the process-wide instance of this service. */
    public static ContinuousQueryService getInstance() {
        return INSTANCE;
    }
}
