/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.db.mpp.execution.exchange;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.SetThreadName;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.mpp.execution.exchange.ISinkHandle;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SinkHandle
implements ISinkHandle {
    private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
    public static final int MAX_ATTEMPT_TIMES = 3;
    private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000L;
    private final TEndPoint remoteEndpoint;
    private final TFragmentInstanceId remoteFragmentInstanceId;
    private final String remotePlanNodeId;
    private final TFragmentInstanceId localFragmentInstanceId;
    private final LocalMemoryManager localMemoryManager;
    private final ExecutorService executorService;
    private final TsBlockSerde serde;
    private final MPPDataExchangeManager.SinkHandleListener sinkHandleListener;
    private final String threadName;
    private long retryIntervalInMs;
    private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap();
    private final IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager;
    private volatile ListenableFuture<Void> blocked = Futures.immediateFuture(null);
    private int nextSequenceId = 0;
    private long bufferRetainedSizeInBytes = 0L;
    private boolean aborted = false;
    private boolean noMoreTsBlocks = false;

    public SinkHandle(TEndPoint remoteEndpoint, TFragmentInstanceId remoteFragmentInstanceId, String remotePlanNodeId, TFragmentInstanceId localFragmentInstanceId, LocalMemoryManager localMemoryManager, ExecutorService executorService, TsBlockSerde serde, MPPDataExchangeManager.SinkHandleListener sinkHandleListener, IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> mppDataExchangeServiceClientManager) {
        this.remoteEndpoint = (TEndPoint)Validate.notNull((Object)remoteEndpoint);
        this.remoteFragmentInstanceId = (TFragmentInstanceId)Validate.notNull((Object)remoteFragmentInstanceId);
        this.remotePlanNodeId = (String)Validate.notNull((Object)remotePlanNodeId);
        this.localFragmentInstanceId = (TFragmentInstanceId)Validate.notNull((Object)localFragmentInstanceId);
        this.localMemoryManager = (LocalMemoryManager)Validate.notNull((Object)localMemoryManager);
        this.executorService = (ExecutorService)Validate.notNull((Object)executorService);
        this.serde = (TsBlockSerde)Validate.notNull((Object)serde);
        this.sinkHandleListener = (MPPDataExchangeManager.SinkHandleListener)Validate.notNull((Object)sinkHandleListener);
        this.mppDataExchangeServiceClientManager = mppDataExchangeServiceClientManager;
        this.retryIntervalInMs = 1000L;
        this.threadName = MPPDataExchangeManager.createFullIdFrom(localFragmentInstanceId, "SinkHandle");
    }

    @Override
    public synchronized ListenableFuture<?> isFull() {
        if (this.aborted) {
            throw new IllegalStateException("Sink handle is aborted.");
        }
        return Futures.nonCancellationPropagating(this.blocked);
    }

    private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
        this.executorService.submit(new SendNewDataBlockEventTask(startSequenceId, blockSizes));
    }

    @Override
    public synchronized void send(List<TsBlock> tsBlocks) {
        Validate.notNull(tsBlocks, (String)"tsBlocks is null", (Object[])new Object[0]);
        if (this.aborted) {
            throw new IllegalStateException("Sink handle is aborted.");
        }
        if (!this.blocked.isDone()) {
            throw new IllegalStateException("Sink handle is blocked.");
        }
        if (this.noMoreTsBlocks) {
            return;
        }
        long retainedSizeInBytes = 0L;
        for (TsBlock tsBlock : tsBlocks) {
            retainedSizeInBytes += tsBlock.getRetainedSizeInBytes();
        }
        ArrayList<Long> tsBlockSizes = new ArrayList<Long>();
        int startSequenceId = this.nextSequenceId;
        this.blocked = this.localMemoryManager.getQueryPool().reserve(this.localFragmentInstanceId.getQueryId(), retainedSizeInBytes);
        this.bufferRetainedSizeInBytes += retainedSizeInBytes;
        for (TsBlock tsBlock : tsBlocks) {
            this.sequenceIdToTsBlock.put(this.nextSequenceId, tsBlock);
            ++this.nextSequenceId;
        }
        for (int i = startSequenceId; i < this.nextSequenceId; ++i) {
            tsBlockSizes.add(this.sequenceIdToTsBlock.get(i).getRetainedSizeInBytes());
        }
        this.submitSendNewDataBlockEventTask(startSequenceId, tsBlockSizes);
    }

    @Override
    public synchronized void send(int partition, List<TsBlock> tsBlocks) {
        throw new UnsupportedOperationException();
    }

    private void sendEndOfDataBlockEvent() throws Exception {
        logger.info("send end of data block event");
        TEndOfDataBlockEvent endOfDataBlockEvent = new TEndOfDataBlockEvent(this.remoteFragmentInstanceId, this.remotePlanNodeId, this.localFragmentInstanceId, this.nextSequenceId - 1);
        for (int attempt = 0; attempt < 3; ++attempt) {
            try (SyncDataNodeMPPDataExchangeServiceClient client = (SyncDataNodeMPPDataExchangeServiceClient)this.mppDataExchangeServiceClientManager.borrowClient((Object)this.remoteEndpoint);){
                client.onEndOfDataBlockEvent(endOfDataBlockEvent);
                break;
            }
            catch (Throwable e) {
                logger.error("Failed to send end of data block event, attempt times: {}", (Object)attempt, (Object)e);
                if (attempt == 3) {
                    throw e;
                }
                Thread.sleep(this.retryIntervalInMs);
                continue;
            }
        }
    }

    @Override
    public synchronized void setNoMoreTsBlocks() {
        logger.info("start to set no-more-tsblocks");
        if (this.aborted) {
            return;
        }
        try {
            this.sendEndOfDataBlockEvent();
        }
        catch (Exception e) {
            throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
        }
        logger.info("set noMoreTsBlocks to true");
        this.noMoreTsBlocks = true;
        if (this.isFinished()) {
            logger.info("revoke onFinish() of sinkHandleListener");
            this.sinkHandleListener.onFinish(this);
        }
        logger.info("revoke onEndOfBlocks() of sinkHandleListener");
        this.sinkHandleListener.onEndOfBlocks(this);
    }

    @Override
    public synchronized void abort() {
        logger.info("SinkHandle is being aborted.");
        this.sequenceIdToTsBlock.clear();
        this.aborted = true;
        this.bufferRetainedSizeInBytes -= this.localMemoryManager.getQueryPool().tryCancel(this.blocked);
        if (this.bufferRetainedSizeInBytes > 0L) {
            this.localMemoryManager.getQueryPool().free(this.localFragmentInstanceId.getQueryId(), this.bufferRetainedSizeInBytes);
            this.bufferRetainedSizeInBytes = 0L;
        }
        this.sinkHandleListener.onAborted(this);
        logger.info("SinkHandle is aborted");
    }

    @Override
    public boolean isAborted() {
        return this.aborted;
    }

    @Override
    public boolean isFinished() {
        return this.noMoreTsBlocks && this.sequenceIdToTsBlock.isEmpty();
    }

    @Override
    public long getBufferRetainedSizeInBytes() {
        return this.bufferRetainedSizeInBytes;
    }

    public int getNumOfBufferedTsBlocks() {
        return this.sequenceIdToTsBlock.size();
    }

    ByteBuffer getSerializedTsBlock(int partition, int sequenceId) {
        throw new UnsupportedOperationException();
    }

    ByteBuffer getSerializedTsBlock(int sequenceId) throws IOException {
        TsBlock tsBlock = this.sequenceIdToTsBlock.get(sequenceId);
        if (tsBlock == null) {
            throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + sequenceId);
        }
        return this.serde.serialize(tsBlock);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
        long freedBytes = 0L;
        SinkHandle sinkHandle = this;
        synchronized (sinkHandle) {
            if (this.aborted) {
                return;
            }
            Iterator<Map.Entry<Integer, TsBlock>> iterator = this.sequenceIdToTsBlock.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Integer, TsBlock> entry = iterator.next();
                if (entry.getKey() < startSequenceId) continue;
                if (entry.getKey() >= endSequenceId) break;
                freedBytes += entry.getValue().getRetainedSizeInBytes();
                this.bufferRetainedSizeInBytes -= entry.getValue().getRetainedSizeInBytes();
                iterator.remove();
            }
        }
        if (this.isFinished()) {
            this.sinkHandleListener.onFinish(this);
        }
        this.localMemoryManager.getQueryPool().free(this.localFragmentInstanceId.getQueryId(), freedBytes);
    }

    public TEndPoint getRemoteEndpoint() {
        return this.remoteEndpoint;
    }

    public TFragmentInstanceId getRemoteFragmentInstanceId() {
        return this.remoteFragmentInstanceId;
    }

    public String getRemotePlanNodeId() {
        return this.remotePlanNodeId;
    }

    @Override
    public TFragmentInstanceId getLocalFragmentInstanceId() {
        return this.localFragmentInstanceId;
    }

    public String toString() {
        return String.format("Query[%s]-[%s-%s-SinkHandle]:", this.localFragmentInstanceId.queryId, this.localFragmentInstanceId.fragmentId, this.localFragmentInstanceId.instanceId);
    }

    public void setRetryIntervalInMs(long retryIntervalInMs) {
        this.retryIntervalInMs = retryIntervalInMs;
    }

    class SendNewDataBlockEventTask
    implements Runnable {
        private final int startSequenceId;
        private final List<Long> blockSizes;

        SendNewDataBlockEventTask(int startSequenceId, List<Long> blockSizes) {
            Validate.isTrue((startSequenceId >= 0 ? 1 : 0) != 0, (String)("Start sequence ID should be greater than or equal to zero, but was: " + startSequenceId + "."), (Object[])new Object[0]);
            this.startSequenceId = startSequenceId;
            this.blockSizes = (List)Validate.notNull(blockSizes);
        }

        @Override
        public void run() {
            try (SetThreadName sinkHandleName = new SetThreadName(SinkHandle.this.threadName, new Object[0]);){
                logger.info("Send new data block event [{}, {})", (Object)this.startSequenceId, (Object)(this.startSequenceId + this.blockSizes.size()));
                int attempt = 0;
                TNewDataBlockEvent newDataBlockEvent = new TNewDataBlockEvent(SinkHandle.this.remoteFragmentInstanceId, SinkHandle.this.remotePlanNodeId, SinkHandle.this.localFragmentInstanceId, this.startSequenceId, this.blockSizes);
                while (attempt < 3) {
                    ++attempt;
                    try (SyncDataNodeMPPDataExchangeServiceClient client = (SyncDataNodeMPPDataExchangeServiceClient)SinkHandle.this.mppDataExchangeServiceClientManager.borrowClient((Object)SinkHandle.this.remoteEndpoint);){
                        client.onNewDataBlockEvent(newDataBlockEvent);
                        break;
                    }
                    catch (Throwable e) {
                        logger.error("Failed to send new data block event, attempt times: {}", (Object)attempt, (Object)e);
                        if (attempt == 3) {
                            SinkHandle.this.sinkHandleListener.onFailure(SinkHandle.this, e);
                        }
                        try {
                            Thread.sleep(SinkHandle.this.retryIntervalInMs);
                        }
                        catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                            SinkHandle.this.sinkHandleListener.onFailure(SinkHandle.this, e);
                        }
                    }
                }
            }
        }
    }
}

