package org.apache.iotdb.db.sync.receiver;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.engine.compaction.log.TsFileIdentifier;
import org.apache.iotdb.db.exception.sync.PipeServerException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueueFactory;
import org.apache.iotdb.db.sync.receiver.collector.Collector;
import org.apache.iotdb.db.sync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.receiver.manager.ReceiverManager;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.ResponseType;
import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/receiver/ReceiverService.class */
public class ReceiverService implements IService {
    private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
    private static final ReceiverManager receiverManager = ReceiverManager.getInstance();
    private final Collector collector;

    /* renamed from: org.apache.iotdb.db.sync.receiver.ReceiverService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/sync/receiver/ReceiverService$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType = new int[RequestType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType[RequestType.HEARTBEAT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType[RequestType.CREATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType[RequestType.START.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType[RequestType.STOP.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType[RequestType.DROP.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$org$apache$iotdb$db$sync$receiver$manager$PipeMessage$MsgType = new int[PipeMessage.MsgType.values().length];
            try {
                $SwitchMap$org$apache$iotdb$db$sync$receiver$manager$PipeMessage$MsgType[PipeMessage.MsgType.INFO.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$iotdb$db$sync$receiver$manager$PipeMessage$MsgType[PipeMessage.MsgType.WARN.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$iotdb$db$sync$receiver$manager$PipeMessage$MsgType[PipeMessage.MsgType.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/sync/receiver/ReceiverService$ReceiverServiceHolder.class */
    private static class ReceiverServiceHolder {
        private static final ReceiverService INSTANCE = new ReceiverService(null);

        private ReceiverServiceHolder() {
        }
    }

    public synchronized void startPipeServer(boolean z) throws PipeServerException {
        if (!receiverManager.isPipeServerEnable() || z) {
            try {
                TransportServerManager.getInstance().startService();
                receiverManager.startServer();
                this.collector.startCollect();
            } catch (IOException | StartupException e) {
                throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
            }
        }
    }

    public synchronized void stopPipeServer() throws PipeServerException {
        if (receiverManager.isPipeServerEnable()) {
            try {
                Iterator<PipeInfo> it = receiverManager.getAllPipeInfos().iterator();
                while (it.hasNext()) {
                    if (it.next().getStatus().equals(Pipe.PipeStatus.RUNNING)) {
                        throw new PipeServerException("Failed to stop pipe server because there is pipe still running.");
                    }
                }
                TransportServerManager.getInstance().stopService();
                receiverManager.stopServer();
                this.collector.stopCollect();
            } catch (IOException e) {
                throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
            }
        }
    }

    private void checkPipe(String str, String str2, long j) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(str, str2, j);
        if (pipeInfo == null || !pipeInfo.getStatus().equals(Pipe.PipeStatus.STOP)) {
            return;
        }
        startPipe(str, str2, j);
    }

    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:4:0x0018. Please report as an issue. */
    public synchronized SyncResponse receiveMsg(SyncRequest syncRequest) {
        SyncResponse syncResponse = new SyncResponse(ResponseType.INFO, AlignedPath.VECTOR_PLACEHOLDER);
        try {
        } catch (IOException e) {
            logger.warn("Cannot handle message because {}", e.getMessage());
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$service$transport$thrift$RequestType[syncRequest.getType().ordinal()]) {
            case 1:
                checkPipe(syncRequest.getPipeName(), syncRequest.getRemoteIp(), syncRequest.getCreateTime());
                PipeMessage pipeMessage = receiverManager.getPipeMessage(syncRequest.getPipeName(), syncRequest.getRemoteIp(), syncRequest.getCreateTime(), true);
                switch (pipeMessage.getType()) {
                    case INFO:
                        break;
                    case WARN:
                        syncResponse = new SyncResponse(ResponseType.WARN, AlignedPath.VECTOR_PLACEHOLDER);
                        break;
                    case ERROR:
                        syncResponse = new SyncResponse(ResponseType.ERROR, AlignedPath.VECTOR_PLACEHOLDER);
                        break;
                    default:
                        throw new UnsupportedOperationException("Wrong message type " + pipeMessage.getType());
                }
                return syncResponse;
            case 2:
                createPipe(syncRequest.getPipeName(), syncRequest.getRemoteIp(), syncRequest.getCreateTime());
                return syncResponse;
            case 3:
                startPipe(syncRequest.getPipeName(), syncRequest.getRemoteIp(), syncRequest.getCreateTime());
                return syncResponse;
            case 4:
                stopPipe(syncRequest.getPipeName(), syncRequest.getRemoteIp(), syncRequest.getCreateTime());
                return syncResponse;
            case TsFileIdentifier.SEQUENCE_OFFSET_IN_PATH /* 5 */:
                dropPipe(syncRequest.getPipeName(), syncRequest.getRemoteIp(), syncRequest.getCreateTime());
                return syncResponse;
            default:
                return syncResponse;
        }
    }

    private void createPipe(String str, String str2, long j) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(str, str2, j);
        if (pipeInfo == null || pipeInfo.getStatus().equals(Pipe.PipeStatus.DROP)) {
            logger.info("create Pipe name={}, remoteIp={}, createTime={}", new Object[]{str, str2, Long.valueOf(j)});
            createDir(str, str2, j);
            receiverManager.createPipe(str, str2, j);
        }
    }

    private void startPipe(String str, String str2, long j) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(str, str2, j);
        if (pipeInfo == null || !pipeInfo.getStatus().equals(Pipe.PipeStatus.STOP)) {
            return;
        }
        logger.info("start Pipe name={}, remoteIp={}, createTime={}", new Object[]{str, str2, Long.valueOf(j)});
        receiverManager.startPipe(str, str2, j);
        this.collector.startPipe(str, str2, j);
    }

    private void stopPipe(String str, String str2, long j) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(str, str2, j);
        if (pipeInfo == null || !pipeInfo.getStatus().equals(Pipe.PipeStatus.RUNNING)) {
            return;
        }
        logger.info("stop Pipe name={}, remoteIp={}, createTime={}", new Object[]{str, str2, Long.valueOf(j)});
        receiverManager.stopPipe(str, str2, j);
        this.collector.stopPipe(str, str2, j);
    }

    private void dropPipe(String str, String str2, long j) throws IOException {
        PipeInfo pipeInfo = receiverManager.getPipeInfo(str, str2, j);
        if (pipeInfo == null || pipeInfo.getStatus().equals(Pipe.PipeStatus.DROP)) {
            return;
        }
        logger.info("drop Pipe name={}, remoteIp={}, createTime={}", new Object[]{str, str2, Long.valueOf(j)});
        receiverManager.dropPipe(str, str2, j);
        this.collector.stopPipe(str, str2, j);
        PipeDataQueueFactory.removeBufferedPipeDataQueue(SyncPathUtil.getReceiverPipeLogDir(str, str2, j));
        FileUtils.deleteDirectory(new File(SyncPathUtil.getReceiverPipeDir(str, str2, j)));
    }

    private void createDir(String str, String str2, long j) {
        File file = new File(SyncPathUtil.getReceiverFileDataDir(str, str2, j));
        if (!file.exists()) {
            file.mkdirs();
        }
        File file2 = new File(SyncPathUtil.getReceiverPipeLogDir(str, str2, j));
        if (file2.exists()) {
            return;
        }
        file2.mkdirs();
    }

    public QueryDataSet showPipeServer(ShowPipeServerPlan showPipeServerPlan) {
        ListDataSet listDataSet = new ListDataSet(Collections.singletonList(new PartialPath("enable", false)), Collections.singletonList(TSDataType.BOOLEAN));
        RowRecord rowRecord = new RowRecord(0L);
        Field field = new Field(TSDataType.BOOLEAN);
        field.setBoolV(receiverManager.isPipeServerEnable());
        rowRecord.addField(field);
        listDataSet.putRecord(rowRecord);
        return listDataSet;
    }

    public QueryDataSet showPipe(ShowPipePlan showPipePlan, ListDataSet listDataSet) {
        Iterator<PipeInfo> it = (!StringUtils.isEmpty(showPipePlan.getPipeName()) ? receiverManager.getPipeInfosByPipeName(showPipePlan.getPipeName()) : receiverManager.getAllPipeInfos()).iterator();
        while (it.hasNext()) {
            putPipeRecord(listDataSet, it.next());
        }
        return listDataSet;
    }

    private void putPipeRecord(ListDataSet listDataSet, PipeInfo pipeInfo) {
        RowRecord rowRecord = new RowRecord(0L);
        rowRecord.addField(Binary.valueOf(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())), TSDataType.TEXT);
        rowRecord.addField(Binary.valueOf(pipeInfo.getPipeName()), TSDataType.TEXT);
        rowRecord.addField(Binary.valueOf(SyncConstant.RECEIVER_DIR_NAME), TSDataType.TEXT);
        rowRecord.addField(Binary.valueOf(pipeInfo.getRemoteIp()), TSDataType.TEXT);
        rowRecord.addField(Binary.valueOf(pipeInfo.getStatus().name()), TSDataType.TEXT);
        rowRecord.addField(Binary.valueOf(receiverManager.getPipeMessage(pipeInfo.getPipeName(), pipeInfo.getRemoteIp(), pipeInfo.getCreateTime(), false).getMsg()), TSDataType.TEXT);
        listDataSet.putRecord(rowRecord);
    }

    private ReceiverService() {
        this.collector = new Collector();
    }

    public static ReceiverService getInstance() {
        return ReceiverServiceHolder.INSTANCE;
    }

    public void start() throws StartupException {
        receiverManager.init();
        if (receiverManager.isPipeServerEnable()) {
            try {
                startPipeServer(true);
            } catch (PipeServerException e) {
                throw new StartupException(e.getMessage());
            }
        }
    }

    public void stop() {
        try {
            receiverManager.close();
            this.collector.stopCollect();
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
    }

    public ServiceType getID() {
        return ServiceType.RECEIVER_SERVICE;
    }

    /* synthetic */ ReceiverService(AnonymousClass1 anonymousClass1) {
        this();
    }
}
