/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.sender;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.Constans;
import org.apache.iotdb.db.sync.conf.SyncSenderConfig;
import org.apache.iotdb.db.sync.conf.SyncSenderDescriptor;
import org.apache.iotdb.db.sync.sender.SyncFileManager;
import org.apache.iotdb.db.sync.sender.SyncSender;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.service.sync.thrift.SyncDataStatus;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sender side of the sync module: periodically collects the files reported by
 * {@link SyncFileManager}, hard-links them into snapshot directories and transfers the
 * schema file and the snapshot files to a receiver through the Thrift
 * {@link SyncService.Client}.
 *
 * <p>A run is driven by {@link #sync()}, which is scheduled by {@link #main(String[])} on a
 * two-thread scheduled pool together with a small monitor task.
 */
public class SyncSenderImpl implements SyncSender {
    private static final Logger logger = LoggerFactory.getLogger(SyncSenderImpl.class);

    /** Maximum number of attempts for transferring the schema or a single data file. */
    private static final int MAX_SYNC_FILE_TRY = 10;

    /** Size in bytes of the chunks a file is cut into before sending (64 MiB). */
    private static final int DATA_CHUNK_SIZE = 64 * 1024 * 1024;

    /** Digest algorithm used to verify a transferred file. */
    private static final String MESSAGE_DIGEST_NAME = "MD5";

    private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();

    private TTransport transport;
    private SyncService.Client serviceClient;
    private List<String> schema = new ArrayList<>();

    /** Files reported by {@link SyncFileManager#getValidAllFiles()}, keyed as returned by it. */
    private Map<String, Set<String>> validAllFiles;

    /** Files reported by {@link SyncFileManager#getCurrentLocalFiles()}. */
    private Map<String, Set<String>> currentLocalFiles;

    /** {@code true} while a sync run is transferring schema/data; read by the monitor task. */
    private volatile boolean syncStatus = false;

    /** Snapshot (hard link) paths created for the current run, same keys as {@link #validAllFiles}. */
    private Map<String, Set<String>> validFileSnapshot = new HashMap<>();

    private SyncFileManager syncFileManager = SyncFileManager.getInstance();
    private ScheduledExecutorService executorService;

    private SyncSenderImpl() {
        this.init();
    }

    /**
     * Returns the lazily created shared instance.
     *
     * @return the singleton sender
     */
    public static final SyncSenderImpl getInstance() {
        return InstanceHolder.INSTANCE;
    }

    /**
     * Entry point of the sync client: takes the lock file, then schedules the monitor and the
     * periodic sync task.
     *
     * @param args unused
     * @throws IOException if the lock file cannot be created
     */
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
        SyncSenderImpl fileSenderImpl = SyncSenderImpl.getInstance();
        fileSenderImpl.verifySingleton();
        fileSenderImpl.startMonitor();
        fileSenderImpl.startTimedTask();
    }

    /** Creates the scheduled thread pool if it does not exist yet. */
    @Override
    public void init() {
        if (this.executorService == null) {
            this.executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(2, "sync-client-timer");
        }
    }

    /** Schedules a task that logs a message whenever a sync run is in progress. */
    private void startMonitor() {
        this.executorService.scheduleWithFixedDelay(() -> {
            if (this.syncStatus) {
                logger.info("Sync process is in execution!");
            }
        }, Constans.SYNC_MONITOR_DELAY, Constans.SYNC_MONITOR_PERIOD, TimeUnit.SECONDS);
    }

    /** Schedules {@link #sync()} periodically; any failure is logged and stops the sender. */
    private void startTimedTask() {
        this.executorService.scheduleWithFixedDelay(() -> {
            try {
                this.sync();
            } catch (IOException | SyncConnectionException e) {
                logger.error("Sync failed", e);
                this.stop();
            } catch (RuntimeException e) {
                // An exception escaping a scheduled task silently cancels all further runs,
                // so make unchecked failures visible and shut down like any other failure.
                logger.error("Sync failed", e);
                this.stop();
            }
        }, 0L, Constans.SYNC_PROCESS_PERIOD, TimeUnit.SECONDS);
    }

    /** Shuts the scheduled pool down; calling it again after a shutdown is a no-op. */
    @Override
    public void stop() {
        ScheduledExecutorService service = this.executorService;
        if (service != null) {
            service.shutdownNow();
            this.executorService = null;
        }
    }

    /**
     * Executes one complete sync run: removes leftover snapshots, asks the file manager for the
     * files to send, connects to the receiver, confirms identity, creates snapshots, transfers
     * schema and data, and finally removes the snapshots and asks the receiver to clean up.
     *
     * <p>The connection is closed and the in-progress flag is reset on every exit path once the
     * connection has been established.
     *
     * @throws SyncConnectionException if the receiver cannot be reached or identity
     *     confirmation fails with an error
     * @throws IOException if snapshot directories cannot be created or deleted, or the UUID
     *     file cannot be read or written
     */
    @Override
    public void sync() throws SyncConnectionException, IOException {
        // Remove snapshot directories left behind by a previous run.
        for (String snapshotPath : config.getSnapshotPaths()) {
            File snapshotDir = new File(snapshotPath);
            // list() returns null when the path is missing or is not a directory.
            String[] children = snapshotDir.list();
            if (children == null || children.length == 0) {
                continue;
            }
            FileUtils.deleteDirectory(snapshotDir);
        }

        this.syncFileManager.init();
        this.validAllFiles = this.syncFileManager.getValidAllFiles();
        this.currentLocalFiles = this.syncFileManager.getCurrentLocalFiles();
        if (SyncUtils.isEmpty(this.validAllFiles)) {
            logger.info("There has no file to sync !");
            return;
        }

        this.establishConnection(config.getServerIp(), config.getServerPort());
        try {
            if (!this.confirmIdentity(config.getUuidPath())) {
                logger.error("Sorry, you do not have the permission to connect to sync receiver.");
                System.exit(1);
            }

            // Start from a clean slate so entries of earlier runs do not accumulate.
            this.validFileSnapshot.clear();
            for (Map.Entry<String, Set<String>> entry : this.validAllFiles.entrySet()) {
                this.validFileSnapshot.put(entry.getKey(), this.makeFileSnapshot(entry.getValue()));
            }

            this.syncStatus = true;
            try {
                this.syncSchema();
                this.syncAllData();
            } catch (SyncConnectionException e) {
                logger.error("cannot finish sync process", e);
                return;
            }

            for (String snapshotPath : config.getSnapshotPaths()) {
                FileUtils.deleteDirectory(new File(snapshotPath));
            }
            try {
                this.serviceClient.cleanUp();
            } catch (TException e) {
                logger.error("Unable to connect to receiver.", e);
            }
            logger.info("Sync process has finished.");
        } finally {
            this.syncStatus = false;
            if (this.transport != null) {
                this.transport.close();
            }
        }
    }

    /**
     * Transfers the snapshot files of every entry in the valid-file map. After a successful
     * transfer and a successful {@link #afterSynchronization()}, the entry's files are added to
     * the local file record and that record is persisted through the file manager.
     *
     * @throws SyncConnectionException if the receiver cannot be initialised or reached, or a
     *     file transfer fails
     */
    @Override
    public void syncAllData() throws SyncConnectionException {
        for (Map.Entry<String, Set<String>> entry : this.validAllFiles.entrySet()) {
            String storageGroup = entry.getKey();
            Set<String> validFiles = entry.getValue();
            Set<String> validSnapshot = this.validFileSnapshot.get(storageGroup);
            if (validSnapshot == null || validSnapshot.isEmpty()) {
                continue;
            }
            logger.info("Sync process starts to transfer data of storage group {}", storageGroup);
            try {
                if (!this.serviceClient.init(storageGroup)) {
                    throw new SyncConnectionException("unable init receiver");
                }
            } catch (TException e) {
                throw new SyncConnectionException("Unable to connect to receiver", e);
            }
            this.syncData(validSnapshot);
            if (this.afterSynchronization()) {
                // A group seen for the first time may have no entry in the local record yet.
                this.currentLocalFiles.computeIfAbsent(storageGroup, key -> new HashSet<>()).addAll(validFiles);
                this.syncFileManager.setCurrentLocalFiles(this.currentLocalFiles);
                this.syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo());
                logger.info("Sync process has finished storage group {}.", storageGroup);
                continue;
            }
            logger.error("Receiver cannot sync data, abandon this synchronization of storage group {}", storageGroup);
        }
    }

    /**
     * Opens a socket transport to the receiver and creates the Thrift client on top of it.
     *
     * @param serverIp receiver address
     * @param serverPort receiver port
     * @throws SyncConnectionException if the transport cannot be opened
     */
    @Override
    public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
        this.transport = new TSocket(serverIp, serverPort);
        TBinaryProtocol protocol = new TBinaryProtocol(this.transport);
        this.serviceClient = new SyncService.Client((TProtocol) protocol);
        try {
            this.transport.open();
        } catch (TTransportException e) {
            this.syncStatus = false;
            logger.error("Cannot connect to server", e);
            throw new SyncConnectionException(e);
        }
    }

    /**
     * Sends this sender's UUID and local host address to the receiver for an identity check.
     * The UUID is read from {@code uuidPath}; when the file is missing or holds no UUID, a new
     * one is generated and stored there first.
     *
     * @param uuidPath path of the file holding the sender UUID
     * @return the receiver's answer to the identity check
     * @throws SyncConnectionException if the identity check call fails
     * @throws IOException if the UUID file cannot be read or written
     */
    @Override
    public boolean confirmIdentity(String uuidPath) throws SyncConnectionException, IOException {
        File file = new File(uuidPath);
        File parent = file.getParentFile();
        if (parent != null && !parent.exists()) {
            parent.mkdirs();
        }

        String uuid = file.exists() ? this.readUuid(file) : null;
        if (uuid == null || uuid.isEmpty()) {
            // Missing file, or a file left empty by an interrupted earlier write.
            uuid = this.generateUUID();
            this.writeUuid(file, uuid);
        }

        try {
            return this.serviceClient.checkIdentity(uuid, InetAddress.getLocalHost().getHostAddress());
        } catch (Exception e) {
            logger.error("Cannot confirm identity with receiver", e);
            throw new SyncConnectionException(e);
        }
    }

    /** Reads the first line of the UUID file; returns {@code null} when the file is empty. */
    private String readUuid(File file) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            return reader.readLine();
        } catch (IOException e) {
            logger.error("Cannot read UUID from file{}", file.getPath(), e);
            throw new IOException(e);
        }
    }

    /** Writes {@code uuid} to the UUID file, creating or truncating it. */
    private void writeUuid(File file, String uuid) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file)) {
            out.write(uuid.getBytes());
        } catch (IOException e) {
            logger.error("Cannot insert UUID to file {}", file.getPath(), e);
            throw new IOException(e);
        }
    }

    /** Builds a new identifier: the prefix {@code sync-client} followed by a dash-less random UUID. */
    private String generateUUID() {
        return "sync-client" + UUID.randomUUID().toString().replaceAll("-", "");
    }

    /**
     * Creates a hard link for every given file at the location returned by
     * {@link SyncUtils#getSnapshotFilePath(String)}, creating parent directories as needed.
     *
     * @param validFiles paths of the files to snapshot
     * @return the paths of the created links
     * @throws IOException if a link cannot be created
     */
    @Override
    public Set<String> makeFileSnapshot(Set<String> validFiles) throws IOException {
        Set<String> validFilesSnapshot = new HashSet<>();
        try {
            for (String filePath : validFiles) {
                String snapshotFilePath = SyncUtils.getSnapshotFilePath(filePath);
                validFilesSnapshot.add(snapshotFilePath);
                File snapshotParent = new File(snapshotFilePath).getParentFile();
                if (snapshotParent != null && !snapshotParent.exists()) {
                    snapshotParent.mkdirs();
                }
                Path link = FileSystems.getDefault().getPath(snapshotFilePath);
                Path target = FileSystems.getDefault().getPath(filePath);
                Files.createLink(link, target);
            }
        } catch (IOException e) {
            logger.error("Can not make fileSnapshot", e);
            throw new IOException(e);
        }
        return validFilesSnapshot;
    }

    /**
     * Transfers every snapshot file to the receiver, chunk by chunk, and verifies each file by
     * comparing digests. A file is attempted at most {@value #MAX_SYNC_FILE_TRY} times.
     *
     * @param fileSnapshotList paths of the snapshot files to send
     * @throws SyncConnectionException if any file cannot be transferred, for whatever reason
     */
    public void syncData(Set<String> fileSnapshotList) throws SyncConnectionException {
        try {
            if (fileSnapshotList.isEmpty()) {
                return;
            }
            // Chunk buffer and digest are reused for every file and every attempt.
            byte[] buffer = new byte[DATA_CHUNK_SIZE];
            MessageDigest md = MessageDigest.getInstance(MESSAGE_DIGEST_NAME);
            int successNum = 0;
            for (String snapshotFilePath : fileSnapshotList) {
                // The receiver is given the last two path components: parent directory and
                // file name. File resolves them independently of the separator style.
                File file = new File(snapshotFilePath);
                List<String> filePathSplit = new ArrayList<>();
                filePathSplit.add(file.getParentFile().getName());
                filePathSplit.add(file.getName());

                int attempts = 0;
                boolean received = false;
                while (!received) {
                    if (++attempts > MAX_SYNC_FILE_TRY) {
                        throw new SyncConnectionException(String.format("can not sync file %s after %s tries.", snapshotFilePath, MAX_SYNC_FILE_TRY));
                    }
                    received = this.sendFileOnce(snapshotFilePath, filePathSplit, buffer, md);
                }
                ++successNum;
                logger.info("Receiver has received {} successfully.", snapshotFilePath);
                logger.info(String.format("Task of synchronization has completed %d/%d.", successNum, fileSnapshotList.size()));
            }
        } catch (Exception e) {
            throw new SyncConnectionException("Cannot sync data with receiver.", e);
        }
    }

    /**
     * Performs one transfer attempt of a single file.
     *
     * @return {@code true} if every chunk was accepted and the receiver's digest equals ours;
     *     {@code false} if the attempt has to be repeated
     */
    private boolean sendFileOnce(String snapshotFilePath, List<String> filePathSplit, byte[] buffer, MessageDigest md)
            throws IOException, TException {
        md.reset();
        try (FileInputStream fis = new FileInputStream(new File(snapshotFilePath))) {
            int dataLength;
            while ((dataLength = fis.read(buffer)) != -1) {
                md.update(buffer, 0, dataLength);
                ByteBuffer buffToSend = copyChunk(buffer, dataLength);
                if (!Boolean.parseBoolean(this.serviceClient.syncData(null, filePathSplit, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
                    logger.info("Receiver failed to receive data from {}, retry.", snapshotFilePath);
                    return false;
                }
            }
        }
        // NOTE(review): BigInteger#toString(16) drops leading zeros of the digest; the
        // comparison only works if the receiver renders its digest the same way - confirm.
        String md5OfSender = new BigInteger(1, md.digest()).toString(16);
        String md5OfReceiver = this.serviceClient.syncData(md5OfSender, filePathSplit, null, SyncDataStatus.FINISH_STATUS);
        return md5OfSender.equals(md5OfReceiver);
    }

    /**
     * Transfers the schema file configured by {@code config.getSchemaPath()} to the receiver,
     * chunk by chunk, verifies it by digest and then asks the receiver to load it. The transfer
     * is attempted at most {@value #MAX_SYNC_FILE_TRY} times.
     *
     * @throws SyncConnectionException if the schema cannot be transferred or loaded
     */
    @Override
    public void syncSchema() throws SyncConnectionException {
        byte[] buffer = new byte[DATA_CHUNK_SIZE];
        int attempts = 0;
        while (true) {
            if (++attempts > MAX_SYNC_FILE_TRY) {
                throw new SyncConnectionException(String.format("can not sync schema after %s tries.", MAX_SYNC_FILE_TRY));
            }
            try {
                MessageDigest md = MessageDigest.getInstance(MESSAGE_DIGEST_NAME);
                if (!this.sendSchemaOnce(buffer, md)) {
                    continue;
                }
                logger.info("Receiver has received schema successfully.");
                // NOTE(review): a "true" reply is treated as a load failure here, whereas for
                // PROCESSING_STATUS "true" means success - confirm against the receiver.
                if (Boolean.parseBoolean(this.serviceClient.syncSchema(null, null, SyncDataStatus.SUCCESS_STATUS))) {
                    throw new SyncConnectionException("Receiver failed to load metadata");
                }
                return;
            } catch (Exception e) {
                logger.error("Cannot sync schema ", e);
                throw new SyncConnectionException(e);
            }
        }
    }

    /**
     * Performs one transfer attempt of the schema file.
     *
     * @return {@code true} if every chunk was accepted and the receiver's digest equals ours;
     *     {@code false} if the attempt has to be repeated
     */
    private boolean sendSchemaOnce(byte[] buffer, MessageDigest md) throws IOException, TException {
        md.reset();
        try (FileInputStream fis = new FileInputStream(new File(config.getSchemaPath()))) {
            int dataLength;
            while ((dataLength = fis.read(buffer)) != -1) {
                md.update(buffer, 0, dataLength);
                ByteBuffer buffToSend = copyChunk(buffer, dataLength);
                if (!Boolean.parseBoolean(this.serviceClient.syncSchema(null, buffToSend, SyncDataStatus.PROCESSING_STATUS))) {
                    logger.error("Receiver failed to receive metadata, retry.");
                    return false;
                }
            }
        }
        String md5OfSender = new BigInteger(1, md.digest()).toString(16);
        String md5OfReceiver = this.serviceClient.syncSchema(md5OfSender, null, SyncDataStatus.FINISH_STATUS);
        return md5OfSender.equals(md5OfReceiver);
    }

    /** Wraps an exact-size copy of the first {@code length} bytes of {@code buffer}. */
    private static ByteBuffer copyChunk(byte[] buffer, int length) {
        byte[] chunk = new byte[length];
        System.arraycopy(buffer, 0, chunk, 0, length);
        return ByteBuffer.wrap(chunk);
    }

    /**
     * Asks the receiver to load the data transferred so far.
     *
     * @return the receiver's answer
     * @throws SyncConnectionException if the call to the receiver fails
     */
    @Override
    public boolean afterSynchronization() throws SyncConnectionException {
        try {
            return this.serviceClient.load();
        } catch (TException e) {
            throw new SyncConnectionException("Can not finish sync process because sync receiver has broken down.", e);
        }
    }

    /**
     * Makes sure the lock file exists and tries to lock it; terminates the JVM with exit code 1
     * when the lock cannot be obtained.
     *
     * @throws IOException if the lock file cannot be created
     */
    private void verifySingleton() throws IOException {
        File lockFile = new File(config.getLockFilePath());
        File parent = lockFile.getParentFile();
        if (parent != null && !parent.exists()) {
            parent.mkdirs();
        }
        if (!lockFile.exists()) {
            lockFile.createNewFile();
        }
        if (!SyncSenderImpl.lockInstance(config.getLockFilePath())) {
            logger.error("Sync client is running.");
            System.exit(1);
        }
    }

    /**
     * Tries to take an exclusive lock on the given file. On success a shutdown hook is
     * registered that releases the lock, closes the file and deletes it.
     *
     * @param lockFile path of the lock file
     * @return {@code true} if the lock was obtained, {@code false} otherwise
     */
    private static boolean lockInstance(String lockFile) {
        RandomAccessFile randomAccessFile = null;
        try {
            File file = new File(lockFile);
            randomAccessFile = new RandomAccessFile(file, "rw");
            FileLock fileLock = randomAccessFile.getChannel().tryLock();
            if (fileLock != null) {
                RandomAccessFile lockedFile = randomAccessFile;
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        fileLock.release();
                        lockedFile.close();
                        FileUtils.forceDelete(file);
                    } catch (Exception e) {
                        logger.error("Unable to remove lock file: {}", lockFile, e);
                    }
                }));
                return true;
            }
        } catch (Exception e) {
            logger.error("Unable to create and/or lock file: {}", lockFile, e);
        }
        // Lock not obtained: do not keep the file handle open.
        if (randomAccessFile != null) {
            try {
                randomAccessFile.close();
            } catch (IOException e) {
                logger.warn("Unable to close lock file: {}", lockFile, e);
            }
        }
        return false;
    }

    /**
     * Replaces the configuration used by the sender. The configuration is held in a static
     * field, so the change is visible to every instance.
     *
     * @param config the new configuration
     */
    public void setConfig(SyncSenderConfig config) {
        SyncSenderImpl.config = config;
    }

    /**
     * Returns the schema list held by this sender.
     *
     * @return the schema list
     */
    public List<String> getSchema() {
        return this.schema;
    }

    /** Initialization-on-demand holder for the singleton instance. */
    private static class InstanceHolder {
        private static final SyncSenderImpl INSTANCE = new SyncSenderImpl();

        private InstanceHolder() {
        }
    }
}

