/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.exchange;

import io.airlift.concurrent.SetThreadName;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.exchange.IMPPDataExchangeManagerCallback;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.ISourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.LocalSinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.LocalSourceHandle;
import org.apache.iotdb.db.mpp.execution.exchange.SharedTsBlockQueue;
import org.apache.iotdb.db.mpp.execution.exchange.SinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.SourceHandle;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.mpp.rpc.thrift.TAcknowledgeDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockRequest;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataBlockResponse;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MPPDataExchangeManager
implements IMPPDataExchangeManager {
    private static final Logger logger = LoggerFactory.getLogger(MPPDataExchangeManager.class);
    private final LocalMemoryManager localMemoryManager;
    private final Supplier<TsBlockSerde> tsBlockSerdeFactory;
    private final ExecutorService executorService;
    private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager;
    private final Map<TFragmentInstanceId, Map<String, ISourceHandle>> sourceHandles;
    private final Map<TFragmentInstanceId, ISinkHandle> sinkHandles;
    private MPPDataExchangeServiceImpl mppDataExchangeService;

    public MPPDataExchangeManager(LocalMemoryManager localMemoryManager, Supplier<TsBlockSerde> tsBlockSerdeFactory, ExecutorService executorService, IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager) {
        this.localMemoryManager = (LocalMemoryManager)Validate.notNull((Object)localMemoryManager);
        this.tsBlockSerdeFactory = (Supplier)Validate.notNull(tsBlockSerdeFactory);
        this.executorService = (ExecutorService)Validate.notNull((Object)executorService);
        this.mppDataExchangeServiceClientManager = (IClientManager)Validate.notNull(mppDataExchangeServiceClientManager);
        this.sourceHandles = new ConcurrentHashMap<TFragmentInstanceId, Map<String, ISourceHandle>>();
        this.sinkHandles = new ConcurrentHashMap<TFragmentInstanceId, ISinkHandle>();
    }

    public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
        if (this.mppDataExchangeService == null) {
            this.mppDataExchangeService = new MPPDataExchangeServiceImpl();
        }
        return this.mppDataExchangeService;
    }

    @Override
    public synchronized ISinkHandle createLocalSinkHandle(TFragmentInstanceId localFragmentInstanceId, TFragmentInstanceId remoteFragmentInstanceId, String remotePlanNodeId, FragmentInstanceContext instanceContext) {
        SharedTsBlockQueue queue;
        if (this.sinkHandles.containsKey(localFragmentInstanceId)) {
            throw new IllegalStateException("Local sink handle for " + localFragmentInstanceId + " exists.");
        }
        logger.debug("Create local sink handle to plan node {} of {} for {}", new Object[]{remotePlanNodeId, remoteFragmentInstanceId, localFragmentInstanceId});
        if (this.sourceHandles.containsKey(remoteFragmentInstanceId) && this.sourceHandles.get(remoteFragmentInstanceId).containsKey(remotePlanNodeId)) {
            logger.debug("Get shared tsblock queue from local source handle");
            queue = ((LocalSourceHandle)this.sourceHandles.get(remoteFragmentInstanceId).get(remotePlanNodeId)).getSharedTsBlockQueue();
        } else {
            logger.debug("Create shared tsblock queue");
            queue = new SharedTsBlockQueue(remoteFragmentInstanceId, this.localMemoryManager);
        }
        LocalSinkHandle localSinkHandle = new LocalSinkHandle(remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, queue, new SinkHandleListenerImpl(instanceContext, instanceContext::failed));
        this.sinkHandles.put(localFragmentInstanceId, localSinkHandle);
        return localSinkHandle;
    }

    @Override
    public ISinkHandle createSinkHandle(TFragmentInstanceId localFragmentInstanceId, TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, String remotePlanNodeId, FragmentInstanceContext instanceContext) {
        if (this.sinkHandles.containsKey(localFragmentInstanceId)) {
            throw new IllegalStateException("Sink handle for " + localFragmentInstanceId + " exists.");
        }
        logger.debug("Create sink handle to plan node {} of {} for {}", new Object[]{remotePlanNodeId, remoteFragmentInstanceId, localFragmentInstanceId});
        SinkHandle sinkHandle = new SinkHandle(remoteEndpoint, remoteFragmentInstanceId, remotePlanNodeId, localFragmentInstanceId, this.localMemoryManager, this.executorService, this.tsBlockSerdeFactory.get(), new SinkHandleListenerImpl(instanceContext, instanceContext::failed), this.mppDataExchangeServiceClientManager);
        this.sinkHandles.put(localFragmentInstanceId, sinkHandle);
        return sinkHandle;
    }

    @Override
    public synchronized ISourceHandle createLocalSourceHandle(TFragmentInstanceId localFragmentInstanceId, String localPlanNodeId, TFragmentInstanceId remoteFragmentInstanceId, IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
        SharedTsBlockQueue queue;
        if (this.sourceHandles.containsKey(localFragmentInstanceId) && this.sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
            throw new IllegalStateException("Source handle for plan node " + localPlanNodeId + " of " + localFragmentInstanceId + " exists.");
        }
        logger.debug("Create local source handle from {} for plan node {} of {}", new Object[]{remoteFragmentInstanceId, localPlanNodeId, localFragmentInstanceId});
        if (this.sinkHandles.containsKey(remoteFragmentInstanceId)) {
            logger.debug("Get shared tsblock queue from local sink handle");
            queue = ((LocalSinkHandle)this.sinkHandles.get(remoteFragmentInstanceId)).getSharedTsBlockQueue();
        } else {
            logger.debug("Create shared tsblock queue");
            queue = new SharedTsBlockQueue(localFragmentInstanceId, this.localMemoryManager);
        }
        LocalSourceHandle localSourceHandle = new LocalSourceHandle(remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, queue, new SourceHandleListenerImpl(onFailureCallback));
        this.sourceHandles.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap()).put(localPlanNodeId, localSourceHandle);
        return localSourceHandle;
    }

    @Override
    public ISourceHandle createSourceHandle(TFragmentInstanceId localFragmentInstanceId, String localPlanNodeId, TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
        if (this.sourceHandles.containsKey(localFragmentInstanceId) && this.sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
            throw new IllegalStateException("Source handle for plan node " + localPlanNodeId + " of " + localFragmentInstanceId + " exists.");
        }
        logger.debug("Create source handle from {} for plan node {} of {}", new Object[]{remoteFragmentInstanceId, localPlanNodeId, localFragmentInstanceId});
        SourceHandle sourceHandle = new SourceHandle(remoteEndpoint, remoteFragmentInstanceId, localFragmentInstanceId, localPlanNodeId, this.localMemoryManager, this.executorService, this.tsBlockSerdeFactory.get(), new SourceHandleListenerImpl(onFailureCallback), this.mppDataExchangeServiceClientManager);
        this.sourceHandles.computeIfAbsent(localFragmentInstanceId, key -> new ConcurrentHashMap()).put(localPlanNodeId, sourceHandle);
        return sourceHandle;
    }

    @Override
    public void forceDeregisterFragmentInstance(TFragmentInstanceId fragmentInstanceId) {
        logger.info("Force deregister fragment instance");
        if (this.sinkHandles.containsKey(fragmentInstanceId)) {
            ISinkHandle sinkHandle = this.sinkHandles.get(fragmentInstanceId);
            sinkHandle.abort();
            this.sinkHandles.remove(fragmentInstanceId);
        }
        if (this.sourceHandles.containsKey(fragmentInstanceId)) {
            Map<String, ISourceHandle> planNodeIdToSourceHandle = this.sourceHandles.get(fragmentInstanceId);
            for (Map.Entry<String, ISourceHandle> entry : planNodeIdToSourceHandle.entrySet()) {
                logger.info("Close source handle {}", this.sourceHandles);
                entry.getValue().abort();
            }
            this.sourceHandles.remove(fragmentInstanceId);
        }
    }

    public static String createFullIdFrom(TFragmentInstanceId fragmentInstanceId, String suffix) {
        return FragmentInstanceId.createFullId(fragmentInstanceId.queryId, fragmentInstanceId.fragmentId, fragmentInstanceId.instanceId) + "." + suffix;
    }

    class SinkHandleListenerImpl
    implements SinkHandleListener {
        private final FragmentInstanceContext context;
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public SinkHandleListenerImpl(FragmentInstanceContext context, IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
            this.context = context;
            this.onFailureCallback = onFailureCallback;
        }

        @Override
        public void onFinish(ISinkHandle sinkHandle) {
            logger.info("onFinish is invoked");
            this.removeFromMPPDataExchangeManager(sinkHandle);
            this.context.finished();
        }

        @Override
        public void onEndOfBlocks(ISinkHandle sinkHandle) {
            this.context.transitionToFlushing();
        }

        @Override
        public void onAborted(ISinkHandle sinkHandle) {
            logger.info("onAborted is invoked");
            this.removeFromMPPDataExchangeManager(sinkHandle);
        }

        private void removeFromMPPDataExchangeManager(ISinkHandle sinkHandle) {
            logger.info("release resources of finished sink handle");
            if (!MPPDataExchangeManager.this.sinkHandles.containsKey(sinkHandle.getLocalFragmentInstanceId())) {
                logger.info("resources already been released");
            }
            MPPDataExchangeManager.this.sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
        }

        @Override
        public void onFailure(ISinkHandle sinkHandle, Throwable t) {
            logger.error("Sink handle failed due to", t);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(t);
            }
        }
    }

    class SourceHandleListenerImpl
    implements SourceHandleListener {
        private final IMPPDataExchangeManagerCallback<Throwable> onFailureCallback;

        public SourceHandleListenerImpl(IMPPDataExchangeManagerCallback<Throwable> onFailureCallback) {
            this.onFailureCallback = onFailureCallback;
        }

        @Override
        public void onFinished(ISourceHandle sourceHandle) {
            logger.info("finished and release resources");
            if (!MPPDataExchangeManager.this.sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) || !((Map)MPPDataExchangeManager.this.sourceHandles.get(sourceHandle.getLocalFragmentInstanceId())).containsKey(sourceHandle.getLocalPlanNodeId())) {
                logger.info("resources has already been released");
            } else {
                ((Map)MPPDataExchangeManager.this.sourceHandles.get(sourceHandle.getLocalFragmentInstanceId())).remove(sourceHandle.getLocalPlanNodeId());
            }
            if (MPPDataExchangeManager.this.sourceHandles.containsKey(sourceHandle.getLocalFragmentInstanceId()) && ((Map)MPPDataExchangeManager.this.sourceHandles.get(sourceHandle.getLocalFragmentInstanceId())).isEmpty()) {
                MPPDataExchangeManager.this.sourceHandles.remove(sourceHandle.getLocalFragmentInstanceId());
            }
        }

        @Override
        public void onAborted(ISourceHandle sourceHandle) {
            logger.info("onAborted is invoked");
            this.onFinished(sourceHandle);
        }

        @Override
        public void onFailure(ISourceHandle sourceHandle, Throwable t) {
            logger.error("Source handle failed due to: ", t);
            if (this.onFailureCallback != null) {
                this.onFailureCallback.call(t);
            }
        }
    }

    class MPPDataExchangeServiceImpl
    implements MPPDataExchangeService.Iface {
        MPPDataExchangeServiceImpl() {
        }

        public TGetDataBlockResponse getDataBlock(TGetDataBlockRequest req) throws TException {
            try (SetThreadName fragmentInstanceName = new SetThreadName(MPPDataExchangeManager.createFullIdFrom(req.sourceFragmentInstanceId, "SinkHandle"), new Object[0]);){
                logger.debug("Get data block request received, for data blocks whose sequence ID in [{}, {}) from {}.", new Object[]{req.getStartSequenceId(), req.getEndSequenceId(), req.getSourceFragmentInstanceId()});
                if (!MPPDataExchangeManager.this.sinkHandles.containsKey(req.getSourceFragmentInstanceId())) {
                    throw new TException("Source fragment instance not found. Fragment instance ID: " + req.getSourceFragmentInstanceId() + ".");
                }
                TGetDataBlockResponse resp = new TGetDataBlockResponse();
                SinkHandle sinkHandle = (SinkHandle)MPPDataExchangeManager.this.sinkHandles.get(req.getSourceFragmentInstanceId());
                for (int i = req.getStartSequenceId(); i < req.getEndSequenceId(); ++i) {
                    try {
                        ByteBuffer serializedTsBlock = sinkHandle.getSerializedTsBlock(i);
                        resp.addToTsBlocks(serializedTsBlock);
                        continue;
                    }
                    catch (IOException e) {
                        throw new TException((Throwable)e);
                    }
                }
                TGetDataBlockResponse tGetDataBlockResponse = resp;
                return tGetDataBlockResponse;
            }
        }

        public void onAcknowledgeDataBlockEvent(TAcknowledgeDataBlockEvent e) throws TException {
            try (SetThreadName fragmentInstanceName = new SetThreadName(MPPDataExchangeManager.createFullIdFrom(e.sourceFragmentInstanceId, "SinkHandle"), new Object[0]);){
                logger.debug("Acknowledge data block event received, for data blocks whose sequence ID in [{}, {}) from {}.", new Object[]{e.getStartSequenceId(), e.getEndSequenceId(), e.getSourceFragmentInstanceId()});
                if (!MPPDataExchangeManager.this.sinkHandles.containsKey(e.getSourceFragmentInstanceId())) {
                    logger.warn("received ACK event but target FragmentInstance[{}] is not found.", (Object)e.getSourceFragmentInstanceId());
                    return;
                }
                ((SinkHandle)MPPDataExchangeManager.this.sinkHandles.get(e.getSourceFragmentInstanceId())).acknowledgeTsBlock(e.getStartSequenceId(), e.getEndSequenceId());
            }
            catch (Throwable t) {
                logger.error("ack TsBlock [{}, {}) failed.", new Object[]{e.getStartSequenceId(), e.getEndSequenceId(), t});
                throw t;
            }
        }

        public void onNewDataBlockEvent(TNewDataBlockEvent e) throws TException {
            try (SetThreadName fragmentInstanceName = new SetThreadName(MPPDataExchangeManager.createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"), new Object[0]);){
                logger.debug("New data block event received, for plan node {} of {} from {}.", new Object[]{e.getTargetPlanNodeId(), e.getTargetFragmentInstanceId(), e.getSourceFragmentInstanceId()});
                if (!MPPDataExchangeManager.this.sourceHandles.containsKey(e.getTargetFragmentInstanceId()) || !((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).containsKey(e.getTargetPlanNodeId()) || ((ISourceHandle)((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).get(e.getTargetPlanNodeId())).isAborted() || ((ISourceHandle)((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).get(e.getTargetPlanNodeId())).isFinished()) {
                    logger.warn("received NewDataBlockEvent but the downstream FragmentInstance[{}] is not found", (Object)e.getTargetFragmentInstanceId());
                    return;
                }
                SourceHandle sourceHandle = (SourceHandle)((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).get(e.getTargetPlanNodeId());
                sourceHandle.updatePendingDataBlockInfo(e.getStartSequenceId(), e.getBlockSizes());
            }
        }

        public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException {
            try (SetThreadName fragmentInstanceName = new SetThreadName(MPPDataExchangeManager.createFullIdFrom(e.targetFragmentInstanceId, e.targetPlanNodeId + ".SourceHandle"), new Object[0]);){
                logger.debug("End of data block event received, for plan node {} of {} from {}.", new Object[]{e.getTargetPlanNodeId(), e.getTargetFragmentInstanceId(), e.getSourceFragmentInstanceId()});
                if (!MPPDataExchangeManager.this.sourceHandles.containsKey(e.getTargetFragmentInstanceId()) || !((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).containsKey(e.getTargetPlanNodeId()) || ((ISourceHandle)((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).get(e.getTargetPlanNodeId())).isAborted() || ((ISourceHandle)((Map)MPPDataExchangeManager.this.sourceHandles.get(e.getTargetFragmentInstanceId())).get(e.getTargetPlanNodeId())).isFinished()) {
                    logger.warn("received onEndOfDataBlockEvent but the downstream FragmentInstance[{}] is not found", (Object)e.getTargetFragmentInstanceId());
                    return;
                }
                SourceHandle sourceHandle = (SourceHandle)MPPDataExchangeManager.this.sourceHandles.getOrDefault(e.getTargetFragmentInstanceId(), Collections.emptyMap()).get(e.getTargetPlanNodeId());
                sourceHandle.setNoMoreTsBlocks(e.getLastSequenceId());
            }
        }
    }

    public static interface SinkHandleListener {
        public void onFinish(ISinkHandle var1);

        public void onEndOfBlocks(ISinkHandle var1);

        public void onAborted(ISinkHandle var1);

        public void onFailure(ISinkHandle var1, Throwable var2);
    }

    public static interface SourceHandleListener {
        public void onFinished(ISourceHandle var1);

        public void onAborted(ISourceHandle var1);

        public void onFailure(ISourceHandle var1, Throwable var2);
    }
}

