package org.apache.iotdb.db.pipe.receiver.legacy;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.PipeData;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.SubStringFunctionColumnTransformer;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent.class */
public class IoTDBLegacyPipeReceiverAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLegacyPipeReceiverAgent.class);
    private static final String PATCH_SUFFIX = ".patch";
    private final ThreadLocal<Long> currentConnectionId = new ThreadLocal<>();
    private final Map<Long, SyncIdentityInfo> connectionIdToIdentityInfoMap = new ConcurrentHashMap();
    private final Map<Long, Map<String, Long>> connectionIdToStartIndexRecord = new ConcurrentHashMap();
    private final Map<String, String> registeredDatabase = new ConcurrentHashMap();
    private final AtomicLong connectionIdGenerator = new AtomicLong();
    private static final String RECEIVER_DIR_NAME = "receiver";
    private static final String FILE_DATA_DIR_NAME = "file-data";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent$IndexCheckResult.class */
    public static class IndexCheckResult {
        private final boolean result;
        private final String index;

        public IndexCheckResult(boolean z, String str) {
            this.result = z;
            this.index = str;
        }

        public boolean isResult() {
            return this.result;
        }

        public String getIndex() {
            return this.index;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/pipe/receiver/legacy/IoTDBLegacyPipeReceiverAgent$SyncIdentityInfo.class */
    public static class SyncIdentityInfo {
        private final String pipeName;
        private final long createTime;
        private final String version;
        private final String database;
        private final String remoteAddress;

        public SyncIdentityInfo(TSyncIdentityInfo tSyncIdentityInfo, String str) {
            this.pipeName = tSyncIdentityInfo.getPipeName();
            this.createTime = tSyncIdentityInfo.getCreateTime();
            this.version = tSyncIdentityInfo.getVersion();
            this.database = tSyncIdentityInfo.getDatabase();
            this.remoteAddress = str;
        }

        public String getPipeName() {
            return this.pipeName;
        }

        public long getCreateTime() {
            return this.createTime;
        }

        public String getVersion() {
            return this.version;
        }

        public String getRemoteAddress() {
            return this.remoteAddress;
        }

        public String getDatabase() {
            return this.database;
        }
    }

    public void handleClientExit() {
        if (this.currentConnectionId.get() != null) {
            long longValue = this.currentConnectionId.get().longValue();
            this.connectionIdToIdentityInfoMap.remove(Long.valueOf(longValue));
            this.connectionIdToStartIndexRecord.remove(Long.valueOf(longValue));
            this.currentConnectionId.remove();
        }
    }

    public TSStatus handshake(TSyncIdentityInfo tSyncIdentityInfo, String str, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        SyncIdentityInfo syncIdentityInfo = new SyncIdentityInfo(tSyncIdentityInfo, str);
        LOGGER.info("Invoke handshake method from client ip = {}", syncIdentityInfo.getRemoteAddress());
        if (!new File(getFileDataDir(syncIdentityInfo)).exists()) {
            new File(getFileDataDir(syncIdentityInfo)).mkdirs();
        }
        createConnection(syncIdentityInfo);
        return (StringUtils.isEmpty(syncIdentityInfo.getDatabase()) || registerDatabase(syncIdentityInfo.getDatabase(), iPartitionFetcher, iSchemaFetcher)) ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, SubStringFunctionColumnTransformer.EMPTY_STRING) : RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, String.format("Auto register database %s error.", syncIdentityInfo.getDatabase()));
    }

    private void createConnection(SyncIdentityInfo syncIdentityInfo) {
        long incrementAndGet = this.connectionIdGenerator.incrementAndGet();
        this.currentConnectionId.set(Long.valueOf(incrementAndGet));
        this.connectionIdToIdentityInfoMap.put(Long.valueOf(incrementAndGet), syncIdentityInfo);
    }

    private boolean registerDatabase(String str, IPartitionFetcher iPartitionFetcher, ISchemaFetcher iSchemaFetcher) {
        if (this.registeredDatabase.containsKey(str)) {
            return true;
        }
        try {
            DatabaseSchemaStatement databaseSchemaStatement = new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
            databaseSchemaStatement.setDatabasePath(new PartialPath(str));
            ExecutionResult execute = Coordinator.getInstance().execute(databaseSchemaStatement, SessionManager.getInstance().requestQueryId(), null, SubStringFunctionColumnTransformer.EMPTY_STRING, iPartitionFetcher, iSchemaFetcher, IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
            if (execute.status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode() || execute.status.code == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
                this.registeredDatabase.put(str, SubStringFunctionColumnTransformer.EMPTY_STRING);
                return true;
            }
            LOGGER.error("Create Database error, statement: {}.", databaseSchemaStatement);
            LOGGER.error("Create database result status : {}.", execute.status);
            return false;
        } catch (IllegalPathException e) {
            LOGGER.error(String.format("Parse database PartialPath %s error", str), e);
            return false;
        }
    }

    public TSStatus transportPipeData(ByteBuffer byteBuffer) throws TException {
        SyncIdentityInfo currentSyncIdentityInfo = getCurrentSyncIdentityInfo();
        if (currentSyncIdentityInfo == null) {
            throw new TException("Thrift connection is not alive.");
        }
        LOGGER.debug("Invoke transportPipeData method from client ip = {}", currentSyncIdentityInfo.getRemoteAddress());
        String fileDataDir = getFileDataDir(currentSyncIdentityInfo);
        try {
            byte[] bArr = new byte[byteBuffer.capacity()];
            byteBuffer.get(bArr);
            PipeData createPipeData = PipeData.createPipeData(bArr);
            if (createPipeData instanceof TsFilePipeData) {
                TsFilePipeData tsFilePipeData = (TsFilePipeData) createPipeData;
                tsFilePipeData.setDatabase(currentSyncIdentityInfo.getDatabase());
                handleTsFilePipeData(tsFilePipeData, fileDataDir);
            }
            LOGGER.info("Start load pipeData with serialize number {} and type {},value={}", new Object[]{Long.valueOf(createPipeData.getSerialNumber()), createPipeData.getPipeDataType(), createPipeData});
            try {
                createPipeData.createLoader().load();
                LOGGER.info("Load pipeData with serialize number {} successfully.", Long.valueOf(createPipeData.getSerialNumber()));
                return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, SubStringFunctionColumnTransformer.EMPTY_STRING);
            } catch (PipeException e) {
                LOGGER.error("Fail to load pipeData because {}.", e.getMessage());
                return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " + e.getMessage());
            }
        } catch (IOException e2) {
            LOGGER.error("Pipe data transport error, {}", e2.getMessage());
            return RpcUtils.getStatus(TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " + e2.getMessage());
        }
    }

    private SyncIdentityInfo getCurrentSyncIdentityInfo() {
        Long l = this.currentConnectionId.get();
        if (l != null) {
            return this.connectionIdToIdentityInfoMap.get(l);
        }
        return null;
    }

    private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String str) {
        String tsFileName = tsFilePipeData.getTsFileName();
        File file = new File(str);
        File[] listFiles = file.listFiles((file2, str2) -> {
            return str2.startsWith(tsFileName) && str2.endsWith(PATCH_SUFFIX);
        });
        if (listFiles != null) {
            for (File file3 : listFiles) {
                File file4 = new File(file, file3.getName().substring(0, file3.getName().length() - PATCH_SUFFIX.length()));
                if (!file3.renameTo(file4)) {
                    LOGGER.error("Fail to rename file {} to {}", file3, file4);
                }
            }
        }
        tsFilePipeData.setParentDirPath(file.getAbsolutePath());
    }

    public TSStatus transportFile(TSyncTransportMetaInfo tSyncTransportMetaInfo, ByteBuffer byteBuffer) throws TException {
        SyncIdentityInfo currentSyncIdentityInfo = getCurrentSyncIdentityInfo();
        if (currentSyncIdentityInfo == null) {
            throw new TException("Thrift connection is not alive.");
        }
        LOGGER.debug("Invoke transportData method from client ip = {}", currentSyncIdentityInfo.getRemoteAddress());
        String fileDataDir = getFileDataDir(currentSyncIdentityInfo);
        String str = tSyncTransportMetaInfo.fileName;
        long j = tSyncTransportMetaInfo.startIndex;
        File file = new File(fileDataDir, str + PATCH_SUFFIX);
        IndexCheckResult checkStartIndexValid = checkStartIndexValid(new File(fileDataDir, str), j);
        if (!checkStartIndexValid.isResult()) {
            return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR, checkStartIndexValid.getIndex());
        }
        try {
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            try {
                int capacity = byteBuffer.capacity();
                randomAccessFile.seek(j);
                byte[] bArr = new byte[capacity];
                byteBuffer.get(bArr);
                randomAccessFile.write(bArr);
                recordStartIndex(new File(fileDataDir, str), j + capacity);
                LOGGER.debug("Sync {} start at {} to {} is done.", new Object[]{str, Long.valueOf(j), Long.valueOf(j + capacity)});
                randomAccessFile.close();
                return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, SubStringFunctionColumnTransformer.EMPTY_STRING);
            } finally {
            }
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
            return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
        }
    }

    private IndexCheckResult checkStartIndexValid(File file, long j) {
        long currentFileStartIndex = getCurrentFileStartIndex(file.getAbsolutePath());
        if (currentFileStartIndex < 0 && file.exists()) {
            currentFileStartIndex = file.length();
            recordStartIndex(file, currentFileStartIndex);
        }
        if (currentFileStartIndex < 0 && j != 0) {
            LOGGER.error("The start index {} of data sync is not valid. The file is not exist and start index should equal to 0).", Long.valueOf(j));
            return new IndexCheckResult(false, "0");
        }
        if (currentFileStartIndex < 0 || currentFileStartIndex == j) {
            return new IndexCheckResult(true, "0");
        }
        LOGGER.error("The start index {} of data sync is not valid. The start index of the file should equal to {}.", Long.valueOf(j), Long.valueOf(currentFileStartIndex));
        return new IndexCheckResult(false, String.valueOf(currentFileStartIndex));
    }

    private long getCurrentFileStartIndex(String str) {
        Map<String, Long> map;
        Long l = this.currentConnectionId.get();
        if (l == null || (map = this.connectionIdToStartIndexRecord.get(l)) == null || !map.containsKey(str)) {
            return -1L;
        }
        return map.get(str).longValue();
    }

    private void recordStartIndex(File file, long j) {
        Long l = this.currentConnectionId.get();
        if (l != null) {
            this.connectionIdToStartIndexRecord.computeIfAbsent(l, l2 -> {
                return new ConcurrentHashMap();
            }).put(file.getAbsolutePath(), Long.valueOf(j));
        }
    }

    private static String getFileDataDir(SyncIdentityInfo syncIdentityInfo) {
        return getReceiverPipeDir(syncIdentityInfo.getPipeName(), syncIdentityInfo.getRemoteAddress(), syncIdentityInfo.getCreateTime()) + File.separator + FILE_DATA_DIR_NAME;
    }

    private static String getReceiverPipeDir(String str, String str2, long j) {
        return getReceiverDir() + File.separator + String.format("%s-%d-%s", str, Long.valueOf(j), str2);
    }

    private static String getReceiverDir() {
        return CommonDescriptor.getInstance().getConfig().getSyncDir() + File.separator + RECEIVER_DIR_NAME;
    }
}
