package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.builder.IoTDBThriftAsyncPipeTransferBatchReqBuilder;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.class */
public class PipeTransferTabletBatchEventHandler implements AsyncMethodCallback<TPipeTransferResp> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
    private final List<Long> requestCommitIds;
    private final List<Event> events;
    private final TPipeTransferReq req;
    private final IoTDBDataRegionAsyncConnector connector;

    public PipeTransferTabletBatchEventHandler(IoTDBThriftAsyncPipeTransferBatchReqBuilder ioTDBThriftAsyncPipeTransferBatchReqBuilder, IoTDBDataRegionAsyncConnector ioTDBDataRegionAsyncConnector) throws IOException {
        this.requestCommitIds = ioTDBThriftAsyncPipeTransferBatchReqBuilder.deepCopyRequestCommitIds();
        this.events = ioTDBThriftAsyncPipeTransferBatchReqBuilder.deepCopyEvents();
        this.req = ioTDBThriftAsyncPipeTransferBatchReqBuilder.toTPipeTransferReq();
        this.connector = ioTDBDataRegionAsyncConnector;
    }

    public void transfer(AsyncPipeDataTransferServiceClient asyncPipeDataTransferServiceClient) throws TException {
        asyncPipeDataTransferServiceClient.pipeTransfer(this.req, this);
    }

    public void onComplete(TPipeTransferResp tPipeTransferResp) {
        if (tPipeTransferResp == null) {
            onError(new PipeException("TPipeTransferResp is null"));
            return;
        }
        try {
            TSStatus status = tPipeTransferResp.getStatus();
            if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
                this.connector.statusHandler().handle(status, tPipeTransferResp.getStatus().getMessage(), this.events.toString());
            }
            Iterator<Event> it = this.events.iterator();
            while (it.hasNext()) {
                EnrichedEvent enrichedEvent = (Event) it.next();
                if (enrichedEvent instanceof EnrichedEvent) {
                    enrichedEvent.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(), true);
                }
            }
        } catch (Exception e) {
            onError(e);
        }
    }

    public void onError(Exception exc) {
        LOGGER.warn("Failed to transfer TabletInsertionEvent batch {} (request commit ids={}).", new Object[]{this.events.stream().map(event -> {
            return event instanceof EnrichedEvent ? ((EnrichedEvent) event).coreReportMessage() : event.toString();
        }).collect(Collectors.toList()), this.requestCommitIds, exc});
        this.connector.addFailureEventsToRetryQueue(this.events);
    }
}
