/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.receiver.protocol.thrift;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.connector.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV1;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionSnapshotEvent;
import org.apache.iotdb.db.pipe.receiver.visitor.PipePlanToStatementVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementExceptionVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor;
import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
import org.apache.iotdb.db.tools.schema.SchemaRegionSnapshotParser;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IoTDBDataNodeReceiver
extends IoTDBFileReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeReceiver.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final String[] RECEIVER_FILE_BASE_DIRS = IOTDB_CONFIG.getPipeReceiverFileDirs();
    private static FolderManager folderManager = null;
    public static final PipePlanToStatementVisitor PLAN_TO_STATEMENT_VISITOR = new PipePlanToStatementVisitor();
    private static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR = new PipeStatementTSStatusVisitor();
    private static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR = new PipeStatementExceptionVisitor();
    private final PipeStatementToBatchVisitor batchVisitor = new PipeStatementToBatchVisitor();
    private static final AtomicLong CONFIG_RECEIVER_ID_GENERATOR = new AtomicLong(0L);
    protected final AtomicReference<String> configReceiverId = new AtomicReference();

    public synchronized TPipeTransferResp receive(TPipeTransferReq req) {
        try {
            short rawRequestType = req.getType();
            if (PipeRequestType.isValidatedRequestType((short)rawRequestType)) {
                switch (PipeRequestType.valueOf((short)rawRequestType)) {
                    case HANDSHAKE_DATANODE_V1: {
                        return this.handleTransferHandshakeV1(PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req));
                    }
                    case HANDSHAKE_DATANODE_V2: {
                        return this.handleTransferHandshakeV2(PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_TABLET_INSERT_NODE: {
                        return this.handleTransferTabletInsertNode(PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_TABLET_RAW: {
                        return this.handleTransferTabletRaw(PipeTransferTabletRawReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_TABLET_BINARY: {
                        return this.handleTransferTabletBinary(PipeTransferTabletBinaryReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_TABLET_BATCH: {
                        return this.handleTransferTabletBatch(PipeTransferTabletBatchReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_TS_FILE_PIECE: {
                        return this.handleTransferFilePiece(PipeTransferTsFilePieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, true);
                    }
                    case TRANSFER_TS_FILE_SEAL: {
                        return this.handleTransferFileSealV1(PipeTransferTsFileSealReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_TS_FILE_PIECE_WITH_MOD: {
                        return this.handleTransferFilePiece(PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false);
                    }
                    case TRANSFER_TS_FILE_SEAL_WITH_MOD: {
                        return this.handleTransferFileSealV2(PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_SCHEMA_PLAN: {
                        return this.handleTransferSchemaPlan(PipeTransferPlanNodeReq.fromTPipeTransferReq(req));
                    }
                    case TRANSFER_SCHEMA_SNAPSHOT_PIECE: {
                        return this.handleTransferFilePiece(PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq(req), req instanceof AirGapPseudoTPipeTransferRequest, false);
                    }
                    case TRANSFER_SCHEMA_SNAPSHOT_SEAL: {
                        return this.handleTransferFileSealV2(PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req));
                    }
                    case HANDSHAKE_CONFIGNODE_V1: 
                    case HANDSHAKE_CONFIGNODE_V2: 
                    case TRANSFER_CONFIG_PLAN: 
                    case TRANSFER_CONFIG_SNAPSHOT_PIECE: 
                    case TRANSFER_CONFIG_SNAPSHOT_SEAL: {
                        return this.handleTransferConfigPlan(req);
                    }
                }
            }
            TSStatus status = RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_TYPE_ERROR, (String)String.format("Unknown PipeRequestType %s.", rawRequestType));
            LOGGER.warn("Receiver id = {}: Unknown PipeRequestType, response status = {}.", (Object)this.receiverId.get(), (Object)status);
            return new TPipeTransferResp(status);
        }
        catch (Exception e) {
            String error = String.format("Exception %s encountered while handling request %s.", e.getMessage(), req);
            LOGGER.warn("Receiver id = {}: {}", new Object[]{this.receiverId.get(), error, e});
            return new TPipeTransferResp(RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_ERROR, (String)error));
        }
    }

    private TPipeTransferResp handleTransferTabletInsertNode(PipeTransferTabletInsertNodeReq req) {
        InsertBaseStatement statement = req.constructStatement();
        return new TPipeTransferResp(statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : this.executeStatementAndClassifyExceptions(statement));
    }

    private TPipeTransferResp handleTransferTabletBinary(PipeTransferTabletBinaryReq req) {
        InsertBaseStatement statement = req.constructStatement();
        return new TPipeTransferResp(statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : this.executeStatementAndClassifyExceptions(statement));
    }

    private TPipeTransferResp handleTransferTabletRaw(PipeTransferTabletRawReq req) {
        InsertTabletStatement statement = req.constructStatement();
        return new TPipeTransferResp(statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : this.executeStatementAndClassifyExceptions(statement));
    }

    private TPipeTransferResp handleTransferTabletBatch(PipeTransferTabletBatchReq req) {
        Pair<InsertRowsStatement, InsertMultiTabletsStatement> statementPair = req.constructStatements();
        return new TPipeTransferResp(PipeReceiverStatusHandler.getPriorStatus(Stream.of(((InsertRowsStatement)statementPair.getLeft()).isEmpty() ? RpcUtils.SUCCESS_STATUS : this.executeStatementAndClassifyExceptions((Statement)statementPair.getLeft()), ((InsertMultiTabletsStatement)statementPair.getRight()).isEmpty() ? RpcUtils.SUCCESS_STATUS : this.executeStatementAndClassifyExceptions((Statement)statementPair.getRight())).collect(Collectors.toList())));
    }

    protected String getClusterId() {
        return IoTDBDescriptor.getInstance().getConfig().getClusterId();
    }

    protected String getReceiverFileBaseDir() throws DiskSpaceInsufficientException {
        return Objects.isNull(folderManager) ? null : folderManager.getNextFolder();
    }

    protected TSStatus loadFileV1(PipeTransferFileSealReqV1 req, String fileAbsolutePath) throws FileNotFoundException {
        return this.loadTsFile(fileAbsolutePath);
    }

    protected TSStatus loadFileV2(PipeTransferFileSealReqV2 req, List<String> fileAbsolutePaths) throws IOException, IllegalPathException {
        return req instanceof PipeTransferTsFileSealWithModReq ? this.loadTsFile(fileAbsolutePaths.get(1)) : this.loadSchemaSnapShot(req.getParameters(), fileAbsolutePaths);
    }

    private TSStatus loadTsFile(String fileAbsolutePath) throws FileNotFoundException {
        LoadTsFileStatement statement = new LoadTsFileStatement(fileAbsolutePath);
        statement.setDeleteAfterLoad(true);
        statement.setVerifySchema(true);
        statement.setAutoCreateDatabase(false);
        return this.executeStatementAndClassifyExceptions(statement);
    }

    private TSStatus loadSchemaSnapShot(Map<String, String> parameters, List<String> fileAbsolutePaths) throws IllegalPathException, IOException {
        SRStatementGenerator generator = SchemaRegionSnapshotParser.translate2Statements(Paths.get(fileAbsolutePaths.get(0), new String[0]), fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1), new String[0]) : null, new PartialPath(parameters.get("Database")));
        Set<StatementType> executionTypes = PipeSchemaRegionSnapshotEvent.getStatementTypeSet(parameters.get("Type"));
        this.batchVisitor.clear();
        ArrayList results = new ArrayList();
        while (generator.hasNext()) {
            Statement originalStatement = generator.next();
            if (!executionTypes.contains((Object)originalStatement.getType())) continue;
            ((Optional)this.batchVisitor.process(originalStatement, null)).ifPresent(statement -> results.add(this.executeStatementAndClassifyExceptions((Statement)statement)));
        }
        this.batchVisitor.getRemainBatches().stream().filter(Optional::isPresent).forEach(statement -> results.add(this.executeStatementAndClassifyExceptions((Statement)statement.get())));
        return PipeReceiverStatusHandler.getPriorStatus(results);
    }

    private TPipeTransferResp handleTransferSchemaPlan(PipeTransferPlanNodeReq req) {
        return req.getPlanNode() instanceof AlterLogicalViewNode ? new TPipeTransferResp(ClusterConfigTaskExecutor.getInstance().alterLogicalViewByPipe((AlterLogicalViewNode)req.getPlanNode())) : new TPipeTransferResp(this.executeStatementAndClassifyExceptions((Statement)PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null)));
    }

    private TPipeTransferResp handleTransferConfigPlan(TPipeTransferReq req) {
        return ClusterConfigTaskExecutor.getInstance().handleTransferConfigPlan(this.getConfigReceiverId(), req);
    }

    private String getConfigReceiverId() {
        if (Objects.isNull(this.configReceiverId.get())) {
            this.configReceiverId.set(IoTDBDescriptor.getInstance().getConfig().getDataNodeId() + "_" + PipeAgent.runtime().getRebootTimes() + "_" + CONFIG_RECEIVER_ID_GENERATOR.incrementAndGet());
        }
        return this.configReceiverId.get();
    }

    private TSStatus executeStatementAndClassifyExceptions(Statement statement) {
        try {
            TSStatus result = this.executeStatement(statement);
            if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                return result;
            }
            LOGGER.warn("Receiver id = {}: Failure status encountered while executing statement {}: {}", new Object[]{this.receiverId.get(), statement, result});
            return statement.accept(STATEMENT_STATUS_VISITOR, result);
        }
        catch (Exception e) {
            LOGGER.warn("Receiver id = {}: Exception encountered while executing statement {}: ", new Object[]{this.receiverId.get(), statement, e});
            return statement.accept(STATEMENT_EXCEPTION_VISITOR, e);
        }
    }

    private TSStatus executeStatement(Statement statement) {
        if (statement == null) {
            return RpcUtils.getStatus((TSStatusCode)TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, (String)"Execute null statement.");
        }
        statement = new PipeEnrichedStatement(statement);
        ExecutionResult result = Coordinator.getInstance().executeForTreeModel(statement, SessionManager.getInstance().requestQueryId(), new SessionInfo(0L, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()), "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
        return result.status;
    }

    public synchronized void handleExit() {
        if (Objects.nonNull(this.configReceiverId.get())) {
            try {
                ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(this.configReceiverId.get());
            }
            catch (Exception e) {
                LOGGER.warn("Failed to handle config client (id = {}) exit", (Object)this.configReceiverId.get(), (Object)e);
            }
        }
        super.handleExit();
    }

    static {
        try {
            folderManager = new FolderManager(Arrays.asList(RECEIVER_FILE_BASE_DIRS), DirectoryStrategyType.SEQUENCE_STRATEGY);
        }
        catch (DiskSpaceInsufficientException e) {
            LOGGER.error("Fail to create pipe receiver file folders allocation strategy because all disks of folders are full.", (Throwable)((Object)e));
        }
    }
}

