/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.common.persistence;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Rebuilds the in-memory sync state (pipe sinks, pipes, the most recently created pipe and
 * per-pipe messages) by replaying the two log files stored under
 * {@code SyncPathUtil.getSysDir()}: {@code syncService.log} and {@code syncMessage.log}.
 *
 * <p>Call {@link #recover()} first, then read the recovered state through the getters. The
 * getters return the internal collections directly, not copies.
 */
public class SyncLogReader {
    private static final Logger logger = LoggerFactory.getLogger(SyncLogReader.class);

    /** Recovered messages, keyed by the first comma-separated field of each message log line. */
    private Map<String, List<PipeMessage>> pipeMessageMap = new ConcurrentHashMap<>();
    /** Pipe sinks that were created and not subsequently dropped, keyed by sink name. */
    private Map<String, PipeSink> pipeSinks = new ConcurrentHashMap<>();
    /** Every pipe found in the service log, in creation order. */
    private List<PipeInfo> pipes = new ArrayList<>();
    /** The pipe created by the most recent CREATE_PIPE record; {@code null} if none was seen. */
    private PipeInfo runningPipe;

    /**
     * Discards any previously recovered state and replays both log files.
     *
     * <p>A missing log file is not an error: the corresponding state simply stays empty. A
     * failure while replaying the service log is logged and recovery continues with whatever was
     * replayed up to that point. A line of the message log that cannot be parsed aborts recovery.
     *
     * @throws StartupException if a line of the message log cannot be parsed
     */
    public void recover() throws StartupException {
        logger.info("Start to recover all sync state for sync.");
        this.pipeMessageMap = new ConcurrentHashMap<>();
        this.pipeSinks = new ConcurrentHashMap<>();
        this.pipes = new ArrayList<>();
        // Reset together with the collections so a repeated recover() never exposes a stale pipe.
        this.runningPipe = null;
        recoverServiceLog(new File(SyncPathUtil.getSysDir(), "syncService.log"));
        recoverMessageLog(new File(SyncPathUtil.getSysDir(), "syncMessage.log"));
    }

    /** @return the recovered messages per pipe identifier (internal map, not a copy) */
    public Map<String, List<PipeMessage>> getPipeMessageMap() {
        return this.pipeMessageMap;
    }

    /** @return the recovered pipe sinks by name (internal map, not a copy) */
    public Map<String, PipeSink> getAllPipeSinks() {
        return this.pipeSinks;
    }

    /** @return all recovered pipes in creation order (internal list, not a copy) */
    public List<PipeInfo> getAllPipeInfos() {
        return this.pipes;
    }

    /** @return the most recently created pipe, or {@code null} if the log contained none */
    public PipeInfo getRunningPipeInfo() {
        return this.runningPipe;
    }

    /** Replays the service log if it exists; failures are logged and never propagated. */
    private void recoverServiceLog(File serviceLogFile) {
        if (!serviceLogFile.exists()) {
            logger.warn("Sync service log file not found");
            return;
        }
        try (BufferedReader br = new BufferedReader(new FileReader(serviceLogFile))) {
            recoverPipe(br);
        } catch (IOException e) {
            // Reached for read errors and for malformed records reported by recoverPipe.
            logger.warn("Failed to recover sync service log file {}", serviceLogFile, e);
        }
    }

    /**
     * Replays the message log if it exists.
     *
     * @throws StartupException if a line cannot be parsed
     */
    private void recoverMessageLog(File msgLogFile) throws StartupException {
        if (!msgLogFile.exists()) {
            logger.info("Sync msg log file not found");
            return;
        }
        try (BufferedReader loadReader = new BufferedReader(new FileReader(msgLogFile))) {
            String line;
            int lineNum = 0;
            while ((line = loadReader.readLine()) != null) {
                ++lineNum;
                try {
                    analyzeMsgLog(line);
                } catch (Exception e) {
                    logger.error("Sync msg log recovery error: log file parse error at line {}", lineNum, e);
                    throw new StartupException(
                            ServiceType.SYNC_SERVICE.getName(),
                            "Sync msg log file recover error at line " + lineNum);
                }
            }
        } catch (IOException e) {
            logger.warn("Failed to read sync msg log file {}", msgLogFile, e);
        }
    }

    /**
     * Replays the service log record by record.
     *
     * <p>Each record starts with a header line whose first {@code #}-separated field is a
     * {@link StatementType} name. CREATE_PIPESINK and CREATE_PIPE records are followed by a
     * second line holding the serialized statement; DROP_PIPESINK carries the sink name and
     * CREATE_PIPE carries a numeric value (parsed as {@code long}) in the second header field.
     *
     * @param br reader positioned at the start of the service log
     * @throws IOException if reading fails or a record is malformed or of an unsupported type;
     *     the original exception is attached as the cause
     */
    private void recoverPipe(BufferedReader br) throws IOException {
        int lineNumber = 0;
        String readLine = "";
        try {
            while ((readLine = br.readLine()) != null) {
                ++lineNumber;
                String[] parseStrings = readLine.split("#");
                StatementType type = StatementType.valueOf(parseStrings[0]);
                switch (type) {
                    case CREATE_PIPESINK: {
                        readLine = br.readLine();
                        ++lineNumber;
                        CreatePipeSinkStatement createPipeSinkStatement = CreatePipeSinkStatement.parseString(readLine);
                        this.pipeSinks.put(
                                createPipeSinkStatement.getPipeSinkName(),
                                SyncPipeUtil.parseCreatePipeSinkStatement(createPipeSinkStatement));
                        break;
                    }
                    case DROP_PIPESINK: {
                        this.pipeSinks.remove(parseStrings[1]);
                        break;
                    }
                    case CREATE_PIPE: {
                        readLine = br.readLine();
                        ++lineNumber;
                        CreatePipeStatement createPipeStatement = CreatePipeStatement.parseString(readLine);
                        this.runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipeInfo(
                                createPipeStatement,
                                this.pipeSinks.get(createPipeStatement.getPipeSinkName()),
                                Long.parseLong(parseStrings[1]));
                        this.pipes.add(this.runningPipe);
                        break;
                    }
                    case STOP_PIPE: {
                        requireRunningPipe(type).stop();
                        break;
                    }
                    case START_PIPE: {
                        requireRunningPipe(type).start();
                        break;
                    }
                    case DROP_PIPE: {
                        requireRunningPipe(type).drop();
                        break;
                    }
                    default:
                        throw new UnsupportedOperationException(
                                String.format("Can not recognize type %s.", type.name()));
                }
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new IOException(
                    String.format("Recover error in line %d : %s, because %s", lineNumber, readLine, e), e);
        }
    }

    /**
     * Returns the current pipe, failing with a descriptive error instead of a bare
     * NullPointerException when a pipe operation is logged before any CREATE_PIPE record.
     */
    private PipeInfo requireRunningPipe(StatementType type) {
        if (this.runningPipe == null) {
            throw new IllegalStateException(
                    String.format("%s found before any pipe was created", type.name()));
        }
        return this.runningPipe;
    }

    /**
     * Applies one line of the message log. A line with exactly three comma-separated fields
     * ({@code identifier,type,text}) appends a message for that identifier; any other line
     * removes all messages recorded for the identifier in its first field.
     *
     * <p>NOTE(review): a message text containing a comma produces more than three fields and is
     * therefore treated as a removal. Confirm against the log writer whether that can occur.
     *
     * @param logLine one line of the message log
     */
    private void analyzeMsgLog(String logLine) {
        String[] items = logLine.split(",");
        String pipeIdentifier = items[0];
        if (items.length == 3) {
            PipeMessage message = new PipeMessage(PipeMessage.MsgType.valueOf(items[1]), items[2]);
            this.pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList<>()).add(message);
        } else {
            this.pipeMessageMap.remove(pipeIdentifier);
        }
    }
}

