package org.apache.iotdb.db.queryengine.execution.exchange;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.exception.exchange.GetTsBlockFromClosedOrAbortedChannelException;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.LocalSinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.SinkChannel;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.LocalSourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.PipelineSourceHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.SourceHandle;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TCloseSinkChannelEvent;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.class */
public class MPPDataExchangeManager implements IMPPDataExchangeManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(MPPDataExchangeManager.class);
    private final LocalMemoryManager localMemoryManager;
    private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
    private final ExecutorService executorService;
    private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager;
    private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles = new ConcurrentHashMap();
    private final Map<TFragmentInstanceId, ISinkHandle> shuffleSinkHandles = new ConcurrentHashMap();
    private MPPDataExchangeServiceImpl mppDataExchangeService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$ISinkChannelListenerImpl.class */
    public class ISinkChannelListenerImpl implements SinkListener {
        private final TFragmentInstanceId shuffleSinkHandleId;
        private final FragmentInstanceContext context;
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;
        private final AtomicInteger cnt;
        private final AtomicBoolean hasDecremented = new AtomicBoolean(false);

        public ISinkChannelListenerImpl(TFragmentInstanceId tFragmentInstanceId, FragmentInstanceContext fragmentInstanceContext, IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback, AtomicInteger atomicInteger) {
            this.shuffleSinkHandleId = tFragmentInstanceId;
            this.context = fragmentInstanceContext;
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
            this.cnt = atomicInteger;
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onFinish(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[SkHListenerOnFinish]");
            }
            decrementCnt();
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onEndOfBlocks(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[SkHListenerOnEndOfTsBlocks]");
            }
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public Optional<Throwable> onAborted(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[SkHListenerOnAbort]");
            }
            decrementCnt();
            return this.context.getFailureCause();
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onFailure(ISink iSink, Throwable th) {
            MPPDataExchangeManager.LOGGER.warn("ISinkChannel failed due to", th);
            decrementCnt();
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }

        private void decrementCnt() {
            if (this.hasDecremented.compareAndSet(false, true) && this.cnt.decrementAndGet() == 0) {
                closeShuffleSinkHandle();
            }
        }

        private void closeShuffleSinkHandle() {
            ISinkHandle iSinkHandle = (ISinkHandle) MPPDataExchangeManager.this.shuffleSinkHandles.remove(this.shuffleSinkHandleId);
            if (iSinkHandle != null) {
                if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                    MPPDataExchangeManager.LOGGER.debug("Close ShuffleSinkHandle: {}", this.shuffleSinkHandleId);
                }
                iSinkHandle.close();
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$MPPDataExchangeServiceImpl.class */
    class MPPDataExchangeServiceImpl implements MPPDataExchangeService.Iface {
        private final DataExchangeCostMetricSet DATA_EXCHANGE_COST_METRICS = DataExchangeCostMetricSet.getInstance();
        private final DataExchangeCountMetricSet DATA_EXCHANGE_COUNT_METRICS = DataExchangeCountMetricSet.getInstance();

        MPPDataExchangeServiceImpl() {
        }

        public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest tGetDataBlockRequest) throws TException {
            long nanoTime = System.nanoTime();
            try {
                SetThreadName setThreadName = new SetThreadName(FragmentInstanceId.createFullId(tGetDataBlockRequest.sourceFragmentInstanceId.queryId, tGetDataBlockRequest.sourceFragmentInstanceId.fragmentId, tGetDataBlockRequest.sourceFragmentInstanceId.instanceId));
                try {
                    if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                        MPPDataExchangeManager.LOGGER.debug("[ProcessGetTsBlockRequest] sequence ID in [{}, {})", Integer.valueOf(tGetDataBlockRequest.getStartSequenceId()), Integer.valueOf(tGetDataBlockRequest.getEndSequenceId()));
                    }
                    TGetDataBlockResponse tGetDataBlockResponse = new TGetDataBlockResponse(new ArrayList());
                    ISinkHandle iSinkHandle = (ISinkHandle) MPPDataExchangeManager.this.shuffleSinkHandles.get(tGetDataBlockRequest.getSourceFragmentInstanceId());
                    if (iSinkHandle == null) {
                        setThreadName.close();
                        this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_SERVER, System.nanoTime() - nanoTime);
                        this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.GET_DATA_BLOCK_NUM_SERVER, tGetDataBlockRequest.getEndSequenceId() - tGetDataBlockRequest.getStartSequenceId());
                        return tGetDataBlockResponse;
                    }
                    SinkChannel sinkChannel = (SinkChannel) iSinkHandle.getChannel(tGetDataBlockRequest.getIndex());
                    for (int startSequenceId = tGetDataBlockRequest.getStartSequenceId(); startSequenceId < tGetDataBlockRequest.getEndSequenceId(); startSequenceId++) {
                        try {
                            tGetDataBlockResponse.addToTsBlocks(sinkChannel.getSerializedTsBlock(startSequenceId));
                        } catch (IOException | IllegalStateException e) {
                            throw new TException(e);
                        } catch (GetTsBlockFromClosedOrAbortedChannelException e2) {
                            TGetDataBlockResponse tGetDataBlockResponse2 = new TGetDataBlockResponse(new ArrayList());
                            setThreadName.close();
                            this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_SERVER, System.nanoTime() - nanoTime);
                            this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.GET_DATA_BLOCK_NUM_SERVER, tGetDataBlockRequest.getEndSequenceId() - tGetDataBlockRequest.getStartSequenceId());
                            return tGetDataBlockResponse2;
                        }
                    }
                    setThreadName.close();
                    this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_SERVER, System.nanoTime() - nanoTime);
                    this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.GET_DATA_BLOCK_NUM_SERVER, tGetDataBlockRequest.getEndSequenceId() - tGetDataBlockRequest.getStartSequenceId());
                    return tGetDataBlockResponse;
                } finally {
                }
            } catch (Throwable th) {
                this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.GET_DATA_BLOCK_TASK_SERVER, System.nanoTime() - nanoTime);
                this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.GET_DATA_BLOCK_NUM_SERVER, tGetDataBlockRequest.getEndSequenceId() - tGetDataBlockRequest.getStartSequenceId());
                throw th;
            }
        }

        public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent tAcknowledgeDataBlockEvent) {
            long nanoTime = System.nanoTime();
            try {
                try {
                    SetThreadName setThreadName = new SetThreadName(FragmentInstanceId.createFullId(tAcknowledgeDataBlockEvent.sourceFragmentInstanceId.queryId, tAcknowledgeDataBlockEvent.sourceFragmentInstanceId.fragmentId, tAcknowledgeDataBlockEvent.sourceFragmentInstanceId.instanceId));
                    try {
                        if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                            MPPDataExchangeManager.LOGGER.debug("Received AcknowledgeDataBlockEvent for TsBlocks whose sequence ID are in [{}, {}) from {}.", new Object[]{Integer.valueOf(tAcknowledgeDataBlockEvent.getStartSequenceId()), Integer.valueOf(tAcknowledgeDataBlockEvent.getEndSequenceId()), tAcknowledgeDataBlockEvent.getSourceFragmentInstanceId()});
                        }
                        ISinkHandle iSinkHandle = (ISinkHandle) MPPDataExchangeManager.this.shuffleSinkHandles.get(tAcknowledgeDataBlockEvent.getSourceFragmentInstanceId());
                        if (iSinkHandle != null) {
                            ((SinkChannel) iSinkHandle.getChannel(tAcknowledgeDataBlockEvent.getIndex())).acknowledgeTsBlock(tAcknowledgeDataBlockEvent.getStartSequenceId(), tAcknowledgeDataBlockEvent.getEndSequenceId());
                            setThreadName.close();
                            return;
                        }
                        if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                            MPPDataExchangeManager.LOGGER.debug("received ACK event but target FragmentInstance[{}] is not found.", tAcknowledgeDataBlockEvent.getSourceFragmentInstanceId());
                        }
                        setThreadName.close();
                        this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - nanoTime);
                        this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_NUM_SERVER, tAcknowledgeDataBlockEvent.getEndSequenceId() - tAcknowledgeDataBlockEvent.getStartSequenceId());
                    } catch (Throwable th) {
                        try {
                            setThreadName.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    MPPDataExchangeManager.LOGGER.warn("ack TsBlock [{}, {}) failed.", new Object[]{Integer.valueOf(tAcknowledgeDataBlockEvent.getStartSequenceId()), Integer.valueOf(tAcknowledgeDataBlockEvent.getEndSequenceId()), th3});
                    throw th3;
                }
            } finally {
                this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - nanoTime);
                this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.ON_ACKNOWLEDGE_DATA_BLOCK_NUM_SERVER, tAcknowledgeDataBlockEvent.getEndSequenceId() - tAcknowledgeDataBlockEvent.getStartSequenceId());
            }
        }

        public void onCloseSinkChannelEvent(TCloseSinkChannelEvent tCloseSinkChannelEvent) throws TException {
            try {
                SetThreadName setThreadName = new SetThreadName(FragmentInstanceId.createFullId(tCloseSinkChannelEvent.sourceFragmentInstanceId.queryId, tCloseSinkChannelEvent.sourceFragmentInstanceId.fragmentId, tCloseSinkChannelEvent.sourceFragmentInstanceId.instanceId));
                try {
                    if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                        MPPDataExchangeManager.LOGGER.debug("Closed source handle of ShuffleSinkHandle {}, channel index: {}.", tCloseSinkChannelEvent.getSourceFragmentInstanceId(), Integer.valueOf(tCloseSinkChannelEvent.getIndex()));
                    }
                    ISinkHandle iSinkHandle = (ISinkHandle) MPPDataExchangeManager.this.shuffleSinkHandles.get(tCloseSinkChannelEvent.getSourceFragmentInstanceId());
                    if (iSinkHandle != null) {
                        iSinkHandle.getChannel(tCloseSinkChannelEvent.getIndex()).close();
                        setThreadName.close();
                    } else {
                        if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                            MPPDataExchangeManager.LOGGER.debug("received CloseSinkChannelEvent but target FragmentInstance[{}] is not found.", tCloseSinkChannelEvent.getSourceFragmentInstanceId());
                        }
                        setThreadName.close();
                    }
                } finally {
                }
            } catch (Throwable th) {
                MPPDataExchangeManager.LOGGER.warn("Close channel of ShuffleSinkHandle {}, index {} failed.", new Object[]{tCloseSinkChannelEvent.getSourceFragmentInstanceId(), Integer.valueOf(tCloseSinkChannelEvent.getIndex()), th});
                throw th;
            }
        }

        public void onNewDataBlockEvent(TNewDataBlockEvent tNewDataBlockEvent) throws TException {
            long nanoTime = System.nanoTime();
            try {
                SetThreadName setThreadName = new SetThreadName(MPPDataExchangeManager.createFullIdFrom(tNewDataBlockEvent.targetFragmentInstanceId, tNewDataBlockEvent.targetPlanNodeId));
                try {
                    if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                        MPPDataExchangeManager.LOGGER.debug("New data block event received, for plan node {} of {} from {}.", new Object[]{tNewDataBlockEvent.getTargetPlanNodeId(), tNewDataBlockEvent.getTargetFragmentInstanceId(), tNewDataBlockEvent.getSourceFragmentInstanceId()});
                    }
                    Map map = (Map) MPPDataExchangeManager.this.sourceHandles.get(tNewDataBlockEvent.getTargetFragmentInstanceId());
                    SourceHandle sourceHandle = map == null ? null : (SourceHandle) map.get(tNewDataBlockEvent.getTargetPlanNodeId());
                    if (sourceHandle != null && !sourceHandle.isAborted() && !sourceHandle.isFinished()) {
                        sourceHandle.updatePendingDataBlockInfo(tNewDataBlockEvent.getStartSequenceId(), tNewDataBlockEvent.getBlockSizes());
                        setThreadName.close();
                        return;
                    }
                    if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                        MPPDataExchangeManager.LOGGER.debug("received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found", tNewDataBlockEvent.getTargetFragmentInstanceId());
                    }
                    setThreadName.close();
                    this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - nanoTime);
                    this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.SEND_NEW_DATA_BLOCK_NUM_SERVER, tNewDataBlockEvent.getBlockSizes().size());
                } finally {
                }
            } finally {
                this.DATA_EXCHANGE_COST_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.SEND_NEW_DATA_BLOCK_EVENT_TASK_SERVER, System.nanoTime() - nanoTime);
                this.DATA_EXCHANGE_COUNT_METRICS.recordDataBlockNum(DataExchangeCountMetricSet.SEND_NEW_DATA_BLOCK_NUM_SERVER, tNewDataBlockEvent.getBlockSizes().size());
            }
        }

        public void onEndOfDataBlockEvent(TEndOfDataBlockEvent tEndOfDataBlockEvent) throws TException {
            SetThreadName setThreadName = new SetThreadName(MPPDataExchangeManager.createFullIdFrom(tEndOfDataBlockEvent.targetFragmentInstanceId, tEndOfDataBlockEvent.targetPlanNodeId));
            try {
                if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                    MPPDataExchangeManager.LOGGER.debug("End of data block event received, for plan node {} of {} from {}.", new Object[]{tEndOfDataBlockEvent.getTargetPlanNodeId(), tEndOfDataBlockEvent.getTargetFragmentInstanceId(), tEndOfDataBlockEvent.getSourceFragmentInstanceId()});
                }
                Map map = (Map) MPPDataExchangeManager.this.sourceHandles.get(tEndOfDataBlockEvent.getTargetFragmentInstanceId());
                SourceHandle sourceHandle = map == null ? null : (SourceHandle) map.get(tEndOfDataBlockEvent.getTargetPlanNodeId());
                if (sourceHandle != null && !sourceHandle.isAborted() && !sourceHandle.isFinished()) {
                    sourceHandle.setNoMoreTsBlocks(tEndOfDataBlockEvent.getLastSequenceId());
                    setThreadName.close();
                } else {
                    if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                        MPPDataExchangeManager.LOGGER.debug("received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found", tEndOfDataBlockEvent.getTargetFragmentInstanceId());
                    }
                    setThreadName.close();
                }
            } catch (Throwable th) {
                try {
                    setThreadName.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$PipelineSinkListenerImpl.class */
    static class PipelineSinkListenerImpl implements SinkListener {
        private final FragmentInstanceContext context;
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public PipelineSinkListenerImpl(FragmentInstanceContext fragmentInstanceContext, IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
            this.context = fragmentInstanceContext;
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onFinish(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[SkHListenerOnFinish]");
            }
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onEndOfBlocks(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[SkHListenerOnEndOfTsBlocks]");
            }
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public Optional<Throwable> onAborted(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[SkHListenerOnAbort]");
            }
            return this.context.getFailureCause();
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onFailure(ISink iSink, Throwable th) {
            MPPDataExchangeManager.LOGGER.warn("Sink handle failed due to", th);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$PipelineSourceHandleListenerImpl.class */
    static class PipelineSourceHandleListenerImpl implements SourceHandleListener {
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public PipelineSourceHandleListenerImpl(IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onFinished(ISourceHandle iSourceHandle) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ScHListenerOnFinish]");
            }
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onAborted(ISourceHandle iSourceHandle) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ScHListenerOnAbort]");
            }
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onFailure(ISourceHandle iSourceHandle, Throwable th) {
            MPPDataExchangeManager.LOGGER.warn("Source handle failed due to: ", th);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$ShuffleSinkListenerImpl.class */
    class ShuffleSinkListenerImpl implements SinkListener {
        private final FragmentInstanceContext context;
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public ShuffleSinkListenerImpl(FragmentInstanceContext fragmentInstanceContext, IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
            this.context = fragmentInstanceContext;
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onFinish(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ShuffleSinkHandleListenerOnFinish]");
            }
            MPPDataExchangeManager.this.shuffleSinkHandles.remove(iSink.getLocalFragmentInstanceId());
            this.context.finished();
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onEndOfBlocks(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ShuffleSinkHandleListenerOnEndOfTsBlocks]");
            }
            this.context.transitionToFlushing();
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public Optional<Throwable> onAborted(ISink iSink) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ShuffleSinkHandleListenerOnAbort]");
            }
            MPPDataExchangeManager.this.shuffleSinkHandles.remove(iSink.getLocalFragmentInstanceId());
            return this.context.getFailureCause();
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SinkListener
        public void onFailure(ISink iSink, Throwable th) {
            MPPDataExchangeManager.LOGGER.warn("Sink failed due to", th);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$SinkListener.class */
    public interface SinkListener {
        void onFinish(ISink iSink);

        void onEndOfBlocks(ISink iSink);

        Optional<Throwable> onAborted(ISink iSink);

        void onFailure(ISink iSink, Throwable th);
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$SourceHandleListener.class */
    public interface SourceHandleListener {
        void onFinished(ISourceHandle iSourceHandle);

        void onAborted(ISourceHandle iSourceHandle);

        void onFailure(ISourceHandle iSourceHandle, Throwable th);
    }

    /* loaded from: input_file:org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager$SourceHandleListenerImpl.class */
    class SourceHandleListenerImpl implements SourceHandleListener {
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public SourceHandleListenerImpl(IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
            this.onFailureCallback = iMPPDataExchangeManagerCallback;
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onFinished(ISourceHandle iSourceHandle) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ScHListenerOnFinish]");
            }
            Map map = (Map) MPPDataExchangeManager.this.sourceHandles.get(iSourceHandle.getLocalFragmentInstanceId());
            if ((map == null || map.remove(iSourceHandle.getLocalPlanNodeId()) == null) && MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ScHListenerAlreadyReleased]");
            }
            if (map == null || !map.isEmpty()) {
                return;
            }
            MPPDataExchangeManager.this.sourceHandles.remove(iSourceHandle.getLocalFragmentInstanceId());
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onAborted(ISourceHandle iSourceHandle) {
            if (MPPDataExchangeManager.LOGGER.isDebugEnabled()) {
                MPPDataExchangeManager.LOGGER.debug("[ScHListenerOnAbort]");
            }
            onFinished(iSourceHandle);
        }

        @Override // org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.SourceHandleListener
        public void onFailure(ISourceHandle iSourceHandle, Throwable th) {
            MPPDataExchangeManager.LOGGER.warn("Source handle failed due to: ", th);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(th);
            }
        }
    }

    public MPPDataExchangeManager(LocalMemoryManager localMemoryManager, Supplier<TsBlockSerde> supplier, ExecutorService executorService, IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> iClientManager) {
        this.localMemoryManager = (LocalMemoryManager) Validate.notNull(localMemoryManager);
        this.tsBlockSerdeFactory = (Supplier) Validate.notNull(supplier);
        this.executorService = (ExecutorService) Validate.notNull(executorService);
        this.mppDataExchangeServiceClientManager = (IClientManager) Validate.notNull(iClientManager);
    }

    public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
        if (this.mppDataExchangeService == null) {
            this.mppDataExchangeService = new MPPDataExchangeServiceImpl();
        }
        return this.mppDataExchangeService;
    }

    public void deRegisterFragmentInstanceFromMemoryPool(String str, String str2) {
        this.localMemoryManager.getQueryPool().deRegisterFragmentInstanceFromQueryMemoryMap(str, str2);
    }

    public LocalMemoryManager getLocalMemoryManager() {
        return this.localMemoryManager;
    }

    public int getShuffleSinkHandleSize() {
        return this.shuffleSinkHandles.size();
    }

    public int getSourceHandleSize() {
        return this.sourceHandles.values().stream().mapToInt((v0) -> {
            return v0.size();
        }).sum();
    }

    private synchronized ISinkChannel createLocalSinkChannel(TFragmentInstanceId tFragmentInstanceId, TFragmentInstanceId tFragmentInstanceId2, String str, String str2, FragmentInstanceContext fragmentInstanceContext, AtomicInteger atomicInteger) {
        SharedTsBlockQueue sharedTsBlockQueue;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Create local sink handle to plan node {} of {} for {}", new Object[]{str, tFragmentInstanceId2, tFragmentInstanceId});
        }
        Map<String, ISourceHandle> map = this.sourceHandles.get(tFragmentInstanceId2);
        LocalSourceHandle localSourceHandle = map == null ? null : (LocalSourceHandle) map.get(str);
        if (localSourceHandle != null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Get SharedTsBlockQueue from local source handle");
            }
            sharedTsBlockQueue = localSourceHandle.getSharedTsBlockQueue();
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Create SharedTsBlockQueue");
            }
            sharedTsBlockQueue = new SharedTsBlockQueue(tFragmentInstanceId, str2, this.localMemoryManager, this.executorService);
        }
        Objects.requireNonNull(fragmentInstanceContext);
        return new LocalSinkChannel(tFragmentInstanceId, sharedTsBlockQueue, new ISinkChannelListenerImpl(tFragmentInstanceId, fragmentInstanceContext, fragmentInstanceContext::failed, atomicInteger));
    }

    @Override // org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager
    public ISinkChannel createLocalSinkChannelForPipeline(DriverContext driverContext, String str) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Create local sink handle for {}", driverContext.getDriverTaskID());
        }
        SharedTsBlockQueue sharedTsBlockQueue = new SharedTsBlockQueue(driverContext.getDriverTaskID().getFragmentInstanceId().toThrift(), str, this.localMemoryManager, this.executorService);
        sharedTsBlockQueue.allowAddingTsBlock();
        FragmentInstanceContext fragmentInstanceContext = driverContext.getFragmentInstanceContext();
        Objects.requireNonNull(driverContext);
        return new LocalSinkChannel(sharedTsBlockQueue, new PipelineSinkListenerImpl(fragmentInstanceContext, driverContext::failed));
    }

    private ISinkChannel createSinkChannel(TFragmentInstanceId tFragmentInstanceId, TEndPoint tEndPoint, TFragmentInstanceId tFragmentInstanceId2, String str, String str2, FragmentInstanceContext fragmentInstanceContext, AtomicInteger atomicInteger) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Create sink handle to plan node {} of {} for {}", new Object[]{str, tFragmentInstanceId2, tFragmentInstanceId});
        }
        LocalMemoryManager localMemoryManager = this.localMemoryManager;
        ExecutorService executorService = this.executorService;
        TsBlockSerde tsBlockSerde = this.tsBlockSerdeFactory.get();
        Objects.requireNonNull(fragmentInstanceContext);
        return new SinkChannel(tEndPoint, tFragmentInstanceId2, str, str2, tFragmentInstanceId, localMemoryManager, executorService, tsBlockSerde, new ISinkChannelListenerImpl(tFragmentInstanceId, fragmentInstanceContext, fragmentInstanceContext::failed, atomicInteger), this.mppDataExchangeServiceClientManager);
    }

    @Override // org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager
    public ISinkHandle createShuffleSinkHandle(List<DownStreamChannelLocation> list, DownStreamChannelIndex downStreamChannelIndex, ShuffleSinkHandle.ShuffleStrategyEnum shuffleStrategyEnum, TFragmentInstanceId tFragmentInstanceId, String str, FragmentInstanceContext fragmentInstanceContext) {
        if (this.shuffleSinkHandles.containsKey(tFragmentInstanceId)) {
            throw new IllegalStateException("ShuffleSinkHandle for " + tFragmentInstanceId + " is in the map.");
        }
        AtomicInteger atomicInteger = new AtomicInteger(list.size());
        List list2 = (List) list.stream().map(downStreamChannelLocation -> {
            return createChannelForShuffleSink(tFragmentInstanceId, str, downStreamChannelLocation, fragmentInstanceContext, atomicInteger);
        }).collect(Collectors.toList());
        Objects.requireNonNull(fragmentInstanceContext);
        ShuffleSinkHandle shuffleSinkHandle = new ShuffleSinkHandle(tFragmentInstanceId, list2, downStreamChannelIndex, shuffleStrategyEnum, new ShuffleSinkListenerImpl(fragmentInstanceContext, fragmentInstanceContext::failed));
        this.shuffleSinkHandles.put(tFragmentInstanceId, shuffleSinkHandle);
        return shuffleSinkHandle;
    }

    private ISinkChannel createChannelForShuffleSink(TFragmentInstanceId tFragmentInstanceId, String str, DownStreamChannelLocation downStreamChannelLocation, FragmentInstanceContext fragmentInstanceContext, AtomicInteger atomicInteger) {
        return DataNodeEndPoints.isSameNode(downStreamChannelLocation.getRemoteEndpoint()) ? createLocalSinkChannel(tFragmentInstanceId, downStreamChannelLocation.getRemoteFragmentInstanceId(), downStreamChannelLocation.getRemotePlanNodeId(), str, fragmentInstanceContext, atomicInteger) : createSinkChannel(tFragmentInstanceId, downStreamChannelLocation.getRemoteEndpoint(), downStreamChannelLocation.getRemoteFragmentInstanceId(), downStreamChannelLocation.getRemotePlanNodeId(), str, fragmentInstanceContext, atomicInteger);
    }

    @Override // org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager
    public ISourceHandle createLocalSourceHandleForPipeline(SharedTsBlockQueue sharedTsBlockQueue, DriverContext driverContext) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Create local source handle for {}", driverContext.getDriverTaskID());
        }
        Objects.requireNonNull(driverContext);
        return new PipelineSourceHandle(sharedTsBlockQueue, new PipelineSourceHandleListenerImpl(driverContext::failed), driverContext.getDriverTaskID().toString());
    }

    @Override // org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager
    public synchronized ISourceHandle createLocalSourceHandleForFragment(TFragmentInstanceId tFragmentInstanceId, String str, String str2, TFragmentInstanceId tFragmentInstanceId2, int i, IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
        SharedTsBlockQueue sharedTsBlockQueue;
        if (this.sourceHandles.containsKey(tFragmentInstanceId) && this.sourceHandles.get(tFragmentInstanceId).containsKey(str)) {
            throw new IllegalStateException("Source handle for plan node " + str + " of " + tFragmentInstanceId + " exists.");
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Create local source handle from {} for plan node {} of {}", new Object[]{tFragmentInstanceId2, str, tFragmentInstanceId});
        }
        ISinkHandle iSinkHandle = this.shuffleSinkHandles.get(tFragmentInstanceId2);
        if (iSinkHandle != null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Get SharedTsBlockQueue from local sink handle");
            }
            sharedTsBlockQueue = ((LocalSinkChannel) iSinkHandle.getChannel(i)).getSharedTsBlockQueue();
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Create SharedTsBlockQueue");
            }
            sharedTsBlockQueue = new SharedTsBlockQueue(tFragmentInstanceId2, str2, this.localMemoryManager, this.executorService);
        }
        LocalSourceHandle localSourceHandle = new LocalSourceHandle(tFragmentInstanceId, str, sharedTsBlockQueue, new SourceHandleListenerImpl(iMPPDataExchangeManagerCallback));
        this.sourceHandles.computeIfAbsent(tFragmentInstanceId, tFragmentInstanceId3 -> {
            return new ConcurrentHashMap();
        }).put(str, localSourceHandle);
        return localSourceHandle;
    }

    @Override // org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager
    public ISourceHandle createSourceHandle(TFragmentInstanceId tFragmentInstanceId, String str, int i, TEndPoint tEndPoint, TFragmentInstanceId tFragmentInstanceId2, IMPPDataExchangeManagerCallback<Throwable> iMPPDataExchangeManagerCallback) {
        Map<String, ISourceHandle> map = this.sourceHandles.get(tFragmentInstanceId);
        if (map != null && map.containsKey(str)) {
            throw new IllegalStateException("Source handle for plan node " + str + " of " + tFragmentInstanceId + " exists.");
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Create source handle from {} for plan node {} of {}", new Object[]{tFragmentInstanceId2, str, tFragmentInstanceId});
        }
        SourceHandle sourceHandle = new SourceHandle(tEndPoint, tFragmentInstanceId2, tFragmentInstanceId, str, i, this.localMemoryManager, this.executorService, this.tsBlockSerdeFactory.get(), new SourceHandleListenerImpl(iMPPDataExchangeManagerCallback), this.mppDataExchangeServiceClientManager);
        this.sourceHandles.computeIfAbsent(tFragmentInstanceId, tFragmentInstanceId3 -> {
            return new ConcurrentHashMap();
        }).put(str, sourceHandle);
        return sourceHandle;
    }

    @Override // org.apache.iotdb.db.queryengine.execution.exchange.IMPPDataExchangeManager
    public void forceDeregisterFragmentInstance(TFragmentInstanceId tFragmentInstanceId) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[StartForceReleaseFIDataExchangeResource]");
        }
        ISinkHandle iSinkHandle = this.shuffleSinkHandles.get(tFragmentInstanceId);
        if (iSinkHandle != null) {
            iSinkHandle.abort();
            this.shuffleSinkHandles.remove(tFragmentInstanceId);
        }
        Map<String, ISourceHandle> map = this.sourceHandles.get(tFragmentInstanceId);
        if (map != null) {
            for (Map.Entry<String, ISourceHandle> entry : map.entrySet()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("[CloseSourceHandle] {}", entry.getKey());
                }
                entry.getValue().abort();
            }
            this.sourceHandles.remove(tFragmentInstanceId);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[EndForceReleaseFIDataExchangeResource]");
        }
    }

    public static String createFullIdFrom(TFragmentInstanceId tFragmentInstanceId, String str) {
        return FragmentInstanceId.createFullId(tFragmentInstanceId.queryId, tFragmentInstanceId.fragmentId, tFragmentInstanceId.instanceId) + "." + str;
    }
}
