package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.class */
/**
 * Hands out {@link AsyncPipeDataTransferServiceClient}s for a set of receiver endpoints and runs
 * the pipe handshake on any borrowed client that has not completed it yet.
 *
 * <p>Every instance uses the same lazily created {@link IClientManager}, which is kept in a static
 * holder. Endpoints are selected round-robin through the {@code endPointList} and
 * {@code currentClientIndex} fields inherited from {@link IoTDBThriftClientManager}; when the
 * leader cache is enabled, the cached leader endpoint of a device is tried first.
 */
public class IoTDBThriftAsyncClientManager extends IoTDBThriftClientManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftAsyncClientManager.class);

    /** Holder of the client pool manager shared by all instances; filled on first construction. */
    private static final AtomicReference<IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>> ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();

    /** Mirror of the endpoint list used for fast membership checks in {@link #updateLeaderCache}. */
    private final Set<TEndPoint> endPointSet;

    /** The shared client pool manager, captured from the static holder at construction time. */
    private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> endPoint2Client;

    /**
     * Creates a manager for the given endpoints and makes sure the shared client pool exists.
     *
     * @param list receiver endpoints; forwarded to the superclass and copied into the local set
     * @param z flag forwarded to the superclass constructor (presumably the leader-cache switch
     *     read later as {@code useLeaderCache}; confirm against the superclass)
     */
    public IoTDBThriftAsyncClientManager(List<TEndPoint> list, boolean z) {
        super(list, z);
        this.endPointSet = new HashSet<>(list);

        // The holder is private to this class, so this class's own monitor is the right lock for
        // the one-time initialisation; the second null check runs under that lock.
        if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
            synchronized (IoTDBThriftAsyncClientManager.class) {
                if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
                    ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
                            new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
                                    .createClientManager(new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
                }
            }
        }
        this.endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
    }

    /**
     * Borrows a client for the next endpoint in round-robin order, repeating until a borrowed
     * client reports that its handshake was already finished.
     *
     * @return a client whose handshake is finished
     * @throws PipeConnectionException if there is no endpoint to choose from or a handshake fails
     * @throws Exception if borrowing from the pool fails or the wait for a handshake is interrupted
     */
    public AsyncPipeDataTransferServiceClient borrowClient() throws Exception {
        // The size is read once per call; endpoints appended while looping are picked up by later calls.
        final int endPointCount = this.endPointList.size();
        if (endPointCount == 0) {
            // Without this guard the modulo below fails with an uninformative ArithmeticException.
            throw new PipeConnectionException("No receiver endpoint is available to borrow a client from.");
        }

        while (true) {
            final TEndPoint endPoint = this.endPointList.get((int) (this.currentClientIndex++ % endPointCount));
            final AsyncPipeDataTransferServiceClient client = this.endPoint2Client.borrowClient(endPoint);
            if (handshakeIfNecessary(endPoint, client)) {
                return client;
            }
        }
    }

    /**
     * Borrows a client, preferring the cached leader endpoint of the given key when the leader
     * cache is enabled and has an entry for it. Any failure on the leader path is logged and the
     * call falls back to {@link #borrowClient()}.
     *
     * @param str key passed to the leader cache lookup (presumably a device id; confirm with callers)
     * @return a client whose handshake is finished
     * @throws Exception if the round-robin fallback fails
     */
    public AsyncPipeDataTransferServiceClient borrowClient(String str) throws Exception {
        if (!this.useLeaderCache) {
            return borrowClient();
        }

        final TEndPoint leaderEndPoint = leaderCacheManager.getLeaderEndPoint(str);
        if (leaderEndPoint == null) {
            return borrowClient();
        }

        try {
            final AsyncPipeDataTransferServiceClient client = this.endPoint2Client.borrowClient(leaderEndPoint);
            if (handshakeIfNecessary(leaderEndPoint, client)) {
                return client;
            }
        } catch (Exception e) {
            LOGGER.warn("failed to borrow client {}:{} for cached leader.", leaderEndPoint.getIp(), leaderEndPoint.getPort(), e);
        }
        return borrowClient();
    }

    /**
     * Checks whether {@code client} has finished its handshake and, if not, sends a handshake
     * request through it and blocks until the asynchronous callback has fired.
     *
     * <p>Note the return contract: {@code true} is returned only when the handshake had already
     * been finished before this call. After a handshake performed here the method returns
     * {@code false} even on success, which makes the callers borrow again.
     * NOTE(review): presumably the client is handed back to the pool once its asynchronous call
     * completes and therefore must not be given to the caller; confirm against
     * {@code AsyncPipeDataTransferServiceClient}.
     *
     * @param targetEndPoint endpoint the client is connected to; used for log and error messages
     * @param client the freshly borrowed client
     * @return {@code true} if the client can be used as is, {@code false} if a handshake was just run
     * @throws PipeException if the calling thread is interrupted while waiting for the response
     * @throws PipeConnectionException if the receiver rejects the handshake or the call fails
     * @throws Exception if sending the handshake request fails
     */
    private boolean handshakeIfNecessary(final TEndPoint targetEndPoint, final AsyncPipeDataTransferServiceClient client) throws Exception {
        if (client.isHandshakeFinished()) {
            return true;
        }

        // Released by the callback once the outcome (success or failure) has been recorded.
        final CountDownLatch responseArrived = new CountDownLatch(1);
        final AtomicReference<Exception> failure = new AtomicReference<>();

        client.pipeTransfer(
                PipeTransferHandshakeReq.toTPipeTransferReq(CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
                new AsyncMethodCallback<TPipeTransferResp>() {
                    @Override
                    public void onComplete(TPipeTransferResp response) {
                        if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                            LOGGER.warn(
                                    "Handshake error with receiver {}:{}, code: {}, message: {}.",
                                    targetEndPoint.getIp(),
                                    targetEndPoint.getPort(),
                                    response.getStatus().getCode(),
                                    response.getStatus().getMessage());
                            failure.set(
                                    new PipeConnectionException(
                                            String.format(
                                                    "Handshake error with receiver %s:%s, code: %d, message: %s.",
                                                    targetEndPoint.getIp(),
                                                    targetEndPoint.getPort(),
                                                    response.getStatus().getCode(),
                                                    response.getStatus().getMessage())));
                        } else {
                            LOGGER.info("Handshake successfully with receiver {}:{}.", targetEndPoint.getIp(), targetEndPoint.getPort());
                            client.markHandshakeFinished();
                        }
                        // Signal last, so the waiting thread always observes the recorded outcome.
                        responseArrived.countDown();
                    }

                    @Override
                    public void onError(Exception exc) {
                        LOGGER.warn("Handshake error with receiver {}:{}.", targetEndPoint.getIp(), targetEndPoint.getPort(), exc);
                        failure.set(exc);
                        responseArrived.countDown();
                    }
                });

        try {
            responseArrived.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack before translating.
            Thread.currentThread().interrupt();
            throw new PipeException("Interrupted while waiting for handshake response.", e);
        }

        final Exception handshakeFailure = failure.get();
        if (handshakeFailure != null) {
            throw new PipeConnectionException("Failed to handshake.", handshakeFailure);
        }
        return false;
    }

    /**
     * Records {@code tEndPoint} as the leader endpoint for {@code str} when the leader cache is
     * enabled, first adding the endpoint to the round-robin list if it is not known yet. Does
     * nothing when the leader cache is disabled.
     *
     * @param str key under which the leader endpoint is stored
     * @param tEndPoint the leader endpoint to remember
     */
    public void updateLeaderCache(String str, TEndPoint tEndPoint) {
        if (!this.useLeaderCache) {
            return;
        }

        if (!this.endPointSet.contains(tEndPoint)) {
            this.endPointList.add(tEndPoint);
            this.endPointSet.add(tEndPoint);
        }
        leaderCacheManager.updateLeaderEndPoint(str, tEndPoint);
    }
}
