package org.apache.iotdb.db.pipe.connector.protocol.thrift.async;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.plugin.builtin.connector.iotdb.IoTDBConnector;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileInsertionEventHandler;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBThriftSyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.class */
public class IoTDBThriftAsyncConnector extends IoTDBConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftAsyncConnector.class);
    private static final String THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT = "Failed to borrow client from client pool or exception occurred when sending to receiver.";
    private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT = "Failed to borrow client from client pool or exception occurred when sending to receiver %s:%s.";
    private IoTDBThriftAsyncClientManager clientManager;
    private final IoTDBThriftSyncConnector retryConnector = new IoTDBThriftSyncConnector();
    private final PriorityBlockingQueue<Event> retryEventQueue = new PriorityBlockingQueue<>(11, Comparator.comparing(event -> {
        return Long.valueOf(event instanceof EnrichedEvent ? ((EnrichedEvent) event).getCommitId() : 0L);
    }));
    private IoTDBThriftAsyncPipeTransferBatchReqBuilder tabletBatchBuilder;

    public void validate(PipeParameterValidator pipeParameterValidator) throws Exception {
        super.validate(pipeParameterValidator);
        this.retryConnector.validate(pipeParameterValidator);
        PipeParameters parameters = pipeParameterValidator.getParameters();
        pipeParameterValidator.validate(objArr -> {
            return (((Boolean) objArr[0]).booleanValue() || ((Boolean) objArr[1]).booleanValue() || ((Boolean) objArr[2]).booleanValue()) ? false : true;
        }, "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.", new Object[]{Boolean.valueOf(parameters.getBooleanOrDefault("sink.ssl.enable", false)), Boolean.valueOf(parameters.hasAttribute("sink.ssl.trust-store-path")), Boolean.valueOf(parameters.hasAttribute("sink.ssl.trust-store-pwd"))});
    }

    public void customize(PipeParameters pipeParameters, PipeConnectorRuntimeConfiguration pipeConnectorRuntimeConfiguration) throws Exception {
        super.customize(pipeParameters, pipeConnectorRuntimeConfiguration);
        PipeParameters pipeParameters2 = new PipeParameters(new HashMap(pipeParameters.getAttribute()));
        pipeParameters2.getAttribute().put("sink.batch.enable", SqlConstant.BOOLEAN_FALSE);
        pipeParameters2.getAttribute().put("connector.batch.enable", SqlConstant.BOOLEAN_FALSE);
        this.retryConnector.customize(pipeParameters2, pipeConnectorRuntimeConfiguration);
        this.clientManager = new IoTDBThriftAsyncClientManager(this.nodeUrls, pipeParameters.getBooleanOrDefault(Arrays.asList("sink.leader-cache.enable", "connector.leader-cache.enable"), true));
        if (this.isTabletBatchModeEnabled) {
            this.tabletBatchBuilder = new IoTDBThriftAsyncPipeTransferBatchReqBuilder(pipeParameters);
        }
    }

    public synchronized void handshake() throws Exception {
        this.retryConnector.handshake();
    }

    public void heartbeat() {
        this.retryConnector.heartbeat();
    }

    public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception {
        transferQueuedEventsIfNecessary();
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
            LOGGER.warn("IoTDBThriftAsyncConnector only support PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. Current event: {}.", tabletInsertionEvent);
            return;
        }
        if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) {
            if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                transfer(((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
                return;
            } else {
                transfer(((PipeRawTabletInsertionEvent) tabletInsertionEvent).parseEventWithPatternOrTime());
                return;
            }
        }
        if ((tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) && ((PipeRawTabletInsertionEvent) tabletInsertionEvent).hasNoNeedParsingAndIsEmpty()) {
            return;
        }
        if (this.isTabletBatchModeEnabled) {
            if (this.tabletBatchBuilder.onEvent(tabletInsertionEvent)) {
                transfer(new PipeTransferTabletBatchEventHandler(this.tabletBatchBuilder, this));
                this.tabletBatchBuilder.onSuccess();
                return;
            }
            return;
        }
        if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent)) {
            PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent;
            transfer(new PipeTransferTabletRawEventHandler(pipeRawTabletInsertionEvent, PipeTransferTabletRawReq.toTPipeTransferReq(pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()), this));
        } else {
            PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent = (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
            InsertNode insertNodeViaCacheIfPossible = pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
            transfer(new PipeTransferTabletInsertNodeEventHandler(pipeInsertNodeTabletInsertionEvent, Objects.isNull(insertNodeViaCacheIfPossible) ? PipeTransferTabletBinaryReq.toTPipeTransferReq(pipeInsertNodeTabletInsertionEvent.getByteBuffer()) : PipeTransferTabletInsertNodeReq.toTPipeTransferReq(insertNodeViaCacheIfPossible), this));
        }
    }

    private void transfer(PipeTransferTabletBatchEventHandler pipeTransferTabletBatchEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient();
            pipeTransferTabletBatchEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTabletBatchEventHandler.onError(e);
        }
    }

    private void transfer(PipeTransferTabletInsertNodeEventHandler pipeTransferTabletInsertNodeEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient();
            pipeTransferTabletInsertNodeEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTabletInsertNodeEventHandler.onError(e);
        }
    }

    private void transfer(PipeTransferTabletRawEventHandler pipeTransferTabletRawEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient();
            pipeTransferTabletRawEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTabletRawEventHandler.onError(e);
        }
    }

    public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
        transferQueuedEventsIfNecessary();
        transferBatchedEventsIfNecessary();
        if (!(tsFileInsertionEvent instanceof PipeTsFileInsertionEvent)) {
            LOGGER.warn("IoTDBThriftAsyncConnector only support PipeTsFileInsertionEvent. Current event: {}.", tsFileInsertionEvent);
            return;
        }
        PipeTsFileInsertionEvent pipeTsFileInsertionEvent = (PipeTsFileInsertionEvent) tsFileInsertionEvent;
        if (!pipeTsFileInsertionEvent.waitForTsFileClose()) {
            LOGGER.warn("Pipe skipping temporary TsFile which shouldn't be transferred: {}", pipeTsFileInsertionEvent.getTsFile());
            return;
        }
        if (!pipeTsFileInsertionEvent.shouldParsePatternOrTime()) {
            if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
                throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
            }
            transfer(new PipeTransferTsFileInsertionEventHandler(pipeTsFileInsertionEvent, this));
        } else {
            try {
                Iterator<TabletInsertionEvent> it = pipeTsFileInsertionEvent.toTabletInsertionEvents().iterator();
                while (it.hasNext()) {
                    transfer(it.next());
                }
            } finally {
                pipeTsFileInsertionEvent.close();
            }
        }
    }

    private void transfer(PipeTransferTsFileInsertionEventHandler pipeTransferTsFileInsertionEventHandler) {
        AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient = null;
        try {
            asyncPipeDataTransferServiceClient = this.clientManager.borrowClient();
            pipeTransferTsFileInsertionEventHandler.transfer(asyncPipeDataTransferServiceClient);
        } catch (Exception e) {
            logOnClientException(asyncPipeDataTransferServiceClient, e);
            pipeTransferTsFileInsertionEventHandler.onError(e);
        }
    }

    public void transfer(Event event) throws Exception {
        transferQueuedEventsIfNecessary();
        transferBatchedEventsIfNecessary();
        if (event instanceof PipeHeartbeatEvent) {
            return;
        }
        LOGGER.warn("IoTDBThriftAsyncConnector does not support transferring generic event: {}.", event);
    }

    public void updateLeaderCache(String str, TEndPoint tEndPoint) {
        this.clientManager.updateLeaderCache(str, tEndPoint);
    }

    private void logOnClientException(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient, Exception exc) {
        if (asyncPipeDataTransferServiceClient == null) {
            LOGGER.warn(THRIFT_ERROR_FORMATTER_WITHOUT_ENDPOINT, exc);
        } else {
            LOGGER.warn(String.format(THRIFT_ERROR_FORMATTER_WITH_ENDPOINT, asyncPipeDataTransferServiceClient.getIp(), Integer.valueOf(asyncPipeDataTransferServiceClient.getPort())), exc);
        }
    }

    private synchronized void transferQueuedEventsIfNecessary() throws Exception {
        while (!this.retryEventQueue.isEmpty()) {
            TabletInsertionEvent tabletInsertionEvent = (Event) this.retryEventQueue.peek();
            if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
                this.retryConnector.transfer(tabletInsertionEvent);
            } else if (tabletInsertionEvent instanceof PipeRawTabletInsertionEvent) {
                this.retryConnector.transfer(tabletInsertionEvent);
            } else if (tabletInsertionEvent instanceof PipeTsFileInsertionEvent) {
                this.retryConnector.transfer((TsFileInsertionEvent) tabletInsertionEvent);
            } else {
                LOGGER.warn("IoTDBThriftAsyncConnector does not support transfer generic event: {}.", tabletInsertionEvent);
            }
            if (tabletInsertionEvent instanceof EnrichedEvent) {
                ((EnrichedEvent) tabletInsertionEvent).decreaseReferenceCount(IoTDBThriftAsyncConnector.class.getName(), true);
            }
            TabletInsertionEvent tabletInsertionEvent2 = (Event) this.retryEventQueue.poll();
            if (tabletInsertionEvent2 != tabletInsertionEvent) {
                LOGGER.error("The event polled from the queue is not the same as the event peeked from the queue. Peeked event: {}, polled event: {}.", tabletInsertionEvent, tabletInsertionEvent2);
            }
            if (tabletInsertionEvent2 != null) {
                LOGGER.info("Polled event {} from retry queue.", tabletInsertionEvent2);
            }
        }
    }

    private void transferBatchedEventsIfNecessary() throws IOException {
        if (!this.isTabletBatchModeEnabled || this.tabletBatchBuilder.isEmpty()) {
            return;
        }
        transfer(new PipeTransferTabletBatchEventHandler(this.tabletBatchBuilder, this));
        this.tabletBatchBuilder.onSuccess();
    }

    public synchronized void addFailureEventToRetryQueue(Event event) {
        this.retryEventQueue.offer(event);
        LOGGER.info("Added event {} to retry queue.", event);
    }

    public synchronized void discardEventsOfPipe(String str) {
        this.retryEventQueue.removeIf(event -> {
            return (event instanceof EnrichedEvent) && str.equals(((EnrichedEvent) event).getPipeName());
        });
    }

    public synchronized void close() throws Exception {
        this.retryConnector.close();
        if (this.tabletBatchBuilder != null) {
            this.tabletBatchBuilder.close();
        }
    }

    public int getRetryEventQueueSize() {
        return this.retryEventQueue.size();
    }
}
