package org.apache.iotdb.db.sync.transport.client;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Iterator;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginConfiguration;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/transport/client/IoTDBSyncClient.class */
/**
 * {@link ISyncClient} implementation that talks to a receiver through the Thrift
 * {@link IClientRPCService} interface.
 *
 * <p>A connection is (re)established by {@link #handshake()}. {@link #send(PipeData)} then
 * transfers the files attached to a {@link TsFilePipeData} piece by piece, followed by the
 * serialized {@link PipeData} itself.
 */
public class IoTDBSyncClient implements ISyncClient {
    private static final Logger logger = LoggerFactory.getLogger(IoTDBSyncClient.class);
    private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

    /** Size (1 MiB) of the buffer used for the first piece of every file. */
    private static final int TRANSFER_BUFFER_SIZE_IN_BYTES = 1048576;

    private TTransport transport = null;
    private volatile IClientRPCService.Client serviceClient = null;
    private final String ipAddress;
    private final int port;
    private final String localIP;
    private final Pipe pipe;

    /**
     * Creates a client; no connection is opened until {@link #handshake()} is called.
     *
     * @param pipe the pipe whose name and creation time are sent during the handshake
     * @param ipAddress address of the receiver
     * @param port port of the receiver
     * @param localIP address reported to the receiver as the sender identity
     */
    public IoTDBSyncClient(Pipe pipe, String ipAddress, int port, String localIP) {
        RpcTransportFactory.setThriftMaxFrameSize(config.getThriftMaxFrameSize());
        this.pipe = pipe;
        this.ipAddress = ipAddress;
        this.port = port;
        this.localIP = localIP;
    }

    /**
     * Closes any transport that is still open, opens a new one and performs the handshake RPC.
     *
     * @return {@code true} if the receiver answered with {@code SUCCESS_STATUS}, {@code false}
     *     if it answered with any other status
     * @throws SyncConnectionException if a Thrift error occurs while connecting or handshaking
     */
    @Override // org.apache.iotdb.db.sync.transport.client.ISyncClient
    public synchronized boolean handshake() throws SyncConnectionException {
        if (this.transport != null && this.transport.isOpen()) {
            this.transport.close();
        }
        try {
            this.transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(TConfigurationConst.defaultTConfiguration, this.ipAddress, this.port, ExtPipePluginConfiguration.DEFAULT_OPERATION_BATCH_SIZE, ExtPipePluginConfiguration.DEFAULT_BACKOFF_INTERVAL));
            // The protocol follows the configured RPC compression switch.
            this.serviceClient = new IClientRPCService.Client(config.isRpcThriftCompressionEnable() ? new TCompactProtocol(this.transport) : new TBinaryProtocol(this.transport));
            if (!this.transport.isOpen()) {
                this.transport.open();
            }
            TSStatus status = this.serviceClient.handshake(new TSyncIdentityInfo(this.localIP, this.pipe.getName(), this.pipe.getCreateTime(), config.getIoTDBMajorVersion()));
            if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                return true;
            }
            logger.error("The receiver rejected the synchronization task because {}", status.message);
            return false;
        } catch (TException e) {
            logger.warn("Cannot connect to the receiver because {}", e.getMessage());
            throw new SyncConnectionException(String.format("Cannot connect to the receiver because %s.", e.getMessage()));
        }
    }

    /**
     * Sends one {@link PipeData} to the receiver. For a {@link TsFilePipeData} every file
     * returned by {@code getTsFiles(true)} is transferred first; the serialized pipe data
     * is sent afterwards.
     *
     * @param pipeData the data to send
     * @return {@code true} if all transfers reported success, {@code false} if a file
     *     transfer failed or an {@link IOException} occurred (it is logged, not rethrown)
     * @throws SyncConnectionException if a Thrift error occurs while talking to the receiver
     */
    @Override // org.apache.iotdb.db.sync.transport.client.ISyncClient
    public boolean send(PipeData pipeData) throws SyncConnectionException {
        if (pipeData instanceof TsFilePipeData) {
            try {
                for (File tsFile : ((TsFilePipeData) pipeData).getTsFiles(true)) {
                    if (!transportSingleFilePieceByPiece(tsFile)) {
                        return false;
                    }
                }
            } catch (IOException e) {
                logger.error(String.format("Get TsFiles error, because %s.", e), e);
                return false;
            }
        }
        try {
            return transportPipeData(pipeData);
        } catch (IOException e) {
            logger.error(String.format("Transport PipeData error, because %s.", e), e);
            return false;
        }
    }

    /**
     * Sends {@code file} to the receiver in pieces, up to the limit given by
     * {@link #getFileSizeLimit(File)}.
     *
     * <p>The first piece uses a {@link #TRANSFER_BUFFER_SIZE_IN_BYTES} buffer; once the
     * position is non-zero the buffer is replaced by one of
     * {@code SyncConstant.DATA_CHUNK_SIZE} bytes. When the receiver answers
     * {@code SYNC_FILE_REBASE}, its message is parsed as the offset to continue from.
     *
     * @param file the file to transfer
     * @return {@code true} when the loop finishes, {@code false} if the receiver answered
     *     {@code SYNC_FILE_ERROR}
     * @throws SyncConnectionException if a Thrift error occurs
     * @throws IOException if the file cannot be opened, read or repositioned
     */
    private boolean transportSingleFilePieceByPiece(File file) throws SyncConnectionException, IOException {
        long position = 0;
        long limit = getFileSizeLimit(file);
        byte[] buffer = new byte[TRANSFER_BUFFER_SIZE_IN_BYTES];
        // try-with-resources guarantees the file is closed on every exit path, including
        // Thrift and I/O failures in the middle of the transfer.
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
            while (position < limit) {
                if (position != 0 && buffer.length != SyncConstant.DATA_CHUNK_SIZE) {
                    buffer = new byte[SyncConstant.DATA_CHUNK_SIZE];
                }
                // Take the minimum in long arithmetic before narrowing: casting the remaining
                // byte count to int first overflows once more than 2 GiB is left to send.
                int chunkLength = (int) Math.min((long) buffer.length, limit - position);
                int read = randomAccessFile.read(buffer, 0, chunkLength);
                if (read == -1) {
                    break;
                }
                TSStatus status = this.serviceClient.sendFile(new TSyncTransportMetaInfo(file.getName(), position), ByteBuffer.wrap(buffer, 0, read));
                if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    position += read;
                } else if (status.code == TSStatusCode.SYNC_FILE_REBASE.getStatusCode()) {
                    position = Long.parseLong(status.message);
                    // Keep the file pointer in step with the rebased offset; otherwise the
                    // next piece would be read from the old location but labelled with the new one.
                    randomAccessFile.seek(position);
                } else if (status.code == TSStatusCode.SYNC_FILE_ERROR.getStatusCode()) {
                    logger.error("Receiver failed to receive data from {} because {}, abort.", file.getAbsoluteFile(), status.message);
                    return false;
                }
            }
            return true;
        } catch (TException e) {
            logger.error("Cannot sync data with receiver. ", e);
            throw new SyncConnectionException((Throwable) e);
        }
    }

    /**
     * Determines how many bytes of {@code file} should be sent. If a sibling file named
     * {@code <path>.offset} exists, its first line is parsed as the limit; otherwise, or if
     * reading it fails with an {@link IOException}, the length of {@code file} is used.
     *
     * @param file the file about to be transferred
     * @return the number of bytes to send
     * @throws NumberFormatException if the first line of the offset file is not a valid long
     */
    private long getFileSizeLimit(File file) {
        File offsetFile = new File(file.getPath() + ".offset");
        if (offsetFile.exists()) {
            // try-with-resources closes the reader even when reading or parsing fails.
            try (BufferedReader reader = new BufferedReader(new FileReader(offsetFile))) {
                return Long.parseLong(reader.readLine());
            } catch (IOException e) {
                logger.error(String.format("Deserialize offset of file %s error, because %s.", file.getPath(), e));
            }
        }
        return file.length();
    }

    /**
     * Serializes {@code pipeData} and sends it to the receiver.
     *
     * @param pipeData the data to send
     * @return {@code false} only if the receiver answered {@code PIPESERVER_ERROR};
     *     {@code true} for {@code SUCCESS_STATUS} and for every other status code
     * @throws SyncConnectionException if a Thrift error occurs
     * @throws IOException if serialization fails
     */
    private boolean transportPipeData(PipeData pipeData) throws SyncConnectionException, IOException {
        try {
            TSStatus status = this.serviceClient.sendPipeData(ByteBuffer.wrap(pipeData.serialize()));
            if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                logger.info("Transport PipeData {} Successfully", pipeData);
                return true;
            }
            if (status.code != TSStatusCode.PIPESERVER_ERROR.getStatusCode()) {
                return true;
            }
            logger.error("Receiver failed to load PipeData {}, skip it.", pipeData);
            return false;
        } catch (TException e) {
            throw new SyncConnectionException((Throwable) e);
        }
    }

    /** Closes the transport, if any, and forgets it. */
    @Override // org.apache.iotdb.db.sync.transport.client.ISyncClient
    public void close() {
        if (this.transport != null) {
            this.transport.close();
            this.transport = null;
        }
    }
}
