/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;

import java.io.File;
import java.io.IOException;
import java.util.Objects;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataNodeSyncConnector;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Synchronous thrift connector that ships data-region events (tablet insertions and TsFile
 * insertions) to a receiver through an {@link IoTDBSyncClient}.
 *
 * <p>When {@code isTabletBatchModeEnabled} (inherited) is set, tablet insertion events are
 * accumulated in {@link #tabletBatchBuilder} and sent as one batch request; any pending batch is
 * flushed before a TsFile or generic event is processed. Otherwise every event is sent on its own.
 *
 * <p>A response is treated as a failure when its status code is neither
 * {@code SUCCESS_STATUS} nor {@code REDIRECTION_RECOMMEND}; failures are delegated to the
 * inherited {@code receiverStatusHandler}.
 */
public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);

    /** Batch builder for tablet events; only created when tablet batch mode is enabled. */
    private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;

    /**
     * Runs the parent customization and, when tablet batch mode is enabled, creates the batch
     * request builder from the same parameters.
     *
     * @param parameters pipe parameters forwarded to the parent and to the batch builder
     * @param configuration runtime configuration forwarded to the parent
     * @throws Exception whatever the parent customization or the builder constructor throws
     */
    public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) throws Exception {
        super.customize(parameters, configuration);
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
        }
    }

    /**
     * Builds the piece request used when a TsFile is transferred on its own.
     *
     * @param fileName name of the file the piece belongs to
     * @param position offset of the piece inside the file
     * @param payLoad raw bytes of the piece
     * @return the request produced by {@link PipeTransferTsFilePieceReq#toTPipeTransferReq}
     * @throws IOException if building the request fails
     */
    protected PipeTransferFilePieceReq getTransferSingleFilePieceReq(String fileName, long position, byte[] payLoad) throws IOException {
        return PipeTransferTsFilePieceReq.toTPipeTransferReq(fileName, position, payLoad);
    }

    /**
     * Builds the piece request used when a TsFile is transferred together with its mod file.
     *
     * @param fileName name of the file the piece belongs to
     * @param position offset of the piece inside the file
     * @param payLoad raw bytes of the piece
     * @return the request produced by {@link PipeTransferTsFilePieceWithModReq#toTPipeTransferReq}
     * @throws IOException if building the request fails
     */
    protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(String fileName, long position, byte[] payLoad) throws IOException {
        return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad);
    }

    /**
     * Transfers a tablet insertion event, either by adding it to the current batch (batch mode)
     * or by sending it immediately.
     *
     * <p>Events that are neither {@link PipeInsertNodeTabletInsertionEvent} nor
     * {@link PipeRawTabletInsertionEvent} are logged and ignored.
     *
     * @param tabletInsertionEvent the event to transfer
     * @throws PipeConnectionException wrapping any exception raised while transferring
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        boolean isInsertNodeEvent = tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent;
        boolean isRawEvent = tabletInsertionEvent instanceof PipeRawTabletInsertionEvent;
        if (!isInsertNodeEvent && !isRawEvent) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
            return;
        }
        try {
            if (this.isTabletBatchModeEnabled) {
                // onEvent's boolean result decides whether the accumulated batch is sent now.
                if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                    this.doTransfer();
                }
            } else if (isInsertNodeEvent) {
                this.doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
            } else {
                this.doTransferWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent);
            }
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tablet insertion event %s, because %s.", ((EnrichedEvent) tabletInsertionEvent).coreReportMessage(), e.getMessage()), e);
        }
    }

    /**
     * Transfers a TsFile insertion event. In batch mode a non-empty tablet batch is flushed
     * first, so that the batch is sent before the file.
     *
     * <p>Events that are not {@link PipeTsFileInsertionEvent} are logged and ignored.
     *
     * @param tsFileInsertionEvent the event to transfer
     * @throws PipeConnectionException wrapping any exception raised while transferring
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.", tsFileInsertionEvent);
            return;
        }
        PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        try {
            if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
                this.doTransfer();
            }
            this.doTransferWrapper(pipeTsFileInsertionEvent);
        } catch (Exception e) {
            throw new PipeConnectionException(String.format("Failed to transfer tsfile insertion event %s, because %s.", pipeTsFileInsertionEvent.coreReportMessage(), e.getMessage()), e);
        }
    }

    /**
     * Handles generic events. Schema-region write plan events are forwarded to the inherited
     * wrapper and returned from immediately; for every other event a pending tablet batch is
     * flushed, and anything that is not a heartbeat is logged as unsupported.
     *
     * @param event the event to handle
     * @throws Exception whatever the underlying transfer throws
     */
    public void transfer(Event event) throws Exception {
        if (event instanceof PipeSchemaRegionWritePlanEvent) {
            this.doTransferWrapper((PipeSchemaRegionWritePlanEvent) event);
            return;
        }
        if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
            this.doTransfer();
        }
        if (!(event instanceof PipeHeartbeatEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector does not support transferring generic event: {}.", event);
        }
    }

    /**
     * Tells whether a receiver status must be passed to the status handler.
     *
     * @param status status returned by the receiver
     * @return {@code true} when the code is neither SUCCESS_STATUS nor REDIRECTION_RECOMMEND
     */
    private static boolean isFailureStatus(TSStatus status) {
        int code = status.getCode();
        return code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
                && code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode();
    }

    /**
     * Sends the accumulated tablet batch, then releases the batched events and resets the builder.
     *
     * @throws PipeConnectionException if the RPC itself throws
     */
    private void doTransfer() {
        Pair<IoTDBSyncClient, Boolean> clientAndStatus = this.clientManager.getClient();
        TPipeTransferResp resp;
        try {
            resp = clientAndStatus.getLeft().pipeTransfer(this.tabletBatchBuilder.toTPipeTransferReq());
        } catch (Exception e) {
            // The right element is set to false on RPC failure; presumably this flags the client
            // as unhealthy for the client manager - confirm against the manager implementation.
            clientAndStatus.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer tablet batch, because %s.", e.getMessage()), e);
        }
        TSStatus status = resp.getStatus();
        if (isFailureStatus(status)) {
            this.receiverStatusHandler.handle(status, String.format("Transfer PipeTransferTabletBatchReq error, result status %s", status), this.tabletBatchBuilder.deepCopyEvents().toString());
        }
        this.tabletBatchBuilder.decreaseEventsReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), true);
        this.tabletBatchBuilder.onSuccess();
    }

    /**
     * Transfers an insert-node tablet event while holding a reference on it. If the reference
     * cannot be acquired the event is skipped.
     *
     * <p>NOTE(review): the reference count is decreased in {@code finally} even when the increase
     * returned {@code false}; this mirrors the existing behavior - confirm it matches the event's
     * reference-counting contract.
     *
     * @param pipeInsertNodeTabletInsertionEvent the event to transfer
     * @throws PipeException if the transfer fails
     */
    private void doTransferWrapper(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        try {
            if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName())) {
                return;
            }
            this.doTransfer(pipeInsertNodeTabletInsertionEvent);
        } finally {
            pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), false);
        }
    }

    /**
     * Sends one insert-node tablet event. When the insert node is available it is sent as an
     * insert-node request (using a device-specific client if the node has a device path);
     * otherwise the event's byte buffer is sent as a binary request.
     *
     * @param pipeInsertNodeTabletInsertionEvent the event to send
     * @throws PipeConnectionException if obtaining the node, the client, or the RPC throws
     */
    private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException {
        TPipeTransferResp resp;
        InsertNode insertNode;
        // Stays null until a client has been obtained, so the catch block can tell whether there
        // is a client whose status needs updating.
        Pair<IoTDBSyncClient, Boolean> clientAndStatus = null;
        try {
            insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
            if (insertNode != null) {
                clientAndStatus = Objects.nonNull(insertNode.getDevicePath())
                        ? this.clientManager.getClient(insertNode.getDevicePath().getFullPath())
                        : this.clientManager.getClient();
                resp = clientAndStatus.getLeft().pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNode));
            } else {
                clientAndStatus = this.clientManager.getClient();
                resp = clientAndStatus.getLeft().pipeTransfer(PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
            }
        } catch (Exception e) {
            if (clientAndStatus != null) {
                clientAndStatus.setRight(false);
            }
            throw new PipeConnectionException(String.format("Network error when transfer insert node tablet insertion event, because %s.", e.getMessage()), e);
        }
        TSStatus status = resp.getStatus();
        if (isFailureStatus(status)) {
            this.receiverStatusHandler.handle(status, String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent.coreReportMessage(), status), pipeInsertNodeTabletInsertionEvent.toString());
        }
        // Record the redirect target for this device when the receiver supplied one.
        if (insertNode != null && insertNode.getDevicePath() != null && status.isSetRedirectNode()) {
            this.clientManager.updateLeaderCache(insertNode.getDevicePath().getFullPath(), status.getRedirectNode());
        }
    }

    /**
     * Transfers a raw tablet event while holding a reference on it. If the reference cannot be
     * acquired the event is skipped.
     *
     * <p>NOTE(review): the reference count is decreased in {@code finally} even when the increase
     * returned {@code false}; this mirrors the existing behavior - confirm it matches the event's
     * reference-counting contract.
     *
     * @param pipeRawTabletInsertionEvent the event to transfer
     * @throws PipeException if the transfer fails
     */
    private void doTransferWrapper(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        try {
            if (!pipeRawTabletInsertionEvent.increaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName())) {
                return;
            }
            this.doTransfer(pipeRawTabletInsertionEvent);
        } finally {
            pipeRawTabletInsertionEvent.decreaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), false);
        }
    }

    /**
     * Sends one raw tablet event using the client selected for the event's device id.
     *
     * @param pipeRawTabletInsertionEvent the event to send
     * @throws PipeConnectionException if building the request or the RPC throws
     */
    private void doTransfer(PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException {
        Pair<IoTDBSyncClient, Boolean> clientAndStatus = this.clientManager.getClient(pipeRawTabletInsertionEvent.getDeviceId());
        TPipeTransferResp resp;
        try {
            resp = clientAndStatus.getLeft().pipeTransfer(PipeTransferTabletRawReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()));
        } catch (Exception e) {
            clientAndStatus.setRight(false);
            throw new PipeConnectionException(String.format("Network error when transfer raw tablet insertion event, because %s.", e.getMessage()), e);
        }
        TSStatus status = resp.getStatus();
        if (isFailureStatus(status)) {
            this.receiverStatusHandler.handle(status, String.format("Transfer PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent.coreReportMessage(), status), pipeRawTabletInsertionEvent.toString());
        }
        if (status.isSetRedirectNode()) {
            this.clientManager.updateLeaderCache(pipeRawTabletInsertionEvent.getDeviceId(), status.getRedirectNode());
        }
    }

    /**
     * Transfers a TsFile event while holding a reference on it. If the reference cannot be
     * acquired the event is skipped.
     *
     * <p>NOTE(review): the reference count is decreased in {@code finally} even when the increase
     * returned {@code false}; this mirrors the existing behavior - confirm it matches the event's
     * reference-counting contract.
     *
     * @param pipeTsFileInsertionEvent the event to transfer
     * @throws PipeException if the transfer fails
     * @throws IOException if reading or sending file pieces fails
     */
    private void doTransferWrapper(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException {
        try {
            if (!pipeTsFileInsertionEvent.increaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName())) {
                return;
            }
            this.doTransfer(pipeTsFileInsertionEvent);
        } finally {
            pipeTsFileInsertionEvent.decreaseReferenceCount(IoTDBDataRegionSyncConnector.class.getName(), false);
        }
    }

    /**
     * Sends a TsFile (and, when applicable, its mod file) piece by piece, then sends the matching
     * seal request and checks the seal response.
     *
     * <p>The two paths are mutually exclusive: either mod file + TsFile followed by a
     * seal-with-mod request, or the TsFile alone followed by a plain seal request. Exactly one
     * seal response is produced and checked.
     *
     * @param pipeTsFileInsertionEvent the event whose files are sent
     * @throws PipeConnectionException if a seal RPC throws
     * @throws IOException if transferring file pieces fails
     */
    private void doTransfer(PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, IOException {
        File tsFile = pipeTsFileInsertionEvent.getTsFile();
        File modFile = pipeTsFileInsertionEvent.getModFile();
        Pair<IoTDBSyncClient, Boolean> clientAndStatus = this.clientManager.getClient();
        TPipeTransferResp resp;
        if (pipeTsFileInsertionEvent.isWithMod() && this.clientManager.supportModsIfIsDataNodeReceiver()) {
            this.transferFilePieces(modFile, clientAndStatus, true);
            this.transferFilePieces(tsFile, clientAndStatus, true);
            try {
                resp = clientAndStatus.getLeft().pipeTransfer(PipeTransferTsFileSealWithModReq.toTPipeTransferReq(modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length()));
            } catch (Exception e) {
                clientAndStatus.setRight(false);
                this.clientManager.adjustTimeoutIfNecessary(e);
                throw new PipeConnectionException(String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()), e);
            }
        } else {
            // Without this else the TsFile would be sent and sealed a second time after the
            // with-mod path, and the seal-with-mod response would be discarded unchecked.
            this.transferFilePieces(tsFile, clientAndStatus, false);
            try {
                resp = clientAndStatus.getLeft().pipeTransfer(PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
            } catch (Exception e) {
                clientAndStatus.setRight(false);
                this.clientManager.adjustTimeoutIfNecessary(e);
                throw new PipeConnectionException(String.format("Network error when seal file %s, because %s.", tsFile, e.getMessage()), e);
            }
        }
        TSStatus status = resp.getStatus();
        if (isFailureStatus(status)) {
            this.receiverStatusHandler.handle(status, String.format("Seal file %s error, result status %s.", tsFile, status), tsFile.getName());
        }
        LOGGER.info("Successfully transferred file {}.", tsFile);
    }

    /** Closes the parent connector first, then the tablet batch builder if one was created. */
    public void close() {
        super.close();
        if (this.tabletBatchBuilder != null) {
            this.tabletBatchBuilder.close();
        }
    }
}

