package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnector;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.class */
public class IoTDBThriftSyncConnector extends IoTDBConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftSyncConnector.class);
    private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;
    private IoTDBThriftSyncClientManager clientManager;

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        super.validate(pipeParameterValidator);
        IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
        PipeParameters parameters = pipeParameterValidator.getParameters();
        String lowerCase = parameters.getStringOrDefault(ImmutableList.of("connector", "sink"), BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()).toLowerCase();
        Set parseNodeUrls = parseNodeUrls(parameters);
        PipeParameterValidator validate = pipeParameterValidator.validate(objArr -> {
            try {
                return !NodeUrlUtils.containsLocalAddress((List) parseNodeUrls.stream().filter(tEndPoint -> {
                    return tEndPoint.getPort() == config.getRpcPort();
                }).map((v0) -> {
                    return v0.getIp();
                }).collect(Collectors.toList()));
            } catch (UnknownHostException e) {
                LOGGER.warn("Unknown host when checking pipe sink IP.", e);
                return false;
            }
        }, String.format("One of the endpoints %s of the receivers is pointing back to the thrift receiver %s on sender itself, or unknown host when checking pipe sink IP.", parseNodeUrls, new TEndPoint(config.getRpcAddress(), config.getRpcPort())), new Object[0]);
        PipeParameterValidator.MultipleObjectsValidationRule multipleObjectsValidationRule = objArr2 -> {
            return !((Boolean) objArr2[0]).booleanValue() || (((Boolean) objArr2[1]).booleanValue() && ((Boolean) objArr2[2]).booleanValue());
        };
        String format = String.format("When ssl transport is enabled, %s and %s must be specified", "sink.ssl.trust-store-path", "sink.ssl.trust-store-pwd");
        Object[] objArr3 = new Object[3];
        objArr3[0] = Boolean.valueOf(BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(lowerCase) || BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(lowerCase) || parameters.getBooleanOrDefault("sink.ssl.enable", false));
        objArr3[1] = Boolean.valueOf(parameters.hasAttribute("sink.ssl.trust-store-path"));
        objArr3[2] = Boolean.valueOf(parameters.hasAttribute("sink.ssl.trust-store-pwd"));
        validate.validate(multipleObjectsValidationRule, format, objArr3);
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new IoTDBThriftSyncPipeTransferBatchReqBuilder(pipeParameters);
        }
        String lowerCase = pipeParameters.getStringOrDefault(ImmutableList.of("connector", "sink"), BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName()).toLowerCase();
        this.clientManager = new IoTDBThriftSyncClientManager(this.nodeUrls, BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(lowerCase) || BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(lowerCase) || pipeParameters.getBooleanOrDefault("sink.ssl.enable", false), pipeParameters.getString("sink.ssl.trust-store-path"), pipeParameters.getString("sink.ssl.trust-store-pwd"), pipeParameters.getBooleanOrDefault(Arrays.asList("sink.leader-cache.enable", "connector.leader-cache.enable"), true));
    }

    public void handshake() throws Exception {
        this.clientManager.checkClientStatusAndTryReconstructIfNecessary();
    }

    public void heartbeat() {
        try {
            handshake();
        } catch (Exception e) {
            LOGGER.warn("Failed to reconnect to target server, because: {}. Try to reconnect later.", e.getMessage(), e);
        }
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
            return;
        }
        if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
            if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                transfer(((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
                return;
            } else {
                transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
                return;
            }
        }
        if ((tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) && ((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) {
            return;
        }
        try {
            if (this.isTabletBatchModeEnabled) {
                if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                    doTransfer();
                }
            } else if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                doTransfer((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            } else {
                doTransfer((PipeRawTabletInsertionEvent) tabletInsertionEvent);
            }
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tablet insertion event %s, because %s.", tabletInsertionEvent, e.getMessage()), e);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.", tsFileInsertionEvent);
            return;
        }
        if (!((PipeTsFileInsertionEvent) tsFileInsertionEvent).waitForTsFileClose()) {
            LOGGER.warn("Pipe skipping temporary TsFile which shouldn't be transferred: {}", ((PipeTsFileInsertionEvent) tsFileInsertionEvent).getTsFile());
            return;
        }
        if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
            Iterator it = tsFileInsertionEvent.toTabletInsertionEvents().iterator();
            while (it.hasNext()) {
                transfer((TabletInsertionEvent) it.next());
            }
        } else {
            try {
                if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
                    doTransfer();
                }
                doTransfer((PipeTsFileInsertionEvent) tsFileInsertionEvent);
            } catch (Exception e) {
                throw new PipeConnectionException(String.format("Failed to transfer tsfile insertion event %s, because %s.", tsFileInsertionEvent, e.getMessage()), e);
            }
        }
    }

    public void transfer(Event event) throws TException, IOException {
        if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
            doTransfer();
        }
        if (event instanceof PipeHeartbeatEvent) {
            return;
        }
        LOGGER.warn("IoTDBThriftSyncConnector does not support transferring generic event: {}.", event);
    }

    private void doTransfer() {
        Pair<IoTDBThriftSyncConnectorClient, Boolean> client = this.clientManager.getClient();
        try {
            TPipeTransferResp pipeTransfer = ((IoTDBThriftSyncConnectorClient) client.getLeft()).pipeTransfer(this.tabletBatchBuilder.toTPipeTransferReq());
            if (pipeTransfer.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                throw new PipeException(String.format("Transfer PipeTransferTabletBatchReq error, result status %s", pipeTransfer.status));
            }
            this.tabletBatchBuilder.onSuccess();
        } catch (Exception e) {
            client.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer tablet batch, because %s.", e.getMessage()), e);
        }
    }

    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        Pair pair = null;
        try {
            InsertNode insertNodeViaCacheIfPossible = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
            TSStatus status = (insertNodeViaCacheIfPossible != null ? ((IoTDBThriftSyncConnectorClient) this.clientManager.getClient(insertNodeViaCacheIfPossible.getDevicePath().getFullPath()).getLeft()).pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNodeViaCacheIfPossible)) : ((IoTDBThriftSyncConnectorClient) this.clientManager.getClient().getLeft()).pipeTransfer(PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer()))).getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                throw new PipeException(String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, status));
            }
            if (insertNodeViaCacheIfPossible == null || !status.isSetRedirectNode()) {
                return;
            }
            this.clientManager.updateLeaderCache(insertNodeViaCacheIfPossible.getDevicePath().getFullPath(), status.getRedirectNode());
        } catch (Exception e) {
            if (0 != 0) {
                pair.setRight(false);
            }
            throw new PipeConnectionException(String.format("Network error when transfer insert node tablet insertion event, because %s.", e.getMessage()), e);
        }
    }

    private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        Pair<IoTDBThriftSyncConnectorClient, Boolean> client = this.clientManager.getClient(pipeRawTabletInsertionEvent.getDeviceId());
        try {
            TSStatus status = ((IoTDBThriftSyncConnectorClient) client.getLeft()).pipeTransfer(PipeTransferTabletRawReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned())).getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                throw new PipeException(String.format("Transfer PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent, status));
            }
            if (status.isSetRedirectNode()) {
                this.clientManager.updateLeaderCache(pipeRawTabletInsertionEvent.getDeviceId(), status.getRedirectNode());
            }
        } catch (Exception e) {
            client.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer raw tablet insertion event, because %s.", e.getMessage()), e);
        }
    }

    private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException {
        File tsFile = pipeTsFileInsertionEvent.getTsFile();
        Pair<IoTDBThriftSyncConnectorClient, Boolean> client = this.clientManager.getClient();
        int pipeConnectorReadFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
        byte[] bArr = new byte[pipeConnectorReadFileBufferSize];
        long j = 0;
        RandomAccessFile randomAccessFile = new RandomAccessFile(tsFile, "r");
        while (true) {
            try {
                int read = randomAccessFile.read(bArr);
                if (read == -1) {
                    randomAccessFile.close();
                    try {
                        TPipeTransferResp pipeTransfer = ((IoTDBThriftSyncConnectorClient) client.getLeft()).pipeTransfer(PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
                        if (pipeTransfer.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                            throw new PipeException(String.format("Seal file %s error, result status %s.", tsFile, pipeTransfer.getStatus()));
                        }
                        LOGGER.info("Successfully transferred file {}.", tsFile);
                        return;
                    } catch (Exception e) {
                        client.setRight(false);
                        throw new PipeConnectionException(String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()), e);
                    }
                }
                try {
                    PipeTransferFilePieceResp fromTPipeTransferResp = PipeTransferFilePieceResp.fromTPipeTransferResp(((IoTDBThriftSyncConnectorClient) client.getLeft()).pipeTransfer(PipeTransferFilePieceReq.toTPipeTransferReq(tsFile.getName(), j, read == pipeConnectorReadFileBufferSize ? bArr : Arrays.copyOfRange(bArr, 0, read))));
                    j += read;
                    if (fromTPipeTransferResp.getStatus().getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                        j = fromTPipeTransferResp.getEndWritingOffset();
                        randomAccessFile.seek(j);
                        LOGGER.info("Redirect file position to {}.", Long.valueOf(j));
                    } else if (fromTPipeTransferResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        throw new PipeException(String.format("Transfer file %s error, result status %s.", tsFile, fromTPipeTransferResp.getStatus()));
                    }
                } catch (Exception e2) {
                    client.setRight(false);
                    throw new PipeConnectionException(String.format("Network error when transfer file %s, because %s.", tsFile, e2.getMessage()), e2);
                }
            } catch (Throwable th) {
                try {
                    randomAccessFile.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
    }

    public void close() {
        if (this.clientManager != null) {
            this.clientManager.close();
        }
        if (this.tabletBatchBuilder != null) {
            this.tabletBatchBuilder.close();
        }
    }
}
