/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PipeTransferTsFileInsertionEventHandler
implements AsyncMethodCallback<TPipeTransferResp> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTsFileInsertionEventHandler.class);
    private final PipeTsFileInsertionEvent event;
    private final IoTDBDataRegionAsyncConnector connector;
    private final File tsFile;
    private final File modFile;
    private File currentFile;
    private final boolean transferMod;
    private final int readFileBufferSize;
    private final byte[] readBuffer;
    private long position;
    private RandomAccessFile reader;
    private final AtomicBoolean isSealSignalSent;
    private IoTDBDataNodeAsyncClientManager clientManager;
    private AsyncPipeDataTransferServiceClient client;

    public PipeTransferTsFileInsertionEventHandler(PipeTsFileInsertionEvent event, IoTDBDataRegionAsyncConnector connector) throws FileNotFoundException {
        this.event = event;
        this.connector = connector;
        this.tsFile = event.getTsFile();
        this.modFile = event.getModFile();
        this.transferMod = event.isWithMod() && connector.supportModsIfIsDataNodeReceiver();
        this.currentFile = this.transferMod ? this.modFile : this.tsFile;
        this.readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
        this.readBuffer = new byte[this.readFileBufferSize];
        this.position = 0L;
        this.reader = Objects.nonNull(this.modFile) ? new RandomAccessFile(this.modFile, "r") : new RandomAccessFile(this.tsFile, "r");
        this.isSealSignalSent = new AtomicBoolean(false);
    }

    public void transfer(IoTDBDataNodeAsyncClientManager clientManager, AsyncPipeDataTransferServiceClient client) throws TException, IOException {
        this.clientManager = clientManager;
        this.client = client;
        client.setShouldReturnSelf(false);
        client.setTimeoutDynamically(clientManager.getConnectionTimeout());
        int readLength = this.reader.read(this.readBuffer);
        if (readLength == -1) {
            if (this.currentFile == this.modFile) {
                this.currentFile = this.tsFile;
                this.position = 0L;
                try {
                    this.reader.close();
                }
                catch (IOException e) {
                    LOGGER.warn("Failed to close file reader when successfully transferred mod file.", (Throwable)e);
                }
                this.reader = new RandomAccessFile(this.tsFile, "r");
                this.transfer(clientManager, client);
            } else if (this.currentFile == this.tsFile) {
                this.isSealSignalSent.set(true);
                client.pipeTransfer((TPipeTransferReq)(this.transferMod ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(this.modFile.getName(), this.modFile.length(), this.tsFile.getName(), this.tsFile.length()) : PipeTransferTsFileSealReq.toTPipeTransferReq(this.tsFile.getName(), this.tsFile.length())), (AsyncMethodCallback)this);
            }
            return;
        }
        byte[] payload = readLength == this.readFileBufferSize ? this.readBuffer : Arrays.copyOfRange(this.readBuffer, 0, readLength);
        client.pipeTransfer((TPipeTransferReq)(this.transferMod ? PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(this.currentFile.getName(), this.position, payload) : PipeTransferTsFilePieceReq.toTPipeTransferReq(this.currentFile.getName(), this.position, payload)), (AsyncMethodCallback)this);
        this.position += (long)readLength;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void onComplete(TPipeTransferResp response) {
        if (this.isSealSignalSent.get()) {
            block13: {
                try {
                    TSStatus status = response.getStatus();
                    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                        this.connector.statusHandler().handle(status, String.format("Seal file %s error, result status %s.", this.tsFile, response.getStatus()), this.tsFile.getName());
                    }
                }
                catch (Exception e) {
                    this.onError(e);
                    return;
                }
                try {
                    if (this.reader == null) break block13;
                    this.reader.close();
                }
                catch (IOException e) {
                    try {
                        LOGGER.warn("Failed to close file reader when successfully transferred file.", (Throwable)e);
                    }
                    catch (Throwable throwable) {
                        this.event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(), true);
                        LOGGER.info("Successfully transferred file {} (committer key={}, commit id={}).", new Object[]{this.tsFile, this.event.getCommitterKey(), this.event.getCommitId()});
                        if (this.client == null) throw throwable;
                        this.client.setShouldReturnSelf(true);
                        this.client.returnSelf();
                        throw throwable;
                    }
                    this.event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(), true);
                    LOGGER.info("Successfully transferred file {} (committer key={}, commit id={}).", new Object[]{this.tsFile, this.event.getCommitterKey(), this.event.getCommitId()});
                    if (this.client == null) return;
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                    return;
                }
            }
            this.event.decreaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName(), true);
            LOGGER.info("Successfully transferred file {} (committer key={}, commit id={}).", new Object[]{this.tsFile, this.event.getCommitterKey(), this.event.getCommitId()});
            if (this.client == null) return;
            this.client.setShouldReturnSelf(true);
            this.client.returnSelf();
            return;
        }
        try {
            PipeTransferFilePieceResp resp = PipeTransferFilePieceResp.fromTPipeTransferResp((TPipeTransferResp)response);
            long code = resp.getStatus().getCode();
            if (code == (long)TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                this.position = resp.getEndWritingOffset();
                this.reader.seek(this.position);
                LOGGER.info("Redirect file position to {}.", (Object)this.position);
            } else {
                TSStatus status = response.getStatus();
                if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                    this.connector.statusHandler().handle(status, response.getStatus().getMessage(), this.tsFile.getName());
                }
            }
            this.transfer(this.clientManager, this.client);
            return;
        }
        catch (Exception e) {
            this.onError(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onError(Exception exception) {
        LOGGER.warn("Failed to transfer TsFileInsertionEvent {} (committer key {}, commit id {}).", new Object[]{this.tsFile, this.event.getCommitterKey(), this.event.getCommitId(), exception});
        try {
            if (Objects.nonNull(this.clientManager)) {
                this.clientManager.adjustTimeoutIfNecessary(exception);
            }
        }
        catch (Exception e) {
            LOGGER.warn("Failed to adjust timeout when failed to transfer file.", (Throwable)e);
        }
        try {
            if (this.reader != null) {
                this.reader.close();
            }
        }
        catch (IOException e) {
            LOGGER.warn("Failed to close file reader when failed to transfer file.", (Throwable)e);
        }
        finally {
            try {
                if (this.client != null) {
                    this.client.setShouldReturnSelf(true);
                    this.client.returnSelf();
                }
            }
            finally {
                this.connector.addFailureEventToRetryQueue((Event)this.event);
            }
        }
    }
}

