/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.transport.server;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueueFactory;
import org.apache.iotdb.db.sync.receiver.ReceiverService;
import org.apache.iotdb.service.transport.thrift.IdentityInfo;
import org.apache.iotdb.service.transport.thrift.MetaInfo;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.service.transport.thrift.TransportService;
import org.apache.iotdb.service.transport.thrift.TransportStatus;
import org.apache.iotdb.service.transport.thrift.Type;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransportServiceImpl
implements TransportService.Iface {
    private static Logger logger = LoggerFactory.getLogger(TransportServiceImpl.class);
    private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
    private static final String RECORD_SUFFIX = ".record";
    private static final String PATCH_SUFFIX = ".patch";
    private final ThreadLocal<IdentityInfo> identityInfoThreadLocal = new ThreadLocal();
    private final Map<IdentityInfo, Integer> identityInfoCounter = new ConcurrentHashMap<IdentityInfo, Integer>();

    private CheckResult checkStartIndexValid(File file, long startIndex) throws IOException {
        block38: {
            File recordFile = new File(file.getAbsolutePath() + RECORD_SUFFIX);
            if (!recordFile.exists() && startIndex != 0L) {
                logger.error("The start index {} of data sync is not valid. The file {} is not exist and start index should equal to 0).", (Object)startIndex, (Object)recordFile.getAbsolutePath());
                return new CheckResult(false, "0");
            }
            if (recordFile.exists()) {
                try (FileInputStream inputStream = new FileInputStream(recordFile);){
                    CheckResult checkResult;
                    try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));){
                        String index = bufferedReader.readLine();
                        if ((index == null || index.length() == 0) && startIndex != 0L) {
                            logger.error("The start index {} of data sync is not valid. The file {} is not exist and start index is should equal to 0.", (Object)startIndex, (Object)recordFile.getAbsolutePath());
                            CheckResult checkResult2 = new CheckResult(false, "0");
                            return checkResult2;
                        }
                        if (Long.parseLong(index) == startIndex) break block38;
                        logger.error("The start index {} of data sync is not valid. The start index of the file {} should equal to {}.", new Object[]{startIndex, recordFile.getAbsolutePath(), index});
                        checkResult = new CheckResult(false, index);
                    }
                    return checkResult;
                }
            }
        }
        return new CheckResult(true, "0");
    }

    public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
        logger.debug("Invoke handshake method from client ip = {}", (Object)identityInfo.address);
        this.identityInfoThreadLocal.set(identityInfo);
        this.identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 1 : v + 1);
        if (!this.verifyIPSegment(this.config.getIpWhiteList(), identityInfo.address)) {
            return new TransportStatus(-1, "Sender IP is not in the white list of receiver IP and synchronization tasks are not allowed.");
        }
        if (!this.config.getIoTDBMajorVersion(identityInfo.version).equals(this.config.getIoTDBMajorVersion())) {
            return new TransportStatus(-1, String.format("Version mismatch: the sender <%s>, the receiver <%s>", identityInfo.version, this.config.getIoTDBVersion()));
        }
        if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) {
            new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
        }
        return new TransportStatus(1, "");
    }

    private boolean verifyIPSegment(String ipWhiteList, String ipAddress) {
        String[] ipSegments;
        for (String IPsegment : ipSegments = ipWhiteList.split(",")) {
            int subnetMask = Integer.parseInt(IPsegment.substring(IPsegment.indexOf(47) + 1));
            if (!this.verifyIP(IPsegment = IPsegment.substring(0, IPsegment.indexOf(47)), ipAddress, subnetMask)) continue;
            return true;
        }
        return false;
    }

    private boolean verifyIP(String ipSegment, String ipAddress, int subnetMark) {
        String[] ipSplits = ipSegment.split("\\.");
        DecimalFormat df = new DecimalFormat("00000000");
        StringBuilder ipSegmentBuilder = new StringBuilder();
        for (String IPsplit : ipSplits) {
            ipSegmentBuilder.append(df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))));
        }
        String ipSegmentBinary = ipSegmentBuilder.toString();
        ipSegmentBinary = ipSegmentBinary.substring(0, subnetMark);
        ipSplits = ipAddress.split("\\.");
        StringBuilder ipAddressBuilder = new StringBuilder();
        for (String IPsplit : ipSplits) {
            ipAddressBuilder.append(df.format(Integer.parseInt(Integer.toBinaryString(Integer.parseInt(IPsplit)))));
        }
        String ipAddressBinary = ipAddressBuilder.toString();
        ipAddressBinary = ipAddressBinary.substring(0, subnetMark);
        return ipAddressBinary.equals(ipSegmentBinary);
    }

    public TransportStatus transportData(MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
        IdentityInfo identityInfo = this.identityInfoThreadLocal.get();
        logger.debug("Invoke transportData method from client ip = {}", (Object)identityInfo.address);
        String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
        Type type = metaInfo.type;
        String fileName = metaInfo.fileName;
        long startIndex = metaInfo.startIndex;
        if (type == Type.FILE) {
            try {
                CheckResult result = this.checkStartIndexValid(new File(fileDir, fileName), startIndex);
                if (!result.isResult()) {
                    return new TransportStatus(-2, result.getIndex());
                }
            }
            catch (IOException e) {
                logger.error(e.getMessage());
                return new TransportStatus(-1, e.getMessage());
            }
        }
        int pos = buff.position();
        MessageDigest messageDigest = null;
        try {
            messageDigest = MessageDigest.getInstance("SHA-256");
        }
        catch (NoSuchAlgorithmException e) {
            logger.error(e.getMessage());
            return new TransportStatus(-1, e.getMessage());
        }
        messageDigest.update(buff);
        byte[] digestBytes = new byte[digest.capacity()];
        digest.get(digestBytes);
        if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
            return new TransportStatus(-3, "Data digest check error, retry.");
        }
        if (type != Type.FILE) {
            buff.position(pos);
            int length = buff.capacity();
            byte[] byteArray = new byte[length];
            buff.get(byteArray);
            try {
                PipeData pipeData = PipeData.deserialize(byteArray);
                if (type == Type.TSFILE) {
                    this.handleTsFilePipeData((TsFilePipeData)pipeData, fileDir);
                }
                PipeDataQueueFactory.getBufferedPipeDataQueue(SyncPathUtil.getPipeLogDirPath(identityInfo)).offer(pipeData);
            }
            catch (IOException | IllegalPathException e) {
                logger.error("Pipe data transport error, {}", (Object)e.getMessage());
                return new TransportStatus(-3, "Data digest transport error " + e.getMessage());
            }
        }
        buff.position(pos);
        File file = new File(fileDir, fileName + PATCH_SUFFIX);
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");){
            randomAccessFile.seek(startIndex);
            int length = buff.capacity();
            byte[] byteArray = new byte[length];
            buff.get(byteArray);
            randomAccessFile.write(byteArray);
            this.writeRecordFile(new File(fileDir, fileName + RECORD_SUFFIX), startIndex + (long)length);
            logger.debug("Sync " + fileName + " start at " + startIndex + " to " + (startIndex + (long)length) + " is done.");
        }
        catch (IOException e) {
            logger.error(e.getMessage());
            return new TransportStatus(-1, e.getMessage());
        }
        return new TransportStatus(1, "");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public TransportStatus checkFileDigest(MetaInfo metaInfo, ByteBuffer digest) throws TException {
        IdentityInfo identityInfo = this.identityInfoThreadLocal.get();
        logger.debug("Invoke checkFileDigest method from client ip = {}", (Object)identityInfo.address);
        String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
        String string = fileDir.intern();
        synchronized (string) {
            String fileName = metaInfo.fileName;
            MessageDigest messageDigest = null;
            try {
                messageDigest = MessageDigest.getInstance("SHA-256");
            }
            catch (NoSuchAlgorithmException e) {
                logger.error(e.getMessage());
                return new TransportStatus(-1, e.getMessage());
            }
            try (FileInputStream inputStream = new FileInputStream(new File(fileDir, fileName + PATCH_SUFFIX));){
                int length;
                byte[] block = new byte[SyncConstant.DATA_CHUNK_SIZE];
                while ((length = ((InputStream)inputStream).read(block)) > 0) {
                    messageDigest.update(block, 0, length);
                }
                String localDigest = new BigInteger(1, messageDigest.digest()).toString(16);
                byte[] digestBytes = new byte[digest.capacity()];
                digest.get(digestBytes);
                if (!Arrays.equals(messageDigest.digest(), digestBytes)) {
                    logger.error("The file {} digest check error. The local digest is {} (should be equal to {}).", new Object[]{fileName, localDigest, digest});
                    new File(fileDir, fileName + RECORD_SUFFIX).delete();
                    TransportStatus transportStatus = new TransportStatus(-4, "File digest check error.");
                    return transportStatus;
                }
            }
            catch (IOException e) {
                logger.error(e.getMessage());
                return new TransportStatus(-1, e.getMessage());
            }
            return new TransportStatus(1, "");
        }
    }

    public SyncResponse heartbeat(SyncRequest syncRequest) throws TException {
        return ReceiverService.getInstance().receiveMsg(syncRequest);
    }

    private void writeRecordFile(File recordFile, long position) throws IOException {
        File tmpFile = new File(recordFile.getAbsolutePath() + ".tmp");
        FileWriter fileWriter = new FileWriter(tmpFile, false);
        fileWriter.write(String.valueOf(position));
        fileWriter.close();
        Files.move(tmpFile.toPath(), recordFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleClientExit() {
        IdentityInfo identityInfo = this.identityInfoThreadLocal.get();
        if (identityInfo != null) {
            this.identityInfoThreadLocal.remove();
            Map<IdentityInfo, Integer> map = this.identityInfoCounter;
            synchronized (map) {
                this.identityInfoCounter.compute(identityInfo, (k, v) -> v == null ? 0 : v - 1);
                if (this.identityInfoCounter.get(identityInfo) == 0) {
                    this.identityInfoCounter.remove(identityInfo);
                    ReceiverService.getInstance().receiveMsg(new SyncRequest(RequestType.STOP, identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime()));
                }
            }
        }
    }

    private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String fileDir) {
        File dir = new File(fileDir);
        String tsFileName = tsFilePipeData.getTsFileName();
        File[] targetFiles = dir.listFiles((dir1, name) -> name.startsWith(tsFileName) && name.endsWith(PATCH_SUFFIX));
        if (targetFiles != null) {
            for (File targetFile : targetFiles) {
                File newFile = new File(dir, targetFile.getName().substring(0, targetFile.getName().length() - PATCH_SUFFIX.length()));
                targetFile.renameTo(newFile);
            }
        }
        tsFilePipeData.setParentDirPath(dir.getAbsolutePath());
        File recordFile = new File(fileDir, tsFileName + RECORD_SUFFIX);
        try {
            Files.deleteIfExists(recordFile.toPath());
        }
        catch (IOException e) {
            logger.warn(String.format("Delete record file %s error, because %s.", recordFile.getPath(), e));
        }
    }

    private class CheckResult {
        boolean result;
        String index;

        public CheckResult(boolean result, String index) {
            this.result = result;
            this.index = index;
        }

        public boolean isResult() {
            return this.result;
        }

        public String getIndex() {
            return this.index;
        }
    }
}

