package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.class */
public class IoTDBThriftSyncConnector extends IoTDBConnector {
    /** Class-wide logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftSyncConnector.class);

    /** Shared pipe configuration; read here for the connection timeout and the RPC compression switch. */
    private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

    /**
     * One slot per entry of {@code nodeUrls}. A slot holds {@code null} until {@code handshake()} has
     * created a client for it, and is reset to {@code null} again by {@code close()}.
     */
    private final List<IoTDBThriftSyncConnectorClient> clients = new ArrayList<>();

    /** Parallel to {@link #clients}: whether the client at the same index is currently considered usable. */
    private final List<Boolean> isClientAlive = new ArrayList<>();

    /** Ever-increasing counter; its value modulo the number of clients selects the next round-robin slot. */
    private long currentClientIndex = 0;

    /** Batch request builder; only instantiated by {@code customize} when tablet batch mode is enabled. */
    private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;

    /**
     * Lets the parent connector process the parameters, then prepares one (initially empty and
     * not-alive) client slot per target node and, if tablet batch mode is enabled, the batch builder.
     *
     * @param pipeParameters user supplied connector parameters
     * @param pipeConnectorRuntimeConfiguration runtime configuration handed over to the parent class
     * @throws Exception if the parent customization fails
     */
    @Override // org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector
    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);

        // Reserve a slot per target; the real clients are created lazily by handshake().
        final int targetCount = this.nodeUrls.size();
        for (int slot = 0; slot < targetCount; slot++) {
            this.isClientAlive.add(Boolean.FALSE);
            this.clients.add(null);
        }

        if (!this.isTabletBatchModeEnabled) {
            return;
        }
        this.tabletBatchBuilder = new IoTDBThriftSyncPipeTransferBatchReqBuilder(pipeParameters);
    }

    /**
     * Gives every target that is not currently marked alive a fresh client and one handshake attempt.
     *
     * <p>For each such target the previous client (if any) is closed on a best-effort basis, a new
     * client is created and a handshake request carrying the configured timestamp precision is sent.
     * The target is marked alive only when the receiver answers with a success status. A failure for
     * one target is logged and does not prevent the remaining targets from being tried.
     *
     * @throws PipeConnectionException if no target is marked alive once all attempts are done
     * @throws Exception kept in the signature for compatibility with existing callers
     */
    public void handshake() throws Exception {
        for (int i = 0; i < this.clients.size(); i++) {
            if (Boolean.TRUE.equals(this.isClientAlive.get(i))) {
                continue;
            }

            final String ip = this.nodeUrls.get(i).getIp();
            final int port = this.nodeUrls.get(i).getPort();

            // Drop the stale client first; a failure to close it must not block the reconnect.
            final IoTDBThriftSyncConnectorClient staleClient = this.clients.set(i, null);
            if (staleClient != null) {
                try {
                    staleClient.close();
                } catch (Exception e) {
                    LOGGER.warn("Failed to close client with target server ip: {}, port: {}, because: {}. Ignore it.", ip, port, e.getMessage());
                }
            }

            try {
                // The client is created inside the try on purpose: if creating it fails with a
                // TException, the failure is handled like a failed handshake (logged, slot stays
                // empty and not alive) instead of aborting the attempts for the remaining targets.
                final IoTDBThriftSyncConnectorClient freshClient = new IoTDBThriftSyncConnectorClient(new ThriftClientProperty.Builder().setConnectionTimeoutMs((int) PIPE_CONFIG.getPipeConnectorTimeoutMs()).setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()).build(), ip, port);
                this.clients.set(i, freshClient);

                final TPipeTransferResp resp = freshClient.pipeTransfer(PipeTransferHandshakeReq.toTPipeTransferReq(CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
                if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    LOGGER.warn("Handshake error with target server ip: {}, port: {}, because: {}.", ip, port, resp.status);
                } else {
                    this.isClientAlive.set(i, true);
                    LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip, port);
                }
            } catch (TException e) {
                LOGGER.warn("Handshake error with target server ip: {}, port: {}, because: {}.", ip, port, e.getMessage());
            }
        }

        // One reachable target is enough to keep the connector usable.
        if (this.isClientAlive.contains(Boolean.TRUE)) {
            return;
        }
        throw new PipeConnectionException(String.format("All target servers %s are not available.", this.nodeUrls));
    }

    /**
     * Periodic liveness hook: re-runs {@link #handshake()} so that targets currently marked dead get
     * another connection attempt. Any failure is only logged; it is never propagated to the caller.
     */
    public void heartbeat() {
        try {
            this.handshake();
        } catch (Exception reconnectFailure) {
            final String reason = reconnectFailure.getMessage();
            LOGGER.warn("Failed to reconnect to target server, because: {}. Try to reconnect later.", reason, reconnectFailure);
        }
    }

    /**
     * Sends a tablet insertion event to one of the alive receivers.
     *
     * <p>Only {@link PipeInsertNodeTabletInsertionEvent} and {@link PipeRawTabletInsertionEvent} are
     * handled; anything else is logged and ignored. Events that still need pattern/time parsing are
     * parsed first and the parsed event is transferred instead. In tablet batch mode the event is
     * handed to the batch builder and the batch is sent only when the builder asks for it.
     *
     * @param tabletInsertionEvent the event to transfer
     * @throws PipeConnectionException if the transfer fails with a {@link TException}; the client that
     *     was used is marked as not alive before this is thrown
     * @throws Exception any other failure raised while building or sending the request
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        final boolean isInsertNodeEvent = tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent;
        final boolean isRawEvent = tabletInsertionEvent instanceof PipeRawTabletInsertionEvent;
        if (!(isInsertNodeEvent || isRawEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
            return;
        }

        // Events that are not yet narrowed down are parsed and the result is transferred recursively.
        if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
            if (isInsertNodeEvent) {
                transfer(((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
            } else {
                transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
            }
            return;
        }

        final int clientIndex = nextClientIndex();
        final IoTDBThriftSyncConnectorClient client = this.clients.get(clientIndex);
        try {
            if (!this.isTabletBatchModeEnabled) {
                if (isInsertNodeEvent) {
                    doTransfer(client, (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
                } else {
                    doTransfer(client, (PipeRawTabletInsertionEvent) tabletInsertionEvent);
                }
            } else if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                // The builder reported that the accumulated batch should be flushed now.
                doTransfer(client);
            }
        } catch (TException networkError) {
            // Take the client out of rotation until a later handshake revives it.
            this.isClientAlive.set(clientIndex, false);
            throw new PipeConnectionException(String.format("Network error when transfer tablet insertion event %s, because %s.", tabletInsertionEvent, networkError.getMessage()), networkError);
        }
    }

    /**
     * Sends a TsFile insertion event to one of the alive receivers.
     *
     * <p>Only {@link PipeTsFileInsertionEvent} is handled; other implementations are logged and
     * ignored. If the event still needs pattern/time parsing, it is converted into tablet insertion
     * events which are transferred one by one. Otherwise any pending tablet batch is flushed first and
     * the file itself is transferred afterwards, both through the same client.
     *
     * @param tsFileInsertionEvent the event to transfer
     * @throws PipeConnectionException if the transfer fails with a {@link TException}; the client that
     *     was used is marked as not alive before this is thrown
     * @throws Exception any other failure raised while transferring
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.", tsFileInsertionEvent);
            return;
        }

        if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
            for (final TabletInsertionEvent parsedEvent : tsFileInsertionEvent.toTabletInsertionEvents()) {
                transfer(parsedEvent);
            }
            return;
        }

        final int clientIndex = nextClientIndex();
        final IoTDBThriftSyncConnectorClient client = this.clients.get(clientIndex);
        try {
            final boolean hasPendingBatch = this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty();
            if (hasPendingBatch) {
                // Send the buffered tablets before the file.
                doTransfer(client);
            }
            doTransfer(client, (PipeTsFileInsertionEvent) tsFileInsertionEvent);
        } catch (TException networkError) {
            // Take the client out of rotation until a later handshake revives it.
            this.isClientAlive.set(clientIndex, false);
            throw new PipeConnectionException(String.format("Network error when transfer tsfile insertion event %s, because %s.", tsFileInsertionEvent, networkError.getMessage()), networkError);
        }
    }

    /**
     * Handles a generic event. Any pending tablet batch is flushed first; afterwards heartbeat events
     * are accepted silently and every other event type is logged as unsupported.
     *
     * <p>If the flush fails with a {@link TException}, the client that was used is marked as not
     * alive, as the other {@code transfer} overloads do, so that it is skipped by the round-robin
     * selection and reconnected by a later {@code handshake()}. The exception itself is rethrown
     * unchanged.
     *
     * @param event the event to handle
     * @throws TException if sending the pending batch fails at the RPC level
     * @throws IOException if building the batch request fails
     */
    public void transfer(Event event) throws TException, IOException {
        if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
            final int clientIndex = nextClientIndex();
            try {
                doTransfer(this.clients.get(clientIndex));
            } catch (TException e) {
                // Without this the broken client would stay in rotation and never be re-handshaken.
                this.isClientAlive.set(clientIndex, false);
                throw e;
            }
        }

        if (!(event instanceof PipeHeartbeatEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic event: {}.", event);
        }
    }

    /**
     * Sends everything currently accumulated in the tablet batch builder as one batch request and
     * notifies the builder once the receiver has acknowledged it.
     *
     * @param ioTDBThriftSyncConnectorClient client used to send the batch
     * @throws PipeException if the receiver answers with a non-success status
     * @throws IOException if the batch request cannot be built
     * @throws TException if the RPC itself fails
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient ioTDBThriftSyncConnectorClient) throws IOException, TException {
        final TPipeTransferResp batchResp = ioTDBThriftSyncConnectorClient.pipeTransfer(PipeTransferTabletBatchReq.toTPipeTransferReq(this.tabletBatchBuilder.getTPipeTransferReqs()));

        final boolean accepted = batchResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (accepted) {
            this.tabletBatchBuilder.onSuccess();
            return;
        }
        throw new PipeException(String.format("Transfer PipeTransferTabletBatchReq error, result status %s", batchResp.status));
    }

    /**
     * Sends a single insert-node based tablet event. When the insert node is not available through the
     * cache, the event's byte buffer is sent as a binary request; otherwise the insert node itself is
     * sent.
     *
     * @param ioTDBThriftSyncConnectorClient client used to send the request
     * @param pipeInsertNodeTabletInsertionEvent event to transfer
     * @throws PipeException if the receiver answers with a non-success status
     * @throws TException if the RPC itself fails
     * @throws WALPipeException if the event content cannot be obtained
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient ioTDBThriftSyncConnectorClient, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, TException, WALPipeException {
        final TPipeTransferResp resp;
        if (pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null) {
            resp = ioTDBThriftSyncConnectorClient.pipeTransfer(PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
        } else {
            resp = ioTDBThriftSyncConnectorClient.pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getInsertNode()));
        }

        if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            return;
        }
        throw new PipeException(String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, resp.status));
    }

    /**
     * Sends a single raw tablet event: the event is converted to a tablet and shipped together with
     * its aligned flag.
     *
     * @param ioTDBThriftSyncConnectorClient client used to send the request
     * @param pipeRawTabletInsertionEvent event to transfer
     * @throws PipeException if the receiver answers with a non-success status
     * @throws TException if the RPC itself fails
     * @throws IOException if the request cannot be built
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient ioTDBThriftSyncConnectorClient, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, TException, IOException {
        final TPipeTransferResp resp = ioTDBThriftSyncConnectorClient.pipeTransfer(
                PipeTransferTabletRawReq.toTPipeTransferReq(
                        pipeRawTabletInsertionEvent.convertToTablet(),
                        pipeRawTabletInsertionEvent.isAligned()));

        final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
        if (resp.getStatus().getCode() == successCode) {
            return;
        }
        throw new PipeException(String.format("Transfer PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent, resp.status));
    }

    /* JADX WARN: Code restructure failed: missing block: B:18:0x00c4, code lost:
    
        throw new org.apache.iotdb.pipe.api.exception.PipeException(java.lang.String.format("Transfer file %s error, result status %s.", r0, r0.getStatus()));
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Transfers a TsFile insertion event through the given client.
     *
     * <p>NOTE(review): this is a decompiler placeholder, not the real implementation. The original
     * method body (305 bytecode instructions according to the decompiler) could not be restored and
     * no instruction dump is available here, so the method currently always throws
     * {@link UnsupportedOperationException}. As a consequence,
     * {@code transfer(TsFileInsertionEvent)} cannot succeed for events that reach this method. The only
     * fragment of the original logic that survived is the decompiler note above, which shows that a
     * {@code PipeException} with the message "Transfer file %s error, result status %s." was thrown on
     * some path. Restore the body from the original sources or from a full instruction dump.
     *
     * @param r9 client that is supposed to carry the transfer
     * @param r10 TsFile insertion event to transfer
     * @throws UnsupportedOperationException always, until the body is restored
     */
    private void doTransfer(org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnectorClient r9, org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent r10) throws org.apache.iotdb.pipe.api.exception.PipeException, org.apache.thrift.TException, java.lang.InterruptedException, java.io.IOException {
        /*
            Method dump skipped, instructions count: 305
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector.doTransfer(org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnectorClient, org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent):void");
    }

    /**
     * Selects the index of the next alive client in round-robin order.
     *
     * <p>Reconstructed from the decompiler's instruction dump, which failed only on the
     * {@code dup2_x1} used by the post-increment of {@code currentClientIndex}. Starting at the current
     * counter value, at most {@code clients.size()} consecutive slots are probed; the counter is
     * advanced by one for every probed slot, so successive calls rotate over the targets.
     *
     * @return index into {@code clients} / {@code isClientAlive} of a client that is marked alive
     * @throws PipeConnectionException if no client is marked alive (including the case of no clients)
     */
    private int nextClientIndex() {
        final int clientCount = this.clients.size();
        for (int attempt = 0; attempt < clientCount; attempt++) {
            // Post-increment: the slot is derived from the value before the counter is advanced.
            final int candidate = (int) (this.currentClientIndex++ % clientCount);
            if (Boolean.TRUE.equals(this.isClientAlive.get(candidate))) {
                return candidate;
            }
        }
        throw new PipeConnectionException("All clients are dead, please check the connection to the receiver.");
    }

    /**
     * Closes every client that is still open, clears its slot and marks all targets as not alive.
     * A failure to close one client is logged and does not stop the remaining clients from being
     * closed.
     */
    public void close() {
        for (int slot = 0; slot < this.clients.size(); slot++) {
            try {
                if (this.clients.get(slot) != null) {
                    // Clear the slot first, then close the client that occupied it.
                    final IoTDBThriftSyncConnectorClient previous = this.clients.set(slot, null);
                    previous.close();
                }
            } catch (Exception closeFailure) {
                LOGGER.warn("Failed to close client {}.", slot, closeFailure);
            } finally {
                // Whatever happened above, this target must not be picked for transfers any more.
                this.isClientAlive.set(slot, Boolean.FALSE);
            }
        }
    }
}
