/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.sync.receiver;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.exception.sync.PipeServerException;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueueFactory;
import org.apache.iotdb.db.sync.receiver.collector.Collector;
import org.apache.iotdb.db.sync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.receiver.manager.ReceiverManager;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
import org.apache.iotdb.service.transport.thrift.ResponseType;
import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReceiverService
implements IService {
    private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
    private static final ReceiverManager receiverManager = ReceiverManager.getInstance();
    private final Collector collector = new Collector();

    public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
        if (receiverManager.isPipeServerEnable() && !isRecovery) {
            return;
        }
        try {
            TransportServerManager.getInstance().startService();
            receiverManager.startServer();
            this.collector.startCollect();
        }
        catch (IOException | StartupException e) {
            throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
        }
    }

    public synchronized void stopPipeServer() throws PipeServerException {
        if (!receiverManager.isPipeServerEnable()) {
            return;
        }
        try {
            List<PipeInfo> pipeInfos = receiverManager.getAllPipeInfos();
            for (PipeInfo pipeInfo : pipeInfos) {
                if (!pipeInfo.getStatus().equals((Object)Pipe.PipeStatus.RUNNING)) continue;
                throw new PipeServerException("Failed to stop pipe server because there is pipe still running.");
            }
            TransportServerManager.getInstance().stopService();
            receiverManager.stopServer();
            this.collector.stopCollect();
        }
        catch (IOException e) {
            throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
        }
    }

    private void checkPipe(String pipeName, String remoteIp, long createTime) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
        if (pipeInfo != null && pipeInfo.getStatus().equals((Object)Pipe.PipeStatus.STOP)) {
            this.startPipe(pipeName, remoteIp, createTime);
        }
    }

    public synchronized SyncResponse receiveMsg(SyncRequest request) {
        SyncResponse response = new SyncResponse(ResponseType.INFO, "");
        try {
            block1 : switch (request.getType()) {
                case HEARTBEAT: {
                    this.checkPipe(request.getPipeName(), request.getRemoteIp(), request.getCreateTime());
                    PipeMessage message = receiverManager.getPipeMessage(request.getPipeName(), request.getRemoteIp(), request.getCreateTime(), true);
                    switch (message.getType()) {
                        case INFO: {
                            break block1;
                        }
                        case WARN: {
                            response = new SyncResponse(ResponseType.WARN, "");
                            break block1;
                        }
                        case ERROR: {
                            response = new SyncResponse(ResponseType.ERROR, "");
                            break block1;
                        }
                    }
                    throw new UnsupportedOperationException("Wrong message type " + (Object)((Object)message.getType()));
                }
                case CREATE: {
                    this.createPipe(request.getPipeName(), request.getRemoteIp(), request.getCreateTime());
                    break;
                }
                case START: {
                    this.startPipe(request.getPipeName(), request.getRemoteIp(), request.getCreateTime());
                    break;
                }
                case STOP: {
                    this.stopPipe(request.getPipeName(), request.getRemoteIp(), request.getCreateTime());
                    break;
                }
                case DROP: {
                    this.dropPipe(request.getPipeName(), request.getRemoteIp(), request.getCreateTime());
                }
            }
        }
        catch (IOException e) {
            logger.warn("Cannot handle message because {}", (Object)e.getMessage());
        }
        return response;
    }

    private void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
        if (pipeInfo == null || pipeInfo.getStatus().equals((Object)Pipe.PipeStatus.DROP)) {
            logger.info("create Pipe name={}, remoteIp={}, createTime={}", new Object[]{pipeName, remoteIp, createTime});
            this.createDir(pipeName, remoteIp, createTime);
            receiverManager.createPipe(pipeName, remoteIp, createTime);
        }
    }

    private void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
        if (pipeInfo != null && pipeInfo.getStatus().equals((Object)Pipe.PipeStatus.STOP)) {
            logger.info("start Pipe name={}, remoteIp={}, createTime={}", new Object[]{pipeName, remoteIp, createTime});
            receiverManager.startPipe(pipeName, remoteIp, createTime);
            this.collector.startPipe(pipeName, remoteIp, createTime);
        }
    }

    private void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
        if (pipeInfo != null && pipeInfo.getStatus().equals((Object)Pipe.PipeStatus.RUNNING)) {
            logger.info("stop Pipe name={}, remoteIp={}, createTime={}", new Object[]{pipeName, remoteIp, createTime});
            receiverManager.stopPipe(pipeName, remoteIp, createTime);
            this.collector.stopPipe(pipeName, remoteIp, createTime);
        }
    }

    private void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
        if (pipeInfo != null && !pipeInfo.getStatus().equals((Object)Pipe.PipeStatus.DROP)) {
            logger.info("drop Pipe name={}, remoteIp={}, createTime={}", new Object[]{pipeName, remoteIp, createTime});
            receiverManager.dropPipe(pipeName, remoteIp, createTime);
            this.collector.stopPipe(pipeName, remoteIp, createTime);
            PipeDataQueueFactory.removeBufferedPipeDataQueue(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
            File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
            FileUtils.deleteDirectory((File)dir);
        }
    }

    private void createDir(String pipeName, String remoteIp, long createTime) {
        File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
        if (!f.exists()) {
            f.mkdirs();
        }
        if (!(f = new File(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime))).exists()) {
            f.mkdirs();
        }
    }

    public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
        ListDataSet dataSet = new ListDataSet(Collections.singletonList(new PartialPath("enable", false)), Collections.singletonList(TSDataType.BOOLEAN));
        RowRecord rowRecord = new RowRecord(0L);
        Field status = new Field(TSDataType.BOOLEAN);
        status.setBoolV(receiverManager.isPipeServerEnable());
        rowRecord.addField(status);
        dataSet.putRecord(rowRecord);
        return dataSet;
    }

    public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
        List<PipeInfo> pipeInfos = !StringUtils.isEmpty((CharSequence)plan.getPipeName()) ? receiverManager.getPipeInfosByPipeName(plan.getPipeName()) : receiverManager.getAllPipeInfos();
        for (PipeInfo pipeInfo : pipeInfos) {
            this.putPipeRecord(dataSet, pipeInfo);
        }
        return dataSet;
    }

    private void putPipeRecord(ListDataSet dataSet, PipeInfo pipeInfo) {
        RowRecord record = new RowRecord(0L);
        record.addField((Object)Binary.valueOf((String)DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())), TSDataType.TEXT);
        record.addField((Object)Binary.valueOf((String)pipeInfo.getPipeName()), TSDataType.TEXT);
        record.addField((Object)Binary.valueOf((String)"receiver"), TSDataType.TEXT);
        record.addField((Object)Binary.valueOf((String)pipeInfo.getRemoteIp()), TSDataType.TEXT);
        record.addField((Object)Binary.valueOf((String)pipeInfo.getStatus().name()), TSDataType.TEXT);
        record.addField((Object)Binary.valueOf((String)receiverManager.getPipeMessage(pipeInfo.getPipeName(), pipeInfo.getRemoteIp(), pipeInfo.getCreateTime(), false).getMsg()), TSDataType.TEXT);
        dataSet.putRecord(record);
    }

    private ReceiverService() {
    }

    public static ReceiverService getInstance() {
        return ReceiverServiceHolder.INSTANCE;
    }

    public void start() throws StartupException {
        receiverManager.init();
        if (receiverManager.isPipeServerEnable()) {
            try {
                this.startPipeServer(true);
            }
            catch (PipeServerException e) {
                throw new StartupException(e.getMessage());
            }
        }
    }

    public void stop() {
        try {
            receiverManager.close();
            this.collector.stopCollect();
        }
        catch (IOException e) {
            logger.error(e.getMessage());
        }
    }

    public ServiceType getID() {
        return ServiceType.RECEIVER_SERVICE;
    }

    private static class ReceiverServiceHolder {
        private static final ReceiverService INSTANCE = new ReceiverService();

        private ReceiverServiceHolder() {
        }
    }
}

