/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftSyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnectorClient;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pipe connector that synchronously pushes pipe events to one or more receivers through
 * {@link IoTDBThriftSyncConnectorClient}s.
 *
 * <p>One client slot is kept per entry of {@code nodeUrls}; {@code isClientAlive} records which
 * slots currently hold a client whose handshake succeeded. Transfers pick the next alive client in
 * round-robin order. When a transfer fails with a {@link TException} the client is flagged as not
 * alive so that the next {@link #handshake()} (also triggered by {@link #heartbeat()}) replaces it.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; it presumably relies on the
 * caller to invoke it from a single thread - confirm against the subtask that drives it.
 */
public class IoTDBThriftSyncConnector
extends IoTDBConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftSyncConnector.class);
    private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

    /** One slot per target endpoint; a slot is {@code null} until a client was created for it. */
    private final List<IoTDBThriftSyncConnectorClient> clients = new ArrayList<>();

    /** Parallel to {@link #clients}: {@code true} once the handshake on that slot succeeded. */
    private final List<Boolean> isClientAlive = new ArrayList<>();

    /** Monotonic counter used to rotate over {@link #clients}. */
    private long currentClientIndex = 0L;

    /** Only created when tablet batch mode is enabled. */
    private IoTDBThriftSyncPipeTransferBatchReqBuilder tabletBatchBuilder;

    /**
     * Runs the parent customization, then allocates one (empty, not alive) client slot per target
     * endpoint and, when batch mode is enabled, the tablet batch builder.
     *
     * @param parameters pipe parameters, also handed to the batch builder
     * @param configuration runtime configuration passed through to the parent
     * @throws Exception if the parent customization fails
     */
    @Override
    public void customize(PipeParameters parameters, PipeConnectorRuntimeConfiguration configuration) throws Exception {
        super.customize(parameters, configuration);
        for (int i = 0; i < this.nodeUrls.size(); ++i) {
            this.isClientAlive.add(false);
            this.clients.add(null);
        }
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new IoTDBThriftSyncPipeTransferBatchReqBuilder(parameters);
        }
    }

    /**
     * (Re)connects every endpoint that is not currently alive and performs the handshake request
     * on it. Endpoints that are already alive are left untouched.
     *
     * <p>A failure on one endpoint is logged and does not prevent the remaining endpoints from
     * being tried.
     *
     * @throws PipeConnectionException if no endpoint is alive after all of them were tried
     * @throws Exception if creating a client fails with something other than a {@link TException}
     */
    public void handshake() throws Exception {
        for (int i = 0; i < this.clients.size(); ++i) {
            if (Boolean.TRUE.equals(this.isClientAlive.get(i))) {
                continue;
            }
            TEndPoint endPoint = (TEndPoint)this.nodeUrls.get(i);
            String ip = endPoint.getIp();
            int port = endPoint.getPort();

            // Drop the stale client first so the slot never keeps a half-closed connection.
            IoTDBThriftSyncConnectorClient staleClient = this.clients.set(i, null);
            if (staleClient != null) {
                try {
                    staleClient.close();
                }
                catch (Exception e) {
                    LOGGER.warn("Failed to close client with target server ip: {}, port: {}, because: {}. Ignore it.", ip, port, e.getMessage());
                }
            }

            try {
                // Client creation is inside the try on purpose: if it fails with a TException
                // (presumably when the connection cannot be opened - TODO confirm against the
                // client constructor), the other endpoints must still get their chance.
                this.clients.set(i, new IoTDBThriftSyncConnectorClient(new ThriftClientProperty.Builder().setConnectionTimeoutMs((int)PIPE_CONFIG.getPipeConnectorTimeoutMs()).setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled()).build(), ip, port));
                TPipeTransferResp resp = this.clients.get(i).pipeTransfer(PipeTransferHandshakeReq.toTPipeTransferReq(CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
                if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    LOGGER.warn("Handshake error with target server ip: {}, port: {}, because: {}.", ip, port, resp.status);
                    continue;
                }
                this.isClientAlive.set(i, true);
                LOGGER.info("Handshake success. Target server ip: {}, port: {}", ip, port);
            }
            catch (TException e) {
                LOGGER.warn("Handshake error with target server ip: {}, port: {}, because: {}.", ip, port, e.getMessage());
            }
        }

        // At least one alive client is enough to keep transferring.
        if (!this.isClientAlive.contains(Boolean.TRUE)) {
            throw new PipeConnectionException(String.format("All target servers %s are not available.", this.nodeUrls));
        }
    }

    /**
     * Tries to bring dead clients back by re-running {@link #handshake()}. Failures are logged and
     * swallowed so that a heartbeat never fails the caller.
     */
    public void heartbeat() {
        try {
            this.handshake();
        }
        catch (Exception e) {
            LOGGER.warn("Failed to reconnect to target server, because: {}. Try to reconnect later.", e.getMessage(), e);
        }
    }

    /**
     * Transfers a tablet insertion event.
     *
     * <p>Unsupported event types are logged and ignored. Events that still need pattern/time
     * parsing are parsed first and the parsed event is transferred instead. Otherwise the event is
     * either fed to the batch builder (which triggers a batch transfer when the builder asks for
     * it) or sent directly.
     *
     * @param tabletInsertionEvent the event to transfer
     * @throws PipeConnectionException if no client is alive or the transfer hits a
     *     {@link TException}; in the latter case the used client is flagged as not alive
     * @throws Exception on any other transfer failure
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Ignore {}.", tabletInsertionEvent);
            return;
        }
        if (((EnrichedEvent)tabletInsertionEvent).shouldParsePatternOrTime()) {
            if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                this.transfer(((PipeInsertNodeTabletInsertionEvent)tabletInsertionEvent).parseEventWithPattern());
            } else {
                this.transfer(((PipeRawTabletInsertionEvent)tabletInsertionEvent).parseEventWithPattern());
            }
            return;
        }
        int clientIndex = this.nextClientIndex();
        IoTDBThriftSyncConnectorClient client = this.clients.get(clientIndex);
        try {
            if (this.isTabletBatchModeEnabled) {
                if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                    this.doTransfer(client);
                }
            } else if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                this.doTransfer(client, (PipeInsertNodeTabletInsertionEvent)tabletInsertionEvent);
            } else {
                this.doTransfer(client, (PipeRawTabletInsertionEvent)tabletInsertionEvent);
            }
        }
        catch (TException e) {
            // Flag the client so the next handshake replaces it.
            this.isClientAlive.set(clientIndex, false);
            throw new PipeConnectionException(String.format("Network error when transfer tablet insertion event %s, because %s.", tabletInsertionEvent, e.getMessage()), (Throwable)e);
        }
    }

    /**
     * Transfers a TsFile insertion event.
     *
     * <p>Unsupported event types are logged and ignored. Events that still need pattern/time
     * parsing are converted to tablet insertion events which are transferred one by one.
     * Otherwise any pending tablet batch is sent first, then the file itself.
     *
     * @param tsFileInsertionEvent the event to transfer
     * @throws PipeConnectionException if no client is alive or the transfer hits a
     *     {@link TException}; in the latter case the used client is flagged as not alive
     * @throws Exception on any other transfer failure
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector only support PipeTsFileInsertionEvent. Ignore {}.", tsFileInsertionEvent);
            return;
        }
        if (((EnrichedEvent)tsFileInsertionEvent).shouldParsePatternOrTime()) {
            for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
                this.transfer(event);
            }
            return;
        }
        int clientIndex = this.nextClientIndex();
        IoTDBThriftSyncConnectorClient client = this.clients.get(clientIndex);
        try {
            // Send buffered tablets before the file so nothing buffered is left behind it.
            if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
                this.doTransfer(client);
            }
            this.doTransfer(client, (PipeTsFileInsertionEvent)tsFileInsertionEvent);
        }
        catch (TException e) {
            // Flag the client so the next handshake replaces it.
            this.isClientAlive.set(clientIndex, false);
            throw new PipeConnectionException(String.format("Network error when transfer tsfile insertion event %s, because %s.", tsFileInsertionEvent, e.getMessage()), (Throwable)e);
        }
    }

    /**
     * Handles a generic event: any pending tablet batch is sent, and events other than
     * {@link PipeHeartbeatEvent} are reported as unsupported in the log.
     *
     * @param event the generic event
     * @throws TException if sending the pending batch fails on the wire; the used client is
     *     flagged as not alive before the exception is rethrown
     * @throws IOException if building or sending the pending batch fails with an I/O error
     * @throws PipeConnectionException if a batch is pending but no client is alive
     */
    public void transfer(Event event) throws TException, IOException {
        if (this.isTabletBatchModeEnabled && !this.tabletBatchBuilder.isEmpty()) {
            int clientIndex = this.nextClientIndex();
            try {
                this.doTransfer(this.clients.get(clientIndex));
            }
            catch (TException e) {
                // handshake() only replaces clients flagged as not alive; without this flag a
                // broken client would stay in the rotation. The exception type is kept as-is so
                // callers observe the same failure as before.
                this.isClientAlive.set(clientIndex, false);
                throw e;
            }
        }
        if (!(event instanceof PipeHeartbeatEvent)) {
            LOGGER.warn("IoTDBThriftSyncConnector does not support transfer generic event: {}.", event);
        }
    }

    /**
     * Sends the requests currently held by the batch builder as one batch request and notifies the
     * builder on success.
     *
     * @throws PipeException if the receiver answers with a non-success status
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient client) throws IOException, TException {
        TPipeTransferResp resp = client.pipeTransfer(PipeTransferTabletBatchReq.toTPipeTransferReq(this.tabletBatchBuilder.getTPipeTransferReqs()));
        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(String.format("Transfer PipeTransferTabletBatchReq error, result status %s", resp.status));
        }
        this.tabletBatchBuilder.onSuccess();
    }

    /**
     * Sends a single insert-node event: as an insert-node request when
     * {@code getInsertNodeViaCacheIfPossible()} yields a node, otherwise as a binary request built
     * from the event's byte buffer.
     *
     * @throws PipeException if the receiver answers with a non-success status
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient client, PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent) throws PipeException, TException, WALPipeException {
        TPipeTransferResp resp;
        if (pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible() == null) {
            resp = client.pipeTransfer(PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer()));
        } else {
            resp = client.pipeTransfer(PipeTransferTabletInsertNodeReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getInsertNode()));
        }
        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(String.format("Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s", pipeInsertNodeTabletInsertionEvent, resp.status));
        }
    }

    /**
     * Sends a single raw tablet event as a raw tablet request.
     *
     * @throws PipeException if the receiver answers with a non-success status
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient client, PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent) throws PipeException, TException, IOException {
        TPipeTransferResp resp = client.pipeTransfer(PipeTransferTabletRawReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()));
        if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(String.format("Transfer PipeRawTabletInsertionEvent %s error, result status %s", pipeRawTabletInsertionEvent, resp.status));
        }
    }

    /**
     * Streams a TsFile to the receiver piece by piece and then sends the seal request.
     *
     * <p>The file is read in chunks of the configured read-buffer size. If the receiver answers a
     * piece with {@code PIPE_TRANSFER_FILE_OFFSET_RESET}, reading resumes from the offset the
     * receiver reports.
     *
     * @throws PipeException if a piece or the seal request is answered with an error status
     */
    private void doTransfer(IoTDBThriftSyncConnectorClient client, PipeTsFileInsertionEvent pipeTsFileInsertionEvent) throws PipeException, TException, InterruptedException, IOException {
        pipeTsFileInsertionEvent.waitForTsFileClose();
        File tsFile = pipeTsFileInsertionEvent.getTsFile();

        int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
        byte[] readBuffer = new byte[readFileBufferSize];
        long position = 0L;
        try (RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
            int readLength;
            while ((readLength = reader.read(readBuffer)) != -1) {
                // Only a short read needs a trimmed copy; a full buffer is sent as-is.
                byte[] piece = readLength == readFileBufferSize ? readBuffer : Arrays.copyOfRange(readBuffer, 0, readLength);
                PipeTransferFilePieceResp resp = PipeTransferFilePieceResp.fromTPipeTransferResp(client.pipeTransfer(PipeTransferFilePieceReq.toTPipeTransferReq(tsFile.getName(), position, piece)));
                position += (long)readLength;

                if (resp.getStatus().getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) {
                    // The receiver dictates where to continue; follow it on both counters.
                    position = resp.getEndWritingOffset();
                    reader.seek(position);
                    LOGGER.info("Redirect file position to {}.", position);
                    continue;
                }
                if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    throw new PipeException(String.format("Transfer file %s error, result status %s.", tsFile, resp.getStatus()));
                }
            }
        }

        TPipeTransferResp sealResp = client.pipeTransfer(PipeTransferFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length()));
        if (sealResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            throw new PipeException(String.format("Seal file %s error, result status %s.", tsFile, sealResp.getStatus()));
        }
        LOGGER.info("Successfully transferred file {}.", tsFile);
    }

    /**
     * Returns the index of the next alive client in round-robin order.
     *
     * @return an index into {@link #clients} whose alive flag is {@code true}
     * @throws PipeConnectionException if a full round finds no alive client
     */
    private int nextClientIndex() {
        int clientSize = this.clients.size();
        // Each slot is inspected at most once per call.
        for (int tryCount = 0; tryCount < clientSize; ++tryCount) {
            int clientIndex = (int)(this.currentClientIndex++ % (long)clientSize);
            if (Boolean.TRUE.equals(this.isClientAlive.get(clientIndex))) {
                return clientIndex;
            }
        }
        throw new PipeConnectionException("All clients are dead, please check the connection to the receiver.");
    }

    /**
     * Closes every client and clears its slot. A failure to close one client is logged and does
     * not stop the remaining clients from being closed; every slot ends up flagged as not alive.
     */
    public void close() {
        for (int i = 0; i < this.clients.size(); ++i) {
            try {
                IoTDBThriftSyncConnectorClient client = this.clients.get(i);
                if (client != null) {
                    this.clients.set(i, null);
                    client.close();
                }
            }
            catch (Exception e) {
                LOGGER.warn("Failed to close client {}.", i, e);
            }
            finally {
                this.isClientAlive.set(i, false);
            }
        }
    }
}

