package org.apache.iotdb.db.sync;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.sync.SyncConstant;
import org.apache.iotdb.commons.sync.SyncPathUtil;
import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeStatement;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.dataset.ListDataSet;
import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
import org.apache.iotdb.db.sync.sender.manager.ISyncManager;
import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
import org.apache.iotdb.db.sync.sender.pipe.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.transport.client.SenderManager;
import org.apache.iotdb.db.sync.transport.server.ReceiverManager;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/SyncService.class */
public class SyncService implements IService {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(SyncService.class);
    /** The pipe currently managed by this service; assigned in addPipe/recover and null until then. */
    private Pipe runningPipe;
    /** External-pipe plugin manager; created lazily in startExternalPipeManager and cleared in dropPipe. */
    private ExtPipePluginManager extPipePluginManager;
    /** Source of pipe and pipe-sink metadata; the constructor sets it to the LocalSyncInfoFetcher singleton. */
    private ISyncInfoFetcher syncInfoFetcher;
    /** Sender-side transport; only assigned when the running pipe targets an IoTDB pipe sink. */
    private SenderManager senderManager;
    /** Receiver-side handler to which handshake/transport/client-exit calls are delegated. */
    private final ReceiverManager receiverManager;

    /**
     * Initialization-on-demand holder for the {@link SyncService} singleton.
     *
     * <p>NOTE(review): the decompiler (JADX) reports that this class was originally declared
     * {@code private}; its access modifier was widened during decompilation.
     */
    public static class SyncServiceHolder {
        /** The sole SyncService instance, created when this holder class is initialized. */
        private static final SyncService INSTANCE = new SyncService();

        /** Not instantiable; the class exists only to hold the static instance. */
        private SyncServiceHolder() {
        }
    }

    /**
     * Creates the service with the local sync-info fetcher and a fresh receiver manager.
     * Private: instances are obtained through {@link #getInstance()}.
     */
    private SyncService() {
        syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
        receiverManager = new ReceiverManager();
    }

    /**
     * Returns the singleton instance held by {@code SyncServiceHolder}.
     *
     * @return the shared SyncService
     */
    public static SyncService getInstance() {
        final SyncService instance = SyncServiceHolder.INSTANCE;
        return instance;
    }

    /**
     * Forwards a handshake request to the receiver manager.
     *
     * @param tSyncIdentityInfo identity information, passed through unchanged
     * @return the status produced by the receiver manager
     */
    public TSStatus handshake(TSyncIdentityInfo tSyncIdentityInfo) {
        final TSStatus status = receiverManager.handshake(tSyncIdentityInfo);
        return status;
    }

    /**
     * Forwards a file-transport request to the receiver manager.
     *
     * @param tSyncTransportMetaInfo meta information about the transported file, passed through unchanged
     * @param byteBuffer the payload, passed through unchanged
     * @return the status produced by the receiver manager
     * @throws TException if the receiver manager reports a Thrift-level failure
     */
    public TSStatus transportFile(TSyncTransportMetaInfo tSyncTransportMetaInfo, ByteBuffer byteBuffer) throws TException {
        final TSStatus status = receiverManager.transportFile(tSyncTransportMetaInfo, byteBuffer);
        return status;
    }

    /**
     * Forwards serialized pipe data to the receiver manager.
     *
     * @param byteBuffer the payload, passed through unchanged
     * @return the status produced by the receiver manager
     * @throws TException if the receiver manager reports a Thrift-level failure
     */
    public TSStatus transportPipeData(ByteBuffer byteBuffer) throws TException {
        final TSStatus status = receiverManager.transportPipeData(byteBuffer);
        return status;
    }

    /** Notifies the receiver manager that a client has exited. */
    public void handleClientExit() {
        receiverManager.handleClientExit();
    }

    /**
     * Looks up a pipe sink through the sync-info fetcher.
     *
     * @param str name of the pipe sink
     * @return whatever the fetcher returns for that name
     */
    public PipeSink getPipeSink(String str) {
        final PipeSink sink = syncInfoFetcher.getPipeSink(str);
        return sink;
    }

    /**
     * Registers a pipe sink described by a physical plan.
     *
     * @param createPipeSinkPlan the plan describing the sink to create
     * @throws PipeSinkException carrying the fetcher's message when it does not report success
     */
    public void addPipeSink(CreatePipeSinkPlan createPipeSinkPlan) throws PipeSinkException {
        final TSStatus result = syncInfoFetcher.addPipeSink(createPipeSinkPlan);
        final boolean succeeded = result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (succeeded) {
            return;
        }
        throw new PipeSinkException(result.message);
    }

    /**
     * Registers a pipe sink described by a statement.
     *
     * @param createPipeSinkStatement the statement describing the sink to create
     * @throws PipeSinkException carrying the fetcher's message when it does not report success
     */
    public void addPipeSink(CreatePipeSinkStatement createPipeSinkStatement) throws PipeSinkException {
        final TSStatus result = syncInfoFetcher.addPipeSink(createPipeSinkStatement);
        final boolean succeeded = result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (succeeded) {
            return;
        }
        throw new PipeSinkException(result.message);
    }

    /**
     * Removes a pipe sink through the sync-info fetcher.
     *
     * @param str name of the pipe sink to drop
     * @throws PipeSinkException carrying the fetcher's message when it does not report success
     */
    public void dropPipeSink(String str) throws PipeSinkException {
        final TSStatus result = syncInfoFetcher.dropPipeSink(str);
        final boolean succeeded = result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (succeeded) {
            return;
        }
        throw new PipeSinkException(result.message);
    }

    /**
     * Lists every pipe sink known to the sync-info fetcher.
     *
     * @return the list returned by the fetcher
     */
    public List<PipeSink> getAllPipeSink() {
        final List<PipeSink> sinks = syncInfoFetcher.getAllPipeSinks();
        return sinks;
    }

    /**
     * Creates a pipe from a physical plan and makes it the running pipe.
     *
     * <p>The pipe is first registered with the sync-info fetcher; only on success is the
     * in-memory pipe built and its transport (sender manager or external-pipe manager) prepared.
     *
     * @param createPipePlan the plan describing the pipe to create
     * @throws PipeException if the start time lies in the future, the fetcher rejects the pipe,
     *     the external plugin cannot be found, or the sink type is inconsistent
     */
    public synchronized void addPipe(CreatePipePlan createPipePlan) throws PipeException {
        long currentTime = DatetimeUtils.currentTime();
        rejectFutureStartTime(createPipePlan.getDataStartTimestamp(), currentTime);
        TSStatus status = syncInfoFetcher.addPipe(createPipePlan, currentTime);
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(status.message);
        }
        PipeSink pipeSink = getPipeSink(createPipePlan.getPipeSinkName());
        runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(createPipePlan, pipeSink, currentTime);
        prepareTransportForRunningPipe(createPipePlan.getPipeName(), pipeSink);
    }

    /**
     * Creates a pipe from a statement and makes it the running pipe.
     *
     * <p>Mirrors {@link #addPipe(CreatePipePlan)} for the statement-based entry point.
     *
     * @param createPipeStatement the statement describing the pipe to create
     * @throws PipeException if the start time lies in the future, the fetcher rejects the pipe,
     *     the external plugin cannot be found, or the sink type is inconsistent
     */
    public synchronized void addPipe(CreatePipeStatement createPipeStatement) throws PipeException {
        long currentTime = DatetimeUtils.currentTime();
        rejectFutureStartTime(createPipeStatement.getStartTime(), currentTime);
        TSStatus status = syncInfoFetcher.addPipe(createPipeStatement, currentTime);
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(status.message);
        }
        PipeSink pipeSink = getPipeSink(createPipeStatement.getPipeSinkName());
        runningPipe = SyncPipeUtil.parseCreatePipePlanAsPipe(createPipeStatement, pipeSink, currentTime);
        prepareTransportForRunningPipe(createPipeStatement.getPipeName(), pipeSink);
    }

    /** Throws when the requested data start time is later than {@code currentTime}. */
    private void rejectFutureStartTime(long startTime, long currentTime) throws PipeException {
        if (startTime > currentTime) {
            throw new PipeException(String.format("Start time %s is later than current time %s, this is not supported yet.", DatetimeUtils.convertLongToDate(startTime), DatetimeUtils.convertLongToDate(currentTime)));
        }
    }

    /**
     * Prepares the transport for the freshly assigned running pipe: the external-pipe manager
     * (without starting it) for non-IoTDB sinks, otherwise a new sender manager.
     */
    private void prepareTransportForRunningPipe(String pipeName, PipeSink pipeSink) throws PipeException {
        if (runningPipe.getPipeSink().getType() != PipeSink.PipeSinkType.IoTDB) {
            startExternalPipeManager(false);
            return;
        }
        // Check the concrete type explicitly instead of catching ClassCastException, so a
        // cast failure thrown from inside SenderManager is not mistaken for a wrong sink type.
        if (!(pipeSink instanceof IoTDBPipeSink)) {
            logger.error(String.format("Cast Class to %s error when create pipe %s.", IoTDBPipeSink.class.getName(), pipeName));
            runningPipe = null;
            throw new PipeException(String.format("Wrong pipeSink type %s for create pipe %s", pipeSink.getType(), pipeSink.getPipeSinkName()));
        }
        senderManager = new SenderManager(runningPipe, (IoTDBPipeSink) pipeSink);
    }

    /**
     * Stops the running pipe and records the stop in the sync-info fetcher.
     *
     * <p>Local components are only touched when the pipe is currently RUNNING; the fetcher is
     * notified in every case.
     *
     * @param str name of the pipe to stop; must match the running pipe
     * @throws PipeException if no matching pipe exists or a component fails to stop
     */
    public synchronized void stopPipe(String str) throws PipeException {
        checkRunningPipeExistAndName(str);
        if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
            if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
                runningPipe.stop();
                senderManager.stop();
            } else {
                if (extPipePluginManager != null) {
                    try {
                        extPipePluginManager.stopExtPipe(((ExternalPipeSink) runningPipe.getPipeSink()).getExtPipeSinkTypeName());
                    } catch (Exception e) {
                        // The PipeException below only carries the message, so log the full
                        // cause here to keep the stack trace available for diagnosis.
                        logger.error("Failed to stop externalPipeProcessor.", e);
                        throw new PipeException("Failed to stop externalPipeProcessor. " + e.getMessage());
                    }
                }
                runningPipe.stop();
            }
        }
        syncInfoFetcher.stopPipe(str);
    }

    /**
     * Starts the running pipe and records the start in the sync-info fetcher.
     *
     * <p>Local components are only started when the pipe is currently in STOP state; the
     * fetcher is notified in every case.
     *
     * @param str name of the pipe to start; must match the running pipe
     * @throws PipeException if no matching pipe exists or a component fails to start
     */
    public synchronized void startPipe(String str) throws PipeException {
        checkRunningPipeExistAndName(str);
        final boolean stopped = runningPipe.getStatus() == Pipe.PipeStatus.STOP;
        if (stopped) {
            final boolean targetsIoTDB = runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB;
            // The pipe itself is started first for both sink kinds.
            runningPipe.start();
            if (targetsIoTDB) {
                senderManager.start();
            } else {
                startExternalPipeManager(true);
            }
        }
        syncInfoFetcher.startPipe(str);
    }

    /**
     * Drops the running pipe: tears down its transport, drops the pipe itself and records the
     * drop in the sync-info fetcher.
     *
     * @param str name of the pipe to drop; must match the running pipe
     * @throws PipeException if no matching pipe exists, the sender manager cannot be closed,
     *     or the calling thread is interrupted while waiting
     */
    public synchronized void dropPipe(String str) throws PipeException {
        checkRunningPipeExistAndName(str);
        try {
            if (runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
                if (!senderManager.close()) {
                    throw new PipeException(String.format("Close pipe %s transport error after %s %s, please try again.", runningPipe.getName(), SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS, TimeUnit.MILLISECONDS.name()));
                }
            } else if (extPipePluginManager != null) {
                extPipePluginManager.dropExtPipe(((ExternalPipeSink) runningPipe.getPipeSink()).getExtPipeSinkTypeName());
                extPipePluginManager = null;
            }
            runningPipe.drop();
            syncInfoFetcher.dropPipe(str);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            logger.warn(String.format("Interrupted when waiting for clear transport %s.", runningPipe.getName()), e);
            throw new PipeException("Drop error, be interrupted, please try again.");
        }
    }

    /**
     * Lists every pipe known to the sync-info fetcher.
     *
     * @return the list returned by the fetcher
     */
    public List<PipeInfo> getAllPipeInfos() {
        final List<PipeInfo> infos = syncInfoFetcher.getAllPipeInfos();
        return infos;
    }

    /**
     * Verifies that a non-dropped running pipe exists and that it is the one named {@code str}.
     *
     * @param str expected name of the running pipe
     * @throws PipeException if there is no running pipe, it is dropped, or its name differs
     */
    private void checkRunningPipeExistAndName(String str) throws PipeException {
        final boolean pipePresent = runningPipe != null && runningPipe.getStatus() != Pipe.PipeStatus.DROP;
        if (!pipePresent) {
            throw new PipeException("There is no existing pipe.");
        }
        final String actualName = runningPipe.getName();
        if (actualName.equals(str)) {
            return;
        }
        throw new PipeException(String.format("Pipe %s is %s, please retry after drop it.", runningPipe.getName(), runningPipe.getStatus()));
    }

    /**
     * Handles a message reported for the running pipe.
     *
     * <p>ERROR messages are logged, trigger an attempt to stop the pipe, and are then treated
     * like WARN messages: logged at warn level and recorded through the sync-info fetcher.
     * Any other message type is ignored.
     *
     * @param msgType severity of the message
     * @param str the message text
     */
    public synchronized void receiveMsg(PipeMessage.MsgType msgType, String str) {
        final boolean noActivePipe = runningPipe == null || runningPipe.getStatus() == Pipe.PipeStatus.DROP;
        if (noActivePipe) {
            logger.info(String.format("No running pipe for receiving msg %s.", str));
            return;
        }
        switch (msgType) {
            case ERROR:
                logger.error(String.format("%s from receiver: %s", msgType.name(), str));
                try {
                    stopPipe(runningPipe.getName());
                } catch (PipeException e) {
                    logger.error(String.format("Stop pipe %s when meeting error in sender service.", runningPipe.getName()), e);
                }
                // intentional fall-through: an ERROR is also logged and recorded like a WARN
            case WARN:
                logger.warn(String.format("%s from receiver: %s", msgType.name(), str));
                TSStatus recordStatus = syncInfoFetcher.recordMsg(runningPipe.getName(), runningPipe.getCreateTime(), new PipeMessage(msgType, str));
                if (recordStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    logger.warn(String.format("Failed to record message: [%s] %s", msgType.name(), str));
                }
                break;
            default:
                break;
        }
    }

    /**
     * Builds the Thrift description of sender-side pipes and receiver-side connections.
     *
     * @param str pipe name to filter by; {@code null} or empty selects every pipe
     * @return one entry per matching sender pipe followed by one per matching receiver connection
     */
    public List<TPipeInfo> showPipe(String str) {
        final boolean showAll = StringUtils.isEmpty(str);
        // Typed list instead of a raw ArrayList, so the return needs no unchecked conversion.
        final List<TPipeInfo> result = new ArrayList<>();
        for (PipeInfo pipeInfo : getAllPipeInfos()) {
            if (showAll || str.equals(pipeInfo.getPipeName())) {
                result.add(new TPipeInfo(pipeInfo.getCreateTime(), pipeInfo.getPipeName(), "sender", pipeInfo.getPipeSinkName(), pipeInfo.getStatus().name(), AlignedPath.VECTOR_PLACEHOLDER));
            }
        }
        for (TSyncIdentityInfo identityInfo : receiverManager.getAllTSyncIdentityInfos()) {
            if (showAll || str.equals(identityInfo.getPipeName())) {
                // Receiver entries are always reported as RUNNING.
                result.add(new TPipeInfo(identityInfo.getCreateTime(), identityInfo.getPipeName(), "receiver", identityInfo.getAddress(), Pipe.PipeStatus.RUNNING.name(), AlignedPath.VECTOR_PLACEHOLDER));
            }
        }
        return result;
    }

    /**
     * Appends one row per sender-side pipe and per receiver-side connection to {@code listDataSet}.
     *
     * <p>Each row holds eight TEXT fields: create time, pipe name, role, remote (sink name or
     * address), status, message, and two external-pipe columns that are {@code "N/A"} unless
     * external-pipe status is available.
     *
     * @param showPipePlan plan carrying the pipe name to filter by; a {@code null} name or one
     *     equal to {@code AlignedPath.VECTOR_PLACEHOLDER} selects every pipe
     * @param listDataSet the data set that receives the rows
     */
    public void showPipe(ShowPipePlan showPipePlan, ListDataSet listDataSet) {
        final String requestedName = showPipePlan.getPipeName();
        // A null name is treated like "show everything" instead of failing with an NPE.
        final boolean showAll = requestedName == null || AlignedPath.VECTOR_PLACEHOLDER.equals(requestedName);
        for (PipeInfo pipeInfo : getAllPipeInfos()) {
            if (!showAll && !requestedName.equals(pipeInfo.getPipeName())) {
                continue;
            }
            RowRecord row = new RowRecord(0L);
            row.addField(Binary.valueOf(DatetimeUtils.convertLongToDate(pipeInfo.getCreateTime())), TSDataType.TEXT);
            row.addField(Binary.valueOf(pipeInfo.getPipeName()), TSDataType.TEXT);
            row.addField(Binary.valueOf("sender"), TSDataType.TEXT);
            row.addField(Binary.valueOf(pipeInfo.getPipeSinkName()), TSDataType.TEXT);
            row.addField(Binary.valueOf(pipeInfo.getStatus().name()), TSDataType.TEXT);
            row.addField(Binary.valueOf(syncInfoFetcher.getPipeMsg(pipeInfo.getPipeName(), pipeInfo.getCreateTime())), TSDataType.TEXT);
            ExternalPipeStatus externalPipeStatus = null;
            PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipeInfo.getPipeSinkName());
            // A missing sink simply yields the "N/A" columns rather than an NPE.
            if (pipeSink != null && pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe && extPipePluginManager != null) {
                externalPipeStatus = extPipePluginManager.getExternalPipeStatus(((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName());
            }
            if (externalPipeStatus != null) {
                row.addField(Binary.valueOf(externalPipeStatus.getWriterInvocationFailures().toString()), TSDataType.TEXT);
                row.addField(Binary.valueOf(externalPipeStatus.getWriterStatuses().toString()), TSDataType.TEXT);
            } else {
                row.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
                row.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
            }
            listDataSet.putRecord(row);
        }
        for (TSyncIdentityInfo identityInfo : receiverManager.getAllTSyncIdentityInfos()) {
            // Apply the same name filter as for sender rows, consistent with showPipe(String).
            if (!showAll && !requestedName.equals(identityInfo.getPipeName())) {
                continue;
            }
            RowRecord row = new RowRecord(0L);
            row.addField(Binary.valueOf(DatetimeUtils.convertLongToDate(identityInfo.getCreateTime())), TSDataType.TEXT);
            row.addField(Binary.valueOf(identityInfo.getPipeName()), TSDataType.TEXT);
            row.addField(Binary.valueOf("receiver"), TSDataType.TEXT);
            row.addField(Binary.valueOf(identityInfo.getAddress()), TSDataType.TEXT);
            row.addField(Binary.valueOf(Pipe.PipeStatus.RUNNING.name()), TSDataType.TEXT);
            row.addField(Binary.valueOf(AlignedPath.VECTOR_PLACEHOLDER), TSDataType.TEXT);
            row.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
            row.addField(Binary.valueOf("N/A"), TSDataType.TEXT);
            listDataSet.putRecord(row);
        }
    }

    /**
     * Ensures an external-pipe plugin manager exists for the running pipe and optionally
     * starts the external pipe.
     *
     * <p>If the running pipe is not a {@code TsFilePipe} or its sink is not an
     * {@code ExternalPipeSink}, an error is logged and the method returns without throwing.
     *
     * @param z {@code true} to also start the external pipe after the manager is available
     * @throws PipeException if no plugin is registered for the sink type or starting fails
     */
    private void startExternalPipeManager(boolean z) throws PipeException {
        if (!(runningPipe instanceof TsFilePipe)) {
            logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe. " + runningPipe);
            return;
        }
        final PipeSink sink = runningPipe.getPipeSink();
        if (!(sink instanceof ExternalPipeSink)) {
            logger.error("startExternalPipeManager(), pipeSink is not ExternalPipeSink." + sink);
            return;
        }
        final ExternalPipeSink externalSink = (ExternalPipeSink) sink;
        final String sinkTypeName = externalSink.getExtPipeSinkTypeName();
        final boolean pluginMissing = ExtPipePluginRegister.getInstance().getWriteFactory(sinkTypeName) == null;
        if (pluginMissing) {
            logger.error(String.format("startExternalPipeManager(), can not found ExternalPipe plugin for %s.", sinkTypeName));
            throw new PipeException("Can not found ExternalPipe plugin for " + sinkTypeName + ".");
        }
        if (extPipePluginManager == null) {
            extPipePluginManager = new ExtPipePluginManager((TsFilePipe) runningPipe);
        }
        if (!z) {
            // Caller only wanted the manager prepared, not the pipe started.
            return;
        }
        try {
            extPipePluginManager.startExtPipe(sinkTypeName, externalSink.getSinkParams());
        } catch (IOException e) {
            logger.error("Failed to start External Pipe: {}.", sinkTypeName, e);
            throw new PipeException("Failed to start External Pipe: " + sinkTypeName + ". " + e.getMessage());
        }
    }

    /**
     * Returns the external-pipe plugin manager.
     *
     * @return the current manager, or {@code null} if none has been created (or it was cleared by dropPipe)
     */
    public ExtPipePluginManager getExternalPipeManager() {
        final ExtPipePluginManager manager = extPipePluginManager;
        return manager;
    }

    /**
     * Starts the service: checks that the external-pipe plugin register is available, logs the
     * loaded plugins and, when the {@code syncService.log} file exists in the sync system
     * directory, recovers the previously running pipe.
     *
     * @throws StartupException if the plugin register is unavailable or recovery fails
     */
    public void start() throws StartupException {
        final ExtPipePluginRegister register = ExtPipePluginRegister.getInstance();
        if (register == null) {
            throw new StartupException("Load ExternalPipe Plugin error.");
        }
        logger.info("Load {} ExternalPipe Plugin: {}", register.getAllPluginName().size(), register.getAllPluginName());
        final File syncLog = new File(SyncPathUtil.getSysDir(), "syncService.log");
        if (!syncLog.exists()) {
            // Nothing persisted, so there is nothing to recover.
            return;
        }
        try {
            recover();
        } catch (Exception e) {
            logger.error("Recover from disk error.", e);
            throw new StartupException(e);
        }
    }

    /**
     * Stops the running pipe, and its sender manager when one exists. Does nothing when there
     * is no running pipe or it has been dropped. Failures are logged, not propagated.
     */
    public void stop() {
        if (runningPipe == null || Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
            return;
        }
        try {
            runningPipe.stop();
            // The sender manager is only created for IoTDB pipe sinks; with an external
            // pipe sink it may be null, so guard against a NullPointerException.
            if (senderManager != null) {
                senderManager.stop();
            }
        } catch (PipeException e) {
            logger.warn(String.format("Stop pipe %s error when stop Sender Service.", runningPipe.getName()), e);
        }
    }

    /**
     * Shuts the service down: stops the running pipe, closes its sender manager when one
     * exists, and closes the pipe. Does nothing when there is no running pipe or it has been
     * dropped.
     *
     * @param j not used by this implementation
     * @throws ShutdownException wrapping any interruption or pipe failure during shutdown
     */
    public void shutdown(long j) throws ShutdownException {
        if (runningPipe == null || Pipe.PipeStatus.DROP.equals(runningPipe.getStatus())) {
            return;
        }
        try {
            runningPipe.stop();
            // The sender manager is only created for IoTDB pipe sinks; with an external
            // pipe sink it may be null, so guard against a NullPointerException.
            if (senderManager != null) {
                senderManager.close();
            }
            runningPipe.close();
        } catch (InterruptedException | PipeException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
            }
            logger.warn(String.format("Stop pipe %s error when shutdown Sender Service.", runningPipe.getName()), e);
            throw new ShutdownException(e);
        }
    }

    /**
     * Identifies this service.
     *
     * @return {@code ServiceType.SYNC_SERVICE}
     */
    public ServiceType getID() {
        final ServiceType id = ServiceType.SYNC_SERVICE;
        return id;
    }

    /**
     * Rebuilds the running pipe from the information held by the sync-info fetcher and
     * restores its state and transport.
     *
     * <p>Returns without doing anything when the fetcher reports no running pipe or a dropped
     * one.
     *
     * @throws IOException if the persisted pipe status is not recognized
     * @throws PipeException if restoring the pipe or its transport fails
     * @throws StartupException declared for callers; not thrown directly by this method
     */
    private void recover() throws IOException, PipeException, StartupException {
        final PipeInfo info = syncInfoFetcher.getRunningPipeInfo();
        final boolean nothingToRecover = info == null || Pipe.PipeStatus.DROP.equals(info.getStatus());
        if (nothingToRecover) {
            return;
        }
        runningPipe = SyncPipeUtil.parsePipeInfoAsPipe(info, syncInfoFetcher.getPipeSink(info.getPipeSinkName()));
        // Replay the persisted status onto the freshly built pipe.
        switch (info.getStatus()) {
            case RUNNING:
                runningPipe.start();
                break;
            case STOP:
                runningPipe.stop();
                break;
            case DROP:
                runningPipe.drop();
                break;
            default:
                throw new IOException(String.format("Can not recognize running pipe status %s.", info.getStatus()));
        }
        final boolean targetsIoTDB = runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB;
        if (targetsIoTDB) {
            senderManager = new SenderManager(runningPipe, (IoTDBPipeSink) runningPipe.getPipeSink());
            if (Pipe.PipeStatus.RUNNING.equals(runningPipe.getStatus())) {
                senderManager.start();
            }
        } else {
            // External sinks: prepare the plugin manager and start it only for a running pipe.
            startExternalPipeManager(runningPipe.getStatus() == Pipe.PipeStatus.RUNNING);
        }
    }

    /**
     * Collects the sync manager of the running pipe for the given key.
     *
     * @param str key passed through to the running pipe's {@code getOrCreateSyncManager}
     * @return a new mutable list holding the running pipe's manager, or an empty list when
     *     there is no running pipe
     */
    public List<ISyncManager> getOrCreateSyncManager(String str) {
        // Typed list instead of a raw ArrayList, so the return needs no unchecked conversion.
        final List<ISyncManager> managers = new ArrayList<>();
        if (runningPipe != null) {
            managers.add(runningPipe.getOrCreateSyncManager(str));
        }
        return managers;
    }

    /**
     * Asks the running pipe, if any, to delete its sync manager for the given key.
     *
     * @param str key passed through to the running pipe's {@code deleteSyncManager}
     */
    public void deleteSyncManager(String str) {
        if (runningPipe == null) {
            return;
        }
        runningPipe.deleteSyncManager(str);
    }

    /**
     * Returns the sender manager.
     *
     * @return the current sender manager, or {@code null} if none has been created
     */
    public SenderManager getSenderManager() {
        final SenderManager manager = senderManager;
        return manager;
    }
}
