package org.apache.iotdb.db.pipe.connector.v2;

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang.NotImplementedException;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.db.pipe.connector.v1.IoTDBThriftConnectorClient;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.v1.request.PipeTransferTabletReq;
import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferInsertNodeTabletInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferRawTabletInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.v2.handler.PipeTransferTsFileInsertionEventHandler;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.class */
public class IoTDBThriftConnectorV2 implements PipeConnector {
    private static final String FAILED_TO_BORROW_CLIENT_FORMATTER = "Failed to borrow client from client pool for receiver %s:%s.";
    private static volatile IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> asyncPipeDataTransferClientManagerHolder;
    private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> asyncPipeDataTransferClientManager;
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    private final AtomicLong commitIdGenerator = new AtomicLong(0);
    private final AtomicLong lastCommitId = new AtomicLong(0);
    private final PriorityQueue<Pair<Long, Runnable>> commitQueue = new PriorityQueue<>(Comparator.comparing(pair -> {
        return (Long) pair.left;
    }));
    private List<TEndPoint> nodeUrls;
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
    private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig();
    private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();

    public IoTDBThriftConnectorV2() {
        if (asyncPipeDataTransferClientManagerHolder == null) {
            synchronized (IoTDBThriftConnectorV2.class) {
                if (asyncPipeDataTransferClientManagerHolder == null) {
                    asyncPipeDataTransferClientManagerHolder = new IClientManager.Factory().createClientManager(new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
                }
            }
        }
        this.asyncPipeDataTransferClientManager = asyncPipeDataTransferClientManagerHolder;
    }

    public synchronized void commit(long j, @Nullable EnrichedEvent enrichedEvent) {
        this.commitQueue.offer(new Pair<>(Long.valueOf(j), () -> {
            Optional.ofNullable(enrichedEvent).ifPresent(enrichedEvent2 -> {
                enrichedEvent2.decreaseReferenceCount(IoTDBThriftConnectorV2.class.getName());
            });
        }));
        while (!this.commitQueue.isEmpty()) {
            Pair<Long, Runnable> peek = this.commitQueue.peek();
            if (this.lastCommitId.get() + 1 != ((Long) peek.left).longValue()) {
                return;
            }
            ((Runnable) peek.right).run();
            this.lastCommitId.incrementAndGet();
            this.commitQueue.poll();
        }
    }

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        pipeParameterValidator.validateRequiredAttribute(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY);
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        this.nodeUrls = SessionUtils.parseSeedNodeUrls(Arrays.asList(pipeParameters.getString(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY).split(",")));
        if (this.nodeUrls.isEmpty()) {
            throw new PipeException("Node urls is empty.");
        }
    }

    public void handshake() throws Exception {
        TEndPoint tEndPoint = this.nodeUrls.get(0);
        try {
            IoTDBThriftConnectorClient ioTDBThriftConnectorClient = new IoTDBThriftConnectorClient(new ThriftClientProperty.Builder().setConnectionTimeoutMs(COMMON_CONFIG.getConnectionTimeoutInMS()).setRpcThriftCompressionEnabled(COMMON_CONFIG.isRpcThriftCompressionEnabled()).build(), tEndPoint.getIp(), tEndPoint.getPort());
            try {
                TPipeTransferResp pipeTransfer = ioTDBThriftConnectorClient.pipeTransfer(PipeTransferHandshakeReq.toTPipeTransferReq(CommonDescriptor.getInstance().getConfig().getTimestampPrecision()));
                if (pipeTransfer.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                    throw new PipeException(String.format("Handshake error, result status %s.", pipeTransfer.status));
                }
                ioTDBThriftConnectorClient.close();
            } finally {
            }
        } catch (TException e) {
            throw new PipeConnectionException(String.format("Connect to receiver %s:%s error: %s", e.getMessage(), tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e);
        }
    }

    public void heartbeat() {
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            long incrementAndGet = this.commitIdGenerator.incrementAndGet();
            PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
            transfer(incrementAndGet, new PipeTransferInsertNodeTabletInsertionEventHandler(incrementAndGet, pipeInsertNodeTabletInsertionEvent, PipeTransferInsertNodeReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getInsertNode()), this));
        } else {
            if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
                throw new NotImplementedException("IoTDBThriftConnectorV2 only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent.");
            }
            long incrementAndGet2 = this.commitIdGenerator.incrementAndGet();
            PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent;
            transfer(incrementAndGet2, new PipeTransferRawTabletInsertionEventHandler(incrementAndGet2, PipeTransferTabletReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()), this));
        }
    }

    public void transfer(long j, PipeTransferInsertNodeTabletInsertionEventHandler pipeTransferInsertNodeTabletInsertionEventHandler) {
        TEndPoint tEndPoint = this.nodeUrls.get((int) (j % this.nodeUrls.size()));
        try {
            try {
                pipeTransferInsertNodeTabletInsertionEventHandler.transfer((AsyncPipeDataTransferServiceClient) this.asyncPipeDataTransferClientManager.borrowClient(tEndPoint));
            } catch (TException e) {
                LOGGER.warn(String.format("Transfer insert node to receiver %s:%s error, retrying...", tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e);
            }
        } catch (Exception e2) {
            pipeTransferInsertNodeTabletInsertionEventHandler.onError(e2);
            LOGGER.warn(String.format(FAILED_TO_BORROW_CLIENT_FORMATTER, tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e2);
        }
    }

    public void transfer(long j, PipeTransferRawTabletInsertionEventHandler pipeTransferRawTabletInsertionEventHandler) {
        TEndPoint tEndPoint = this.nodeUrls.get((int) (j % this.nodeUrls.size()));
        try {
            try {
                pipeTransferRawTabletInsertionEventHandler.transfer((AsyncPipeDataTransferServiceClient) this.asyncPipeDataTransferClientManager.borrowClient(tEndPoint));
            } catch (TException e) {
                LOGGER.warn(String.format("Transfer tablet to receiver %s:%s error, retrying...", tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e);
            }
        } catch (Exception e2) {
            pipeTransferRawTabletInsertionEventHandler.onError(e2);
            LOGGER.warn(String.format(FAILED_TO_BORROW_CLIENT_FORMATTER, tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e2);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            throw new NotImplementedException("IoTDBThriftConnectorV2 only support PipeTsFileInsertionEvent.");
        }
        long incrementAndGet = this.commitIdGenerator.incrementAndGet();
        PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler = new PipeTransferTsFileInsertionEventHandler(incrementAndGet, pipeTsFileInsertionEvent, this);
        pipeTsFileInsertionEvent.waitForTsFileClose();
        transfer(incrementAndGet, pipeTransferTsFileInsertionEventHandler);
    }

    public void transfer(long j, PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler) {
        TEndPoint tEndPoint = this.nodeUrls.get((int) (j % this.nodeUrls.size()));
        try {
            try {
                pipeTransferTsFileInsertionEventHandler.transfer((AsyncPipeDataTransferServiceClient) this.asyncPipeDataTransferClientManager.borrowClient(tEndPoint));
            } catch (TException e) {
                LOGGER.warn(String.format("Transfer tsfile to receiver %s:%s error, retrying...", tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e);
            }
        } catch (Exception e2) {
            pipeTransferTsFileInsertionEventHandler.onError(e2);
            LOGGER.warn(String.format(FAILED_TO_BORROW_CLIENT_FORMATTER, tEndPoint.getIp(), Integer.valueOf(tEndPoint.getPort())), e2);
        }
    }

    public void transfer(Event event) {
        LOGGER.warn("IoTDBThriftConnectorV2 does not support transfer generic event: {}.", event);
    }

    public void close() {
        this.isClosed.set(true);
    }

    public boolean isClosed() {
        return this.isClosed.get();
    }
}
