package org.apache.iotdb.db.pipe.receiver.airgap;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;
import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverAgent;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/airgap/IoTDBAirGapReceiver.class */
/**
 * Serves a single air-gap pipe connection on an accepted {@link Socket}.
 *
 * <p>Each request read from the socket has the following layout, as parsed by this class:
 *
 * <ol>
 *   <li>optionally, the {@code AirGapELanguageConstant.E_LANGUAGE_PREFIX} bytes;
 *   <li>an 8-byte length header holding the same 4-byte integer twice;
 *   <li>{@code length} bytes of data: an 8-byte CRC32 value followed by the checksummed payload
 *       (1-byte version, 2-byte type, then the request body);
 *   <li>if the prefix was present, {@code AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length}
 *       trailing bytes, which are skipped.
 * </ol>
 *
 * <p>Every request is answered with {@code AirGapOneByteResponse.OK} or
 * {@code AirGapOneByteResponse.FAIL}.
 */
public class IoTDBAirGapReceiver extends WrappedRunnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBAirGapReceiver.class);

    /** Number of leading data bytes that carry the CRC32 value. */
    private static final int CHECKSUM_LENGTH = 8;

    /** Size of the length header: the 4-byte length written twice. */
    private static final int LENGTH_HEADER_SIZE = 8;

    /** Size of one copy of the length inside the length header. */
    private static final int LENGTH_FIELD_SIZE = 4;

    private final Socket socket;
    private final long receiverId;
    private final IoTDBThriftReceiverAgent agent = PipeAgent.receiver().thrift();
    private final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance();
    private final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();

    /** Set while reading a request that started with the E-language prefix; reset per request. */
    private boolean isELanguagePayload;

    /**
     * @param socket the accepted connection to serve
     * @param receiverId identifier used in log messages
     */
    public IoTDBAirGapReceiver(Socket socket, long receiverId) {
        this.socket = socket;
        this.receiverId = receiverId;
    }

    /**
     * Handles requests until the socket is closed, then releases the receiver state and the
     * socket.
     *
     * @throws Throwable any failure raised while configuring the socket or serving requests; the
     *     socket is closed before a serving failure is propagated
     */
    public void runMayThrow() throws Throwable {
        socket.setSoTimeout((int) PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
        socket.setKeepAlive(true);
        LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);
        try {
            while (!socket.isClosed()) {
                isELanguagePayload = false;
                receive();
            }
            LOGGER.info(
                    "Pipe air gap receiver {} closed because socket is closed. Socket: {}",
                    receiverId,
                    socket);
        } catch (Exception e) {
            LOGGER.warn(
                    "Pipe air gap receiver {} closed because of exception. Socket: {}",
                    receiverId,
                    socket,
                    e);
            throw e;
        } finally {
            try {
                PipeAgent.receiver().thrift().handleClientExit();
            } finally {
                // Close the socket even when the exit hook itself fails.
                socket.close();
            }
        }
    }

    /**
     * Reads one request, verifies its checksum, hands it to the thrift receiver agent and writes
     * the one-byte response. When the peer has closed the connection, the socket is closed so
     * that the serving loop terminates.
     *
     * @throws IOException if writing the response fails
     */
    private void receive() throws IOException {
        try {
            final byte[] data;
            try {
                // NOTE(review): a fresh BufferedInputStream per request discards any bytes it
                // read ahead; this looks safe only if the sender waits for the one-byte response
                // before sending its next request — confirm against the connector.
                data = readData(new BufferedInputStream(socket.getInputStream()));
            } catch (EOFException e) {
                LOGGER.info(
                        "Pipe air gap receiver {} reached end of stream, closing socket: {}",
                        receiverId,
                        e.getMessage());
                socket.close();
                return;
            }

            if (!checkSum(data)) {
                LOGGER.warn("Checksum failed, receiverId: {}", receiverId);
                fail();
                return;
            }

            final ByteBuffer payload =
                    ByteBuffer.wrap(data, CHECKSUM_LENGTH, data.length - CHECKSUM_LENGTH);
            final AirGapPseudoTPipeTransferRequest request =
                    (AirGapPseudoTPipeTransferRequest)
                            new AirGapPseudoTPipeTransferRequest()
                                    .setVersion(ReadWriteIOUtils.readByte(payload))
                                    .setType(ReadWriteIOUtils.readShort(payload))
                                    .setBody(payload.slice());
            final TPipeTransferResp response =
                    agent.receive(request, partitionFetcher, schemaFetcher);

            if (response.getStatus().code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                ok();
            } else {
                LOGGER.warn(
                        "Handle data failed, receiverId: {}, status: {}, req: {}",
                        receiverId,
                        response.getStatus(),
                        request);
                fail();
            }
        } catch (Exception e) {
            LOGGER.warn("Exception during handling receiving, receiverId: {}", receiverId, e);
            fail();
        }
    }

    /** Writes and flushes the {@code AirGapOneByteResponse.OK} response. */
    private void ok() throws IOException {
        final OutputStream outputStream = socket.getOutputStream();
        outputStream.write(AirGapOneByteResponse.OK);
        outputStream.flush();
    }

    /** Writes and flushes the {@code AirGapOneByteResponse.FAIL} response. */
    private void fail() throws IOException {
        final OutputStream outputStream = socket.getOutputStream();
        outputStream.write(AirGapOneByteResponse.FAIL);
        outputStream.flush();
    }

    /**
     * Checks that the first 8 bytes of {@code data} equal the CRC32 of the remaining bytes.
     *
     * @param data the bytes returned by {@link #readData(InputStream)}
     * @return {@code true} only if the checksum matches; data shorter than the checksum itself,
     *     or any failure while computing it, yields {@code false}
     */
    private boolean checkSum(byte[] data) {
        try {
            if (data.length < CHECKSUM_LENGTH) {
                return false;
            }
            final CRC32 crc32 = new CRC32();
            crc32.update(data, CHECKSUM_LENGTH, data.length - CHECKSUM_LENGTH);
            final long expected =
                    BytesUtils.bytesToLong(BytesUtils.subBytes(data, 0, CHECKSUM_LENGTH));
            return expected == crc32.getValue();
        } catch (Exception ignored) {
            // Any failure to evaluate the checksum is reported as a mismatch.
            return false;
        }
    }

    /**
     * Reads the data section of one request.
     *
     * @param inputStream stream positioned at the start of a request
     * @return the data bytes, or an empty array when the length header is invalid or
     *     non-positive
     * @throws EOFException if the stream ends before the request is complete
     * @throws IOException if reading fails
     */
    private byte[] readData(InputStream inputStream) throws IOException {
        final int length = readLength(inputStream);
        // The length comes from the peer; never pass a non-positive value to the allocation.
        if (length <= 0) {
            return new byte[0];
        }

        final byte[] data = new byte[length];
        readTillFull(inputStream, data);
        if (isELanguagePayload) {
            skipTillEnough(inputStream, AirGapELanguageConstant.E_LANGUAGE_SUFFIX.length);
        }
        return data;
    }

    /**
     * Reads the length header, consuming any leading E-language prefix first.
     *
     * @param inputStream stream positioned at the start of a request
     * @return the announced data length, or {@code 0} when the two copies of the length in the
     *     header disagree
     * @throws EOFException if the stream ends before the header is complete
     * @throws IOException if reading fails
     */
    private int readLength(InputStream inputStream) throws IOException {
        final byte[] header = new byte[LENGTH_HEADER_SIZE];
        // Iterate rather than recurse so that a peer repeating the prefix cannot grow the stack.
        while (true) {
            readTillFull(inputStream, header);
            final byte[] prefixHead =
                    BytesUtils.subBytes(
                            AirGapELanguageConstant.E_LANGUAGE_PREFIX, 0, LENGTH_HEADER_SIZE);
            if (!Arrays.equals(header, prefixHead)) {
                break;
            }
            isELanguagePayload = true;
            skipTillEnough(
                    inputStream,
                    AirGapELanguageConstant.E_LANGUAGE_PREFIX.length - LENGTH_HEADER_SIZE);
        }

        final byte[] firstCopy = BytesUtils.subBytes(header, 0, LENGTH_FIELD_SIZE);
        final byte[] secondCopy =
                BytesUtils.subBytes(header, LENGTH_FIELD_SIZE, LENGTH_FIELD_SIZE);
        return Arrays.equals(firstCopy, secondCopy) ? BytesUtils.bytesToInt(firstCopy) : 0;
    }

    /**
     * Fills {@code buffer} completely from {@code inputStream}.
     *
     * @throws EOFException if the stream ends before the buffer is full
     * @throws IOException if reading fails
     */
    private void readTillFull(InputStream inputStream, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            final int read = inputStream.read(buffer, filled, buffer.length - filled);
            if (read < 0) {
                throw new EOFException(
                        "Stream ended after " + filled + " of " + buffer.length + " bytes");
            }
            filled += read;
        }
    }

    /**
     * Discards exactly {@code length} bytes from {@code inputStream}.
     *
     * @throws EOFException if the stream ends before enough bytes were discarded
     * @throws IOException if reading fails
     */
    private void skipTillEnough(InputStream inputStream, long length) throws IOException {
        long skipped = 0;
        while (skipped < length) {
            final long step = inputStream.skip(length - skipped);
            if (step > 0) {
                skipped += step;
                continue;
            }
            // skip() may return 0 without being at end of stream; read() tells the two apart.
            if (inputStream.read() < 0) {
                throw new EOFException(
                        "Stream ended after skipping " + skipped + " of " + length + " bytes");
            }
            skipped++;
        }
    }
}
