package org.apache.iotdb.db.queryengine.plan.scheduler.load;

import io.airlift.units.Duration;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadReadOnlyException;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.queryengine.execution.load.ChunkData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileData;
import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.class */
public class LoadTsFileScheduler implements IScheduler {
    /** Time limit in seconds; the wait for one dispatched piece node uses this same value (900). */
    public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900;
    /**
     * Size threshold for buffered TsFile data: once the buffered size exceeds it, piece nodes are
     * dispatched until the size is back under the threshold. Assigned in the static initializer.
     */
    private static final long MAX_MEMORY_SIZE;
    private final MPPQueryContext queryContext;
    private final QueryStateMachine stateMachine;
    private final LoadTsFileDispatcherImpl dispatcher;
    private final DataPartitionBatchFetcher partitionFetcher;
    private final PlanFragmentId fragmentId;
    private final boolean isGeneratedByPipe;
    private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class);
    /** Maximum number of (device, time partition slot) pairs sent in one partition query. */
    private static final int TRANSMIT_LIMIT = CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit();
    /** The TsFile nodes to load, one per fragment instance of the distributed plan. */
    private final List<LoadSingleTsFileNode> tsFileNodeList = new ArrayList<>();
    /** Replica sets that received at least one piece of the TsFile currently being loaded. */
    private final Set<TRegionReplicaSet> allReplicaSets = new HashSet<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler$DataPartitionBatchFetcher.class */
    /**
     * Resolves (device, time partition slot) pairs to region replica sets, querying the wrapped
     * {@link IPartitionFetcher} in batches of at most {@code TRANSMIT_LIMIT} pairs.
     */
    public static class DataPartitionBatchFetcher {
        private final IPartitionFetcher fetcher;

        public DataPartitionBatchFetcher(IPartitionFetcher iPartitionFetcher) {
            this.fetcher = iPartitionFetcher;
        }

        /**
         * Looks up the replica set to write to for every given pair.
         *
         * @param list (device, time partition slot) pairs to resolve
         * @param str passed through as the second argument of
         *     {@code IPartitionFetcher.getOrCreateDataPartition}; callers in this file pass the
         *     session user name
         * @return one replica set per input pair, in the same order as {@code list}
         */
        public List<TRegionReplicaSet> queryDataPartition(List<Pair<String, TTimePartitionSlot>> list, String str) {
            final int total = list.size();
            final List<TRegionReplicaSet> replicaSets = new ArrayList<>(total);
            for (int from = 0; from < total; from += LoadTsFileScheduler.TRANSMIT_LIMIT) {
                final List<Pair<String, TTimePartitionSlot>> batch =
                        list.subList(from, Math.min(total, from + LoadTsFileScheduler.TRANSMIT_LIMIT));
                final DataPartition dataPartition = this.fetcher.getOrCreateDataPartition(toQueryParam(batch), str);
                for (Pair<String, TTimePartitionSlot> slot : batch) {
                    replicaSets.add(dataPartition.getDataRegionReplicaSetForWriting(slot.left, slot.right));
                }
            }
            return replicaSets;
        }

        /**
         * Groups the pairs by device so that each device yields a single query parameter carrying
         * its distinct time partition slots.
         */
        private List<DataPartitionQueryParam> toQueryParam(List<Pair<String, TTimePartitionSlot>> list) {
            final Map<String, Set<TTimePartitionSlot>> slotsByDevice = list.stream()
                    .collect(Collectors.groupingBy(Pair::getLeft, Collectors.mapping(Pair::getRight, Collectors.toSet())));
            return slotsByDevice.entrySet().stream()
                    .map(entry -> new DataPartitionQueryParam(entry.getKey(), new ArrayList<>(entry.getValue())))
                    .collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler$LoadCommand.class */
    /**
     * Command dispatched in the second phase of a load.
     *
     * <p>Declaration order is significant: {@code secondPhase} puts {@code ordinal()} of the chosen
     * constant into the {@code TLoadCommandReq}, so reordering the constants changes the value sent.
     */
    public enum LoadCommand {
        // Chosen by secondPhase when its first-phase flag is true.
        EXECUTE,
        // Chosen by secondPhase when its first-phase flag is false.
        ROLLBACK
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler$TsFileDataManager.class */
    /**
     * Buffers the {@link TsFileData} produced while splitting one TsFile, groups it into one
     * {@link LoadTsFilePieceNode} per region replica set, and dispatches pieces through the owning
     * scheduler when the buffered size exceeds {@code MAX_MEMORY_SIZE} or when flushing everything.
     */
    public static class TsFileDataManager {
        private final LoadTsFileScheduler scheduler;
        private final LoadSingleTsFileNode singleTsFileNode;
        /** Running total of the data sizes added and not yet dispatched. */
        private long dataSize = 0;
        private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece = new HashMap<>();
        /** Chunk data that has not yet been assigned to a replica set. */
        private final List<ChunkData> nonDirectionalChunkData = new ArrayList<>();

        public TsFileDataManager(LoadTsFileScheduler loadTsFileScheduler, LoadSingleTsFileNode loadSingleTsFileNode) {
            this.scheduler = loadTsFileScheduler;
            this.singleTsFileNode = loadSingleTsFileNode;
        }

        /**
         * Accepts one piece of TsFile data, dispatching buffered pieces if the threshold is exceeded.
         *
         * @return {@code false} if a dispatch triggered by this call failed, {@code true} otherwise
         */
        public boolean addOrSendTsFileData(TsFileData tsFileData) {
            return tsFileData.isModification() ? addOrSendDeletionData(tsFileData) : addOrSendChunkData((ChunkData) tsFileData);
        }

        private boolean addOrSendChunkData(ChunkData chunkData) {
            this.nonDirectionalChunkData.add(chunkData);
            this.dataSize += chunkData.getDataSize();
            if (this.dataSize <= LoadTsFileScheduler.MAX_MEMORY_SIZE) {
                return true;
            }
            routeChunkData();
            // Dispatch the largest pieces first so the buffer drops below the threshold quickly.
            final List<TRegionReplicaSet> largestFirst = this.replicaSet2Piece.keySet().stream()
                    .sorted(Comparator.comparingLong(
                            (TRegionReplicaSet replicaSet) -> this.replicaSet2Piece.get(replicaSet).getDataSize()).reversed())
                    .collect(Collectors.toList());
            for (TRegionReplicaSet replicaSet : largestFirst) {
                final LoadTsFilePieceNode pieceNode = this.replicaSet2Piece.get(replicaSet);
                if (pieceNode.getDataSize() == 0) {
                    // Sorted descending, so every remaining piece is empty as well.
                    break;
                }
                if (!this.scheduler.dispatchOnePieceNode(pieceNode, replicaSet)) {
                    return false;
                }
                this.dataSize -= pieceNode.getDataSize();
                // Replace rather than remove: addOrSendDeletionData iterates replicaSet2Piece, so the
                // key must stay present for later deletion data to reach this replica set.
                this.replicaSet2Piece.put(replicaSet, newEmptyPieceNode());
                if (this.dataSize <= LoadTsFileScheduler.MAX_MEMORY_SIZE) {
                    break;
                }
            }
            return true;
        }

        /** Assigns every pending chunk to the piece node of the replica set it should be written to. */
        private void routeChunkData() {
            if (this.nonDirectionalChunkData.isEmpty()) {
                return;
            }
            final List<Pair<String, TTimePartitionSlot>> slots = new ArrayList<>(this.nonDirectionalChunkData.size());
            for (ChunkData pending : this.nonDirectionalChunkData) {
                slots.add(new Pair<>(pending.getDevice(), pending.getTimePartitionSlot()));
            }
            final List<TRegionReplicaSet> replicaSets =
                    this.scheduler.partitionFetcher.queryDataPartition(slots, this.scheduler.queryContext.getSession().getUserName());
            // replicaSets is index-aligned with nonDirectionalChunkData.
            for (int i = 0; i < this.nonDirectionalChunkData.size(); i++) {
                this.replicaSet2Piece
                        .computeIfAbsent(replicaSets.get(i), ignored -> newEmptyPieceNode())
                        .addTsFileData(this.nonDirectionalChunkData.get(i));
            }
            this.nonDirectionalChunkData.clear();
        }

        /** Adds the deletion data to every piece node currently known; always returns {@code true}. */
        private boolean addOrSendDeletionData(TsFileData tsFileData) {
            routeChunkData();
            for (LoadTsFilePieceNode pieceNode : this.replicaSet2Piece.values()) {
                this.dataSize += tsFileData.getDataSize();
                pieceNode.addTsFileData(tsFileData);
            }
            return true;
        }

        /**
         * Routes pending chunks and dispatches every piece node.
         *
         * @return {@code false} as soon as one dispatch fails, {@code true} if all succeed
         */
        public boolean sendAllTsFileData() {
            routeChunkData();
            for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : this.replicaSet2Piece.entrySet()) {
                if (!this.scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) {
                    LoadTsFileScheduler.logger.warn("Dispatch piece node {} of TsFile {} error.", entry.getValue(), this.singleTsFileNode.getTsFileResource().getTsFile());
                    return false;
                }
            }
            return true;
        }

        /** Drops all piece nodes held by this manager. */
        public void clear() {
            this.replicaSet2Piece.clear();
        }

        /** Creates an empty piece node for the TsFile this manager handles. */
        private LoadTsFilePieceNode newEmptyPieceNode() {
            return new LoadTsFilePieceNode(this.singleTsFileNode.getPlanNodeId(), this.singleTsFileNode.getTsFileResource().getTsFile());
        }
    }

    /**
     * Builds a scheduler for the given distributed plan.
     *
     * <p>The fragment id is taken from the plan's root sub-plan, and the plan node tree of every
     * fragment instance is collected as a {@link LoadSingleTsFileNode} to load.
     *
     * @param distributedQueryPlan plan whose fragment instances carry the TsFile nodes
     * @param mPPQueryContext query context used when creating fragment instances
     * @param queryStateMachine state machine that is moved to running / failed / finished
     * @param iClientManager client manager handed to the dispatcher
     * @param iPartitionFetcher partition fetcher wrapped for batched lookups
     * @param z stored as {@code isGeneratedByPipe} and passed to the dispatcher
     */
    public LoadTsFileScheduler(DistributedQueryPlan distributedQueryPlan, MPPQueryContext mPPQueryContext, QueryStateMachine queryStateMachine, IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> iClientManager, IPartitionFetcher iPartitionFetcher, boolean z) {
        this.queryContext = mPPQueryContext;
        this.stateMachine = queryStateMachine;
        this.fragmentId = distributedQueryPlan.getRootSubPlan().getPlanFragment().getId();
        this.dispatcher = new LoadTsFileDispatcherImpl(iClientManager, z);
        this.partitionFetcher = new DataPartitionBatchFetcher(iPartitionFetcher);
        this.isGeneratedByPipe = z;
        for (FragmentInstance instance : distributedQueryPlan.getInstances()) {
            final LoadSingleTsFileNode tsFileNode = (LoadSingleTsFileNode) instance.getFragment().getPlanNodeTree();
            this.tsFileNodeList.add(tsFileNode);
        }
    }

    /**
     * Loads every collected TsFile node in order.
     *
     * <p>The state machine is moved to running first. A failure or exception on one file is logged
     * and does not stop the remaining files; the state machine is moved to finished only when
     * every file loaded successfully.
     */
    @Override
    public void start() {
        this.stateMachine.transitionToRunning();
        final int totalCount = this.tsFileNodeList.size();
        boolean everyFileLoaded = true;
        for (int index = 0; index < totalCount; index++) {
            final LoadSingleTsFileNode node = this.tsFileNodeList.get(index);
            try {
                if (loadOneTsFile(node)) {
                    logger.info("Load TsFile {} Successfully, load process [{}/{}]", node.getTsFileResource().getTsFilePath(), index + 1, totalCount);
                } else {
                    everyFileLoaded = false;
                    logger.warn("Can not Load TsFile {}, load process [{}/{}]", node.getTsFileResource().getTsFilePath(), index + 1, totalCount);
                }
            } catch (Exception e) {
                everyFileLoaded = false;
                this.stateMachine.transitionToFailed(e);
                logger.warn(String.format("LoadTsFileScheduler loads TsFile %s error", node.getTsFileResource().getTsFilePath()), e);
            }
        }
        if (everyFileLoaded) {
            this.stateMachine.transitionToFinished();
        }
    }

    /**
     * Loads a single TsFile node: skips it when empty, loads it locally when no decoding is
     * needed, and otherwise runs the two dispatch phases under a fresh uuid.
     *
     * @return {@code true} if the file was skipped or loaded successfully
     */
    private boolean loadOneTsFile(LoadSingleTsFileNode node) throws Exception {
        if (node.isTsFileEmpty()) {
            logger.info("Load skip TsFile {}, because it has no data.", node.getTsFileResource().getTsFilePath());
            return true;
        }
        final boolean needsDecoding = node.needDecodeTsFile(
                slots -> this.partitionFetcher.queryDataPartition(slots, this.queryContext.getSession().getUserName()));
        if (!needsDecoding) {
            final boolean loadedLocally = loadLocally(node);
            node.clean();
            return loadedLocally;
        }
        final String uuid = UUID.randomUUID().toString();
        this.dispatcher.setUuid(uuid);
        this.allReplicaSets.clear();
        final boolean firstPhaseOk = firstPhase(node);
        // The second phase always runs: it is given the first-phase outcome.
        final boolean secondPhaseOk = secondPhase(firstPhaseOk, uuid, node.getTsFileResource().getTsFile());
        node.clean();
        return firstPhaseOk && secondPhaseOk;
    }

    /**
     * First phase of a load: splits the TsFile by data partition, feeding each piece to a
     * {@link TsFileDataManager}, then dispatches whatever is still buffered.
     *
     * <p>Any failure moves the state machine to failed. The manager is cleared on every exit path.
     *
     * @return {@code true} if splitting and all dispatches succeeded, {@code false} otherwise
     */
    private boolean firstPhase(LoadSingleTsFileNode loadSingleTsFileNode) {
        final TsFileDataManager dataManager = new TsFileDataManager(this, loadSingleTsFileNode);
        try {
            final File tsFile = loadSingleTsFileNode.getTsFileResource().getTsFile();
            new TsFileSplitter(tsFile, dataManager::addOrSendTsFileData).splitTsFileByDataPartition();
            if (!dataManager.sendAllTsFileData()) {
                this.stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()));
                return false;
            }
            return true;
        } catch (IllegalStateException e) {
            this.stateMachine.transitionToFailed(e);
            logger.warn(String.format("Dispatch TsFileData error when parsing TsFile %s.", loadSingleTsFileNode.getTsFileResource().getTsFile()), e);
            return false;
        } catch (Exception e) {
            this.stateMachine.transitionToFailed(e);
            logger.warn(String.format("Parse or send TsFile %s error.", loadSingleTsFileNode.getTsFileResource().getTsFile()), e);
            return false;
        } finally {
            dataManager.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches one piece node to the given replica set and waits for the result.
     *
     * <p>The replica set is recorded in {@code allReplicaSets} before dispatching. The wait is
     * bounded by {@link #LOAD_TASK_MAX_TIME_IN_SECOND}; on timeout the pending dispatch is
     * cancelled. Every failure path moves the state machine to failed.
     *
     * @param loadTsFilePieceNode piece to send
     * @param tRegionReplicaSet replica set that should receive the piece
     * @return {@code true} if the dispatch result is successful, {@code false} otherwise
     */
    public boolean dispatchOnePieceNode(LoadTsFilePieceNode loadTsFilePieceNode, TRegionReplicaSet tRegionReplicaSet) {
        this.allReplicaSets.add(tRegionReplicaSet);
        FragmentInstance fragmentInstance = new FragmentInstance(new PlanFragment(this.fragmentId, loadTsFilePieceNode), this.fragmentId.genFragmentInstanceId(), null, this.queryContext.getQueryType(), this.queryContext.getTimeOut(), this.queryContext.getSession());
        fragmentInstance.setExecutorAndHost(new StorageExecutor(tRegionReplicaSet));
        Future<FragInstanceDispatchResult> dispatch = this.dispatcher.dispatch(Collections.singletonList(fragmentInstance));
        try {
            FragInstanceDispatchResult dispatchResult = dispatch.get(LOAD_TASK_MAX_TIME_IN_SECOND, TimeUnit.SECONDS);
            if (dispatchResult.isSuccessful()) {
                return true;
            }
            TSStatus failureStatus = dispatchResult.getFailureStatus();
            // SLF4J only substitutes "{}" and would print "%n" literally, so the line break is
            // concatenated into the message instead.
            logger.warn(
                    "Dispatch one piece to ReplicaSet {} error. Result status code {}. Result status message {}. Dispatch piece node error:"
                            + System.lineSeparator()
                            + "{}",
                    tRegionReplicaSet,
                    TSStatusCode.representOf(failureStatus.getCode()).name(),
                    failureStatus.getMessage(),
                    loadTsFilePieceNode);
            if (failureStatus.getSubStatus() != null) {
                for (TSStatus subStatus : failureStatus.getSubStatus()) {
                    logger.warn("Sub status code {}. Sub status message {}.", TSStatusCode.representOf(subStatus.getCode()).name(), subStatus.getMessage());
                }
            }
            failureStatus.setMessage(String.format("Load %s piece error in 1st phase. Because ", loadTsFilePieceNode.getTsFile()) + failureStatus.getMessage());
            this.stateMachine.transitionToFailed(failureStatus);
            return false;
        } catch (InterruptedException | CancellationException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            logger.warn("Interrupt or Execution error.", e);
            this.stateMachine.transitionToFailed(e);
            return false;
        } catch (TimeoutException e) {
            dispatch.cancel(true);
            logger.warn(String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()), e);
            this.stateMachine.transitionToFailed(e);
            return false;
        }
    }

    /**
     * Second phase of a load: sends EXECUTE or ROLLBACK for the given uuid to every replica set
     * recorded in {@code allReplicaSets} and waits for the result.
     *
     * @param z outcome of the first phase; {@code true} selects EXECUTE, {@code false} ROLLBACK
     * @param str uuid identifying the load
     * @param file the TsFile being loaded, used in log and failure messages
     * @return {@code true} if the command dispatch is successful, {@code false} otherwise
     */
    private boolean secondPhase(boolean z, String str, File file) {
        logger.info("Start dispatching Load command for uuid {}", str);
        final LoadCommand command;
        if (z) {
            command = LoadCommand.EXECUTE;
        } else {
            command = LoadCommand.ROLLBACK;
        }
        final TLoadCommandReq request = new TLoadCommandReq(command.ordinal(), str);
        request.setIsGeneratedByPipe(this.isGeneratedByPipe);
        final FragInstanceDispatchResult outcome;
        try {
            outcome = this.dispatcher.dispatchCommand(request, this.allReplicaSets).get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.warn("Interrupt or Execution error.", e);
            this.stateMachine.transitionToFailed(e);
            return false;
        }
        if (outcome.isSuccessful()) {
            return true;
        }
        logger.warn("Dispatch load command {} of TsFile {} error to replicaSets {} error. Result status code {}. Result status message {}.", request, file, this.allReplicaSets, TSStatusCode.representOf(outcome.getFailureStatus().getCode()).name(), outcome.getFailureStatus().getMessage());
        final TSStatus failure = outcome.getFailureStatus();
        failure.setMessage(String.format("Load %s error in 2nd phase. Because ", file) + failure.getMessage());
        this.stateMachine.transitionToFailed(failure);
        return false;
    }

    /**
     * Loads the TsFile through a local dispatch and, on success, counts the written points in the
     * metric service, tagged with the database name and data region id.
     *
     * @return {@code true} on success; {@code false} if the local dispatch failed, in which case
     *     the state machine has been moved to failed
     * @throws IoTDBException declared by this method; a {@link LoadReadOnlyException} is thrown
     *     when the system is read-only
     */
    private boolean loadLocally(LoadSingleTsFileNode loadSingleTsFileNode) throws IoTDBException {
        logger.info("Start load TsFile {} locally.", loadSingleTsFileNode.getTsFileResource().getTsFile().getPath());
        if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
            throw new LoadReadOnlyException();
        }
        try {
            final PlanFragment planFragment = new PlanFragment(this.fragmentId, loadSingleTsFileNode);
            final FragmentInstance localInstance = new FragmentInstance(planFragment, this.fragmentId.genFragmentInstanceId(), null, this.queryContext.getQueryType(), this.queryContext.getTimeOut(), this.queryContext.getSession());
            localInstance.setExecutorAndHost(new StorageExecutor(loadSingleTsFileNode.getLocalRegionReplicaSet()));
            this.dispatcher.dispatchLocally(localInstance);
            final DataRegionId regionId = (DataRegionId) ConsensusGroupId.Factory.createFromTConsensusGroupId(loadSingleTsFileNode.getLocalRegionReplicaSet().getRegionId());
            final DataRegion region = StorageEngine.getInstance().getDataRegion(regionId);
            MetricService.getInstance().count(
                    loadSingleTsFileNode.getWritePointCount(),
                    Metric.QUANTITY.toString(),
                    MetricLevel.CORE,
                    Tag.NAME.toString(),
                    Metric.POINTS_IN.toString(),
                    Tag.DATABASE.toString(),
                    region.getDatabaseName(),
                    Tag.REGION.toString(),
                    region.getDataRegionId());
            return true;
        } catch (FragmentInstanceDispatchException e) {
            logger.warn(String.format("Dispatch tsFile %s error to local error. Result status code %s. Result status message %s.", loadSingleTsFileNode.getTsFileResource().getTsFile(), TSStatusCode.representOf(e.getFailureStatus().getCode()).name(), e.getFailureStatus().getMessage()));
            this.stateMachine.transitionToFailed(e.getFailureStatus());
            return false;
        }
    }

    /**
     * Does nothing in this scheduler.
     *
     * @param th ignored
     */
    @Override
    public void stop(Throwable th) {
        // Intentionally empty.
    }

    /**
     * Not tracked by this scheduler.
     *
     * @return always {@code null}
     */
    @Override
    public Duration getTotalCpuTime() {
        return null;
    }

    /**
     * Not provided by this scheduler.
     *
     * @return always {@code null}
     */
    @Override
    public FragmentInfo getFragmentInfo() {
        return null;
    }

    static {
        // MAX_MEMORY_SIZE is the smaller of a quarter of the thrift max frame size and the share of
        // storage-engine memory given by the load-TsFile proportion. Every value is read through
        // IoTDBDescriptor, the only config accessor this expression is known to have in scope.
        MAX_MEMORY_SIZE = Math.min(
                IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize() >> 2,
                (long) (IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForStorageEngine()
                        * IoTDBDescriptor.getInstance().getConfig().getLoadTsFileProportion()));
    }
}
