package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.class */
public class PipeTransferTsFileInsertionEventHandler implements AsyncMethodCallback<TPipeTransferResp> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
    private final long requestCommitId;
    private final PipeTsFileInsertionEvent event;
    private final IoTDBThriftAsyncConnector connector;
    private final File tsFile;
    private final RandomAccessFile reader;
    private AsyncPipeDataTransferServiceClient client;
    private final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
    private final byte[] readBuffer = new byte[this.readFileBufferSize];
    private long position = 0;
    private final AtomicBoolean isSealSignalSent = new AtomicBoolean(false);

    public PipeTransferTsFileInsertionEventHandler(long j, PipeTsFileInsertionEvent pipeTsFileInsertionEvent, IoTDBThriftAsyncConnector ioTDBThriftAsyncConnector) throws FileNotFoundException {
        this.requestCommitId = j;
        this.event = pipeTsFileInsertionEvent;
        this.connector = ioTDBThriftAsyncConnector;
        this.tsFile = pipeTsFileInsertionEvent.getTsFile();
        this.reader = new RandomAccessFile(this.tsFile, "r");
        pipeTsFileInsertionEvent.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
    }

    public void transfer(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient) throws TException, IOException {
        this.client = asyncPipeDataTransferServiceClient;
        asyncPipeDataTransferServiceClient.setShouldReturnSelf(false);
        int read = this.reader.read(this.readBuffer);
        if (read == -1) {
            this.isSealSignalSent.set(true);
            asyncPipeDataTransferServiceClient.pipeTransfer(PipeTransferFileSealReq.toTPipeTransferReq(this.tsFile.getName(), this.tsFile.length()), this);
        } else {
            asyncPipeDataTransferServiceClient.pipeTransfer(PipeTransferFilePieceReq.toTPipeTransferReq(this.tsFile.getName(), this.position, read == this.readFileBufferSize ? this.readBuffer : Arrays.copyOfRange(this.readBuffer, 0, read)), this);
            this.position += read;
        }
    }

    public void onComplete(TPipeTransferResp tPipeTransferResp) {
        if (!this.isSealSignalSent.get()) {
            try {
                PipeTransferFilePieceResp fromTPipeTransferResp = PipeTransferFilePieceResp.fromTPipeTransferResp(tPipeTransferResp);
                long code = fromTPipeTransferResp.getStatus().getCode();
                if (code == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                    this.position = fromTPipeTransferResp.getEndWritingOffset();
                    this.reader.seek(this.position);
                    LOGGER.info("Redirect file position to {}.", Long.valueOf(this.position));
                } else if (code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    throw new PipeException(String.format("Transfer file %s error, result status %s.", this.tsFile, fromTPipeTransferResp.getStatus()));
                }
                transfer(this.client);
                return;
            } catch (Exception e) {
                onError(e);
                return;
            }
        }
        try {
            if (tPipeTransferResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                onError(new PipeException(String.format("Seal file %s error, result status %s.", this.tsFile, tPipeTransferResp.getStatus())));
                return;
            }
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                this.connector.commit(this.requestCommitId, this.event);
                LOGGER.info("Successfully transferred file {}. Request commit id is {}.", this.tsFile, Long.valueOf(this.requestCommitId));
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            } catch (IOException e2) {
                LOGGER.warn("Failed to close file reader when successfully transferred file.", e2);
                this.connector.commit(this.requestCommitId, this.event);
                LOGGER.info("Successfully transferred file {}. Request commit id is {}.", this.tsFile, Long.valueOf(this.requestCommitId));
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            }
        } catch (Throwable th) {
            this.connector.commit(this.requestCommitId, this.event);
            LOGGER.info("Successfully transferred file {}. Request commit id is {}.", this.tsFile, Long.valueOf(this.requestCommitId));
            if (this.client != null) {
                this.client.setShouldReturnSelf(true);
                this.client.returnSelf();
            }
            throw th;
        }
    }

    public void onError(Exception exc) {
        LOGGER.warn("Failed to transfer TsFileInsertionEvent {} (request commit id {}).", new Object[]{this.tsFile, Long.valueOf(this.requestCommitId), exc});
        try {
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
                this.connector.addFailureEventToRetryQueue(this.requestCommitId, this.event);
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            } catch (IOException e) {
                LOGGER.warn("Failed to close file reader when failed to transfer file.", e);
                this.connector.addFailureEventToRetryQueue(this.requestCommitId, this.event);
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            }
        } catch (Throwable th) {
            this.connector.addFailureEventToRetryQueue(this.requestCommitId, this.event);
            if (this.client != null) {
                this.client.setShouldReturnSelf(true);
                this.client.returnSelf();
            }
            throw th;
        }
    }
}
