package org.apache.iotdb.db.pipe.connector.payload.evolvable.builder;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.class */
/**
 * Accumulates {@link TabletInsertionEvent}s and their serialized payloads so that they can be
 * sent as one {@link PipeTransferTabletBatchReq}.
 *
 * <p>Each accepted event is serialized into one of three buffer lists (raw binary, insert node,
 * or tablet) and a reference is taken on the event under this class's name. A batch is reported
 * as ready once the accumulated buffer size reaches the current size limit or the configured
 * delay has elapsed since the first event of the batch was accepted.
 *
 * <p>{@link #onEvent}, {@link #onSuccess} and {@link #close} are {@code synchronized}; the
 * remaining methods are not.
 */
public abstract class PipeTransferBatchReqBuilder implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferBatchReqBuilder.class);

    /** Holder name used when taking and releasing references on batched events. */
    private static final String HOLDER_NAME = PipeTransferBatchReqBuilder.class.getName();

    /** Maximum time, in milliseconds, a batch may wait after its first event was accepted. */
    protected final int maxDelayInMs;
    /** Memory block whose current usage is the batch size limit in bytes. */
    protected final PipeMemoryBlock allocatedMemoryBlock;
    /** Events accepted into the current batch, in arrival order. */
    protected final List<Event> events = new ArrayList<>();
    /** Commit ids of the accepted events, parallel to {@link #events}. */
    protected final List<Long> requestCommitIds = new ArrayList<>();
    /** Payloads taken directly from insert-node events whose insert node was not available. */
    protected final List<ByteBuffer> binaryBuffers = new ArrayList<>();
    /** Payloads produced by serializing insert nodes. */
    protected final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
    /** Payloads produced by serializing tablets followed by their aligned flag. */
    protected final List<ByteBuffer> tabletBuffers = new ArrayList<>();
    /** Time the first event of the batch was accepted; {@code Long.MIN_VALUE} when empty. */
    protected long firstEventProcessingTime = Long.MIN_VALUE;
    /** Sum of the limits of all buffers accumulated in the current batch. */
    protected long totalBufferSize = 0;

    /**
     * Reads the batch delay and size settings and allocates the memory block that bounds the
     * batch size.
     *
     * <p>The delay is read in seconds (default 1) and stored in milliseconds; the size is read
     * in bytes (default 16 MiB). The block halves on shrink and doubles on expand, never
     * exceeding the configured size.
     *
     * @param pipeParameters connector/sink parameters to read the batch settings from
     */
    public PipeTransferBatchReqBuilder(PipeParameters pipeParameters) {
        this.maxDelayInMs =
                pipeParameters.getIntOrDefault(
                                Arrays.asList("connector.batch.max-delay-seconds", "sink.batch.max-delay-seconds"), 1)
                        * 1000;
        final long requestedMaxBatchSizeInBytes =
                pipeParameters.getLongOrDefault(
                        Arrays.asList("connector.batch.size-bytes", "sink.batch.size-bytes"), 16777216L);
        this.allocatedMemoryBlock =
                PipeResourceManager.memory()
                        .tryAllocate(requestedMaxBatchSizeInBytes)
                        .setShrinkMethod(oldSize -> Math.max(oldSize / 2, 0L))
                        .setShrinkCallback(
                                (oldSize, newSize) ->
                                        LOGGER.info("The batch size limit has shrunk from {} to {}.", oldSize, newSize))
                        .setExpandMethod(oldSize -> Math.min(Math.max(oldSize, 1L) * 2, requestedMaxBatchSizeInBytes))
                        .setExpandCallback(
                                (oldSize, newSize) ->
                                        LOGGER.info("The batch size limit has expanded from {} to {}.", oldSize, newSize));
        final long grantedMaxBatchSizeInBytes = getMaxBatchSizeInBytes();
        if (grantedMaxBatchSizeInBytes != requestedMaxBatchSizeInBytes) {
            LOGGER.info(
                    "PipeTransferBatchReqBuilder: the max batch size is adjusted from {} to {} due to the memory restriction",
                    requestedMaxBatchSizeInBytes,
                    grantedMaxBatchSizeInBytes);
        }
    }

    /**
     * Tries to add an event to the current batch.
     *
     * <p>An event equal to the most recently accepted one is not added again. An event is only
     * recorded after its payload has been serialized successfully, so a serialization failure
     * leaves the batch unchanged and the same event can be offered again.
     *
     * @param tabletInsertionEvent the event to add
     * @return {@code false} if the event is not an {@link EnrichedEvent}; otherwise {@code true}
     *     when the batch has reached its size limit or its maximum delay and should be sent
     * @throws IOException if serializing the event fails
     * @throws WALPipeException if the event's payload cannot be obtained
     */
    public synchronized boolean onEvent(TabletInsertionEvent tabletInsertionEvent) throws IOException, WALPipeException {
        if (!(tabletInsertionEvent instanceof EnrichedEvent)) {
            return false;
        }
        final EnrichedEvent enrichedEvent = (EnrichedEvent) tabletInsertionEvent;
        final long commitId = enrichedEvent.getCommitId();

        final boolean sameAsLastAccepted =
                !events.isEmpty() && events.get(events.size() - 1).equals(tabletInsertionEvent);
        if (!sameAsLastAccepted) {
            if (enrichedEvent.increaseReferenceCount(HOLDER_NAME)) {
                int bufferSize;
                try {
                    bufferSize = buildTabletInsertionBuffer(tabletInsertionEvent);
                } catch (Exception e) {
                    // The event never made it into the batch, so nothing else will release the
                    // reference taken above.
                    enrichedEvent.decreaseReferenceCount(HOLDER_NAME, false);
                    throw e;
                }
                events.add(tabletInsertionEvent);
                requestCommitIds.add(commitId);
                totalBufferSize += bufferSize;
                if (firstEventProcessingTime == Long.MIN_VALUE) {
                    firstEventProcessingTime = System.currentTimeMillis();
                }
            } else {
                // NOTE(review): releasing after a failed increase is kept from the original
                // code; confirm against EnrichedEvent that this is intended.
                enrichedEvent.decreaseReferenceCount(HOLDER_NAME, false);
            }
        }

        // The delay only applies once a first event has been accepted.
        final boolean delayExceeded =
                firstEventProcessingTime != Long.MIN_VALUE
                        && System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs;
        return totalBufferSize >= getMaxBatchSizeInBytes() || delayExceeded;
    }

    /** Resets the builder to an empty batch; does not touch the events' reference counts. */
    public synchronized void onSuccess() {
        binaryBuffers.clear();
        insertNodeBuffers.clear();
        tabletBuffers.clear();
        events.clear();
        requestCommitIds.clear();
        firstEventProcessingTime = Long.MIN_VALUE;
        totalBufferSize = 0L;
    }

    /**
     * Builds the batch request from the buffers accumulated so far.
     *
     * @return the batch transfer request
     * @throws IOException if the request cannot be assembled
     */
    public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
        return PipeTransferTabletBatchReq.toTPipeTransferReq(binaryBuffers, insertNodeBuffers, tabletBuffers);
    }

    /**
     * Returns the current batch size limit, which is the memory usage of the allocated block.
     *
     * @return the size limit in bytes
     */
    protected long getMaxBatchSizeInBytes() {
        return allocatedMemoryBlock.getMemoryUsageInBytes();
    }

    /**
     * Tells whether no payload has been accumulated.
     *
     * @return {@code true} if all three buffer lists are empty
     */
    public boolean isEmpty() {
        return binaryBuffers.isEmpty() && insertNodeBuffers.isEmpty() && tabletBuffers.isEmpty();
    }

    /**
     * Returns a new list holding the currently batched events. The list is independent of the
     * builder's own list; the event objects themselves are shared, not copied.
     *
     * @return a snapshot of the batched events
     */
    public List<Event> deepCopyEvents() {
        return new ArrayList<>(events);
    }

    /**
     * Serializes an event and appends the result to the matching buffer list.
     *
     * <p>Nothing is appended if serialization fails.
     *
     * @param tabletInsertionEvent a {@link PipeInsertNodeTabletInsertionEvent} or a
     *     {@link PipeRawTabletInsertionEvent}
     * @return the limit of the appended buffer
     * @throws IOException if serialization fails
     * @throws WALPipeException if the event's payload cannot be obtained
     */
    protected int buildTabletInsertionBuffer(TabletInsertionEvent tabletInsertionEvent) throws IOException, WALPipeException {
        final ByteBuffer buffer;
        if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
            final PipeInsertNodeTabletInsertionEvent insertNodeEvent =
                    (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
            final InsertNode insertNode = insertNodeEvent.getInsertNodeViaCacheIfPossible();
            if (Objects.isNull(insertNode)) {
                // No insert node available: forward the event's own byte buffer.
                buffer = insertNodeEvent.getByteBuffer();
                binaryBuffers.add(buffer);
            } else {
                buffer = insertNode.serializeToByteBuffer();
                insertNodeBuffers.add(buffer);
            }
        } else {
            final PipeRawTabletInsertionEvent rawTabletEvent = (PipeRawTabletInsertionEvent) tabletInsertionEvent;
            try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
                    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
                rawTabletEvent.convertToTablet().serialize(outputStream);
                ReadWriteIOUtils.write(Boolean.valueOf(rawTabletEvent.isAligned()), outputStream);
                // Wrap the backing array directly instead of copying it.
                buffer = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
            }
            tabletBuffers.add(buffer);
        }
        return buffer.limit();
    }

    /** Clears this builder's references on the batched events and closes the memory block. */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        clearEventsReferenceCount(HOLDER_NAME);
        allocatedMemoryBlock.close();
    }

    /**
     * Decreases the reference count of every batched {@link EnrichedEvent}.
     *
     * @param str holder name passed to each event
     * @param z flag forwarded unchanged to {@code EnrichedEvent.decreaseReferenceCount}
     */
    public void decreaseEventsReferenceCount(String str, boolean z) {
        for (final Event event : events) {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).decreaseReferenceCount(str, z);
            }
        }
    }

    /**
     * Clears the reference count of every batched {@link EnrichedEvent}.
     *
     * @param str holder name passed to each event
     */
    public void clearEventsReferenceCount(String str) {
        for (final Event event : events) {
            if (event instanceof EnrichedEvent) {
                ((EnrichedEvent) event).clearReferenceCount(str);
            }
        }
    }
}
