/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.consensus.statemachine;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataRegionStateMachine
extends BaseStateMachine {
    // Class-wide SLF4J logger.
    private static final Logger logger = LoggerFactory.getLogger(DataRegionStateMachine.class);
    // Manager obtained once via getInstance(); read() delegates query fragment execution to it.
    private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER = FragmentInstanceManager.getInstance();
    // Data region that requests are applied to; replaced by loadSnapshot() when a snapshot is loaded.
    private DataRegion region;
    // Number of cached batch requests at which the head of requestCache is written even though it
    // is not the expected next one (matches the threshold checked in cacheAndInsertLatestNode).
    private static final int MAX_REQUEST_CACHE_SIZE = 5;
    // Maximum time, in milliseconds, a batch request waits for its turn before a timeout is
    // evaluated (matches the wait duration used in cacheAndInsertLatestNode).
    private static final long CACHE_WINDOW_TIME_IN_MS = 10000L;
    // Guards requestCache and nextSyncIndex.
    private final Lock queueLock = new ReentrantLock();
    // Condition of queueLock on which threads wait until their batch request may be written.
    private final Condition queueSortCondition = this.queueLock.newCondition();
    // Pending batch requests, ordered by start sync index (see InsertNodeWrapper.compareTo).
    private final PriorityQueue<InsertNodeWrapper> requestCache;
    // Start sync index expected for the next in-order batch request; -1 until a batch has been written.
    private long nextSyncIndex = -1L;

    /**
     * Creates a state machine that applies requests to the given data region.
     *
     * @param region the data region served by this state machine
     */
    public DataRegionStateMachine(DataRegion region) {
        this.region = region;
        // Diamond instead of a raw PriorityQueue, so the assignment is type-checked; element
        // ordering comes from InsertNodeWrapper.compareTo.
        this.requestCache = new PriorityQueue<>();
    }

    /** Does nothing: the body is empty, so no work is performed when this state machine is started. */
    public void start() {
    }

    /** Does nothing: the body is empty, so no work is performed when this state machine is stopped. */
    public void stop() {
    }

    /**
     * Reports the read-only flag of the common configuration.
     *
     * @return the value of {@code isReadOnly()} on the config held by {@link CommonDescriptor}
     */
    public boolean isReadOnly() {
        CommonDescriptor descriptor = CommonDescriptor.getInstance();
        boolean readOnly = descriptor.getConfig().isReadOnly();
        return readOnly;
    }

    /**
     * Takes a full snapshot of the current region into the given directory.
     *
     * @param snapshotDir directory whose absolute path is handed to the snapshot taker
     * @return the result reported by {@code SnapshotTaker.takeFullSnapshot}, or {@code false} if
     *     any exception is thrown while taking the snapshot (the exception is logged)
     */
    public boolean takeSnapshot(File snapshotDir) {
        try {
            SnapshotTaker snapshotTaker = new SnapshotTaker(this.region);
            String targetPath = snapshotDir.getAbsolutePath();
            return snapshotTaker.takeFullSnapshot(targetPath, true);
        }
        catch (Exception e) {
            // The trailing exception argument is not bound to a placeholder.
            logger.error("Exception occurs when taking snapshot for {}-{} in {}", this.region.getStorageGroupName(), this.region.getDataRegionId(), snapshotDir, e);
            return false;
        }
    }

    /**
     * Replaces the current region with the one loaded from the given snapshot directory and
     * registers the new region with the storage engine.
     *
     * <p>If the loader returns {@code null}, an error is logged and the current region is kept.
     * A failure while registering the new region with the storage engine is logged but does not
     * undo the replacement of {@code region}.
     *
     * @param latestSnapshotRootDir root directory of the snapshot to load
     */
    public void loadSnapshot(File latestSnapshotRootDir) {
        SnapshotLoader snapshotLoader = new SnapshotLoader(latestSnapshotRootDir.getAbsolutePath(), this.region.getStorageGroupName(), this.region.getDataRegionId());
        DataRegion loadedRegion = snapshotLoader.loadSnapshotForStateMachine();
        if (loadedRegion != null) {
            this.region = loadedRegion;
            try {
                StorageEngineV2 storageEngine = StorageEngineV2.getInstance();
                DataRegionId regionId = new DataRegionId(Integer.parseInt(this.region.getDataRegionId()));
                storageEngine.setDataRegion(regionId, this.region);
            }
            catch (Exception e) {
                logger.error("Exception occurs when replacing data region in storage engine.", e);
            }
        } else {
            logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
        }
    }

    /**
     * Caches a batch of insert nodes and writes it to the region once it is this batch's turn.
     *
     * <p>The wrapper is added to {@code requestCache} (ordered by start sync index) and the
     * calling thread then waits on {@code queueSortCondition} until one of the following holds:
     * <ul>
     *   <li>the wrapper's start sync index equals {@code nextSyncIndex};</li>
     *   <li>the cache holds {@code MAX_REQUEST_CACHE_SIZE} entries and its head has the same start
     *       sync index as the wrapper;</li>
     *   <li>a wait of {@code CACHE_WINDOW_TIME_IN_MS} milliseconds timed out and the head of the
     *       cache has the same start sync index as the wrapper.</li>
     * </ul>
     * In the first two cases {@code nextSyncIndex} is advanced to the wrapper's end sync index
     * plus one; in the timeout case it is left unchanged. The insert nodes are written while
     * {@code queueLock} is still held, after which all waiting threads are signalled.
     *
     * <p>An interrupt received while waiting is logged and remembered, the wait is resumed, and
     * the thread's interrupt flag is restored just before this method returns. The flag must not
     * be re-set inside the loop: {@code await} on a thread whose interrupt flag is already set
     * throws immediately, which would turn the loop into a busy spin that never gives up
     * {@code queueLock}.
     *
     * @param insertNodeWrapper the batch to cache and eventually write
     * @return a status whose sub-status list holds one write result per insert node, in order
     */
    private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
        // Set when an interrupt arrives during the wait; re-asserted in the finally block.
        boolean interrupted = false;
        this.queueLock.lock();
        try {
            this.requestCache.add(insertNodeWrapper);
            // The cache is full and a different request is at its head: wake the waiters so the
            // thread that owns the head request can write it.
            if (this.requestCache.size() == MAX_REQUEST_CACHE_SIZE && this.requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
                this.queueSortCondition.signalAll();
            }
            while (true) {
                // In-order case: this request is exactly the one expected next.
                if (insertNodeWrapper.getStartSyncIndex() == this.nextSyncIndex) {
                    this.requestCache.remove(insertNodeWrapper);
                    this.nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1L;
                    break;
                }
                // Cache-full case: this request is at the head, so it is written out of order.
                if (this.requestCache.size() == MAX_REQUEST_CACHE_SIZE && this.requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
                    this.requestCache.remove();
                    this.nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1L;
                    break;
                }
                try {
                    boolean timeout = !this.queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
                    // A timeout only lets this thread proceed when its request is at the head;
                    // otherwise it goes back to waiting.
                    if (timeout && this.requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
                        logger.info("waiting target request timeout. current index: {}, target index: {}", insertNodeWrapper.getStartSyncIndex(), this.nextSyncIndex);
                        this.requestCache.remove(insertNodeWrapper);
                        break;
                    }
                }
                catch (InterruptedException e) {
                    logger.warn("current waiting is interrupted. SyncIndex: {}. Exception: {}", insertNodeWrapper.getStartSyncIndex(), e);
                    // Defer restoring the interrupt flag until the method exits (see Javadoc).
                    interrupted = true;
                }
            }
            logger.debug("region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}", this.region.getDataRegionId(), this.requestCache.size(), insertNodeWrapper.getStartSyncIndex(), insertNodeWrapper.getEndSyncIndex());
            List<TSStatus> subStatus = new LinkedList<>();
            for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
                subStatus.add(this.write(planNode));
            }
            // Let the remaining waiters re-check whether it is now their turn.
            this.queueSortCondition.signalAll();
            return new TSStatus().setSubStatus(subStatus);
        }
        finally {
            this.queueLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Converts a batch request into an {@code InsertNodeWrapper} that carries the batch's start
     * and end sync indexes and one plan node per contained indexed request.
     *
     * @param batchRequest the batch whose indexed requests are converted via {@code grabInsertNode}
     * @return the populated wrapper
     */
    private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
        InsertNodeWrapper wrapper = new InsertNodeWrapper(batchRequest.getStartSyncIndex(), batchRequest.getEndSyncIndex());
        for (IndexedConsensusRequest indexed : batchRequest.getRequests()) {
            PlanNode node = this.grabInsertNode(indexed);
            wrapper.add(node);
        }
        return wrapper;
    }

    /**
     * Turns the requests of an indexed consensus request into a single plan node.
     *
     * <p>Every request that resolves to an {@code InsertNode} receives the search index of the
     * indexed request and is collected; the collected nodes are then merged. A plan node that is
     * not an {@code InsertNode} is returned as-is when it is the only request.
     *
     * @param indexedRequest the request to convert
     * @return the merged insert node, or the single non-insert plan node
     * @throws IllegalArgumentException if a non-insert plan node is found and the indexed request
     *     holds more than one request
     */
    private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
        int requestCount = indexedRequest.getRequests().size();
        ArrayList<InsertNode> collected = new ArrayList<InsertNode>(requestCount);
        for (IConsensusRequest inner : indexedRequest.getRequests()) {
            PlanNode candidate = this.getPlanNode(inner);
            if (!(candidate instanceof InsertNode)) {
                // A lone non-insert node passes through untouched; mixed content is rejected.
                if (requestCount == 1) {
                    return candidate;
                }
                throw new IllegalArgumentException("PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
            }
            InsertNode insert = (InsertNode)candidate;
            insert.setSearchIndex(indexedRequest.getSearchIndex());
            collected.add(insert);
        }
        return this.mergeInsertNodes(collected);
    }

    /**
     * Lists the files that make up the snapshot under the given directory.
     *
     * @param latestSnapshotRootDir root directory of the snapshot
     * @return the paths of the files reported by the snapshot loader, or {@code null} if an
     *     {@link IOException} occurs (the error is logged)
     */
    public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
        try {
            SnapshotLoader snapshotLoader = new SnapshotLoader(latestSnapshotRootDir.getAbsolutePath(), this.region.getStorageGroupName(), this.region.getDataRegionId());
            return snapshotLoader.getSnapshotFileInfo()
                .stream()
                .map(File::toPath)
                .collect(Collectors.toList());
        }
        catch (IOException e) {
            // The trailing exception argument is not bound to a placeholder.
            logger.error("Meets error when getting snapshot files for {}-{}", this.region.getStorageGroupName(), this.region.getDataRegionId(), e);
            return null;
        }
    }

    /**
     * Applies a consensus request to the region.
     *
     * <p>An {@code IndexedConsensusRequest} is converted to a single plan node and written
     * directly; a {@code BatchIndexedConsensusRequest} is wrapped and handed to
     * {@code cacheAndInsertLatestNode}; any other request is resolved with {@code getPlanNode}
     * and written.
     *
     * @param request the request to apply
     * @return the status of the write, or a status carrying {@code INTERNAL_SERVER_ERROR} when an
     *     {@link IllegalArgumentException} is thrown while handling the request
     */
    public TSStatus write(IConsensusRequest request) {
        try {
            if (request instanceof IndexedConsensusRequest) {
                PlanNode merged = this.grabInsertNode((IndexedConsensusRequest)request);
                return this.write(merged);
            }
            if (request instanceof BatchIndexedConsensusRequest) {
                InsertNodeWrapper wrapper = this.deserializeAndWrap((BatchIndexedConsensusRequest)request);
                return this.cacheAndInsertLatestNode(wrapper);
            }
            // Declared as PlanNode so the call below binds to the write(PlanNode) overload.
            PlanNode resolved = this.getPlanNode(request);
            return this.write(resolved);
        }
        catch (IllegalArgumentException e) {
            logger.error(e.getMessage(), e);
            return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
        }
    }

    /**
     * Merges several insert nodes into one composite insert node.
     *
     * <p>A single node is returned unchanged. Otherwise the type of the first node decides the
     * result: if it is an {@code InsertTabletNode}, all nodes are cast to that type and combined
     * into an {@code InsertMultiTabletsNode}; if not, all nodes are cast to {@code InsertRowNode}
     * and combined into an {@code InsertRowsOfOneDeviceNode} when every node has the same device
     * path as the first one, or into an {@code InsertRowsNode} otherwise. The result takes its
     * plan node id, search index and device path from the first node.
     *
     * @param insertNodes the nodes to merge; must not be empty
     * @return the only node of a singleton list, or the composite node
     * @throws IllegalArgumentException if {@code insertNodes} is empty
     * @throws ClassCastException if a node cannot be cast to the type required by the first node
     */
    private InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
        int size = insertNodes.size();
        if (size == 0) {
            // Same exception type grabInsertNode uses for malformed requests, with a message.
            throw new IllegalArgumentException("Cannot merge an empty list of InsertNodes");
        }
        InsertNode first = insertNodes.get(0);
        if (size == 1) {
            return first;
        }
        // Each sub-node is registered under its position in the input list: 0, 1, ..., size - 1.
        ArrayList<Integer> index = new ArrayList<Integer>(size);
        for (int i = 0; i < size; ++i) {
            index.add(i);
        }
        InsertNode result;
        if (first instanceof InsertTabletNode) {
            ArrayList<InsertTabletNode> insertTabletNodes = new ArrayList<InsertTabletNode>(size);
            for (InsertNode insertNode : insertNodes) {
                insertTabletNodes.add((InsertTabletNode)insertNode);
            }
            result = new InsertMultiTabletsNode(first.getPlanNodeId(), index, insertTabletNodes);
        } else {
            boolean sameDevice = true;
            PartialPath device = first.getDevicePath();
            ArrayList<InsertRowNode> insertRowNodes = new ArrayList<InsertRowNode>(size);
            for (InsertNode insertNode : insertNodes) {
                if (sameDevice && !insertNode.getDevicePath().equals(device)) {
                    sameDevice = false;
                }
                insertRowNodes.add((InsertRowNode)insertNode);
            }
            if (sameDevice) {
                result = new InsertRowsOfOneDeviceNode(first.getPlanNodeId(), index, insertRowNodes);
            } else {
                result = new InsertRowsNode(first.getPlanNodeId(), index, insertRowNodes);
            }
        }
        result.setSearchIndex(first.getSearchIndex());
        result.setDevicePath(first.getDevicePath());
        return result;
    }

    /**
     * Executes a plan node against the current region.
     *
     * @param planNode the node to execute
     * @return the status produced by visiting the node with a new {@code DataExecutionVisitor}
     */
    protected TSStatus write(PlanNode planNode) {
        DataExecutionVisitor executionVisitor = new DataExecutionVisitor();
        return planNode.accept(executionVisitor, this.region);
    }

    /**
     * Serves a read request against the current region.
     *
     * <p>A {@code GetConsensusReqReaderPlan} is answered with the region's WAL node. Any other
     * request is resolved to a fragment instance, which is executed by the query instance
     * manager.
     *
     * @param request the read request
     * @return the region's WAL node, the result of executing the fragment instance, or
     *     {@code null} if resolving the fragment instance throws an
     *     {@link IllegalArgumentException} (the exception is logged)
     */
    public DataSet read(IConsensusRequest request) {
        if (request instanceof GetConsensusReqReaderPlan) {
            return this.region.getWALNode();
        }
        FragmentInstance fragmentInstance;
        try {
            fragmentInstance = this.getFragmentInstance(request);
        }
        catch (IllegalArgumentException e) {
            // Pass the exception itself so the stack trace is logged, as write() already does.
            logger.error(e.getMessage(), e);
            return null;
        }
        return QUERY_INSTANCE_MANAGER.execDataQueryFragmentInstance(fragmentInstance, this.region);
    }

    /**
     * A batch of plan nodes together with the range of sync indexes it covers. Instances are
     * ordered by their start sync index.
     */
    private static class InsertNodeWrapper
    implements Comparable<InsertNodeWrapper> {
        /** First sync index covered by this batch. */
        private final long startSyncIndex;
        /** Last sync index covered by this batch. */
        private final long endSyncIndex;
        /** Plan nodes of the batch, in insertion order. */
        private final List<PlanNode> insertNodes = new LinkedList<PlanNode>();

        /**
         * Creates an empty batch for the given sync index range.
         *
         * @param startSyncIndex first sync index of the batch
         * @param endSyncIndex last sync index of the batch
         */
        public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
            this.startSyncIndex = startSyncIndex;
            this.endSyncIndex = endSyncIndex;
        }

        /** Returns the first sync index of this batch. */
        public long getStartSyncIndex() {
            return this.startSyncIndex;
        }

        /** Returns the last sync index of this batch. */
        public long getEndSyncIndex() {
            return this.endSyncIndex;
        }

        /** Returns the backing list of plan nodes (not a copy). */
        public List<PlanNode> getInsertNodes() {
            return this.insertNodes;
        }

        /** Appends a plan node to this batch. */
        public void add(PlanNode insertNode) {
            this.insertNodes.add(insertNode);
        }

        /** Orders wrappers by ascending start sync index. */
        @Override
        public int compareTo(InsertNodeWrapper other) {
            return Long.compare(this.startSyncIndex, other.startSyncIndex);
        }
    }
}

