package org.apache.iotdb.db.sync.sender.service;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.exception.ShutdownException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.exception.sync.PipeSinkException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
import org.apache.iotdb.db.sync.sender.pipe.ExternalPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.db.sync.sender.recovery.SenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recovery.SenderLogger;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.ResponseType;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/sender/service/SenderService.class */
public class SenderService implements IService {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(SenderService.class);
    /** Records pipeSink and pipe operations; created in {@code start()} and closed in {@code shutdown(long)}. */
    private SenderLogger senderLogger;
    /** Registered pipeSinks keyed by pipeSink name. */
    private Map<String, PipeSink> pipeSinks;
    /** Every pipe created so far (or restored by {@code recover()}), in creation order. */
    private List<Pipe> pipes;
    /** The most recently created pipe; {@code null} or status DROP means there is no active pipe. */
    private Pipe runningPipe;
    /** Keeps the messages recorded for pipes (see {@code receiveMsg} / {@code getPipeMsg}). */
    private MsgManager msgManager;
    /** Transport for the running pipe; only assigned when the pipe's sink type is IoTDB. */
    private TransportHandler transportHandler;
    /** Created lazily by {@code startExternalPipeManager} for external sinks and cleared by {@code dropPipe}. */
    private ExtPipePluginManager extPipePluginManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.db.sync.sender.service.SenderService$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/db/sync/sender/service/SenderService$1.class */
    /**
     * Decompiler rendering of the compiler-generated switch table for {@link ResponseType}.
     *
     * <p>The table is indexed by {@code ordinal()} and holds the case label that
     * {@code receiveMsg} switches on: INFO = 1, ERROR = 2, WARN = 3. Slots that are never
     * assigned keep the array default of 0, which matches no explicit case.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$service$transport$thrift$ResponseType = new int[ResponseType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$ResponseType[ResponseType.INFO.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: if the constant is missing at run time its slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$ResponseType[ResponseType.ERROR.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored, see above.
            }
            try {
                $SwitchMap$org$apache$iotdb$service$transport$thrift$ResponseType[ResponseType.WARN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored, see above.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/sync/sender/service/SenderService$SenderServiceHolder.class */
    /**
     * Holder for the single {@link SenderService} instance.
     *
     * <p>The instance is created in this class's static initializer and handed out by
     * {@link SenderService#getInstance()}.
     */
    public static class SenderServiceHolder {
        // A nested class may call the enclosing class's private constructor directly.
        private static final SenderService INSTANCE = new SenderService();

        /** Not instantiable; this class only carries {@code INSTANCE}. */
        private SenderServiceHolder() {
        }
    }

    /** Private: instances are obtained through {@link #getInstance()} only. All fields are set later by {@code start()}. */
    private SenderService() {
    }

    /**
     * Returns the shared service instance held by {@link SenderServiceHolder}.
     *
     * @return the singleton {@code SenderService}
     */
    public static SenderService getInstance() {
        return SenderServiceHolder.INSTANCE;
    }

    /**
     * Looks up a registered pipeSink by name.
     *
     * @param str name of the pipeSink
     * @return the pipeSink registered under {@code str}, or {@code null} if there is none
     */
    public PipeSink getPipeSink(String str) {
        // Map.get already yields null for an unknown key, same as getOrDefault(key, null).
        return this.pipeSinks.get(str);
    }

    /**
     * Returns the external-pipe plugin manager.
     *
     * @return the current manager, or {@code null} when none has been created yet or the
     *     pipe that owned it was dropped
     */
    public ExtPipePluginManager getExternalPipeManager() {
        return this.extPipePluginManager;
    }

    /**
     * Tells whether a pipeSink with the given name is registered.
     *
     * @param str name of the pipeSink
     * @return {@code true} if {@code str} is a key of the pipeSink map
     */
    public boolean isPipeSinkExist(String str) {
        return this.pipeSinks.containsKey(str);
    }

    /**
     * Registers the pipeSink described by a plan and records the plan with the sender logger.
     *
     * @param createPipeSinkPlan plan carrying the pipeSink name, type and attributes
     * @throws PipeSinkException if the name is already taken, or the plan cannot be turned into
     *     a pipeSink
     */
    public void addPipeSink(CreatePipeSinkPlan createPipeSinkPlan) throws PipeSinkException {
        final String sinkName = createPipeSinkPlan.getPipeSinkName();
        if (isPipeSinkExist(sinkName)) {
            throw new PipeSinkException("There is a pipeSink named " + sinkName + " in IoTDB, please drop it.");
        }
        final PipeSink parsedSink = parseCreatePipeSinkPlan(createPipeSinkPlan);
        addPipeSink(parsedSink);
        // Log only after the sink has been registered in memory.
        this.senderLogger.addPipeSink(createPipeSinkPlan);
    }

    /**
     * Builds a {@link PipeSink} from a create-pipeSink plan without registering it.
     *
     * @param createPipeSinkPlan plan carrying the pipeSink type, name and attributes
     * @return the newly created pipeSink with the plan's attributes applied
     * @throws PipeSinkException if the factory rejects the plan with an
     *     {@link UnsupportedOperationException}, or if applying the attributes fails
     */
    public PipeSink parseCreatePipeSinkPlan(CreatePipeSinkPlan createPipeSinkPlan) throws PipeSinkException {
        final PipeSink sink;
        try {
            sink = PipeSink.PipeSinkFactory.createPipeSink(
                    createPipeSinkPlan.getPipeSinkType(), createPipeSinkPlan.getPipeSinkName());
            sink.setAttribute(createPipeSinkPlan.getPipeSinkAttributes());
        } catch (UnsupportedOperationException unsupported) {
            // Surface the factory's complaint through the checked exception callers expect.
            throw new PipeSinkException(unsupported.getMessage());
        }
        return sink;
    }

    /**
     * Registers an already-built pipeSink under its own name, replacing any previous entry with
     * that name. Nothing is written to the sender logger here.
     *
     * @param pipeSink the pipeSink to register
     */
    public void addPipeSink(PipeSink pipeSink) {
        this.pipeSinks.put(pipeSink.getPipeSinkName(), pipeSink);
    }

    /**
     * Removes a registered pipeSink and records the removal with the sender logger.
     *
     * @param str name of the pipeSink to drop
     * @throws PipeSinkException if no such pipeSink exists, or the current non-dropped pipe is
     *     using it
     */
    public void dropPipeSink(String str) throws PipeSinkException {
        if (!isPipeSinkExist(str)) {
            throw new PipeSinkException("PipeSink " + str + " is not exist.");
        }
        final Pipe current = this.runningPipe;
        final boolean usedByLivePipe =
                current != null
                        && current.getStatus() != Pipe.PipeStatus.DROP
                        && current.getPipeSink().getPipeSinkName().equals(str);
        if (usedByLivePipe) {
            throw new PipeSinkException(
                    String.format("Can not drop pipeSink %s, because pipe %s is using it.", str, current.getName()));
        }
        this.pipeSinks.remove(str);
        this.senderLogger.dropPipeSink(str);
    }

    /**
     * Returns a snapshot of all registered pipeSinks.
     *
     * @return a new, independent list of the current pipeSinks; modifying it does not affect
     *     the service
     */
    public List<PipeSink> getAllPipeSink() {
        // Typed copy of the map's values; replaces the raw ArrayList and manual iterator loop.
        return new ArrayList<>(this.pipeSinks.values());
    }

    /**
     * Creates a new pipe from a plan and makes it the current pipe.
     *
     * <p>For an IoTDB sink a new transport handler is created and a CREATE request is sent to
     * the receiver; for any other sink the external-pipe plugin manager is prepared (but not
     * started). Only after that succeeds is the pipe added to the message manager, the pipe
     * list and the sender logger. If preparation fails, {@code runningPipe} is reset to
     * {@code null} so that a later {@code addPipe} is not blocked by a half-created pipe.
     *
     * @param createPipePlan plan carrying pipe name, pipeSink name, start timestamp and attributes
     * @throws PipeException if a non-dropped pipe already exists, the pipeSink is unknown, the
     *     start time lies in the future, an attribute is not recognised, or the sink-specific
     *     preparation fails
     */
    public synchronized void addPipe(CreatePipePlan createPipePlan) throws PipeException {
        if (this.runningPipe != null && this.runningPipe.getStatus() != Pipe.PipeStatus.DROP) {
            throw new PipeException(String.format("Pipe %s is %s, please retry after drop it.", this.runningPipe.getName(), this.runningPipe.getStatus().name()));
        }
        if (!isPipeSinkExist(createPipePlan.getPipeSinkName())) {
            throw new PipeException(String.format("Can not find pipeSink %s.", createPipePlan.getPipeSinkName()));
        }
        long currentTime = DatetimeUtils.currentTime();
        if (createPipePlan.getDataStartTimestamp() > currentTime) {
            throw new PipeException(String.format("Start time %s is later than current time %s, this is not supported yet.", DatetimeUtils.convertLongToDate(createPipePlan.getDataStartTimestamp()), DatetimeUtils.convertLongToDate(currentTime)));
        }
        PipeSink pipeSink = getPipeSink(createPipePlan.getPipeSinkName());
        this.runningPipe = parseCreatePipePlan(createPipePlan, pipeSink, currentTime);
        if (this.runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
            try {
                this.transportHandler = TransportHandler.getNewTransportHandler(this.runningPipe, (IoTDBPipeSink) pipeSink);
                sendMsg(RequestType.CREATE);
            } catch (ClassCastException e) {
                logger.error(String.format("Cast Class to %s error when create pipe %s.", IoTDBPipeSink.class.getName(), createPipePlan.getPipeName()), e);
                this.runningPipe = null;
                throw new PipeException(String.format("Wrong pipeSink type %s for create pipe %s", pipeSink.getType(), pipeSink.getPipeSinkName()));
            } catch (PipeException e2) {
                this.runningPipe = null;
                throw e2;
            }
        } else {
            try {
                startExternalPipeManager(false);
            } catch (PipeException e) {
                // Roll back like the IoTDB branch does: the pipe was never registered or logged,
                // so it must not linger as the current pipe.
                this.runningPipe = null;
                throw e;
            }
        }
        this.msgManager.addPipe(this.runningPipe);
        this.pipes.add(this.runningPipe);
        this.senderLogger.addPipe(createPipePlan, currentTime);
    }

    /**
     * Builds a {@link TsFilePipe} from a create-pipe plan.
     *
     * <p>The only attribute accepted is {@code syncdelop} (matched case-insensitively); its
     * value is read with {@link Boolean#parseBoolean(String)} and defaults to {@code true} when
     * absent. Attribute keys in the plan are lower-cased in place as they are visited.
     *
     * @param createPipePlan plan carrying the pipe name, start timestamp and attributes
     * @param pipeSink the sink the new pipe writes to
     * @param j creation time passed through to the pipe
     * @return the new pipe
     * @throws PipeException if the plan contains any attribute other than {@code syncdelop}
     */
    public Pipe parseCreatePipePlan(CreatePipePlan createPipePlan, PipeSink pipeSink, long j) throws PipeException {
        boolean syncDelOp = true;
        for (Pair<String, String> attribute : createPipePlan.getPipeAttributes()) {
            final String key = attribute.left.toLowerCase();
            // The normalised key is written back into the plan's own pair.
            attribute.left = key;
            if (!"syncdelop".equals(key)) {
                throw new PipeException(String.format("Can not recognition attribute %s", key));
            }
            syncDelOp = Boolean.parseBoolean(attribute.right);
        }
        final String pipeName = createPipePlan.getPipeName();
        final long dataStartTimestamp = createPipePlan.getDataStartTimestamp();
        return new TsFilePipe(j, pipeName, pipeSink, dataStartTimestamp, syncDelOp);
    }

    /**
     * Stops the current pipe if it is RUNNING and records a STOP_PIPE operation.
     *
     * <p>For an IoTDB sink the pipe is stopped and then the transport handler. For any other
     * sink the external plugin (if a manager exists) is stopped first, then the pipe. The
     * STOP_PIPE operation is recorded even when the pipe was not RUNNING.
     *
     * @param str name of the pipe to stop; must match the current pipe
     * @throws PipeException if there is no current pipe, the name does not match, or stopping
     *     the pipe or its external plugin fails
     */
    public synchronized void stopPipe(String str) throws PipeException {
        checkRunningPipeExistAndName(str);
        if (this.runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
            if (this.runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
                this.runningPipe.stop();
                this.transportHandler.stop();
            } else {
                if (this.extPipePluginManager != null) {
                    try {
                        this.extPipePluginManager.stopExtPipe(((ExternalPipeSink) this.runningPipe.getPipeSink()).getExtPipeSinkTypeName());
                    } catch (Exception e) {
                        // The thrown PipeException only carries the message, so log the full
                        // cause here to keep the stack trace.
                        logger.error("Failed to stop externalPipeProcessor of pipe {}.", str, e);
                        throw new PipeException("Failed to stop externalPipeProcessor. " + e.getMessage());
                    }
                }
                this.runningPipe.stop();
            }
        }
        this.senderLogger.operatePipe(str, Operator.OperatorType.STOP_PIPE);
    }

    /**
     * Starts the current pipe if it is in STOP state and records a START_PIPE operation.
     *
     * <p>For an IoTDB sink a START request is sent to the receiver before the pipe and the
     * transport handler are started. For any other sink the pipe is started first and then the
     * external plugin. The START_PIPE operation is recorded even when the pipe was not stopped.
     *
     * @param str name of the pipe to start; must match the current pipe
     * @throws PipeException if there is no current pipe, the name does not match, or starting
     *     fails
     */
    public synchronized void startPipe(String str) throws PipeException {
        checkRunningPipeExistAndName(str);
        final boolean isStopped = Pipe.PipeStatus.STOP == this.runningPipe.getStatus();
        if (isStopped) {
            final boolean externalSink =
                    this.runningPipe.getPipeSink().getType() != PipeSink.PipeSinkType.IoTDB;
            if (externalSink) {
                this.runningPipe.start();
                startExternalPipeManager(true);
            } else {
                sendMsg(RequestType.START);
                this.runningPipe.start();
                this.transportHandler.start();
            }
        }
        this.senderLogger.operatePipe(str, Operator.OperatorType.START_PIPE);
    }

    /**
     * Ensures an {@link ExtPipePluginManager} exists for the current pipe and optionally starts
     * the external pipe plugin.
     *
     * <p>If the current pipe is not a {@link TsFilePipe} or its sink is not an
     * {@link ExternalPipeSink}, an error is logged and the method returns without throwing.
     *
     * @param z {@code true} to also start the plugin; {@code false} to only create the manager
     *     if it does not exist yet
     * @throws PipeException if no plugin is registered for the sink's type name, or starting the
     *     plugin fails with an {@link IOException}
     */
    private void startExternalPipeManager(boolean z) throws PipeException {
        if (!(this.runningPipe instanceof TsFilePipe)) {
            logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe. " + this.runningPipe);
            return;
        }
        PipeSink pipeSink = this.runningPipe.getPipeSink();
        if (!(pipeSink instanceof ExternalPipeSink)) {
            logger.error("startExternalPipeManager(), pipeSink is not ExternalPipeSink." + pipeSink);
            return;
        }
        ExternalPipeSink externalPipeSink = (ExternalPipeSink) pipeSink;
        String extPipeSinkTypeName = externalPipeSink.getExtPipeSinkTypeName();
        if (ExtPipePluginRegister.getInstance().getWriteFactory(extPipeSinkTypeName) == null) {
            // "{}" is an SLF4J placeholder, so it must go through the logger's own formatting;
            // String.format would leave it in the output verbatim.
            logger.error("startExternalPipeManager(), can not found ExternalPipe plugin for {}.", extPipeSinkTypeName);
            throw new PipeException("Can not found ExternalPipe plugin for " + extPipeSinkTypeName + ".");
        }
        if (this.extPipePluginManager == null) {
            this.extPipePluginManager = new ExtPipePluginManager((TsFilePipe) this.runningPipe);
        }
        if (z) {
            try {
                this.extPipePluginManager.startExtPipe(extPipeSinkTypeName, externalPipeSink.getSinkParams());
            } catch (IOException e) {
                logger.error("Failed to start External Pipe: {}.", extPipeSinkTypeName, e);
                throw new PipeException("Failed to start External Pipe: " + extPipeSinkTypeName + ". " + e.getMessage());
            }
        }
    }

    /**
     * Drops the current pipe and records a DROP_PIPE operation.
     *
     * <p>For an IoTDB sink the transport handler is closed first; then the pipe is dropped, all
     * pipes are removed from the message manager and a DROP request is sent to the receiver.
     * For any other sink the external plugin is dropped (and the manager discarded) before the
     * pipe is dropped.
     *
     * @param str name of the pipe to drop; must match the current pipe
     * @throws PipeException if there is no current pipe, the name does not match, the transport
     *     handler fails to close, the receiver cannot be reached, or the thread is interrupted
     *     while waiting
     */
    public synchronized void dropPipe(String str) throws PipeException {
        checkRunningPipeExistAndName(str);
        try {
            if (this.runningPipe.getPipeSink().getType() != PipeSink.PipeSinkType.IoTDB) {
                if (this.extPipePluginManager != null) {
                    this.extPipePluginManager.dropExtPipe(((ExternalPipeSink) this.runningPipe.getPipeSink()).getExtPipeSinkTypeName());
                    this.extPipePluginManager = null;
                }
                this.runningPipe.drop();
            } else {
                if (!this.transportHandler.close()) {
                    throw new PipeException(String.format("Close pipe %s transport error after %s %s, please try again.", this.runningPipe.getName(), SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS, TimeUnit.MILLISECONDS.name()));
                }
                this.runningPipe.drop();
                this.msgManager.removeAllPipe();
                sendMsg(RequestType.DROP);
            }
            this.senderLogger.operatePipe(str, Operator.OperatorType.DROP_PIPE);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn(String.format("Interrupted when waiting for clear transport %s.", this.runningPipe.getName()), e);
            throw new PipeException("Drop error, be interrupted, please try again.");
        }
    }

    /**
     * Returns a snapshot of all pipes known to the service.
     *
     * @return a new, independent list of the pipes; modifying it does not affect the service
     */
    public List<Pipe> getAllPipes() {
        // Diamond instead of the raw ArrayList keeps the copy type-checked.
        return new ArrayList<>(this.pipes);
    }

    /**
     * Returns the message the message manager holds for a pipe.
     *
     * @param pipe the pipe to query
     * @return whatever {@code MsgManager.getPipeMsg} returns for {@code pipe}
     */
    public synchronized String getPipeMsg(Pipe pipe) {
        return this.msgManager.getPipeMsg(pipe);
    }

    /**
     * Verifies that a non-dropped current pipe exists and that it has the given name.
     *
     * @param str the pipe name the caller wants to operate on
     * @throws PipeException if there is no current pipe (or it is dropped), or if its name
     *     differs from {@code str}
     */
    private void checkRunningPipeExistAndName(String str) throws PipeException {
        final Pipe current = this.runningPipe;
        final boolean noLivePipe = current == null || current.getStatus() == Pipe.PipeStatus.DROP;
        if (noLivePipe) {
            throw new PipeException("There is no existing pipe.");
        }
        if (current.getName().equals(str)) {
            return;
        }
        // A different pipe is active; report it and its status.
        throw new PipeException(
                String.format("Pipe %s is %s, please retry after drop it.", current.getName(), current.getStatus()));
    }

    /**
     * Forwards the flag to {@code setDisconnected} on the current pipe.
     *
     * <p>NOTE(review): the method is called {@code setConnecting} but the value is passed
     * unchanged to {@code setDisconnected} — confirm with callers which meaning {@code z} has.
     * There is no null check, so this fails if no pipe has been created yet.
     *
     * @param z value handed to {@code Pipe.setDisconnected}
     */
    public void setConnecting(boolean z) {
        this.runningPipe.setDisconnected(z);
    }

    /**
     * Sends a request through the transport handler and processes the response with
     * {@link #receiveMsg(SyncResponse)}.
     *
     * @param requestType the kind of request to send
     * @throws PipeException if the transport handler reports a {@link SyncConnectionException}
     */
    private void sendMsg(RequestType requestType) throws PipeException {
        try {
            final SyncResponse response = this.transportHandler.sendMsg(requestType);
            receiveMsg(response);
        } catch (SyncConnectionException connectionError) {
            final PipeSink targetSink = this.runningPipe.getPipeSink();
            logger.warn(
                    String.format("Connect to pipeSink %s error when %s pipe.", targetSink, requestType.name()),
                    connectionError);
            throw new PipeException(
                    String.format(
                            "Can not connect to pipeSink %s, please check net and receiver is available, and try again.",
                            this.runningPipe.getPipeSink()));
        }
    }

    /**
     * Handles a response from the receiver.
     *
     * <p>Responses are ignored (with an info log) when there is no non-dropped current pipe.
     * ERROR responses are logged and stop the current pipe; ERROR and WARN responses are then
     * recorded with the message manager, tagged START_PIPE if the pipe is RUNNING at that point
     * and STOP_PIPE otherwise. Every other response type is ignored.
     *
     * @param syncResponse the response to handle
     */
    public synchronized void receiveMsg(SyncResponse syncResponse) {
        if (this.runningPipe == null || this.runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
            logger.info(String.format("No running pipe for receiving msg %s.", syncResponse));
            return;
        }
        switch (syncResponse.type) {
            case ERROR:
                logger.warn(String.format("%s from receiver: %s", syncResponse.type.name(), syncResponse.msg));
                try {
                    stopPipe(this.runningPipe.getName());
                } catch (PipeException stopFailure) {
                    logger.warn(
                            String.format("Stop pipe %s when meeting error in sender service.", this.runningPipe.getName()),
                            stopFailure);
                }
                break;
            case WARN:
                break;
            default:
                // INFO (and anything else) is not recorded.
                return;
        }
        final Operator.OperatorType operation;
        if (this.runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
            operation = Operator.OperatorType.START_PIPE;
        } else {
            operation = Operator.OperatorType.STOP_PIPE;
        }
        this.msgManager.recordMsg(this.runningPipe, operation, syncResponse.type, syncResponse.msg);
    }

    /**
     * Initialises the service state and, if a sender log file exists, recovers from it.
     *
     * @throws StartupException if the external-pipe plugin register is unavailable or recovery
     *     fails
     */
    public void start() throws StartupException {
        final ExtPipePluginRegister pluginRegister = ExtPipePluginRegister.getInstance();
        if (pluginRegister == null) {
            throw new StartupException("Load ExternalPipe Plugin error.");
        }
        logger.info(
                "Load {} ExternalPipe Plugin: {}",
                pluginRegister.getAllPluginName().size(),
                pluginRegister.getAllPluginName());

        this.pipeSinks = new HashMap<>();
        this.pipes = new ArrayList<>();
        this.senderLogger = new SenderLogger();
        this.msgManager = new MsgManager(this.senderLogger);

        final File senderLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.SENDER_LOG_NAME);
        if (!senderLogFile.exists()) {
            // Nothing to recover from.
            return;
        }
        try {
            recover();
        } catch (Exception recoveryFailure) {
            logger.error("Recover from disk error.", recoveryFailure);
            throw new StartupException(recoveryFailure);
        }
    }

    /**
     * Stops the current pipe when the service is stopped.
     *
     * <p>Does nothing if there is no current pipe or it is already dropped. The transport
     * handler is only stopped for IoTDB sinks, because it is only created for them; for other
     * sinks it is {@code null} or left over from an earlier pipe. Failures are logged, not
     * propagated.
     *
     * <p>NOTE(review): the external-pipe plugin is not stopped here, unlike in
     * {@code stopPipe} — confirm whether that is intended.
     */
    public void stop() {
        if (this.runningPipe == null || Pipe.PipeStatus.DROP.equals(this.runningPipe.getStatus())) {
            return;
        }
        try {
            this.runningPipe.stop();
            if (this.runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB
                    && this.transportHandler != null) {
                this.transportHandler.stop();
            }
        } catch (PipeException e) {
            logger.warn(String.format("Stop pipe %s error when stop Sender Service.", this.runningPipe.getName()), e);
        }
    }

    /**
     * Releases the service state, closes the sender logger and shuts the current pipe down.
     *
     * <p>If a non-dropped current pipe exists it is stopped and closed. The transport handler
     * is only closed for IoTDB sinks, because it is only created for them; for other sinks it
     * is {@code null} or left over from an earlier pipe.
     *
     * @param j shutdown timeout; not used by this implementation
     * @throws ShutdownException if stopping or closing the pipe fails, or the thread is
     *     interrupted while closing the transport handler
     */
    public void shutdown(long j) throws ShutdownException {
        this.pipeSinks = null;
        this.pipes = null;
        this.msgManager = null;
        this.senderLogger.close();
        if (this.runningPipe == null || Pipe.PipeStatus.DROP.equals(this.runningPipe.getStatus())) {
            return;
        }
        try {
            this.runningPipe.stop();
            if (this.runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB
                    && this.transportHandler != null) {
                this.transportHandler.close();
            }
            this.runningPipe.close();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            logger.warn(String.format("Stop pipe %s error when shutdown Sender Service.", this.runningPipe.getName()), e);
            throw new ShutdownException(e);
        } catch (PipeException e) {
            logger.warn(String.format("Stop pipe %s error when shutdown Sender Service.", this.runningPipe.getName()), e);
            throw new ShutdownException(e);
        }
    }

    /**
     * Identifies this service.
     *
     * @return {@link ServiceType#SENDER_SERVICE}
     */
    public ServiceType getID() {
        return ServiceType.SENDER_SERVICE;
    }

    /**
     * Restores pipeSinks, pipes, the current pipe and the message manager from a
     * {@link SenderLogAnalyzer}, then re-attaches the sink-specific runtime for the current pipe.
     *
     * <p>For an IoTDB sink a new transport handler is created, and started if the pipe was
     * RUNNING. For any other sink the external-pipe manager is prepared, and the plugin started
     * if the pipe was RUNNING. Nothing more happens if there is no current pipe or it is dropped.
     *
     * @throws IOException if the analyzer fails to read its input
     * @throws PipeException if re-attaching the runtime fails
     */
    private void recover() throws IOException, PipeException {
        final SenderLogAnalyzer analyzer = new SenderLogAnalyzer();
        analyzer.recover();
        this.pipeSinks = analyzer.getRecoveryAllPipeSinks();
        this.pipes = analyzer.getRecoveryAllPipes();
        this.runningPipe = analyzer.getRecoveryRunningPipe();
        this.msgManager = analyzer.getMsgManager();

        if (this.runningPipe == null || Pipe.PipeStatus.DROP.equals(this.runningPipe.getStatus())) {
            return;
        }

        if (this.runningPipe.getPipeSink().getType() == PipeSink.PipeSinkType.IoTDB) {
            final IoTDBPipeSink ioTDBSink = (IoTDBPipeSink) this.runningPipe.getPipeSink();
            this.transportHandler = TransportHandler.getNewTransportHandler(this.runningPipe, ioTDBSink);
            if (Pipe.PipeStatus.RUNNING.equals(this.runningPipe.getStatus())) {
                this.transportHandler.start();
            }
        } else {
            final boolean wasRunning = this.runningPipe.getStatus() == Pipe.PipeStatus.RUNNING;
            startExternalPipeManager(wasRunning);
        }
    }

    /**
     * Compiler-generated bridge constructor (as rendered by the decompiler) that lets
     * {@code SenderServiceHolder} reach the private no-arg constructor.
     *
     * @param anonymousClass1 unused marker argument; callers pass {@code null}
     */
    /* synthetic */ SenderService(AnonymousClass1 anonymousClass1) {
        this();
    }
}
