package org.apache.iotdb.db.pipe.connector.v1;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.mpp.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
import org.apache.iotdb.db.pipe.connector.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.pipe.connector.v1.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/v1/IoTDBThriftReceiverV1.class */
public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final String RECEIVER_FILE_DIR = IOTDB_CONFIG.getPipeReceiverFileDir();
    private File writingFile;
    private RandomAccessFile writingFileWriter;

    @Override // org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver
    public synchronized TPipeTransferResp receive(TPipeTransferReq tPipeTransferReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        short type = tPipeTransferReq.getType();
        if (PipeRequestType.isValidatedRequestType(type)) {
            switch (PipeRequestType.valueOf(type)) {
                case HANDSHAKE:
                    return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(tPipeTransferReq));
                case TRANSFER_INSERT_NODE:
                    return handleTransferInsertNode(PipeTransferInsertNodeReq.fromTPipeTransferReq(tPipeTransferReq), iPartitionFetcher, iSchemaFetcher);
                case TRANSFER_TABLET:
                    return handleTransferTablet(PipeTransferTabletReq.fromTPipeTransferReq(tPipeTransferReq), iPartitionFetcher, iSchemaFetcher);
                case TRANSFER_FILE_PIECE:
                    return handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(tPipeTransferReq));
                case TRANSFER_FILE_SEAL:
                    return handleTransferFileSeal(PipeTransferFileSealReq.fromTPipeTransferReq(tPipeTransferReq), iPartitionFetcher, iSchemaFetcher);
            }
        }
        return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TYPE_ERROR, String.format("Unknown transfer type %s.", Short.valueOf(type))));
    }

    private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq pipeTransferHandshakeReq) {
        if (CommonDescriptor.getInstance().getConfig().getTimestampPrecision().equals(pipeTransferHandshakeReq.getTimestampPrecision())) {
            LOGGER.info("Handshake successfully.");
            return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
        }
        String format = String.format("IoTDB receiver's timestamp precision %s, connector's timestamp precision %s. validation fails.", CommonDescriptor.getInstance().getConfig().getTimestampPrecision(), pipeTransferHandshakeReq.getTimestampPrecision());
        LOGGER.warn(format);
        return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_HANDSHAKE_ERROR, format));
    }

    private TPipeTransferResp handleTransferInsertNode(PipeTransferInsertNodeReq pipeTransferInsertNodeReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        return new TPipeTransferResp(executeStatement(pipeTransferInsertNodeReq.constructStatement(), iPartitionFetcher, iSchemaFetcher));
    }

    private TPipeTransferResp handleTransferTablet(PipeTransferTabletReq pipeTransferTabletReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        InsertTabletStatement constructStatement = pipeTransferTabletReq.constructStatement();
        return new TPipeTransferResp(constructStatement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(constructStatement, iPartitionFetcher, iSchemaFetcher));
    }

    private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq pipeTransferFilePieceReq) {
        try {
            updateWritingFileIfNeeded(pipeTransferFilePieceReq.getFileName());
            if (!isWritingFileOffsetCorrect(pipeTransferFilePieceReq.getStartWritingOffset())) {
                return PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET, String.format("request sender reset file reader's offset from %s to %s.", Long.valueOf(pipeTransferFilePieceReq.getStartWritingOffset()), Long.valueOf(this.writingFileWriter.length()))), this.writingFileWriter.length());
            }
            this.writingFileWriter.write(pipeTransferFilePieceReq.getFilePiece());
            return PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, this.writingFileWriter.length());
        } catch (Exception e) {
            LOGGER.warn(String.format("failed to write file piece from req %s.", pipeTransferFilePieceReq), e);
            TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to write file piece, because %s", e.getMessage()));
            try {
                return PipeTransferFilePieceResp.toTPipeTransferResp(status, -1L);
            } catch (IOException e2) {
                return PipeTransferFilePieceResp.toTPipeTransferResp(status);
            }
        }
    }

    private void updateWritingFileIfNeeded(String str) throws IOException {
        if (isFileExistedAndNameCorrect(str)) {
            return;
        }
        if (this.writingFileWriter != null) {
            this.writingFileWriter.close();
            this.writingFileWriter = null;
        }
        if (this.writingFile != null && this.writingFile.exists()) {
            if (this.writingFile.delete()) {
                LOGGER.info("original file {} was deleted.", this.writingFile.getPath());
            } else {
                LOGGER.warn("failed to delete original file {}.", this.writingFile.getPath());
            }
            this.writingFile = null;
        }
        File file = new File(RECEIVER_FILE_DIR);
        if (!file.exists()) {
            if (file.mkdirs()) {
                LOGGER.info("receiver file dir {} was created.", file.getPath());
            } else {
                LOGGER.warn("failed to create receiver file dir {}.", file.getPath());
            }
        }
        this.writingFile = new File(RECEIVER_FILE_DIR, str);
        this.writingFileWriter = new RandomAccessFile(this.writingFile, "rw");
        LOGGER.info("start to write transferring file {}.", this.writingFile.getPath());
    }

    private boolean isFileExistedAndNameCorrect(String str) {
        return this.writingFile != null && this.writingFile.getName().equals(str);
    }

    private boolean isWritingFileOffsetCorrect(long j) throws IOException {
        return this.writingFileWriter.length() == j;
    }

    private TPipeTransferResp handleTransferFileSeal(PipeTransferFileSealReq pipeTransferFileSealReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        try {
            if (!isWritingFileAvailable()) {
                return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file, because writing file %s is not available.", pipeTransferFileSealReq.getFileName())));
            }
            if (!isFileExistedAndNameCorrect(pipeTransferFileSealReq.getFileName())) {
                return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file %s, but writing file is %s.", pipeTransferFileSealReq.getFileName(), this.writingFile)));
            }
            if (!isWritingFileOffsetCorrect(pipeTransferFileSealReq.getFileLength())) {
                return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file because the length of file is not correct. the original file has length %s, but receiver file has length %s.", Long.valueOf(pipeTransferFileSealReq.getFileLength()), Long.valueOf(this.writingFileWriter.length()))));
            }
            LoadTsFileStatement loadTsFileStatement = new LoadTsFileStatement(this.writingFile.getAbsolutePath());
            this.writingFileWriter.close();
            this.writingFile = null;
            loadTsFileStatement.setDeleteAfterLoad(true);
            loadTsFileStatement.setVerifySchema(true);
            loadTsFileStatement.setAutoCreateDatabase(false);
            return new TPipeTransferResp(executeStatement(loadTsFileStatement, iPartitionFetcher, iSchemaFetcher));
        } catch (IOException e) {
            LOGGER.warn(String.format("failed to seal file %s from req %s.", this.writingFile, pipeTransferFileSealReq), e);
            return new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("failed to seal file %s because %s", this.writingFile, e.getMessage())));
        }
    }

    private boolean isWritingFileAvailable() {
        return (this.writingFile == null || !this.writingFile.exists() || this.writingFileWriter == null) ? false : true;
    }

    private TSStatus executeStatement(Statement statement, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        if (statement == null) {
            return RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement.");
        }
        ExecutionResult execute = Coordinator.getInstance().execute(statement, SessionManager.getInstance().requestQueryId(), null, SubStringFunctionColumnTransformer.EMPTY_STRING, iPartitionFetcher, iSchemaFetcher, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
        if (execute.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.warn("failed to execute statement, statement: {}, result status is: {}", statement, execute.status);
        }
        return execute.status;
    }

    @Override // org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver
    public synchronized void handleExit() {
        try {
            if (this.writingFileWriter != null) {
                this.writingFileWriter.close();
            }
            if (this.writingFile != null && !this.writingFile.delete()) {
                LOGGER.warn("IoTDBThriftReceiverV1#handleExit: delete file {} error.", this.writingFile.getPath());
            }
        } catch (IOException e) {
            LOGGER.warn("IoTDBThriftReceiverV1#handleExit: meeting errors on handleExit().", e);
        }
    }

    @Override // org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver
    public IoTDBThriftConnectorRequestVersion getVersion() {
        return IoTDBThriftConnectorRequestVersion.VERSION_1;
    }
}
