package org.apache.iotdb.db.pipe.receiver.thrift;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftConnectorRequestVersion;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.expression.multi.builtin.helper.SubStringFunctionHelper;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.class */
public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
    private final AtomicReference<File> receiverFileDirWithIdSuffix = new AtomicReference<>();
    private final AtomicLong receiverId = new AtomicLong(0);
    private File writingFile;
    private RandomAccessFile writingFileWriter;
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
    private static final String RECEIVER_FILE_BASE_DIR = IOTDB_CONFIG.getPipeReceiverFileDir();
    private static final AtomicLong RECEIVER_ID_GENERATOR = new AtomicLong(0);

    @Override // org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver
    public synchronized TPipeTransferResp receive(TPipeTransferReq tPipeTransferReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        short type = tPipeTransferReq.getType();
        if (PipeRequestType.isValidatedRequestType(type)) {
            switch (PipeRequestType.valueOf(type)) {
                case HANDSHAKE:
                    return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(tPipeTransferReq));
                case TRANSFER_INSERT_NODE:
                    return handleTransferInsertNode(PipeTransferInsertNodeReq.fromTPipeTransferReq(tPipeTransferReq), iPartitionFetcher, iSchemaFetcher);
                case TRANSFER_TABLET:
                    return handleTransferTablet(PipeTransferTabletReq.fromTPipeTransferReq(tPipeTransferReq), iPartitionFetcher, iSchemaFetcher);
                case TRANSFER_FILE_PIECE:
                    return handleTransferFilePiece(PipeTransferFilePieceReq.fromTPipeTransferReq(tPipeTransferReq));
                case TRANSFER_FILE_SEAL:
                    return handleTransferFileSeal(PipeTransferFileSealReq.fromTPipeTransferReq(tPipeTransferReq), iPartitionFetcher, iSchemaFetcher);
            }
        }
        TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_TYPE_ERROR, String.format("Unknown PipeRequestType %s.", Short.valueOf(type)));
        LOGGER.warn("Unknown PipeRequestType, response status = {}.", status);
        return new TPipeTransferResp(status);
    }

    private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq pipeTransferHandshakeReq) {
        if (!CommonDescriptor.getInstance().getConfig().getTimestampPrecision().equals(pipeTransferHandshakeReq.getTimestampPrecision())) {
            TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_HANDSHAKE_ERROR, String.format("IoTDB receiver's timestamp precision %s, connector's timestamp precision %s. Validation fails.", CommonDescriptor.getInstance().getConfig().getTimestampPrecision(), pipeTransferHandshakeReq.getTimestampPrecision()));
            LOGGER.warn("Handshake failed, response status = {}.", status);
            return new TPipeTransferResp(status);
        }
        this.receiverId.set(RECEIVER_ID_GENERATOR.incrementAndGet());
        if (this.receiverFileDirWithIdSuffix.get() != null) {
            if (this.receiverFileDirWithIdSuffix.get().exists()) {
                try {
                    Files.delete(this.receiverFileDirWithIdSuffix.get().toPath());
                    LOGGER.info("Original receiver file dir {} was deleted.", this.receiverFileDirWithIdSuffix.get().getPath());
                } catch (IOException e) {
                    LOGGER.warn("Failed to delete original receiver file dir {}, because {}.", this.receiverFileDirWithIdSuffix.get().getPath(), e.getMessage());
                }
            } else {
                LOGGER.info("Original receiver file dir {} is not existed. No need to delete.", this.receiverFileDirWithIdSuffix.get().getPath());
            }
            this.receiverFileDirWithIdSuffix.set(null);
        } else {
            LOGGER.info("Current receiver file dir is null. No need to delete.");
        }
        File file = new File(RECEIVER_FILE_BASE_DIR, Long.toString(this.receiverId.get()));
        if (!file.exists()) {
            if (file.mkdirs()) {
                LOGGER.info("Receiver file dir {} was created.", file.getPath());
            } else {
                LOGGER.error("Failed to create receiver file dir {}.", file.getPath());
            }
        }
        this.receiverFileDirWithIdSuffix.set(file);
        LOGGER.info("Handshake successfully, receiver id = {}, receiver file dir = {}.", Long.valueOf(this.receiverId.get()), file.getPath());
        return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
    }

    private TPipeTransferResp handleTransferInsertNode(PipeTransferInsertNodeReq pipeTransferInsertNodeReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        return new TPipeTransferResp(executeStatement(pipeTransferInsertNodeReq.constructStatement(), iPartitionFetcher, iSchemaFetcher));
    }

    private TPipeTransferResp handleTransferTablet(PipeTransferTabletReq pipeTransferTabletReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        InsertTabletStatement constructStatement = pipeTransferTabletReq.constructStatement();
        return new TPipeTransferResp(constructStatement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(constructStatement, iPartitionFetcher, iSchemaFetcher));
    }

    private TPipeTransferResp handleTransferFilePiece(PipeTransferFilePieceReq pipeTransferFilePieceReq) {
        try {
            updateWritingFileIfNeeded(pipeTransferFilePieceReq.getFileName());
            if (isWritingFileOffsetCorrect(pipeTransferFilePieceReq.getStartWritingOffset())) {
                this.writingFileWriter.write(pipeTransferFilePieceReq.getFilePiece());
                return PipeTransferFilePieceResp.toTPipeTransferResp(RpcUtils.SUCCESS_STATUS, this.writingFileWriter.length());
            }
            TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET, String.format("Request sender to reset file reader's offset from %s to %s.", Long.valueOf(pipeTransferFilePieceReq.getStartWritingOffset()), Long.valueOf(this.writingFileWriter.length())));
            LOGGER.warn("File offset reset requested by receiver, response status = {}.", status);
            return PipeTransferFilePieceResp.toTPipeTransferResp(status, this.writingFileWriter.length());
        } catch (Exception e) {
            LOGGER.warn(String.format("Failed to write file piece from req %s.", pipeTransferFilePieceReq), e);
            TSStatus status2 = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("Failed to write file piece, because %s", e.getMessage()));
            try {
                return PipeTransferFilePieceResp.toTPipeTransferResp(status2, -1L);
            } catch (IOException e2) {
                return PipeTransferFilePieceResp.toTPipeTransferResp(status2);
            }
        }
    }

    private void updateWritingFileIfNeeded(String str) throws IOException {
        if (isFileExistedAndNameCorrect(str)) {
            return;
        }
        LOGGER.info("Writing file {} is not existed or name is not correct, try to create it. Current writing file is {}.", str, this.writingFile == null ? SubStringFunctionHelper.NULL_STRING : this.writingFile.getPath());
        closeCurrentWritingFileWriter();
        deleteCurrentWritingFile();
        if (!this.receiverFileDirWithIdSuffix.get().exists()) {
            if (this.receiverFileDirWithIdSuffix.get().mkdirs()) {
                LOGGER.info("Receiver file dir {} was created.", this.receiverFileDirWithIdSuffix.get().getPath());
            } else {
                LOGGER.error("Failed to create receiver file dir {}.", this.receiverFileDirWithIdSuffix.get().getPath());
            }
        }
        this.writingFile = new File(this.receiverFileDirWithIdSuffix.get(), str);
        this.writingFileWriter = new RandomAccessFile(this.writingFile, "rw");
        LOGGER.info("Writing file {} was created. Ready to write file pieces.", this.writingFile.getPath());
    }

    private boolean isFileExistedAndNameCorrect(String str) {
        return this.writingFile != null && this.writingFile.getName().equals(str);
    }

    private void closeCurrentWritingFileWriter() {
        if (this.writingFileWriter == null) {
            LOGGER.info("Current writing file writer is null. No need to close.");
            return;
        }
        try {
            this.writingFileWriter.close();
            LOGGER.info("Current writing file writer {} was closed.", this.writingFile == null ? SubStringFunctionHelper.NULL_STRING : this.writingFile.getPath());
        } catch (IOException e) {
            LOGGER.warn("Failed to close current writing file writer {}, because {}.", this.writingFile == null ? SubStringFunctionHelper.NULL_STRING : this.writingFile.getPath(), e.getMessage());
        }
        this.writingFileWriter = null;
    }

    private void deleteCurrentWritingFile() {
        if (this.writingFile == null) {
            LOGGER.info("Current writing file is null. No need to delete.");
            return;
        }
        if (this.writingFile.exists()) {
            try {
                Files.delete(this.writingFile.toPath());
                LOGGER.info("Original writing file {} was deleted.", this.writingFile.getPath());
            } catch (IOException e) {
                LOGGER.warn("Failed to delete original writing file {}, because {}.", this.writingFile.getPath(), e.getMessage());
            }
        } else {
            LOGGER.info("Original file {} is not existed. No need to delete.", this.writingFile.getPath());
        }
        this.writingFile = null;
    }

    private boolean isWritingFileOffsetCorrect(long j) throws IOException {
        boolean z = this.writingFileWriter.length() == j;
        if (!z) {
            LOGGER.warn("Writing file {}'s offset is {}, but request sender's offset is {}.", new Object[]{this.writingFile.getPath(), Long.valueOf(this.writingFileWriter.length()), Long.valueOf(j)});
        }
        return z;
    }

    private TPipeTransferResp handleTransferFileSeal(PipeTransferFileSealReq pipeTransferFileSealReq, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        try {
            try {
                if (!isWritingFileAvailable()) {
                    TSStatus status = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("Failed to seal file, because writing file %s is not available.", pipeTransferFileSealReq.getFileName()));
                    LOGGER.warn(status.getMessage());
                    TPipeTransferResp tPipeTransferResp = new TPipeTransferResp(status);
                    closeCurrentWritingFileWriter();
                    deleteCurrentWritingFile();
                    return tPipeTransferResp;
                }
                if (!isFileExistedAndNameCorrect(pipeTransferFileSealReq.getFileName())) {
                    TSStatus status2 = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, but writing file is %s.", pipeTransferFileSealReq.getFileName(), this.writingFile));
                    LOGGER.warn(status2.getMessage());
                    TPipeTransferResp tPipeTransferResp2 = new TPipeTransferResp(status2);
                    closeCurrentWritingFileWriter();
                    deleteCurrentWritingFile();
                    return tPipeTransferResp2;
                }
                if (!isWritingFileOffsetCorrect(pipeTransferFileSealReq.getFileLength())) {
                    TSStatus status3 = RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s, because the length of file is not correct. The original file has length %s, but receiver file has length %s.", pipeTransferFileSealReq.getFileName(), Long.valueOf(pipeTransferFileSealReq.getFileLength()), Long.valueOf(this.writingFileWriter.length())));
                    LOGGER.warn(status3.getMessage());
                    TPipeTransferResp tPipeTransferResp3 = new TPipeTransferResp(status3);
                    closeCurrentWritingFileWriter();
                    deleteCurrentWritingFile();
                    return tPipeTransferResp3;
                }
                String absolutePath = this.writingFile.getAbsolutePath();
                LoadTsFileStatement loadTsFileStatement = new LoadTsFileStatement(absolutePath);
                this.writingFileWriter.close();
                this.writingFileWriter = null;
                this.writingFile = null;
                loadTsFileStatement.setDeleteAfterLoad(true);
                loadTsFileStatement.setVerifySchema(true);
                loadTsFileStatement.setAutoCreateDatabase(false);
                TSStatus executeStatement = executeStatement(loadTsFileStatement, iPartitionFetcher, iSchemaFetcher);
                if (executeStatement.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    LOGGER.info("Seal file {} successfully. Receiver id is {}.", absolutePath, Long.valueOf(this.receiverId.get()));
                } else {
                    LOGGER.warn("Failed to seal file {}, because {}. Receiver id is {}.", new Object[]{absolutePath, executeStatement.getMessage(), Long.valueOf(this.receiverId.get())});
                }
                TPipeTransferResp tPipeTransferResp4 = new TPipeTransferResp(executeStatement);
                closeCurrentWritingFileWriter();
                deleteCurrentWritingFile();
                return tPipeTransferResp4;
            } catch (IOException e) {
                LOGGER.warn(String.format("Failed to seal file %s from req %s. Receiver id is %d.", this.writingFile, pipeTransferFileSealReq, Long.valueOf(this.receiverId.get())), e);
                TPipeTransferResp tPipeTransferResp5 = new TPipeTransferResp(RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_FILE_ERROR, String.format("Failed to seal file %s because %s", this.writingFile, e.getMessage())));
                closeCurrentWritingFileWriter();
                deleteCurrentWritingFile();
                return tPipeTransferResp5;
            }
        } catch (Throwable th) {
            closeCurrentWritingFileWriter();
            deleteCurrentWritingFile();
            throw th;
        }
    }

    private boolean isWritingFileAvailable() {
        boolean z = (this.writingFile == null || !this.writingFile.exists() || this.writingFileWriter == null) ? false : true;
        if (!z) {
            Logger logger = LOGGER;
            Object[] objArr = new Object[4];
            objArr[0] = this.writingFile;
            objArr[1] = Boolean.valueOf(this.writingFile == null);
            objArr[2] = Boolean.valueOf(this.writingFile != null && this.writingFile.exists());
            objArr[3] = Boolean.valueOf(this.writingFileWriter == null);
            logger.info("Writing file {} is not available. Writing file is null: {}, writing file exists: {}, writing file writer is null: {}.", objArr);
        }
        return z;
    }

    private TSStatus executeStatement(Statement statement, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        if (statement == null) {
            return RpcUtils.getStatus(TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement.");
        }
        ExecutionResult execute = Coordinator.getInstance().execute(statement, SessionManager.getInstance().requestQueryId(), null, SubStringFunctionColumnTransformer.EMPTY_STRING, iPartitionFetcher, iSchemaFetcher, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
        if (execute.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.warn("failed to execute statement, statement: {}, result status is: {}", statement, execute.status);
        }
        return execute.status;
    }

    @Override // org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver
    public synchronized void handleExit() {
        if (this.writingFileWriter != null) {
            try {
                this.writingFileWriter.close();
                LOGGER.info("IoTDBThriftReceiverV1#handleExit: writing file writer was closed.");
            } catch (Exception e) {
                LOGGER.warn("IoTDBThriftReceiverV1#handleExit: close writing file writer error.", e);
            }
            this.writingFileWriter = null;
        } else {
            LOGGER.info("IoTDBThriftReceiverV1#handleExit: writing file writer is null. No need to close.");
        }
        if (this.writingFile != null) {
            try {
                Files.delete(this.writingFile.toPath());
                LOGGER.info("IoTDBThriftReceiverV1#handleExit: writing file {} was deleted.", this.writingFile.getPath());
            } catch (Exception e2) {
                LOGGER.warn("IoTDBThriftReceiverV1#handleExit: delete file {} error.", this.writingFile.getPath());
            }
            this.writingFile = null;
        } else {
            LOGGER.info("IoTDBThriftReceiverV1#handleExit: writing file is null. No need to delete.");
        }
        if (this.receiverFileDirWithIdSuffix.get() != null) {
            if (this.receiverFileDirWithIdSuffix.get().exists()) {
                try {
                    Files.delete(this.receiverFileDirWithIdSuffix.get().toPath());
                    LOGGER.info("IoTDBThriftReceiverV1#handleExit: original receiver file dir {} was deleted.", this.receiverFileDirWithIdSuffix.get().getPath());
                } catch (IOException e3) {
                    LOGGER.warn("IoTDBThriftReceiverV1#handleExit: delete original receiver file dir {} error.", this.receiverFileDirWithIdSuffix.get().getPath());
                }
            } else {
                LOGGER.info("IoTDBThriftReceiverV1#handleExit: original receiver file dir {} does not exist. No need to delete.", this.receiverFileDirWithIdSuffix.get().getPath());
            }
            this.receiverFileDirWithIdSuffix.set(null);
        } else {
            LOGGER.info("IoTDBThriftReceiverV1#handleExit: original receiver file dir is null. No need to delete.");
        }
        LOGGER.info("IoTDBThriftReceiverV1#handleExit: receiver exited.");
    }

    @Override // org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiver
    public IoTDBThriftConnectorRequestVersion getVersion() {
        return IoTDBThriftConnectorRequestVersion.VERSION_1;
    }
}
