package org.apache.iotdb.db.pipe.connector.v2.handler;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.connector.v2.IoTDBThriftConnectorV2;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/v2/handler/PipeTransferTabletInsertionEventHandler.class */
public abstract class PipeTransferTabletInsertionEventHandler<E extends TPipeTransferResp> implements AsyncMethodCallback<E> {
    private final long requestCommitId;
    private final EnrichedEvent event;
    private final TPipeTransferReq req;
    private final IoTDBThriftConnectorV2 connector;
    private int retryCount = 0;
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
    private static final long MAX_RETRY_WAIT_TIME_MS = (long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2.0d, 5.0d));

    /* JADX INFO: Access modifiers changed from: protected */
    public PipeTransferTabletInsertionEventHandler(long j, @Nullable EnrichedEvent enrichedEvent, TPipeTransferReq tPipeTransferReq, IoTDBThriftConnectorV2 ioTDBThriftConnectorV2) {
        this.requestCommitId = j;
        this.event = enrichedEvent;
        this.req = tPipeTransferReq;
        this.connector = ioTDBThriftConnectorV2;
        Optional.ofNullable(enrichedEvent).ifPresent(enrichedEvent2 -> {
            enrichedEvent2.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
        });
    }

    public void transfer(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient) throws TException {
        doTransfer(asyncPipeDataTransferServiceClient, this.req);
    }

    protected abstract void doTransfer(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient, TPipeTransferReq tPipeTransferReq) throws TException;

    public void onComplete(TPipeTransferResp tPipeTransferResp) {
        if (tPipeTransferResp == null) {
            onError(new PipeException("TPipeTransferResp is null"));
        } else if (tPipeTransferResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            this.connector.commit(this.requestCommitId, this.event);
        } else {
            onError(new PipeException(tPipeTransferResp.getStatus().getMessage()));
        }
    }

    public void onError(Exception exc) {
        this.retryCount++;
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(Math.min((long) (PipeConfig.getInstance().getPipeConnectorRetryIntervalMs() * Math.pow(2.0d, this.retryCount - 1.0d)), MAX_RETRY_WAIT_TIME_MS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("Unexpected interruption during retrying", e);
            }
            if (this.connector.isClosed()) {
                LOGGER.info("IoTDBThriftConnectorV2 has been stopped, we will not retry this request {} after {} times", new Object[]{this.req, Integer.valueOf(this.retryCount), exc});
            } else {
                LOGGER.warn("IoTDBThriftConnectorV2 failed to transfer request {} after {} times, retrying...", new Object[]{this.req, Integer.valueOf(this.retryCount), exc});
                retryTransfer(this.connector, this.requestCommitId);
            }
        });
    }

    protected abstract void retryTransfer(IoTDBThriftConnectorV2 ioTDBThriftConnectorV2, long j);
}
