package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;

import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.class */
/**
 * Pipe connector that sends tablet and TsFile insertion events to a receiver through asynchronous
 * Thrift clients.
 *
 * <p>Every event that is actually sent obtains an increasing commit id from
 * {@code commitIdGenerator}. The id selects the target endpoint (commit id modulo the number of
 * entries in {@code nodeUrls}) and orders the commit callbacks executed by
 * {@link #commit(long, EnrichedEvent)}. Events handed to
 * {@link #addFailureEventToRetryQueue(long, Event)} are re-sent through a
 * {@link IoTDBThriftSyncConnector} before any new event is processed.
 */
public class IoTDBThriftAsyncConnector extends IoTDBConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
    private static final String FAILED_TO_BORROW_CLIENT_FORMATTER = "Failed to borrow client from client pool for receiver %s:%s.";

    /** Holds the client manager that is created once and then reused by every connector instance. */
    private static final AtomicReference<IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>> ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new AtomicReference<>();

    private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> asyncPipeDataTransferClientManager;

    /** Connector used for handshake, heartbeat and for re-sending events from the retry queue. */
    private final IoTDBThriftSyncConnector retryConnector = new IoTDBThriftSyncConnector();

    /** Events waiting to be re-sent, ordered by commit id (smallest first). */
    private final PriorityBlockingQueue<Pair<Long, Event>> retryEventQueue = new PriorityBlockingQueue<>(11, Comparator.comparing(pair -> pair.left));

    private final AtomicLong commitIdGenerator = new AtomicLong(0);
    private final AtomicLong lastCommitId = new AtomicLong(0);

    /** Pending commit callbacks ordered by commit id; only touched inside {@link #commit}. */
    private final PriorityQueue<Pair<Long, Runnable>> commitQueue = new PriorityQueue<>(Comparator.comparing(pair -> pair.left));

    /** Only assigned by {@link #customize} when tablet batch mode is enabled. */
    private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;

    /**
     * Creates the connector, creating the shared client manager first if no instance has done so
     * yet. Creation is guarded by the class monitor so that it happens at most once.
     */
    public IoTDBThriftAsyncConnector() {
        if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
            synchronized (IoTDBThriftAsyncConnector.class) {
                if (ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get() == null) {
                    ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.set(
                        new IClientManager.Factory<TEndPoint, AsyncPipeDataTransferServiceClient>()
                            .createClientManager(new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
                }
            }
        }
        this.asyncPipeDataTransferClientManager = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get();
    }

    /**
     * Validates the parameters for this connector and for the retry connector.
     *
     * @param pipeParameterValidator validator passed to both connectors
     * @throws Exception if either validation fails
     */
    @Override
    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        super.validate(pipeParameterValidator);
        this.retryConnector.validate(pipeParameterValidator);
    }

    /**
     * Customizes this connector and the retry connector, and creates the tablet batch builder when
     * batch mode is enabled.
     *
     * @param pipeParameters connector parameters
     * @param pipeConnectorRuntimeConfiguration runtime configuration passed to both connectors
     * @throws Exception if either customization fails
     */
    @Override
    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        this.retryConnector.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new IoTDBThriftAsyncPipeTransferBatchReqBuilder(pipeParameters);
        }
    }

    /**
     * Delegates the handshake to the retry connector.
     *
     * @throws Exception if the delegated handshake fails
     */
    public synchronized void handshake() throws Exception {
        this.retryConnector.handshake();
    }

    /** Delegates the heartbeat to the retry connector. */
    public void heartbeat() {
        this.retryConnector.heartbeat();
    }

    /**
     * Transfers one tablet insertion event.
     *
     * <p>Queued retry events are flushed first. Events that are neither
     * {@link PipeInsertNodeTabletInsertionEvent} nor {@link PipeRawTabletInsertionEvent} are logged
     * and dropped. Events that still need pattern/time parsing are parsed and the result is passed
     * to {@code transfer} again. Otherwise the event gets a commit id and is either appended to the
     * batch builder (batch mode) or sent on its own.
     *
     * @param tabletInsertionEvent the event to transfer
     * @throws Exception if flushing the retry queue or building the request fails
     */
    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        transferQueuedEventsIfNecessary();

        final boolean isInsertNodeEvent = tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent;
        if (!isInsertNodeEvent && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBThriftAsyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Current event: {}.", tabletInsertionEvent);
            return;
        }

        if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
            if (isInsertNodeEvent) {
                transfer(((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
            } else {
                transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern());
            }
            return;
        }

        final long commitId = this.commitIdGenerator.incrementAndGet();

        if (this.isTabletBatchModeEnabled) {
            // onEvent reports whether the accumulated batch should be sent now.
            if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent, commitId)) {
                transfer(commitId, new PipeTransferTabletBatchEventHandler(this.tabletBatchBuilder, this));
                this.tabletBatchBuilder.onSuccess();
            }
            return;
        }

        if (isInsertNodeEvent) {
            final PipeInsertNodeTabletInsertionEvent insertNodeEvent = (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
            // Without a cached insert node the request is built from the event's byte buffer.
            transfer(
                commitId,
                new PipeTransferTabletInsertNodeEventHandler(
                    commitId,
                    insertNodeEvent,
                    insertNodeEvent.getInsertNodeViaCacheIfPossible() == null
                        ? PipeTransferTabletBinaryReq.toTPipeTransferReq(insertNodeEvent.getByteBuffer())
                        : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNodeEvent.getInsertNode()),
                    this));
        } else {
            final PipeRawTabletInsertionEvent rawEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent;
            transfer(
                commitId,
                new PipeTransferTabletRawEventHandler(
                    commitId,
                    rawEvent,
                    PipeTransferTabletRawReq.toTPipeTransferReq(rawEvent.convertToTablet(), rawEvent.isAligned()),
                    this));
        }
    }

    /**
     * Sends a tablet batch through a client borrowed for the endpoint selected by the commit id.
     * Failures to obtain a client are reported to the handler via {@code onError}.
     */
    private void transfer(long requestCommitId, PipeTransferTabletBatchEventHandler handler) {
        final TEndPoint endPoint = selectEndPoint(requestCommitId);
        try {
            final AsyncPipeDataTransferServiceClient client = borrowClient(endPoint);
            try {
                handler.transfer(client);
            } catch (TException e) {
                // NOTE(review): the handler is not notified here and nothing in this class re-queues the
                // event despite the "retrying..." wording - confirm the client/handler reports it.
                LOGGER.warn(String.format("Transfer batched insertion requests to receiver %s:%s error, retrying...", endPoint.getIp(), endPoint.getPort()), e);
            }
        } catch (Exception e) {
            handler.onError(e);
            LOGGER.warn(borrowFailureMessage(endPoint), e);
        }
    }

    /**
     * Sends a single insert-node request through a client borrowed for the endpoint selected by the
     * commit id. Failures to obtain a client are reported to the handler via {@code onError}.
     */
    private void transfer(long requestCommitId, PipeTransferTabletInsertNodeEventHandler handler) {
        final TEndPoint endPoint = selectEndPoint(requestCommitId);
        try {
            final AsyncPipeDataTransferServiceClient client = borrowClient(endPoint);
            try {
                handler.transfer(client);
            } catch (TException e) {
                // NOTE(review): handler is not notified on this path - see the batch overload.
                LOGGER.warn(String.format("Transfer insert node to receiver %s:%s error, retrying...", endPoint.getIp(), endPoint.getPort()), e);
            }
        } catch (Exception e) {
            handler.onError(e);
            LOGGER.warn(borrowFailureMessage(endPoint), e);
        }
    }

    /**
     * Sends a single raw-tablet request through a client borrowed for the endpoint selected by the
     * commit id. Failures to obtain a client are reported to the handler via {@code onError}.
     */
    private void transfer(long requestCommitId, PipeTransferTabletRawEventHandler handler) {
        final TEndPoint endPoint = selectEndPoint(requestCommitId);
        try {
            final AsyncPipeDataTransferServiceClient client = borrowClient(endPoint);
            try {
                handler.transfer(client);
            } catch (TException e) {
                // NOTE(review): handler is not notified on this path - see the batch overload.
                LOGGER.warn(String.format("Transfer tablet to receiver %s:%s error, retrying...", endPoint.getIp(), endPoint.getPort()), e);
            }
        } catch (Exception e) {
            handler.onError(e);
            LOGGER.warn(borrowFailureMessage(endPoint), e);
        }
    }

    /**
     * Transfers one TsFile insertion event.
     *
     * <p>Queued retry events and any pending tablet batch are flushed first. Events that are not
     * {@link PipeTsFileInsertionEvent} are logged and dropped. Events that need pattern/time
     * parsing are converted to tablet insertion events, each of which is transferred individually;
     * the TsFile event is closed afterwards even if a transfer throws. Otherwise the method waits
     * for the TsFile to be closed and sends it under a fresh commit id.
     *
     * @param tsFileInsertionEvent the event to transfer
     * @throws Exception if flushing, parsing or waiting for the TsFile fails
     */
    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        transferQueuedEventsIfNecessary();
        transferBatchedEventsIfNecessary();

        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftAsyncConnector only support PipeTsFileInsertionEvent. Current event: {}.", tsFileInsertionEvent);
            return;
        }

        if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) {
            try {
                for (final TabletInsertionEvent parsedEvent : tsFileInsertionEvent.toTabletInsertionEvents()) {
                    transfer(parsedEvent);
                }
            } finally {
                tsFileInsertionEvent.close();
            }
            return;
        }

        final long commitId = this.commitIdGenerator.incrementAndGet();
        final PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        // The handler is created before waiting, matching the original statement order.
        final PipeTransferTsFileInsertionEventHandler handler = new PipeTransferTsFileInsertionEventHandler(commitId, pipeTsFileInsertionEvent, this);
        pipeTsFileInsertionEvent.waitForTsFileClose();
        transfer(commitId, handler);
    }

    /**
     * Sends a TsFile through a client borrowed for the endpoint selected by the commit id.
     * Failures to obtain a client are reported to the handler via {@code onError}.
     */
    private void transfer(long requestCommitId, PipeTransferTsFileInsertionEventHandler handler) {
        final TEndPoint endPoint = selectEndPoint(requestCommitId);
        try {
            final AsyncPipeDataTransferServiceClient client = borrowClient(endPoint);
            try {
                handler.transfer(client);
            } catch (TException e) {
                // NOTE(review): handler is not notified on this path - see the batch overload.
                LOGGER.warn(String.format("Transfer tsfile to receiver %s:%s error, retrying...", endPoint.getIp(), endPoint.getPort()), e);
            }
        } catch (Exception e) {
            handler.onError(e);
            LOGGER.warn(borrowFailureMessage(endPoint), e);
        }
    }

    /**
     * Handles a generic event: flushes queued retry events and any pending tablet batch, then
     * ignores heartbeat events and logs a warning for every other event type.
     *
     * @param event the event to handle
     * @throws Exception if flushing the retry queue or the batch fails
     */
    public void transfer(Event event) throws Exception {
        transferQueuedEventsIfNecessary();
        transferBatchedEventsIfNecessary();
        if (event instanceof PipeHeartbeatEvent) {
            return;
        }
        LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", event);
    }

    /** Picks the endpoint at index {@code commitId % nodeUrls.size()}. */
    private TEndPoint selectEndPoint(long requestCommitId) {
        return this.nodeUrls.get((int) (requestCommitId % this.nodeUrls.size()));
    }

    /** Formats the shared "failed to borrow client" message for the given endpoint. */
    private static String borrowFailureMessage(TEndPoint tEndPoint) {
        return String.format(FAILED_TO_BORROW_CLIENT_FORMATTER, tEndPoint.getIp(), tEndPoint.getPort());
    }

    /**
     * Borrows clients for the endpoint until one is returned for which
     * {@link #handshakeIfNecessary} reports a finished handshake.
     *
     * <p>Any failure while borrowing, and any checked exception raised during the handshake, is
     * wrapped in a {@link PipeConnectionException}. Unchecked exceptions raised during the handshake
     * propagate unchanged.
     *
     * @param tEndPoint receiver endpoint
     * @return a client whose handshake is finished
     * @throws PipeConnectionException if no client can be borrowed or the handshake fails
     */
    private AsyncPipeDataTransferServiceClient borrowClient(TEndPoint tEndPoint) throws PipeConnectionException {
        while (true) {
            final AsyncPipeDataTransferServiceClient client;
            try {
                client = this.asyncPipeDataTransferClientManager.borrowClient(tEndPoint);
            } catch (Exception e) {
                throw new PipeConnectionException(borrowFailureMessage(tEndPoint), e);
            }

            try {
                if (handshakeIfNecessary(tEndPoint, client)) {
                    return client;
                }
            } catch (RuntimeException e) {
                // Already carries its own context (interruption / handshake failure); do not re-wrap.
                throw e;
            } catch (Exception e) {
                // handshakeIfNecessary declares a checked Exception that this method does not; wrap it so
                // callers route it to their handler's onError instead of treating it as a send failure.
                throw new PipeConnectionException(borrowFailureMessage(tEndPoint), e);
            }
        }
    }

    /**
     * Performs the handshake on the given client unless it has already been done.
     *
     * <p>The handshake request is sent asynchronously and this method polls every 10 ms until the
     * callback has fired. NOTE(review): the wait has no timeout, so it relies on the callback always
     * being invoked.
     *
     * @param tEndPoint receiver endpoint, used for log and error messages
     * @param asyncPipeDataTransferServiceClient client to handshake with
     * @return {@code true} if the client had already finished its handshake; {@code false} after a
     *     handshake was just performed, which makes the caller borrow again (presumably because the
     *     client is handed back to the pool once the callback completes - confirm)
     * @throws Exception if sending the request fails, the wait is interrupted, or the receiver
     *     rejects the handshake
     */
    private boolean handshakeIfNecessary(final TEndPoint tEndPoint, final AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient) throws Exception {
        if (asyncPipeDataTransferServiceClient.isHandshakeFinished()) {
            return true;
        }

        final AtomicBoolean isCallbackDone = new AtomicBoolean(false);
        final AtomicReference<Exception> handshakeFailure = new AtomicReference<>();

        asyncPipeDataTransferServiceClient.pipeTransfer(
            PipeTransferHandshakeReq.toTPipeTransferReq(CommonDescriptor.getInstance().getConfig().getTimestampPrecision()),
            new AsyncMethodCallback<TPipeTransferResp>() {
                @Override
                public void onComplete(TPipeTransferResp tPipeTransferResp) {
                    if (tPipeTransferResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
                        LOGGER.warn(
                            "Handshake error with receiver {}:{}, code: {}, message: {}.",
                            tEndPoint.getIp(),
                            tEndPoint.getPort(),
                            tPipeTransferResp.getStatus().getCode(),
                            tPipeTransferResp.getStatus().getMessage());
                        handshakeFailure.set(
                            new PipeConnectionException(
                                String.format(
                                    "Handshake error with receiver %s:%s, code: %d, message: %s.",
                                    tEndPoint.getIp(),
                                    tEndPoint.getPort(),
                                    tPipeTransferResp.getStatus().getCode(),
                                    tPipeTransferResp.getStatus().getMessage())));
                    } else {
                        LOGGER.info("Handshake successfully with receiver {}:{}.", tEndPoint.getIp(), tEndPoint.getPort());
                        asyncPipeDataTransferServiceClient.markHandshakeFinished();
                    }
                    // Set last so the waiting thread sees the failure (if any) once it wakes up.
                    isCallbackDone.set(true);
                }

                @Override
                public void onError(Exception exc) {
                    LOGGER.warn("Handshake error with receiver {}:{}.", tEndPoint.getIp(), tEndPoint.getPort(), exc);
                    handshakeFailure.set(exc);
                    isCallbackDone.set(true);
                }
            });

        try {
            while (!isCallbackDone.get()) {
                Thread.sleep(10L);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PipeException("Interrupted while waiting for handshake response.", e);
        }

        if (handshakeFailure.get() != null) {
            throw new PipeConnectionException("Failed to handshake.", handshakeFailure.get());
        }
        return false;
    }

    /**
     * Re-sends every event in the retry queue through the retry connector, in commit-id order.
     *
     * <p>An event is removed from the queue only after it has been transferred and committed, so an
     * exception leaves it at the head of the queue for the next call.
     *
     * @throws Exception if the retry connector fails to transfer an event
     */
    private synchronized void transferQueuedEventsIfNecessary() throws Exception {
        while (!this.retryEventQueue.isEmpty()) {
            final Pair<Long, Event> head = this.retryEventQueue.peek();
            final long requestCommitId = head.getLeft();
            final Event event = head.getRight();

            if (event instanceof PipeInsertNodeTabletInsertionEvent || event instanceof PipeRawTabletInsertionEvent) {
                this.retryConnector.transfer((TabletInsertionEvent) event);
            } else if (event instanceof PipeTsFileInsertionEvent) {
                this.retryConnector.transfer((TsFileInsertionEvent) event);
            } else {
                LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", event);
            }

            if (event instanceof EnrichedEvent) {
                commit(requestCommitId, (EnrichedEvent) event);
            }
            this.retryEventQueue.poll();
        }
    }

    /**
     * Sends the pending tablet batch under a fresh commit id when batch mode is enabled and the
     * batch builder is not empty.
     *
     * @throws IOException declared for the batch handling; propagated to the public transfer methods
     */
    private void transferBatchedEventsIfNecessary() throws IOException {
        if (!this.isTabletBatchModeEnabled || this.tabletBatchBuilder.isEmpty()) {
            return;
        }
        transfer(this.commitIdGenerator.incrementAndGet(), new PipeTransferTabletBatchEventHandler(this.tabletBatchBuilder, this));
        this.tabletBatchBuilder.onSuccess();
    }

    /**
     * Registers the commit of the given id and runs all commit callbacks that are now contiguous
     * with the last committed id.
     *
     * <p>A callback decreases the reference count of its event (if any). Callbacks run strictly in
     * commit-id order; processing stops at the first gap and resumes when the missing id is
     * committed.
     *
     * @param j commit id of the finished transfer
     * @param enrichedEvent event whose reference count is released on commit, may be {@code null}
     */
    public synchronized void commit(long j, @Nullable EnrichedEvent enrichedEvent) {
        this.commitQueue.offer(
            new Pair<>(
                j,
                () -> {
                    if (enrichedEvent != null) {
                        enrichedEvent.decreaseReferenceCount(IoTDBThriftAsyncConnector.class.getName(), true);
                    }
                }));

        while (!this.commitQueue.isEmpty()) {
            final Pair<Long, Runnable> head = this.commitQueue.peek();
            if (this.lastCommitId.get() + 1 != head.left) {
                // Gap in the sequence: an earlier transfer has not been committed yet.
                return;
            }
            head.right.run();
            this.lastCommitId.incrementAndGet();
            this.commitQueue.poll();
        }
    }

    /**
     * Queues a failed event so that it is re-sent by the next call to one of the public
     * {@code transfer} methods.
     *
     * @param j commit id originally assigned to the event
     * @param event the event to retry
     */
    public void addFailureEventToRetryQueue(long j, Event event) {
        this.retryEventQueue.offer(new Pair<>(j, event));
    }

    /**
     * Closes the retry connector.
     *
     * @throws Exception if closing the retry connector fails
     */
    public synchronized void close() throws Exception {
        this.retryConnector.close();
    }
}
