package org.apache.iotdb.db.queryengine.execution.exchange.sink;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel.class */
public class SinkChannel implements ISinkChannel {
    /** Number of notification attempts; the retry loop in SendNewDataBlockEventTask gives up after this many tries. */
    public static final int MAX_ATTEMPT_TIMES = 3;
    /** Initial value of {@link #retryIntervalInMs}, in milliseconds. */
    private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
    // Remote side of the exchange: the endpoint a client is borrowed for, and the
    // fragment instance / plan node IDs placed into outgoing events.
    private final TEndPoint remoteEndpoint;
    private final TFragmentInstanceId remoteFragmentInstanceId;
    private final String remotePlanNodeId;
    // Local identifiers; together with fullFragmentInstanceId they key every
    // reserve/free call made against the query memory pool.
    private final String localPlanNodeId;
    private final TFragmentInstanceId localFragmentInstanceId;
    private final String fullFragmentInstanceId;
    private final LocalMemoryManager localMemoryManager;
    // Executor the notification tasks are submitted to.
    private final ExecutorService executorService;
    // Used by getSerializedTsBlock(int) to turn a buffered TsBlock into bytes.
    private final TsBlockSerde serde;
    // Receives the onFailure / onAborted / onFinish callbacks of this channel.
    private final MPPDataExchangeManager.SinkListener sinkListener;
    // Name handed to SetThreadName while a notification task is running.
    private final String threadName;
    private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager;
    // Future of the most recent memory reservation; assigned in open() and send(),
    // so it is still null before open() has been called.
    private volatile ListenableFuture<Void> blocked;
    private static final Logger LOGGER = LoggerFactory.getLogger(SinkChannel.class);
    // Amount reserved by open() before any block has been sent.
    private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
    private static final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRIC_SET = DataExchangeCostMetricSet.getInstance();
    private static final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRIC_SET = DataExchangeCountMetricSet.getInstance();
    // Buffered blocks by sequence ID, in insertion order. The paired Long is the
    // value of currentTsBlockSize when the block was stored; it is the number of
    // bytes released when that block is acknowledged.
    private final LinkedHashMap<Integer, Pair<TsBlock, Long>> sequenceIdToTsBlock = new LinkedHashMap<>();
    // Sequence ID that the next call to send() will assign.
    private int nextSequenceId = 0;
    // Set as the last state change of abort() / close() respectively.
    private boolean aborted = false;
    private boolean closed = false;
    // NOTE(review): the only visible assignment is in the undecompiled
    // SendEndOfDataBlockEventTask.run() fragments - confirm against the original source.
    private boolean noMoreTsBlocks = false;
    // Ensures sinkListener.onFinish is invoked at most once (see invokeOnFinished()).
    private final AtomicBoolean invokedOnFinished = new AtomicBoolean(false);
    // Upper bound passed to every reserve call; setMaxBytesCanReserve can only lower it.
    private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
    // Sleep between failed notification attempts, in milliseconds.
    private long retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;
    // Bytes currently accounted to this channel: initialised by open(), increased by
    // send(), decreased by acknowledgeTsBlock() and by a cancelled reservation.
    private long bufferRetainedSizeInBytes = 0;
    // Size stored next to the next buffered block; after each send() it becomes the
    // retained size of the block that was just sent.
    private long currentTsBlockSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel$SendEndOfDataBlockEventTask.class */
    /**
     * Task submitted to the executor by {@code setNoMoreTsBlocks()}.
     *
     * <p>NOTE(review): the decompiler could not reconstruct {@link #run()}; the body below is a
     * stub that always throws {@link UnsupportedOperationException}. The recovered fragments in
     * the JADX comments indicate that the real method sets {@code noMoreTsBlocks} to true, calls
     * {@code invokeOnFinished()} when {@code isFinished()} holds, calls
     * {@code sinkListener.onEndOfBlocks(...)} and then closes a resource. Everything else
     * (presumably a retry loop similar to SendNewDataBlockEventTask) is unknown from this file
     * and must be confirmed against the original source.
     */
    class SendEndOfDataBlockEventTask implements Runnable {
        SendEndOfDataBlockEventTask() {
        }

        /* JADX WARN: Code restructure failed: missing block: B:14:0x00f3, code lost:
        
            r8.this$0.noMoreTsBlocks = true;
         */
        /* JADX WARN: Code restructure failed: missing block: B:15:0x0103, code lost:
        
            if (r8.this$0.isFinished() == false) goto L33;
         */
        /* JADX WARN: Code restructure failed: missing block: B:16:0x0106, code lost:
        
            r8.this$0.invokeOnFinished();
         */
        /* JADX WARN: Code restructure failed: missing block: B:17:0x010d, code lost:
        
            r8.this$0.sinkListener.onEndOfBlocks(r8.this$0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:18:0x011e, code lost:
        
            r0.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:19:0x0134, code lost:
        
            return;
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 309
                To view this dump add '--comments-level debug' option
            */
            // Placeholder emitted by the decompiler: the original logic is NOT present here.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.iotdb.db.queryengine.execution.exchange.sink.SinkChannel.SendEndOfDataBlockEventTask.run():void");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/sink/SinkChannel$SendNewDataBlockEventTask.class */
    /**
     * Notifies the remote fragment instance that a contiguous run of new TsBlocks, starting at
     * {@code startSequenceId}, is available on this channel.
     *
     * <p>The notification is attempted at most {@link SinkChannel#MAX_ATTEMPT_TIMES} times. The
     * task stops as soon as one attempt succeeds. When the last attempt fails, or the thread is
     * interrupted while waiting between attempts, the failure is reported once through
     * {@code sinkListener.onFailure} and the task ends.
     */
    public class SendNewDataBlockEventTask implements Runnable {
        private final int startSequenceId;
        private final List<Long> blockSizes;

        /**
         * @param i sequence ID of the first block being announced; must be non-negative
         * @param list retained sizes of the announced blocks, one entry per block; must not be null
         * @throws IllegalArgumentException if {@code i} is negative
         * @throws NullPointerException if {@code list} is null
         */
        SendNewDataBlockEventTask(int i, List<Long> list) {
            Validate.isTrue(i >= 0, "Start sequence ID should be greater than or equal to zero, but was: " + i + ".");
            this.startSequenceId = i;
            this.blockSizes = Validate.notNull(list);
        }

        @Override
        public void run() {
            try (SetThreadName ignored = new SetThreadName(threadName)) {
                LOGGER.debug(
                        "[NotifyNewTsBlock] [{}, {}) to {}.{}",
                        startSequenceId,
                        startSequenceId + blockSizes.size(),
                        remoteFragmentInstanceId,
                        remotePlanNodeId);
                TNewDataBlockEvent event =
                        new TNewDataBlockEvent(
                                remoteFragmentInstanceId,
                                remotePlanNodeId,
                                localFragmentInstanceId,
                                startSequenceId,
                                blockSizes);
                for (int attempt = 1; attempt <= MAX_ATTEMPT_TIMES; attempt++) {
                    long startTime = System.nanoTime();
                    try (SyncDataNodeMPPDataExchangeServiceClient client =
                            mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
                        client.onNewDataBlockEvent(event);
                        // Delivered: stop here so the remote side is not notified again.
                        break;
                    } catch (Exception e) {
                        LOGGER.warn("Failed to send new data block event, attempt times: {}", attempt, e);
                        if (attempt == MAX_ATTEMPT_TIMES) {
                            sinkListener.onFailure(SinkChannel.this, e);
                            // Nothing left to retry, so there is no point in sleeping.
                            break;
                        }
                        try {
                            Thread.sleep(retryIntervalInMs);
                        } catch (InterruptedException interrupted) {
                            // Restore the interrupt flag and give up instead of retrying on a
                            // thread that has been asked to stop.
                            Thread.currentThread().interrupt();
                            sinkListener.onFailure(SinkChannel.this, e);
                            break;
                        }
                    } finally {
                        // Recorded once per attempt, whether it succeeded or failed.
                        DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
                                DataExchangeCostMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_CALLER,
                                System.nanoTime() - startTime);
                        DATA_EXCHANGE_COUNT_METRIC_SET.recordDataBlockNum(
                                DataExchangeCountMetricSet.SEND_NEW_DATA_BLOCK_NUM_CALLER,
                                blockSizes.size());
                    }
                }
            }
        }
    }

    /**
     * Creates a sink channel towards one remote fragment instance and registers the local plan
     * node with the query memory pool.
     *
     * @param tEndPoint endpoint of the remote data node; must not be null
     * @param tFragmentInstanceId remote fragment instance ID; must not be null
     * @param str remote plan node ID; must not be null
     * @param str2 local plan node ID; must not be null
     * @param tFragmentInstanceId2 local fragment instance ID; must not be null
     * @param localMemoryManager memory manager whose query pool backs this channel; must not be null
     * @param executorService executor for notification tasks; must not be null
     * @param tsBlockSerde serializer for buffered blocks; must not be null
     * @param sinkListener lifecycle callback receiver; must not be null
     * @param iClientManager client manager used to reach the remote endpoint (not null-checked here)
     * @throws NullPointerException if any argument documented as non-null is null
     */
    public SinkChannel(
            TEndPoint tEndPoint,
            TFragmentInstanceId tFragmentInstanceId,
            String str,
            String str2,
            TFragmentInstanceId tFragmentInstanceId2,
            LocalMemoryManager localMemoryManager,
            ExecutorService executorService,
            TsBlockSerde tsBlockSerde,
            MPPDataExchangeManager.SinkListener sinkListener,
            IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> iClientManager) {
        // Remote side.
        this.remoteEndpoint = Validate.notNull(tEndPoint);
        this.remoteFragmentInstanceId = Validate.notNull(tFragmentInstanceId);
        this.remotePlanNodeId = Validate.notNull(str);

        // Local side.
        this.localPlanNodeId = Validate.notNull(str2);
        this.localFragmentInstanceId = Validate.notNull(tFragmentInstanceId2);
        this.fullFragmentInstanceId =
                FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(tFragmentInstanceId2);

        // Collaborators.
        this.localMemoryManager = Validate.notNull(localMemoryManager);
        this.executorService = Validate.notNull(executorService);
        this.serde = Validate.notNull(tsBlockSerde);
        this.sinkListener = Validate.notNull(sinkListener);
        this.mppDataExchangeServiceClientManager = iClientManager;

        this.threadName =
                FragmentInstanceId.createFullId(
                        tFragmentInstanceId2.queryId,
                        tFragmentInstanceId2.fragmentId,
                        tFragmentInstanceId2.instanceId);
        localMemoryManager
                .getQueryPool()
                .registerPlanNodeIdToQueryMemoryMap(
                        tFragmentInstanceId2.queryId, this.fullFragmentInstanceId, str2);
    }

    /**
     * Returns a future that completes once the channel can accept another block.
     *
     * @return an already-completed future when the channel is closed, otherwise a view of the
     *     current reservation future that does not propagate cancellation to it
     * @throws IllegalStateException if the channel has been aborted
     */
    @Override
    public synchronized ListenableFuture<?> isFull() {
        checkState();
        if (closed) {
            return Futures.immediateVoidFuture();
        }
        return Futures.nonCancellationPropagating(blocked);
    }

    /**
     * Schedules a notification for the blocks starting at sequence ID {@code i} on the executor.
     *
     * @param i sequence ID of the first announced block
     * @param list retained sizes of the announced blocks
     */
    private void submitSendNewDataBlockEventTask(int i, List<Long> list) {
        Runnable notifyTask = new SendNewDataBlockEventTask(i, list);
        executorService.submit(notifyTask);
    }

    /**
     * Buffers {@code tsBlock}, reserves memory for it and schedules a notification to the remote
     * side. The call is a no-op when the channel is closed or no more blocks are expected. The
     * elapsed time is recorded in the cost metric on every exit path.
     *
     * @param tsBlock block to send; must not be null
     * @throws NullPointerException if {@code tsBlock} is null
     * @throws IllegalStateException if the channel is aborted or the previous reservation has
     *     not completed yet
     */
    @Override
    public synchronized void send(TsBlock tsBlock) {
        final long startTime = System.nanoTime();
        try {
            Validate.notNull(tsBlock, "tsBlocks is null");
            if (closed) {
                return;
            }
            checkState();
            if (!blocked.isDone()) {
                throw new IllegalStateException("Sink handle is blocked.");
            }
            if (noMoreTsBlocks) {
                return;
            }

            final long retainedSizeInBytes = tsBlock.getRetainedSizeInBytes();
            final int startSequenceId = nextSequenceId;
            blocked =
                    localMemoryManager
                            .getQueryPool()
                            .reserve(
                                    localFragmentInstanceId.getQueryId(),
                                    fullFragmentInstanceId,
                                    localPlanNodeId,
                                    retainedSizeInBytes,
                                    maxBytesCanReserve)
                            .left;
            bufferRetainedSizeInBytes += retainedSizeInBytes;

            // The block is stored with the size tracked *before* this call; only afterwards
            // does currentTsBlockSize take this block's size.
            sequenceIdToTsBlock.put(nextSequenceId, new Pair<>(tsBlock, currentTsBlockSize));
            nextSequenceId++;
            currentTsBlockSize = retainedSizeInBytes;

            submitSendNewDataBlockEventTask(startSequenceId, ImmutableList.of(retainedSizeInBytes));
        } finally {
            DATA_EXCHANGE_COST_METRIC_SET.recordDataExchangeCost(
                    DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE,
                    System.nanoTime() - startTime);
        }
    }

    /**
     * Schedules the end-of-data notification task, unless the channel is already aborted or
     * closed.
     */
    @Override
    public synchronized void setNoMoreTsBlocks() {
        LOGGER.debug("[StartSetNoMoreTsBlocks]");
        if (!aborted && !closed) {
            executorService.submit(new SendEndOfDataBlockEventTask());
        }
    }

    /**
     * Aborts the channel: drops all buffered blocks, returns the memory accounted to this
     * channel to the query pool, notifies the listener via {@code onAborted} and marks the
     * channel aborted. Does nothing beyond logging when already aborted or closed.
     */
    @Override
    public synchronized void abort() {
        LOGGER.debug("[StartAbortSinkChannel]");
        if (aborted || closed) {
            return;
        }

        sequenceIdToTsBlock.clear();

        // Whatever tryCancel reports for the pending reservation is taken off the books
        // before the remainder is freed.
        if (blocked != null) {
            bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
        }
        if (bufferRetainedSizeInBytes > 0) {
            localMemoryManager
                    .getQueryPool()
                    .free(
                            localFragmentInstanceId.getQueryId(),
                            fullFragmentInstanceId,
                            localPlanNodeId,
                            bufferRetainedSizeInBytes);
            bufferRetainedSizeInBytes = 0L;
        }

        sinkListener.onAborted(this);
        aborted = true;
        LOGGER.debug("[EndAbortSinkChannel]");
    }

    /**
     * Closes the channel: drops all buffered blocks, returns the memory accounted to this
     * channel to the query pool, fires the finish callback (at most once overall) and marks the
     * channel closed. Does nothing beyond logging when already closed or aborted.
     */
    @Override
    public synchronized void close() {
        LOGGER.debug("[StartCloseSinkChannel]");
        if (closed || aborted) {
            return;
        }

        sequenceIdToTsBlock.clear();

        // Whatever tryCancel reports for the pending reservation is taken off the books
        // before the remainder is freed.
        if (blocked != null) {
            bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blocked);
        }
        if (bufferRetainedSizeInBytes > 0) {
            localMemoryManager
                    .getQueryPool()
                    .free(
                            localFragmentInstanceId.getQueryId(),
                            fullFragmentInstanceId,
                            localPlanNodeId,
                            bufferRetainedSizeInBytes);
            bufferRetainedSizeInBytes = 0L;
        }

        invokeOnFinished();
        closed = true;
        LOGGER.debug("[EndCloseSinkChannel]");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls {@code sinkListener.onFinish} for this channel; only the first invocation has any
     * effect, later ones return immediately.
     */
    public void invokeOnFinished() {
        boolean firstCall = invokedOnFinished.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        sinkListener.onFinish(this);
    }

    /**
     * @return the current value of the closed flag (read without taking the channel lock)
     */
    @Override
    public boolean isClosed() {
        return closed;
    }

    /**
     * @return {@code true} once {@link #abort()} has completed
     */
    @Override
    public synchronized boolean isAborted() {
        return aborted;
    }

    /**
     * @return {@code true} when no more blocks are expected and no buffered block remains
     */
    @Override
    public synchronized boolean isFinished() {
        if (!noMoreTsBlocks) {
            return false;
        }
        return sequenceIdToTsBlock.isEmpty();
    }

    /**
     * @return the number of bytes currently accounted to this channel
     */
    @Override
    public synchronized long getBufferRetainedSizeInBytes() {
        return bufferRetainedSizeInBytes;
    }

    /**
     * Two-argument variant that this class does not support; use
     * {@code getSerializedTsBlock(int)} instead.
     *
     * @throws UnsupportedOperationException always
     */
    public ByteBuffer getSerializedTsBlock(int i, int i2) {
        throw new UnsupportedOperationException();
    }

    /**
     * Serializes the buffered block with sequence ID {@code i}.
     *
     * @param i sequence ID of the requested block
     * @return the serialized form produced by the channel's serde
     * @throws GetTsBlockFromClosedOrAbortedChannelException if the channel is aborted or closed
     * @throws IllegalStateException if no block is buffered under that sequence ID
     * @throws IOException declared for serialization failures
     */
    public synchronized ByteBuffer getSerializedTsBlock(int i) throws IOException {
        if (aborted || closed) {
            LOGGER.debug(
                    "SinkChannel still receive getting TsBlock request after being aborted={} or closed={}",
                    aborted,
                    closed);
            throw new GetTsBlockFromClosedOrAbortedChannelException("SinkChannel is aborted or closed. ");
        }

        Pair<TsBlock, Long> buffered = sequenceIdToTsBlock.get(i);
        if (buffered == null || buffered.left == null) {
            LOGGER.warn(
                    "The TsBlock doesn't exist. Sequence ID is {}, remaining map is {}",
                    i,
                    sequenceIdToTsBlock.entrySet());
            throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + i);
        }
        return serde.serialize(buffered.left);
    }

    /**
     * Removes the buffered blocks whose sequence IDs lie in {@code [i, i2)} and returns their
     * recorded sizes to the query pool. Does nothing when the channel is aborted or closed.
     * Fires the finish callback if the channel is finished afterwards.
     *
     * @param i first acknowledged sequence ID (inclusive)
     * @param i2 end of the acknowledged range (exclusive)
     */
    public void acknowledgeTsBlock(int i, int i2) {
        synchronized (this) {
            if (aborted || closed) {
                return;
            }

            long freedBytes = 0;
            Iterator<Map.Entry<Integer, Pair<TsBlock, Long>>> iterator =
                    sequenceIdToTsBlock.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Integer, Pair<TsBlock, Long>> entry = iterator.next();
                int sequenceId = entry.getKey();
                if (sequenceId < i) {
                    // Below the acknowledged range: leave it buffered.
                    continue;
                }
                if (sequenceId >= i2) {
                    // The map iterates in insertion order, so scanning stops at the first
                    // ID at or past the end of the range.
                    break;
                }
                long recordedSize = entry.getValue().right;
                freedBytes += recordedSize;
                bufferRetainedSizeInBytes -= recordedSize;
                iterator.remove();
                LOGGER.debug("[ACKTsBlock] {}.", sequenceId);
            }

            // Nothing is freed when no block in the range was still buffered.
            if (freedBytes > 0) {
                localMemoryManager
                        .getQueryPool()
                        .free(
                                localFragmentInstanceId.getQueryId(),
                                fullFragmentInstanceId,
                                localPlanNodeId,
                                freedBytes);
            }
            if (isFinished()) {
                invokeOnFinished();
            }
        }
    }

    /**
     * @return the ID of the local fragment instance this channel belongs to
     */
    @Override
    public TFragmentInstanceId getLocalFragmentInstanceId() {
        return localFragmentInstanceId;
    }

    /**
     * Lowers the reservation cap to {@code j} if that is smaller than the current cap; a larger
     * value leaves the cap unchanged.
     *
     * @param j candidate cap in bytes
     */
    @Override
    public void setMaxBytesCanReserve(long j) {
        if (j < maxBytesCanReserve) {
            maxBytesCanReserve = j;
        }
    }

    /**
     * @return a label built from the local query ID, fragment ID and instance ID
     */
    @Override
    public String toString() {
        return String.format(
                "Query[%s]-[%s-%s-SinkChannel]:",
                localFragmentInstanceId.queryId,
                localFragmentInstanceId.fragmentId,
                localFragmentInstanceId.instanceId);
    }

    /**
     * Guards operations that must not run on an aborted channel.
     *
     * @throws IllegalStateException if the channel has been aborted
     */
    private void checkState() {
        if (!aborted) {
            return;
        }
        throw new IllegalStateException("SinkChannel is aborted.");
    }

    /**
     * Makes the initial memory reservation of the default maximum TsBlock size and sets the
     * accounted byte count to that amount. Does nothing when the channel is aborted or closed.
     */
    @Override
    public synchronized void open() {
        if (aborted || closed) {
            return;
        }
        blocked =
                localMemoryManager
                        .getQueryPool()
                        .reserve(
                                localFragmentInstanceId.getQueryId(),
                                fullFragmentInstanceId,
                                localPlanNodeId,
                                DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                                maxBytesCanReserve)
                        .left;
        bufferRetainedSizeInBytes = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
    }

    /**
     * @return the current value of the no-more-blocks flag (read without taking the channel lock)
     */
    @Override
    public boolean isNoMoreTsBlocks() {
        return noMoreTsBlocks;
    }

    /**
     * @return the number of blocks currently buffered (read without taking the channel lock)
     */
    @Override
    public int getNumOfBufferedTsBlocks() {
        return sequenceIdToTsBlock.size();
    }

    /**
     * Overrides the pause between failed notification attempts.
     *
     * @param j new retry interval in milliseconds
     */
    public void setRetryIntervalInMs(long j) {
        retryIntervalInMs = j;
    }
}
